Coverage Report

Created: 2026-03-26 10:56

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/PaloInternalService_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <pthread.h>
24
25
#include <algorithm>
26
#include <cstdlib>
27
// IWYU pragma: no_include <bits/chrono.h>
28
#include <fmt/format.h>
29
30
#include <chrono> // IWYU pragma: keep
31
#include <map>
32
#include <memory>
33
#include <ostream>
34
#include <utility>
35
36
#include "cloud/config.h"
37
#include "common/cast_set.h"
38
#include "common/config.h"
39
#include "common/exception.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "exec/exchange/local_exchange_sink_operator.h"
43
#include "exec/exchange/local_exchange_source_operator.h"
44
#include "exec/exchange/local_exchanger.h"
45
#include "exec/exchange/vdata_stream_mgr.h"
46
#include "exec/operator/aggregation_sink_operator.h"
47
#include "exec/operator/aggregation_source_operator.h"
48
#include "exec/operator/analytic_sink_operator.h"
49
#include "exec/operator/analytic_source_operator.h"
50
#include "exec/operator/assert_num_rows_operator.h"
51
#include "exec/operator/blackhole_sink_operator.h"
52
#include "exec/operator/cache_sink_operator.h"
53
#include "exec/operator/cache_source_operator.h"
54
#include "exec/operator/datagen_operator.h"
55
#include "exec/operator/dict_sink_operator.h"
56
#include "exec/operator/distinct_streaming_aggregation_operator.h"
57
#include "exec/operator/empty_set_operator.h"
58
#include "exec/operator/es_scan_operator.h"
59
#include "exec/operator/exchange_sink_operator.h"
60
#include "exec/operator/exchange_source_operator.h"
61
#include "exec/operator/file_scan_operator.h"
62
#include "exec/operator/group_commit_block_sink_operator.h"
63
#include "exec/operator/group_commit_scan_operator.h"
64
#include "exec/operator/hashjoin_build_sink.h"
65
#include "exec/operator/hashjoin_probe_operator.h"
66
#include "exec/operator/hive_table_sink_operator.h"
67
#include "exec/operator/iceberg_delete_sink_operator.h"
68
#include "exec/operator/iceberg_merge_sink_operator.h"
69
#include "exec/operator/iceberg_table_sink_operator.h"
70
#include "exec/operator/jdbc_scan_operator.h"
71
#include "exec/operator/jdbc_table_sink_operator.h"
72
#include "exec/operator/local_merge_sort_source_operator.h"
73
#include "exec/operator/materialization_opertor.h"
74
#include "exec/operator/maxcompute_table_sink_operator.h"
75
#include "exec/operator/memory_scratch_sink_operator.h"
76
#include "exec/operator/meta_scan_operator.h"
77
#include "exec/operator/multi_cast_data_stream_sink.h"
78
#include "exec/operator/multi_cast_data_stream_source.h"
79
#include "exec/operator/nested_loop_join_build_operator.h"
80
#include "exec/operator/nested_loop_join_probe_operator.h"
81
#include "exec/operator/olap_scan_operator.h"
82
#include "exec/operator/olap_table_sink_operator.h"
83
#include "exec/operator/olap_table_sink_v2_operator.h"
84
#include "exec/operator/partition_sort_sink_operator.h"
85
#include "exec/operator/partition_sort_source_operator.h"
86
#include "exec/operator/partitioned_aggregation_sink_operator.h"
87
#include "exec/operator/partitioned_aggregation_source_operator.h"
88
#include "exec/operator/partitioned_hash_join_probe_operator.h"
89
#include "exec/operator/partitioned_hash_join_sink_operator.h"
90
#include "exec/operator/rec_cte_anchor_sink_operator.h"
91
#include "exec/operator/rec_cte_scan_operator.h"
92
#include "exec/operator/rec_cte_sink_operator.h"
93
#include "exec/operator/rec_cte_source_operator.h"
94
#include "exec/operator/repeat_operator.h"
95
#include "exec/operator/result_file_sink_operator.h"
96
#include "exec/operator/result_sink_operator.h"
97
#include "exec/operator/schema_scan_operator.h"
98
#include "exec/operator/select_operator.h"
99
#include "exec/operator/set_probe_sink_operator.h"
100
#include "exec/operator/set_sink_operator.h"
101
#include "exec/operator/set_source_operator.h"
102
#include "exec/operator/sort_sink_operator.h"
103
#include "exec/operator/sort_source_operator.h"
104
#include "exec/operator/spill_iceberg_table_sink_operator.h"
105
#include "exec/operator/spill_sort_sink_operator.h"
106
#include "exec/operator/spill_sort_source_operator.h"
107
#include "exec/operator/streaming_aggregation_operator.h"
108
#include "exec/operator/table_function_operator.h"
109
#include "exec/operator/tvf_table_sink_operator.h"
110
#include "exec/operator/union_sink_operator.h"
111
#include "exec/operator/union_source_operator.h"
112
#include "exec/pipeline/dependency.h"
113
#include "exec/pipeline/pipeline_task.h"
114
#include "exec/pipeline/task_scheduler.h"
115
#include "exec/runtime_filter/runtime_filter_mgr.h"
116
#include "exec/sort/topn_sorter.h"
117
#include "exec/spill/spill_file.h"
118
#include "io/fs/stream_load_pipe.h"
119
#include "load/stream_load/new_load_stream_mgr.h"
120
#include "runtime/exec_env.h"
121
#include "runtime/fragment_mgr.h"
122
#include "runtime/result_block_buffer.h"
123
#include "runtime/result_buffer_mgr.h"
124
#include "runtime/runtime_state.h"
125
#include "runtime/thread_context.h"
126
#include "service/backend_options.h"
127
#include "util/countdown_latch.h"
128
#include "util/debug_util.h"
129
#include "util/uid_util.h"
130
131
namespace doris {
132
#include "common/compile_check_begin.h"
133
PipelineFragmentContext::PipelineFragmentContext(
134
        TUniqueId query_id, const TPipelineFragmentParams& request,
135
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
136
        const std::function<void(RuntimeState*, Status*)>& call_back,
137
        report_status_callback report_status_cb)
138
347k
        : _query_id(std::move(query_id)),
139
347k
          _fragment_id(request.fragment_id),
140
347k
          _exec_env(exec_env),
141
347k
          _query_ctx(std::move(query_ctx)),
142
347k
          _call_back(call_back),
143
347k
          _is_report_on_cancel(true),
144
347k
          _report_status_cb(std::move(report_status_cb)),
145
347k
          _params(request),
146
347k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
147
347k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
148
347k
                                                               : false) {
149
347k
    _fragment_watcher.start();
150
347k
}
151
152
347k
PipelineFragmentContext::~PipelineFragmentContext() {
153
347k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
154
347k
            .tag("query_id", print_id(_query_id))
155
347k
            .tag("fragment_id", _fragment_id);
156
347k
    _release_resource();
157
347k
    {
158
        // The memory released by the query end is recorded in the query mem tracker.
159
347k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
160
347k
        _runtime_state.reset();
161
347k
        _query_ctx.reset();
162
347k
    }
163
347k
}
164
165
487
bool PipelineFragmentContext::is_timeout(timespec now) const {
166
487
    if (_timeout <= 0) {
167
0
        return false;
168
0
    }
169
487
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
170
487
}
171
172
// Must not add lock in this method. Because it will call query ctx cancel. And
173
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
174
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
175
// There maybe dead lock.
176
5.27k
void PipelineFragmentContext::cancel(const Status reason) {
177
5.27k
    LOG_INFO("PipelineFragmentContext::cancel")
178
5.27k
            .tag("query_id", print_id(_query_id))
179
5.27k
            .tag("fragment_id", _fragment_id)
180
5.27k
            .tag("reason", reason.to_string());
181
5.27k
    {
182
5.27k
        std::lock_guard<std::mutex> l(_task_mutex);
183
5.27k
        if (_closed_tasks >= _total_tasks) {
184
            // All tasks in this PipelineXFragmentContext already closed.
185
59
            return;
186
59
        }
187
5.27k
    }
188
    // Timeout is a special error code, we need print current stack to debug timeout issue.
189
5.21k
    if (reason.is<ErrorCode::TIMEOUT>()) {
190
1
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
191
1
                                   debug_string());
192
1
        LOG_LONG_STRING(WARNING, dbg_str);
193
1
    }
194
195
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
196
5.21k
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
197
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
198
0
                    debug_string());
199
0
    }
200
201
5.21k
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
202
0
        print_profile("cancel pipeline, reason: " + reason.to_string());
203
0
    }
204
205
5.21k
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
206
22
        _query_ctx->set_load_error_url(error_url);
207
22
    }
208
209
5.21k
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
210
22
        _query_ctx->set_first_error_msg(first_error_msg);
211
22
    }
212
213
5.21k
    _query_ctx->cancel(reason, _fragment_id);
214
5.21k
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
215
54
        _is_report_on_cancel = false;
216
5.16k
    } else {
217
23.7k
        for (auto& id : _fragment_instance_ids) {
218
23.7k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
219
23.7k
        }
220
5.16k
    }
221
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
222
    // For stream load the fragment's query_id == load id, it is set in FE.
223
5.21k
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
224
5.21k
    if (stream_load_ctx != nullptr) {
225
30
        stream_load_ctx->pipe->cancel(reason.to_string());
226
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
227
        // We need to set the error URL at this point to ensure error information is properly
228
        // propagated to the client.
229
30
        stream_load_ctx->error_url = get_load_error_url();
230
30
        stream_load_ctx->first_error_msg = get_first_error_msg();
231
30
    }
232
233
24.0k
    for (auto& tasks : _tasks) {
234
57.6k
        for (auto& task : tasks) {
235
57.6k
            task.first->terminate();
236
57.6k
        }
237
24.0k
    }
238
5.21k
}
239
240
558k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
241
558k
    PipelineId id = _next_pipeline_id++;
242
558k
    auto pipeline = std::make_shared<Pipeline>(
243
558k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
244
558k
            parent ? parent->num_tasks() : _num_instances);
245
558k
    if (idx >= 0) {
246
104k
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
247
453k
    } else {
248
453k
        _pipelines.emplace_back(pipeline);
249
453k
    }
250
558k
    if (parent) {
251
205k
        parent->set_children(pipeline);
252
205k
    }
253
558k
    return pipeline;
254
558k
}
255
256
349k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
257
349k
    {
258
349k
        SCOPED_TIMER(_build_pipelines_timer);
259
        // 2. Build pipelines with operators in this fragment.
260
349k
        auto root_pipeline = add_pipeline();
261
349k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
262
349k
                                         &_root_op, root_pipeline));
263
264
        // 3. Create sink operator
265
349k
        if (!_params.fragment.__isset.output_sink) {
266
0
            return Status::InternalError("No output sink in this fragment!");
267
0
        }
268
349k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
269
349k
                                          _params.fragment.output_exprs, _params,
270
349k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
271
349k
                                          *_desc_tbl, root_pipeline->id()));
272
349k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
273
349k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
274
275
453k
        for (PipelinePtr& pipeline : _pipelines) {
276
18.4E
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
277
453k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
278
453k
        }
279
349k
    }
280
    // 4. Build local exchanger
281
349k
    if (_runtime_state->enable_local_shuffle()) {
282
348k
        SCOPED_TIMER(_plan_local_exchanger_timer);
283
348k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
284
348k
                                             _params.bucket_seq_to_instance_idx,
285
348k
                                             _params.shuffle_idx_to_instance_idx));
286
348k
    }
287
288
    // 5. Initialize global states in pipelines.
289
559k
    for (PipelinePtr& pipeline : _pipelines) {
290
559k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
291
559k
        pipeline->children().clear();
292
559k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
293
559k
    }
294
295
348k
    {
296
348k
        SCOPED_TIMER(_build_tasks_timer);
297
        // 6. Build pipeline tasks and initialize local state.
298
348k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
299
348k
    }
300
301
348k
    return Status::OK();
302
348k
}
303
304
347k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
305
347k
    if (_prepared) {
306
0
        return Status::InternalError("Already prepared");
307
0
    }
308
347k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
309
347k
        _timeout = _params.query_options.execution_timeout;
310
347k
    }
311
312
347k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
313
347k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
314
347k
    SCOPED_TIMER(_prepare_timer);
315
347k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
316
347k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
317
347k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
318
347k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
319
347k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
320
347k
    {
321
347k
        SCOPED_TIMER(_init_context_timer);
322
347k
        cast_set(_num_instances, _params.local_params.size());
323
347k
        _total_instances =
324
347k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
325
326
347k
        auto* fragment_context = this;
327
328
347k
        if (_params.query_options.__isset.is_report_success) {
329
346k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
330
346k
        }
331
332
        // 1. Set up the global runtime state.
333
347k
        _runtime_state = RuntimeState::create_unique(
334
347k
                _params.query_id, _params.fragment_id, _params.query_options,
335
347k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
336
347k
        _runtime_state->set_task_execution_context(shared_from_this());
337
347k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
338
347k
        if (_params.__isset.backend_id) {
339
345k
            _runtime_state->set_backend_id(_params.backend_id);
340
345k
        }
341
347k
        if (_params.__isset.import_label) {
342
238
            _runtime_state->set_import_label(_params.import_label);
343
238
        }
344
347k
        if (_params.__isset.db_name) {
345
190
            _runtime_state->set_db_name(_params.db_name);
346
190
        }
347
347k
        if (_params.__isset.load_job_id) {
348
0
            _runtime_state->set_load_job_id(_params.load_job_id);
349
0
        }
350
351
347k
        if (_params.is_simplified_param) {
352
125k
            _desc_tbl = _query_ctx->desc_tbl;
353
222k
        } else {
354
222k
            DCHECK(_params.__isset.desc_tbl);
355
222k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
356
222k
                                                  &_desc_tbl));
357
222k
        }
358
347k
        _runtime_state->set_desc_tbl(_desc_tbl);
359
347k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
360
347k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
361
347k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
362
347k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
363
364
        // init fragment_instance_ids
365
347k
        const auto target_size = _params.local_params.size();
366
347k
        _fragment_instance_ids.resize(target_size);
367
1.36M
        for (size_t i = 0; i < _params.local_params.size(); i++) {
368
1.02M
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
369
1.02M
            _fragment_instance_ids[i] = fragment_instance_id;
370
1.02M
        }
371
347k
    }
372
373
347k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
374
375
346k
    _init_next_report_time();
376
377
346k
    _prepared = true;
378
346k
    return Status::OK();
379
347k
}
380
381
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
382
        int instance_idx,
383
1.02M
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
384
1.02M
    const auto& local_params = _params.local_params[instance_idx];
385
1.02M
    auto fragment_instance_id = local_params.fragment_instance_id;
386
1.02M
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
387
1.02M
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
388
1.02M
    auto get_shared_state = [&](PipelinePtr pipeline)
389
1.02M
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
390
1.81M
                                       std::vector<std::shared_ptr<Dependency>>>> {
391
1.81M
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
392
1.81M
                                std::vector<std::shared_ptr<Dependency>>>>
393
1.81M
                shared_state_map;
394
2.43M
        for (auto& op : pipeline->operators()) {
395
2.43M
            auto source_id = op->operator_id();
396
2.43M
            if (auto iter = _op_id_to_shared_state.find(source_id);
397
2.43M
                iter != _op_id_to_shared_state.end()) {
398
688k
                shared_state_map.insert({source_id, iter->second});
399
688k
            }
400
2.43M
        }
401
1.81M
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
402
1.81M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
403
1.81M
                iter != _op_id_to_shared_state.end()) {
404
333k
                shared_state_map.insert({sink_to_source_id, iter->second});
405
333k
            }
406
1.81M
        }
407
1.81M
        return shared_state_map;
408
1.81M
    };
409
410
3.19M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
411
2.16M
        auto& pipeline = _pipelines[pip_idx];
412
2.16M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
413
1.81M
            auto task_runtime_state = RuntimeState::create_unique(
414
1.81M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
415
1.81M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
416
1.81M
            {
417
                // Initialize runtime state for this task
418
1.81M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
419
420
1.81M
                task_runtime_state->set_task_execution_context(shared_from_this());
421
1.81M
                task_runtime_state->set_be_number(local_params.backend_num);
422
1.81M
                if (_need_notify_close) {
423
                    // rec cte require child rf to wait infinitely to make sure all rpc done
424
11.1k
                    task_runtime_state->set_force_make_rf_wait_infinite();
425
11.1k
                }
426
427
1.81M
                if (_params.__isset.backend_id) {
428
1.81M
                    task_runtime_state->set_backend_id(_params.backend_id);
429
1.81M
                }
430
1.81M
                if (_params.__isset.import_label) {
431
239
                    task_runtime_state->set_import_label(_params.import_label);
432
239
                }
433
1.81M
                if (_params.__isset.db_name) {
434
191
                    task_runtime_state->set_db_name(_params.db_name);
435
191
                }
436
1.81M
                if (_params.__isset.load_job_id) {
437
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
438
0
                }
439
1.81M
                if (_params.__isset.wal_id) {
440
114
                    task_runtime_state->set_wal_id(_params.wal_id);
441
114
                }
442
1.81M
                if (_params.__isset.content_length) {
443
31
                    task_runtime_state->set_content_length(_params.content_length);
444
31
                }
445
446
1.81M
                task_runtime_state->set_desc_tbl(_desc_tbl);
447
1.81M
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
448
1.81M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
449
1.81M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
450
1.81M
                task_runtime_state->set_max_operator_id(max_operator_id());
451
1.81M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
452
1.81M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
453
1.81M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
454
455
1.81M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
456
1.81M
            }
457
1.81M
            auto cur_task_id = _total_tasks++;
458
1.81M
            task_runtime_state->set_task_id(cur_task_id);
459
1.81M
            task_runtime_state->set_task_num(pipeline->num_tasks());
460
1.81M
            auto task = std::make_shared<PipelineTask>(
461
1.81M
                    pipeline, cur_task_id, task_runtime_state.get(),
462
1.81M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
463
1.81M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
464
1.81M
                    instance_idx);
465
1.81M
            pipeline->incr_created_tasks(instance_idx, task.get());
466
1.81M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
467
1.81M
            _tasks[instance_idx].emplace_back(
468
1.81M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
469
1.81M
                            std::move(task), std::move(task_runtime_state)});
470
1.81M
        }
471
2.16M
    }
472
473
    /**
474
         * Build DAG for pipeline tasks.
475
         * For example, we have
476
         *
477
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
478
         *            \                      /
479
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
480
         *                 \                          /
481
         *               JoinProbeOperator2 (Pipeline1)
482
         *
483
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
484
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
485
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
486
         *
487
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
488
         * and JoinProbeOperator2.
489
         */
490
2.16M
    for (auto& _pipeline : _pipelines) {
491
2.16M
        if (pipeline_id_to_task.contains(_pipeline->id())) {
492
1.81M
            auto* task = pipeline_id_to_task[_pipeline->id()];
493
1.81M
            DCHECK(task != nullptr);
494
495
            // If this task has upstream dependency, then inject it into this task.
496
1.81M
            if (_dag.contains(_pipeline->id())) {
497
1.13M
                auto& deps = _dag[_pipeline->id()];
498
1.81M
                for (auto& dep : deps) {
499
1.81M
                    if (pipeline_id_to_task.contains(dep)) {
500
1.10M
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
501
1.10M
                        if (ss) {
502
446k
                            task->inject_shared_state(ss);
503
663k
                        } else {
504
663k
                            pipeline_id_to_task[dep]->inject_shared_state(
505
663k
                                    task->get_source_shared_state());
506
663k
                        }
507
1.10M
                    }
508
1.81M
                }
509
1.13M
            }
510
1.81M
        }
511
2.16M
    }
512
3.19M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
513
2.16M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
514
1.81M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
515
1.81M
            DCHECK(pipeline_id_to_profile[pip_idx]);
516
1.81M
            std::vector<TScanRangeParams> scan_ranges;
517
1.81M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
518
1.81M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
519
322k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
520
322k
            }
521
1.81M
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
522
1.81M
                                                             _params.fragment.output_sink));
523
1.81M
        }
524
2.16M
    }
525
1.02M
    {
526
1.02M
        std::lock_guard<std::mutex> l(_state_map_lock);
527
1.02M
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
528
1.02M
    }
529
1.02M
    return Status::OK();
530
1.02M
}
531
532
349k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
533
349k
    _total_tasks = 0;
534
349k
    _closed_tasks = 0;
535
349k
    const auto target_size = _params.local_params.size();
536
349k
    _tasks.resize(target_size);
537
349k
    _runtime_filter_mgr_map.resize(target_size);
538
907k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
539
558k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
540
558k
    }
541
349k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
542
543
349k
    if (target_size > 1 &&
544
349k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
545
125k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
546
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
547
24.2k
        std::vector<Status> prepare_status(target_size);
548
24.2k
        int submitted_tasks = 0;
549
24.2k
        Status submit_status;
550
24.2k
        CountDownLatch latch((int)target_size);
551
319k
        for (int i = 0; i < target_size; i++) {
552
295k
            submit_status = thread_pool->submit_func([&, i]() {
553
295k
                SCOPED_ATTACH_TASK(_query_ctx.get());
554
295k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
555
295k
                latch.count_down();
556
295k
            });
557
295k
            if (LIKELY(submit_status.ok())) {
558
295k
                submitted_tasks++;
559
18.4E
            } else {
560
18.4E
                break;
561
18.4E
            }
562
295k
        }
563
24.2k
        latch.arrive_and_wait(target_size - submitted_tasks);
564
24.2k
        if (UNLIKELY(!submit_status.ok())) {
565
0
            return submit_status;
566
0
        }
567
319k
        for (int i = 0; i < submitted_tasks; i++) {
568
295k
            if (!prepare_status[i].ok()) {
569
0
                return prepare_status[i];
570
0
            }
571
295k
        }
572
325k
    } else {
573
1.05M
        for (int i = 0; i < target_size; i++) {
574
731k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
575
731k
        }
576
325k
    }
577
349k
    _pipeline_parent_map.clear();
578
349k
    _op_id_to_shared_state.clear();
579
580
349k
    return Status::OK();
581
349k
}
582
583
345k
void PipelineFragmentContext::_init_next_report_time() {
584
345k
    auto interval_s = config::pipeline_status_report_interval;
585
345k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
586
34.6k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
587
34.6k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
588
        // We don't want to wait longer than it takes to run the entire fragment.
589
34.6k
        _previous_report_time =
590
34.6k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
591
34.6k
        _disable_period_report = false;
592
34.6k
    }
593
345k
}
594
595
4.11k
void PipelineFragmentContext::refresh_next_report_time() {
596
4.11k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
597
4.11k
    DCHECK(disable == true);
598
4.11k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
599
4.11k
    _disable_period_report.compare_exchange_strong(disable, false);
600
4.11k
}
601
602
6.29M
void PipelineFragmentContext::trigger_report_if_necessary() {
603
6.29M
    if (!_is_report_success) {
604
5.71M
        return;
605
5.71M
    }
606
585k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
607
585k
    if (disable) {
608
9.50k
        return;
609
9.50k
    }
610
576k
    int32_t interval_s = config::pipeline_status_report_interval;
611
576k
    if (interval_s <= 0) {
612
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
613
0
                        "trigger "
614
0
                        "report.";
615
0
    }
616
576k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
617
576k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
618
576k
    if (MonotonicNanos() > next_report_time) {
619
4.12k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
620
4.12k
                                                            std::memory_order_acq_rel)) {
621
6
            return;
622
6
        }
623
4.11k
        if (VLOG_FILE_IS_ON) {
624
0
            VLOG_FILE << "Reporting "
625
0
                      << "profile for query_id " << print_id(_query_id)
626
0
                      << ", fragment id: " << _fragment_id;
627
628
0
            std::stringstream ss;
629
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
630
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
631
0
            if (_runtime_state->load_channel_profile()) {
632
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
633
0
            }
634
635
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
636
0
                      << " profile:\n"
637
0
                      << ss.str();
638
0
        }
639
4.11k
        auto st = send_report(false);
640
4.11k
        if (!st.ok()) {
641
0
            disable = true;
642
0
            _disable_period_report.compare_exchange_strong(disable, false,
643
0
                                                           std::memory_order_acq_rel);
644
0
        }
645
4.11k
    }
646
576k
}
647
648
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
649
348k
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
650
348k
    if (_params.fragment.plan.nodes.empty()) {
651
0
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
652
0
    }
653
654
348k
    int node_idx = 0;
655
656
348k
    RETURN_IF_ERROR(_create_tree_helper(pool, _params.fragment.plan.nodes, descs, nullptr,
657
348k
                                        &node_idx, root, cur_pipe, 0, false, false));
658
659
348k
    if (node_idx + 1 != _params.fragment.plan.nodes.size()) {
660
0
        return Status::InternalError(
661
0
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
662
0
    }
663
348k
    return Status::OK();
664
348k
}
665
666
Status PipelineFragmentContext::_create_tree_helper(
667
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
668
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
669
564k
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
670
    // propagate error case
671
564k
    if (*node_idx >= tnodes.size()) {
672
0
        return Status::InternalError(
673
0
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
674
0
                *node_idx, tnodes.size());
675
0
    }
676
564k
    const TPlanNode& tnode = tnodes[*node_idx];
677
678
564k
    int num_children = tnodes[*node_idx].num_children;
679
564k
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
680
564k
    bool current_require_bucket_distribution = require_bucket_distribution;
681
    // TODO: Create CacheOperator is confused now
682
564k
    OperatorPtr op = nullptr;
683
564k
    OperatorPtr cache_op = nullptr;
684
564k
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
685
564k
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
686
564k
                                     followed_by_shuffled_operator,
687
564k
                                     current_require_bucket_distribution, cache_op));
688
    // Initialization must be done here. For example, group by expressions in agg will be used to
689
    // decide if a local shuffle should be planed, so it must be initialized here.
690
564k
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
691
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
692
564k
    if (parent != nullptr) {
693
        // add to parent's child(s)
694
215k
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
695
348k
    } else {
696
348k
        *root = op;
697
348k
    }
698
    /**
699
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
700
     *
701
     * For plan:
702
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
703
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
704
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
705
     *
706
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
707
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
708
     */
709
564k
    auto required_data_distribution =
710
564k
            cur_pipe->operators().empty()
711
564k
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
712
564k
                    : op->required_data_distribution(_runtime_state.get());
713
564k
    current_followed_by_shuffled_operator =
714
564k
            ((followed_by_shuffled_operator ||
715
564k
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
716
502k
                                             : op->is_shuffled_operator())) &&
717
564k
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
718
564k
            (followed_by_shuffled_operator &&
719
448k
             required_data_distribution.distribution_type == ExchangeType::NOOP);
720
721
564k
    current_require_bucket_distribution =
722
564k
            ((require_bucket_distribution ||
723
564k
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
724
506k
                                             : op->is_colocated_operator())) &&
725
564k
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
726
564k
            (require_bucket_distribution &&
727
454k
             required_data_distribution.distribution_type == ExchangeType::NOOP);
728
729
564k
    if (num_children == 0) {
730
362k
        _use_serial_source = op->is_serial_operator();
731
362k
    }
732
    // rely on that tnodes is preorder of the plan
733
780k
    for (int i = 0; i < num_children; i++) {
734
216k
        ++*node_idx;
735
216k
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
736
216k
                                            current_followed_by_shuffled_operator,
737
216k
                                            current_require_bucket_distribution));
738
739
        // we are expecting a child, but have used all nodes
740
        // this means we have been given a bad tree and must fail
741
216k
        if (*node_idx >= tnodes.size()) {
742
0
            return Status::InternalError(
743
0
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
744
0
                    "nodes: {}",
745
0
                    *node_idx, tnodes.size());
746
0
        }
747
216k
    }
748
749
564k
    return Status::OK();
750
564k
}
751
752
void PipelineFragmentContext::_inherit_pipeline_properties(
753
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
754
104k
        PipelinePtr pipe_with_sink) {
755
104k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
756
104k
    pipe_with_source->set_num_tasks(_num_instances);
757
104k
    pipe_with_source->set_data_distribution(data_distribution);
758
104k
}
759
760
Status PipelineFragmentContext::_add_local_exchange_impl(
761
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
762
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
763
        const std::map<int, int>& bucket_seq_to_instance_idx,
764
104k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
765
104k
    auto& operators = cur_pipe->operators();
766
104k
    const auto downstream_pipeline_id = cur_pipe->id();
767
104k
    auto local_exchange_id = next_operator_id();
768
    // 1. Create a new pipeline with local exchange sink.
769
104k
    DataSinkOperatorPtr sink;
770
104k
    auto sink_id = next_sink_operator_id();
771
772
    /**
773
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
774
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
775
     */
776
104k
    const bool followed_by_shuffled_operator =
777
104k
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
778
104k
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
779
104k
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
780
104k
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
781
104k
                                         followed_by_shuffled_operator && !_use_serial_source;
782
104k
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
783
104k
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
784
104k
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
785
104k
    if (bucket_seq_to_instance_idx.empty() &&
786
104k
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
787
8
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
788
8
    }
789
104k
    RETURN_IF_ERROR(new_pip->set_sink(sink));
790
104k
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
791
104k
                                          num_buckets, use_global_hash_shuffle,
792
104k
                                          shuffle_idx_to_instance_idx));
793
794
    // 2. Create and initialize LocalExchangeSharedState.
795
104k
    std::shared_ptr<LocalExchangeSharedState> shared_state =
796
104k
            LocalExchangeSharedState::create_shared(_num_instances);
797
104k
    switch (data_distribution.distribution_type) {
798
20.1k
    case ExchangeType::HASH_SHUFFLE:
799
20.1k
        shared_state->exchanger = ShuffleExchanger::create_unique(
800
20.1k
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
801
20.1k
                use_global_hash_shuffle ? _total_instances : _num_instances,
802
20.1k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
803
20.1k
                        ? cast_set<int>(
804
20.1k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
805
20.1k
                        : 0);
806
20.1k
        break;
807
437
    case ExchangeType::BUCKET_HASH_SHUFFLE:
808
437
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
809
437
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
810
437
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
811
437
                        ? cast_set<int>(
812
437
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
813
437
                        : 0);
814
437
        break;
815
80.5k
    case ExchangeType::PASSTHROUGH:
816
80.5k
        shared_state->exchanger = PassthroughExchanger::create_unique(
817
80.5k
                cur_pipe->num_tasks(), _num_instances,
818
80.5k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
819
80.5k
                        ? cast_set<int>(
820
80.4k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
821
80.5k
                        : 0);
822
80.5k
        break;
823
477
    case ExchangeType::BROADCAST:
824
477
        shared_state->exchanger = BroadcastExchanger::create_unique(
825
477
                cur_pipe->num_tasks(), _num_instances,
826
477
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
827
477
                        ? cast_set<int>(
828
477
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
829
477
                        : 0);
830
477
        break;
831
2.33k
    case ExchangeType::PASS_TO_ONE:
832
2.33k
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
833
            // If shared hash table is enabled for BJ, hash table will be built by only one task
834
1.14k
            shared_state->exchanger = PassToOneExchanger::create_unique(
835
1.14k
                    cur_pipe->num_tasks(), _num_instances,
836
1.14k
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
837
1.14k
                            ? cast_set<int>(_runtime_state->query_options()
838
1.14k
                                                    .local_exchange_free_blocks_limit)
839
1.14k
                            : 0);
840
1.18k
        } else {
841
1.18k
            shared_state->exchanger = BroadcastExchanger::create_unique(
842
1.18k
                    cur_pipe->num_tasks(), _num_instances,
843
1.18k
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
844
1.18k
                            ? cast_set<int>(_runtime_state->query_options()
845
1.18k
                                                    .local_exchange_free_blocks_limit)
846
1.18k
                            : 0);
847
1.18k
        }
848
2.33k
        break;
849
888
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
850
888
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
851
888
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
852
888
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
853
888
                        ? cast_set<int>(
854
888
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
855
888
                        : 0);
856
888
        break;
857
0
    default:
858
0
        return Status::InternalError("Unsupported local exchange type : " +
859
0
                                     std::to_string((int)data_distribution.distribution_type));
860
104k
    }
861
104k
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
862
104k
                                             "LOCAL_EXCHANGE_OPERATOR");
863
104k
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
864
104k
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
865
866
    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
867
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].
868
869
    // 3.1 Initialize new pipeline's operator list.
870
104k
    std::copy(operators.begin(), operators.begin() + idx,
871
104k
              std::inserter(new_pip->operators(), new_pip->operators().end()));
872
873
    // 3.2 Erase unused operators in previous pipeline.
874
104k
    operators.erase(operators.begin(), operators.begin() + idx);
875
876
    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
877
104k
    OperatorPtr source_op;
878
104k
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
879
104k
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
880
104k
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
881
104k
    if (!operators.empty()) {
882
43.9k
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
883
43.9k
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
884
43.9k
    }
885
104k
    operators.insert(operators.begin(), source_op);
886
887
    // 5. Set children for two pipelines separately.
888
104k
    std::vector<std::shared_ptr<Pipeline>> new_children;
889
104k
    std::vector<PipelineId> edges_with_source;
890
122k
    for (auto child : cur_pipe->children()) {
891
122k
        bool found = false;
892
137k
        for (auto op : new_pip->operators()) {
893
137k
            if (child->sink()->node_id() == op->node_id()) {
894
12.6k
                new_pip->set_children(child);
895
12.6k
                found = true;
896
12.6k
            };
897
137k
        }
898
122k
        if (!found) {
899
109k
            new_children.push_back(child);
900
109k
            edges_with_source.push_back(child->id());
901
109k
        }
902
122k
    }
903
104k
    new_children.push_back(new_pip);
904
104k
    edges_with_source.push_back(new_pip->id());
905
906
    // 6. Set DAG for new pipelines.
907
104k
    if (!new_pip->children().empty()) {
908
7.05k
        std::vector<PipelineId> edges_with_sink;
909
12.6k
        for (auto child : new_pip->children()) {
910
12.6k
            edges_with_sink.push_back(child->id());
911
12.6k
        }
912
7.05k
        _dag.insert({new_pip->id(), edges_with_sink});
913
7.05k
    }
914
104k
    cur_pipe->set_children(new_children);
915
104k
    _dag[downstream_pipeline_id] = edges_with_source;
916
104k
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
917
104k
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
918
104k
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));
919
920
    // 7. Inherit properties from current pipeline.
921
104k
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
922
104k
    return Status::OK();
923
104k
}
924
925
Status PipelineFragmentContext::_add_local_exchange(
926
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
927
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
928
        const std::map<int, int>& bucket_seq_to_instance_idx,
929
168k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
930
168k
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
931
36.4k
        return Status::OK();
932
36.4k
    }
933
934
132k
    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
935
48.0k
        return Status::OK();
936
48.0k
    }
937
84.3k
    *do_local_exchange = true;
938
939
84.3k
    auto& operators = cur_pipe->operators();
940
84.3k
    auto total_op_num = operators.size();
941
84.3k
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
942
84.3k
    RETURN_IF_ERROR(_add_local_exchange_impl(
943
84.3k
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
944
84.3k
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
945
946
18.4E
    CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
947
18.4E
            << "total_op_num: " << total_op_num
948
18.4E
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
949
18.4E
            << " new_pip->operators().size(): " << new_pip->operators().size();
950
951
    // There are some local shuffles with relatively heavy operations on the sink.
952
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
953
    // Therefore, local passthrough is used to increase the concurrency of the sink.
954
    // op -> local sink(1) -> local source (n)
955
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
956
84.5k
    if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
957
84.3k
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
958
20.2k
        RETURN_IF_ERROR(_add_local_exchange_impl(
959
20.2k
                cast_set<int>(new_pip->operators().size()), pool, new_pip,
960
20.2k
                add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
961
20.2k
                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
962
20.2k
                shuffle_idx_to_instance_idx));
963
20.2k
    }
964
84.3k
    return Status::OK();
965
84.3k
}
966
967
Status PipelineFragmentContext::_plan_local_exchange(
968
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
969
347k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
970
799k
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
971
451k
        _pipelines[pip_idx]->init_data_distribution(_runtime_state.get());
972
        // Set property if child pipeline is not join operator's child.
973
451k
        if (!_pipelines[pip_idx]->children().empty()) {
974
101k
            for (auto& child : _pipelines[pip_idx]->children()) {
975
101k
                if (child->sink()->node_id() ==
976
101k
                    _pipelines[pip_idx]->operators().front()->node_id()) {
977
89.3k
                    _pipelines[pip_idx]->set_data_distribution(child->data_distribution());
978
89.3k
                }
979
101k
            }
980
97.1k
        }
981
982
        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
983
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
984
        // still keep colocate plan after local shuffle
985
451k
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, _pipelines[pip_idx],
986
451k
                                             bucket_seq_to_instance_idx,
987
451k
                                             shuffle_idx_to_instance_idx));
988
451k
    }
989
347k
    return Status::OK();
990
347k
}
991
992
Status PipelineFragmentContext::_plan_local_exchange(
993
        int num_buckets, int pip_idx, PipelinePtr pip,
994
        const std::map<int, int>& bucket_seq_to_instance_idx,
995
450k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
996
450k
    int idx = 1;
997
450k
    bool do_local_exchange = false;
998
494k
    do {
999
494k
        auto& ops = pip->operators();
1000
494k
        do_local_exchange = false;
1001
        // Plan local exchange for each operator.
1002
566k
        for (; idx < ops.size();) {
1003
115k
            if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
1004
104k
                RETURN_IF_ERROR(_add_local_exchange(
1005
104k
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
1006
104k
                        ops[idx]->required_data_distribution(_runtime_state.get()),
1007
104k
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
1008
104k
                        shuffle_idx_to_instance_idx));
1009
104k
            }
1010
115k
            if (do_local_exchange) {
1011
                // If local exchange is needed for current operator, we will split this pipeline to
1012
                // two pipelines by local exchange sink/source. And then we need to process remaining
1013
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
1014
                // is current operator was already processed) and continue to plan local exchange.
1015
43.9k
                idx = 2;
1016
43.9k
                break;
1017
43.9k
            }
1018
71.4k
            idx++;
1019
71.4k
        }
1020
494k
    } while (do_local_exchange);
1021
450k
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
1022
63.9k
        RETURN_IF_ERROR(_add_local_exchange(
1023
63.9k
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
1024
63.9k
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
1025
63.9k
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1026
63.9k
    }
1027
450k
    return Status::OK();
1028
450k
}
1029
1030
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1031
                                                  const std::vector<TExpr>& output_exprs,
1032
                                                  const TPipelineFragmentParams& params,
1033
                                                  const RowDescriptor& row_desc,
1034
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1035
349k
                                                  PipelineId cur_pipeline_id) {
1036
349k
    switch (thrift_sink.type) {
1037
127k
    case TDataSinkType::DATA_STREAM_SINK: {
1038
127k
        if (!thrift_sink.__isset.stream_sink) {
1039
0
            return Status::InternalError("Missing data stream sink.");
1040
0
        }
1041
127k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1042
127k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1043
127k
                params.destinations, _fragment_instance_ids);
1044
127k
        break;
1045
127k
    }
1046
192k
    case TDataSinkType::RESULT_SINK: {
1047
192k
        if (!thrift_sink.__isset.result_sink) {
1048
0
            return Status::InternalError("Missing data buffer sink.");
1049
0
        }
1050
1051
192k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), row_desc,
1052
192k
                                                      output_exprs, thrift_sink.result_sink);
1053
192k
        break;
1054
192k
    }
1055
105
    case TDataSinkType::DICTIONARY_SINK: {
1056
105
        if (!thrift_sink.__isset.dictionary_sink) {
1057
0
            return Status::InternalError("Missing dict sink.");
1058
0
        }
1059
1060
105
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1061
105
                                                    thrift_sink.dictionary_sink);
1062
105
        break;
1063
105
    }
1064
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1065
28.5k
    case TDataSinkType::OLAP_TABLE_SINK: {
1066
28.5k
        if (state->query_options().enable_memtable_on_sink_node &&
1067
28.5k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1068
28.5k
            !config::is_cloud_mode()) {
1069
126
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(pool, next_sink_operator_id(),
1070
126
                                                               row_desc, output_exprs);
1071
28.4k
        } else {
1072
28.4k
            _sink = std::make_shared<OlapTableSinkOperatorX>(pool, next_sink_operator_id(),
1073
28.4k
                                                             row_desc, output_exprs);
1074
28.4k
        }
1075
28.5k
        break;
1076
0
    }
1077
165
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1078
165
        DCHECK(thrift_sink.__isset.olap_table_sink);
1079
165
        DCHECK(state->get_query_ctx() != nullptr);
1080
165
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1081
165
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1082
165
                                                                output_exprs);
1083
165
        break;
1084
0
    }
1085
0
    case TDataSinkType::HIVE_TABLE_SINK: {
1086
0
        if (!thrift_sink.__isset.hive_table_sink) {
1087
0
            return Status::InternalError("Missing hive table sink.");
1088
0
        }
1089
0
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1090
0
                                                         output_exprs);
1091
0
        break;
1092
0
    }
1093
0
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1094
0
        if (!thrift_sink.__isset.iceberg_table_sink) {
1095
0
            return Status::InternalError("Missing iceberg table sink.");
1096
0
        }
1097
0
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1098
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1099
0
                                                                     row_desc, output_exprs);
1100
0
        } else {
1101
0
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1102
0
                                                                row_desc, output_exprs);
1103
0
        }
1104
0
        break;
1105
0
    }
1106
0
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1107
0
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1108
0
            return Status::InternalError("Missing iceberg delete sink.");
1109
0
        }
1110
0
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1111
0
                                                             row_desc, output_exprs);
1112
0
        break;
1113
0
    }
1114
0
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1115
0
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1116
0
            return Status::InternalError("Missing iceberg merge sink.");
1117
0
        }
1118
0
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1119
0
                                                            output_exprs);
1120
0
        break;
1121
0
    }
1122
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1123
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1124
0
            return Status::InternalError("Missing max compute table sink.");
1125
0
        }
1126
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1127
0
                                                       output_exprs);
1128
0
        break;
1129
0
    }
1130
0
    case TDataSinkType::JDBC_TABLE_SINK: {
1131
0
        if (!thrift_sink.__isset.jdbc_table_sink) {
1132
0
            return Status::InternalError("Missing data jdbc sink.");
1133
0
        }
1134
0
        if (config::enable_java_support) {
1135
0
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1136
0
                                                             output_exprs);
1137
0
        } else {
1138
0
            return Status::InternalError(
1139
0
                    "Jdbc table sink is not enabled, you can change be config "
1140
0
                    "enable_java_support to true and restart be.");
1141
0
        }
1142
0
        break;
1143
0
    }
1144
3
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1145
3
        if (!thrift_sink.__isset.memory_scratch_sink) {
1146
0
            return Status::InternalError("Missing data buffer sink.");
1147
0
        }
1148
1149
3
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1150
3
                                                             output_exprs);
1151
3
        break;
1152
3
    }
1153
320
    case TDataSinkType::RESULT_FILE_SINK: {
1154
320
        if (!thrift_sink.__isset.result_file_sink) {
1155
0
            return Status::InternalError("Missing result file sink.");
1156
0
        }
1157
1158
        // Result file sink is not the top sink
1159
321
        if (params.__isset.destinations && !params.destinations.empty()) {
1160
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1161
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1162
0
                    params.destinations, output_exprs, desc_tbl);
1163
320
        } else {
1164
320
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1165
320
                                                              output_exprs);
1166
320
        }
1167
320
        break;
1168
320
    }
1169
995
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1170
995
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1171
995
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1172
995
        auto sink_id = next_sink_operator_id();
1173
995
        const int multi_cast_node_id = sink_id;
1174
995
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1175
        // one sink has multiple sources.
1176
995
        std::vector<int> sources;
1177
3.76k
        for (int i = 0; i < sender_size; ++i) {
1178
2.76k
            auto source_id = next_operator_id();
1179
2.76k
            sources.push_back(source_id);
1180
2.76k
        }
1181
1182
995
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1183
995
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1184
3.76k
        for (int i = 0; i < sender_size; ++i) {
1185
2.76k
            auto new_pipeline = add_pipeline();
1186
            // use to exchange sink
1187
2.76k
            RowDescriptor* exchange_row_desc = nullptr;
1188
2.76k
            {
1189
2.76k
                const auto& tmp_row_desc =
1190
2.76k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1191
2.76k
                                ? RowDescriptor(state->desc_tbl(),
1192
2.76k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1193
2.76k
                                                         .output_tuple_id})
1194
2.76k
                                : row_desc;
1195
2.76k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1196
2.76k
            }
1197
2.76k
            auto source_id = sources[i];
1198
2.76k
            OperatorPtr source_op;
1199
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1200
2.76k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1201
2.76k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1202
2.76k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1203
2.76k
                    /*operator_id=*/source_id);
1204
2.76k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1205
2.76k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1206
            // 2. create and set sink operator of data stream sender for new pipeline
1207
1208
2.76k
            DataSinkOperatorPtr sink_op;
1209
2.76k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1210
2.76k
                    state, *exchange_row_desc, next_sink_operator_id(),
1211
2.76k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1212
2.76k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1213
1214
2.76k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1215
2.76k
            {
1216
2.76k
                TDataSink* t = pool->add(new TDataSink());
1217
2.76k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1218
2.76k
                RETURN_IF_ERROR(sink_op->init(*t));
1219
2.76k
            }
1220
1221
            // 3. set dependency dag
1222
2.76k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1223
2.76k
        }
1224
995
        if (sources.empty()) {
1225
0
            return Status::InternalError("size of sources must be greater than 0");
1226
0
        }
1227
995
        break;
1228
995
    }
1229
995
    case TDataSinkType::BLACKHOLE_SINK: {
1230
3
        if (!thrift_sink.__isset.blackhole_sink) {
1231
0
            return Status::InternalError("Missing blackhole sink.");
1232
0
        }
1233
1234
3
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1235
3
        break;
1236
3
    }
1237
0
    case TDataSinkType::TVF_TABLE_SINK: {
1238
0
        if (!thrift_sink.__isset.tvf_table_sink) {
1239
0
            return Status::InternalError("Missing TVF table sink.");
1240
0
        }
1241
0
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1242
0
                                                        output_exprs);
1243
0
        break;
1244
0
    }
1245
0
    default:
1246
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1247
349k
    }
1248
348k
    return Status::OK();
1249
349k
}
1250
1251
// NOLINTBEGIN(readability-function-size)
1252
// NOLINTBEGIN(readability-function-cognitive-complexity)
1253
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1254
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1255
                                                 PipelinePtr& cur_pipe, int parent_idx,
1256
                                                 int child_idx,
1257
                                                 const bool followed_by_shuffled_operator,
1258
                                                 const bool require_bucket_distribution,
1259
566k
                                                 OperatorPtr& cache_op) {
1260
566k
    std::vector<DataSinkOperatorPtr> sink_ops;
1261
566k
    Defer defer = Defer([&]() {
1262
565k
        if (op) {
1263
565k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1264
565k
        }
1265
564k
        for (auto& s : sink_ops) {
1266
100k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1267
100k
        }
1268
564k
    });
1269
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1270
    // Therefore, here we need to use a stack-like structure.
1271
566k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1272
566k
    std::stringstream error_msg;
1273
566k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1274
1275
566k
    bool fe_with_old_version = false;
1276
566k
    switch (tnode.node_type) {
1277
173k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1278
173k
        op = std::make_shared<OlapScanOperatorX>(
1279
173k
                pool, tnode, next_operator_id(), descs, _num_instances,
1280
173k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1281
173k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1282
173k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1283
173k
        break;
1284
173k
    }
1285
77
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1286
77
        DCHECK(_query_ctx != nullptr);
1287
77
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1288
77
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1289
77
                                                    _num_instances);
1290
77
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1291
77
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1292
77
        break;
1293
77
    }
1294
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1295
0
        if (config::enable_java_support) {
1296
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1297
0
                                                     _num_instances);
1298
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1299
0
        } else {
1300
0
            return Status::InternalError(
1301
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1302
0
                    "to true and restart be.");
1303
0
        }
1304
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1305
0
        break;
1306
0
    }
1307
2.66k
    case TPlanNodeType::FILE_SCAN_NODE: {
1308
2.66k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1309
2.66k
                                                 _num_instances);
1310
2.66k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1311
2.66k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1312
2.66k
        break;
1313
2.66k
    }
1314
0
    case TPlanNodeType::ES_SCAN_NODE:
1315
0
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
1316
0
        op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs,
1317
0
                                               _num_instances);
1318
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1319
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1320
0
        break;
1321
0
    }
1322
127k
    case TPlanNodeType::EXCHANGE_NODE: {
1323
127k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1324
127k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1325
18.4E
                                  : 0;
1326
127k
        DCHECK_GT(num_senders, 0);
1327
127k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1328
127k
                                                       num_senders);
1329
127k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1330
127k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1331
127k
        break;
1332
127k
    }
1333
152k
    case TPlanNodeType::AGGREGATION_NODE: {
1334
152k
        if (tnode.agg_node.grouping_exprs.empty() &&
1335
152k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1336
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1337
0
                                         ": group by and output is empty");
1338
0
        }
1339
152k
        bool need_create_cache_op =
1340
152k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1341
152k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1342
24
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1343
24
            auto cache_source_id = next_operator_id();
1344
24
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1345
24
                                                        _params.fragment.query_cache_param);
1346
24
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1347
1348
24
            const auto downstream_pipeline_id = cur_pipe->id();
1349
24
            if (!_dag.contains(downstream_pipeline_id)) {
1350
24
                _dag.insert({downstream_pipeline_id, {}});
1351
24
            }
1352
24
            new_pipe = add_pipeline(cur_pipe);
1353
24
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1354
1355
24
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1356
24
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1357
24
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1358
24
            return Status::OK();
1359
24
        };
1360
152k
        const bool group_by_limit_opt =
1361
152k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1362
1363
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1364
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1365
152k
        const bool enable_spill = _runtime_state->enable_spill() &&
1366
152k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1367
152k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1368
152k
                                      tnode.agg_node.use_streaming_preaggregation &&
1369
152k
                                      !tnode.agg_node.grouping_exprs.empty();
1370
        // TODO: distinct streaming agg does not support spill.
1371
152k
        const bool can_use_distinct_streaming_agg =
1372
152k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1373
152k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1374
152k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1375
152k
                _params.query_options.enable_distinct_streaming_aggregation;
1376
1377
152k
        if (can_use_distinct_streaming_agg) {
1378
98.7k
            if (need_create_cache_op) {
1379
8
                PipelinePtr new_pipe;
1380
8
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1381
1382
8
                cache_op = op;
1383
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1384
8
                                                                     tnode, descs);
1385
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1386
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1387
8
                cur_pipe = new_pipe;
1388
98.7k
            } else {
1389
98.7k
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1390
98.7k
                                                                     tnode, descs);
1391
98.7k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1392
98.7k
            }
1393
98.7k
        } else if (is_streaming_agg) {
1394
1.67k
            if (need_create_cache_op) {
1395
0
                PipelinePtr new_pipe;
1396
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1397
0
                cache_op = op;
1398
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1399
0
                                                             descs);
1400
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1401
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1402
0
                cur_pipe = new_pipe;
1403
1.67k
            } else {
1404
1.67k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1405
1.67k
                                                             descs);
1406
1.67k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1407
1.67k
            }
1408
52.0k
        } else {
1409
            // create new pipeline to add query cache operator
1410
52.0k
            PipelinePtr new_pipe;
1411
52.0k
            if (need_create_cache_op) {
1412
16
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1413
16
                cache_op = op;
1414
16
            }
1415
1416
52.0k
            if (enable_spill) {
1417
601
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1418
601
                                                                     next_operator_id(), descs);
1419
51.4k
            } else {
1420
51.4k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1421
51.4k
            }
1422
52.0k
            if (need_create_cache_op) {
1423
16
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1424
16
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1425
16
                cur_pipe = new_pipe;
1426
52.0k
            } else {
1427
52.0k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1428
52.0k
            }
1429
1430
52.0k
            const auto downstream_pipeline_id = cur_pipe->id();
1431
52.0k
            if (!_dag.contains(downstream_pipeline_id)) {
1432
50.6k
                _dag.insert({downstream_pipeline_id, {}});
1433
50.6k
            }
1434
52.0k
            cur_pipe = add_pipeline(cur_pipe);
1435
52.0k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1436
1437
52.0k
            if (enable_spill) {
1438
601
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1439
601
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1440
51.4k
            } else {
1441
51.4k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1442
51.4k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1443
51.4k
            }
1444
52.0k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1445
52.0k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1446
52.0k
        }
1447
152k
        break;
1448
152k
    }
1449
152k
    case TPlanNodeType::HASH_JOIN_NODE: {
1450
8.98k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1451
8.98k
                                       tnode.hash_join_node.is_broadcast_join;
1452
8.98k
        const auto enable_spill = _runtime_state->enable_spill();
1453
8.98k
        if (enable_spill && !is_broadcast_join) {
1454
0
            auto tnode_ = tnode;
1455
0
            tnode_.runtime_filters.clear();
1456
0
            auto inner_probe_operator =
1457
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1458
1459
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1460
            // So here use `tnode_` which has no runtime filters.
1461
0
            auto probe_side_inner_sink_operator =
1462
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1463
1464
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1465
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1466
1467
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1468
0
                    pool, tnode_, next_operator_id(), descs);
1469
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1470
0
                                                inner_probe_operator);
1471
0
            op = std::move(probe_operator);
1472
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1473
1474
0
            const auto downstream_pipeline_id = cur_pipe->id();
1475
0
            if (!_dag.contains(downstream_pipeline_id)) {
1476
0
                _dag.insert({downstream_pipeline_id, {}});
1477
0
            }
1478
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1479
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1480
1481
0
            auto inner_sink_operator =
1482
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1483
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1484
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1485
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1486
1487
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1488
0
            sink_ops.push_back(std::move(sink_operator));
1489
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1490
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1491
1492
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1493
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1494
8.98k
        } else {
1495
8.98k
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1496
8.98k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1497
1498
8.98k
            const auto downstream_pipeline_id = cur_pipe->id();
1499
8.98k
            if (!_dag.contains(downstream_pipeline_id)) {
1500
7.28k
                _dag.insert({downstream_pipeline_id, {}});
1501
7.28k
            }
1502
8.98k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1503
8.98k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1504
1505
8.98k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1506
8.98k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1507
8.98k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1508
8.98k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1509
1510
8.98k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1511
8.98k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1512
8.98k
        }
1513
8.98k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1514
2.33k
            std::shared_ptr<HashJoinSharedState> shared_state =
1515
2.33k
                    HashJoinSharedState::create_shared(_num_instances);
1516
14.8k
            for (int i = 0; i < _num_instances; i++) {
1517
12.4k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1518
12.4k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1519
12.4k
                sink_dep->set_shared_state(shared_state.get());
1520
12.4k
                shared_state->sink_deps.push_back(sink_dep);
1521
12.4k
            }
1522
2.33k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1523
2.33k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1524
2.33k
            _op_id_to_shared_state.insert(
1525
2.33k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1526
2.33k
        }
1527
8.98k
        break;
1528
8.98k
    }
1529
2.85k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1530
2.85k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1531
2.85k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1532
1533
2.85k
        const auto downstream_pipeline_id = cur_pipe->id();
1534
2.85k
        if (!_dag.contains(downstream_pipeline_id)) {
1535
2.60k
            _dag.insert({downstream_pipeline_id, {}});
1536
2.60k
        }
1537
2.85k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1538
2.85k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1539
1540
2.85k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1541
2.85k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1542
2.85k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1543
2.85k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1544
2.85k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1545
2.85k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1546
2.85k
        break;
1547
2.85k
    }
1548
49.3k
    case TPlanNodeType::UNION_NODE: {
1549
49.3k
        int child_count = tnode.num_children;
1550
49.3k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1551
49.3k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1552
1553
49.3k
        const auto downstream_pipeline_id = cur_pipe->id();
1554
49.3k
        if (!_dag.contains(downstream_pipeline_id)) {
1555
48.8k
            _dag.insert({downstream_pipeline_id, {}});
1556
48.8k
        }
1557
50.4k
        for (int i = 0; i < child_count; i++) {
1558
1.15k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1559
1.15k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1560
1.15k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1561
1.15k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1562
1.15k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1563
1.15k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1564
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1565
1.15k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1566
1.15k
        }
1567
49.3k
        break;
1568
49.3k
    }
1569
49.3k
    case TPlanNodeType::SORT_NODE: {
1570
33.8k
        const auto should_spill = _runtime_state->enable_spill() &&
1571
33.8k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1572
33.8k
        const bool use_local_merge =
1573
33.8k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1574
33.8k
        if (should_spill) {
1575
9
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1576
33.8k
        } else if (use_local_merge) {
1577
29.5k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1578
29.5k
                                                                 descs);
1579
29.5k
        } else {
1580
4.30k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1581
4.30k
        }
1582
33.8k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1583
1584
33.8k
        const auto downstream_pipeline_id = cur_pipe->id();
1585
33.8k
        if (!_dag.contains(downstream_pipeline_id)) {
1586
33.8k
            _dag.insert({downstream_pipeline_id, {}});
1587
33.8k
        }
1588
33.8k
        cur_pipe = add_pipeline(cur_pipe);
1589
33.8k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1590
1591
33.8k
        if (should_spill) {
1592
9
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1593
9
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1594
33.8k
        } else {
1595
33.8k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1596
33.8k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1597
33.8k
        }
1598
33.8k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1599
33.8k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1600
33.8k
        break;
1601
33.8k
    }
1602
33.8k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1603
64
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1604
64
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1605
1606
64
        const auto downstream_pipeline_id = cur_pipe->id();
1607
64
        if (!_dag.contains(downstream_pipeline_id)) {
1608
64
            _dag.insert({downstream_pipeline_id, {}});
1609
64
        }
1610
64
        cur_pipe = add_pipeline(cur_pipe);
1611
64
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1612
1613
64
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1614
64
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1615
64
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1616
64
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1617
64
        break;
1618
64
    }
1619
1.76k
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1620
1.76k
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1621
1.76k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1622
1623
1.76k
        const auto downstream_pipeline_id = cur_pipe->id();
1624
1.76k
        if (!_dag.contains(downstream_pipeline_id)) {
1625
1.75k
            _dag.insert({downstream_pipeline_id, {}});
1626
1.75k
        }
1627
1.76k
        cur_pipe = add_pipeline(cur_pipe);
1628
1.76k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1629
1630
1.76k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1631
1.76k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1632
1.76k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1633
1.76k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1634
1.76k
        break;
1635
1.76k
    }
1636
1.76k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1637
681
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1638
681
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1639
681
        break;
1640
681
    }
1641
681
    case TPlanNodeType::INTERSECT_NODE: {
1642
119
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1643
119
                                                                      cur_pipe, sink_ops));
1644
119
        break;
1645
119
    }
1646
129
    case TPlanNodeType::EXCEPT_NODE: {
1647
129
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1648
129
                                                                       cur_pipe, sink_ops));
1649
129
        break;
1650
129
    }
1651
311
    case TPlanNodeType::REPEAT_NODE: {
1652
311
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1653
311
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1654
311
        break;
1655
311
    }
1656
867
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1657
867
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1658
867
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1659
867
        break;
1660
867
    }
1661
867
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1662
18
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1663
18
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1664
18
        break;
1665
18
    }
1666
1.55k
    case TPlanNodeType::EMPTY_SET_NODE: {
1667
1.55k
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1668
1.55k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1669
1.55k
        break;
1670
1.55k
    }
1671
1.55k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1672
265
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1673
265
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1674
265
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1675
265
        break;
1676
265
    }
1677
1.54k
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1678
1.54k
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1679
1.54k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1680
1.54k
        break;
1681
1.54k
    }
1682
5.03k
    case TPlanNodeType::META_SCAN_NODE: {
1683
5.03k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1684
5.03k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1685
5.03k
        break;
1686
5.03k
    }
1687
5.03k
    case TPlanNodeType::SELECT_NODE: {
1688
954
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1689
954
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1690
954
        break;
1691
954
    }
1692
954
    case TPlanNodeType::REC_CTE_NODE: {
1693
150
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1694
150
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1695
1696
150
        const auto downstream_pipeline_id = cur_pipe->id();
1697
150
        if (!_dag.contains(downstream_pipeline_id)) {
1698
147
            _dag.insert({downstream_pipeline_id, {}});
1699
147
        }
1700
1701
150
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1702
150
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1703
1704
150
        DataSinkOperatorPtr anchor_sink;
1705
150
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1706
150
                                                                  op->operator_id(), tnode, descs);
1707
150
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1708
150
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1709
150
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1710
1711
150
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1712
150
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1713
1714
150
        DataSinkOperatorPtr rec_sink;
1715
150
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1716
150
                                                         tnode, descs);
1717
150
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1718
150
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1719
150
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1720
1721
150
        break;
1722
150
    }
1723
1.85k
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1724
1.85k
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1725
1.85k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1726
1.85k
        break;
1727
1.85k
    }
1728
1.85k
    default:
1729
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1730
0
                                     print_plan_node_type(tnode.node_type));
1731
566k
    }
1732
564k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1733
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
1734
0
        op->set_serial_operator();
1735
0
    }
1736
1737
564k
    return Status::OK();
1738
566k
}
1739
// NOLINTEND(readability-function-cognitive-complexity)
1740
// NOLINTEND(readability-function-size)
1741
1742
template <bool is_intersect>
1743
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
1744
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
1745
248
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1746
248
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1747
248
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1748
1749
248
    const auto downstream_pipeline_id = cur_pipe->id();
1750
248
    if (!_dag.contains(downstream_pipeline_id)) {
1751
231
        _dag.insert({downstream_pipeline_id, {}});
1752
231
    }
1753
1754
837
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1755
589
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1756
589
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1757
1758
589
        if (child_id == 0) {
1759
248
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1760
248
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1761
341
        } else {
1762
341
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1763
341
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1764
341
        }
1765
589
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1766
589
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1767
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1768
589
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1769
589
    }
1770
1771
248
    return Status::OK();
1772
248
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1745
119
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1746
119
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1747
119
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1748
1749
119
    const auto downstream_pipeline_id = cur_pipe->id();
1750
119
    if (!_dag.contains(downstream_pipeline_id)) {
1751
110
        _dag.insert({downstream_pipeline_id, {}});
1752
110
    }
1753
1754
435
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1755
316
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1756
316
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1757
1758
316
        if (child_id == 0) {
1759
119
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1760
119
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1761
197
        } else {
1762
197
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1763
197
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1764
197
        }
1765
316
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1766
316
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1767
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1768
316
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1769
316
    }
1770
1771
119
    return Status::OK();
1772
119
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1745
129
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1746
129
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1747
129
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1748
1749
129
    const auto downstream_pipeline_id = cur_pipe->id();
1750
129
    if (!_dag.contains(downstream_pipeline_id)) {
1751
121
        _dag.insert({downstream_pipeline_id, {}});
1752
121
    }
1753
1754
402
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1755
273
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1756
273
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1757
1758
273
        if (child_id == 0) {
1759
129
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1760
129
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1761
144
        } else {
1762
144
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1763
144
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1764
144
        }
1765
273
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1766
273
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1767
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1768
273
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1769
273
    }
1770
1771
129
    return Status::OK();
1772
129
}
1773
1774
348k
Status PipelineFragmentContext::submit() {
1775
348k
    if (_submitted) {
1776
0
        return Status::InternalError("submitted");
1777
0
    }
1778
348k
    _submitted = true;
1779
1780
348k
    int submit_tasks = 0;
1781
348k
    Status st;
1782
348k
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
1783
1.02M
    for (auto& task : _tasks) {
1784
1.81M
        for (auto& t : task) {
1785
1.81M
            st = scheduler->submit(t.first);
1786
1.81M
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
1787
1.81M
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
1788
1.81M
            if (!st) {
1789
0
                cancel(Status::InternalError("submit context to executor fail"));
1790
0
                std::lock_guard<std::mutex> l(_task_mutex);
1791
0
                _total_tasks = submit_tasks;
1792
0
                break;
1793
0
            }
1794
1.81M
            submit_tasks++;
1795
1.81M
        }
1796
1.02M
    }
1797
348k
    if (!st.ok()) {
1798
0
        std::lock_guard<std::mutex> l(_task_mutex);
1799
0
        if (_closed_tasks >= _total_tasks) {
1800
0
            _close_fragment_instance();
1801
0
        }
1802
0
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
1803
0
                                     BackendOptions::get_localhost());
1804
348k
    } else {
1805
348k
        return st;
1806
348k
    }
1807
348k
}
1808
1809
0
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1810
0
    if (_runtime_state->enable_profile()) {
1811
0
        std::stringstream ss;
1812
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1813
0
            runtime_profile_ptr->pretty_print(&ss);
1814
0
        }
1815
1816
0
        if (_runtime_state->load_channel_profile()) {
1817
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1818
0
        }
1819
1820
0
        auto profile_str =
1821
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1822
0
                            this->_fragment_id, extra_info, ss.str());
1823
0
        LOG_LONG_STRING(INFO, profile_str);
1824
0
    }
1825
0
}
1826
// If all pipeline tasks binded to the fragment instance are finished, then we could
1827
// close the fragment instance.
1828
349k
void PipelineFragmentContext::_close_fragment_instance() {
1829
349k
    if (_is_fragment_instance_closed) {
1830
0
        return;
1831
0
    }
1832
349k
    Defer defer_op {[&]() {
1833
349k
        _is_fragment_instance_closed = true;
1834
349k
        _notify_cv.notify_all();
1835
349k
    }};
1836
349k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
1837
349k
    if (!_need_notify_close) {
1838
346k
        auto st = send_report(true);
1839
346k
        if (!st) {
1840
310k
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
1841
310k
                                        print_id(_query_id), _fragment_id, st.to_string());
1842
310k
        }
1843
346k
    }
1844
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
1845
    // Since stream load does not have someting like coordinator on FE, so
1846
    // backend can not report profile to FE, ant its profile can not be shown
1847
    // in the same way with other query. So we print the profile content to info log.
1848
1849
349k
    if (_runtime_state->enable_profile() &&
1850
349k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
1851
2.59k
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
1852
2.59k
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
1853
0
        std::stringstream ss;
1854
        // Compute the _local_time_percent before pretty_print the runtime_profile
1855
        // Before add this operation, the print out like that:
1856
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
1857
        // After add the operation, the print out like that:
1858
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
1859
        // We can easily know the exec node execute time without child time consumed.
1860
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1861
0
            runtime_profile_ptr->pretty_print(&ss);
1862
0
        }
1863
1864
0
        if (_runtime_state->load_channel_profile()) {
1865
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1866
0
        }
1867
1868
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
1869
0
    }
1870
1871
349k
    if (_query_ctx->enable_profile()) {
1872
2.59k
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
1873
2.59k
                                         collect_realtime_load_channel_profile());
1874
2.59k
    }
1875
1876
349k
    if (!_need_notify_close) {
1877
        // all submitted tasks done
1878
346k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
1879
346k
    }
1880
349k
}
1881
1882
1.81M
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
1883
    // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here
1884
1.81M
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
1885
1.81M
    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
1886
558k
        if (_dag.contains(pipeline_id)) {
1887
313k
            for (auto dep : _dag[pipeline_id]) {
1888
313k
                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
1889
313k
            }
1890
248k
        }
1891
558k
    }
1892
1.81M
    std::lock_guard<std::mutex> l(_task_mutex);
1893
1.81M
    ++_closed_tasks;
1894
1.81M
    if (_closed_tasks >= _total_tasks) {
1895
349k
        _close_fragment_instance();
1896
349k
    }
1897
1.81M
}
1898
1899
45.4k
std::string PipelineFragmentContext::get_load_error_url() {
1900
45.4k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1901
0
        return to_load_error_http_path(str);
1902
0
    }
1903
144k
    for (auto& tasks : _tasks) {
1904
253k
        for (auto& task : tasks) {
1905
253k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
1906
148
                return to_load_error_http_path(str);
1907
148
            }
1908
253k
        }
1909
144k
    }
1910
45.3k
    return "";
1911
45.4k
}
1912
1913
45.4k
std::string PipelineFragmentContext::get_first_error_msg() {
1914
45.4k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
1915
0
        return str;
1916
0
    }
1917
144k
    for (auto& tasks : _tasks) {
1918
253k
        for (auto& task : tasks) {
1919
253k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
1920
148
                return str;
1921
148
            }
1922
253k
        }
1923
144k
    }
1924
45.3k
    return "";
1925
45.4k
}
1926
1927
350k
Status PipelineFragmentContext::send_report(bool done) {
1928
350k
    Status exec_status = _query_ctx->exec_status();
1929
    // If plan is done successfully, but _is_report_success is false,
1930
    // no need to send report.
1931
    // Load will set _is_report_success to true because load wants to know
1932
    // the process.
1933
350k
    if (!_is_report_success && done && exec_status.ok()) {
1934
310k
        return Status::NeedSendAgain("");
1935
310k
    }
1936
1937
    // If both _is_report_success and _is_report_on_cancel are false,
1938
    // which means no matter query is success or failed, no report is needed.
1939
    // This may happen when the query limit reached and
1940
    // a internal cancellation being processed
1941
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
1942
    // be set to false, to avoid sending fault report to FE.
1943
40.2k
    if (!_is_report_success && !_is_report_on_cancel) {
1944
34
        return Status::NeedSendAgain("");
1945
34
    }
1946
1947
40.1k
    std::vector<RuntimeState*> runtime_states;
1948
1949
120k
    for (auto& tasks : _tasks) {
1950
196k
        for (auto& task : tasks) {
1951
196k
            runtime_states.push_back(task.second.get());
1952
196k
        }
1953
120k
    }
1954
1955
40.1k
    std::string load_eror_url = _query_ctx->get_load_error_url().empty()
1956
40.2k
                                        ? get_load_error_url()
1957
18.4E
                                        : _query_ctx->get_load_error_url();
1958
40.1k
    std::string first_error_msg = _query_ctx->get_first_error_msg().empty()
1959
40.2k
                                          ? get_first_error_msg()
1960
18.4E
                                          : _query_ctx->get_first_error_msg();
1961
1962
40.1k
    ReportStatusRequest req {.status = exec_status,
1963
40.1k
                             .runtime_states = runtime_states,
1964
40.1k
                             .done = done || !exec_status.ok(),
1965
40.1k
                             .coord_addr = _query_ctx->coord_addr,
1966
40.1k
                             .query_id = _query_id,
1967
40.1k
                             .fragment_id = _fragment_id,
1968
40.1k
                             .fragment_instance_id = TUniqueId(),
1969
40.1k
                             .backend_num = -1,
1970
40.1k
                             .runtime_state = _runtime_state.get(),
1971
40.1k
                             .load_error_url = load_eror_url,
1972
40.1k
                             .first_error_msg = first_error_msg,
1973
40.1k
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};
1974
1975
40.1k
    return _report_status_cb(
1976
40.1k
            req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
1977
40.2k
}
1978
1979
99
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
1980
99
    size_t res = 0;
1981
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
1982
    // here to traverse the vector.
1983
99
    for (const auto& task_instances : _tasks) {
1984
273
        for (const auto& task : task_instances) {
1985
273
            if (task.first->is_running()) {
1986
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
1987
0
                                      << " is running, task: " << (void*)task.first.get()
1988
0
                                      << ", is_running: " << task.first->is_running();
1989
0
                *has_running_task = true;
1990
0
                return 0;
1991
0
            }
1992
1993
273
            size_t revocable_size = task.first->get_revocable_size();
1994
273
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
1995
23
                res += revocable_size;
1996
23
            }
1997
273
        }
1998
99
    }
1999
99
    return res;
2000
99
}
2001
2002
198
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2003
198
    std::vector<PipelineTask*> revocable_tasks;
2004
198
    for (const auto& task_instances : _tasks) {
2005
546
        for (const auto& task : task_instances) {
2006
546
            size_t revocable_size_ = task.first->get_revocable_size();
2007
2008
546
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2009
46
                revocable_tasks.emplace_back(task.first.get());
2010
46
            }
2011
546
        }
2012
198
    }
2013
198
    return revocable_tasks;
2014
198
}
2015
2016
488
std::string PipelineFragmentContext::debug_string() {
2017
488
    std::lock_guard<std::mutex> l(_task_mutex);
2018
488
    fmt::memory_buffer debug_string_buffer;
2019
488
    fmt::format_to(debug_string_buffer,
2020
488
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2021
488
                   "need_notify_close={}, has_task_execution_ctx_ref_count={}\n",
2022
488
                   _closed_tasks, _total_tasks, _need_notify_close,
2023
488
                   _has_task_execution_ctx_ref_count);
2024
1.55k
    for (size_t j = 0; j < _tasks.size(); j++) {
2025
1.06k
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2026
3.06k
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2027
2.00k
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2028
2.00k
                           _tasks[j][i].first->debug_string());
2029
2.00k
        }
2030
1.06k
    }
2031
2032
488
    return fmt::to_string(debug_string_buffer);
2033
488
}
2034
2035
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2036
2.59k
PipelineFragmentContext::collect_realtime_profile() const {
2037
2.59k
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2038
2039
    // we do not have mutex to protect pipeline_id_to_profile
2040
    // so we need to make sure this funciton is invoked after fragment context
2041
    // has already been prepared.
2042
2.59k
    if (!_prepared) {
2043
0
        std::string msg =
2044
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2045
0
        DCHECK(false) << msg;
2046
0
        LOG_ERROR(msg);
2047
0
        return res;
2048
0
    }
2049
2050
    // Make sure first profile is fragment level profile
2051
2.59k
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2052
2.59k
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2053
2.59k
    res.push_back(fragment_profile);
2054
2055
    // pipeline_id_to_profile is initialized in prepare stage
2056
4.91k
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2057
4.91k
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2058
4.91k
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2059
4.91k
        res.push_back(profile_ptr);
2060
4.91k
    }
2061
2062
2.59k
    return res;
2063
2.59k
}
2064
2065
std::shared_ptr<TRuntimeProfileTree>
2066
2.59k
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2067
    // we do not have mutex to protect pipeline_id_to_profile
2068
    // so we need to make sure this funciton is invoked after fragment context
2069
    // has already been prepared.
2070
2.59k
    if (!_prepared) {
2071
0
        std::string msg =
2072
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2073
0
        DCHECK(false) << msg;
2074
0
        LOG_ERROR(msg);
2075
0
        return nullptr;
2076
0
    }
2077
2078
7.20k
    for (const auto& tasks : _tasks) {
2079
14.6k
        for (const auto& task : tasks) {
2080
14.6k
            if (task.second->load_channel_profile() == nullptr) {
2081
0
                continue;
2082
0
            }
2083
2084
14.6k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2085
2086
14.6k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2087
14.6k
                                                           _runtime_state->profile_level());
2088
14.6k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2089
14.6k
        }
2090
7.20k
    }
2091
2092
2.59k
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2093
2.59k
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2094
2.59k
                                                      _runtime_state->profile_level());
2095
2.59k
    return load_channel_profile;
2096
2.59k
}
2097
2098
3.17k
Status PipelineFragmentContext::wait_close(bool close) {
2099
3.17k
    if (_exec_env->new_load_stream_mgr()->get(_query_id) != nullptr) {
2100
0
        return Status::InternalError("stream load do not support reset");
2101
0
    }
2102
3.17k
    if (!_need_notify_close) {
2103
0
        return Status::InternalError("_need_notify_close is false, do not support reset");
2104
0
    }
2105
2106
3.17k
    {
2107
3.17k
        std::unique_lock<std::mutex> lock(_task_mutex);
2108
3.25k
        while (!(_is_fragment_instance_closed.load() && !_has_task_execution_ctx_ref_count)) {
2109
78
            if (_query_ctx->is_cancelled()) {
2110
0
                return Status::Cancelled("Query has been cancelled");
2111
0
            }
2112
78
            _notify_cv.wait_for(lock, std::chrono::seconds(1));
2113
78
        }
2114
3.17k
    }
2115
2116
3.17k
    if (close) {
2117
190
        auto st = send_report(true);
2118
190
        if (!st) {
2119
155
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
2120
155
                                        print_id(_query_id), _fragment_id, st.to_string());
2121
155
        }
2122
190
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
2123
190
    }
2124
3.17k
    return Status::OK();
2125
3.17k
}
2126
2127
2.98k
Status PipelineFragmentContext::set_to_rerun() {
2128
2.98k
    {
2129
2.98k
        std::lock_guard<std::mutex> l(_task_mutex);
2130
2.98k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2131
5.22k
        for (auto& tasks : _tasks) {
2132
10.2k
            for (const auto& task : tasks) {
2133
10.2k
                task.first->runtime_state()->reset_to_rerun();
2134
10.2k
            }
2135
5.22k
        }
2136
2.98k
    }
2137
2.98k
    _release_resource();
2138
2.98k
    _runtime_state->reset_to_rerun();
2139
2.98k
    return Status::OK();
2140
2.98k
}
2141
2142
2.98k
Status PipelineFragmentContext::rebuild(ThreadPool* thread_pool) {
2143
2.98k
    _submitted = false;
2144
2.98k
    _is_fragment_instance_closed = false;
2145
2.98k
    return _build_and_prepare_full_pipeline(thread_pool);
2146
2.98k
}
2147
2148
350k
void PipelineFragmentContext::_release_resource() {
2149
350k
    std::lock_guard<std::mutex> l(_task_mutex);
2150
    // The memory released by the query end is recorded in the query mem tracker.
2151
350k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2152
350k
    auto st = _query_ctx->exec_status();
2153
1.02M
    for (auto& _task : _tasks) {
2154
1.02M
        if (!_task.empty()) {
2155
1.02M
            _call_back(_task.front().first->runtime_state(), &st);
2156
1.02M
        }
2157
1.02M
    }
2158
350k
    _tasks.clear();
2159
350k
    _dag.clear();
2160
350k
    _pip_id_to_pipeline.clear();
2161
350k
    _pipelines.clear();
2162
350k
    _sink.reset();
2163
350k
    _root_op.reset();
2164
350k
    _runtime_filter_mgr_map.clear();
2165
350k
    _op_id_to_shared_state.clear();
2166
350k
}
2167
2168
#include "common/compile_check_end.h"
2169
} // namespace doris