Coverage Report

Created: 2026-03-30 13:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/PaloInternalService_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <pthread.h>
24
25
#include <algorithm>
26
#include <cstdlib>
27
// IWYU pragma: no_include <bits/chrono.h>
28
#include <fmt/format.h>
29
30
#include <chrono> // IWYU pragma: keep
31
#include <map>
32
#include <memory>
33
#include <ostream>
34
#include <utility>
35
36
#include "cloud/config.h"
37
#include "common/cast_set.h"
38
#include "common/config.h"
39
#include "common/exception.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "exec/exchange/local_exchange_sink_operator.h"
43
#include "exec/exchange/local_exchange_source_operator.h"
44
#include "exec/exchange/local_exchanger.h"
45
#include "exec/exchange/vdata_stream_mgr.h"
46
#include "exec/operator/aggregation_sink_operator.h"
47
#include "exec/operator/aggregation_source_operator.h"
48
#include "exec/operator/analytic_sink_operator.h"
49
#include "exec/operator/analytic_source_operator.h"
50
#include "exec/operator/assert_num_rows_operator.h"
51
#include "exec/operator/blackhole_sink_operator.h"
52
#include "exec/operator/cache_sink_operator.h"
53
#include "exec/operator/cache_source_operator.h"
54
#include "exec/operator/datagen_operator.h"
55
#include "exec/operator/dict_sink_operator.h"
56
#include "exec/operator/distinct_streaming_aggregation_operator.h"
57
#include "exec/operator/empty_set_operator.h"
58
#include "exec/operator/es_scan_operator.h"
59
#include "exec/operator/exchange_sink_operator.h"
60
#include "exec/operator/exchange_source_operator.h"
61
#include "exec/operator/file_scan_operator.h"
62
#include "exec/operator/group_commit_block_sink_operator.h"
63
#include "exec/operator/group_commit_scan_operator.h"
64
#include "exec/operator/hashjoin_build_sink.h"
65
#include "exec/operator/hashjoin_probe_operator.h"
66
#include "exec/operator/hive_table_sink_operator.h"
67
#include "exec/operator/iceberg_delete_sink_operator.h"
68
#include "exec/operator/iceberg_merge_sink_operator.h"
69
#include "exec/operator/iceberg_table_sink_operator.h"
70
#include "exec/operator/jdbc_scan_operator.h"
71
#include "exec/operator/jdbc_table_sink_operator.h"
72
#include "exec/operator/local_merge_sort_source_operator.h"
73
#include "exec/operator/materialization_opertor.h"
74
#include "exec/operator/maxcompute_table_sink_operator.h"
75
#include "exec/operator/memory_scratch_sink_operator.h"
76
#include "exec/operator/meta_scan_operator.h"
77
#include "exec/operator/multi_cast_data_stream_sink.h"
78
#include "exec/operator/multi_cast_data_stream_source.h"
79
#include "exec/operator/nested_loop_join_build_operator.h"
80
#include "exec/operator/nested_loop_join_probe_operator.h"
81
#include "exec/operator/olap_scan_operator.h"
82
#include "exec/operator/olap_table_sink_operator.h"
83
#include "exec/operator/olap_table_sink_v2_operator.h"
84
#include "exec/operator/partition_sort_sink_operator.h"
85
#include "exec/operator/partition_sort_source_operator.h"
86
#include "exec/operator/partitioned_aggregation_sink_operator.h"
87
#include "exec/operator/partitioned_aggregation_source_operator.h"
88
#include "exec/operator/partitioned_hash_join_probe_operator.h"
89
#include "exec/operator/partitioned_hash_join_sink_operator.h"
90
#include "exec/operator/rec_cte_anchor_sink_operator.h"
91
#include "exec/operator/rec_cte_scan_operator.h"
92
#include "exec/operator/rec_cte_sink_operator.h"
93
#include "exec/operator/rec_cte_source_operator.h"
94
#include "exec/operator/repeat_operator.h"
95
#include "exec/operator/result_file_sink_operator.h"
96
#include "exec/operator/result_sink_operator.h"
97
#include "exec/operator/schema_scan_operator.h"
98
#include "exec/operator/select_operator.h"
99
#include "exec/operator/set_probe_sink_operator.h"
100
#include "exec/operator/set_sink_operator.h"
101
#include "exec/operator/set_source_operator.h"
102
#include "exec/operator/sort_sink_operator.h"
103
#include "exec/operator/sort_source_operator.h"
104
#include "exec/operator/spill_iceberg_table_sink_operator.h"
105
#include "exec/operator/spill_sort_sink_operator.h"
106
#include "exec/operator/spill_sort_source_operator.h"
107
#include "exec/operator/streaming_aggregation_operator.h"
108
#include "exec/operator/table_function_operator.h"
109
#include "exec/operator/tvf_table_sink_operator.h"
110
#include "exec/operator/union_sink_operator.h"
111
#include "exec/operator/union_source_operator.h"
112
#include "exec/pipeline/dependency.h"
113
#include "exec/pipeline/pipeline_task.h"
114
#include "exec/pipeline/task_scheduler.h"
115
#include "exec/runtime_filter/runtime_filter_mgr.h"
116
#include "exec/sort/topn_sorter.h"
117
#include "exec/spill/spill_file.h"
118
#include "io/fs/stream_load_pipe.h"
119
#include "load/stream_load/new_load_stream_mgr.h"
120
#include "runtime/exec_env.h"
121
#include "runtime/fragment_mgr.h"
122
#include "runtime/result_block_buffer.h"
123
#include "runtime/result_buffer_mgr.h"
124
#include "runtime/runtime_state.h"
125
#include "runtime/thread_context.h"
126
#include "service/backend_options.h"
127
#include "util/countdown_latch.h"
128
#include "util/debug_util.h"
129
#include "util/uid_util.h"
130
131
namespace doris {
132
#include "common/compile_check_begin.h"
133
PipelineFragmentContext::PipelineFragmentContext(
134
        TUniqueId query_id, const TPipelineFragmentParams& request,
135
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
136
        const std::function<void(RuntimeState*, Status*)>& call_back,
137
        report_status_callback report_status_cb)
138
335k
        : _query_id(std::move(query_id)),
139
335k
          _fragment_id(request.fragment_id),
140
335k
          _exec_env(exec_env),
141
335k
          _query_ctx(std::move(query_ctx)),
142
335k
          _call_back(call_back),
143
335k
          _is_report_on_cancel(true),
144
335k
          _report_status_cb(std::move(report_status_cb)),
145
335k
          _params(request),
146
335k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
147
335k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
148
335k
                                                               : false) {
149
335k
    _fragment_watcher.start();
150
335k
}
151
152
336k
PipelineFragmentContext::~PipelineFragmentContext() {
153
336k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
154
336k
            .tag("query_id", print_id(_query_id))
155
336k
            .tag("fragment_id", _fragment_id);
156
336k
    _release_resource();
157
336k
    {
158
        // The memory released by the query end is recorded in the query mem tracker.
159
336k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
160
336k
        _runtime_state.reset();
161
336k
        _query_ctx.reset();
162
336k
    }
163
336k
}
164
165
26
bool PipelineFragmentContext::is_timeout(timespec now) const {
166
26
    if (_timeout <= 0) {
167
0
        return false;
168
0
    }
169
26
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
170
26
}
171
172
// notify_close() transitions the PFC from "waiting for external close notification" to
173
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
174
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
175
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
176
8.95k
bool PipelineFragmentContext::notify_close() {
177
8.95k
    bool all_closed = false;
178
8.95k
    bool need_remove = false;
179
8.95k
    {
180
8.95k
        std::lock_guard<std::mutex> l(_task_mutex);
181
8.95k
        if (_closed_tasks >= _total_tasks) {
182
3.40k
            if (_need_notify_close) {
183
                // Fragment was cancelled and waiting for notify to close.
184
                // Record that we need to remove from fragment mgr, but do it
185
                // after releasing _task_mutex to avoid ABBA deadlock with
186
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
187
                // first, then _task_mutex via debug_string()).
188
3.37k
                need_remove = true;
189
3.37k
            }
190
3.40k
            all_closed = true;
191
3.40k
        }
192
        // make fragment release by self after cancel
193
8.95k
        _need_notify_close = false;
194
8.95k
    }
195
8.95k
    if (need_remove) {
196
3.37k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
197
3.37k
    }
198
8.95k
    return all_closed;
199
8.95k
}
200
201
// Must not add lock in this method. Because it will call query ctx cancel. And
202
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
203
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
204
// There maybe dead lock.
205
5.47k
void PipelineFragmentContext::cancel(const Status reason) {
206
5.47k
    LOG_INFO("PipelineFragmentContext::cancel")
207
5.47k
            .tag("query_id", print_id(_query_id))
208
5.47k
            .tag("fragment_id", _fragment_id)
209
5.47k
            .tag("reason", reason.to_string());
210
5.47k
    if (notify_close()) {
211
47
        return;
212
47
    }
213
    // Timeout is a special error code, we need print current stack to debug timeout issue.
214
5.43k
    if (reason.is<ErrorCode::TIMEOUT>()) {
215
1
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
216
1
                                   debug_string());
217
1
        LOG_LONG_STRING(WARNING, dbg_str);
218
1
    }
219
220
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
221
5.43k
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
222
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
223
0
                    debug_string());
224
0
    }
225
226
5.43k
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
227
0
        print_profile("cancel pipeline, reason: " + reason.to_string());
228
0
    }
229
230
5.43k
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
231
22
        _query_ctx->set_load_error_url(error_url);
232
22
    }
233
234
5.43k
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
235
22
        _query_ctx->set_first_error_msg(first_error_msg);
236
22
    }
237
238
5.43k
    _query_ctx->cancel(reason, _fragment_id);
239
5.43k
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
240
315
        _is_report_on_cancel = false;
241
5.11k
    } else {
242
24.0k
        for (auto& id : _fragment_instance_ids) {
243
24.0k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
244
24.0k
        }
245
5.11k
    }
246
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
247
    // For stream load the fragment's query_id == load id, it is set in FE.
248
5.43k
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
249
5.43k
    if (stream_load_ctx != nullptr) {
250
30
        stream_load_ctx->pipe->cancel(reason.to_string());
251
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
252
        // We need to set the error URL at this point to ensure error information is properly
253
        // propagated to the client.
254
30
        stream_load_ctx->error_url = get_load_error_url();
255
30
        stream_load_ctx->first_error_msg = get_first_error_msg();
256
30
    }
257
258
25.7k
    for (auto& tasks : _tasks) {
259
56.1k
        for (auto& task : tasks) {
260
56.1k
            task.first->unblock_all_dependencies();
261
56.1k
        }
262
25.7k
    }
263
5.43k
}
264
265
547k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
266
547k
    PipelineId id = _next_pipeline_id++;
267
547k
    auto pipeline = std::make_shared<Pipeline>(
268
547k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
269
547k
            parent ? parent->num_tasks() : _num_instances);
270
547k
    if (idx >= 0) {
271
113k
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
272
433k
    } else {
273
433k
        _pipelines.emplace_back(pipeline);
274
433k
    }
275
547k
    if (parent) {
276
209k
        parent->set_children(pipeline);
277
209k
    }
278
547k
    return pipeline;
279
547k
}
280
281
335k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
282
335k
    {
283
335k
        SCOPED_TIMER(_build_pipelines_timer);
284
        // 2. Build pipelines with operators in this fragment.
285
335k
        auto root_pipeline = add_pipeline();
286
335k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
287
335k
                                         &_root_op, root_pipeline));
288
289
        // 3. Create sink operator
290
335k
        if (!_params.fragment.__isset.output_sink) {
291
0
            return Status::InternalError("No output sink in this fragment!");
292
0
        }
293
335k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
294
335k
                                          _params.fragment.output_exprs, _params,
295
335k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
296
335k
                                          *_desc_tbl, root_pipeline->id()));
297
335k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
298
335k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
299
300
433k
        for (PipelinePtr& pipeline : _pipelines) {
301
18.4E
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
302
433k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
303
433k
        }
304
335k
    }
305
    // 4. Build local exchanger
306
335k
    if (_runtime_state->enable_local_shuffle()) {
307
333k
        SCOPED_TIMER(_plan_local_exchanger_timer);
308
333k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
309
333k
                                             _params.bucket_seq_to_instance_idx,
310
333k
                                             _params.shuffle_idx_to_instance_idx));
311
333k
    }
312
313
    // 5. Initialize global states in pipelines.
314
548k
    for (PipelinePtr& pipeline : _pipelines) {
315
548k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
316
548k
        pipeline->children().clear();
317
548k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
318
548k
    }
319
320
333k
    {
321
333k
        SCOPED_TIMER(_build_tasks_timer);
322
        // 6. Build pipeline tasks and initialize local state.
323
333k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
324
333k
    }
325
326
333k
    return Status::OK();
327
333k
}
328
329
335k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
330
335k
    if (_prepared) {
331
0
        return Status::InternalError("Already prepared");
332
0
    }
333
335k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
334
335k
        _timeout = _params.query_options.execution_timeout;
335
335k
    }
336
337
335k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
338
335k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
339
335k
    SCOPED_TIMER(_prepare_timer);
340
335k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
341
335k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
342
335k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
343
335k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
344
335k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
345
335k
    {
346
335k
        SCOPED_TIMER(_init_context_timer);
347
335k
        cast_set(_num_instances, _params.local_params.size());
348
335k
        _total_instances =
349
335k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
350
351
335k
        auto* fragment_context = this;
352
353
335k
        if (_params.query_options.__isset.is_report_success) {
354
332k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
355
332k
        }
356
357
        // 1. Set up the global runtime state.
358
335k
        _runtime_state = RuntimeState::create_unique(
359
335k
                _params.query_id, _params.fragment_id, _params.query_options,
360
335k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
361
335k
        _runtime_state->set_task_execution_context(shared_from_this());
362
335k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
363
335k
        if (_params.__isset.backend_id) {
364
333k
            _runtime_state->set_backend_id(_params.backend_id);
365
333k
        }
366
335k
        if (_params.__isset.import_label) {
367
237
            _runtime_state->set_import_label(_params.import_label);
368
237
        }
369
335k
        if (_params.__isset.db_name) {
370
189
            _runtime_state->set_db_name(_params.db_name);
371
189
        }
372
335k
        if (_params.__isset.load_job_id) {
373
0
            _runtime_state->set_load_job_id(_params.load_job_id);
374
0
        }
375
376
335k
        if (_params.is_simplified_param) {
377
119k
            _desc_tbl = _query_ctx->desc_tbl;
378
216k
        } else {
379
216k
            DCHECK(_params.__isset.desc_tbl);
380
216k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
381
216k
                                                  &_desc_tbl));
382
216k
        }
383
335k
        _runtime_state->set_desc_tbl(_desc_tbl);
384
335k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
385
335k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
386
335k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
387
335k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
388
389
        // init fragment_instance_ids
390
335k
        const auto target_size = _params.local_params.size();
391
335k
        _fragment_instance_ids.resize(target_size);
392
1.46M
        for (size_t i = 0; i < _params.local_params.size(); i++) {
393
1.13M
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
394
1.13M
            _fragment_instance_ids[i] = fragment_instance_id;
395
1.13M
        }
396
335k
    }
397
398
335k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
399
400
334k
    _init_next_report_time();
401
402
334k
    _prepared = true;
403
334k
    return Status::OK();
404
335k
}
405
406
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
407
        int instance_idx,
408
1.13M
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
409
1.13M
    const auto& local_params = _params.local_params[instance_idx];
410
1.13M
    auto fragment_instance_id = local_params.fragment_instance_id;
411
1.13M
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
412
1.13M
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
413
1.13M
    auto get_shared_state = [&](PipelinePtr pipeline)
414
1.13M
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
415
1.89M
                                       std::vector<std::shared_ptr<Dependency>>>> {
416
1.89M
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
417
1.89M
                                std::vector<std::shared_ptr<Dependency>>>>
418
1.89M
                shared_state_map;
419
2.60M
        for (auto& op : pipeline->operators()) {
420
2.60M
            auto source_id = op->operator_id();
421
2.60M
            if (auto iter = _op_id_to_shared_state.find(source_id);
422
2.60M
                iter != _op_id_to_shared_state.end()) {
423
790k
                shared_state_map.insert({source_id, iter->second});
424
790k
            }
425
2.60M
        }
426
1.89M
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
427
1.88M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
428
1.88M
                iter != _op_id_to_shared_state.end()) {
429
341k
                shared_state_map.insert({sink_to_source_id, iter->second});
430
341k
            }
431
1.88M
        }
432
1.89M
        return shared_state_map;
433
1.89M
    };
434
435
3.46M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
436
2.32M
        auto& pipeline = _pipelines[pip_idx];
437
2.32M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
438
1.88M
            auto task_runtime_state = RuntimeState::create_unique(
439
1.88M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
440
1.88M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
441
1.88M
            {
442
                // Initialize runtime state for this task
443
1.88M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
444
445
1.88M
                task_runtime_state->set_task_execution_context(shared_from_this());
446
1.88M
                task_runtime_state->set_be_number(local_params.backend_num);
447
448
1.88M
                if (_params.__isset.backend_id) {
449
1.88M
                    task_runtime_state->set_backend_id(_params.backend_id);
450
1.88M
                }
451
1.88M
                if (_params.__isset.import_label) {
452
238
                    task_runtime_state->set_import_label(_params.import_label);
453
238
                }
454
1.88M
                if (_params.__isset.db_name) {
455
190
                    task_runtime_state->set_db_name(_params.db_name);
456
190
                }
457
1.88M
                if (_params.__isset.load_job_id) {
458
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
459
0
                }
460
1.88M
                if (_params.__isset.wal_id) {
461
114
                    task_runtime_state->set_wal_id(_params.wal_id);
462
114
                }
463
1.88M
                if (_params.__isset.content_length) {
464
31
                    task_runtime_state->set_content_length(_params.content_length);
465
31
                }
466
467
1.88M
                task_runtime_state->set_desc_tbl(_desc_tbl);
468
1.88M
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
469
1.88M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
470
1.88M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
471
1.88M
                task_runtime_state->set_max_operator_id(max_operator_id());
472
1.88M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
473
1.88M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
474
1.88M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
475
476
1.88M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
477
1.88M
            }
478
1.88M
            auto cur_task_id = _total_tasks++;
479
1.88M
            task_runtime_state->set_task_id(cur_task_id);
480
1.88M
            task_runtime_state->set_task_num(pipeline->num_tasks());
481
1.88M
            auto task = std::make_shared<PipelineTask>(
482
1.88M
                    pipeline, cur_task_id, task_runtime_state.get(),
483
1.88M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
484
1.88M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
485
1.88M
                    instance_idx);
486
1.88M
            pipeline->incr_created_tasks(instance_idx, task.get());
487
1.88M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
488
1.88M
            _tasks[instance_idx].emplace_back(
489
1.88M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
490
1.88M
                            std::move(task), std::move(task_runtime_state)});
491
1.88M
        }
492
2.32M
    }
493
494
    /**
495
         * Build DAG for pipeline tasks.
496
         * For example, we have
497
         *
498
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
499
         *            \                      /
500
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
501
         *                 \                          /
502
         *               JoinProbeOperator2 (Pipeline1)
503
         *
504
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
505
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
506
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
507
         *
508
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
509
         * and JoinProbeOperator2.
510
         */
511
2.33M
    for (auto& _pipeline : _pipelines) {
512
2.33M
        if (pipeline_id_to_task.contains(_pipeline->id())) {
513
1.88M
            auto* task = pipeline_id_to_task[_pipeline->id()];
514
1.88M
            DCHECK(task != nullptr);
515
516
            // If this task has upstream dependency, then inject it into this task.
517
1.88M
            if (_dag.contains(_pipeline->id())) {
518
1.19M
                auto& deps = _dag[_pipeline->id()];
519
1.97M
                for (auto& dep : deps) {
520
1.97M
                    if (pipeline_id_to_task.contains(dep)) {
521
1.07M
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
522
1.07M
                        if (ss) {
523
399k
                            task->inject_shared_state(ss);
524
680k
                        } else {
525
680k
                            pipeline_id_to_task[dep]->inject_shared_state(
526
680k
                                    task->get_source_shared_state());
527
680k
                        }
528
1.07M
                    }
529
1.97M
                }
530
1.19M
            }
531
1.88M
        }
532
2.33M
    }
533
3.46M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
534
2.32M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
535
1.88M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
536
1.88M
            DCHECK(pipeline_id_to_profile[pip_idx]);
537
1.88M
            std::vector<TScanRangeParams> scan_ranges;
538
1.88M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
539
1.88M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
540
266k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
541
266k
            }
542
1.88M
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
543
1.88M
                                                             _params.fragment.output_sink));
544
1.88M
        }
545
2.32M
    }
546
1.13M
    {
547
1.13M
        std::lock_guard<std::mutex> l(_state_map_lock);
548
1.13M
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
549
1.13M
    }
550
1.13M
    return Status::OK();
551
1.13M
}
552
553
335k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
554
335k
    _total_tasks = 0;
555
335k
    _closed_tasks = 0;
556
335k
    const auto target_size = _params.local_params.size();
557
335k
    _tasks.resize(target_size);
558
335k
    _runtime_filter_mgr_map.resize(target_size);
559
882k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
560
547k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
561
547k
    }
562
335k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
563
564
335k
    if (target_size > 1 &&
565
335k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
566
137k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
567
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
568
15.4k
        std::vector<Status> prepare_status(target_size);
569
15.4k
        int submitted_tasks = 0;
570
15.4k
        Status submit_status;
571
15.4k
        CountDownLatch latch((int)target_size);
572
208k
        for (int i = 0; i < target_size; i++) {
573
193k
            submit_status = thread_pool->submit_func([&, i]() {
574
193k
                SCOPED_ATTACH_TASK(_query_ctx.get());
575
193k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
576
193k
                latch.count_down();
577
193k
            });
578
193k
            if (LIKELY(submit_status.ok())) {
579
193k
                submitted_tasks++;
580
18.4E
            } else {
581
18.4E
                break;
582
18.4E
            }
583
193k
        }
584
15.4k
        latch.arrive_and_wait(target_size - submitted_tasks);
585
15.4k
        if (UNLIKELY(!submit_status.ok())) {
586
0
            return submit_status;
587
0
        }
588
208k
        for (int i = 0; i < submitted_tasks; i++) {
589
193k
            if (!prepare_status[i].ok()) {
590
0
                return prepare_status[i];
591
0
            }
592
193k
        }
593
319k
    } else {
594
1.26M
        for (int i = 0; i < target_size; i++) {
595
943k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
596
943k
        }
597
319k
    }
598
335k
    _pipeline_parent_map.clear();
599
335k
    _op_id_to_shared_state.clear();
600
601
335k
    return Status::OK();
602
335k
}
603
604
333k
void PipelineFragmentContext::_init_next_report_time() {
605
333k
    auto interval_s = config::pipeline_status_report_interval;
606
333k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
607
31.3k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
608
31.3k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
609
        // We don't want to wait longer than it takes to run the entire fragment.
610
31.3k
        _previous_report_time =
611
31.3k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
612
31.3k
        _disable_period_report = false;
613
31.3k
    }
614
333k
}
615
616
3.83k
void PipelineFragmentContext::refresh_next_report_time() {
617
3.83k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
618
3.83k
    DCHECK(disable == true);
619
3.83k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
620
3.83k
    _disable_period_report.compare_exchange_strong(disable, false);
621
3.83k
}
622
623
6.41M
void PipelineFragmentContext::trigger_report_if_necessary() {
624
6.41M
    if (!_is_report_success) {
625
6.02M
        return;
626
6.02M
    }
627
394k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
628
394k
    if (disable) {
629
6.43k
        return;
630
6.43k
    }
631
388k
    int32_t interval_s = config::pipeline_status_report_interval;
632
388k
    if (interval_s <= 0) {
633
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
634
0
                        "trigger "
635
0
                        "report.";
636
0
    }
637
388k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
638
388k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
639
388k
    if (MonotonicNanos() > next_report_time) {
640
3.84k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
641
3.84k
                                                            std::memory_order_acq_rel)) {
642
10
            return;
643
10
        }
644
3.83k
        if (VLOG_FILE_IS_ON) {
645
0
            VLOG_FILE << "Reporting "
646
0
                      << "profile for query_id " << print_id(_query_id)
647
0
                      << ", fragment id: " << _fragment_id;
648
649
0
            std::stringstream ss;
650
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
651
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
652
0
            if (_runtime_state->load_channel_profile()) {
653
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
654
0
            }
655
656
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
657
0
                      << " profile:\n"
658
0
                      << ss.str();
659
0
        }
660
3.83k
        auto st = send_report(false);
661
3.83k
        if (!st.ok()) {
662
0
            disable = true;
663
0
            _disable_period_report.compare_exchange_strong(disable, false,
664
0
                                                           std::memory_order_acq_rel);
665
0
        }
666
3.83k
    }
667
388k
}
668
669
// Build the operator tree (and the pipelines it lives in) from the thrift plan
// contained in _params. `root` receives the root operator; `cur_pipe` is the
// pipeline the root is placed into.
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    // Index of the thrift node currently being consumed; advanced in preorder
    // by _create_tree_helper.
    int consumed_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &consumed_idx, root,
                                        cur_pipe, 0, false, false));

    // Every thrift node must have been consumed exactly once.
    if (consumed_idx + 1 != plan_nodes.size()) {
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }
    return Status::OK();
}
686
687
// Recursively reconstruct the operator tree from the preorder thrift node list.
// `node_idx` is the cursor into `tnodes` (advanced in place); `parent` is the
// already-built parent operator (nullptr at the root, in which case `*root` is
// set); `child_idx` is this node's position among its parent's children.
// `followed_by_shuffled_operator` / `require_bucket_distribution` carry the
// shuffle/colocate requirements of the ancestors down the tree so local
// exchange planning later picks compatible distributions.
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // propagate error case
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];

    int num_children = tnodes[*node_idx].num_children;
    // Flags propagated to the children; recomputed below once this node's
    // required data distribution is known.
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
    bool current_require_bucket_distribution = require_bucket_distribution;
    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must be done here. For example, group by expressions in agg will be used to
    // decide if a local shuffle should be planed, so it must be initialized here.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
    if (parent != nullptr) {
        // add to parent's child(s)
        // NOTE: when _create_operator produced a cache operator it wraps `op`,
        // so the parent links to the cache operator instead.
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    } else {
        *root = op;
    }
    /**
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // When this node starts a new pipeline (no operators yet), the relevant
    // distribution requirement comes from the pipeline's sink; otherwise from
    // the operator itself.
    auto required_data_distribution =
            cur_pipe->operators().empty()
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                    : op->required_data_distribution(_runtime_state.get());
    // Children stay "followed by shuffled operator" only if a hash exchange is
    // actually required here, or if the requirement is a NOOP pass-through of
    // an already-inherited flag.
    current_followed_by_shuffled_operator =
            ((followed_by_shuffled_operator ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
                                             : op->is_shuffled_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (followed_by_shuffled_operator &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Same propagation rule for the bucket/colocate requirement.
    current_require_bucket_distribution =
            ((require_bucket_distribution ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
                                             : op->is_colocated_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (require_bucket_distribution &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    if (num_children == 0) {
        // Leaf node: record whether the source must run serially.
        // NOTE(review): this overwrites the flag for each leaf, so the last
        // visited leaf wins — presumably fragments have one relevant source.
        _use_serial_source = op->is_serial_operator();
    }
    // rely on that tnodes is preorder of the plan
    for (int i = 0; i < num_children; i++) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
                                            current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }

    return Status::OK();
}
772
773
void PipelineFragmentContext::_inherit_pipeline_properties(
774
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
775
113k
        PipelinePtr pipe_with_sink) {
776
113k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
777
113k
    pipe_with_source->set_num_tasks(_num_instances);
778
113k
    pipe_with_source->set_data_distribution(data_distribution);
779
113k
}
780
781
// Split `cur_pipe` at operator index `idx` by inserting a local exchange:
// operators [0, idx) move into `new_pip`, which ends in a LocalExchangeSink;
// `cur_pipe` then starts with a LocalExchangeSource feeding the remaining
// operators. Also creates the shared state/exchanger for the chosen
// distribution type and rewires the pipeline DAG accordingly.
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    auto local_exchange_id = next_operator_id();
    // 1. Create a new pipeline with local exchange sink.
    DataSinkOperatorPtr sink;
    auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool followed_by_shuffled_operator =
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    // Free-block cache limit for the exchanger; 0 when the session option is
    // unset. Hoisted out of the switch below: the identical ternary was
    // duplicated in every case.
    const int free_blocks_limit =
            _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                    ? cast_set<int>(
                              _runtime_state->query_options().local_exchange_free_blocks_limit)
                    : 0;
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
    if (bucket_seq_to_instance_idx.empty() &&
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
        // No scan in this fragment, so bucket-based shuffle degenerates to a
        // plain hash shuffle.
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
                                          num_buckets, use_global_hash_shuffle,
                                          shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);
    switch (data_distribution.distribution_type) {
    case ExchangeType::HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                use_global_hash_shuffle ? _total_instances : _num_instances, free_blocks_limit);
        break;
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_blocks_limit);
        break;
    case ExchangeType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_blocks_limit);
        break;
    case ExchangeType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_blocks_limit);
        break;
    case ExchangeType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_blocks_limit);
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_blocks_limit);
        }
        break;
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                free_blocks_limit);
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Initialize new pipeline's operator list.
    std::copy(operators.begin(), operators.begin() + idx,
              std::inserter(new_pip->operators(), new_pip->operators().end()));

    // 3.2 Erase unused operators in previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op;
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
    if (!operators.empty()) {
        // Reset the old child link before pointing the first remaining
        // operator at the new local exchange source.
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (auto child : cur_pipe->children()) {
        // A child pipeline whose sink matches an operator moved into new_pip
        // now belongs to new_pip; everything else stays with cur_pipe.
        bool found = false;
        for (auto op : new_pip->operators()) {
            if (child->sink()->node_id() == op->node_id()) {
                new_pip->set_children(child);
                found = true;
            };
        }
        if (!found) {
            new_children.push_back(child);
            edges_with_source.push_back(child->id());
        }
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (auto child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
945
946
// Insert a local exchange in front of operator `idx` of `cur_pipe` when one is
// needed for the requested data distribution; `*do_local_exchange` is set to
// true iff a split actually happened.
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // Nothing to do for single-instance / single-task plans, or when the
    // pipeline already satisfies the requested distribution.
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1 ||
        !cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    auto& ops = cur_pipe->operators();
    const auto op_count_before = ops.size();
    auto split_pip = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            idx, pool, cur_pipe, split_pip, data_distribution, do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));

    // The split adds exactly one operator (the local exchange source).
    CHECK(op_count_before + 1 == cur_pipe->operators().size() + split_pip->operators().size())
            << "total_op_num: " << op_count_before
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << split_pip->operators().size();

    // There are some local shuffles with relatively heavy operations on the sink.
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
    // Therefore, local passthrough is used to increase the concurrency of the sink.
    // op -> local sink(1) -> local source (n)
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
    const bool sink_is_bottleneck =
            cur_pipe->num_tasks() > 1 && split_pip->num_tasks() == 1 &&
            Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type);
    if (sink_is_bottleneck) {
        RETURN_IF_ERROR(_add_local_exchange_impl(
                cast_set<int>(split_pip->operators().size()), pool, split_pip,
                add_pipeline(split_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
987
988
// Plan local exchanges for all pipelines, walking from the last pipeline back
// to the first so downstream distributions are settled before upstream ones.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
        auto& pip = _pipelines[pip_idx];
        pip->init_data_distribution(_runtime_state.get());
        // Set property if child pipeline is not join operator's child: a child
        // whose sink feeds this pipeline's first operator passes its data
        // distribution through.
        for (auto& child : pip->children()) {
            if (child->sink()->node_id() == pip->operators().front()->node_id()) {
                pip->set_data_distribution(child->data_distribution());
            }
        }

        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
        // still keep colocate plan after local shuffle
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, pip,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1012
1013
// Plan local exchanges inside a single pipeline `pip` (index `pip_idx`): scan
// its operators for distribution requirements, splitting the pipeline with
// _add_local_exchange whenever one is needed, and finally check the sink's own
// requirement. The do/while restarts the scan after every split because the
// split rewrites pip's operator list in place.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // Start at 1: operator 0 never needs an exchange in front of it.
    int idx = 1;
    bool do_local_exchange = false;
    do {
        auto& ops = pip->operators();
        do_local_exchange = false;
        // Plan local exchange for each operator.
        for (; idx < ops.size();) {
            if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
                RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                        ops[idx]->required_data_distribution(_runtime_state.get()),
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                        shuffle_idx_to_instance_idx));
            }
            if (do_local_exchange) {
                // If local exchange is needed for current operator, we will split this pipeline to
                // two pipelines by local exchange sink/source. And then we need to process remaining
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
                // is current operator was already processed) and continue to plan local exchange.
                idx = 2;
                break;
            }
            idx++;
        }
    } while (do_local_exchange);
    // The sink may impose its own distribution requirement (e.g. agg sink).
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
        RETURN_IF_ERROR(_add_local_exchange(
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1050
1051
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1052
                                                  const std::vector<TExpr>& output_exprs,
1053
                                                  const TPipelineFragmentParams& params,
1054
                                                  const RowDescriptor& row_desc,
1055
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1056
334k
                                                  PipelineId cur_pipeline_id) {
1057
334k
    switch (thrift_sink.type) {
1058
118k
    case TDataSinkType::DATA_STREAM_SINK: {
1059
118k
        if (!thrift_sink.__isset.stream_sink) {
1060
0
            return Status::InternalError("Missing data stream sink.");
1061
0
        }
1062
118k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1063
118k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1064
118k
                params.destinations, _fragment_instance_ids);
1065
118k
        break;
1066
118k
    }
1067
186k
    case TDataSinkType::RESULT_SINK: {
1068
186k
        if (!thrift_sink.__isset.result_sink) {
1069
0
            return Status::InternalError("Missing data buffer sink.");
1070
0
        }
1071
1072
186k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), row_desc,
1073
186k
                                                      output_exprs, thrift_sink.result_sink);
1074
186k
        break;
1075
186k
    }
1076
105
    case TDataSinkType::DICTIONARY_SINK: {
1077
105
        if (!thrift_sink.__isset.dictionary_sink) {
1078
0
            return Status::InternalError("Missing dict sink.");
1079
0
        }
1080
1081
105
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1082
105
                                                    thrift_sink.dictionary_sink);
1083
105
        break;
1084
105
    }
1085
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1086
28.1k
    case TDataSinkType::OLAP_TABLE_SINK: {
1087
28.1k
        if (state->query_options().enable_memtable_on_sink_node &&
1088
28.1k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1089
28.1k
            !config::is_cloud_mode()) {
1090
130
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(pool, next_sink_operator_id(),
1091
130
                                                               row_desc, output_exprs);
1092
28.0k
        } else {
1093
28.0k
            _sink = std::make_shared<OlapTableSinkOperatorX>(pool, next_sink_operator_id(),
1094
28.0k
                                                             row_desc, output_exprs);
1095
28.0k
        }
1096
28.1k
        break;
1097
0
    }
1098
165
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1099
165
        DCHECK(thrift_sink.__isset.olap_table_sink);
1100
165
        DCHECK(state->get_query_ctx() != nullptr);
1101
165
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1102
165
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1103
165
                                                                output_exprs);
1104
165
        break;
1105
0
    }
1106
0
    case TDataSinkType::HIVE_TABLE_SINK: {
1107
0
        if (!thrift_sink.__isset.hive_table_sink) {
1108
0
            return Status::InternalError("Missing hive table sink.");
1109
0
        }
1110
0
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1111
0
                                                         output_exprs);
1112
0
        break;
1113
0
    }
1114
0
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1115
0
        if (!thrift_sink.__isset.iceberg_table_sink) {
1116
0
            return Status::InternalError("Missing iceberg table sink.");
1117
0
        }
1118
0
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1119
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1120
0
                                                                     row_desc, output_exprs);
1121
0
        } else {
1122
0
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1123
0
                                                                row_desc, output_exprs);
1124
0
        }
1125
0
        break;
1126
0
    }
1127
0
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1128
0
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1129
0
            return Status::InternalError("Missing iceberg delete sink.");
1130
0
        }
1131
0
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1132
0
                                                             row_desc, output_exprs);
1133
0
        break;
1134
0
    }
1135
0
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1136
0
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1137
0
            return Status::InternalError("Missing iceberg merge sink.");
1138
0
        }
1139
0
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1140
0
                                                            output_exprs);
1141
0
        break;
1142
0
    }
1143
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1144
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1145
0
            return Status::InternalError("Missing max compute table sink.");
1146
0
        }
1147
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1148
0
                                                       output_exprs);
1149
0
        break;
1150
0
    }
1151
0
    case TDataSinkType::JDBC_TABLE_SINK: {
1152
0
        if (!thrift_sink.__isset.jdbc_table_sink) {
1153
0
            return Status::InternalError("Missing data jdbc sink.");
1154
0
        }
1155
0
        if (config::enable_java_support) {
1156
0
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1157
0
                                                             output_exprs);
1158
0
        } else {
1159
0
            return Status::InternalError(
1160
0
                    "Jdbc table sink is not enabled, you can change be config "
1161
0
                    "enable_java_support to true and restart be.");
1162
0
        }
1163
0
        break;
1164
0
    }
1165
3
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1166
3
        if (!thrift_sink.__isset.memory_scratch_sink) {
1167
0
            return Status::InternalError("Missing data buffer sink.");
1168
0
        }
1169
1170
3
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1171
3
                                                             output_exprs);
1172
3
        break;
1173
3
    }
1174
338
    case TDataSinkType::RESULT_FILE_SINK: {
1175
338
        if (!thrift_sink.__isset.result_file_sink) {
1176
0
            return Status::InternalError("Missing result file sink.");
1177
0
        }
1178
1179
        // Result file sink is not the top sink
1180
338
        if (params.__isset.destinations && !params.destinations.empty()) {
1181
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1182
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1183
0
                    params.destinations, output_exprs, desc_tbl);
1184
338
        } else {
1185
338
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1186
338
                                                              output_exprs);
1187
338
        }
1188
338
        break;
1189
338
    }
1190
1.01k
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1191
1.01k
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1192
1.01k
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1193
1.01k
        auto sink_id = next_sink_operator_id();
1194
1.01k
        const int multi_cast_node_id = sink_id;
1195
1.01k
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1196
        // one sink has multiple sources.
1197
1.01k
        std::vector<int> sources;
1198
3.82k
        for (int i = 0; i < sender_size; ++i) {
1199
2.81k
            auto source_id = next_operator_id();
1200
2.81k
            sources.push_back(source_id);
1201
2.81k
        }
1202
1203
1.01k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1204
1.01k
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1205
3.82k
        for (int i = 0; i < sender_size; ++i) {
1206
2.81k
            auto new_pipeline = add_pipeline();
1207
            // use to exchange sink
1208
2.81k
            RowDescriptor* exchange_row_desc = nullptr;
1209
2.81k
            {
1210
2.81k
                const auto& tmp_row_desc =
1211
2.81k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1212
2.81k
                                ? RowDescriptor(state->desc_tbl(),
1213
2.81k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1214
2.81k
                                                         .output_tuple_id})
1215
2.81k
                                : row_desc;
1216
2.81k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1217
2.81k
            }
1218
2.81k
            auto source_id = sources[i];
1219
2.81k
            OperatorPtr source_op;
1220
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1221
2.81k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1222
2.81k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1223
2.81k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1224
2.81k
                    /*operator_id=*/source_id);
1225
2.81k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1226
2.81k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1227
            // 2. create and set sink operator of data stream sender for new pipeline
1228
1229
2.81k
            DataSinkOperatorPtr sink_op;
1230
2.81k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1231
2.81k
                    state, *exchange_row_desc, next_sink_operator_id(),
1232
2.81k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1233
2.81k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1234
1235
2.81k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1236
2.81k
            {
1237
2.81k
                TDataSink* t = pool->add(new TDataSink());
1238
2.81k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1239
2.81k
                RETURN_IF_ERROR(sink_op->init(*t));
1240
2.81k
            }
1241
1242
            // 3. set dependency dag
1243
2.81k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1244
2.81k
        }
1245
1.01k
        if (sources.empty()) {
1246
0
            return Status::InternalError("size of sources must be greater than 0");
1247
0
        }
1248
1.01k
        break;
1249
1.01k
    }
1250
1.01k
    case TDataSinkType::BLACKHOLE_SINK: {
1251
3
        if (!thrift_sink.__isset.blackhole_sink) {
1252
0
            return Status::InternalError("Missing blackhole sink.");
1253
0
        }
1254
1255
3
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1256
3
        break;
1257
3
    }
1258
0
    case TDataSinkType::TVF_TABLE_SINK: {
1259
0
        if (!thrift_sink.__isset.tvf_table_sink) {
1260
0
            return Status::InternalError("Missing TVF table sink.");
1261
0
        }
1262
0
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1263
0
                                                        output_exprs);
1264
0
        break;
1265
0
    }
1266
0
    default:
1267
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1268
334k
    }
1269
334k
    return Status::OK();
1270
334k
}
1271
1272
// NOLINTBEGIN(readability-function-size)
1273
// NOLINTBEGIN(readability-function-cognitive-complexity)
1274
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1275
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1276
                                                 PipelinePtr& cur_pipe, int parent_idx,
1277
                                                 int child_idx,
1278
                                                 const bool followed_by_shuffled_operator,
1279
                                                 const bool require_bucket_distribution,
1280
539k
                                                 OperatorPtr& cache_op) {
1281
539k
    std::vector<DataSinkOperatorPtr> sink_ops;
1282
539k
    Defer defer = Defer([&]() {
1283
538k
        if (op) {
1284
538k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1285
538k
        }
1286
537k
        for (auto& s : sink_ops) {
1287
95.8k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1288
95.8k
        }
1289
537k
    });
1290
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1291
    // Therefore, here we need to use a stack-like structure.
1292
539k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1293
539k
    std::stringstream error_msg;
1294
539k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1295
1296
539k
    bool fe_with_old_version = false;
1297
539k
    switch (tnode.node_type) {
1298
167k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1299
167k
        op = std::make_shared<OlapScanOperatorX>(
1300
167k
                pool, tnode, next_operator_id(), descs, _num_instances,
1301
167k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1302
167k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1303
167k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1304
167k
        break;
1305
167k
    }
1306
76
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1307
76
        DCHECK(_query_ctx != nullptr);
1308
76
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1309
76
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1310
76
                                                    _num_instances);
1311
76
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1312
76
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1313
76
        break;
1314
76
    }
1315
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1316
0
        if (config::enable_java_support) {
1317
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1318
0
                                                     _num_instances);
1319
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1320
0
        } else {
1321
0
            return Status::InternalError(
1322
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1323
0
                    "to true and restart be.");
1324
0
        }
1325
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1326
0
        break;
1327
0
    }
1328
2.68k
    case TPlanNodeType::FILE_SCAN_NODE: {
1329
2.68k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1330
2.68k
                                                 _num_instances);
1331
2.68k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1332
2.68k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1333
2.68k
        break;
1334
2.68k
    }
1335
0
    case TPlanNodeType::ES_SCAN_NODE:
1336
0
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
1337
0
        op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs,
1338
0
                                               _num_instances);
1339
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1340
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1341
0
        break;
1342
0
    }
1343
119k
    case TPlanNodeType::EXCHANGE_NODE: {
1344
119k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1345
119k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1346
18.4E
                                  : 0;
1347
119k
        DCHECK_GT(num_senders, 0);
1348
119k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1349
119k
                                                       num_senders);
1350
119k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1351
119k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1352
119k
        break;
1353
119k
    }
1354
140k
    case TPlanNodeType::AGGREGATION_NODE: {
1355
140k
        if (tnode.agg_node.grouping_exprs.empty() &&
1356
140k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1357
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1358
0
                                         ": group by and output is empty");
1359
0
        }
1360
140k
        bool need_create_cache_op =
1361
140k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1362
140k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1363
24
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1364
24
            auto cache_source_id = next_operator_id();
1365
24
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1366
24
                                                        _params.fragment.query_cache_param);
1367
24
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1368
1369
24
            const auto downstream_pipeline_id = cur_pipe->id();
1370
24
            if (!_dag.contains(downstream_pipeline_id)) {
1371
24
                _dag.insert({downstream_pipeline_id, {}});
1372
24
            }
1373
24
            new_pipe = add_pipeline(cur_pipe);
1374
24
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1375
1376
24
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1377
24
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1378
24
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1379
24
            return Status::OK();
1380
24
        };
1381
140k
        const bool group_by_limit_opt =
1382
140k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1383
1384
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1385
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1386
140k
        const bool enable_spill = _runtime_state->enable_spill() &&
1387
140k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1388
140k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1389
140k
                                      tnode.agg_node.use_streaming_preaggregation &&
1390
140k
                                      !tnode.agg_node.grouping_exprs.empty();
1391
        // TODO: distinct streaming agg does not support spill.
1392
140k
        const bool can_use_distinct_streaming_agg =
1393
140k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1394
140k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1395
140k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1396
140k
                _params.query_options.enable_distinct_streaming_aggregation;
1397
1398
140k
        if (can_use_distinct_streaming_agg) {
1399
91.7k
            if (need_create_cache_op) {
1400
8
                PipelinePtr new_pipe;
1401
8
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1402
1403
8
                cache_op = op;
1404
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1405
8
                                                                     tnode, descs);
1406
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1407
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1408
8
                cur_pipe = new_pipe;
1409
91.7k
            } else {
1410
91.7k
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1411
91.7k
                                                                     tnode, descs);
1412
91.7k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1413
91.7k
            }
1414
91.7k
        } else if (is_streaming_agg) {
1415
1.50k
            if (need_create_cache_op) {
1416
0
                PipelinePtr new_pipe;
1417
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1418
0
                cache_op = op;
1419
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1420
0
                                                             descs);
1421
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1422
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1423
0
                cur_pipe = new_pipe;
1424
1.50k
            } else {
1425
1.50k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1426
1.50k
                                                             descs);
1427
1.50k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1428
1.50k
            }
1429
47.5k
        } else {
1430
            // create new pipeline to add query cache operator
1431
47.5k
            PipelinePtr new_pipe;
1432
47.5k
            if (need_create_cache_op) {
1433
16
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1434
16
                cache_op = op;
1435
16
            }
1436
1437
47.5k
            if (enable_spill) {
1438
639
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1439
639
                                                                     next_operator_id(), descs);
1440
46.9k
            } else {
1441
46.9k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1442
46.9k
            }
1443
47.5k
            if (need_create_cache_op) {
1444
16
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1445
16
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1446
16
                cur_pipe = new_pipe;
1447
47.5k
            } else {
1448
47.5k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1449
47.5k
            }
1450
1451
47.5k
            const auto downstream_pipeline_id = cur_pipe->id();
1452
47.5k
            if (!_dag.contains(downstream_pipeline_id)) {
1453
45.5k
                _dag.insert({downstream_pipeline_id, {}});
1454
45.5k
            }
1455
47.5k
            cur_pipe = add_pipeline(cur_pipe);
1456
47.5k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1457
1458
47.5k
            if (enable_spill) {
1459
639
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1460
639
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1461
46.9k
            } else {
1462
46.9k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1463
46.9k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1464
46.9k
            }
1465
47.5k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1466
47.5k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1467
47.5k
        }
1468
140k
        break;
1469
140k
    }
1470
140k
    case TPlanNodeType::HASH_JOIN_NODE: {
1471
8.97k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1472
8.97k
                                       tnode.hash_join_node.is_broadcast_join;
1473
8.97k
        const auto enable_spill = _runtime_state->enable_spill();
1474
8.97k
        if (enable_spill && !is_broadcast_join) {
1475
0
            auto tnode_ = tnode;
1476
0
            tnode_.runtime_filters.clear();
1477
0
            auto inner_probe_operator =
1478
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1479
1480
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1481
            // So here use `tnode_` which has no runtime filters.
1482
0
            auto probe_side_inner_sink_operator =
1483
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1484
1485
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1486
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1487
1488
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1489
0
                    pool, tnode_, next_operator_id(), descs);
1490
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1491
0
                                                inner_probe_operator);
1492
0
            op = std::move(probe_operator);
1493
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1494
1495
0
            const auto downstream_pipeline_id = cur_pipe->id();
1496
0
            if (!_dag.contains(downstream_pipeline_id)) {
1497
0
                _dag.insert({downstream_pipeline_id, {}});
1498
0
            }
1499
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1500
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1501
1502
0
            auto inner_sink_operator =
1503
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1504
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1505
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1506
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1507
1508
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1509
0
            sink_ops.push_back(std::move(sink_operator));
1510
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1511
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1512
1513
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1514
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1515
8.97k
        } else {
1516
8.97k
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1517
8.97k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1518
1519
8.97k
            const auto downstream_pipeline_id = cur_pipe->id();
1520
8.97k
            if (!_dag.contains(downstream_pipeline_id)) {
1521
7.27k
                _dag.insert({downstream_pipeline_id, {}});
1522
7.27k
            }
1523
8.97k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1524
8.97k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1525
1526
8.97k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1527
8.97k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1528
8.97k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1529
8.97k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1530
1531
8.97k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1532
8.97k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1533
8.97k
        }
1534
8.97k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1535
2.11k
            std::shared_ptr<HashJoinSharedState> shared_state =
1536
2.11k
                    HashJoinSharedState::create_shared(_num_instances);
1537
13.6k
            for (int i = 0; i < _num_instances; i++) {
1538
11.5k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1539
11.5k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1540
11.5k
                sink_dep->set_shared_state(shared_state.get());
1541
11.5k
                shared_state->sink_deps.push_back(sink_dep);
1542
11.5k
            }
1543
2.11k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1544
2.11k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1545
2.11k
            _op_id_to_shared_state.insert(
1546
2.11k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1547
2.11k
        }
1548
8.97k
        break;
1549
8.97k
    }
1550
2.89k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1551
2.89k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1552
2.89k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1553
1554
2.89k
        const auto downstream_pipeline_id = cur_pipe->id();
1555
2.89k
        if (!_dag.contains(downstream_pipeline_id)) {
1556
2.64k
            _dag.insert({downstream_pipeline_id, {}});
1557
2.64k
        }
1558
2.89k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1559
2.89k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1560
1561
2.89k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1562
2.89k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1563
2.89k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1564
2.89k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1565
2.89k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1566
2.89k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1567
2.89k
        break;
1568
2.89k
    }
1569
48.8k
    case TPlanNodeType::UNION_NODE: {
1570
48.8k
        int child_count = tnode.num_children;
1571
48.8k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1572
48.8k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1573
1574
48.8k
        const auto downstream_pipeline_id = cur_pipe->id();
1575
48.8k
        if (!_dag.contains(downstream_pipeline_id)) {
1576
48.1k
            _dag.insert({downstream_pipeline_id, {}});
1577
48.1k
        }
1578
49.9k
        for (int i = 0; i < child_count; i++) {
1579
1.13k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1580
1.13k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1581
1.13k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1582
1.13k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1583
1.13k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1584
1.13k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1585
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1586
1.13k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1587
1.13k
        }
1588
48.8k
        break;
1589
48.8k
    }
1590
48.8k
    case TPlanNodeType::SORT_NODE: {
1591
34.0k
        const auto should_spill = _runtime_state->enable_spill() &&
1592
34.0k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1593
34.0k
        const bool use_local_merge =
1594
34.0k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1595
34.0k
        if (should_spill) {
1596
9
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1597
33.9k
        } else if (use_local_merge) {
1598
31.9k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1599
31.9k
                                                                 descs);
1600
31.9k
        } else {
1601
2.03k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1602
2.03k
        }
1603
34.0k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1604
1605
34.0k
        const auto downstream_pipeline_id = cur_pipe->id();
1606
34.0k
        if (!_dag.contains(downstream_pipeline_id)) {
1607
33.9k
            _dag.insert({downstream_pipeline_id, {}});
1608
33.9k
        }
1609
34.0k
        cur_pipe = add_pipeline(cur_pipe);
1610
34.0k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1611
1612
34.0k
        if (should_spill) {
1613
9
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1614
9
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1615
33.9k
        } else {
1616
33.9k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1617
33.9k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1618
33.9k
        }
1619
34.0k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1620
34.0k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1621
34.0k
        break;
1622
34.0k
    }
1623
34.0k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1624
64
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1625
64
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1626
1627
64
        const auto downstream_pipeline_id = cur_pipe->id();
1628
64
        if (!_dag.contains(downstream_pipeline_id)) {
1629
64
            _dag.insert({downstream_pipeline_id, {}});
1630
64
        }
1631
64
        cur_pipe = add_pipeline(cur_pipe);
1632
64
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1633
1634
64
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1635
64
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1636
64
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1637
64
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1638
64
        break;
1639
64
    }
1640
1.77k
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1641
1.77k
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1642
1.77k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1643
1644
1.77k
        const auto downstream_pipeline_id = cur_pipe->id();
1645
1.77k
        if (!_dag.contains(downstream_pipeline_id)) {
1646
1.75k
            _dag.insert({downstream_pipeline_id, {}});
1647
1.75k
        }
1648
1.77k
        cur_pipe = add_pipeline(cur_pipe);
1649
1.77k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1650
1651
1.77k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1652
1.77k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1653
1.77k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1654
1.77k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1655
1.77k
        break;
1656
1.77k
    }
1657
1.77k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1658
681
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1659
681
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1660
681
        break;
1661
681
    }
1662
681
    case TPlanNodeType::INTERSECT_NODE: {
1663
119
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1664
119
                                                                      cur_pipe, sink_ops));
1665
119
        break;
1666
119
    }
1667
129
    case TPlanNodeType::EXCEPT_NODE: {
1668
129
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1669
129
                                                                       cur_pipe, sink_ops));
1670
129
        break;
1671
129
    }
1672
311
    case TPlanNodeType::REPEAT_NODE: {
1673
311
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1674
311
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1675
311
        break;
1676
311
    }
1677
921
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1678
921
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1679
921
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1680
921
        break;
1681
921
    }
1682
921
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1683
18
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1684
18
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1685
18
        break;
1686
18
    }
1687
1.49k
    case TPlanNodeType::EMPTY_SET_NODE: {
1688
1.49k
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1689
1.49k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1690
1.49k
        break;
1691
1.49k
    }
1692
1.49k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1693
265
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1694
265
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1695
265
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1696
265
        break;
1697
265
    }
1698
1.54k
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1699
1.54k
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1700
1.54k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1701
1.54k
        break;
1702
1.54k
    }
1703
5.00k
    case TPlanNodeType::META_SCAN_NODE: {
1704
5.00k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1705
5.00k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1706
5.00k
        break;
1707
5.00k
    }
1708
5.00k
    case TPlanNodeType::SELECT_NODE: {
1709
971
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1710
971
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1711
971
        break;
1712
971
    }
1713
971
    case TPlanNodeType::REC_CTE_NODE: {
1714
151
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1715
151
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1716
1717
151
        const auto downstream_pipeline_id = cur_pipe->id();
1718
151
        if (!_dag.contains(downstream_pipeline_id)) {
1719
148
            _dag.insert({downstream_pipeline_id, {}});
1720
148
        }
1721
1722
151
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1723
151
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1724
1725
151
        DataSinkOperatorPtr anchor_sink;
1726
151
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1727
151
                                                                  op->operator_id(), tnode, descs);
1728
151
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1729
151
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1730
151
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1731
1732
151
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1733
151
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1734
1735
151
        DataSinkOperatorPtr rec_sink;
1736
151
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1737
151
                                                         tnode, descs);
1738
151
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1739
151
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1740
151
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1741
1742
151
        break;
1743
151
    }
1744
1.95k
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1745
1.95k
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1746
1.95k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1747
1.95k
        break;
1748
1.95k
    }
1749
1.95k
    default:
1750
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1751
0
                                     print_plan_node_type(tnode.node_type));
1752
539k
    }
1753
537k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1754
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
1755
0
        op->set_serial_operator();
1756
0
    }
1757
1758
537k
    return Status::OK();
1759
539k
}
1760
// NOLINTEND(readability-function-cognitive-complexity)
1761
// NOLINTEND(readability-function-size)
1762
1763
template <bool is_intersect>
1764
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
1765
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
1766
248
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1767
248
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1768
248
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1769
1770
248
    const auto downstream_pipeline_id = cur_pipe->id();
1771
248
    if (!_dag.contains(downstream_pipeline_id)) {
1772
231
        _dag.insert({downstream_pipeline_id, {}});
1773
231
    }
1774
1775
837
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1776
589
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1777
589
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1778
1779
589
        if (child_id == 0) {
1780
248
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1781
248
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1782
341
        } else {
1783
341
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1784
341
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
341
        }
1786
589
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1787
589
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1788
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1789
589
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1790
589
    }
1791
1792
248
    return Status::OK();
1793
248
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1766
119
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1767
119
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1768
119
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1769
1770
119
    const auto downstream_pipeline_id = cur_pipe->id();
1771
119
    if (!_dag.contains(downstream_pipeline_id)) {
1772
110
        _dag.insert({downstream_pipeline_id, {}});
1773
110
    }
1774
1775
435
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1776
316
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1777
316
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1778
1779
316
        if (child_id == 0) {
1780
119
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1781
119
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1782
197
        } else {
1783
197
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1784
197
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
197
        }
1786
316
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1787
316
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1788
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1789
316
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1790
316
    }
1791
1792
119
    return Status::OK();
1793
119
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1766
129
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1767
129
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1768
129
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1769
1770
129
    const auto downstream_pipeline_id = cur_pipe->id();
1771
129
    if (!_dag.contains(downstream_pipeline_id)) {
1772
121
        _dag.insert({downstream_pipeline_id, {}});
1773
121
    }
1774
1775
402
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1776
273
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1777
273
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1778
1779
273
        if (child_id == 0) {
1780
129
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1781
129
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1782
144
        } else {
1783
144
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1784
144
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
144
        }
1786
273
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1787
273
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1788
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1789
273
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1790
273
    }
1791
1792
129
    return Status::OK();
1793
129
}
1794
1795
331k
// Submits every pipeline task of this fragment to the query's task scheduler.
// May be called at most once; on any submit failure the fragment is cancelled,
// _total_tasks is clamped to the number actually submitted, and an error is
// returned (after closing the instance if all submitted tasks already closed).
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        // BUG FIX: the inner `break` above only exits one instance's task list.
        // Without this outer-loop bail-out, the remaining instances' tasks were
        // still submitted and `st` was overwritten — possibly with OK — which
        // both lost the failure and invalidated the _total_tasks just recorded.
        if (!st.ok()) {
            break;
        }
    }
    if (!st.ok()) {
        bool need_remove = false;
        {
            std::lock_guard<std::mutex> l(_task_mutex);
            if (_closed_tasks >= _total_tasks) {
                need_remove = _close_fragment_instance();
            }
        }
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
        if (need_remove) {
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
1836
1837
0
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1838
0
    if (_runtime_state->enable_profile()) {
1839
0
        std::stringstream ss;
1840
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1841
0
            runtime_profile_ptr->pretty_print(&ss);
1842
0
        }
1843
1844
0
        if (_runtime_state->load_channel_profile()) {
1845
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1846
0
        }
1847
1848
0
        auto profile_str =
1849
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1850
0
                            this->_fragment_id, extra_info, ss.str());
1851
0
        LOG_LONG_STRING(INFO, profile_str);
1852
0
    }
1853
0
}
1854
// If all pipeline tasks bound to the fragment instance are finished, then we can
// close the fragment instance.
// Returns true if the caller should call remove_pipeline_context() **after** releasing
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
// (via debug_string()).
bool PipelineFragmentContext::_close_fragment_instance() {
    // Idempotent: a second call is a no-op and never asks for removal.
    if (_is_fragment_instance_closed) {
        return false;
    }
    // Mark the instance closed whenever this function exits.
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
    if (!_need_notify_close) {
        // Final report to the FE; a failure here is logged but does not block close.
        auto st = send_report(true);
        if (!st) {
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
                                        print_id(_query_id), _fragment_id, st.to_string());
        }
    }
    // Printing the profile content to the info log is a temporary solution for
    // stream load and external_connector. Since stream load has nothing like a
    // coordinator on the FE, the backend cannot report its profile to the FE,
    // and the profile cannot be shown the same way as for other queries. So we
    // print the profile content to the info log instead.

    if (_runtime_state->enable_profile() &&
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
        std::stringstream ss;
        // Compute the _local_time_percent before pretty_print of the runtime_profile.
        // Before adding this operation, the output looked like:
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
        // After adding it, the output looks like:
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
        // So we can easily see an exec node's execution time excluding child time.
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
            runtime_profile_ptr->pretty_print(&ss);
        }

        if (_runtime_state->load_channel_profile()) {
            _runtime_state->load_channel_profile()->pretty_print(&ss);
        }

        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
    }

    if (_query_ctx->enable_profile()) {
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
                                         collect_realtime_load_channel_profile());
    }

    // Return whether the caller needs to remove from the pipeline map.
    // The caller must do this after releasing _task_mutex.
    return !_need_notify_close;
}
1911
1912
1.88M
// Records that one task of `pipeline_id` has closed. When the whole pipeline is
// closed, its upstream output can never be consumed, so every dependent
// pipeline in the DAG is made runnable. When the last task of the fragment
// closes, the instance is closed and (outside _task_mutex) removed from the
// fragment manager's pipeline map.
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
        // All tasks of this pipeline are closed: wake the dependents.
        if (auto dep_it = _dag.find(pipeline_id); dep_it != _dag.end()) {
            for (auto dep_id : dep_it->second) {
                _pip_id_to_pipeline[dep_id]->make_all_runnable(pipeline_id);
            }
        }
    }
    bool should_remove = false;
    {
        std::lock_guard<std::mutex> guard(_task_mutex);
        ++_closed_tasks;
        if (_closed_tasks >= _total_tasks) {
            should_remove = _close_fragment_instance();
        }
    }
    // remove_pipeline_context() must run outside _task_mutex to avoid an ABBA
    // deadlock (see the comment above _close_fragment_instance()).
    if (should_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
}
1935
1936
42.0k
std::string PipelineFragmentContext::get_load_error_url() {
1937
42.0k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1938
0
        return to_load_error_http_path(str);
1939
0
    }
1940
111k
    for (auto& tasks : _tasks) {
1941
195k
        for (auto& task : tasks) {
1942
195k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
1943
152
                return to_load_error_http_path(str);
1944
152
            }
1945
195k
        }
1946
111k
    }
1947
41.8k
    return "";
1948
42.0k
}
1949
1950
41.9k
std::string PipelineFragmentContext::get_first_error_msg() {
1951
41.9k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
1952
0
        return str;
1953
0
    }
1954
111k
    for (auto& tasks : _tasks) {
1955
195k
        for (auto& task : tasks) {
1956
195k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
1957
152
                return str;
1958
152
            }
1959
195k
        }
1960
111k
    }
1961
41.8k
    return "";
1962
41.9k
}
1963
1964
335k
// Reports this fragment's status (and, for loads, progress) to the FE through
// _report_status_cb. `done` marks the final report. Returns OK when no report
// is needed or the report was sent; NeedSendAgain when a not-done report was
// suppressed by the reporting flags.
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();
    // If the plan finished successfully but _is_report_success is false, no
    // report is needed. Loads set _is_report_success to true because they want
    // to track progress.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::OK();
    }

    // If both _is_report_success and _is_report_on_cancel are false, no report
    // is needed whether the query succeeded or failed. This may happen when a
    // query limit is reached and an internal cancellation is being processed:
    // the fragment is cancelled, but _is_report_on_cancel is set to false to
    // avoid sending a faulty report to the FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        if (done) {
            // The query finished successfully, so the fragment instance can be
            // closed without reporting to the FE.
            return Status::OK();
        }
        return Status::NeedSendAgain("");
    }

    std::vector<RuntimeState*> runtime_states;

    for (auto& tasks : _tasks) {
        for (auto& task : tasks) {
            runtime_states.push_back(task.second.get());
        }
    }

    // Prefer the query-level error URL / message, falling back to this
    // fragment's own. Fetch each value once (the original called the getter
    // twice — empty-check then use — which could also return a value that
    // changed in between); this also fixes the `load_eror_url` typo.
    std::string load_error_url = _query_ctx->get_load_error_url();
    if (load_error_url.empty()) {
        load_error_url = get_load_error_url();
    }
    std::string first_error_msg = _query_ctx->get_first_error_msg();
    if (first_error_msg.empty()) {
        first_error_msg = get_first_error_msg();
    }

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = runtime_states,
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = load_error_url,
                             .first_error_msg = first_error_msg,
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};

    return _report_status_cb(
            req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}
2019
2020
0
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2021
0
    size_t res = 0;
2022
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2023
    // here to traverse the vector.
2024
0
    for (const auto& task_instances : _tasks) {
2025
0
        for (const auto& task : task_instances) {
2026
0
            if (task.first->is_running()) {
2027
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2028
0
                                      << " is running, task: " << (void*)task.first.get()
2029
0
                                      << ", is_running: " << task.first->is_running();
2030
0
                *has_running_task = true;
2031
0
                return 0;
2032
0
            }
2033
2034
0
            size_t revocable_size = task.first->get_revocable_size();
2035
0
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2036
0
                res += revocable_size;
2037
0
            }
2038
0
        }
2039
0
    }
2040
0
    return res;
2041
0
}
2042
2043
0
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2044
0
    std::vector<PipelineTask*> revocable_tasks;
2045
0
    for (const auto& task_instances : _tasks) {
2046
0
        for (const auto& task : task_instances) {
2047
0
            size_t revocable_size_ = task.first->get_revocable_size();
2048
2049
0
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2050
0
                revocable_tasks.emplace_back(task.first.get());
2051
0
            }
2052
0
        }
2053
0
    }
2054
0
    return revocable_tasks;
2055
0
}
2056
2057
27
std::string PipelineFragmentContext::debug_string() {
2058
27
    std::lock_guard<std::mutex> l(_task_mutex);
2059
27
    fmt::memory_buffer debug_string_buffer;
2060
27
    fmt::format_to(debug_string_buffer,
2061
27
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2062
27
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2063
27
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2064
90
    for (size_t j = 0; j < _tasks.size(); j++) {
2065
63
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2066
278
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2067
215
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2068
215
                           _tasks[j][i].first->debug_string());
2069
215
        }
2070
63
    }
2071
2072
27
    return fmt::to_string(debug_string_buffer);
2073
27
}
2074
2075
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2076
2.59k
PipelineFragmentContext::collect_realtime_profile() const {
2077
2.59k
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2078
2079
    // we do not have mutex to protect pipeline_id_to_profile
2080
    // so we need to make sure this funciton is invoked after fragment context
2081
    // has already been prepared.
2082
2.59k
    if (!_prepared) {
2083
0
        std::string msg =
2084
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2085
0
        DCHECK(false) << msg;
2086
0
        LOG_ERROR(msg);
2087
0
        return res;
2088
0
    }
2089
2090
    // Make sure first profile is fragment level profile
2091
2.59k
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2092
2.59k
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2093
2.59k
    res.push_back(fragment_profile);
2094
2095
    // pipeline_id_to_profile is initialized in prepare stage
2096
4.92k
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2097
4.92k
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2098
4.92k
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2099
4.92k
        res.push_back(profile_ptr);
2100
4.92k
    }
2101
2102
2.59k
    return res;
2103
2.59k
}
2104
2105
std::shared_ptr<TRuntimeProfileTree>
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
    // Merges every task's load-channel profile into the fragment-level
    // load-channel profile and returns a thrift snapshot of the merged result.
    // There is no mutex protecting pipeline_id_to_profile, so this function
    // must only be invoked after the fragment context has already been
    // prepared; otherwise return nullptr (DCHECK failure in debug builds).
    if (!_prepared) {
        std::string msg =
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
        DCHECK(false) << msg;
        LOG_ERROR(msg);
        return nullptr;
    }

    for (const auto& tasks : _tasks) {
        for (const auto& task : tasks) {
            // task.second is the per-task runtime state; skip tasks that have
            // no load-channel profile.
            if (task.second->load_channel_profile() == nullptr) {
                continue;
            }

            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();

            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
                                                           _runtime_state->profile_level());
            // NOTE(review): this assumes _runtime_state->load_channel_profile()
            // is non-null whenever any task has one (and unconditionally in the
            // to_thrift below) — confirm; a null fragment-level load-channel
            // profile would be dereferenced here.
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
        }
    }

    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
                                                      _runtime_state->profile_level());
    return load_channel_profile;
}
2137
2138
// Collect runtime filter IDs registered by all tasks in this PFC.
2139
// Used during recursive CTE stage transitions to know which filters to deregister
2140
// before creating the new PFC for the next recursion round.
2141
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2142
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2143
// the vector sizes do not change, and individual RuntimeState filter sets are
2144
// written only during open() which has completed by the time we reach rerun.
2145
3.28k
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2146
3.28k
    std::set<int> result;
2147
8.37k
    for (const auto& _task : _tasks) {
2148
15.1k
        for (const auto& task : _task) {
2149
15.1k
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2150
15.1k
            result.merge(set);
2151
15.1k
        }
2152
8.37k
    }
2153
3.28k
    if (_runtime_state) {
2154
3.28k
        auto set = _runtime_state->get_deregister_runtime_filter();
2155
3.28k
        result.merge(set);
2156
3.28k
    }
2157
3.28k
    return result;
2158
3.28k
}
2159
2160
336k
void PipelineFragmentContext::_release_resource() {
2161
336k
    std::lock_guard<std::mutex> l(_task_mutex);
2162
    // The memory released by the query end is recorded in the query mem tracker.
2163
336k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2164
336k
    auto st = _query_ctx->exec_status();
2165
1.13M
    for (auto& _task : _tasks) {
2166
1.13M
        if (!_task.empty()) {
2167
1.13M
            _call_back(_task.front().first->runtime_state(), &st);
2168
1.13M
        }
2169
1.13M
    }
2170
336k
    _tasks.clear();
2171
336k
    _dag.clear();
2172
336k
    _pip_id_to_pipeline.clear();
2173
336k
    _pipelines.clear();
2174
336k
    _sink.reset();
2175
336k
    _root_op.reset();
2176
336k
    _runtime_filter_mgr_map.clear();
2177
336k
    _op_id_to_shared_state.clear();
2178
336k
}
2179
2180
#include "common/compile_check_end.h"
2181
} // namespace doris