Coverage Report

Created: 2026-03-13 03:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/PaloInternalService_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <pthread.h>
24
25
#include <algorithm>
26
#include <cstdlib>
27
// IWYU pragma: no_include <bits/chrono.h>
28
#include <fmt/format.h>
29
30
#include <chrono> // IWYU pragma: keep
31
#include <map>
32
#include <memory>
33
#include <ostream>
34
#include <utility>
35
36
#include "cloud/config.h"
37
#include "common/cast_set.h"
38
#include "common/config.h"
39
#include "common/exception.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "exec/exchange/local_exchange_sink_operator.h"
43
#include "exec/exchange/local_exchange_source_operator.h"
44
#include "exec/exchange/local_exchanger.h"
45
#include "exec/exchange/vdata_stream_mgr.h"
46
#include "exec/operator/aggregation_sink_operator.h"
47
#include "exec/operator/aggregation_source_operator.h"
48
#include "exec/operator/analytic_sink_operator.h"
49
#include "exec/operator/analytic_source_operator.h"
50
#include "exec/operator/assert_num_rows_operator.h"
51
#include "exec/operator/blackhole_sink_operator.h"
52
#include "exec/operator/cache_sink_operator.h"
53
#include "exec/operator/cache_source_operator.h"
54
#include "exec/operator/datagen_operator.h"
55
#include "exec/operator/dict_sink_operator.h"
56
#include "exec/operator/distinct_streaming_aggregation_operator.h"
57
#include "exec/operator/empty_set_operator.h"
58
#include "exec/operator/es_scan_operator.h"
59
#include "exec/operator/exchange_sink_operator.h"
60
#include "exec/operator/exchange_source_operator.h"
61
#include "exec/operator/file_scan_operator.h"
62
#include "exec/operator/group_commit_block_sink_operator.h"
63
#include "exec/operator/group_commit_scan_operator.h"
64
#include "exec/operator/hashjoin_build_sink.h"
65
#include "exec/operator/hashjoin_probe_operator.h"
66
#include "exec/operator/hive_table_sink_operator.h"
67
#include "exec/operator/iceberg_table_sink_operator.h"
68
#include "exec/operator/jdbc_scan_operator.h"
69
#include "exec/operator/jdbc_table_sink_operator.h"
70
#include "exec/operator/local_merge_sort_source_operator.h"
71
#include "exec/operator/materialization_opertor.h"
72
#include "exec/operator/maxcompute_table_sink_operator.h"
73
#include "exec/operator/memory_scratch_sink_operator.h"
74
#include "exec/operator/meta_scan_operator.h"
75
#include "exec/operator/multi_cast_data_stream_sink.h"
76
#include "exec/operator/multi_cast_data_stream_source.h"
77
#include "exec/operator/nested_loop_join_build_operator.h"
78
#include "exec/operator/nested_loop_join_probe_operator.h"
79
#include "exec/operator/olap_scan_operator.h"
80
#include "exec/operator/olap_table_sink_operator.h"
81
#include "exec/operator/olap_table_sink_v2_operator.h"
82
#include "exec/operator/partition_sort_sink_operator.h"
83
#include "exec/operator/partition_sort_source_operator.h"
84
#include "exec/operator/partitioned_aggregation_sink_operator.h"
85
#include "exec/operator/partitioned_aggregation_source_operator.h"
86
#include "exec/operator/partitioned_hash_join_probe_operator.h"
87
#include "exec/operator/partitioned_hash_join_sink_operator.h"
88
#include "exec/operator/rec_cte_anchor_sink_operator.h"
89
#include "exec/operator/rec_cte_scan_operator.h"
90
#include "exec/operator/rec_cte_sink_operator.h"
91
#include "exec/operator/rec_cte_source_operator.h"
92
#include "exec/operator/repeat_operator.h"
93
#include "exec/operator/result_file_sink_operator.h"
94
#include "exec/operator/result_sink_operator.h"
95
#include "exec/operator/schema_scan_operator.h"
96
#include "exec/operator/select_operator.h"
97
#include "exec/operator/set_probe_sink_operator.h"
98
#include "exec/operator/set_sink_operator.h"
99
#include "exec/operator/set_source_operator.h"
100
#include "exec/operator/sort_sink_operator.h"
101
#include "exec/operator/sort_source_operator.h"
102
#include "exec/operator/spill_iceberg_table_sink_operator.h"
103
#include "exec/operator/spill_sort_sink_operator.h"
104
#include "exec/operator/spill_sort_source_operator.h"
105
#include "exec/operator/streaming_aggregation_operator.h"
106
#include "exec/operator/table_function_operator.h"
107
#include "exec/operator/tvf_table_sink_operator.h"
108
#include "exec/operator/union_sink_operator.h"
109
#include "exec/operator/union_source_operator.h"
110
#include "exec/pipeline/dependency.h"
111
#include "exec/pipeline/pipeline_task.h"
112
#include "exec/pipeline/task_scheduler.h"
113
#include "exec/runtime_filter/runtime_filter_mgr.h"
114
#include "exec/sort/topn_sorter.h"
115
#include "exec/spill/spill_stream.h"
116
#include "io/fs/stream_load_pipe.h"
117
#include "load/stream_load/new_load_stream_mgr.h"
118
#include "runtime/exec_env.h"
119
#include "runtime/fragment_mgr.h"
120
#include "runtime/result_block_buffer.h"
121
#include "runtime/result_buffer_mgr.h"
122
#include "runtime/runtime_state.h"
123
#include "runtime/thread_context.h"
124
#include "service/backend_options.h"
125
#include "util/countdown_latch.h"
126
#include "util/debug_util.h"
127
#include "util/uid_util.h"
128
129
namespace doris {
130
#include "common/compile_check_begin.h"
131
PipelineFragmentContext::PipelineFragmentContext(
132
        TUniqueId query_id, const TPipelineFragmentParams& request,
133
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
134
        const std::function<void(RuntimeState*, Status*)>& call_back,
135
        report_status_callback report_status_cb)
136
331k
        : _query_id(std::move(query_id)),
137
331k
          _fragment_id(request.fragment_id),
138
331k
          _exec_env(exec_env),
139
331k
          _query_ctx(std::move(query_ctx)),
140
331k
          _call_back(call_back),
141
331k
          _is_report_on_cancel(true),
142
331k
          _report_status_cb(std::move(report_status_cb)),
143
331k
          _params(request),
144
331k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
145
331k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
146
331k
                                                               : false) {
147
331k
    _fragment_watcher.start();
148
331k
}
149
150
332k
PipelineFragmentContext::~PipelineFragmentContext() {
151
332k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
152
332k
            .tag("query_id", print_id(_query_id))
153
332k
            .tag("fragment_id", _fragment_id);
154
332k
    _release_resource();
155
332k
    {
156
        // The memory released by the query end is recorded in the query mem tracker.
157
332k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
158
332k
        _runtime_state.reset();
159
332k
        _query_ctx.reset();
160
332k
    }
161
332k
}
162
163
472
bool PipelineFragmentContext::is_timeout(timespec now) const {
164
472
    if (_timeout <= 0) {
165
0
        return false;
166
0
    }
167
472
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
168
472
}
169
170
// Must not add lock in this method. Because it will call query ctx cancel. And
171
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
172
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
173
// There maybe dead lock.
174
4.67k
void PipelineFragmentContext::cancel(const Status reason) {
175
4.67k
    LOG_INFO("PipelineFragmentContext::cancel")
176
4.67k
            .tag("query_id", print_id(_query_id))
177
4.67k
            .tag("fragment_id", _fragment_id)
178
4.67k
            .tag("reason", reason.to_string());
179
4.67k
    {
180
4.67k
        std::lock_guard<std::mutex> l(_task_mutex);
181
4.67k
        if (_closed_tasks >= _total_tasks) {
182
            // All tasks in this PipelineXFragmentContext already closed.
183
56
            return;
184
56
        }
185
4.67k
    }
186
    // Timeout is a special error code, we need print current stack to debug timeout issue.
187
4.61k
    if (reason.is<ErrorCode::TIMEOUT>()) {
188
1
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
189
1
                                   debug_string());
190
1
        LOG_LONG_STRING(WARNING, dbg_str);
191
1
    }
192
193
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
194
4.61k
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
195
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
196
0
                    debug_string());
197
0
    }
198
199
4.62k
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
200
0
        print_profile("cancel pipeline, reason: " + reason.to_string());
201
0
    }
202
203
4.61k
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
204
22
        _query_ctx->set_load_error_url(error_url);
205
22
    }
206
207
4.61k
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
208
22
        _query_ctx->set_first_error_msg(first_error_msg);
209
22
    }
210
211
4.61k
    _query_ctx->cancel(reason, _fragment_id);
212
4.61k
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
213
82
        _is_report_on_cancel = false;
214
4.53k
    } else {
215
18.1k
        for (auto& id : _fragment_instance_ids) {
216
18.1k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
217
18.1k
        }
218
4.53k
    }
219
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
220
    // For stream load the fragment's query_id == load id, it is set in FE.
221
4.61k
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
222
4.61k
    if (stream_load_ctx != nullptr) {
223
30
        stream_load_ctx->pipe->cancel(reason.to_string());
224
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
225
        // We need to set the error URL at this point to ensure error information is properly
226
        // propagated to the client.
227
30
        stream_load_ctx->error_url = get_load_error_url();
228
30
        stream_load_ctx->first_error_msg = get_first_error_msg();
229
30
    }
230
231
18.4k
    for (auto& tasks : _tasks) {
232
38.8k
        for (auto& task : tasks) {
233
38.8k
            task.first->terminate();
234
38.8k
        }
235
18.4k
    }
236
4.61k
}
237
238
542k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
239
542k
    PipelineId id = _next_pipeline_id++;
240
542k
    auto pipeline = std::make_shared<Pipeline>(
241
542k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
242
542k
            parent ? parent->num_tasks() : _num_instances);
243
542k
    if (idx >= 0) {
244
108k
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
245
434k
    } else {
246
434k
        _pipelines.emplace_back(pipeline);
247
434k
    }
248
542k
    if (parent) {
249
205k
        parent->set_children(pipeline);
250
205k
    }
251
542k
    return pipeline;
252
542k
}
253
254
334k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
255
334k
    {
256
334k
        SCOPED_TIMER(_build_pipelines_timer);
257
        // 2. Build pipelines with operators in this fragment.
258
334k
        auto root_pipeline = add_pipeline();
259
334k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
260
334k
                                         &_root_op, root_pipeline));
261
262
        // 3. Create sink operator
263
334k
        if (!_params.fragment.__isset.output_sink) {
264
0
            return Status::InternalError("No output sink in this fragment!");
265
0
        }
266
334k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
267
334k
                                          _params.fragment.output_exprs, _params,
268
334k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
269
334k
                                          *_desc_tbl, root_pipeline->id()));
270
334k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
271
334k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
272
273
433k
        for (PipelinePtr& pipeline : _pipelines) {
274
18.4E
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
275
433k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
276
433k
        }
277
334k
    }
278
    // 4. Build local exchanger
279
334k
    if (_runtime_state->enable_local_shuffle()) {
280
332k
        SCOPED_TIMER(_plan_local_exchanger_timer);
281
332k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
282
332k
                                             _params.bucket_seq_to_instance_idx,
283
332k
                                             _params.shuffle_idx_to_instance_idx));
284
332k
    }
285
286
    // 5. Initialize global states in pipelines.
287
542k
    for (PipelinePtr& pipeline : _pipelines) {
288
542k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
289
542k
        pipeline->children().clear();
290
542k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
291
542k
    }
292
293
333k
    {
294
333k
        SCOPED_TIMER(_build_tasks_timer);
295
        // 6. Build pipeline tasks and initialize local state.
296
333k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
297
333k
    }
298
299
333k
    return Status::OK();
300
333k
}
301
302
331k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
303
331k
    if (_prepared) {
304
0
        return Status::InternalError("Already prepared");
305
0
    }
306
332k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
307
332k
        _timeout = _params.query_options.execution_timeout;
308
332k
    }
309
310
331k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
311
331k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
312
331k
    SCOPED_TIMER(_prepare_timer);
313
331k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
314
331k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
315
331k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
316
331k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
317
331k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
318
331k
    {
319
331k
        SCOPED_TIMER(_init_context_timer);
320
331k
        cast_set(_num_instances, _params.local_params.size());
321
331k
        _total_instances =
322
331k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
323
324
331k
        auto* fragment_context = this;
325
326
331k
        if (_params.query_options.__isset.is_report_success) {
327
331k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
328
331k
        }
329
330
        // 1. Set up the global runtime state.
331
331k
        _runtime_state = RuntimeState::create_unique(
332
331k
                _params.query_id, _params.fragment_id, _params.query_options,
333
331k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
334
331k
        _runtime_state->set_task_execution_context(shared_from_this());
335
331k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
336
331k
        if (_params.__isset.backend_id) {
337
329k
            _runtime_state->set_backend_id(_params.backend_id);
338
329k
        }
339
331k
        if (_params.__isset.import_label) {
340
236
            _runtime_state->set_import_label(_params.import_label);
341
236
        }
342
331k
        if (_params.__isset.db_name) {
343
188
            _runtime_state->set_db_name(_params.db_name);
344
188
        }
345
331k
        if (_params.__isset.load_job_id) {
346
0
            _runtime_state->set_load_job_id(_params.load_job_id);
347
0
        }
348
349
331k
        if (_params.is_simplified_param) {
350
122k
            _desc_tbl = _query_ctx->desc_tbl;
351
209k
        } else {
352
209k
            DCHECK(_params.__isset.desc_tbl);
353
209k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
354
209k
                                                  &_desc_tbl));
355
209k
        }
356
331k
        _runtime_state->set_desc_tbl(_desc_tbl);
357
331k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
358
331k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
359
331k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
360
331k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
361
362
        // init fragment_instance_ids
363
331k
        const auto target_size = _params.local_params.size();
364
331k
        _fragment_instance_ids.resize(target_size);
365
1.46M
        for (size_t i = 0; i < _params.local_params.size(); i++) {
366
1.13M
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
367
1.13M
            _fragment_instance_ids[i] = fragment_instance_id;
368
1.13M
        }
369
331k
    }
370
371
331k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
372
373
330k
    _init_next_report_time();
374
375
330k
    _prepared = true;
376
330k
    return Status::OK();
377
331k
}
378
379
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
380
        int instance_idx,
381
1.14M
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
382
1.14M
    const auto& local_params = _params.local_params[instance_idx];
383
1.14M
    auto fragment_instance_id = local_params.fragment_instance_id;
384
1.14M
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
385
1.14M
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
386
1.14M
    auto get_shared_state = [&](PipelinePtr pipeline)
387
1.14M
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
388
2.03M
                                       std::vector<std::shared_ptr<Dependency>>>> {
389
2.03M
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
390
2.03M
                                std::vector<std::shared_ptr<Dependency>>>>
391
2.03M
                shared_state_map;
392
2.77M
        for (auto& op : pipeline->operators()) {
393
2.77M
            auto source_id = op->operator_id();
394
2.77M
            if (auto iter = _op_id_to_shared_state.find(source_id);
395
2.77M
                iter != _op_id_to_shared_state.end()) {
396
846k
                shared_state_map.insert({source_id, iter->second});
397
846k
            }
398
2.77M
        }
399
2.03M
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
400
2.03M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
401
2.03M
                iter != _op_id_to_shared_state.end()) {
402
425k
                shared_state_map.insert({sink_to_source_id, iter->second});
403
425k
            }
404
2.03M
        }
405
2.03M
        return shared_state_map;
406
2.03M
    };
407
408
3.58M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
409
2.44M
        auto& pipeline = _pipelines[pip_idx];
410
2.44M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
411
2.02M
            auto task_runtime_state = RuntimeState::create_unique(
412
2.02M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
413
2.02M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
414
2.02M
            {
415
                // Initialize runtime state for this task
416
2.02M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
417
418
2.02M
                task_runtime_state->set_task_execution_context(shared_from_this());
419
2.02M
                task_runtime_state->set_be_number(local_params.backend_num);
420
2.02M
                if (_need_notify_close) {
421
                    // rec cte require child rf to wait infinitely to make sure all rpc done
422
14.9k
                    task_runtime_state->set_force_make_rf_wait_infinite();
423
14.9k
                }
424
425
2.02M
                if (_params.__isset.backend_id) {
426
2.02M
                    task_runtime_state->set_backend_id(_params.backend_id);
427
2.02M
                }
428
2.02M
                if (_params.__isset.import_label) {
429
237
                    task_runtime_state->set_import_label(_params.import_label);
430
237
                }
431
2.02M
                if (_params.__isset.db_name) {
432
189
                    task_runtime_state->set_db_name(_params.db_name);
433
189
                }
434
2.02M
                if (_params.__isset.load_job_id) {
435
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
436
0
                }
437
2.02M
                if (_params.__isset.wal_id) {
438
114
                    task_runtime_state->set_wal_id(_params.wal_id);
439
114
                }
440
2.02M
                if (_params.__isset.content_length) {
441
31
                    task_runtime_state->set_content_length(_params.content_length);
442
31
                }
443
444
2.02M
                task_runtime_state->set_desc_tbl(_desc_tbl);
445
2.02M
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
446
2.02M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
447
2.02M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
448
2.02M
                task_runtime_state->set_max_operator_id(max_operator_id());
449
2.02M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
450
2.02M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
451
2.02M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
452
453
2.02M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
454
2.02M
            }
455
2.02M
            auto cur_task_id = _total_tasks++;
456
2.02M
            task_runtime_state->set_task_id(cur_task_id);
457
2.02M
            task_runtime_state->set_task_num(pipeline->num_tasks());
458
2.02M
            auto task = std::make_shared<PipelineTask>(
459
2.02M
                    pipeline, cur_task_id, task_runtime_state.get(),
460
2.02M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
461
2.02M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
462
2.02M
                    instance_idx);
463
2.02M
            pipeline->incr_created_tasks(instance_idx, task.get());
464
2.02M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
465
2.02M
            _tasks[instance_idx].emplace_back(
466
2.02M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
467
2.02M
                            std::move(task), std::move(task_runtime_state)});
468
2.02M
        }
469
2.44M
    }
470
471
    /**
472
         * Build DAG for pipeline tasks.
473
         * For example, we have
474
         *
475
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
476
         *            \                      /
477
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
478
         *                 \                          /
479
         *               JoinProbeOperator2 (Pipeline1)
480
         *
481
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
482
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
483
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
484
         *
485
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
486
         * and JoinProbeOperator2.
487
         */
488
2.44M
    for (auto& _pipeline : _pipelines) {
489
2.44M
        if (pipeline_id_to_task.contains(_pipeline->id())) {
490
2.02M
            auto* task = pipeline_id_to_task[_pipeline->id()];
491
2.02M
            DCHECK(task != nullptr);
492
493
            // If this task has upstream dependency, then inject it into this task.
494
2.02M
            if (_dag.contains(_pipeline->id())) {
495
1.28M
                auto& deps = _dag[_pipeline->id()];
496
2.14M
                for (auto& dep : deps) {
497
2.14M
                    if (pipeline_id_to_task.contains(dep)) {
498
1.29M
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
499
1.29M
                        if (ss) {
500
453k
                            task->inject_shared_state(ss);
501
845k
                        } else {
502
845k
                            pipeline_id_to_task[dep]->inject_shared_state(
503
845k
                                    task->get_source_shared_state());
504
845k
                        }
505
1.29M
                    }
506
2.14M
                }
507
1.28M
            }
508
2.02M
        }
509
2.44M
    }
510
3.58M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
511
2.44M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
512
2.02M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
513
2.02M
            DCHECK(pipeline_id_to_profile[pip_idx]);
514
2.02M
            std::vector<TScanRangeParams> scan_ranges;
515
2.02M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
516
2.02M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
517
321k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
518
321k
            }
519
2.02M
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
520
2.02M
                                                             _params.fragment.output_sink));
521
2.02M
        }
522
2.44M
    }
523
1.13M
    {
524
1.13M
        std::lock_guard<std::mutex> l(_state_map_lock);
525
1.13M
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
526
1.13M
    }
527
1.13M
    return Status::OK();
528
1.14M
}
529
530
333k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
531
333k
    _total_tasks = 0;
532
333k
    _closed_tasks = 0;
533
333k
    const auto target_size = _params.local_params.size();
534
333k
    _tasks.resize(target_size);
535
333k
    _runtime_filter_mgr_map.resize(target_size);
536
875k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
537
541k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
538
541k
    }
539
333k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
540
541
333k
    if (target_size > 1 &&
542
333k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
543
123k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
544
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
545
17.2k
        std::vector<Status> prepare_status(target_size);
546
17.2k
        int submitted_tasks = 0;
547
17.2k
        Status submit_status;
548
17.2k
        CountDownLatch latch((int)target_size);
549
286k
        for (int i = 0; i < target_size; i++) {
550
268k
            submit_status = thread_pool->submit_func([&, i]() {
551
268k
                SCOPED_ATTACH_TASK(_query_ctx.get());
552
268k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
553
268k
                latch.count_down();
554
268k
            });
555
268k
            if (LIKELY(submit_status.ok())) {
556
268k
                submitted_tasks++;
557
18.4E
            } else {
558
18.4E
                break;
559
18.4E
            }
560
268k
        }
561
17.2k
        latch.arrive_and_wait(target_size - submitted_tasks);
562
17.2k
        if (UNLIKELY(!submit_status.ok())) {
563
0
            return submit_status;
564
0
        }
565
286k
        for (int i = 0; i < submitted_tasks; i++) {
566
268k
            if (!prepare_status[i].ok()) {
567
0
                return prepare_status[i];
568
0
            }
569
268k
        }
570
316k
    } else {
571
1.18M
        for (int i = 0; i < target_size; i++) {
572
872k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
573
872k
        }
574
316k
    }
575
333k
    _pipeline_parent_map.clear();
576
333k
    _op_id_to_shared_state.clear();
577
578
333k
    return Status::OK();
579
333k
}
580
581
330k
void PipelineFragmentContext::_init_next_report_time() {
582
330k
    auto interval_s = config::pipeline_status_report_interval;
583
330k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
584
34.1k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
585
34.1k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
586
        // We don't want to wait longer than it takes to run the entire fragment.
587
34.1k
        _previous_report_time =
588
34.1k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
589
34.1k
        _disable_period_report = false;
590
34.1k
    }
591
330k
}
592
593
4.21k
void PipelineFragmentContext::refresh_next_report_time() {
594
4.21k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
595
4.21k
    DCHECK(disable == true);
596
4.21k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
597
4.21k
    _disable_period_report.compare_exchange_strong(disable, false);
598
4.21k
}
599
600
6.88M
void PipelineFragmentContext::trigger_report_if_necessary() {
601
6.88M
    if (!_is_report_success) {
602
6.31M
        return;
603
6.31M
    }
604
568k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
605
568k
    if (disable) {
606
7.61k
        return;
607
7.61k
    }
608
561k
    int32_t interval_s = config::pipeline_status_report_interval;
609
561k
    if (interval_s <= 0) {
610
0
        LOG(WARNING)
611
0
                << "config::status_report_interval is equal to or less than zero, do not trigger "
612
0
                   "report.";
613
0
    }
614
561k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
615
561k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
616
561k
    if (MonotonicNanos() > next_report_time) {
617
4.22k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
618
4.22k
                                                            std::memory_order_acq_rel)) {
619
6
            return;
620
6
        }
621
4.21k
        if (VLOG_FILE_IS_ON) {
622
0
            VLOG_FILE << "Reporting "
623
0
                      << "profile for query_id " << print_id(_query_id)
624
0
                      << ", fragment id: " << _fragment_id;
625
626
0
            std::stringstream ss;
627
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
628
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
629
0
            if (_runtime_state->load_channel_profile()) {
630
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
631
0
            }
632
633
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
634
0
                      << " profile:\n"
635
0
                      << ss.str();
636
0
        }
637
4.21k
        auto st = send_report(false);
638
4.21k
        if (!st.ok()) {
639
0
            disable = true;
640
0
            _disable_period_report.compare_exchange_strong(disable, false,
641
0
                                                           std::memory_order_acq_rel);
642
0
        }
643
4.21k
    }
644
561k
}
645
646
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
647
333k
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
648
333k
    if (_params.fragment.plan.nodes.empty()) {
649
0
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
650
0
    }
651
652
333k
    int node_idx = 0;
653
654
333k
    RETURN_IF_ERROR(_create_tree_helper(pool, _params.fragment.plan.nodes, descs, nullptr,
655
333k
                                        &node_idx, root, cur_pipe, 0, false, false));
656
657
333k
    if (node_idx + 1 != _params.fragment.plan.nodes.size()) {
658
0
        return Status::InternalError(
659
0
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
660
0
    }
661
333k
    return Status::OK();
662
333k
}
663
664
Status PipelineFragmentContext::_create_tree_helper(
665
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
666
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
667
545k
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
668
    // propagate error case
669
545k
    if (*node_idx >= tnodes.size()) {
670
0
        return Status::InternalError(
671
0
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
672
0
                *node_idx, tnodes.size());
673
0
    }
674
545k
    const TPlanNode& tnode = tnodes[*node_idx];
675
676
545k
    int num_children = tnodes[*node_idx].num_children;
677
545k
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
678
545k
    bool current_require_bucket_distribution = require_bucket_distribution;
679
545k
    OperatorPtr op = nullptr;
680
545k
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
681
545k
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
682
545k
                                     followed_by_shuffled_operator,
683
545k
                                     current_require_bucket_distribution));
684
    // Initialization must be done here. For example, group by expressions in agg will be used to
685
    // decide if a local shuffle should be planed, so it must be initialized here.
686
545k
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
687
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
688
545k
    if (parent != nullptr) {
689
        // add to parent's child(s)
690
211k
        RETURN_IF_ERROR(parent->set_child(op));
691
333k
    } else {
692
333k
        *root = op;
693
333k
    }
694
    /**
695
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
696
     *
697
     * For plan:
698
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
699
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
700
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
701
     *
702
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
703
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
704
     */
705
545k
    auto required_data_distribution =
706
545k
            cur_pipe->operators().empty()
707
545k
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
708
545k
                    : op->required_data_distribution(_runtime_state.get());
709
545k
    current_followed_by_shuffled_operator =
710
545k
            (followed_by_shuffled_operator ||
711
545k
             (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
712
483k
                                            : op->is_shuffled_operator())) &&
713
545k
            Pipeline::is_hash_exchange(required_data_distribution.distribution_type);
714
715
545k
    current_require_bucket_distribution =
716
545k
            (require_bucket_distribution ||
717
545k
             (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
718
487k
                                            : op->is_colocated_operator())) &&
719
545k
            Pipeline::is_hash_exchange(required_data_distribution.distribution_type);
720
721
545k
    if (num_children == 0) {
722
346k
        _use_serial_source = op->is_serial_operator();
723
346k
    }
724
    // rely on that tnodes is preorder of the plan
725
757k
    for (int i = 0; i < num_children; i++) {
726
211k
        ++*node_idx;
727
211k
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
728
211k
                                            current_followed_by_shuffled_operator,
729
211k
                                            current_require_bucket_distribution));
730
731
        // we are expecting a child, but have used all nodes
732
        // this means we have been given a bad tree and must fail
733
211k
        if (*node_idx >= tnodes.size()) {
734
0
            return Status::InternalError(
735
0
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
736
0
                    *node_idx, tnodes.size());
737
0
        }
738
211k
    }
739
740
545k
    return Status::OK();
741
545k
}
742
743
void PipelineFragmentContext::_inherit_pipeline_properties(
744
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
745
108k
        PipelinePtr pipe_with_sink) {
746
108k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
747
108k
    pipe_with_source->set_num_tasks(_num_instances);
748
108k
    pipe_with_source->set_data_distribution(data_distribution);
749
108k
}
750
751
Status PipelineFragmentContext::_add_local_exchange_impl(
752
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
753
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
754
        const std::map<int, int>& bucket_seq_to_instance_idx,
755
108k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
756
108k
    auto& operators = cur_pipe->operators();
757
108k
    const auto downstream_pipeline_id = cur_pipe->id();
758
108k
    auto local_exchange_id = next_operator_id();
759
    // 1. Create a new pipeline with local exchange sink.
760
108k
    DataSinkOperatorPtr sink;
761
108k
    auto sink_id = next_sink_operator_id();
762
763
    /**
764
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
765
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
766
     */
767
108k
    const bool followed_by_shuffled_operator =
768
108k
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
769
108k
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
770
108k
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
771
108k
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
772
108k
                                         followed_by_shuffled_operator && !_use_serial_source;
773
108k
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
774
108k
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
775
108k
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
776
108k
    if (bucket_seq_to_instance_idx.empty() &&
777
108k
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
778
39
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
779
39
    }
780
108k
    RETURN_IF_ERROR(new_pip->set_sink(sink));
781
108k
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
782
108k
                                          num_buckets, use_global_hash_shuffle,
783
108k
                                          shuffle_idx_to_instance_idx));
784
785
    // 2. Create and initialize LocalExchangeSharedState.
786
108k
    std::shared_ptr<LocalExchangeSharedState> shared_state =
787
108k
            LocalExchangeSharedState::create_shared(_num_instances);
788
108k
    switch (data_distribution.distribution_type) {
789
24.5k
    case ExchangeType::HASH_SHUFFLE:
790
24.5k
        shared_state->exchanger = ShuffleExchanger::create_unique(
791
24.5k
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
792
24.5k
                use_global_hash_shuffle ? _total_instances : _num_instances,
793
24.5k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
794
24.5k
                        ? cast_set<int>(
795
24.5k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
796
24.5k
                        : 0);
797
24.5k
        break;
798
551
    case ExchangeType::BUCKET_HASH_SHUFFLE:
799
551
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
800
551
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
801
551
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
802
551
                        ? cast_set<int>(
803
551
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
804
551
                        : 0);
805
551
        break;
806
79.6k
    case ExchangeType::PASSTHROUGH:
807
79.6k
        shared_state->exchanger = PassthroughExchanger::create_unique(
808
79.6k
                cur_pipe->num_tasks(), _num_instances,
809
79.6k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
810
79.6k
                        ? cast_set<int>(
811
79.6k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
812
18.4E
                        : 0);
813
79.6k
        break;
814
417
    case ExchangeType::BROADCAST:
815
417
        shared_state->exchanger = BroadcastExchanger::create_unique(
816
417
                cur_pipe->num_tasks(), _num_instances,
817
417
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
818
417
                        ? cast_set<int>(
819
417
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
820
417
                        : 0);
821
417
        break;
822
2.13k
    case ExchangeType::PASS_TO_ONE:
823
2.13k
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
824
            // If shared hash table is enabled for BJ, hash table will be built by only one task
825
871
            shared_state->exchanger = PassToOneExchanger::create_unique(
826
871
                    cur_pipe->num_tasks(), _num_instances,
827
871
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
828
871
                            ? cast_set<int>(_runtime_state->query_options()
829
871
                                                    .local_exchange_free_blocks_limit)
830
871
                            : 0);
831
1.26k
        } else {
832
1.26k
            shared_state->exchanger = BroadcastExchanger::create_unique(
833
1.26k
                    cur_pipe->num_tasks(), _num_instances,
834
1.26k
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
835
1.26k
                            ? cast_set<int>(_runtime_state->query_options()
836
1.26k
                                                    .local_exchange_free_blocks_limit)
837
1.26k
                            : 0);
838
1.26k
        }
839
2.13k
        break;
840
832
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
841
832
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
842
832
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
843
832
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
844
832
                        ? cast_set<int>(
845
832
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
846
832
                        : 0);
847
832
        break;
848
0
    default:
849
0
        return Status::InternalError("Unsupported local exchange type : " +
850
0
                                     std::to_string((int)data_distribution.distribution_type));
851
108k
    }
852
108k
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
853
108k
                                             "LOCAL_EXCHANGE_OPERATOR");
854
108k
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
855
108k
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
856
857
    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
858
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].
859
860
    // 3.1 Initialize new pipeline's operator list.
861
108k
    std::copy(operators.begin(), operators.begin() + idx,
862
108k
              std::inserter(new_pip->operators(), new_pip->operators().end()));
863
864
    // 3.2 Erase unused operators in previous pipeline.
865
108k
    operators.erase(operators.begin(), operators.begin() + idx);
866
867
    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
868
108k
    OperatorPtr source_op;
869
108k
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
870
108k
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
871
108k
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
872
108k
    if (!operators.empty()) {
873
44.2k
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
874
44.2k
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
875
44.2k
    }
876
108k
    operators.insert(operators.begin(), source_op);
877
878
    // 5. Set children for two pipelines separately.
879
108k
    std::vector<std::shared_ptr<Pipeline>> new_children;
880
108k
    std::vector<PipelineId> edges_with_source;
881
125k
    for (auto child : cur_pipe->children()) {
882
125k
        bool found = false;
883
139k
        for (auto op : new_pip->operators()) {
884
139k
            if (child->sink()->node_id() == op->node_id()) {
885
12.2k
                new_pip->set_children(child);
886
12.2k
                found = true;
887
12.2k
            };
888
139k
        }
889
125k
        if (!found) {
890
112k
            new_children.push_back(child);
891
112k
            edges_with_source.push_back(child->id());
892
112k
        }
893
125k
    }
894
108k
    new_children.push_back(new_pip);
895
108k
    edges_with_source.push_back(new_pip->id());
896
897
    // 6. Set DAG for new pipelines.
898
108k
    if (!new_pip->children().empty()) {
899
6.74k
        std::vector<PipelineId> edges_with_sink;
900
12.2k
        for (auto child : new_pip->children()) {
901
12.2k
            edges_with_sink.push_back(child->id());
902
12.2k
        }
903
6.74k
        _dag.insert({new_pip->id(), edges_with_sink});
904
6.74k
    }
905
108k
    cur_pipe->set_children(new_children);
906
108k
    _dag[downstream_pipeline_id] = edges_with_source;
907
108k
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
908
108k
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
909
108k
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));
910
911
    // 7. Inherit properties from current pipeline.
912
108k
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
913
108k
    return Status::OK();
914
108k
}
915
916
Status PipelineFragmentContext::_add_local_exchange(
917
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
918
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
919
        const std::map<int, int>& bucket_seq_to_instance_idx,
920
173k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
921
173k
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
922
41.3k
        return Status::OK();
923
41.3k
    }
924
925
131k
    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
926
48.6k
        return Status::OK();
927
48.6k
    }
928
83.1k
    *do_local_exchange = true;
929
930
83.1k
    auto& operators = cur_pipe->operators();
931
83.1k
    auto total_op_num = operators.size();
932
83.1k
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
933
83.1k
    RETURN_IF_ERROR(_add_local_exchange_impl(
934
83.1k
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
935
83.1k
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
936
937
18.4E
    CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
938
18.4E
            << "total_op_num: " << total_op_num
939
18.4E
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
940
18.4E
            << " new_pip->operators().size(): " << new_pip->operators().size();
941
942
    // There are some local shuffles with relatively heavy operations on the sink.
943
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
944
    // Therefore, local passthrough is used to increase the concurrency of the sink.
945
    // op -> local sink(1) -> local source (n)
946
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
947
83.2k
    if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
948
83.1k
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
949
24.8k
        RETURN_IF_ERROR(_add_local_exchange_impl(
950
24.8k
                cast_set<int>(new_pip->operators().size()), pool, new_pip,
951
24.8k
                add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
952
24.8k
                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
953
24.8k
                shuffle_idx_to_instance_idx));
954
24.8k
    }
955
83.1k
    return Status::OK();
956
83.1k
}
957
958
Status PipelineFragmentContext::_plan_local_exchange(
959
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
960
331k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
961
763k
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
962
431k
        _pipelines[pip_idx]->init_data_distribution(_runtime_state.get());
963
        // Set property if child pipeline is not join operator's child.
964
431k
        if (!_pipelines[pip_idx]->children().empty()) {
965
97.6k
            for (auto& child : _pipelines[pip_idx]->children()) {
966
97.6k
                if (child->sink()->node_id() ==
967
97.6k
                    _pipelines[pip_idx]->operators().front()->node_id()) {
968
86.2k
                    _pipelines[pip_idx]->set_data_distribution(child->data_distribution());
969
86.2k
                }
970
97.6k
            }
971
93.8k
        }
972
973
        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
974
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
975
        // still keep colocate plan after local shuffle
976
431k
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, _pipelines[pip_idx],
977
431k
                                             bucket_seq_to_instance_idx,
978
431k
                                             shuffle_idx_to_instance_idx));
979
431k
    }
980
331k
    return Status::OK();
981
331k
}
982
983
Status PipelineFragmentContext::_plan_local_exchange(
984
        int num_buckets, int pip_idx, PipelinePtr pip,
985
        const std::map<int, int>& bucket_seq_to_instance_idx,
986
431k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
987
431k
    int idx = 1;
988
431k
    bool do_local_exchange = false;
989
475k
    do {
990
475k
        auto& ops = pip->operators();
991
475k
        do_local_exchange = false;
992
        // Plan local exchange for each operator.
993
545k
        for (; idx < ops.size();) {
994
114k
            if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
995
110k
                RETURN_IF_ERROR(_add_local_exchange(
996
110k
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
997
110k
                        ops[idx]->required_data_distribution(_runtime_state.get()),
998
110k
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
999
110k
                        shuffle_idx_to_instance_idx));
1000
110k
            }
1001
114k
            if (do_local_exchange) {
1002
                // If local exchange is needed for current operator, we will split this pipeline to
1003
                // two pipelines by local exchange sink/source. And then we need to process remaining
1004
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
1005
                // is current operator was already processed) and continue to plan local exchange.
1006
44.2k
                idx = 2;
1007
44.2k
                break;
1008
44.2k
            }
1009
70.2k
            idx++;
1010
70.2k
        }
1011
475k
    } while (do_local_exchange);
1012
431k
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
1013
62.4k
        RETURN_IF_ERROR(_add_local_exchange(
1014
62.4k
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
1015
62.4k
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
1016
62.4k
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1017
62.4k
    }
1018
431k
    return Status::OK();
1019
431k
}
1020
1021
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1022
                                                  const std::vector<TExpr>& output_exprs,
1023
                                                  const TPipelineFragmentParams& params,
1024
                                                  const RowDescriptor& row_desc,
1025
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1026
334k
                                                  PipelineId cur_pipeline_id) {
1027
334k
    switch (thrift_sink.type) {
1028
124k
    case TDataSinkType::DATA_STREAM_SINK: {
1029
124k
        if (!thrift_sink.__isset.stream_sink) {
1030
0
            return Status::InternalError("Missing data stream sink.");
1031
0
        }
1032
124k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1033
124k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1034
124k
                params.destinations, _fragment_instance_ids);
1035
124k
        break;
1036
124k
    }
1037
180k
    case TDataSinkType::RESULT_SINK: {
1038
180k
        if (!thrift_sink.__isset.result_sink) {
1039
0
            return Status::InternalError("Missing data buffer sink.");
1040
0
        }
1041
1042
180k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), row_desc,
1043
180k
                                                      output_exprs, thrift_sink.result_sink);
1044
180k
        break;
1045
180k
    }
1046
105
    case TDataSinkType::DICTIONARY_SINK: {
1047
105
        if (!thrift_sink.__isset.dictionary_sink) {
1048
0
            return Status::InternalError("Missing dict sink.");
1049
0
        }
1050
1051
105
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1052
105
                                                    thrift_sink.dictionary_sink);
1053
105
        break;
1054
105
    }
1055
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1056
28.3k
    case TDataSinkType::OLAP_TABLE_SINK: {
1057
28.3k
        if (state->query_options().enable_memtable_on_sink_node &&
1058
28.3k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1059
28.3k
            !config::is_cloud_mode()) {
1060
17
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(pool, next_sink_operator_id(),
1061
17
                                                               row_desc, output_exprs);
1062
28.3k
        } else {
1063
28.3k
            _sink = std::make_shared<OlapTableSinkOperatorX>(pool, next_sink_operator_id(),
1064
28.3k
                                                             row_desc, output_exprs);
1065
28.3k
        }
1066
28.3k
        break;
1067
0
    }
1068
163
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1069
163
        DCHECK(thrift_sink.__isset.olap_table_sink);
1070
163
        DCHECK(state->get_query_ctx() != nullptr);
1071
163
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1072
163
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1073
163
                                                                output_exprs);
1074
163
        break;
1075
0
    }
1076
0
    case TDataSinkType::HIVE_TABLE_SINK: {
1077
0
        if (!thrift_sink.__isset.hive_table_sink) {
1078
0
            return Status::InternalError("Missing hive table sink.");
1079
0
        }
1080
0
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1081
0
                                                         output_exprs);
1082
0
        break;
1083
0
    }
1084
0
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1085
0
        if (!thrift_sink.__isset.iceberg_table_sink) {
1086
0
            return Status::InternalError("Missing iceberg table sink.");
1087
0
        }
1088
0
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1089
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1090
0
                                                                     row_desc, output_exprs);
1091
0
        } else {
1092
0
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1093
0
                                                                row_desc, output_exprs);
1094
0
        }
1095
0
        break;
1096
0
    }
1097
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1098
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1099
0
            return Status::InternalError("Missing max compute table sink.");
1100
0
        }
1101
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1102
0
                                                       output_exprs);
1103
0
        break;
1104
0
    }
1105
0
    case TDataSinkType::JDBC_TABLE_SINK: {
1106
0
        if (!thrift_sink.__isset.jdbc_table_sink) {
1107
0
            return Status::InternalError("Missing data jdbc sink.");
1108
0
        }
1109
0
        if (config::enable_java_support) {
1110
0
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1111
0
                                                             output_exprs);
1112
0
        } else {
1113
0
            return Status::InternalError(
1114
0
                    "Jdbc table sink is not enabled, you can change be config "
1115
0
                    "enable_java_support to true and restart be.");
1116
0
        }
1117
0
        break;
1118
0
    }
1119
3
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1120
3
        if (!thrift_sink.__isset.memory_scratch_sink) {
1121
0
            return Status::InternalError("Missing data buffer sink.");
1122
0
        }
1123
1124
3
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1125
3
                                                             output_exprs);
1126
3
        break;
1127
3
    }
1128
321
    case TDataSinkType::RESULT_FILE_SINK: {
1129
321
        if (!thrift_sink.__isset.result_file_sink) {
1130
0
            return Status::InternalError("Missing result file sink.");
1131
0
        }
1132
1133
        // Result file sink is not the top sink
1134
321
        if (params.__isset.destinations && !params.destinations.empty()) {
1135
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1136
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1137
0
                    params.destinations, output_exprs, desc_tbl);
1138
321
        } else {
1139
321
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1140
321
                                                              output_exprs);
1141
321
        }
1142
321
        break;
1143
321
    }
1144
770
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1145
770
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1146
770
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1147
770
        auto sink_id = next_sink_operator_id();
1148
770
        const int multi_cast_node_id = sink_id;
1149
770
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1150
        // one sink has multiple sources.
1151
770
        std::vector<int> sources;
1152
2.86k
        for (int i = 0; i < sender_size; ++i) {
1153
2.09k
            auto source_id = next_operator_id();
1154
2.09k
            sources.push_back(source_id);
1155
2.09k
        }
1156
1157
770
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1158
770
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1159
2.86k
        for (int i = 0; i < sender_size; ++i) {
1160
2.09k
            auto new_pipeline = add_pipeline();
1161
            // use to exchange sink
1162
2.09k
            RowDescriptor* exchange_row_desc = nullptr;
1163
2.09k
            {
1164
2.09k
                const auto& tmp_row_desc =
1165
2.09k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1166
2.09k
                                ? RowDescriptor(state->desc_tbl(),
1167
2.09k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1168
2.09k
                                                         .output_tuple_id})
1169
2.09k
                                : row_desc;
1170
2.09k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1171
2.09k
            }
1172
2.09k
            auto source_id = sources[i];
1173
2.09k
            OperatorPtr source_op;
1174
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1175
2.09k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1176
2.09k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1177
2.09k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1178
2.09k
                    /*operator_id=*/source_id);
1179
2.09k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1180
2.09k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1181
            // 2. create and set sink operator of data stream sender for new pipeline
1182
1183
2.09k
            DataSinkOperatorPtr sink_op;
1184
2.09k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1185
2.09k
                    state, *exchange_row_desc, next_sink_operator_id(),
1186
2.09k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1187
2.09k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1188
1189
2.09k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1190
2.09k
            {
1191
2.09k
                TDataSink* t = pool->add(new TDataSink());
1192
2.09k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1193
2.09k
                RETURN_IF_ERROR(sink_op->init(*t));
1194
2.09k
            }
1195
1196
            // 3. set dependency dag
1197
2.09k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1198
2.09k
        }
1199
770
        if (sources.empty()) {
1200
0
            return Status::InternalError("size of sources must be greater than 0");
1201
0
        }
1202
770
        break;
1203
770
    }
1204
770
    case TDataSinkType::BLACKHOLE_SINK: {
1205
3
        if (!thrift_sink.__isset.blackhole_sink) {
1206
0
            return Status::InternalError("Missing blackhole sink.");
1207
0
        }
1208
1209
3
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1210
3
        break;
1211
3
    }
1212
0
    case TDataSinkType::TVF_TABLE_SINK: {
1213
0
        if (!thrift_sink.__isset.tvf_table_sink) {
1214
0
            return Status::InternalError("Missing TVF table sink.");
1215
0
        }
1216
0
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1217
0
                                                        output_exprs);
1218
0
        break;
1219
0
    }
1220
0
    default:
1221
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1222
334k
    }
1223
334k
    return Status::OK();
1224
334k
}
1225
1226
// NOLINTBEGIN(readability-function-size)
1227
// NOLINTBEGIN(readability-function-cognitive-complexity)
1228
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1229
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1230
                                                 PipelinePtr& cur_pipe, int parent_idx,
1231
                                                 int child_idx,
1232
                                                 const bool followed_by_shuffled_operator,
1233
547k
                                                 const bool require_bucket_distribution) {
1234
547k
    std::vector<DataSinkOperatorPtr> sink_ops;
1235
547k
    Defer defer = Defer([&]() {
1236
546k
        if (op) {
1237
546k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1238
546k
        }
1239
546k
        for (auto& s : sink_ops) {
1240
97.3k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1241
97.3k
        }
1242
546k
    });
1243
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1244
    // Therefore, here we need to use a stack-like structure.
1245
547k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1246
547k
    std::stringstream error_msg;
1247
547k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1248
1249
547k
    bool fe_with_old_version = false;
1250
547k
    switch (tnode.node_type) {
1251
171k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1252
171k
        op = std::make_shared<OlapScanOperatorX>(
1253
171k
                pool, tnode, next_operator_id(), descs, _num_instances,
1254
171k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1255
171k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1256
171k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1257
171k
        break;
1258
171k
    }
1259
75
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1260
75
        DCHECK(_query_ctx != nullptr);
1261
75
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1262
75
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1263
75
                                                    _num_instances);
1264
75
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1265
75
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1266
75
        break;
1267
75
    }
1268
4
    case TPlanNodeType::JDBC_SCAN_NODE: {
1269
4
        if (config::enable_java_support) {
1270
4
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1271
4
                                                     _num_instances);
1272
4
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1273
4
        } else {
1274
0
            return Status::InternalError(
1275
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1276
0
                    "to true and restart be.");
1277
0
        }
1278
4
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1279
4
        break;
1280
4
    }
1281
2.66k
    case TPlanNodeType::FILE_SCAN_NODE: {
1282
2.66k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1283
2.66k
                                                 _num_instances);
1284
2.66k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1285
2.66k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1286
2.66k
        break;
1287
2.66k
    }
1288
0
    case TPlanNodeType::ES_SCAN_NODE:
1289
0
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
1290
0
        op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs,
1291
0
                                               _num_instances);
1292
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1293
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1294
0
        break;
1295
0
    }
1296
124k
    case TPlanNodeType::EXCHANGE_NODE: {
1297
124k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1298
124k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1299
18.4E
                                  : 0;
1300
124k
        DCHECK_GT(num_senders, 0);
1301
124k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1302
124k
                                                       num_senders);
1303
124k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1304
124k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1305
124k
        break;
1306
124k
    }
1307
149k
    case TPlanNodeType::AGGREGATION_NODE: {
1308
149k
        if (tnode.agg_node.grouping_exprs.empty() &&
1309
149k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1310
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1311
0
                                         ": group by and output is empty");
1312
0
        }
1313
149k
        bool need_create_cache_op =
1314
149k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1315
149k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1316
19
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1317
19
            auto cache_source_id = next_operator_id();
1318
19
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1319
19
                                                        _params.fragment.query_cache_param);
1320
19
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1321
1322
19
            const auto downstream_pipeline_id = cur_pipe->id();
1323
19
            if (!_dag.contains(downstream_pipeline_id)) {
1324
19
                _dag.insert({downstream_pipeline_id, {}});
1325
19
            }
1326
19
            new_pipe = add_pipeline(cur_pipe);
1327
19
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1328
1329
19
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1330
19
                    next_sink_operator_id(), cache_source_id, op->operator_id()));
1331
19
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1332
19
            return Status::OK();
1333
19
        };
1334
149k
        const bool group_by_limit_opt =
1335
149k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1336
1337
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1338
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1339
149k
        const bool enable_spill = _runtime_state->enable_spill() &&
1340
149k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1341
149k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1342
149k
                                      tnode.agg_node.use_streaming_preaggregation &&
1343
149k
                                      !tnode.agg_node.grouping_exprs.empty();
1344
149k
        const bool can_use_distinct_streaming_agg =
1345
149k
                tnode.agg_node.aggregate_functions.empty() &&
1346
149k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1347
149k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1348
149k
                _params.query_options.enable_distinct_streaming_aggregation;
1349
1350
149k
        if (can_use_distinct_streaming_agg) {
1351
99.2k
            if (need_create_cache_op) {
1352
8
                PipelinePtr new_pipe;
1353
8
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1354
1355
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1356
8
                                                                     tnode, descs);
1357
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1358
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1359
8
                cur_pipe = new_pipe;
1360
99.2k
            } else {
1361
99.2k
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1362
99.2k
                                                                     tnode, descs);
1363
99.2k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1364
99.2k
            }
1365
99.2k
        } else if (is_streaming_agg) {
1366
1.39k
            if (need_create_cache_op) {
1367
0
                PipelinePtr new_pipe;
1368
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1369
1370
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1371
0
                                                             descs);
1372
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1373
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1374
0
                cur_pipe = new_pipe;
1375
1.39k
            } else {
1376
1.39k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1377
1.39k
                                                             descs);
1378
1.39k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1379
1.39k
            }
1380
48.9k
        } else {
1381
            // create new pipeline to add query cache operator
1382
48.9k
            PipelinePtr new_pipe;
1383
48.9k
            if (need_create_cache_op) {
1384
11
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1385
11
            }
1386
1387
48.9k
            if (enable_spill) {
1388
295
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1389
295
                                                                     next_operator_id(), descs);
1390
48.6k
            } else {
1391
48.6k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1392
48.6k
            }
1393
48.9k
            if (need_create_cache_op) {
1394
11
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1395
11
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1396
11
                cur_pipe = new_pipe;
1397
48.9k
            } else {
1398
48.9k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1399
48.9k
            }
1400
1401
48.9k
            const auto downstream_pipeline_id = cur_pipe->id();
1402
48.9k
            if (!_dag.contains(downstream_pipeline_id)) {
1403
48.0k
                _dag.insert({downstream_pipeline_id, {}});
1404
48.0k
            }
1405
48.9k
            cur_pipe = add_pipeline(cur_pipe);
1406
48.9k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1407
1408
48.9k
            if (enable_spill) {
1409
295
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1410
295
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1411
48.6k
            } else {
1412
48.6k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1413
48.6k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1414
48.6k
            }
1415
48.9k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1416
48.9k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1417
48.9k
        }
1418
149k
        break;
1419
149k
    }
1420
149k
    case TPlanNodeType::HASH_JOIN_NODE: {
1421
8.96k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1422
8.96k
                                       tnode.hash_join_node.is_broadcast_join;
1423
8.96k
        const auto enable_spill = _runtime_state->enable_spill();
1424
8.96k
        if (enable_spill && !is_broadcast_join) {
1425
0
            auto tnode_ = tnode;
1426
0
            tnode_.runtime_filters.clear();
1427
0
            uint32_t partition_count = _runtime_state->spill_hash_join_partition_count();
1428
0
            auto inner_probe_operator =
1429
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1430
1431
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1432
            // So here use `tnode_` which has no runtime filters.
1433
0
            auto probe_side_inner_sink_operator =
1434
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1435
1436
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1437
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1438
1439
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1440
0
                    pool, tnode_, next_operator_id(), descs, partition_count);
1441
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1442
0
                                                inner_probe_operator);
1443
0
            op = std::move(probe_operator);
1444
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1445
1446
0
            const auto downstream_pipeline_id = cur_pipe->id();
1447
0
            if (!_dag.contains(downstream_pipeline_id)) {
1448
0
                _dag.insert({downstream_pipeline_id, {}});
1449
0
            }
1450
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1451
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1452
1453
0
            auto inner_sink_operator =
1454
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1455
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1456
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs,
1457
0
                    partition_count);
1458
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1459
1460
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1461
0
            sink_ops.push_back(std::move(sink_operator));
1462
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1463
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1464
1465
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1466
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1467
8.96k
        } else {
1468
8.96k
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1469
8.96k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1470
1471
8.96k
            const auto downstream_pipeline_id = cur_pipe->id();
1472
8.96k
            if (!_dag.contains(downstream_pipeline_id)) {
1473
7.25k
                _dag.insert({downstream_pipeline_id, {}});
1474
7.25k
            }
1475
8.96k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1476
8.96k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1477
1478
8.96k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1479
8.96k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1480
8.96k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1481
8.96k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1482
1483
8.96k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1484
8.96k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1485
8.96k
        }
1486
8.96k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1487
1.97k
            std::shared_ptr<HashJoinSharedState> shared_state =
1488
1.97k
                    HashJoinSharedState::create_shared(_num_instances);
1489
15.8k
            for (int i = 0; i < _num_instances; i++) {
1490
13.8k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1491
13.8k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1492
13.8k
                sink_dep->set_shared_state(shared_state.get());
1493
13.8k
                shared_state->sink_deps.push_back(sink_dep);
1494
13.8k
            }
1495
1.97k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1496
1.97k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1497
1.97k
            _op_id_to_shared_state.insert(
1498
1.97k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1499
1.97k
        }
1500
8.96k
        break;
1501
8.96k
    }
1502
2.39k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1503
2.39k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1504
2.39k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1505
1506
2.39k
        const auto downstream_pipeline_id = cur_pipe->id();
1507
2.39k
        if (!_dag.contains(downstream_pipeline_id)) {
1508
2.14k
            _dag.insert({downstream_pipeline_id, {}});
1509
2.14k
        }
1510
2.39k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1511
2.39k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1512
1513
2.39k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1514
2.39k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1515
2.39k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1516
2.39k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1517
2.39k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1518
2.39k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1519
2.39k
        break;
1520
2.39k
    }
1521
37.9k
    case TPlanNodeType::UNION_NODE: {
1522
37.9k
        int child_count = tnode.num_children;
1523
37.9k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1524
37.9k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1525
1526
37.9k
        const auto downstream_pipeline_id = cur_pipe->id();
1527
37.9k
        if (!_dag.contains(downstream_pipeline_id)) {
1528
37.7k
            _dag.insert({downstream_pipeline_id, {}});
1529
37.7k
        }
1530
39.0k
        for (int i = 0; i < child_count; i++) {
1531
1.12k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1532
1.12k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1533
1.12k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1534
1.12k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1535
1.12k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1536
1.12k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1537
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1538
1.12k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1539
1.12k
        }
1540
37.9k
        break;
1541
37.9k
    }
1542
37.9k
    case TPlanNodeType::SORT_NODE: {
1543
33.6k
        const auto should_spill = _runtime_state->enable_spill() &&
1544
33.6k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1545
33.6k
        const bool use_local_merge =
1546
33.6k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1547
33.6k
        if (should_spill) {
1548
7
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1549
33.6k
        } else if (use_local_merge) {
1550
29.0k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1551
29.0k
                                                                 descs);
1552
29.0k
        } else {
1553
4.54k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1554
4.54k
        }
1555
33.6k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1556
1557
33.6k
        const auto downstream_pipeline_id = cur_pipe->id();
1558
33.6k
        if (!_dag.contains(downstream_pipeline_id)) {
1559
33.6k
            _dag.insert({downstream_pipeline_id, {}});
1560
33.6k
        }
1561
33.6k
        cur_pipe = add_pipeline(cur_pipe);
1562
33.6k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1563
1564
33.6k
        if (should_spill) {
1565
7
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1566
7
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1567
33.6k
        } else {
1568
33.6k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1569
33.6k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1570
33.6k
        }
1571
33.6k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1572
33.6k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1573
33.6k
        break;
1574
33.6k
    }
1575
33.6k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1576
64
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1577
64
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1578
1579
64
        const auto downstream_pipeline_id = cur_pipe->id();
1580
64
        if (!_dag.contains(downstream_pipeline_id)) {
1581
64
            _dag.insert({downstream_pipeline_id, {}});
1582
64
        }
1583
64
        cur_pipe = add_pipeline(cur_pipe);
1584
64
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1585
1586
64
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1587
64
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1588
64
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1589
64
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1590
64
        break;
1591
64
    }
1592
1.76k
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1593
1.76k
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1594
1.76k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1595
1596
1.76k
        const auto downstream_pipeline_id = cur_pipe->id();
1597
1.76k
        if (!_dag.contains(downstream_pipeline_id)) {
1598
1.75k
            _dag.insert({downstream_pipeline_id, {}});
1599
1.75k
        }
1600
1.76k
        cur_pipe = add_pipeline(cur_pipe);
1601
1.76k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1602
1603
1.76k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1604
1.76k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1605
1.76k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1606
1.76k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1607
1.76k
        break;
1608
1.76k
    }
1609
1.76k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1610
666
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1611
666
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1612
666
        break;
1613
666
    }
1614
666
    case TPlanNodeType::INTERSECT_NODE: {
1615
119
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1616
119
                                                                      cur_pipe, sink_ops));
1617
119
        break;
1618
119
    }
1619
129
    case TPlanNodeType::EXCEPT_NODE: {
1620
129
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1621
129
                                                                       cur_pipe, sink_ops));
1622
129
        break;
1623
129
    }
1624
311
    case TPlanNodeType::REPEAT_NODE: {
1625
311
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1626
311
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1627
311
        break;
1628
311
    }
1629
867
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1630
867
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1631
867
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1632
867
        break;
1633
867
    }
1634
867
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1635
18
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1636
18
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1637
18
        break;
1638
18
    }
1639
1.56k
    case TPlanNodeType::EMPTY_SET_NODE: {
1640
1.56k
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1641
1.56k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1642
1.56k
        break;
1643
1.56k
    }
1644
1.56k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1645
265
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1646
265
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1647
265
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1648
265
        break;
1649
265
    }
1650
1.54k
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1651
1.54k
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1652
1.54k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1653
1.54k
        break;
1654
1.54k
    }
1655
5.24k
    case TPlanNodeType::META_SCAN_NODE: {
1656
5.24k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1657
5.24k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1658
5.24k
        break;
1659
5.24k
    }
1660
5.24k
    case TPlanNodeType::SELECT_NODE: {
1661
733
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1662
733
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1663
733
        break;
1664
733
    }
1665
733
    case TPlanNodeType::REC_CTE_NODE: {
1666
150
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1667
150
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1668
1669
150
        const auto downstream_pipeline_id = cur_pipe->id();
1670
150
        if (!_dag.contains(downstream_pipeline_id)) {
1671
147
            _dag.insert({downstream_pipeline_id, {}});
1672
147
        }
1673
1674
150
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1675
150
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1676
1677
150
        DataSinkOperatorPtr anchor_sink;
1678
150
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1679
150
                                                                  op->operator_id(), tnode, descs);
1680
150
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1681
150
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1682
150
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1683
1684
150
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1685
150
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1686
1687
150
        DataSinkOperatorPtr rec_sink;
1688
150
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1689
150
                                                         tnode, descs);
1690
150
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1691
150
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1692
150
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1693
1694
150
        break;
1695
150
    }
1696
1.85k
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1697
1.85k
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1698
1.85k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1699
1.85k
        break;
1700
1.85k
    }
1701
1.85k
    default:
1702
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1703
0
                                     print_plan_node_type(tnode.node_type));
1704
547k
    }
1705
546k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1706
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
1707
0
        op->set_serial_operator();
1708
0
    }
1709
1710
546k
    return Status::OK();
1711
547k
}
1712
// NOLINTEND(readability-function-cognitive-complexity)
1713
// NOLINTEND(readability-function-size)
1714
1715
/// Builds the operator tree for a set-operation plan node (INTERSECT when
/// is_intersect is true, otherwise EXCEPT).
///
/// The source operator is appended to `cur_pipe`; then one new sink-side
/// pipeline is created per child: child 0 gets the build-side
/// SetSinkOperatorX, all other children get SetProbeSinkOperatorX. Each new
/// pipeline is registered as an upstream dependency of `cur_pipe` in _dag.
///
/// @param pool      object pool owning the created operators
/// @param tnode     thrift plan node describing the set operation
/// @param descs     descriptor table for tuple layouts
/// @param op        [out] receives the created source operator
/// @param cur_pipe  pipeline that consumes the set-operation output
/// @param sink_ops  [out] receives one sink operator per child, in child order
/// @return Status::OK() on success, first error otherwise
template <bool is_intersect>
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));

    // Ensure the downstream pipeline has a (possibly empty) dependency list.
    const auto downstream_pipeline_id = cur_pipe->id();
    if (!_dag.contains(downstream_pipeline_id)) {
        _dag.insert({downstream_pipeline_id, {}});
    }

    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());

        // Child 0 builds the hash table; the remaining children probe it.
        if (child_id == 0) {
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
        } else {
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
        }
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
    }

    return Status::OK();
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1718
119
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1719
119
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1720
119
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1721
1722
119
    const auto downstream_pipeline_id = cur_pipe->id();
1723
119
    if (!_dag.contains(downstream_pipeline_id)) {
1724
96
        _dag.insert({downstream_pipeline_id, {}});
1725
96
    }
1726
1727
435
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1728
316
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1729
316
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1730
1731
316
        if (child_id == 0) {
1732
119
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1733
119
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1734
197
        } else {
1735
197
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1736
197
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1737
197
        }
1738
316
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1739
316
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1740
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1741
316
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1742
316
    }
1743
1744
119
    return Status::OK();
1745
119
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1718
129
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1719
129
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1720
129
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1721
1722
129
    const auto downstream_pipeline_id = cur_pipe->id();
1723
129
    if (!_dag.contains(downstream_pipeline_id)) {
1724
121
        _dag.insert({downstream_pipeline_id, {}});
1725
121
    }
1726
1727
402
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1728
273
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1729
273
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1730
1731
273
        if (child_id == 0) {
1732
129
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1733
129
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1734
144
        } else {
1735
144
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1736
144
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1737
144
        }
1738
273
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1739
273
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1740
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1741
273
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1742
273
    }
1743
1744
129
    return Status::OK();
1745
129
}
1746
1747
332k
/// Submit every pipeline task of this fragment to the query's task scheduler.
/// On any submission failure the fragment is cancelled, _total_tasks is frozen
/// at the number of tasks actually submitted (so close accounting triggers
/// _close_fragment_instance at the right count), and an error is returned.
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                // Freeze the total at what was actually submitted so that
                // closed-task accounting stays consistent.
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        // BUGFIX: the inner `break` alone did not stop the outer loop, so
        // after a failure the remaining instances kept submitting tasks past
        // the frozen _total_tasks, corrupting closed/total accounting. Stop
        // submitting entirely once any submission has failed.
        if (!st.ok()) {
            break;
        }
    }
    if (!st.ok()) {
        std::lock_guard<std::mutex> l(_task_mutex);
        // If every submitted task already closed, finish the instance now;
        // otherwise the last closing task will do it.
        if (_closed_tasks >= _total_tasks) {
            _close_fragment_instance();
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
1781
1782
0
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1783
0
    if (_runtime_state->enable_profile()) {
1784
0
        std::stringstream ss;
1785
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1786
0
            runtime_profile_ptr->pretty_print(&ss);
1787
0
        }
1788
1789
0
        if (_runtime_state->load_channel_profile()) {
1790
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1791
0
        }
1792
1793
0
        auto profile_str =
1794
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1795
0
                            this->_fragment_id, extra_info, ss.str());
1796
0
        LOG_LONG_STRING(INFO, profile_str);
1797
0
    }
1798
0
}
1799
// If all pipeline tasks binded to the fragment instance are finished, then we could
1800
// close the fragment instance.
1801
333k
void PipelineFragmentContext::_close_fragment_instance() {
1802
333k
    if (_is_fragment_instance_closed) {
1803
0
        return;
1804
0
    }
1805
333k
    Defer defer_op {[&]() {
1806
333k
        _is_fragment_instance_closed = true;
1807
333k
        _notify_cv.notify_all();
1808
333k
    }};
1809
333k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
1810
333k
    if (!_need_notify_close) {
1811
330k
        auto st = send_report(true);
1812
330k
        if (!st) {
1813
295k
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
1814
295k
                                        print_id(_query_id), _fragment_id, st.to_string());
1815
295k
        }
1816
330k
    }
1817
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
1818
    // Since stream load does not have someting like coordinator on FE, so
1819
    // backend can not report profile to FE, ant its profile can not be shown
1820
    // in the same way with other query. So we print the profile content to info log.
1821
1822
333k
    if (_runtime_state->enable_profile() &&
1823
333k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
1824
2.26k
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
1825
2.26k
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
1826
0
        std::stringstream ss;
1827
        // Compute the _local_time_percent before pretty_print the runtime_profile
1828
        // Before add this operation, the print out like that:
1829
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
1830
        // After add the operation, the print out like that:
1831
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
1832
        // We can easily know the exec node execute time without child time consumed.
1833
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1834
0
            runtime_profile_ptr->pretty_print(&ss);
1835
0
        }
1836
1837
0
        if (_runtime_state->load_channel_profile()) {
1838
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1839
0
        }
1840
1841
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
1842
0
    }
1843
1844
333k
    if (_query_ctx->enable_profile()) {
1845
2.26k
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
1846
2.26k
                                         collect_realtime_load_channel_profile());
1847
2.26k
    }
1848
1849
333k
    if (!_need_notify_close) {
1850
        // all submitted tasks done
1851
330k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
1852
330k
    }
1853
333k
}
1854
1855
2.02M
// Called each time a pipeline task of `pipeline_id` closes. When the last
// task of that pipeline closes, every dependent pipeline recorded in _dag is
// made fully runnable; when the last task of the whole fragment closes, the
// fragment instance itself is closed.
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
        if (_dag.contains(pipeline_id)) {
            for (auto dep : _dag[pipeline_id]) {
                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
            }
        }
    }
    // NOTE: the DAG walk above is intentionally done before taking
    // _task_mutex; _close_fragment_instance() below runs under the lock.
    std::lock_guard<std::mutex> l(_task_mutex);
    ++_closed_tasks;
    if (_closed_tasks >= _total_tasks) {
        _close_fragment_instance();
    }
}
1871
1872
44.4k
std::string PipelineFragmentContext::get_load_error_url() {
1873
44.4k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1874
0
        return to_load_error_http_path(str);
1875
0
    }
1876
135k
    for (auto& tasks : _tasks) {
1877
230k
        for (auto& task : tasks) {
1878
230k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
1879
152
                return to_load_error_http_path(str);
1880
152
            }
1881
230k
        }
1882
135k
    }
1883
44.2k
    return "";
1884
44.4k
}
1885
1886
44.4k
std::string PipelineFragmentContext::get_first_error_msg() {
1887
44.4k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
1888
0
        return str;
1889
0
    }
1890
135k
    for (auto& tasks : _tasks) {
1891
230k
        for (auto& task : tasks) {
1892
230k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
1893
152
                return str;
1894
152
            }
1895
230k
        }
1896
135k
    }
1897
44.2k
    return "";
1898
44.4k
}
1899
1900
335k
/// Reports this fragment's execution status to the coordinator via
/// _report_status_cb. Returns NeedSendAgain when no report is required in the
/// current state; otherwise returns the callback's status.
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();
    // If plan is done successfully, but _is_report_success is false,
    // no need to send report.
    // Load will set _is_report_success to true because load wants to know
    // the process.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::NeedSendAgain("");
    }

    // If both _is_report_success and _is_report_on_cancel are false,
    // which means no matter query is success or failed, no report is needed.
    // This may happen when the query limit reached and
    // a internal cancellation being processed
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
    // be set to false, to avoid sending fault report to FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        return Status::NeedSendAgain("");
    }

    // Collect the runtime state of every task in the fragment.
    std::vector<RuntimeState*> runtime_states;
    for (auto& instance_tasks : _tasks) {
        for (auto& entry : instance_tasks) {
            runtime_states.push_back(entry.second.get());
        }
    }

    // Prefer the query-level error URL / message; fall back to our own.
    std::string load_error_url = _query_ctx->get_load_error_url();
    if (load_error_url.empty()) {
        load_error_url = get_load_error_url();
    }
    std::string first_error_msg = _query_ctx->get_first_error_msg();
    if (first_error_msg.empty()) {
        first_error_msg = get_first_error_msg();
    }

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = runtime_states,
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = load_error_url,
                             .first_error_msg = first_error_msg,
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};

    return _report_status_cb(
            req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}
1951
1952
4.37k
/// Sums the spill-revocable memory of all tasks. If any task is currently
/// running, sets *has_running_task and returns 0 immediately; tasks whose
/// revocable size is below the minimum spill batch are skipped.
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
    size_t total = 0;
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
    // here to traverse the vector.
    for (const auto& instance_tasks : _tasks) {
        for (const auto& entry : instance_tasks) {
            const auto& task = entry.first;
            if (task->is_running()) {
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
                                      << " is running, task: " << (void*)task.get()
                                      << ", is_running: " << task->is_running();
                *has_running_task = true;
                return 0;
            }

            const size_t revocable_size = task->get_revocable_size();
            if (revocable_size >= SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
                total += revocable_size;
            }
        }
    }
    return total;
}
1974
1975
8.74k
/// Collects the tasks whose revocable memory is large enough to be worth
/// spilling (at least one spill write batch).
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
    std::vector<PipelineTask*> candidates;
    for (const auto& instance_tasks : _tasks) {
        for (const auto& entry : instance_tasks) {
            if (entry.first->get_revocable_size() >= SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
                candidates.emplace_back(entry.first.get());
            }
        }
    }
    return candidates;
}
1987
1988
473
/// Renders a human-readable snapshot of the fragment's task-accounting state
/// plus each task's own debug string. Takes _task_mutex for a consistent view.
std::string PipelineFragmentContext::debug_string() {
    std::lock_guard<std::mutex> l(_task_mutex);
    fmt::memory_buffer buf;
    fmt::format_to(buf,
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
                   "need_notify_close={}, has_task_execution_ctx_ref_count={}\n",
                   _closed_tasks, _total_tasks, _need_notify_close,
                   _has_task_execution_ctx_ref_count);
    size_t instance_idx = 0;
    for (const auto& instance_tasks : _tasks) {
        fmt::format_to(buf, "Tasks in instance {}:\n", instance_idx++);
        size_t task_idx = 0;
        for (const auto& entry : instance_tasks) {
            fmt::format_to(buf, "Task {}: {}\n", task_idx++, entry.first->debug_string());
        }
    }

    return fmt::to_string(buf);
}
2006
2007
/// Snapshots the fragment-level profile followed by every pipeline profile as
/// thrift trees. The first element is always the fragment-level profile.
/// Returns an empty vector (with a DCHECK failure) if called before prepare.
std::vector<std::shared_ptr<TRuntimeProfileTree>>
PipelineFragmentContext::collect_realtime_profile() const {
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;

    // we do not have mutex to protect pipeline_id_to_profile
    // so we need to make sure this function is invoked after fragment context
    // has already been prepared.
    if (!_prepared) {
        std::string msg =
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
        DCHECK(false) << msg;
        LOG_ERROR(msg);
        return res;
    }

    // Make sure first profile is fragment level profile
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
    res.push_back(fragment_profile);

    // pipeline_id_to_profile is initialized in prepare stage.
    // const auto& avoids copying each shared_ptr per iteration
    // (atomic refcount traffic).
    for (const auto& pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
        res.push_back(profile_ptr);
    }

    return res;
}
2036
2037
// Merges every task's load-channel profile into the fragment-level
// load-channel profile, then returns a thrift snapshot of the merged result.
std::shared_ptr<TRuntimeProfileTree>
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
    // we do not have mutex to protect pipeline_id_to_profile
    // so we need to make sure this function is invoked after fragment context
    // has already been prepared.
    if (!_prepared) {
        std::string msg =
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
        DCHECK(false) << msg;
        LOG_ERROR(msg);
        return nullptr;
    }

    for (const auto& tasks : _tasks) {
        for (const auto& task : tasks) {
            // Skip tasks that never opened a load channel.
            if (task.second->load_channel_profile() == nullptr) {
                continue;
            }

            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();

            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
                                                           _runtime_state->profile_level());
            // NOTE(review): only the task-level profiles are null-checked;
            // this assumes _runtime_state->load_channel_profile() is non-null
            // whenever any task has one — confirm against prepare().
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
        }
    }

    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
                                                      _runtime_state->profile_level());
    return load_channel_profile;
}
2069
2070
3.08k
// Blocks until the fragment instance is fully closed (all tasks closed and no
// outstanding task-execution-context references). If `close` is true, also
// sends the final report and unregisters the fragment from the fragment
// manager. Only supported when _need_notify_close is set (reset/rerun flow);
// stream load is explicitly rejected.
Status PipelineFragmentContext::wait_close(bool close) {
    if (_exec_env->new_load_stream_mgr()->get(_query_id) != nullptr) {
        return Status::InternalError("stream load do not support reset");
    }
    if (!_need_notify_close) {
        return Status::InternalError("_need_notify_close is false, do not support reset");
    }

    {
        std::unique_lock<std::mutex> lock(_task_mutex);
        // Wake up at least once a second so a cancelled query cannot hang here
        // even if the notify is missed.
        while (!(_is_fragment_instance_closed.load() && !_has_task_execution_ctx_ref_count)) {
            if (_query_ctx->is_cancelled()) {
                return Status::Cancelled("Query has been cancelled");
            }
            _notify_cv.wait_for(lock, std::chrono::seconds(1));
        }
    }

    if (close) {
        auto st = send_report(true);
        if (!st) {
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
                                        print_id(_query_id), _fragment_id, st.to_string());
        }
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
    return Status::OK();
}
2098
2099
2.89k
/// Resets every task's runtime state and the fragment's own runtime state so
/// the fragment can be executed again, releasing all pipeline resources in
/// between.
Status PipelineFragmentContext::set_to_rerun() {
    {
        std::lock_guard<std::mutex> l(_task_mutex);
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
        for (auto& instance_tasks : _tasks) {
            for (const auto& entry : instance_tasks) {
                entry.first->runtime_state()->reset_to_rerun();
            }
        }
    }
    _release_resource();
    _runtime_state->reset_to_rerun();
    return Status::OK();
}
2113
2114
2.89k
// Clears the submitted/closed flags and rebuilds + prepares the full pipeline
// so the fragment can run again (used together with set_to_rerun()).
Status PipelineFragmentContext::rebuild(ThreadPool* thread_pool) {
    _submitted = false;
    _is_fragment_instance_closed = false;
    return _build_and_prepare_full_pipeline(thread_pool);
}
2119
2120
335k
void PipelineFragmentContext::_release_resource() {
2121
335k
    std::lock_guard<std::mutex> l(_task_mutex);
2122
    // The memory released by the query end is recorded in the query mem tracker.
2123
335k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2124
335k
    auto st = _query_ctx->exec_status();
2125
1.14M
    for (auto& _task : _tasks) {
2126
1.14M
        if (!_task.empty()) {
2127
1.14M
            _call_back(_task.front().first->runtime_state(), &st);
2128
1.14M
        }
2129
1.14M
    }
2130
335k
    _tasks.clear();
2131
335k
    _dag.clear();
2132
335k
    _pip_id_to_pipeline.clear();
2133
335k
    _pipelines.clear();
2134
335k
    _sink.reset();
2135
335k
    _root_op.reset();
2136
335k
    _runtime_filter_mgr_map.clear();
2137
335k
    _op_id_to_shared_state.clear();
2138
335k
}
2139
2140
#include "common/compile_check_end.h"
2141
} // namespace doris