Coverage Report

Created: 2026-04-16 10:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/FrontendService.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <pthread.h>
26
27
#include <algorithm>
28
#include <cstdlib>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <fmt/format.h>
31
#include <thrift/Thrift.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
#include <thrift/transport/TTransportException.h>
34
35
#include <chrono> // IWYU pragma: keep
36
#include <map>
37
#include <memory>
38
#include <ostream>
39
#include <utility>
40
41
#include "cloud/config.h"
42
#include "common/cast_set.h"
43
#include "common/config.h"
44
#include "common/exception.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "exec/exchange/local_exchange_sink_operator.h"
48
#include "exec/exchange/local_exchange_source_operator.h"
49
#include "exec/exchange/local_exchanger.h"
50
#include "exec/exchange/vdata_stream_mgr.h"
51
#include "exec/operator/aggregation_sink_operator.h"
52
#include "exec/operator/aggregation_source_operator.h"
53
#include "exec/operator/analytic_sink_operator.h"
54
#include "exec/operator/analytic_source_operator.h"
55
#include "exec/operator/assert_num_rows_operator.h"
56
#include "exec/operator/blackhole_sink_operator.h"
57
#include "exec/operator/cache_sink_operator.h"
58
#include "exec/operator/cache_source_operator.h"
59
#include "exec/operator/datagen_operator.h"
60
#include "exec/operator/dict_sink_operator.h"
61
#include "exec/operator/distinct_streaming_aggregation_operator.h"
62
#include "exec/operator/empty_set_operator.h"
63
#include "exec/operator/es_scan_operator.h"
64
#include "exec/operator/exchange_sink_operator.h"
65
#include "exec/operator/exchange_source_operator.h"
66
#include "exec/operator/file_scan_operator.h"
67
#include "exec/operator/group_commit_block_sink_operator.h"
68
#include "exec/operator/group_commit_scan_operator.h"
69
#include "exec/operator/hashjoin_build_sink.h"
70
#include "exec/operator/hashjoin_probe_operator.h"
71
#include "exec/operator/hive_table_sink_operator.h"
72
#include "exec/operator/iceberg_delete_sink_operator.h"
73
#include "exec/operator/iceberg_merge_sink_operator.h"
74
#include "exec/operator/iceberg_table_sink_operator.h"
75
#include "exec/operator/jdbc_scan_operator.h"
76
#include "exec/operator/jdbc_table_sink_operator.h"
77
#include "exec/operator/local_merge_sort_source_operator.h"
78
#include "exec/operator/materialization_opertor.h"
79
#include "exec/operator/maxcompute_table_sink_operator.h"
80
#include "exec/operator/memory_scratch_sink_operator.h"
81
#include "exec/operator/meta_scan_operator.h"
82
#include "exec/operator/multi_cast_data_stream_sink.h"
83
#include "exec/operator/multi_cast_data_stream_source.h"
84
#include "exec/operator/nested_loop_join_build_operator.h"
85
#include "exec/operator/nested_loop_join_probe_operator.h"
86
#include "exec/operator/olap_scan_operator.h"
87
#include "exec/operator/olap_table_sink_operator.h"
88
#include "exec/operator/olap_table_sink_v2_operator.h"
89
#include "exec/operator/partition_sort_sink_operator.h"
90
#include "exec/operator/partition_sort_source_operator.h"
91
#include "exec/operator/partitioned_aggregation_sink_operator.h"
92
#include "exec/operator/partitioned_aggregation_source_operator.h"
93
#include "exec/operator/partitioned_hash_join_probe_operator.h"
94
#include "exec/operator/partitioned_hash_join_sink_operator.h"
95
#include "exec/operator/rec_cte_anchor_sink_operator.h"
96
#include "exec/operator/rec_cte_scan_operator.h"
97
#include "exec/operator/rec_cte_sink_operator.h"
98
#include "exec/operator/rec_cte_source_operator.h"
99
#include "exec/operator/repeat_operator.h"
100
#include "exec/operator/result_file_sink_operator.h"
101
#include "exec/operator/result_sink_operator.h"
102
#include "exec/operator/schema_scan_operator.h"
103
#include "exec/operator/select_operator.h"
104
#include "exec/operator/set_probe_sink_operator.h"
105
#include "exec/operator/set_sink_operator.h"
106
#include "exec/operator/set_source_operator.h"
107
#include "exec/operator/sort_sink_operator.h"
108
#include "exec/operator/sort_source_operator.h"
109
#include "exec/operator/spill_iceberg_table_sink_operator.h"
110
#include "exec/operator/spill_sort_sink_operator.h"
111
#include "exec/operator/spill_sort_source_operator.h"
112
#include "exec/operator/streaming_aggregation_operator.h"
113
#include "exec/operator/table_function_operator.h"
114
#include "exec/operator/tvf_table_sink_operator.h"
115
#include "exec/operator/union_sink_operator.h"
116
#include "exec/operator/union_source_operator.h"
117
#include "exec/pipeline/dependency.h"
118
#include "exec/pipeline/pipeline_task.h"
119
#include "exec/pipeline/task_scheduler.h"
120
#include "exec/runtime_filter/runtime_filter_mgr.h"
121
#include "exec/sort/topn_sorter.h"
122
#include "exec/spill/spill_file.h"
123
#include "io/fs/stream_load_pipe.h"
124
#include "load/stream_load/new_load_stream_mgr.h"
125
#include "runtime/exec_env.h"
126
#include "runtime/fragment_mgr.h"
127
#include "runtime/result_buffer_mgr.h"
128
#include "runtime/runtime_state.h"
129
#include "runtime/thread_context.h"
130
#include "service/backend_options.h"
131
#include "util/client_cache.h"
132
#include "util/countdown_latch.h"
133
#include "util/debug_util.h"
134
#include "util/network_util.h"
135
#include "util/uid_util.h"
136
137
namespace doris {
138
PipelineFragmentContext::PipelineFragmentContext(
139
        TUniqueId query_id, const TPipelineFragmentParams& request,
140
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
141
        const std::function<void(RuntimeState*, Status*)>& call_back)
142
430k
        : _query_id(std::move(query_id)),
143
430k
          _fragment_id(request.fragment_id),
144
430k
          _exec_env(exec_env),
145
430k
          _query_ctx(std::move(query_ctx)),
146
430k
          _call_back(call_back),
147
430k
          _is_report_on_cancel(true),
148
430k
          _params(request),
149
430k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
150
430k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
151
430k
                                                               : false) {
152
430k
    _fragment_watcher.start();
153
430k
}
154
155
430k
PipelineFragmentContext::~PipelineFragmentContext() {
156
430k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
157
430k
            .tag("query_id", print_id(_query_id))
158
430k
            .tag("fragment_id", _fragment_id);
159
430k
    _release_resource();
160
430k
    {
161
        // The memory released by the query end is recorded in the query mem tracker.
162
430k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
163
430k
        _runtime_state.reset();
164
430k
        _query_ctx.reset();
165
430k
    }
166
430k
}
167
168
98
bool PipelineFragmentContext::is_timeout(timespec now) const {
169
98
    if (_timeout <= 0) {
170
0
        return false;
171
0
    }
172
98
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
173
98
}
174
175
// notify_close() transitions the PFC from "waiting for external close notification" to
176
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
177
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
178
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
179
9.05k
bool PipelineFragmentContext::notify_close() {
180
9.05k
    bool all_closed = false;
181
9.05k
    bool need_remove = false;
182
9.05k
    {
183
9.05k
        std::lock_guard<std::mutex> l(_task_mutex);
184
9.05k
        if (_closed_tasks >= _total_tasks) {
185
3.21k
            if (_need_notify_close) {
186
                // Fragment was cancelled and waiting for notify to close.
187
                // Record that we need to remove from fragment mgr, but do it
188
                // after releasing _task_mutex to avoid ABBA deadlock with
189
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
190
                // first, then _task_mutex via debug_string()).
191
3.14k
                need_remove = true;
192
3.14k
            }
193
3.21k
            all_closed = true;
194
3.21k
        }
195
        // make fragment release by self after cancel
196
9.05k
        _need_notify_close = false;
197
9.05k
    }
198
9.05k
    if (need_remove) {
199
3.14k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
200
3.14k
    }
201
9.05k
    return all_closed;
202
9.05k
}
203
204
// Must not add lock in this method. Because it will call query ctx cancel. And
205
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
206
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
207
// There maybe dead lock.
208
5.77k
void PipelineFragmentContext::cancel(const Status reason) {
209
5.77k
    LOG_INFO("PipelineFragmentContext::cancel")
210
5.77k
            .tag("query_id", print_id(_query_id))
211
5.77k
            .tag("fragment_id", _fragment_id)
212
5.77k
            .tag("reason", reason.to_string());
213
5.77k
    if (notify_close()) {
214
78
        return;
215
78
    }
216
    // Timeout is a special error code, we need print current stack to debug timeout issue.
217
5.69k
    if (reason.is<ErrorCode::TIMEOUT>()) {
218
8
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
219
8
                                   debug_string());
220
8
        LOG_LONG_STRING(WARNING, dbg_str);
221
8
    }
222
223
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
224
5.69k
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
225
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
226
0
                    debug_string());
227
0
    }
228
229
5.69k
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
230
0
        print_profile("cancel pipeline, reason: " + reason.to_string());
231
0
    }
232
233
5.69k
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
234
23
        _query_ctx->set_load_error_url(error_url);
235
23
    }
236
237
5.69k
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
238
23
        _query_ctx->set_first_error_msg(first_error_msg);
239
23
    }
240
241
5.69k
    _query_ctx->cancel(reason, _fragment_id);
242
5.69k
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
243
501
        _is_report_on_cancel = false;
244
5.19k
    } else {
245
23.6k
        for (auto& id : _fragment_instance_ids) {
246
23.6k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
247
23.6k
        }
248
5.19k
    }
249
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
250
    // For stream load the fragment's query_id == load id, it is set in FE.
251
5.69k
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
252
5.69k
    if (stream_load_ctx != nullptr) {
253
31
        stream_load_ctx->pipe->cancel(reason.to_string());
254
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
255
        // We need to set the error URL at this point to ensure error information is properly
256
        // propagated to the client.
257
31
        stream_load_ctx->error_url = get_load_error_url();
258
31
        stream_load_ctx->first_error_msg = get_first_error_msg();
259
31
    }
260
261
25.2k
    for (auto& tasks : _tasks) {
262
54.9k
        for (auto& task : tasks) {
263
54.9k
            task.first->unblock_all_dependencies();
264
54.9k
        }
265
25.2k
    }
266
5.69k
}
267
268
656k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
269
656k
    PipelineId id = _next_pipeline_id++;
270
656k
    auto pipeline = std::make_shared<Pipeline>(
271
656k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
272
656k
            parent ? parent->num_tasks() : _num_instances);
273
656k
    if (idx >= 0) {
274
97.9k
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
275
558k
    } else {
276
558k
        _pipelines.emplace_back(pipeline);
277
558k
    }
278
656k
    if (parent) {
279
220k
        parent->set_children(pipeline);
280
220k
    }
281
656k
    return pipeline;
282
656k
}
283
284
429k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
285
429k
    {
286
429k
        SCOPED_TIMER(_build_pipelines_timer);
287
        // 2. Build pipelines with operators in this fragment.
288
429k
        auto root_pipeline = add_pipeline();
289
429k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
290
429k
                                         &_root_op, root_pipeline));
291
292
        // 3. Create sink operator
293
429k
        if (!_params.fragment.__isset.output_sink) {
294
0
            return Status::InternalError("No output sink in this fragment!");
295
0
        }
296
429k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
297
429k
                                          _params.fragment.output_exprs, _params,
298
429k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
299
429k
                                          *_desc_tbl, root_pipeline->id()));
300
429k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
301
429k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
302
303
557k
        for (PipelinePtr& pipeline : _pipelines) {
304
18.4E
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
305
557k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
306
557k
        }
307
429k
    }
308
    // 4. Build local exchanger
309
429k
    if (_runtime_state->enable_local_shuffle()) {
310
427k
        SCOPED_TIMER(_plan_local_exchanger_timer);
311
427k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
312
427k
                                             _params.bucket_seq_to_instance_idx,
313
427k
                                             _params.shuffle_idx_to_instance_idx));
314
427k
    }
315
316
    // 5. Initialize global states in pipelines.
317
657k
    for (PipelinePtr& pipeline : _pipelines) {
318
657k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
319
657k
        pipeline->children().clear();
320
657k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
321
657k
    }
322
323
428k
    {
324
428k
        SCOPED_TIMER(_build_tasks_timer);
325
        // 6. Build pipeline tasks and initialize local state.
326
428k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
327
428k
    }
328
329
428k
    return Status::OK();
330
428k
}
331
332
430k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
333
430k
    if (_prepared) {
334
0
        return Status::InternalError("Already prepared");
335
0
    }
336
430k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
337
430k
        _timeout = _params.query_options.execution_timeout;
338
430k
    }
339
340
430k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
341
430k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
342
430k
    SCOPED_TIMER(_prepare_timer);
343
430k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
344
430k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
345
430k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
346
430k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
347
430k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
348
430k
    {
349
430k
        SCOPED_TIMER(_init_context_timer);
350
430k
        cast_set(_num_instances, _params.local_params.size());
351
430k
        _total_instances =
352
430k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
353
354
430k
        auto* fragment_context = this;
355
356
430k
        if (_params.query_options.__isset.is_report_success) {
357
428k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
358
428k
        }
359
360
        // 1. Set up the global runtime state.
361
430k
        _runtime_state = RuntimeState::create_unique(
362
430k
                _params.query_id, _params.fragment_id, _params.query_options,
363
430k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
364
430k
        _runtime_state->set_task_execution_context(shared_from_this());
365
430k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
366
430k
        if (_params.__isset.backend_id) {
367
427k
            _runtime_state->set_backend_id(_params.backend_id);
368
427k
        }
369
430k
        if (_params.__isset.import_label) {
370
240
            _runtime_state->set_import_label(_params.import_label);
371
240
        }
372
430k
        if (_params.__isset.db_name) {
373
192
            _runtime_state->set_db_name(_params.db_name);
374
192
        }
375
430k
        if (_params.__isset.load_job_id) {
376
0
            _runtime_state->set_load_job_id(_params.load_job_id);
377
0
        }
378
379
430k
        if (_params.is_simplified_param) {
380
143k
            _desc_tbl = _query_ctx->desc_tbl;
381
286k
        } else {
382
286k
            DCHECK(_params.__isset.desc_tbl);
383
286k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
384
286k
                                                  &_desc_tbl));
385
286k
        }
386
430k
        _runtime_state->set_desc_tbl(_desc_tbl);
387
430k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
388
430k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
389
430k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
390
430k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
391
392
        // init fragment_instance_ids
393
430k
        const auto target_size = _params.local_params.size();
394
430k
        _fragment_instance_ids.resize(target_size);
395
1.52M
        for (size_t i = 0; i < _params.local_params.size(); i++) {
396
1.09M
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
397
1.09M
            _fragment_instance_ids[i] = fragment_instance_id;
398
1.09M
        }
399
430k
    }
400
401
430k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
402
403
428k
    _init_next_report_time();
404
405
428k
    _prepared = true;
406
428k
    return Status::OK();
407
430k
}
408
409
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
410
        int instance_idx,
411
1.09M
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
412
1.09M
    const auto& local_params = _params.local_params[instance_idx];
413
1.09M
    auto fragment_instance_id = local_params.fragment_instance_id;
414
1.09M
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
415
1.09M
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
416
1.09M
    auto get_shared_state = [&](PipelinePtr pipeline)
417
1.09M
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
418
1.85M
                                       std::vector<std::shared_ptr<Dependency>>>> {
419
1.85M
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
420
1.85M
                                std::vector<std::shared_ptr<Dependency>>>>
421
1.85M
                shared_state_map;
422
2.37M
        for (auto& op : pipeline->operators()) {
423
2.37M
            auto source_id = op->operator_id();
424
2.37M
            if (auto iter = _op_id_to_shared_state.find(source_id);
425
2.37M
                iter != _op_id_to_shared_state.end()) {
426
681k
                shared_state_map.insert({source_id, iter->second});
427
681k
            }
428
2.37M
        }
429
1.85M
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
430
1.85M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
431
1.85M
                iter != _op_id_to_shared_state.end()) {
432
277k
                shared_state_map.insert({sink_to_source_id, iter->second});
433
277k
            }
434
1.85M
        }
435
1.85M
        return shared_state_map;
436
1.85M
    };
437
438
3.35M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
439
2.25M
        auto& pipeline = _pipelines[pip_idx];
440
2.25M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
441
1.84M
            auto task_runtime_state = RuntimeState::create_unique(
442
1.84M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
443
1.84M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
444
1.84M
            {
445
                // Initialize runtime state for this task
446
1.84M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
447
448
1.84M
                task_runtime_state->set_task_execution_context(shared_from_this());
449
1.84M
                task_runtime_state->set_be_number(local_params.backend_num);
450
451
1.84M
                if (_params.__isset.backend_id) {
452
1.84M
                    task_runtime_state->set_backend_id(_params.backend_id);
453
1.84M
                }
454
1.84M
                if (_params.__isset.import_label) {
455
241
                    task_runtime_state->set_import_label(_params.import_label);
456
241
                }
457
1.84M
                if (_params.__isset.db_name) {
458
193
                    task_runtime_state->set_db_name(_params.db_name);
459
193
                }
460
1.84M
                if (_params.__isset.load_job_id) {
461
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
462
0
                }
463
1.84M
                if (_params.__isset.wal_id) {
464
114
                    task_runtime_state->set_wal_id(_params.wal_id);
465
114
                }
466
1.84M
                if (_params.__isset.content_length) {
467
31
                    task_runtime_state->set_content_length(_params.content_length);
468
31
                }
469
470
1.84M
                task_runtime_state->set_desc_tbl(_desc_tbl);
471
1.84M
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
472
1.84M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
473
1.84M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
474
1.84M
                task_runtime_state->set_max_operator_id(max_operator_id());
475
1.84M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
476
1.84M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
477
1.84M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
478
479
1.84M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
480
1.84M
            }
481
1.84M
            auto cur_task_id = _total_tasks++;
482
1.84M
            task_runtime_state->set_task_id(cur_task_id);
483
1.84M
            task_runtime_state->set_task_num(pipeline->num_tasks());
484
1.84M
            auto task = std::make_shared<PipelineTask>(
485
1.84M
                    pipeline, cur_task_id, task_runtime_state.get(),
486
1.84M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
487
1.84M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
488
1.84M
                    instance_idx);
489
1.84M
            pipeline->incr_created_tasks(instance_idx, task.get());
490
1.84M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
491
1.84M
            _tasks[instance_idx].emplace_back(
492
1.84M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
493
1.84M
                            std::move(task), std::move(task_runtime_state)});
494
1.84M
        }
495
2.25M
    }
496
497
    /**
498
         * Build DAG for pipeline tasks.
499
         * For example, we have
500
         *
501
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
502
         *            \                      /
503
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
504
         *                 \                          /
505
         *               JoinProbeOperator2 (Pipeline1)
506
         *
507
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
508
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
509
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
510
         *
511
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
512
         * and JoinProbeOperator2.
513
         */
514
2.25M
    for (auto& _pipeline : _pipelines) {
515
2.25M
        if (pipeline_id_to_task.contains(_pipeline->id())) {
516
1.84M
            auto* task = pipeline_id_to_task[_pipeline->id()];
517
1.84M
            DCHECK(task != nullptr);
518
519
            // If this task has upstream dependency, then inject it into this task.
520
1.84M
            if (_dag.contains(_pipeline->id())) {
521
1.16M
                auto& deps = _dag[_pipeline->id()];
522
1.81M
                for (auto& dep : deps) {
523
1.81M
                    if (pipeline_id_to_task.contains(dep)) {
524
1.00M
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
525
1.00M
                        if (ss) {
526
465k
                            task->inject_shared_state(ss);
527
542k
                        } else {
528
542k
                            pipeline_id_to_task[dep]->inject_shared_state(
529
542k
                                    task->get_source_shared_state());
530
542k
                        }
531
1.00M
                    }
532
1.81M
                }
533
1.16M
            }
534
1.84M
        }
535
2.25M
    }
536
3.35M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
537
2.25M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
538
1.84M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
539
1.84M
            DCHECK(pipeline_id_to_profile[pip_idx]);
540
1.84M
            std::vector<TScanRangeParams> scan_ranges;
541
1.84M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
542
1.84M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
543
343k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
544
343k
            }
545
1.84M
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
546
1.84M
                                                             _params.fragment.output_sink));
547
1.84M
        }
548
2.25M
    }
549
1.09M
    {
550
1.09M
        std::lock_guard<std::mutex> l(_state_map_lock);
551
1.09M
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
552
1.09M
    }
553
1.09M
    return Status::OK();
554
1.09M
}
555
556
429k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
557
429k
    _total_tasks = 0;
558
429k
    _closed_tasks = 0;
559
429k
    const auto target_size = _params.local_params.size();
560
429k
    _tasks.resize(target_size);
561
429k
    _runtime_filter_mgr_map.resize(target_size);
562
1.08M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
563
655k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
564
655k
    }
565
429k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
566
567
429k
    if (target_size > 1 &&
568
429k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
569
122k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
570
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
571
17.5k
        std::vector<Status> prepare_status(target_size);
572
17.5k
        int submitted_tasks = 0;
573
17.5k
        Status submit_status;
574
17.5k
        CountDownLatch latch((int)target_size);
575
244k
        for (int i = 0; i < target_size; i++) {
576
226k
            submit_status = thread_pool->submit_func([&, i]() {
577
226k
                SCOPED_ATTACH_TASK(_query_ctx.get());
578
226k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
579
226k
                latch.count_down();
580
226k
            });
581
226k
            if (LIKELY(submit_status.ok())) {
582
226k
                submitted_tasks++;
583
18.4E
            } else {
584
18.4E
                break;
585
18.4E
            }
586
226k
        }
587
17.5k
        latch.arrive_and_wait(target_size - submitted_tasks);
588
17.5k
        if (UNLIKELY(!submit_status.ok())) {
589
0
            return submit_status;
590
0
        }
591
244k
        for (int i = 0; i < submitted_tasks; i++) {
592
226k
            if (!prepare_status[i].ok()) {
593
0
                return prepare_status[i];
594
0
            }
595
226k
        }
596
411k
    } else {
597
1.28M
        for (int i = 0; i < target_size; i++) {
598
873k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
599
873k
        }
600
411k
    }
601
429k
    _pipeline_parent_map.clear();
602
429k
    _op_id_to_shared_state.clear();
603
604
429k
    return Status::OK();
605
429k
}
606
607
427k
void PipelineFragmentContext::_init_next_report_time() {
608
427k
    auto interval_s = config::pipeline_status_report_interval;
609
427k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
610
40.8k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
611
40.8k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
612
        // We don't want to wait longer than it takes to run the entire fragment.
613
40.8k
        _previous_report_time =
614
40.8k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
615
40.8k
        _disable_period_report = false;
616
40.8k
    }
617
427k
}
618
619
4.82k
void PipelineFragmentContext::refresh_next_report_time() {
620
4.82k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
621
4.82k
    DCHECK(disable == true);
622
4.82k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
623
4.82k
    _disable_period_report.compare_exchange_strong(disable, false);
624
4.82k
}
625
626
6.72M
void PipelineFragmentContext::trigger_report_if_necessary() {
627
6.72M
    if (!_is_report_success) {
628
6.22M
        return;
629
6.22M
    }
630
500k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
631
500k
    if (disable) {
632
8.96k
        return;
633
8.96k
    }
634
491k
    int32_t interval_s = config::pipeline_status_report_interval;
635
491k
    if (interval_s <= 0) {
636
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
637
0
                        "trigger "
638
0
                        "report.";
639
0
    }
640
491k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
641
491k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
642
491k
    if (MonotonicNanos() > next_report_time) {
643
4.83k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
644
4.83k
                                                            std::memory_order_acq_rel)) {
645
10
            return;
646
10
        }
647
4.82k
        if (VLOG_FILE_IS_ON) {
648
0
            VLOG_FILE << "Reporting "
649
0
                      << "profile for query_id " << print_id(_query_id)
650
0
                      << ", fragment id: " << _fragment_id;
651
652
0
            std::stringstream ss;
653
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
654
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
655
0
            if (_runtime_state->load_channel_profile()) {
656
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
657
0
            }
658
659
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
660
0
                      << " profile:\n"
661
0
                      << ss.str();
662
0
        }
663
4.82k
        auto st = send_report(false);
664
4.82k
        if (!st.ok()) {
665
0
            disable = true;
666
0
            _disable_period_report.compare_exchange_strong(disable, false,
667
0
                                                           std::memory_order_acq_rel);
668
0
        }
669
4.82k
    }
670
491k
}
671
672
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
673
427k
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
674
427k
    if (_params.fragment.plan.nodes.empty()) {
675
0
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
676
0
    }
677
678
427k
    int node_idx = 0;
679
680
427k
    RETURN_IF_ERROR(_create_tree_helper(pool, _params.fragment.plan.nodes, descs, nullptr,
681
427k
                                        &node_idx, root, cur_pipe, 0, false, false));
682
683
427k
    if (node_idx + 1 != _params.fragment.plan.nodes.size()) {
684
0
        return Status::InternalError(
685
0
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
686
0
    }
687
427k
    return Status::OK();
688
427k
}
689
690
Status PipelineFragmentContext::_create_tree_helper(
691
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
692
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
693
661k
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
694
    // propagate error case
695
661k
    if (*node_idx >= tnodes.size()) {
696
0
        return Status::InternalError(
697
0
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
698
0
                *node_idx, tnodes.size());
699
0
    }
700
661k
    const TPlanNode& tnode = tnodes[*node_idx];
701
702
661k
    int num_children = tnodes[*node_idx].num_children;
703
661k
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
704
661k
    bool current_require_bucket_distribution = require_bucket_distribution;
705
    // TODO: Create CacheOperator is confused now
706
661k
    OperatorPtr op = nullptr;
707
661k
    OperatorPtr cache_op = nullptr;
708
661k
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
709
661k
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
710
661k
                                     followed_by_shuffled_operator,
711
661k
                                     current_require_bucket_distribution, cache_op));
712
    // Initialization must be done here. For example, group by expressions in agg will be used to
713
    // decide if a local shuffle should be planed, so it must be initialized here.
714
661k
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
715
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
716
661k
    if (parent != nullptr) {
717
        // add to parent's child(s)
718
234k
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
719
426k
    } else {
720
426k
        *root = op;
721
426k
    }
722
    /**
723
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
724
     *
725
     * For plan:
726
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
727
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
728
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
729
     *
730
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
731
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
732
     */
733
661k
    auto required_data_distribution =
734
661k
            cur_pipe->operators().empty()
735
661k
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
736
661k
                    : op->required_data_distribution(_runtime_state.get());
737
661k
    current_followed_by_shuffled_operator =
738
661k
            ((followed_by_shuffled_operator ||
739
661k
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
740
603k
                                             : op->is_shuffled_operator())) &&
741
661k
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
742
661k
            (followed_by_shuffled_operator &&
743
550k
             required_data_distribution.distribution_type == ExchangeType::NOOP);
744
745
661k
    current_require_bucket_distribution =
746
661k
            ((require_bucket_distribution ||
747
661k
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
748
607k
                                             : op->is_colocated_operator())) &&
749
661k
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
750
661k
            (require_bucket_distribution &&
751
556k
             required_data_distribution.distribution_type == ExchangeType::NOOP);
752
753
661k
    if (num_children == 0) {
754
443k
        _use_serial_source = op->is_serial_operator();
755
443k
    }
756
    // rely on that tnodes is preorder of the plan
757
895k
    for (int i = 0; i < num_children; i++) {
758
234k
        ++*node_idx;
759
234k
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
760
234k
                                            current_followed_by_shuffled_operator,
761
234k
                                            current_require_bucket_distribution));
762
763
        // we are expecting a child, but have used all nodes
764
        // this means we have been given a bad tree and must fail
765
234k
        if (*node_idx >= tnodes.size()) {
766
0
            return Status::InternalError(
767
0
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
768
0
                    "nodes: {}",
769
0
                    *node_idx, tnodes.size());
770
0
        }
771
234k
    }
772
773
661k
    return Status::OK();
774
661k
}
775
776
void PipelineFragmentContext::_inherit_pipeline_properties(
777
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
778
98.0k
        PipelinePtr pipe_with_sink) {
779
98.0k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
780
98.0k
    pipe_with_source->set_num_tasks(_num_instances);
781
98.0k
    pipe_with_source->set_data_distribution(data_distribution);
782
98.0k
}
783
784
Status PipelineFragmentContext::_add_local_exchange_impl(
785
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
786
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
787
        const std::map<int, int>& bucket_seq_to_instance_idx,
788
97.9k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
789
97.9k
    auto& operators = cur_pipe->operators();
790
97.9k
    const auto downstream_pipeline_id = cur_pipe->id();
791
97.9k
    auto local_exchange_id = next_operator_id();
792
    // 1. Create a new pipeline with local exchange sink.
793
97.9k
    DataSinkOperatorPtr sink;
794
97.9k
    auto sink_id = next_sink_operator_id();
795
796
    /**
797
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
798
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
799
     */
800
97.9k
    const bool followed_by_shuffled_operator =
801
97.9k
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
802
97.9k
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
803
97.9k
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
804
97.9k
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
805
97.9k
                                         followed_by_shuffled_operator && !_use_serial_source;
806
97.9k
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
807
97.9k
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
808
97.9k
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
809
97.9k
    if (bucket_seq_to_instance_idx.empty() &&
810
97.9k
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
811
5
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
812
5
    }
813
97.9k
    RETURN_IF_ERROR(new_pip->set_sink(sink));
814
97.9k
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
815
97.9k
                                          num_buckets, use_global_hash_shuffle,
816
97.9k
                                          shuffle_idx_to_instance_idx));
817
818
    // 2. Create and initialize LocalExchangeSharedState.
819
97.9k
    std::shared_ptr<LocalExchangeSharedState> shared_state =
820
97.9k
            LocalExchangeSharedState::create_shared(_num_instances);
821
97.9k
    switch (data_distribution.distribution_type) {
822
10.6k
    case ExchangeType::HASH_SHUFFLE:
823
10.6k
        shared_state->exchanger = ShuffleExchanger::create_unique(
824
10.6k
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
825
10.6k
                use_global_hash_shuffle ? _total_instances : _num_instances,
826
10.6k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
827
10.6k
                        ? cast_set<int>(
828
10.6k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
829
10.6k
                        : 0);
830
10.6k
        break;
831
502
    case ExchangeType::BUCKET_HASH_SHUFFLE:
832
502
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
833
502
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
834
502
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
835
502
                        ? cast_set<int>(
836
502
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
837
502
                        : 0);
838
502
        break;
839
83.4k
    case ExchangeType::PASSTHROUGH:
840
83.4k
        shared_state->exchanger = PassthroughExchanger::create_unique(
841
83.4k
                cur_pipe->num_tasks(), _num_instances,
842
83.4k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
843
83.4k
                        ? cast_set<int>(
844
83.4k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
845
83.4k
                        : 0);
846
83.4k
        break;
847
281
    case ExchangeType::BROADCAST:
848
281
        shared_state->exchanger = BroadcastExchanger::create_unique(
849
281
                cur_pipe->num_tasks(), _num_instances,
850
281
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
851
281
                        ? cast_set<int>(
852
281
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
853
281
                        : 0);
854
281
        break;
855
2.34k
    case ExchangeType::PASS_TO_ONE:
856
2.34k
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
857
            // If shared hash table is enabled for BJ, hash table will be built by only one task
858
1.55k
            shared_state->exchanger = PassToOneExchanger::create_unique(
859
1.55k
                    cur_pipe->num_tasks(), _num_instances,
860
1.55k
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
861
1.55k
                            ? cast_set<int>(_runtime_state->query_options()
862
1.55k
                                                    .local_exchange_free_blocks_limit)
863
1.55k
                            : 0);
864
1.55k
        } else {
865
788
            shared_state->exchanger = BroadcastExchanger::create_unique(
866
788
                    cur_pipe->num_tasks(), _num_instances,
867
788
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
868
788
                            ? cast_set<int>(_runtime_state->query_options()
869
788
                                                    .local_exchange_free_blocks_limit)
870
788
                            : 0);
871
788
        }
872
2.34k
        break;
873
737
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
874
737
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
875
737
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
876
737
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
877
737
                        ? cast_set<int>(
878
737
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
879
737
                        : 0);
880
737
        break;
881
0
    default:
882
0
        return Status::InternalError("Unsupported local exchange type : " +
883
0
                                     std::to_string((int)data_distribution.distribution_type));
884
97.9k
    }
885
98.0k
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
886
98.0k
                                             "LOCAL_EXCHANGE_OPERATOR");
887
98.0k
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
888
98.0k
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
889
890
    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
891
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].
892
893
    // 3.1 Initialize new pipeline's operator list.
894
98.0k
    std::copy(operators.begin(), operators.begin() + idx,
895
98.0k
              std::inserter(new_pip->operators(), new_pip->operators().end()));
896
897
    // 3.2 Erase unused operators in previous pipeline.
898
98.0k
    operators.erase(operators.begin(), operators.begin() + idx);
899
900
    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
901
98.0k
    OperatorPtr source_op;
902
98.0k
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
903
98.0k
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
904
98.0k
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
905
98.0k
    if (!operators.empty()) {
906
36.7k
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
907
36.7k
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
908
36.7k
    }
909
98.0k
    operators.insert(operators.begin(), source_op);
910
911
    // 5. Set children for two pipelines separately.
912
98.0k
    std::vector<std::shared_ptr<Pipeline>> new_children;
913
98.0k
    std::vector<PipelineId> edges_with_source;
914
114k
    for (auto child : cur_pipe->children()) {
915
114k
        bool found = false;
916
127k
        for (auto op : new_pip->operators()) {
917
127k
            if (child->sink()->node_id() == op->node_id()) {
918
11.8k
                new_pip->set_children(child);
919
11.8k
                found = true;
920
11.8k
            };
921
127k
        }
922
114k
        if (!found) {
923
102k
            new_children.push_back(child);
924
102k
            edges_with_source.push_back(child->id());
925
102k
        }
926
114k
    }
927
98.0k
    new_children.push_back(new_pip);
928
98.0k
    edges_with_source.push_back(new_pip->id());
929
930
    // 6. Set DAG for new pipelines.
931
98.0k
    if (!new_pip->children().empty()) {
932
6.97k
        std::vector<PipelineId> edges_with_sink;
933
11.8k
        for (auto child : new_pip->children()) {
934
11.8k
            edges_with_sink.push_back(child->id());
935
11.8k
        }
936
6.97k
        _dag.insert({new_pip->id(), edges_with_sink});
937
6.97k
    }
938
98.0k
    cur_pipe->set_children(new_children);
939
98.0k
    _dag[downstream_pipeline_id] = edges_with_source;
940
98.0k
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
941
98.0k
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
942
98.0k
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));
943
944
    // 7. Inherit properties from current pipeline.
945
98.0k
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
946
98.0k
    return Status::OK();
947
98.0k
}
948
949
Status PipelineFragmentContext::_add_local_exchange(
950
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
951
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
952
        const std::map<int, int>& bucket_seq_to_instance_idx,
953
180k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
954
180k
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
955
52.7k
        return Status::OK();
956
52.7k
    }
957
958
127k
    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
959
40.8k
        return Status::OK();
960
40.8k
    }
961
86.9k
    *do_local_exchange = true;
962
963
86.9k
    auto& operators = cur_pipe->operators();
964
86.9k
    auto total_op_num = operators.size();
965
86.9k
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
966
86.9k
    RETURN_IF_ERROR(_add_local_exchange_impl(
967
86.9k
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
968
86.9k
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
969
970
18.4E
    CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
971
18.4E
            << "total_op_num: " << total_op_num
972
18.4E
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
973
18.4E
            << " new_pip->operators().size(): " << new_pip->operators().size();
974
975
    // There are some local shuffles with relatively heavy operations on the sink.
976
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
977
    // Therefore, local passthrough is used to increase the concurrency of the sink.
978
    // op -> local sink(1) -> local source (n)
979
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
980
87.1k
    if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
981
86.9k
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
982
10.7k
        RETURN_IF_ERROR(_add_local_exchange_impl(
983
10.7k
                cast_set<int>(new_pip->operators().size()), pool, new_pip,
984
10.7k
                add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
985
10.7k
                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
986
10.7k
                shuffle_idx_to_instance_idx));
987
10.7k
    }
988
86.9k
    return Status::OK();
989
86.9k
}
990
991
Status PipelineFragmentContext::_plan_local_exchange(
992
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
993
426k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
994
982k
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
995
555k
        _pipelines[pip_idx]->init_data_distribution(_runtime_state.get());
996
        // Set property if child pipeline is not join operator's child.
997
555k
        if (!_pipelines[pip_idx]->children().empty()) {
998
122k
            for (auto& child : _pipelines[pip_idx]->children()) {
999
122k
                if (child->sink()->node_id() ==
1000
122k
                    _pipelines[pip_idx]->operators().front()->node_id()) {
1001
110k
                    _pipelines[pip_idx]->set_data_distribution(child->data_distribution());
1002
110k
                }
1003
122k
            }
1004
118k
        }
1005
1006
        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
1007
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
1008
        // still keep colocate plan after local shuffle
1009
555k
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, _pipelines[pip_idx],
1010
555k
                                             bucket_seq_to_instance_idx,
1011
555k
                                             shuffle_idx_to_instance_idx));
1012
555k
    }
1013
426k
    return Status::OK();
1014
426k
}
1015
1016
Status PipelineFragmentContext::_plan_local_exchange(
1017
        int num_buckets, int pip_idx, PipelinePtr pip,
1018
        const std::map<int, int>& bucket_seq_to_instance_idx,
1019
554k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1020
554k
    int idx = 1;
1021
554k
    bool do_local_exchange = false;
1022
591k
    do {
1023
591k
        auto& ops = pip->operators();
1024
591k
        do_local_exchange = false;
1025
        // Plan local exchange for each operator.
1026
666k
        for (; idx < ops.size();) {
1027
111k
            if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
1028
95.3k
                RETURN_IF_ERROR(_add_local_exchange(
1029
95.3k
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
1030
95.3k
                        ops[idx]->required_data_distribution(_runtime_state.get()),
1031
95.3k
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
1032
95.3k
                        shuffle_idx_to_instance_idx));
1033
95.3k
            }
1034
111k
            if (do_local_exchange) {
1035
                // If local exchange is needed for current operator, we will split this pipeline to
1036
                // two pipelines by local exchange sink/source. And then we need to process remaining
1037
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
1038
                // is current operator was already processed) and continue to plan local exchange.
1039
36.7k
                idx = 2;
1040
36.7k
                break;
1041
36.7k
            }
1042
74.9k
            idx++;
1043
74.9k
        }
1044
591k
    } while (do_local_exchange);
1045
554k
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
1046
85.1k
        RETURN_IF_ERROR(_add_local_exchange(
1047
85.1k
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
1048
85.1k
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
1049
85.1k
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1050
85.1k
    }
1051
554k
    return Status::OK();
1052
554k
}
1053
1054
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1055
                                                  const std::vector<TExpr>& output_exprs,
1056
                                                  const TPipelineFragmentParams& params,
1057
                                                  const RowDescriptor& row_desc,
1058
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1059
429k
                                                  PipelineId cur_pipeline_id) {
1060
429k
    switch (thrift_sink.type) {
1061
141k
    case TDataSinkType::DATA_STREAM_SINK: {
1062
141k
        if (!thrift_sink.__isset.stream_sink) {
1063
0
            return Status::InternalError("Missing data stream sink.");
1064
0
        }
1065
141k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1066
141k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1067
141k
                params.destinations, _fragment_instance_ids);
1068
141k
        break;
1069
141k
    }
1070
250k
    case TDataSinkType::RESULT_SINK: {
1071
250k
        if (!thrift_sink.__isset.result_sink) {
1072
0
            return Status::InternalError("Missing data buffer sink.");
1073
0
        }
1074
1075
250k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), row_desc,
1076
250k
                                                      output_exprs, thrift_sink.result_sink);
1077
250k
        break;
1078
250k
    }
1079
104
    case TDataSinkType::DICTIONARY_SINK: {
1080
104
        if (!thrift_sink.__isset.dictionary_sink) {
1081
0
            return Status::InternalError("Missing dict sink.");
1082
0
        }
1083
1084
104
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1085
104
                                                    thrift_sink.dictionary_sink);
1086
104
        break;
1087
104
    }
1088
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1089
31.2k
    case TDataSinkType::OLAP_TABLE_SINK: {
1090
31.2k
        if (state->query_options().enable_memtable_on_sink_node &&
1091
31.2k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1092
31.2k
            !config::is_cloud_mode()) {
1093
2.11k
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(pool, next_sink_operator_id(),
1094
2.11k
                                                               row_desc, output_exprs);
1095
29.1k
        } else {
1096
29.1k
            _sink = std::make_shared<OlapTableSinkOperatorX>(pool, next_sink_operator_id(),
1097
29.1k
                                                             row_desc, output_exprs);
1098
29.1k
        }
1099
31.2k
        break;
1100
0
    }
1101
165
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1102
165
        DCHECK(thrift_sink.__isset.olap_table_sink);
1103
165
        DCHECK(state->get_query_ctx() != nullptr);
1104
165
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1105
165
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1106
165
                                                                output_exprs);
1107
165
        break;
1108
0
    }
1109
1.22k
    case TDataSinkType::HIVE_TABLE_SINK: {
1110
1.22k
        if (!thrift_sink.__isset.hive_table_sink) {
1111
0
            return Status::InternalError("Missing hive table sink.");
1112
0
        }
1113
1.22k
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1114
1.22k
                                                         output_exprs);
1115
1.22k
        break;
1116
1.22k
    }
1117
1.73k
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1118
1.73k
        if (!thrift_sink.__isset.iceberg_table_sink) {
1119
0
            return Status::InternalError("Missing iceberg table sink.");
1120
0
        }
1121
1.73k
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1122
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1123
0
                                                                     row_desc, output_exprs);
1124
1.73k
        } else {
1125
1.73k
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1126
1.73k
                                                                row_desc, output_exprs);
1127
1.73k
        }
1128
1.73k
        break;
1129
1.73k
    }
1130
20
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1131
20
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1132
0
            return Status::InternalError("Missing iceberg delete sink.");
1133
0
        }
1134
20
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1135
20
                                                             row_desc, output_exprs);
1136
20
        break;
1137
20
    }
1138
80
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1139
80
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1140
0
            return Status::InternalError("Missing iceberg merge sink.");
1141
0
        }
1142
80
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1143
80
                                                            output_exprs);
1144
80
        break;
1145
80
    }
1146
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1147
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1148
0
            return Status::InternalError("Missing max compute table sink.");
1149
0
        }
1150
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1151
0
                                                       output_exprs);
1152
0
        break;
1153
0
    }
1154
80
    case TDataSinkType::JDBC_TABLE_SINK: {
1155
80
        if (!thrift_sink.__isset.jdbc_table_sink) {
1156
0
            return Status::InternalError("Missing data jdbc sink.");
1157
0
        }
1158
80
        if (config::enable_java_support) {
1159
80
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1160
80
                                                             output_exprs);
1161
80
        } else {
1162
0
            return Status::InternalError(
1163
0
                    "Jdbc table sink is not enabled, you can change be config "
1164
0
                    "enable_java_support to true and restart be.");
1165
0
        }
1166
80
        break;
1167
80
    }
1168
80
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1169
3
        if (!thrift_sink.__isset.memory_scratch_sink) {
1170
0
            return Status::InternalError("Missing data buffer sink.");
1171
0
        }
1172
1173
3
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1174
3
                                                             output_exprs);
1175
3
        break;
1176
3
    }
1177
502
    case TDataSinkType::RESULT_FILE_SINK: {
1178
502
        if (!thrift_sink.__isset.result_file_sink) {
1179
0
            return Status::InternalError("Missing result file sink.");
1180
0
        }
1181
1182
        // Result file sink is not the top sink
1183
502
        if (params.__isset.destinations && !params.destinations.empty()) {
1184
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1185
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1186
0
                    params.destinations, output_exprs, desc_tbl);
1187
502
        } else {
1188
502
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1189
502
                                                              output_exprs);
1190
502
        }
1191
502
        break;
1192
502
    }
1193
2.06k
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1194
2.06k
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1195
2.06k
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1196
2.06k
        auto sink_id = next_sink_operator_id();
1197
2.06k
        const int multi_cast_node_id = sink_id;
1198
2.06k
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1199
        // one sink has multiple sources.
1200
2.06k
        std::vector<int> sources;
1201
8.03k
        for (int i = 0; i < sender_size; ++i) {
1202
5.97k
            auto source_id = next_operator_id();
1203
5.97k
            sources.push_back(source_id);
1204
5.97k
        }
1205
1206
2.06k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1207
2.06k
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1208
8.03k
        for (int i = 0; i < sender_size; ++i) {
1209
5.97k
            auto new_pipeline = add_pipeline();
1210
            // use to exchange sink
1211
5.97k
            RowDescriptor* exchange_row_desc = nullptr;
1212
5.97k
            {
1213
5.97k
                const auto& tmp_row_desc =
1214
5.97k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1215
5.97k
                                ? RowDescriptor(state->desc_tbl(),
1216
5.97k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1217
5.97k
                                                         .output_tuple_id})
1218
5.97k
                                : row_desc;
1219
5.97k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1220
5.97k
            }
1221
5.97k
            auto source_id = sources[i];
1222
5.97k
            OperatorPtr source_op;
1223
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1224
5.97k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1225
5.97k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1226
5.97k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1227
5.97k
                    /*operator_id=*/source_id);
1228
5.97k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1229
5.97k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1230
            // 2. create and set sink operator of data stream sender for new pipeline
1231
1232
5.97k
            DataSinkOperatorPtr sink_op;
1233
5.97k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1234
5.97k
                    state, *exchange_row_desc, next_sink_operator_id(),
1235
5.97k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1236
5.97k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1237
1238
5.97k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1239
5.97k
            {
1240
5.97k
                TDataSink* t = pool->add(new TDataSink());
1241
5.97k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1242
5.97k
                RETURN_IF_ERROR(sink_op->init(*t));
1243
5.97k
            }
1244
1245
            // 3. set dependency dag
1246
5.97k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1247
5.97k
        }
1248
2.06k
        if (sources.empty()) {
1249
0
            return Status::InternalError("size of sources must be greater than 0");
1250
0
        }
1251
2.06k
        break;
1252
2.06k
    }
1253
2.06k
    case TDataSinkType::BLACKHOLE_SINK: {
1254
13
        if (!thrift_sink.__isset.blackhole_sink) {
1255
0
            return Status::InternalError("Missing blackhole sink.");
1256
0
        }
1257
1258
13
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1259
13
        break;
1260
13
    }
1261
156
    case TDataSinkType::TVF_TABLE_SINK: {
1262
156
        if (!thrift_sink.__isset.tvf_table_sink) {
1263
0
            return Status::InternalError("Missing TVF table sink.");
1264
0
        }
1265
156
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1266
156
                                                        output_exprs);
1267
156
        break;
1268
156
    }
1269
0
    default:
1270
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1271
429k
    }
1272
429k
    return Status::OK();
1273
429k
}
1274
1275
// NOLINTBEGIN(readability-function-size)
1276
// NOLINTBEGIN(readability-function-cognitive-complexity)
1277
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1278
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1279
                                                 PipelinePtr& cur_pipe, int parent_idx,
1280
                                                 int child_idx,
1281
                                                 const bool followed_by_shuffled_operator,
1282
                                                 const bool require_bucket_distribution,
1283
663k
                                                 OperatorPtr& cache_op) {
1284
663k
    std::vector<DataSinkOperatorPtr> sink_ops;
1285
663k
    Defer defer = Defer([&]() {
1286
663k
        if (op) {
1287
663k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1288
663k
        }
1289
663k
        for (auto& s : sink_ops) {
1290
122k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1291
122k
        }
1292
663k
    });
1293
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1294
    // Therefore, here we need to use a stack-like structure.
1295
663k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1296
663k
    std::stringstream error_msg;
1297
663k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1298
1299
663k
    bool fe_with_old_version = false;
1300
663k
    switch (tnode.node_type) {
1301
209k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1302
209k
        op = std::make_shared<OlapScanOperatorX>(
1303
209k
                pool, tnode, next_operator_id(), descs, _num_instances,
1304
209k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1305
209k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1306
209k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1307
209k
        break;
1308
209k
    }
1309
79
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1310
79
        DCHECK(_query_ctx != nullptr);
1311
79
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1312
79
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1313
79
                                                    _num_instances);
1314
79
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1315
79
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1316
79
        break;
1317
79
    }
1318
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1319
0
        if (config::enable_java_support) {
1320
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1321
0
                                                     _num_instances);
1322
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1323
0
        } else {
1324
0
            return Status::InternalError(
1325
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1326
0
                    "to true and restart be.");
1327
0
        }
1328
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1329
0
        break;
1330
0
    }
1331
22.7k
    case TPlanNodeType::FILE_SCAN_NODE: {
1332
22.7k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1333
22.7k
                                                 _num_instances);
1334
22.7k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1335
22.7k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1336
22.7k
        break;
1337
22.7k
    }
1338
0
    case TPlanNodeType::ES_SCAN_NODE:
1339
592
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
1340
592
        op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs,
1341
592
                                               _num_instances);
1342
592
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1343
592
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1344
592
        break;
1345
592
    }
1346
145k
    case TPlanNodeType::EXCHANGE_NODE: {
1347
145k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1348
145k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1349
18.4E
                                  : 0;
1350
145k
        DCHECK_GT(num_senders, 0);
1351
145k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1352
145k
                                                       num_senders);
1353
145k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1354
145k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1355
145k
        break;
1356
145k
    }
1357
157k
    case TPlanNodeType::AGGREGATION_NODE: {
1358
157k
        if (tnode.agg_node.grouping_exprs.empty() &&
1359
157k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1360
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1361
0
                                         ": group by and output is empty");
1362
0
        }
1363
157k
        bool need_create_cache_op =
1364
157k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1365
157k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1366
10
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1367
10
            auto cache_source_id = next_operator_id();
1368
10
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1369
10
                                                        _params.fragment.query_cache_param);
1370
10
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1371
1372
10
            const auto downstream_pipeline_id = cur_pipe->id();
1373
10
            if (!_dag.contains(downstream_pipeline_id)) {
1374
10
                _dag.insert({downstream_pipeline_id, {}});
1375
10
            }
1376
10
            new_pipe = add_pipeline(cur_pipe);
1377
10
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1378
1379
10
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1380
10
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1381
10
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1382
10
            return Status::OK();
1383
10
        };
1384
157k
        const bool group_by_limit_opt =
1385
157k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1386
1387
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1388
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1389
157k
        const bool enable_spill = _runtime_state->enable_spill() &&
1390
157k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1391
157k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1392
157k
                                      tnode.agg_node.use_streaming_preaggregation &&
1393
157k
                                      !tnode.agg_node.grouping_exprs.empty();
1394
        // TODO: distinct streaming agg does not support spill.
1395
157k
        const bool can_use_distinct_streaming_agg =
1396
157k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1397
157k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1398
157k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1399
157k
                _params.query_options.enable_distinct_streaming_aggregation;
1400
1401
157k
        if (can_use_distinct_streaming_agg) {
1402
91.5k
            if (need_create_cache_op) {
1403
8
                PipelinePtr new_pipe;
1404
8
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1405
1406
8
                cache_op = op;
1407
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1408
8
                                                                     tnode, descs);
1409
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1410
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1411
8
                cur_pipe = new_pipe;
1412
91.4k
            } else {
1413
91.4k
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1414
91.4k
                                                                     tnode, descs);
1415
91.4k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1416
91.4k
            }
1417
91.5k
        } else if (is_streaming_agg) {
1418
3.09k
            if (need_create_cache_op) {
1419
0
                PipelinePtr new_pipe;
1420
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1421
0
                cache_op = op;
1422
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1423
0
                                                             descs);
1424
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1425
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1426
0
                cur_pipe = new_pipe;
1427
3.09k
            } else {
1428
3.09k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1429
3.09k
                                                             descs);
1430
3.09k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1431
3.09k
            }
1432
62.4k
        } else {
1433
            // create new pipeline to add query cache operator
1434
62.4k
            PipelinePtr new_pipe;
1435
62.4k
            if (need_create_cache_op) {
1436
2
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1437
2
                cache_op = op;
1438
2
            }
1439
1440
62.4k
            if (enable_spill) {
1441
182
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1442
182
                                                                     next_operator_id(), descs);
1443
62.2k
            } else {
1444
62.2k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1445
62.2k
            }
1446
62.4k
            if (need_create_cache_op) {
1447
2
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1448
2
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1449
2
                cur_pipe = new_pipe;
1450
62.4k
            } else {
1451
62.4k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1452
62.4k
            }
1453
1454
62.4k
            const auto downstream_pipeline_id = cur_pipe->id();
1455
62.4k
            if (!_dag.contains(downstream_pipeline_id)) {
1456
60.1k
                _dag.insert({downstream_pipeline_id, {}});
1457
60.1k
            }
1458
62.4k
            cur_pipe = add_pipeline(cur_pipe);
1459
62.4k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1460
1461
62.4k
            if (enable_spill) {
1462
182
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1463
182
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1464
62.2k
            } else {
1465
62.2k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1466
62.2k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1467
62.2k
            }
1468
62.4k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1469
62.4k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1470
62.4k
        }
1471
157k
        break;
1472
157k
    }
1473
157k
    case TPlanNodeType::HASH_JOIN_NODE: {
1474
7.68k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1475
7.68k
                                       tnode.hash_join_node.is_broadcast_join;
1476
7.68k
        const auto enable_spill = _runtime_state->enable_spill();
1477
7.68k
        if (enable_spill && !is_broadcast_join) {
1478
0
            auto tnode_ = tnode;
1479
0
            tnode_.runtime_filters.clear();
1480
0
            auto inner_probe_operator =
1481
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1482
1483
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1484
            // So here use `tnode_` which has no runtime filters.
1485
0
            auto probe_side_inner_sink_operator =
1486
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1487
1488
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1489
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1490
1491
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1492
0
                    pool, tnode_, next_operator_id(), descs);
1493
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1494
0
                                                inner_probe_operator);
1495
0
            op = std::move(probe_operator);
1496
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1497
1498
0
            const auto downstream_pipeline_id = cur_pipe->id();
1499
0
            if (!_dag.contains(downstream_pipeline_id)) {
1500
0
                _dag.insert({downstream_pipeline_id, {}});
1501
0
            }
1502
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1503
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1504
1505
0
            auto inner_sink_operator =
1506
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1507
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1508
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1509
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1510
1511
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1512
0
            sink_ops.push_back(std::move(sink_operator));
1513
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1514
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1515
1516
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1517
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1518
7.68k
        } else {
1519
7.68k
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1520
7.68k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1521
1522
7.68k
            const auto downstream_pipeline_id = cur_pipe->id();
1523
7.68k
            if (!_dag.contains(downstream_pipeline_id)) {
1524
6.89k
                _dag.insert({downstream_pipeline_id, {}});
1525
6.89k
            }
1526
7.68k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1527
7.68k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1528
1529
7.68k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1530
7.68k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1531
7.68k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1532
7.68k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1533
1534
7.68k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1535
7.68k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1536
7.68k
        }
1537
7.68k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1538
3.31k
            std::shared_ptr<HashJoinSharedState> shared_state =
1539
3.31k
                    HashJoinSharedState::create_shared(_num_instances);
1540
22.4k
            for (int i = 0; i < _num_instances; i++) {
1541
19.0k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1542
19.0k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1543
19.0k
                sink_dep->set_shared_state(shared_state.get());
1544
19.0k
                shared_state->sink_deps.push_back(sink_dep);
1545
19.0k
            }
1546
3.31k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1547
3.31k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1548
3.31k
            _op_id_to_shared_state.insert(
1549
3.31k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1550
3.31k
        }
1551
7.68k
        break;
1552
7.68k
    }
1553
4.83k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1554
4.83k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1555
4.83k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1556
1557
4.83k
        const auto downstream_pipeline_id = cur_pipe->id();
1558
4.83k
        if (!_dag.contains(downstream_pipeline_id)) {
1559
4.60k
            _dag.insert({downstream_pipeline_id, {}});
1560
4.60k
        }
1561
4.83k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1562
4.83k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1563
1564
4.83k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1565
4.83k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1566
4.83k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1567
4.83k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1568
4.83k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1569
4.83k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1570
4.83k
        break;
1571
4.83k
    }
1572
53.4k
    case TPlanNodeType::UNION_NODE: {
1573
53.4k
        int child_count = tnode.num_children;
1574
53.4k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1575
53.4k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1576
1577
53.4k
        const auto downstream_pipeline_id = cur_pipe->id();
1578
53.4k
        if (!_dag.contains(downstream_pipeline_id)) {
1579
53.1k
            _dag.insert({downstream_pipeline_id, {}});
1580
53.1k
        }
1581
55.1k
        for (int i = 0; i < child_count; i++) {
1582
1.68k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1583
1.68k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1584
1.68k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1585
1.68k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1586
1.68k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1587
1.68k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1588
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1589
1.68k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1590
1.68k
        }
1591
53.4k
        break;
1592
53.4k
    }
1593
53.4k
    case TPlanNodeType::SORT_NODE: {
1594
43.8k
        const auto should_spill = _runtime_state->enable_spill() &&
1595
43.8k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1596
43.8k
        const bool use_local_merge =
1597
43.8k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1598
43.8k
        if (should_spill) {
1599
9
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1600
43.8k
        } else if (use_local_merge) {
1601
41.5k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1602
41.5k
                                                                 descs);
1603
41.5k
        } else {
1604
2.30k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1605
2.30k
        }
1606
43.8k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1607
1608
43.8k
        const auto downstream_pipeline_id = cur_pipe->id();
1609
43.8k
        if (!_dag.contains(downstream_pipeline_id)) {
1610
43.8k
            _dag.insert({downstream_pipeline_id, {}});
1611
43.8k
        }
1612
43.8k
        cur_pipe = add_pipeline(cur_pipe);
1613
43.8k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1614
1615
43.8k
        if (should_spill) {
1616
9
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1617
9
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1618
43.8k
        } else {
1619
43.8k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1620
43.8k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1621
43.8k
        }
1622
43.8k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1623
43.8k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1624
43.8k
        break;
1625
43.8k
    }
1626
43.8k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1627
62
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1628
62
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1629
1630
62
        const auto downstream_pipeline_id = cur_pipe->id();
1631
62
        if (!_dag.contains(downstream_pipeline_id)) {
1632
62
            _dag.insert({downstream_pipeline_id, {}});
1633
62
        }
1634
62
        cur_pipe = add_pipeline(cur_pipe);
1635
62
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1636
1637
62
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1638
62
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1639
62
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1640
62
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1641
62
        break;
1642
62
    }
1643
1.64k
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1644
1.64k
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1645
1.64k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1646
1647
1.64k
        const auto downstream_pipeline_id = cur_pipe->id();
1648
1.64k
        if (!_dag.contains(downstream_pipeline_id)) {
1649
1.63k
            _dag.insert({downstream_pipeline_id, {}});
1650
1.63k
        }
1651
1.64k
        cur_pipe = add_pipeline(cur_pipe);
1652
1.64k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1653
1654
1.64k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1655
1.64k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1656
1.64k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1657
1.64k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1658
1.64k
        break;
1659
1.64k
    }
1660
1.64k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1661
1.61k
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1662
1.61k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1663
1.61k
        break;
1664
1.61k
    }
1665
1.61k
    case TPlanNodeType::INTERSECT_NODE: {
1666
126
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1667
126
                                                                      cur_pipe, sink_ops));
1668
126
        break;
1669
126
    }
1670
129
    case TPlanNodeType::EXCEPT_NODE: {
1671
129
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1672
129
                                                                       cur_pipe, sink_ops));
1673
129
        break;
1674
129
    }
1675
295
    case TPlanNodeType::REPEAT_NODE: {
1676
295
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1677
295
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1678
295
        break;
1679
295
    }
1680
921
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1681
921
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1682
921
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1683
921
        break;
1684
921
    }
1685
921
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1686
218
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1687
218
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1688
218
        break;
1689
218
    }
1690
1.58k
    case TPlanNodeType::EMPTY_SET_NODE: {
1691
1.58k
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1692
1.58k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1693
1.58k
        break;
1694
1.58k
    }
1695
1.58k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1696
459
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1697
459
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1698
459
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1699
459
        break;
1700
459
    }
1701
2.32k
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1702
2.32k
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1703
2.32k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1704
2.32k
        break;
1705
2.32k
    }
1706
6.26k
    case TPlanNodeType::META_SCAN_NODE: {
1707
6.26k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1708
6.26k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1709
6.26k
        break;
1710
6.26k
    }
1711
6.26k
    case TPlanNodeType::SELECT_NODE: {
1712
1.82k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1713
1.82k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1714
1.82k
        break;
1715
1.82k
    }
1716
1.82k
    case TPlanNodeType::REC_CTE_NODE: {
1717
151
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1718
151
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1719
1720
151
        const auto downstream_pipeline_id = cur_pipe->id();
1721
151
        if (!_dag.contains(downstream_pipeline_id)) {
1722
148
            _dag.insert({downstream_pipeline_id, {}});
1723
148
        }
1724
1725
151
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1726
151
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1727
1728
151
        DataSinkOperatorPtr anchor_sink;
1729
151
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1730
151
                                                                  op->operator_id(), tnode, descs);
1731
151
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1732
151
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1733
151
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1734
1735
151
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1736
151
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1737
1738
151
        DataSinkOperatorPtr rec_sink;
1739
151
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1740
151
                                                         tnode, descs);
1741
151
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1742
151
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1743
151
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1744
1745
151
        break;
1746
151
    }
1747
1.95k
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1748
1.95k
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1749
1.95k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1750
1.95k
        break;
1751
1.95k
    }
1752
1.95k
    default:
1753
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1754
0
                                     print_plan_node_type(tnode.node_type));
1755
663k
    }
1756
662k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1757
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
1758
0
        op->set_serial_operator();
1759
0
    }
1760
1761
662k
    return Status::OK();
1762
663k
}
1763
// NOLINTEND(readability-function-cognitive-complexity)
1764
// NOLINTEND(readability-function-size)
1765
1766
template <bool is_intersect>
1767
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
1768
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
1769
255
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1770
255
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1771
255
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1772
1773
255
    const auto downstream_pipeline_id = cur_pipe->id();
1774
255
    if (!_dag.contains(downstream_pipeline_id)) {
1775
230
        _dag.insert({downstream_pipeline_id, {}});
1776
230
    }
1777
1778
858
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1779
603
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1780
603
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1781
1782
603
        if (child_id == 0) {
1783
255
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1784
255
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
348
        } else {
1786
348
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1787
348
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1788
348
        }
1789
603
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1790
603
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1791
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1792
603
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1793
603
    }
1794
1795
255
    return Status::OK();
1796
255
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1769
126
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1770
126
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1771
126
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1772
1773
126
    const auto downstream_pipeline_id = cur_pipe->id();
1774
126
    if (!_dag.contains(downstream_pipeline_id)) {
1775
110
        _dag.insert({downstream_pipeline_id, {}});
1776
110
    }
1777
1778
456
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1779
330
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1780
330
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1781
1782
330
        if (child_id == 0) {
1783
126
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1784
126
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
204
        } else {
1786
204
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1787
204
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1788
204
        }
1789
330
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1790
330
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1791
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1792
330
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1793
330
    }
1794
1795
126
    return Status::OK();
1796
126
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1769
129
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1770
129
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1771
129
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1772
1773
129
    const auto downstream_pipeline_id = cur_pipe->id();
1774
129
    if (!_dag.contains(downstream_pipeline_id)) {
1775
120
        _dag.insert({downstream_pipeline_id, {}});
1776
120
    }
1777
1778
402
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1779
273
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1780
273
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1781
1782
273
        if (child_id == 0) {
1783
129
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1784
129
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
144
        } else {
1786
144
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1787
144
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1788
144
        }
1789
273
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1790
273
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1791
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1792
273
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1793
273
    }
1794
1795
129
    return Status::OK();
1796
129
}
1797
1798
427k
Status PipelineFragmentContext::submit() {
1799
427k
    if (_submitted) {
1800
0
        return Status::InternalError("submitted");
1801
0
    }
1802
427k
    _submitted = true;
1803
1804
427k
    int submit_tasks = 0;
1805
427k
    Status st;
1806
427k
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
1807
1.09M
    for (auto& task : _tasks) {
1808
1.85M
        for (auto& t : task) {
1809
1.85M
            st = scheduler->submit(t.first);
1810
1.85M
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
1811
1.85M
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
1812
1.85M
            if (!st) {
1813
0
                cancel(Status::InternalError("submit context to executor fail"));
1814
0
                std::lock_guard<std::mutex> l(_task_mutex);
1815
0
                _total_tasks = submit_tasks;
1816
0
                break;
1817
0
            }
1818
1.85M
            submit_tasks++;
1819
1.85M
        }
1820
1.09M
    }
1821
427k
    if (!st.ok()) {
1822
0
        bool need_remove = false;
1823
0
        {
1824
0
            std::lock_guard<std::mutex> l(_task_mutex);
1825
0
            if (_closed_tasks >= _total_tasks) {
1826
0
                need_remove = _close_fragment_instance();
1827
0
            }
1828
0
        }
1829
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
1830
0
        if (need_remove) {
1831
0
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
1832
0
        }
1833
0
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
1834
0
                                     BackendOptions::get_localhost());
1835
427k
    } else {
1836
427k
        return st;
1837
427k
    }
1838
427k
}
1839
1840
0
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1841
0
    if (_runtime_state->enable_profile()) {
1842
0
        std::stringstream ss;
1843
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1844
0
            runtime_profile_ptr->pretty_print(&ss);
1845
0
        }
1846
1847
0
        if (_runtime_state->load_channel_profile()) {
1848
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1849
0
        }
1850
1851
0
        auto profile_str =
1852
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1853
0
                            this->_fragment_id, extra_info, ss.str());
1854
0
        LOG_LONG_STRING(INFO, profile_str);
1855
0
    }
1856
0
}
1857
// If all pipeline tasks binded to the fragment instance are finished, then we could
1858
// close the fragment instance.
1859
// Returns true if the caller should call remove_pipeline_context() **after** releasing
1860
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
1861
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
1862
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
1863
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
1864
// (via debug_string()).
1865
429k
bool PipelineFragmentContext::_close_fragment_instance() {
1866
429k
    if (_is_fragment_instance_closed) {
1867
0
        return false;
1868
0
    }
1869
429k
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
1870
429k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
1871
429k
    if (!_need_notify_close) {
1872
426k
        auto st = send_report(true);
1873
426k
        if (!st) {
1874
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
1875
0
                                        print_id(_query_id), _fragment_id, st.to_string());
1876
0
        }
1877
426k
    }
1878
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
1879
    // Since stream load does not have someting like coordinator on FE, so
1880
    // backend can not report profile to FE, ant its profile can not be shown
1881
    // in the same way with other query. So we print the profile content to info log.
1882
1883
429k
    if (_runtime_state->enable_profile() &&
1884
429k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
1885
2.23k
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
1886
2.23k
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
1887
0
        std::stringstream ss;
1888
        // Compute the _local_time_percent before pretty_print the runtime_profile
1889
        // Before add this operation, the print out like that:
1890
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
1891
        // After add the operation, the print out like that:
1892
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
1893
        // We can easily know the exec node execute time without child time consumed.
1894
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1895
0
            runtime_profile_ptr->pretty_print(&ss);
1896
0
        }
1897
1898
0
        if (_runtime_state->load_channel_profile()) {
1899
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1900
0
        }
1901
1902
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
1903
0
    }
1904
1905
429k
    if (_query_ctx->enable_profile()) {
1906
2.23k
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
1907
2.23k
                                         collect_realtime_load_channel_profile());
1908
2.23k
    }
1909
1910
    // Return whether the caller needs to remove from the pipeline map.
1911
    // The caller must do this after releasing _task_mutex.
1912
429k
    return !_need_notify_close;
1913
429k
}
1914
1915
1.84M
// Called when one task of pipeline `pipeline_id` finishes. When the last task
// of that pipeline closes, wakes up the dependent pipelines recorded in _dag;
// when the last task of the whole fragment closes, closes the fragment
// instance and (outside _task_mutex) removes it from the fragment manager.
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    // If all tasks of this pipeline have been closed, upstream tasks are never needed,
    // so we just make the dependent pipelines' tasks runnable here.
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
    // Single map probe instead of contains + two operator[] lookups.
    auto& closing_pipeline = _pip_id_to_pipeline[pipeline_id];
    if (closing_pipeline->close_task()) {
        // find() instead of contains + operator[]: one hash lookup, no
        // accidental default-insertion.
        if (auto dag_it = _dag.find(pipeline_id); dag_it != _dag.end()) {
            for (auto dep : dag_it->second) {
                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
            }
        }
    }
    bool need_remove = false;
    {
        std::lock_guard<std::mutex> l(_task_mutex);
        ++_closed_tasks;
        if (_closed_tasks >= _total_tasks) {
            need_remove = _close_fragment_instance();
        }
    }
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
    if (need_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
}
1938
1939
52.8k
std::string PipelineFragmentContext::get_load_error_url() {
1940
52.8k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1941
0
        return to_load_error_http_path(str);
1942
0
    }
1943
141k
    for (auto& tasks : _tasks) {
1944
223k
        for (auto& task : tasks) {
1945
223k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
1946
171
                return to_load_error_http_path(str);
1947
171
            }
1948
223k
        }
1949
141k
    }
1950
52.6k
    return "";
1951
52.8k
}
1952
1953
52.8k
std::string PipelineFragmentContext::get_first_error_msg() {
1954
52.8k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
1955
0
        return str;
1956
0
    }
1957
141k
    for (auto& tasks : _tasks) {
1958
223k
        for (auto& task : tasks) {
1959
223k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
1960
171
                return str;
1961
171
            }
1962
223k
        }
1963
141k
    }
1964
52.6k
    return "";
1965
52.8k
}
1966
1967
0
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
1968
0
    std::stringstream url;
1969
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
1970
0
        << "/api/_download_load?"
1971
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
1972
0
    return url.str();
1973
0
}
1974
1975
47.1k
// Assembles a TReportExecStatusParams from the request's runtime state(s) and
// sends it to the coordinator FE via Thrift reportExecStatus, retrying the
// client connection and the RPC once on transport failure. On unrecoverable
// RPC failure the query is cancelled through req.cancel_fn.
// NOTE(review): req.runtime_state appears to be a representative/global state
// while req.runtime_states holds the per-instance states — confirm against the
// ReportStatusRequest definition.
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
    // Debug hook: artificially delay the report (longer for data-quality errors).
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
    });

    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
    if (req.coord_addr.hostname == "external") {
        // External query (flink/spark read tablets) not need to report to FE.
        return;
    }
    // Retry obtaining a FE client up to 10 times, sleeping 1s between attempts.
    int callback_retries = 10;
    const int sleep_ms = 1000;
    Status exec_status = req.status;
    Status coord_status;
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
    do {
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
                                                            req.coord_addr, &coord_status);
        if (!coord_status.ok()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
        }
    } while (!coord_status.ok() && callback_retries-- > 0);

    if (!coord_status.ok()) {
        // Could not reach the coordinator at all: cancel the query locally.
        UniqueId uid(req.query_id.hi, req.query_id.lo);
        static_cast<void>(req.cancel_fn(Status::InternalError(
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
        return;
    }

    // --- Build the report payload. ---
    TReportExecStatusParams params;
    params.protocol_version = FrontendServiceVersion::V1;
    params.__set_query_id(req.query_id);
    params.__set_backend_num(req.backend_num);
    params.__set_fragment_instance_id(req.fragment_instance_id);
    params.__set_fragment_id(req.fragment_id);
    params.__set_status(exec_status.to_thrift());
    params.__set_done(req.done);
    params.__set_query_type(req.runtime_state->query_type());
    params.__isset.profile = false;

    DCHECK(req.runtime_state != nullptr);

    if (req.runtime_state->query_type() == TQueryType::LOAD) {
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
    } else {
        DCHECK(!req.runtime_states.empty());
        if (!req.runtime_state->output_files().empty()) {
            params.__isset.delta_urls = true;
            for (auto& it : req.runtime_state->output_files()) {
                params.delta_urls.push_back(_to_http_path(it));
            }
        }
        if (!params.delta_urls.empty()) {
            params.__isset.delta_urls = true;
        }
    }

    // Load counters: keys expected by the FE-side load job.
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
    static std::string s_unselected_rows = "unselected.rows";
    int64_t num_rows_load_success = 0;
    int64_t num_rows_load_filtered = 0;
    int64_t num_rows_load_unselected = 0;
    // Prefer the global runtime state's counters; fall back to summing the
    // per-instance states when the global one carries nothing.
    if (req.runtime_state->num_rows_load_total() > 0 ||
        req.runtime_state->num_rows_load_filtered() > 0 ||
        req.runtime_state->num_finished_range() > 0) {
        params.__isset.load_counters = true;

        num_rows_load_success = req.runtime_state->num_rows_load_success();
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
        params.__isset.fragment_instance_reports = true;
        TFragmentInstanceReport t;
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
        params.fragment_instance_reports.push_back(t);
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
                rs->num_finished_range() > 0) {
                params.__isset.load_counters = true;
                num_rows_load_success += rs->num_rows_load_success();
                num_rows_load_filtered += rs->num_rows_load_filtered();
                num_rows_load_unselected += rs->num_rows_load_unselected();
                params.__isset.fragment_instance_reports = true;
                TFragmentInstanceReport t;
                t.__set_fragment_instance_id(rs->fragment_instance_id());
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
                t.__set_loaded_rows(rs->num_rows_load_total());
                t.__set_loaded_bytes(rs->num_bytes_load_total());
                params.fragment_instance_reports.push_back(t);
            }
        }
    }
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));

    if (!req.load_error_url.empty()) {
        params.__set_tracking_url(req.load_error_url);
    }
    if (!req.first_error_msg.empty()) {
        params.__set_first_error_msg(req.first_error_msg);
    }
    // WAL txn id / label: the last instance with wal_id() > 0 wins.
    for (auto* rs : req.runtime_states) {
        if (rs->wal_id() > 0) {
            params.__set_txn_id(rs->wal_id());
            params.__set_label(rs->import_label());
        }
    }
    // The blocks below all follow the same pattern: take the global runtime
    // state's data if non-empty, otherwise merge the per-instance states'.
    if (!req.runtime_state->export_output_files().empty()) {
        params.__isset.export_files = true;
        params.export_files = req.runtime_state->export_output_files();
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (!rs->export_output_files().empty()) {
                params.__isset.export_files = true;
                params.export_files.insert(params.export_files.end(),
                                           rs->export_output_files().begin(),
                                           rs->export_output_files().end());
            }
        }
    }
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
        params.__isset.commitInfos = true;
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
                params.__isset.commitInfos = true;
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
            }
        }
    }
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
        params.__isset.errorTabletInfos = true;
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
                params.__isset.errorTabletInfos = true;
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
                                               rs_eti.end());
            }
        }
    }
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
        params.__isset.hive_partition_updates = true;
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
                                             hpu.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
                params.__isset.hive_partition_updates = true;
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
                                                     rs_hpu.begin(), rs_hpu.end());
            }
        }
    }
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
        params.__isset.iceberg_commit_datas = true;
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
                                           icd.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
                params.__isset.iceberg_commit_datas = true;
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
                                                   rs_icd.begin(), rs_icd.end());
            }
        }
    }

    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
        params.__isset.mc_commit_datas = true;
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
                params.__isset.mc_commit_datas = true;
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
                                              rs_mcd.end());
            }
        }
    }

    req.runtime_state->get_unreported_errors(&(params.error_log));
    params.__isset.error_log = (!params.error_log.empty());

    // backend_id == 0 means the cluster info has no usable id; skip the field.
    if (_exec_env->cluster_info()->backend_id != 0) {
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
    }

    // --- Send the report. ---
    TReportExecStatusResult res;
    Status rpc_status;

    VLOG_DEBUG << "reportExecStatus params is "
               << apache::thrift::ThriftDebugString(params).c_str();
    if (!exec_status.ok()) {
        LOG(WARNING) << "report error status: " << exec_status.msg()
                     << " to coordinator: " << req.coord_addr
                     << ", query id: " << print_id(req.query_id);
    }
    try {
        try {
            (*coord)->reportExecStatus(res, params);
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
// Under ASAN the retry-warning log is suppressed to keep output clean.
#ifndef ADDRESS_SANITIZER
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
                         << req.coord_addr << ", err: " << e.what();
#endif
            // Transport error: reopen the connection and retry the RPC once.
            rpc_status = coord->reopen();

            if (!rpc_status.ok()) {
                req.cancel_fn(rpc_status);
                return;
            }
            (*coord)->reportExecStatus(res, params);
        }

        rpc_status = Status::create<false>(res.status);
    } catch (apache::thrift::TException& e) {
        // Any other Thrift failure (including the retried call) is fatal for
        // this report; the query is cancelled below.
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
    }

    if (!rpc_status.ok()) {
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
                 print_id(req.query_id), rpc_status.to_string());
        req.cancel_fn(rpc_status);
    }
}
2216
2217
431k
Status PipelineFragmentContext::send_report(bool done) {
2218
431k
    Status exec_status = _query_ctx->exec_status();
2219
    // If plan is done successfully, but _is_report_success is false,
2220
    // no need to send report.
2221
    // Load will set _is_report_success to true because load wants to know
2222
    // the process.
2223
431k
    if (!_is_report_success && done && exec_status.ok()) {
2224
383k
        return Status::OK();
2225
383k
    }
2226
2227
    // If both _is_report_success and _is_report_on_cancel are false,
2228
    // which means no matter query is success or failed, no report is needed.
2229
    // This may happen when the query limit reached and
2230
    // a internal cancellation being processed
2231
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
2232
    // be set to false, to avoid sending fault report to FE.
2233
47.5k
    if (!_is_report_success && !_is_report_on_cancel) {
2234
384
        if (done) {
2235
            // if done is true, which means the query is finished successfully, we can safely close the fragment instance without sending report to FE, and just return OK status here.
2236
384
            return Status::OK();
2237
384
        }
2238
0
        return Status::NeedSendAgain("");
2239
384
    }
2240
2241
47.1k
    std::vector<RuntimeState*> runtime_states;
2242
2243
116k
    for (auto& tasks : _tasks) {
2244
169k
        for (auto& task : tasks) {
2245
169k
            runtime_states.push_back(task.second.get());
2246
169k
        }
2247
116k
    }
2248
2249
47.1k
    std::string load_eror_url = _query_ctx->get_load_error_url().empty()
2250
47.1k
                                        ? get_load_error_url()
2251
18.4E
                                        : _query_ctx->get_load_error_url();
2252
47.1k
    std::string first_error_msg = _query_ctx->get_first_error_msg().empty()
2253
47.1k
                                          ? get_first_error_msg()
2254
18.4E
                                          : _query_ctx->get_first_error_msg();
2255
2256
47.1k
    ReportStatusRequest req {.status = exec_status,
2257
47.1k
                             .runtime_states = runtime_states,
2258
47.1k
                             .done = done || !exec_status.ok(),
2259
47.1k
                             .coord_addr = _query_ctx->coord_addr,
2260
47.1k
                             .query_id = _query_id,
2261
47.1k
                             .fragment_id = _fragment_id,
2262
47.1k
                             .fragment_instance_id = TUniqueId(),
2263
47.1k
                             .backend_num = -1,
2264
47.1k
                             .runtime_state = _runtime_state.get(),
2265
47.1k
                             .load_error_url = load_eror_url,
2266
47.1k
                             .first_error_msg = first_error_msg,
2267
47.1k
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};
2268
47.1k
    auto ctx = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
2269
47.1k
    return _exec_env->fragment_mgr()->get_thread_pool()->submit_func([this, req, ctx]() {
2270
47.1k
        SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
2271
47.1k
        _coordinator_callback(req);
2272
47.1k
        if (!req.done) {
2273
4.82k
            ctx->refresh_next_report_time();
2274
4.82k
        }
2275
47.1k
    });
2276
47.5k
}
2277
2278
0
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2279
0
    size_t res = 0;
2280
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2281
    // here to traverse the vector.
2282
0
    for (const auto& task_instances : _tasks) {
2283
0
        for (const auto& task : task_instances) {
2284
0
            if (task.first->is_running()) {
2285
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2286
0
                                      << " is running, task: " << (void*)task.first.get()
2287
0
                                      << ", is_running: " << task.first->is_running();
2288
0
                *has_running_task = true;
2289
0
                return 0;
2290
0
            }
2291
2292
0
            size_t revocable_size = task.first->get_revocable_size();
2293
0
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2294
0
                res += revocable_size;
2295
0
            }
2296
0
        }
2297
0
    }
2298
0
    return res;
2299
0
}
2300
2301
0
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2302
0
    std::vector<PipelineTask*> revocable_tasks;
2303
0
    for (const auto& task_instances : _tasks) {
2304
0
        for (const auto& task : task_instances) {
2305
0
            size_t revocable_size_ = task.first->get_revocable_size();
2306
2307
0
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2308
0
                revocable_tasks.emplace_back(task.first.get());
2309
0
            }
2310
0
        }
2311
0
    }
2312
0
    return revocable_tasks;
2313
0
}
2314
2315
106
std::string PipelineFragmentContext::debug_string() {
2316
106
    std::lock_guard<std::mutex> l(_task_mutex);
2317
106
    fmt::memory_buffer debug_string_buffer;
2318
106
    fmt::format_to(debug_string_buffer,
2319
106
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2320
106
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2321
106
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2322
300
    for (size_t j = 0; j < _tasks.size(); j++) {
2323
194
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2324
631
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2325
437
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2326
437
                           _tasks[j][i].first->debug_string());
2327
437
        }
2328
194
    }
2329
2330
106
    return fmt::to_string(debug_string_buffer);
2331
106
}
2332
2333
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2334
2.23k
PipelineFragmentContext::collect_realtime_profile() const {
2335
2.23k
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2336
2337
    // we do not have mutex to protect pipeline_id_to_profile
2338
    // so we need to make sure this funciton is invoked after fragment context
2339
    // has already been prepared.
2340
2.23k
    if (!_prepared) {
2341
0
        std::string msg =
2342
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2343
0
        DCHECK(false) << msg;
2344
0
        LOG_ERROR(msg);
2345
0
        return res;
2346
0
    }
2347
2348
    // Make sure first profile is fragment level profile
2349
2.23k
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2350
2.23k
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2351
2.23k
    res.push_back(fragment_profile);
2352
2353
    // pipeline_id_to_profile is initialized in prepare stage
2354
3.87k
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2355
3.87k
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2356
3.87k
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2357
3.87k
        res.push_back(profile_ptr);
2358
3.87k
    }
2359
2360
2.23k
    return res;
2361
2.23k
}
2362
2363
std::shared_ptr<TRuntimeProfileTree>
2364
2.23k
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2365
    // we do not have mutex to protect pipeline_id_to_profile
2366
    // so we need to make sure this funciton is invoked after fragment context
2367
    // has already been prepared.
2368
2.23k
    if (!_prepared) {
2369
0
        std::string msg =
2370
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2371
0
        DCHECK(false) << msg;
2372
0
        LOG_ERROR(msg);
2373
0
        return nullptr;
2374
0
    }
2375
2376
4.56k
    for (const auto& tasks : _tasks) {
2377
8.82k
        for (const auto& task : tasks) {
2378
8.82k
            if (task.second->load_channel_profile() == nullptr) {
2379
0
                continue;
2380
0
            }
2381
2382
8.82k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2383
2384
8.82k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2385
8.82k
                                                           _runtime_state->profile_level());
2386
8.82k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2387
8.82k
        }
2388
4.56k
    }
2389
2390
2.23k
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2391
2.23k
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2392
2.23k
                                                      _runtime_state->profile_level());
2393
2.23k
    return load_channel_profile;
2394
2.23k
}
2395
2396
// Collect runtime filter IDs registered by all tasks in this PFC.
2397
// Used during recursive CTE stage transitions to know which filters to deregister
2398
// before creating the new PFC for the next recursion round.
2399
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2400
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2401
// the vector sizes do not change, and individual RuntimeState filter sets are
2402
// written only during open() which has completed by the time we reach rerun.
2403
3.08k
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2404
3.08k
    std::set<int> result;
2405
4.71k
    for (const auto& _task : _tasks) {
2406
7.65k
        for (const auto& task : _task) {
2407
7.65k
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2408
7.65k
            result.merge(set);
2409
7.65k
        }
2410
4.71k
    }
2411
3.08k
    if (_runtime_state) {
2412
3.08k
        auto set = _runtime_state->get_deregister_runtime_filter();
2413
3.08k
        result.merge(set);
2414
3.08k
    }
2415
3.08k
    return result;
2416
3.08k
}
2417
2418
430k
void PipelineFragmentContext::_release_resource() {
2419
430k
    std::lock_guard<std::mutex> l(_task_mutex);
2420
    // The memory released by the query end is recorded in the query mem tracker.
2421
430k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2422
430k
    auto st = _query_ctx->exec_status();
2423
1.10M
    for (auto& _task : _tasks) {
2424
1.10M
        if (!_task.empty()) {
2425
1.10M
            _call_back(_task.front().first->runtime_state(), &st);
2426
1.10M
        }
2427
1.10M
    }
2428
430k
    _tasks.clear();
2429
430k
    _dag.clear();
2430
430k
    _pip_id_to_pipeline.clear();
2431
430k
    _pipelines.clear();
2432
430k
    _sink.reset();
2433
430k
    _root_op.reset();
2434
430k
    _runtime_filter_mgr_map.clear();
2435
430k
    _op_id_to_shared_state.clear();
2436
430k
}
2437
2438
} // namespace doris