Coverage Report

Created: 2026-06-08 13:27

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/FrontendService.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <pthread.h>
26
27
#include <algorithm>
28
#include <cstdlib>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <fmt/format.h>
31
#include <thrift/Thrift.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
#include <thrift/transport/TTransportException.h>
34
35
#include <chrono> // IWYU pragma: keep
36
#include <map>
37
#include <memory>
38
#include <ostream>
39
#include <utility>
40
41
#include "cloud/config.h"
42
#include "common/cast_set.h"
43
#include "common/config.h"
44
#include "common/exception.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "exec/exchange/local_exchange_sink_operator.h"
48
#include "exec/exchange/local_exchange_source_operator.h"
49
#include "exec/exchange/local_exchanger.h"
50
#include "exec/exchange/vdata_stream_mgr.h"
51
#include "exec/operator/aggregation_sink_operator.h"
52
#include "exec/operator/aggregation_source_operator.h"
53
#include "exec/operator/analytic_sink_operator.h"
54
#include "exec/operator/analytic_source_operator.h"
55
#include "exec/operator/assert_num_rows_operator.h"
56
#include "exec/operator/blackhole_sink_operator.h"
57
#include "exec/operator/bucketed_aggregation_sink_operator.h"
58
#include "exec/operator/bucketed_aggregation_source_operator.h"
59
#include "exec/operator/cache_sink_operator.h"
60
#include "exec/operator/cache_source_operator.h"
61
#include "exec/operator/datagen_operator.h"
62
#include "exec/operator/dict_sink_operator.h"
63
#include "exec/operator/distinct_streaming_aggregation_operator.h"
64
#include "exec/operator/empty_set_operator.h"
65
#include "exec/operator/exchange_sink_operator.h"
66
#include "exec/operator/exchange_source_operator.h"
67
#include "exec/operator/file_scan_operator.h"
68
#include "exec/operator/group_commit_block_sink_operator.h"
69
#include "exec/operator/group_commit_scan_operator.h"
70
#include "exec/operator/hashjoin_build_sink.h"
71
#include "exec/operator/hashjoin_probe_operator.h"
72
#include "exec/operator/hive_table_sink_operator.h"
73
#include "exec/operator/iceberg_delete_sink_operator.h"
74
#include "exec/operator/iceberg_merge_sink_operator.h"
75
#include "exec/operator/iceberg_table_sink_operator.h"
76
#include "exec/operator/jdbc_scan_operator.h"
77
#include "exec/operator/jdbc_table_sink_operator.h"
78
#include "exec/operator/local_merge_sort_source_operator.h"
79
#include "exec/operator/materialization_opertor.h"
80
#include "exec/operator/maxcompute_table_sink_operator.h"
81
#include "exec/operator/memory_scratch_sink_operator.h"
82
#include "exec/operator/meta_scan_operator.h"
83
#include "exec/operator/multi_cast_data_stream_sink.h"
84
#include "exec/operator/multi_cast_data_stream_source.h"
85
#include "exec/operator/nested_loop_join_build_operator.h"
86
#include "exec/operator/nested_loop_join_probe_operator.h"
87
#include "exec/operator/olap_scan_operator.h"
88
#include "exec/operator/olap_table_sink_operator.h"
89
#include "exec/operator/olap_table_sink_v2_operator.h"
90
#include "exec/operator/partition_sort_sink_operator.h"
91
#include "exec/operator/partition_sort_source_operator.h"
92
#include "exec/operator/partitioned_aggregation_sink_operator.h"
93
#include "exec/operator/partitioned_aggregation_source_operator.h"
94
#include "exec/operator/partitioned_hash_join_probe_operator.h"
95
#include "exec/operator/partitioned_hash_join_sink_operator.h"
96
#include "exec/operator/rec_cte_anchor_sink_operator.h"
97
#include "exec/operator/rec_cte_scan_operator.h"
98
#include "exec/operator/rec_cte_sink_operator.h"
99
#include "exec/operator/rec_cte_source_operator.h"
100
#include "exec/operator/repeat_operator.h"
101
#include "exec/operator/result_file_sink_operator.h"
102
#include "exec/operator/result_sink_operator.h"
103
#include "exec/operator/schema_scan_operator.h"
104
#include "exec/operator/select_operator.h"
105
#include "exec/operator/set_probe_sink_operator.h"
106
#include "exec/operator/set_sink_operator.h"
107
#include "exec/operator/set_source_operator.h"
108
#include "exec/operator/sort_sink_operator.h"
109
#include "exec/operator/sort_source_operator.h"
110
#include "exec/operator/spill_iceberg_table_sink_operator.h"
111
#include "exec/operator/spill_sort_sink_operator.h"
112
#include "exec/operator/spill_sort_source_operator.h"
113
#include "exec/operator/streaming_aggregation_operator.h"
114
#include "exec/operator/table_function_operator.h"
115
#include "exec/operator/tvf_table_sink_operator.h"
116
#include "exec/operator/union_sink_operator.h"
117
#include "exec/operator/union_source_operator.h"
118
#include "exec/pipeline/dependency.h"
119
#include "exec/pipeline/pipeline_task.h"
120
#include "exec/pipeline/task_scheduler.h"
121
#include "exec/runtime_filter/runtime_filter_mgr.h"
122
#include "exec/sort/topn_sorter.h"
123
#include "exec/spill/spill_file.h"
124
#include "io/fs/stream_load_pipe.h"
125
#include "load/stream_load/new_load_stream_mgr.h"
126
#include "runtime/exec_env.h"
127
#include "runtime/fragment_mgr.h"
128
#include "runtime/result_buffer_mgr.h"
129
#include "runtime/runtime_state.h"
130
#include "runtime/thread_context.h"
131
#include "service/backend_options.h"
132
#include "util/client_cache.h"
133
#include "util/countdown_latch.h"
134
#include "util/debug_util.h"
135
#include "util/network_util.h"
136
#include "util/uid_util.h"
137
138
namespace doris {
139
/// Constructs the context for one pipeline fragment.
///
/// @param query_id   id of the owning query (moved into the context).
/// @param request    thrift fragment parameters; copied into `_params`.
/// @param query_ctx  shared query context (moved into the context).
/// @param exec_env   execution environment pointer, stored as-is.
/// @param call_back  callback stored in `_call_back`.
///
/// The fragment stopwatch is started as the last step of construction.
PipelineFragmentContext::PipelineFragmentContext(
        TUniqueId query_id, const TPipelineFragmentParams& request,
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
        const std::function<void(RuntimeState*, Status*)>& call_back)
        : _query_id(std::move(query_id)),
          _fragment_id(request.fragment_id),
          _exec_env(exec_env),
          _query_ctx(std::move(query_ctx)),
          _call_back(call_back),
          _is_report_on_cancel(true),
          _params(request),
          // Optional thrift fields: fall back to 0 / false when the field is not set.
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
          _need_notify_close(request.__isset.need_notify_close && request.need_notify_close) {
    _fragment_watcher.start();
}
155
156
123k
/// Logs the destruction, releases fragment resources, then drops the runtime
/// state and the query context reference.
PipelineFragmentContext::~PipelineFragmentContext() {
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
            .tag("query_id", print_id(_query_id))
            .tag("fragment_id", _fragment_id);
    _release_resource();

    // The memory released by the query end is recorded in the query mem tracker,
    // so the tracker is switched before the two resets below.
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
    _runtime_state.reset();
    _query_ctx.reset();
}
168
169
0
bool PipelineFragmentContext::is_timeout(timespec now) const {
170
0
    if (_timeout <= 0) {
171
0
        return false;
172
0
    }
173
0
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
174
0
}
175
176
// notify_close() transitions the PFC from "waiting for external close notification" to
177
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
178
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
179
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
180
942
bool PipelineFragmentContext::notify_close() {
181
942
    bool all_closed = false;
182
942
    bool need_remove = false;
183
942
    {
184
942
        std::lock_guard<std::mutex> l(_task_mutex);
185
942
        if (_closed_tasks >= _total_tasks) {
186
26
            if (_need_notify_close) {
187
                // Fragment was cancelled and waiting for notify to close.
188
                // Record that we need to remove from fragment mgr, but do it
189
                // after releasing _task_mutex to avoid ABBA deadlock with
190
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
191
                // first, then _task_mutex via debug_string()).
192
0
                need_remove = true;
193
0
            }
194
26
            all_closed = true;
195
26
        }
196
        // make fragment release by self after cancel
197
942
        _need_notify_close = false;
198
942
    }
199
942
    if (need_remove) {
200
0
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
201
0
    }
202
942
    return all_closed;
203
942
}
204
205
// Must not add lock in this method. Because it will call query ctx cancel. And
206
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
207
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
208
// There maybe dead lock.
209
942
void PipelineFragmentContext::cancel(const Status reason) {
210
942
    LOG_INFO("PipelineFragmentContext::cancel")
211
942
            .tag("query_id", print_id(_query_id))
212
942
            .tag("fragment_id", _fragment_id)
213
942
            .tag("reason", reason.to_string());
214
942
    if (notify_close()) {
215
26
        return;
216
26
    }
217
    // Timeout is a special error code, we need print current stack to debug timeout issue.
218
916
    if (reason.is<ErrorCode::TIMEOUT>()) {
219
0
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
220
0
                                   debug_string());
221
0
        LOG_LONG_STRING(WARNING, dbg_str);
222
0
    }
223
224
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
225
916
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
226
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
227
0
                    debug_string());
228
0
    }
229
230
916
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
231
0
        print_profile("cancel pipeline, reason: " + reason.to_string());
232
0
    }
233
234
916
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
235
0
        _query_ctx->set_load_error_url(error_url);
236
0
    }
237
238
916
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
239
0
        _query_ctx->set_first_error_msg(first_error_msg);
240
0
    }
241
242
916
    _query_ctx->cancel(reason, _fragment_id);
243
916
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
244
202
        _is_report_on_cancel = false;
245
714
    } else {
246
3.65k
        for (auto& id : _fragment_instance_ids) {
247
3.65k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
248
3.65k
        }
249
714
    }
250
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
251
    // For stream load the fragment's query_id == load id, it is set in FE.
252
916
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
253
916
    if (stream_load_ctx != nullptr) {
254
0
        stream_load_ctx->pipe->cancel(reason.to_string());
255
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
256
        // We need to set the error URL at this point to ensure error information is properly
257
        // propagated to the client.
258
0
        stream_load_ctx->error_url = get_load_error_url();
259
0
        stream_load_ctx->first_error_msg = get_first_error_msg();
260
0
    }
261
262
3.88k
    for (auto& tasks : _tasks) {
263
9.02k
        for (auto& task : tasks) {
264
9.02k
            task.first->unblock_all_dependencies();
265
9.02k
        }
266
3.88k
    }
267
916
}
268
269
191k
/// Creates a new pipeline with the next free pipeline id and registers it.
///
/// @param parent  optional parent pipeline; when set, the new pipeline's task count is
///                capped by the parent's and the new pipeline is passed to
///                `parent->set_children`.
/// @param idx     position in `_pipelines` to insert at; a negative value appends.
/// @return the newly created pipeline.
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
    const PipelineId new_id = _next_pipeline_id++;
    const auto num_tasks =
            parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances;
    const auto num_tasks_of_parent = parent ? parent->num_tasks() : _num_instances;
    auto new_pipeline = std::make_shared<Pipeline>(new_id, num_tasks, num_tasks_of_parent);

    if (idx < 0) {
        _pipelines.emplace_back(new_pipeline);
    } else {
        _pipelines.insert(_pipelines.begin() + idx, new_pipeline);
    }

    if (parent) {
        parent->set_children(new_pipeline);
    }
    return new_pipeline;
}
284
285
123k
/// Builds the pipelines of this fragment, attaches the data sink, plans local exchange
/// (when enabled), prepares every pipeline and finally builds the pipeline tasks.
///
/// @param thread_pool  forwarded to `_build_pipeline_tasks`.
/// @return the first non-OK status produced by any step, otherwise OK.
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
    {
        SCOPED_TIMER(_build_pipelines_timer);
        // 2. Build pipelines with operators in this fragment.
        auto root = add_pipeline();
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
                                         &_root_op, root));

        // 3. Create sink operator
        if (!_params.fragment.__isset.output_sink) {
            return Status::InternalError("No output sink in this fragment!");
        }
        auto& output_sink = _params.fragment.output_sink;
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), output_sink,
                                          _params.fragment.output_exprs, _params,
                                          root->output_row_desc(), _runtime_state.get(),
                                          *_desc_tbl, root->id()));
        RETURN_IF_ERROR(_sink->init(output_sink));
        RETURN_IF_ERROR(root->set_sink(_sink));

        // Connect each pipeline's sink to the last operator of that pipeline.
        for (PipelinePtr& pipeline : _pipelines) {
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
        }
    }

    // 4. Build local exchanger
    if (_runtime_state->enable_local_shuffle()) {
        SCOPED_TIMER(_plan_local_exchanger_timer);
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
                                             _params.bucket_seq_to_instance_idx,
                                             _params.shuffle_idx_to_instance_idx));
    }

    // 5. Initialize global states in pipelines.
    for (PipelinePtr& pipeline : _pipelines) {
        SCOPED_TIMER(_prepare_all_pipelines_timer);
        pipeline->children().clear();
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
    }

    {
        SCOPED_TIMER(_build_tasks_timer);
        // 6. Build pipeline tasks and initialize local state.
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
    }

    return Status::OK();
}
332
333
123k
/// Prepares the fragment for execution: creates the profile timers, sets up the
/// fragment-level runtime state from `_params`, builds all pipelines and tasks, and
/// initializes the periodic-report timestamp.
///
/// @param thread_pool  forwarded to `_build_and_prepare_full_pipeline`.
/// @return InternalError("Already prepared") when called again after a successful
///         prepare; otherwise the first failing step's status, or OK.
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
    if (_prepared) {
        return Status::InternalError("Already prepared");
    }
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
        _timeout = _params.query_options.execution_timeout;
    }

    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
    SCOPED_TIMER(_prepare_timer);
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
    {
        SCOPED_TIMER(_init_context_timer);
        cast_set(_num_instances, _params.local_params.size());
        _total_instances =
                _params.__isset.total_instances ? _params.total_instances : _num_instances;

        if (_params.query_options.__isset.is_report_success) {
            set_is_report_success(_params.query_options.is_report_success);
        }

        // 1. Set up the global runtime state.
        _runtime_state = RuntimeState::create_unique(
                _params.query_id, _params.fragment_id, _params.query_options,
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
        _runtime_state->set_task_execution_context(shared_from_this());
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());

        // Copy the optional request fields that are set into the runtime state.
        if (_params.__isset.backend_id) {
            _runtime_state->set_backend_id(_params.backend_id);
        }
        if (_params.__isset.import_label) {
            _runtime_state->set_import_label(_params.import_label);
        }
        if (_params.__isset.db_name) {
            _runtime_state->set_db_name(_params.db_name);
        }
        if (_params.__isset.load_job_id) {
            _runtime_state->set_load_job_id(_params.load_job_id);
        }

        // A simplified request reuses the query-level descriptor table; otherwise the
        // table is created from the request's own descriptors.
        if (_params.is_simplified_param) {
            _desc_tbl = _query_ctx->desc_tbl;
        } else {
            DCHECK(_params.__isset.desc_tbl);
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
                                                  &_desc_tbl));
        }
        _runtime_state->set_desc_tbl(_desc_tbl);
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
        _runtime_state->set_total_load_streams(_params.total_load_streams);
        _runtime_state->set_num_local_sink(_params.num_local_sink);

        // init fragment_instance_ids: one id per entry in local_params, in the same order.
        auto& instance_params = _params.local_params;
        _fragment_instance_ids.resize(instance_params.size());
        for (size_t i = 0; i < instance_params.size(); ++i) {
            _fragment_instance_ids[i] = instance_params[i].fragment_instance_id;
        }
    }

    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));

    _init_next_report_time();

    _prepared = true;
    return Status::OK();
}
409
410
/// Builds and prepares the pipeline tasks of a single fragment instance.
///
/// @param instance_idx            index into `_params.local_params` / `_tasks`.
/// @param pipeline_id_to_profile  one profile per pipeline, indexed like `_pipelines`.
/// @return the first non-OK status returned by a task's prepare(), otherwise OK.
///
/// Steps: (1) create one task, with its own RuntimeState, for every pipeline that runs in
/// this instance; (2) wire shared state between tasks according to `_dag`; (3) prepare
/// each task; (4) store the instance's runtime filter manager.
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
        int instance_idx,
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
    using SharedStateMap = std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                                                   std::vector<std::shared_ptr<Dependency>>>>;

    const auto& local_params = _params.local_params[instance_idx];
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;

    // Collects the shared states registered for the pipeline's operators and for the
    // destinations of its sink.
    auto get_shared_state = [&](const PipelinePtr& pipeline) -> SharedStateMap {
        SharedStateMap shared_state_map;
        auto add_if_registered = [&](auto op_id) {
            if (auto iter = _op_id_to_shared_state.find(op_id);
                iter != _op_id_to_shared_state.end()) {
                shared_state_map.insert({op_id, iter->second});
            }
        };
        for (auto& op : pipeline->operators()) {
            add_if_registered(op->operator_id());
        }
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
            add_if_registered(sink_to_source_id);
        }
        return shared_state_map;
    };

    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
        auto& pipeline = _pipelines[pip_idx];
        // A pipeline with at most one task only gets a task in instance 0.
        if (pipeline->num_tasks() <= 1 && instance_idx != 0) {
            continue;
        }

        auto task_runtime_state = RuntimeState::create_unique(
                local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
                _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());

        // Initialize runtime state for this task
        task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());

        task_runtime_state->set_task_execution_context(shared_from_this());
        task_runtime_state->set_be_number(local_params.backend_num);

        if (_params.__isset.backend_id) {
            task_runtime_state->set_backend_id(_params.backend_id);
        }
        if (_params.__isset.import_label) {
            task_runtime_state->set_import_label(_params.import_label);
        }
        if (_params.__isset.db_name) {
            task_runtime_state->set_db_name(_params.db_name);
        }
        if (_params.__isset.load_job_id) {
            task_runtime_state->set_load_job_id(_params.load_job_id);
        }
        if (_params.__isset.wal_id) {
            task_runtime_state->set_wal_id(_params.wal_id);
        }
        if (_params.__isset.content_length) {
            task_runtime_state->set_content_length(_params.content_length);
        }

        task_runtime_state->set_desc_tbl(_desc_tbl);
        task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
        task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
        task_runtime_state->resize_op_id_to_local_state(max_operator_id());
        task_runtime_state->set_max_operator_id(max_operator_id());
        task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
        task_runtime_state->set_total_load_streams(_params.total_load_streams);
        task_runtime_state->set_num_local_sink(_params.num_local_sink);

        task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());

        auto cur_task_id = _total_tasks++;
        task_runtime_state->set_task_id(cur_task_id);
        task_runtime_state->set_task_num(pipeline->num_tasks());
        auto new_task = std::make_shared<PipelineTask>(
                pipeline, cur_task_id, task_runtime_state.get(),
                std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
                pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
                instance_idx);
        pipeline->incr_created_tasks(instance_idx, new_task.get());
        pipeline_id_to_task.insert({pipeline->id(), new_task.get()});
        // The task and its runtime state are stored (and owned) together in _tasks.
        _tasks[instance_idx].emplace_back(
                std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
                        std::move(new_task), std::move(task_runtime_state)});
    }

    /**
     * Build DAG for pipeline tasks.
     * For example, we have
     *
     *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
     *            \                      /
     *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
     *                 \                          /
     *               JoinProbeOperator2 (Pipeline1)
     *
     * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
     * To build this DAG, `_dag` manages dependencies between pipelines by pipeline ID and
     * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
     *
     * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
     * and JoinProbeOperator2.
     */
    for (auto& pipeline : _pipelines) {
        const auto task_it = pipeline_id_to_task.find(pipeline->id());
        if (task_it == pipeline_id_to_task.end()) {
            continue;
        }
        auto* task = task_it->second;
        DCHECK(task != nullptr);

        // If this task has upstream dependency, then inject it into this task.
        const auto dag_it = _dag.find(pipeline->id());
        if (dag_it == _dag.end()) {
            continue;
        }
        for (auto& dep : dag_it->second) {
            const auto dep_it = pipeline_id_to_task.find(dep);
            if (dep_it == pipeline_id_to_task.end()) {
                continue;
            }
            auto* upstream_task = dep_it->second;
            auto ss = upstream_task->get_sink_shared_state();
            if (ss) {
                task->inject_shared_state(ss);
            } else {
                // The upstream sink has no shared state of its own: hand it this task's
                // source shared state instead.
                upstream_task->inject_shared_state(task->get_source_shared_state());
            }
        }
    }

    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
        const auto task_it = pipeline_id_to_task.find(_pipelines[pip_idx]->id());
        if (task_it == pipeline_id_to_task.end()) {
            continue;
        }
        auto* task = task_it->second;
        DCHECK(pipeline_id_to_profile[pip_idx]);

        // Scan ranges are looked up by the node id of the pipeline's first operator;
        // the list stays empty when the request carries none for that node.
        std::vector<TScanRangeParams> scan_ranges;
        auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
        const auto& ranges_by_node = local_params.per_node_scan_ranges;
        if (auto ranges_it = ranges_by_node.find(node_id); ranges_it != ranges_by_node.end()) {
            scan_ranges = ranges_it->second;
        }
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
                                                         _params.fragment.output_sink));
    }

    {
        std::lock_guard<std::mutex> guard(_state_map_lock);
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
    }
    return Status::OK();
}
556
557
123k
/// Builds the pipeline tasks for every instance in `_params.local_params`.
///
/// When there is more than one instance and the instance count exceeds the
/// `parallel_prepare_threshold` query option, each instance is built on `thread_pool`;
/// otherwise instances are built sequentially on the calling thread.
///
/// @return the failing submit status, the first failing per-instance status, or OK.
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
    _total_tasks = 0;
    _closed_tasks = 0;
    const auto target_size = _params.local_params.size();
    _tasks.resize(target_size);
    _runtime_filter_mgr_map.resize(target_size);
    for (auto& pipeline : _pipelines) {
        _pip_id_to_pipeline[pipeline->id()] = pipeline.get();
    }
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());

    const bool prepare_in_parallel =
            target_size > 1 &&
            (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
             target_size > _runtime_state->query_options().parallel_prepare_threshold);

    if (prepare_in_parallel) {
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
        std::vector<Status> prepare_status(target_size);
        int submitted_tasks = 0;
        Status submit_status;
        CountDownLatch latch(static_cast<int>(target_size));
        for (int i = 0; i < target_size; i++) {
            submit_status = thread_pool->submit_func([&, i]() {
                SCOPED_ATTACH_TASK(_query_ctx.get());
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
                latch.count_down();
            });
            if (!LIKELY(submit_status.ok())) {
                break;
            }
            submitted_tasks++;
        }
        // Account for the instances that were never submitted, then wait for the
        // submitted ones before any of the locals captured by reference go away.
        latch.arrive_and_wait(target_size - submitted_tasks);
        if (UNLIKELY(!submit_status.ok())) {
            return submit_status;
        }
        for (int i = 0; i < submitted_tasks; i++) {
            if (!prepare_status[i].ok()) {
                return prepare_status[i];
            }
        }
    } else {
        for (int i = 0; i < target_size; i++) {
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
        }
    }

    // These maps are cleared once all tasks have been built.
    _pipeline_parent_map.clear();
    _op_id_to_shared_state.clear();
    // Record task cardinality once when this fragment context finishes task initialization.
    _query_ctx->add_total_task_num(_total_tasks.load(std::memory_order_relaxed));

    return Status::OK();
}
609
610
123k
void PipelineFragmentContext::_init_next_report_time() {
611
123k
    auto interval_s = config::pipeline_status_report_interval;
612
123k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
613
11.2k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
614
11.2k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
615
        // We don't want to wait longer than it takes to run the entire fragment.
616
11.2k
        _previous_report_time =
617
11.2k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
618
11.2k
        _disable_period_report = false;
619
11.2k
    }
620
123k
}
621
622
1.34k
void PipelineFragmentContext::refresh_next_report_time() {
623
1.34k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
624
1.34k
    DCHECK(disable == true);
625
1.34k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
626
1.34k
    _disable_period_report.compare_exchange_strong(disable, false);
627
1.34k
}
628
629
1.86M
void PipelineFragmentContext::trigger_report_if_necessary() {
630
1.86M
    if (!_is_report_success) {
631
1.73M
        return;
632
1.73M
    }
633
135k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
634
135k
    if (disable) {
635
3.29k
        return;
636
3.29k
    }
637
131k
    int32_t interval_s = config::pipeline_status_report_interval;
638
131k
    if (interval_s <= 0) {
639
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
640
0
                        "trigger "
641
0
                        "report.";
642
0
    }
643
131k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
644
131k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
645
131k
    if (MonotonicNanos() > next_report_time) {
646
1.34k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
647
1.34k
                                                            std::memory_order_acq_rel)) {
648
4
            return;
649
4
        }
650
1.34k
        if (VLOG_FILE_IS_ON) {
651
0
            VLOG_FILE << "Reporting "
652
0
                      << "profile for query_id " << print_id(_query_id)
653
0
                      << ", fragment id: " << _fragment_id;
654
655
0
            std::stringstream ss;
656
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
657
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
658
0
            if (_runtime_state->load_channel_profile()) {
659
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
660
0
            }
661
662
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
663
0
                      << " profile:\n"
664
0
                      << ss.str();
665
0
        }
666
1.34k
        auto st = send_report(false);
667
1.34k
        if (!st.ok()) {
668
0
            disable = true;
669
0
            _disable_period_report.compare_exchange_strong(disable, false,
670
0
                                                           std::memory_order_acq_rel);
671
0
        }
672
1.34k
    }
673
131k
}
674
675
// Builds the operator tree of this fragment from the flattened thrift plan
// nodes and attaches the operators to `cur_pipe`.
//
// @param pool      object pool forwarded to operator creation
// @param descs     descriptor table forwarded to operator creation
// @param root      receives the root operator of the rebuilt tree
// @param cur_pipe  pipeline the root operator is created into
// @return an error if operator creation fails or if the plan node list was not
//         consumed exactly
// @throws Exception(INTERNAL_ERROR) if the fragment has no plan node at all
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    // Index of the node currently being materialised; the helper advances it while
    // it walks the list.
    int node_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &node_idx, root,
                                        cur_pipe, 0, false, false));

    // After a complete walk the index must sit on the last node.
    const bool every_node_used = !(node_idx + 1 != plan_nodes.size());
    if (every_node_used) {
        return Status::OK();
    }
    return Status::InternalError(
            "Plan tree only partially reconstructed. Not all thrift nodes were used.");
}
692
693
// Recursively rebuilds the operator (sub)tree rooted at `tnodes[*node_idx]`.
//
// @param pool       object pool forwarded to operator creation
// @param tnodes     plan nodes; the traversal relies on them being in preorder
// @param descs      descriptor table forwarded to operator creation
// @param parent     operator the new operator is attached to, or nullptr for the root
// @param node_idx   in/out cursor into `tnodes`; advanced once per visited child
// @param root       receives the created operator when `parent` is nullptr
// @param cur_pipe   pipeline under construction; handed to `_create_operator` by reference
// @param child_idx  position of this node among its parent's children
// @param followed_by_shuffled_operator  flag inherited from the parent
// @param require_bucket_distribution    flag inherited from the parent
// @return an error if the node list is exhausted early or any operator fails to
//         be created, initialised or linked
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // propagate error case
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];
    const int num_children = tnode.num_children;
    const auto parent_node_id = parent == nullptr ? -1 : parent->node_id();

    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    bool current_require_bucket_distribution = require_bucket_distribution;
    RETURN_IF_ERROR(_create_operator(pool, tnode, descs, op, cur_pipe, parent_node_id, child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must be done here. For example, group by expressions in agg will be used to
    // decide if a local shuffle should be planed, so it must be initialized here.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));

    if (parent == nullptr) {
        *root = op;
    } else {
        // add to parent's child(s); the cache operator takes the slot when one was created
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    }

    /**
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled
     * operator (shuffled hash join, union operator followed by co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // While the pipeline has no operators, its sink is consulted instead of `op`.
    const auto pipe_has_no_operators = [&cur_pipe]() { return cur_pipe->operators().empty(); };

    const auto required_data_distribution =
            pipe_has_no_operators()
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                    : op->required_data_distribution(_runtime_state.get());
    const auto distribution_type = required_data_distribution.distribution_type;

    // A property reaches the children when it is inherited or owned by this node and
    // a hash exchange is required, or when it is inherited through a NOOP distribution.
    const auto propagate_to_children = [&](const bool inherited, const auto& own_property) {
        if ((inherited || own_property()) && Pipeline::is_hash_exchange(distribution_type)) {
            return true;
        }
        return inherited && distribution_type == ExchangeType::NOOP;
    };

    const bool current_followed_by_shuffled_operator =
            propagate_to_children(followed_by_shuffled_operator, [&]() {
                return pipe_has_no_operators() ? cur_pipe->sink()->is_shuffled_operator()
                                               : op->is_shuffled_operator();
            });
    current_require_bucket_distribution =
            propagate_to_children(require_bucket_distribution, [&]() {
                return pipe_has_no_operators() ? cur_pipe->sink()->is_colocated_operator()
                                               : op->is_colocated_operator();
            });

    // Leaf node: remember whether it is a serial operator.
    if (num_children == 0) {
        _use_serial_source = op->is_serial_operator();
    }

    // rely on that tnodes is preorder of the plan
    for (int child = 0; child < num_children; ++child) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe,
                                            child, current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }

    return Status::OK();
}
778
779
void PipelineFragmentContext::_inherit_pipeline_properties(
780
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
781
19.0k
        PipelinePtr pipe_with_sink) {
782
19.0k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
783
19.0k
    pipe_with_source->set_num_tasks(_num_instances);
784
19.0k
    pipe_with_source->set_data_distribution(data_distribution);
785
19.0k
}
786
787
// Splits `cur_pipe` at operator index `idx` by inserting a local exchange.
//
// Operators [0, idx) move to `new_pip`, which is terminated by a new local
// exchange sink; `cur_pipe` keeps the remaining operators behind a new local
// exchange source. Both ends share one LocalExchangeSharedState whose exchanger
// is selected by the (possibly adjusted) distribution type.
//
// @param idx                number of leading operators moved to `new_pip`
// @param pool               object pool used for the local exchange source operator
// @param cur_pipe           pipeline being split; becomes the downstream part
// @param new_pip            receives the upstream part
// @param data_distribution  requested distribution; BUCKET_HASH_SHUFFLE is replaced by
//                           HASH_SHUFFLE when `bucket_seq_to_instance_idx` is empty
// @param do_local_exchange  not used by this function
// @param num_buckets        forwarded to the sink and to the bucket shuffle exchanger
// @return an error for an unsupported exchange type or when an operator cannot be
//         initialised or linked
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    const auto local_exchange_id = next_operator_id();
    const auto sink_id = next_sink_operator_id();

    // Free-blocks limit from the query options, or 0 when the option is not set.
    // Kept lazy so it is evaluated only by the branch that builds an exchanger.
    const auto free_blocks_limit = [this]() -> int {
        const auto& query_options = _runtime_state->query_options();
        return query_options.__isset.local_exchange_free_blocks_limit
                       ? cast_set<int>(query_options.local_exchange_free_blocks_limit)
                       : 0;
    };

    // 1. Create a new pipeline with local exchange sink.
    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool no_bucket_mapping = bucket_seq_to_instance_idx.empty();
    const bool followed_by_shuffled_operator =
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = no_bucket_mapping &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    const auto shuffle_partition_count =
            use_global_hash_shuffle ? _total_instances : _num_instances;

    DataSinkOperatorPtr sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, shuffle_partition_count,
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
    if (no_bucket_mapping &&
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
                                          num_buckets, use_global_hash_shuffle,
                                          shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);
    switch (data_distribution.distribution_type) {
    case ExchangeType::HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                shuffle_partition_count, free_blocks_limit());
        break;
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_blocks_limit());
        break;
    case ExchangeType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        break;
    case ExchangeType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        break;
    case ExchangeType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        }
        break;
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                free_blocks_limit());
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Append the leading `idx` operators to the new pipeline's operator list.
    auto& upstream_operators = new_pip->operators();
    upstream_operators.insert(upstream_operators.end(), operators.begin(),
                              operators.begin() + idx);

    // 3.2 Erase the moved operators from the previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
    if (!operators.empty()) {
        // Drop the old child link first, then hook the first remaining operator
        // onto the local exchange source.
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately. A child whose sink shares a node id
    // with an operator that moved to the new pipeline follows that operator; every other
    // child stays with the current pipeline.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (const auto& child_pipe : cur_pipe->children()) {
        bool moved_to_new_pip = false;
        for (const auto& moved_op : new_pip->operators()) {
            if (child_pipe->sink()->node_id() == moved_op->node_id()) {
                new_pip->set_children(child_pipe);
                moved_to_new_pip = true;
            }
        }
        if (moved_to_new_pip) {
            continue;
        }
        new_children.push_back(child_pipe);
        edges_with_source.push_back(child_pipe->id());
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (const auto& child_pipe : new_pip->children()) {
            edges_with_sink.push_back(child_pipe->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
951
952
// Inserts a local exchange in front of operator `idx` of `cur_pipe` when needed.
//
// Nothing happens (and `*do_local_exchange` is left untouched) when the fragment
// runs with at most one instance, when `num_tasks_of_parent()` is at most one, or
// when the pipeline reports that no local exchange is needed for
// `data_distribution` at `idx`. Otherwise `*do_local_exchange` is set to true and
// the pipeline is split. `node_id` is not used by this function.
//
// @return an error if splitting the pipeline fails
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    const bool single_task = _num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1;
    if (single_task || !cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    // Remember the operator count so the split can be sanity-checked afterwards.
    auto total_op_num = cur_pipe->operators().size();
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));

    // The split adds exactly one operator (the local exchange source).
    CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
            << "total_op_num: " << total_op_num
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << new_pip->operators().size();

    // There are some local shuffles with relatively heavy operations on the sink.
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
    // Therefore, local passthrough is used to increase the concurrency of the sink.
    // op -> local sink(1) -> local source (n)
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
    const bool sink_is_bottleneck =
            cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
            Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type);
    if (!sink_is_bottleneck) {
        return Status::OK();
    }

    // Split the upstream pipeline once more, after its last operator, with a passthrough.
    const int passthrough_split_idx = cast_set<int>(new_pip->operators().size());
    auto passthrough_pip = add_pipeline(new_pip, pip_idx + 2);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            passthrough_split_idx, pool, new_pip, passthrough_pip,
            DataDistribution(ExchangeType::PASSTHROUGH), do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    return Status::OK();
}
993
994
// Plans local exchanges for every pipeline of this fragment.
//
// Pipelines are visited from the last index down to 0. Each one first has its
// data distribution initialised and may then adopt the distribution of a child,
// before the per-pipeline overload plans its local exchanges.
//
// @return the first error reported while planning a pipeline
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    int pip_idx = cast_set<int>(_pipelines.size()) - 1;
    while (pip_idx >= 0) {
        // Hold the pipeline by value: planning is handed `pip_idx` and may add
        // pipelines to `_pipelines`.
        const PipelinePtr pipeline = _pipelines[pip_idx];
        pipeline->init_data_distribution(_runtime_state.get());

        // Set property if child pipeline is not join operator's child, i.e. the
        // child's sink has the same node id as this pipeline's first operator.
        for (const auto& child : pipeline->children()) {
            if (child->sink()->node_id() == pipeline->operators().front()->node_id()) {
                pipeline->set_data_distribution(child->data_distribution());
            }
        }

        // NOTE(review): the previous comment here said `_num_instances` replaces
        // `num_buckets` when it is 0 to avoid dividing by zero; this function forwards
        // `num_buckets` unchanged, so confirm where that substitution happens.
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, pipeline,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
        --pip_idx;
    }
    return Status::OK();
}
1018
1019
// Plans local exchanges for a single pipeline.
//
// Operators are examined starting at index 1. Whenever a local exchange is
// inserted the pipeline is split, so scanning resumes at index 2 of the
// remaining pipeline. Finally the sink's own requirement is handled.
//
// @param pip_idx  index of `pip` inside `_pipelines`
// @return the first error reported while adding a local exchange
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    int idx = 1;
    bool do_local_exchange = false;

    // Plan local exchange for each operator.
    while (idx < pip->operators().size()) {
        auto& ops = pip->operators();
        if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
            RETURN_IF_ERROR(_add_local_exchange(
                    pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                    ops[idx]->required_data_distribution(_runtime_state.get()),
                    &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                    shuffle_idx_to_instance_idx));
        }
        if (do_local_exchange) {
            // If local exchange is needed for current operator, we will split this pipeline to
            // two pipelines by local exchange sink/source. And then we need to process remaining
            // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
            // is current operator was already processed) and continue to plan local exchange.
            do_local_exchange = false;
            idx = 2;
        } else {
            ++idx;
        }
    }

    // The sink may require a local exchange of its own in front of it.
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
        RETURN_IF_ERROR(_add_local_exchange(
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1056
1057
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1058
                                                  const std::vector<TExpr>& output_exprs,
1059
                                                  const TPipelineFragmentParams& params,
1060
                                                  const RowDescriptor& row_desc,
1061
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1062
123k
                                                  PipelineId cur_pipeline_id) {
1063
123k
    switch (thrift_sink.type) {
1064
34.5k
    case TDataSinkType::DATA_STREAM_SINK: {
1065
34.5k
        if (!thrift_sink.__isset.stream_sink) {
1066
0
            return Status::InternalError("Missing data stream sink.");
1067
0
        }
1068
34.5k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1069
34.5k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1070
34.5k
                params.destinations, _fragment_instance_ids);
1071
34.5k
        break;
1072
34.5k
    }
1073
77.5k
    case TDataSinkType::RESULT_SINK: {
1074
77.5k
        if (!thrift_sink.__isset.result_sink) {
1075
0
            return Status::InternalError("Missing data buffer sink.");
1076
0
        }
1077
1078
77.5k
        auto& pipeline = _pipelines[cur_pipeline_id];
1079
77.5k
        int child_node_id = pipeline->operators().back()->node_id();
1080
77.5k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), child_node_id + 1,
1081
77.5k
                                                      row_desc, output_exprs,
1082
77.5k
                                                      thrift_sink.result_sink);
1083
77.5k
        break;
1084
77.5k
    }
1085
54
    case TDataSinkType::DICTIONARY_SINK: {
1086
54
        if (!thrift_sink.__isset.dictionary_sink) {
1087
0
            return Status::InternalError("Missing dict sink.");
1088
0
        }
1089
1090
54
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1091
54
                                                    thrift_sink.dictionary_sink);
1092
54
        break;
1093
54
    }
1094
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1095
5.52k
    case TDataSinkType::OLAP_TABLE_SINK: {
1096
5.52k
        auto& pipeline = _pipelines[cur_pipeline_id];
1097
5.52k
        int child_node_id = pipeline->operators().back()->node_id();
1098
5.52k
        if (state->query_options().enable_memtable_on_sink_node &&
1099
5.52k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1100
5.52k
            !_has_row_binlog(thrift_sink.olap_table_sink) && !config::is_cloud_mode()) {
1101
2.88k
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(
1102
2.88k
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
1103
2.88k
        } else {
1104
2.63k
            _sink = std::make_shared<OlapTableSinkOperatorX>(
1105
2.63k
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
1106
2.63k
        }
1107
5.52k
        break;
1108
0
    }
1109
0
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1110
0
        DCHECK(thrift_sink.__isset.olap_table_sink);
1111
0
        DCHECK(state->get_query_ctx() != nullptr);
1112
0
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1113
0
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1114
0
                                                                output_exprs);
1115
0
        break;
1116
0
    }
1117
1.46k
    case TDataSinkType::HIVE_TABLE_SINK: {
1118
1.46k
        if (!thrift_sink.__isset.hive_table_sink) {
1119
0
            return Status::InternalError("Missing hive table sink.");
1120
0
        }
1121
1.46k
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1122
1.46k
                                                         output_exprs);
1123
1.46k
        break;
1124
1.46k
    }
1125
1.72k
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1126
1.72k
        if (!thrift_sink.__isset.iceberg_table_sink) {
1127
0
            return Status::InternalError("Missing iceberg table sink.");
1128
0
        }
1129
1.72k
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1130
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1131
0
                                                                     row_desc, output_exprs);
1132
1.72k
        } else {
1133
1.72k
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1134
1.72k
                                                                row_desc, output_exprs);
1135
1.72k
        }
1136
1.72k
        break;
1137
1.72k
    }
1138
20
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1139
20
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1140
0
            return Status::InternalError("Missing iceberg delete sink.");
1141
0
        }
1142
20
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1143
20
                                                             row_desc, output_exprs);
1144
20
        break;
1145
20
    }
1146
80
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1147
80
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1148
0
            return Status::InternalError("Missing iceberg merge sink.");
1149
0
        }
1150
80
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1151
80
                                                            output_exprs);
1152
80
        break;
1153
80
    }
1154
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1155
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1156
0
            return Status::InternalError("Missing max compute table sink.");
1157
0
        }
1158
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1159
0
                                                       output_exprs);
1160
0
        break;
1161
0
    }
1162
88
    case TDataSinkType::JDBC_TABLE_SINK: {
1163
88
        if (!thrift_sink.__isset.jdbc_table_sink) {
1164
0
            return Status::InternalError("Missing data jdbc sink.");
1165
0
        }
1166
88
        if (config::enable_java_support) {
1167
88
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1168
88
                                                             output_exprs);
1169
88
        } else {
1170
0
            return Status::InternalError(
1171
0
                    "Jdbc table sink is not enabled, you can change be config "
1172
0
                    "enable_java_support to true and restart be.");
1173
0
        }
1174
88
        break;
1175
88
    }
1176
88
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1177
0
        if (!thrift_sink.__isset.memory_scratch_sink) {
1178
0
            return Status::InternalError("Missing data buffer sink.");
1179
0
        }
1180
1181
0
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1182
0
                                                             output_exprs);
1183
0
        break;
1184
0
    }
1185
164
    case TDataSinkType::RESULT_FILE_SINK: {
1186
164
        if (!thrift_sink.__isset.result_file_sink) {
1187
0
            return Status::InternalError("Missing result file sink.");
1188
0
        }
1189
1190
        // Result file sink is not the top sink
1191
164
        if (params.__isset.destinations && !params.destinations.empty()) {
1192
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1193
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1194
0
                    params.destinations, output_exprs, desc_tbl);
1195
164
        } else {
1196
164
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1197
164
                                                              output_exprs);
1198
164
        }
1199
164
        break;
1200
164
    }
1201
2.07k
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1202
2.07k
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1203
2.07k
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1204
2.07k
        auto sink_id = next_sink_operator_id();
1205
2.07k
        const int multi_cast_node_id = sink_id;
1206
2.07k
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1207
        // one sink has multiple sources.
1208
2.07k
        std::vector<int> sources;
1209
8.28k
        for (int i = 0; i < sender_size; ++i) {
1210
6.21k
            auto source_id = next_operator_id();
1211
6.21k
            sources.push_back(source_id);
1212
6.21k
        }
1213
1214
2.07k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1215
2.07k
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1216
8.28k
        for (int i = 0; i < sender_size; ++i) {
1217
6.21k
            auto new_pipeline = add_pipeline();
1218
            // Row descriptor used by the exchange sink created below.
1219
6.21k
            RowDescriptor* exchange_row_desc = nullptr;
1220
6.21k
            {
1221
6.21k
                const auto& tmp_row_desc =
1222
6.21k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1223
6.21k
                                ? RowDescriptor(state->desc_tbl(),
1224
6.21k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1225
6.21k
                                                         .output_tuple_id})
1226
6.21k
                                : row_desc;
1227
6.21k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1228
6.21k
            }
1229
6.21k
            auto source_id = sources[i];
1230
6.21k
            OperatorPtr source_op;
1231
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1232
6.21k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1233
6.21k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1234
6.21k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1235
6.21k
                    /*operator_id=*/source_id);
1236
6.21k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1237
6.21k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1238
            // 2. create and set sink operator of data stream sender for new pipeline
1239
1240
6.21k
            DataSinkOperatorPtr sink_op;
1241
6.21k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1242
6.21k
                    state, *exchange_row_desc, next_sink_operator_id(),
1243
6.21k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1244
6.21k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1245
1246
6.21k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1247
6.21k
            {
1248
6.21k
                TDataSink* t = pool->add(new TDataSink());
1249
6.21k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1250
6.21k
                RETURN_IF_ERROR(sink_op->init(*t));
1251
6.21k
            }
1252
1253
            // 3. set dependency dag
1254
6.21k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1255
6.21k
        }
1256
2.07k
        if (sources.empty()) {
1257
0
            return Status::InternalError("size of sources must be greater than 0");
1258
0
        }
1259
2.07k
        break;
1260
2.07k
    }
1261
2.07k
    case TDataSinkType::BLACKHOLE_SINK: {
1262
10
        if (!thrift_sink.__isset.blackhole_sink) {
1263
0
            return Status::InternalError("Missing blackhole sink.");
1264
0
        }
1265
1266
10
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1267
10
        break;
1268
10
    }
1269
156
    case TDataSinkType::TVF_TABLE_SINK: {
1270
156
        if (!thrift_sink.__isset.tvf_table_sink) {
1271
0
            return Status::InternalError("Missing TVF table sink.");
1272
0
        }
1273
156
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1274
156
                                                        output_exprs);
1275
156
        break;
1276
156
    }
1277
0
    default:
1278
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1279
123k
    }
1280
123k
    return Status::OK();
1281
123k
}
1282
1283
// NOLINTBEGIN(readability-function-size)
1284
// NOLINTBEGIN(readability-function-cognitive-complexity)
1285
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1286
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1287
                                                 PipelinePtr& cur_pipe, int parent_idx,
1288
                                                 int child_idx,
1289
                                                 const bool followed_by_shuffled_operator,
1290
                                                 const bool require_bucket_distribution,
1291
175k
                                                 OperatorPtr& cache_op) {
1292
175k
    std::vector<DataSinkOperatorPtr> sink_ops;
1293
175k
    Defer defer = Defer([&]() {
1294
175k
        if (op) {
1295
175k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1296
175k
        }
1297
175k
        for (auto& s : sink_ops) {
1298
43.2k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1299
43.2k
        }
1300
175k
    });
1301
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1302
    // Therefore, here we need to use a stack-like structure.
1303
175k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1304
175k
    std::stringstream error_msg;
1305
175k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1306
1307
175k
    bool fe_with_old_version = false;
1308
175k
    switch (tnode.node_type) {
1309
57.3k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1310
57.3k
        op = std::make_shared<OlapScanOperatorX>(
1311
57.3k
                pool, tnode, next_operator_id(), descs, _num_instances,
1312
57.3k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1313
57.3k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1314
57.3k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1315
57.3k
        break;
1316
57.3k
    }
1317
0
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1318
0
        DCHECK(_query_ctx != nullptr);
1319
0
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1320
0
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1321
0
                                                    _num_instances);
1322
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1323
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1324
0
        break;
1325
0
    }
1326
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1327
0
        if (config::enable_java_support) {
1328
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1329
0
                                                     _num_instances);
1330
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1331
0
        } else {
1332
0
            return Status::InternalError(
1333
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1334
0
                    "to true and restart be.");
1335
0
        }
1336
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1337
0
        break;
1338
0
    }
1339
23.2k
    case TPlanNodeType::FILE_SCAN_NODE: {
1340
23.2k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1341
23.2k
                                                 _num_instances);
1342
23.2k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1343
23.2k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1344
23.2k
        break;
1345
23.2k
    }
1346
40.7k
    case TPlanNodeType::EXCHANGE_NODE: {
1347
40.7k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1348
40.7k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1349
18.4E
                                  : 0;
1350
40.7k
        DCHECK_GT(num_senders, 0);
1351
40.7k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1352
40.7k
                                                       num_senders);
1353
40.7k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1354
40.7k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1355
40.7k
        break;
1356
40.7k
    }
1357
25.7k
    case TPlanNodeType::AGGREGATION_NODE: {
1358
25.7k
        if (tnode.agg_node.grouping_exprs.empty() &&
1359
25.7k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1360
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1361
0
                                         ": group by and output is empty");
1362
0
        }
1363
25.7k
        bool need_create_cache_op =
1364
25.7k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1365
25.7k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1366
0
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1367
0
            auto cache_source_id = next_operator_id();
1368
0
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1369
0
                                                        _params.fragment.query_cache_param);
1370
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1371
1372
0
            const auto downstream_pipeline_id = cur_pipe->id();
1373
0
            if (!_dag.contains(downstream_pipeline_id)) {
1374
0
                _dag.insert({downstream_pipeline_id, {}});
1375
0
            }
1376
0
            new_pipe = add_pipeline(cur_pipe);
1377
0
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1378
1379
0
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1380
0
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1381
0
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1382
0
            return Status::OK();
1383
0
        };
1384
25.7k
        const bool group_by_limit_opt =
1385
25.7k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1386
1387
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1388
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1389
25.7k
        const bool enable_spill = _runtime_state->enable_spill() &&
1390
25.7k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1391
25.7k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1392
25.7k
                                      tnode.agg_node.use_streaming_preaggregation &&
1393
25.7k
                                      !tnode.agg_node.grouping_exprs.empty();
1394
        // TODO: distinct streaming agg does not support spill.
1395
25.7k
        const bool can_use_distinct_streaming_agg =
1396
25.7k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1397
25.7k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1398
25.7k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1399
25.7k
                _params.query_options.enable_distinct_streaming_aggregation;
1400
1401
25.7k
        if (can_use_distinct_streaming_agg) {
1402
52
            if (need_create_cache_op) {
1403
0
                PipelinePtr new_pipe;
1404
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1405
1406
0
                cache_op = op;
1407
0
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1408
0
                                                                     tnode, descs);
1409
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1410
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1411
0
                cur_pipe = new_pipe;
1412
52
            } else {
1413
52
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1414
52
                                                                     tnode, descs);
1415
52
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1416
52
            }
1417
25.6k
        } else if (is_streaming_agg) {
1418
674
            if (need_create_cache_op) {
1419
0
                PipelinePtr new_pipe;
1420
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1421
0
                cache_op = op;
1422
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1423
0
                                                             descs);
1424
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1425
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1426
0
                cur_pipe = new_pipe;
1427
674
            } else {
1428
674
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1429
674
                                                             descs);
1430
674
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1431
674
            }
1432
25.0k
        } else {
1433
            // create new pipeline to add query cache operator
1434
25.0k
            PipelinePtr new_pipe;
1435
25.0k
            if (need_create_cache_op) {
1436
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1437
0
                cache_op = op;
1438
0
            }
1439
1440
25.0k
            if (enable_spill) {
1441
62
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1442
62
                                                                     next_operator_id(), descs);
1443
24.9k
            } else {
1444
24.9k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1445
24.9k
            }
1446
25.0k
            if (need_create_cache_op) {
1447
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1448
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1449
0
                cur_pipe = new_pipe;
1450
25.0k
            } else {
1451
25.0k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1452
25.0k
            }
1453
1454
25.0k
            const auto downstream_pipeline_id = cur_pipe->id();
1455
25.0k
            if (!_dag.contains(downstream_pipeline_id)) {
1456
22.9k
                _dag.insert({downstream_pipeline_id, {}});
1457
22.9k
            }
1458
25.0k
            cur_pipe = add_pipeline(cur_pipe);
1459
25.0k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1460
1461
25.0k
            if (enable_spill) {
1462
62
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1463
62
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1464
24.9k
            } else {
1465
24.9k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1466
24.9k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1467
24.9k
            }
1468
25.0k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1469
25.0k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1470
25.0k
        }
1471
25.7k
        break;
1472
25.7k
    }
1473
25.7k
    case TPlanNodeType::BUCKETED_AGGREGATION_NODE: {
1474
0
        if (tnode.bucketed_agg_node.grouping_exprs.empty()) {
1475
0
            return Status::InternalError(
1476
0
                    "Bucketed aggregation node {} should not be used without group by keys",
1477
0
                    tnode.node_id);
1478
0
        }
1479
1480
        // Create source operator (goes on the current / downstream pipeline).
1481
0
        op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1482
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1483
1484
        // Create a new pipeline for the sink side.
1485
0
        const auto downstream_pipeline_id = cur_pipe->id();
1486
0
        if (!_dag.contains(downstream_pipeline_id)) {
1487
0
            _dag.insert({downstream_pipeline_id, {}});
1488
0
        }
1489
0
        cur_pipe = add_pipeline(cur_pipe);
1490
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1491
1492
        // Create sink operator.
1493
0
        sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>(
1494
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1495
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1496
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1497
1498
        // Pre-register a single shared state for ALL instances so that every
1499
        // sink instance writes its per-instance hash table into the same
1500
        // BucketedAggSharedState and every source instance can merge across
1501
        // all of them.
1502
0
        {
1503
0
            auto shared_state = BucketedAggSharedState::create_shared();
1504
0
            shared_state->id = op->operator_id();
1505
0
            shared_state->related_op_ids.insert(op->operator_id());
1506
1507
0
            for (int i = 0; i < _num_instances; i++) {
1508
0
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1509
0
                                                             "BUCKETED_AGG_SINK_DEPENDENCY");
1510
0
                sink_dep->set_shared_state(shared_state.get());
1511
0
                shared_state->sink_deps.push_back(sink_dep);
1512
0
            }
1513
0
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1514
0
                                                     op->node_id(), "BUCKETED_AGG_SOURCE");
1515
0
            _op_id_to_shared_state.insert(
1516
0
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1517
0
        }
1518
0
        break;
1519
0
    }
1520
824
    case TPlanNodeType::HASH_JOIN_NODE: {
1521
824
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1522
824
                                       tnode.hash_join_node.is_broadcast_join;
1523
824
        const auto enable_spill = _runtime_state->enable_spill();
1524
824
        if (enable_spill && !is_broadcast_join) {
1525
0
            auto tnode_ = tnode;
1526
0
            tnode_.runtime_filters.clear();
1527
0
            auto inner_probe_operator =
1528
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1529
1530
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1531
            // So here use `tnode_` which has no runtime filters.
1532
0
            auto probe_side_inner_sink_operator =
1533
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1534
1535
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1536
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1537
1538
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1539
0
                    pool, tnode_, next_operator_id(), descs);
1540
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1541
0
                                                inner_probe_operator);
1542
0
            op = std::move(probe_operator);
1543
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1544
1545
0
            const auto downstream_pipeline_id = cur_pipe->id();
1546
0
            if (!_dag.contains(downstream_pipeline_id)) {
1547
0
                _dag.insert({downstream_pipeline_id, {}});
1548
0
            }
1549
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1550
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1551
1552
0
            auto inner_sink_operator =
1553
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1554
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1555
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1556
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1557
1558
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1559
0
            sink_ops.push_back(std::move(sink_operator));
1560
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1561
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1562
1563
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1564
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1565
824
        } else {
1566
824
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1567
824
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1568
1569
824
            const auto downstream_pipeline_id = cur_pipe->id();
1570
824
            if (!_dag.contains(downstream_pipeline_id)) {
1571
812
                _dag.insert({downstream_pipeline_id, {}});
1572
812
            }
1573
824
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1574
824
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1575
1576
824
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1577
824
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1578
824
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1579
824
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1580
1581
824
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1582
824
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1583
824
        }
1584
824
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1585
748
            std::shared_ptr<HashJoinSharedState> shared_state =
1586
748
                    HashJoinSharedState::create_shared(_num_instances);
1587
6.53k
            for (int i = 0; i < _num_instances; i++) {
1588
5.78k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1589
5.78k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1590
5.78k
                sink_dep->set_shared_state(shared_state.get());
1591
5.78k
                shared_state->sink_deps.push_back(sink_dep);
1592
5.78k
            }
1593
748
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1594
748
                                                     op->node_id(), "HASH_JOIN_PROBE");
1595
748
            _op_id_to_shared_state.insert(
1596
748
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1597
748
        }
1598
824
        break;
1599
824
    }
1600
4.21k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1601
4.21k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1602
4.21k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1603
1604
4.21k
        const auto downstream_pipeline_id = cur_pipe->id();
1605
4.21k
        if (!_dag.contains(downstream_pipeline_id)) {
1606
4.20k
            _dag.insert({downstream_pipeline_id, {}});
1607
4.20k
        }
1608
4.21k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1609
4.21k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1610
1611
4.21k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1612
4.21k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1613
4.21k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1614
4.21k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1615
4.21k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1616
4.21k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1617
4.21k
        break;
1618
4.21k
    }
1619
4.40k
    case TPlanNodeType::UNION_NODE: {
1620
4.40k
        int child_count = tnode.num_children;
1621
4.40k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1622
4.40k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1623
1624
4.40k
        const auto downstream_pipeline_id = cur_pipe->id();
1625
4.40k
        if (!_dag.contains(downstream_pipeline_id)) {
1626
4.35k
            _dag.insert({downstream_pipeline_id, {}});
1627
4.35k
        }
1628
4.81k
        for (int i = 0; i < child_count; i++) {
1629
408
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1630
408
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1631
408
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1632
408
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1633
408
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1634
408
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1635
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1636
408
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1637
408
        }
1638
4.40k
        break;
1639
4.40k
    }
1640
12.7k
    case TPlanNodeType::SORT_NODE: {
1641
12.7k
        const auto should_spill = _runtime_state->enable_spill() &&
1642
12.7k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1643
12.7k
        const bool use_local_merge =
1644
12.7k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1645
12.7k
        if (should_spill) {
1646
2
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1647
12.7k
        } else if (use_local_merge) {
1648
12.2k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1649
12.2k
                                                                 descs);
1650
12.2k
        } else {
1651
472
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1652
472
        }
1653
12.7k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1654
1655
12.7k
        const auto downstream_pipeline_id = cur_pipe->id();
1656
12.7k
        if (!_dag.contains(downstream_pipeline_id)) {
1657
12.7k
            _dag.insert({downstream_pipeline_id, {}});
1658
12.7k
        }
1659
12.7k
        cur_pipe = add_pipeline(cur_pipe);
1660
12.7k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1661
1662
12.7k
        if (should_spill) {
1663
2
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1664
2
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1665
12.7k
        } else {
1666
12.7k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1667
12.7k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1668
12.7k
        }
1669
12.7k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1670
12.7k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1671
12.7k
        break;
1672
12.7k
    }
1673
12.7k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1674
0
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1675
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1676
1677
0
        const auto downstream_pipeline_id = cur_pipe->id();
1678
0
        if (!_dag.contains(downstream_pipeline_id)) {
1679
0
            _dag.insert({downstream_pipeline_id, {}});
1680
0
        }
1681
0
        cur_pipe = add_pipeline(cur_pipe);
1682
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1683
1684
0
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1685
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1686
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1687
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1688
0
        break;
1689
0
    }
1690
44
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1691
44
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1692
44
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1693
1694
44
        const auto downstream_pipeline_id = cur_pipe->id();
1695
44
        if (!_dag.contains(downstream_pipeline_id)) {
1696
44
            _dag.insert({downstream_pipeline_id, {}});
1697
44
        }
1698
44
        cur_pipe = add_pipeline(cur_pipe);
1699
44
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1700
1701
44
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1702
44
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1703
44
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1704
44
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1705
44
        break;
1706
44
    }
1707
934
    case TPlanNodeType::MATERIALIZATION_NODE: {
1708
934
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1709
934
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1710
934
        break;
1711
934
    }
1712
934
    case TPlanNodeType::INTERSECT_NODE: {
1713
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1714
0
                                                                      cur_pipe, sink_ops));
1715
0
        break;
1716
0
    }
1717
0
    case TPlanNodeType::EXCEPT_NODE: {
1718
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1719
0
                                                                       cur_pipe, sink_ops));
1720
0
        break;
1721
0
    }
1722
0
    case TPlanNodeType::REPEAT_NODE: {
1723
0
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1724
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1725
0
        break;
1726
0
    }
1727
4
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1728
4
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1729
4
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1730
4
        break;
1731
4
    }
1732
200
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1733
200
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1734
200
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1735
200
        break;
1736
200
    }
1737
248
    case TPlanNodeType::EMPTY_SET_NODE: {
1738
248
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1739
248
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1740
248
        break;
1741
248
    }
1742
248
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1743
194
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1744
194
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1745
194
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1746
194
        break;
1747
194
    }
1748
602
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1749
602
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1750
602
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1751
602
        break;
1752
602
    }
1753
2.08k
    case TPlanNodeType::META_SCAN_NODE: {
1754
2.08k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1755
2.08k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1756
2.08k
        break;
1757
2.08k
    }
1758
2.08k
    case TPlanNodeType::SELECT_NODE: {
1759
2.07k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1760
2.07k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1761
2.07k
        break;
1762
2.07k
    }
1763
2.07k
    case TPlanNodeType::REC_CTE_NODE: {
1764
0
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1765
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1766
1767
0
        const auto downstream_pipeline_id = cur_pipe->id();
1768
0
        if (!_dag.contains(downstream_pipeline_id)) {
1769
0
            _dag.insert({downstream_pipeline_id, {}});
1770
0
        }
1771
1772
0
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1773
0
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1774
1775
0
        DataSinkOperatorPtr anchor_sink;
1776
0
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1777
0
                                                                  op->operator_id(), tnode, descs);
1778
0
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1779
0
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1780
0
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1781
1782
0
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1783
0
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1784
1785
0
        DataSinkOperatorPtr rec_sink;
1786
0
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1787
0
                                                         tnode, descs);
1788
0
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1789
0
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1790
0
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1791
1792
0
        break;
1793
0
    }
1794
0
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1795
0
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1796
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1797
0
        break;
1798
0
    }
1799
0
    default:
1800
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1801
0
                                     print_plan_node_type(tnode.node_type));
1802
175k
    }
1803
175k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1804
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
1805
0
        op->set_serial_operator();
1806
0
    }
1807
1808
175k
    return Status::OK();
1809
175k
}
1810
// NOLINTEND(readability-function-cognitive-complexity)
1811
// NOLINTEND(readability-function-size)
1812
1813
template <bool is_intersect>
1814
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
1815
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
1816
0
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1817
0
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1818
0
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1819
1820
0
    const auto downstream_pipeline_id = cur_pipe->id();
1821
0
    if (!_dag.contains(downstream_pipeline_id)) {
1822
0
        _dag.insert({downstream_pipeline_id, {}});
1823
0
    }
1824
1825
0
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1826
0
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1827
0
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1828
1829
0
        if (child_id == 0) {
1830
0
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1831
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1832
0
        } else {
1833
0
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1834
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1835
0
        }
1836
0
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1837
0
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1838
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1839
0
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1840
0
    }
1841
1842
0
    return Status::OK();
1843
0
}
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
1844
1845
123k
/// Submits every pipeline task of this fragment to the query's pipeline scheduler.
///
/// May only be called once; a second call returns an InternalError.
///
/// If a submission fails, the fragment is cancelled, `_total_tasks` is lowered to the number
/// of tasks that were actually submitted, and no further tasks are submitted. If all of the
/// submitted tasks are already closed at that point, the fragment instance is closed here.
///
/// @return OK when all tasks were submitted, otherwise an InternalError wrapping the
///         scheduler's error.
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        if (!st.ok()) {
            // The `break` above only leaves the inner loop. Stop the outer loop as well:
            // otherwise more tasks would be submitted after the fragment was cancelled, and
            // a later successful submission would overwrite `st` and hide the failure while
            // `_total_tasks` no longer matched the number of submitted tasks.
            break;
        }
    }

    if (st.ok()) {
        return st;
    }

    bool need_remove = false;
    {
        std::lock_guard<std::mutex> l(_task_mutex);
        if (_closed_tasks >= _total_tasks) {
            need_remove = _close_fragment_instance();
        }
    }
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
    if (need_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
    return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                 BackendOptions::get_localhost());
}
1886
1887
0
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1888
0
    if (_runtime_state->enable_profile()) {
1889
0
        std::stringstream ss;
1890
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1891
0
            runtime_profile_ptr->pretty_print(&ss);
1892
0
        }
1893
1894
0
        if (_runtime_state->load_channel_profile()) {
1895
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1896
0
        }
1897
1898
0
        auto profile_str =
1899
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1900
0
                            this->_fragment_id, extra_info, ss.str());
1901
0
        LOG_LONG_STRING(INFO, profile_str);
1902
0
    }
1903
0
}
1904
// If all pipeline tasks binded to the fragment instance are finished, then we could
1905
// close the fragment instance.
1906
// Returns true if the caller should call remove_pipeline_context() **after** releasing
1907
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
1908
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
1909
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
1910
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
1911
// (via debug_string()).
1912
123k
bool PipelineFragmentContext::_close_fragment_instance() {
1913
123k
    if (_is_fragment_instance_closed) {
1914
0
        return false;
1915
0
    }
1916
123k
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
1917
123k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
1918
123k
    if (!_need_notify_close) {
1919
123k
        auto st = send_report(true);
1920
123k
        if (!st) {
1921
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
1922
0
                                        print_id(_query_id), _fragment_id, st.to_string());
1923
0
        }
1924
123k
    }
1925
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
1926
    // Since stream load does not have someting like coordinator on FE, so
1927
    // backend can not report profile to FE, ant its profile can not be shown
1928
    // in the same way with other query. So we print the profile content to info log.
1929
1930
123k
    if (_runtime_state->enable_profile() &&
1931
123k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
1932
456
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
1933
456
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
1934
0
        std::stringstream ss;
1935
        // Compute the _local_time_percent before pretty_print the runtime_profile
1936
        // Before add this operation, the print out like that:
1937
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
1938
        // After add the operation, the print out like that:
1939
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
1940
        // We can easily know the exec node execute time without child time consumed.
1941
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1942
0
            runtime_profile_ptr->pretty_print(&ss);
1943
0
        }
1944
1945
0
        if (_runtime_state->load_channel_profile()) {
1946
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1947
0
        }
1948
1949
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
1950
0
    }
1951
1952
123k
    if (_query_ctx->enable_profile()) {
1953
456
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
1954
456
                                         collect_realtime_load_channel_profile());
1955
456
    }
1956
1957
    // Return whether the caller needs to remove from the pipeline map.
1958
    // The caller must do this after releasing _task_mutex.
1959
123k
    return !_need_notify_close;
1960
123k
}
1961
1962
481k
/// Records that one task of pipeline `pipeline_id` has been closed.
///
/// When close_task() reports that the pipeline is done, the pipelines listed for it in
/// `_dag` are made runnable, since their output is no longer needed. The closed-task counters
/// are then updated under `_task_mutex`, and the fragment instance is closed once all tasks
/// are closed.
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
    const bool pipeline_finished = _pip_id_to_pipeline[pipeline_id]->close_task();
    if (pipeline_finished && _dag.contains(pipeline_id)) {
        // All tasks of this pipeline are closed, so the upstream tasks are never needed
        // again; just make them runnable here.
        for (auto upstream_id : _dag[pipeline_id]) {
            _pip_id_to_pipeline[upstream_id]->make_all_runnable(pipeline_id);
        }
    }

    bool need_remove = false;
    {
        std::lock_guard<std::mutex> l(_task_mutex);
        ++_closed_tasks;
        // Keep the query-level finished task progress up to date in real time.
        _query_ctx->inc_finished_task_num();
        if (_closed_tasks >= _total_tasks) {
            need_remove = _close_fragment_instance();
        }
    }
    if (!need_remove) {
        return;
    }
    // remove_pipeline_context() is called outside _task_mutex to avoid ABBA deadlock.
    _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
}
1987
1988
13.6k
std::string PipelineFragmentContext::get_load_error_url() {
1989
13.6k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1990
0
        return to_load_error_http_path(str);
1991
0
    }
1992
44.7k
    for (auto& tasks : _tasks) {
1993
56.2k
        for (auto& task : tasks) {
1994
56.2k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
1995
12
                return to_load_error_http_path(str);
1996
12
            }
1997
56.2k
        }
1998
44.7k
    }
1999
13.6k
    return "";
2000
13.6k
}
2001
2002
13.6k
std::string PipelineFragmentContext::get_first_error_msg() {
2003
13.6k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
2004
0
        return str;
2005
0
    }
2006
44.7k
    for (auto& tasks : _tasks) {
2007
56.2k
        for (auto& task : tasks) {
2008
56.2k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
2009
12
                return str;
2010
12
            }
2011
56.2k
        }
2012
44.7k
    }
2013
13.6k
    return "";
2014
13.6k
}
2015
2016
0
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
2017
0
    std::stringstream url;
2018
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
2019
0
        << "/api/_download_load?"
2020
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
2021
0
    return url.str();
2022
0
}
2023
2024
12.7k
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
2025
12.7k
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
2026
12.7k
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
2027
12.7k
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
2028
12.7k
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
2029
12.7k
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
2030
12.7k
    });
2031
2032
12.7k
    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
2033
12.7k
    if (req.coord_addr.hostname == "external") {
2034
        // External query (flink/spark read tablets) not need to report to FE.
2035
0
        return;
2036
0
    }
2037
12.7k
    int callback_retries = 10;
2038
12.7k
    const int sleep_ms = 1000;
2039
12.7k
    Status exec_status = req.status;
2040
12.7k
    Status coord_status;
2041
12.7k
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
2042
12.7k
    do {
2043
12.7k
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
2044
12.7k
                                                            req.coord_addr, &coord_status);
2045
12.7k
        if (!coord_status.ok()) {
2046
0
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
2047
0
        }
2048
12.7k
    } while (!coord_status.ok() && callback_retries-- > 0);
2049
2050
12.7k
    if (!coord_status.ok()) {
2051
0
        UniqueId uid(req.query_id.hi, req.query_id.lo);
2052
0
        static_cast<void>(req.cancel_fn(Status::InternalError(
2053
0
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
2054
0
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
2055
0
        return;
2056
0
    }
2057
2058
12.7k
    TReportExecStatusParams params;
2059
12.7k
    params.protocol_version = FrontendServiceVersion::V1;
2060
12.7k
    params.__set_query_id(req.query_id);
2061
12.7k
    params.__set_backend_num(req.backend_num);
2062
12.7k
    params.__set_fragment_instance_id(req.fragment_instance_id);
2063
12.7k
    params.__set_fragment_id(req.fragment_id);
2064
12.7k
    params.__set_status(exec_status.to_thrift());
2065
12.7k
    params.__set_done(req.done);
2066
12.7k
    params.__set_query_type(req.runtime_state->query_type());
2067
12.7k
    params.__isset.profile = false;
2068
2069
12.7k
    DCHECK(req.runtime_state != nullptr);
2070
2071
12.7k
    if (req.runtime_state->query_type() == TQueryType::LOAD) {
2072
11.7k
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2073
11.7k
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2074
11.7k
    } else {
2075
1.00k
        DCHECK(!req.runtime_states.empty());
2076
1.00k
        if (!req.runtime_state->output_files().empty()) {
2077
0
            params.__isset.delta_urls = true;
2078
0
            for (auto& it : req.runtime_state->output_files()) {
2079
0
                params.delta_urls.push_back(_to_http_path(it));
2080
0
            }
2081
0
        }
2082
1.00k
        if (!params.delta_urls.empty()) {
2083
0
            params.__isset.delta_urls = true;
2084
0
        }
2085
1.00k
    }
2086
2087
12.7k
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
2088
12.7k
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
2089
12.7k
    static std::string s_unselected_rows = "unselected.rows";
2090
12.7k
    int64_t num_rows_load_success = 0;
2091
12.7k
    int64_t num_rows_load_filtered = 0;
2092
12.7k
    int64_t num_rows_load_unselected = 0;
2093
12.7k
    if (req.runtime_state->num_rows_load_total() > 0 ||
2094
12.7k
        req.runtime_state->num_rows_load_filtered() > 0 ||
2095
12.7k
        req.runtime_state->num_finished_range() > 0) {
2096
0
        params.__isset.load_counters = true;
2097
2098
0
        num_rows_load_success = req.runtime_state->num_rows_load_success();
2099
0
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
2100
0
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
2101
0
        params.__isset.fragment_instance_reports = true;
2102
0
        TFragmentInstanceReport t;
2103
0
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
2104
0
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
2105
0
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2106
0
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2107
0
        params.fragment_instance_reports.push_back(t);
2108
12.7k
    } else if (!req.runtime_states.empty()) {
2109
47.3k
        for (auto* rs : req.runtime_states) {
2110
47.3k
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
2111
47.3k
                rs->num_finished_range() > 0) {
2112
6.95k
                params.__isset.load_counters = true;
2113
6.95k
                num_rows_load_success += rs->num_rows_load_success();
2114
6.95k
                num_rows_load_filtered += rs->num_rows_load_filtered();
2115
6.95k
                num_rows_load_unselected += rs->num_rows_load_unselected();
2116
6.95k
                params.__isset.fragment_instance_reports = true;
2117
6.95k
                TFragmentInstanceReport t;
2118
6.95k
                t.__set_fragment_instance_id(rs->fragment_instance_id());
2119
6.95k
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
2120
6.95k
                t.__set_loaded_rows(rs->num_rows_load_total());
2121
6.95k
                t.__set_loaded_bytes(rs->num_bytes_load_total());
2122
6.95k
                params.fragment_instance_reports.push_back(t);
2123
6.95k
            }
2124
47.3k
        }
2125
12.7k
    }
2126
12.7k
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
2127
12.7k
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
2128
12.7k
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
2129
2130
12.7k
    if (!req.load_error_url.empty()) {
2131
12
        params.__set_tracking_url(req.load_error_url);
2132
12
    }
2133
12.7k
    if (!req.first_error_msg.empty()) {
2134
12
        params.__set_first_error_msg(req.first_error_msg);
2135
12
    }
2136
47.3k
    for (auto* rs : req.runtime_states) {
2137
47.3k
        if (rs->wal_id() > 0) {
2138
0
            params.__set_txn_id(rs->wal_id());
2139
0
            params.__set_label(rs->import_label());
2140
0
        }
2141
47.3k
    }
2142
12.7k
    if (!req.runtime_state->export_output_files().empty()) {
2143
0
        params.__isset.export_files = true;
2144
0
        params.export_files = req.runtime_state->export_output_files();
2145
12.7k
    } else if (!req.runtime_states.empty()) {
2146
47.3k
        for (auto* rs : req.runtime_states) {
2147
47.3k
            if (!rs->export_output_files().empty()) {
2148
0
                params.__isset.export_files = true;
2149
0
                params.export_files.insert(params.export_files.end(),
2150
0
                                           rs->export_output_files().begin(),
2151
0
                                           rs->export_output_files().end());
2152
0
            }
2153
47.3k
        }
2154
12.7k
    }
2155
12.7k
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
2156
0
        params.__isset.commitInfos = true;
2157
0
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
2158
12.7k
    } else if (!req.runtime_states.empty()) {
2159
47.3k
        for (auto* rs : req.runtime_states) {
2160
47.3k
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
2161
3.00k
                params.__isset.commitInfos = true;
2162
3.00k
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
2163
3.00k
            }
2164
47.3k
        }
2165
12.7k
    }
2166
12.7k
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
2167
0
        params.__isset.errorTabletInfos = true;
2168
0
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
2169
12.7k
    } else if (!req.runtime_states.empty()) {
2170
47.3k
        for (auto* rs : req.runtime_states) {
2171
47.3k
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
2172
0
                params.__isset.errorTabletInfos = true;
2173
0
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
2174
0
                                               rs_eti.end());
2175
0
            }
2176
47.3k
        }
2177
12.7k
    }
2178
12.7k
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
2179
0
        params.__isset.hive_partition_updates = true;
2180
0
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
2181
0
                                             hpu.end());
2182
12.7k
    } else if (!req.runtime_states.empty()) {
2183
47.3k
        for (auto* rs : req.runtime_states) {
2184
47.3k
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
2185
2.13k
                params.__isset.hive_partition_updates = true;
2186
2.13k
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
2187
2.13k
                                                     rs_hpu.begin(), rs_hpu.end());
2188
2.13k
            }
2189
47.3k
        }
2190
12.7k
    }
2191
12.7k
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
2192
0
        params.__isset.iceberg_commit_datas = true;
2193
0
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
2194
0
                                           icd.end());
2195
12.7k
    } else if (!req.runtime_states.empty()) {
2196
47.3k
        for (auto* rs : req.runtime_states) {
2197
47.3k
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
2198
2.06k
                params.__isset.iceberg_commit_datas = true;
2199
2.06k
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
2200
2.06k
                                                   rs_icd.begin(), rs_icd.end());
2201
2.06k
            }
2202
47.3k
        }
2203
12.7k
    }
2204
2205
12.7k
    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
2206
0
        params.__isset.mc_commit_datas = true;
2207
0
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
2208
12.7k
    } else if (!req.runtime_states.empty()) {
2209
47.3k
        for (auto* rs : req.runtime_states) {
2210
47.3k
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
2211
0
                params.__isset.mc_commit_datas = true;
2212
0
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
2213
0
                                              rs_mcd.end());
2214
0
            }
2215
47.3k
        }
2216
12.7k
    }
2217
2218
12.7k
    req.runtime_state->get_unreported_errors(&(params.error_log));
2219
12.7k
    params.__isset.error_log = (!params.error_log.empty());
2220
2221
12.7k
    if (_exec_env->cluster_info()->backend_id != 0) {
2222
12.7k
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
2223
12.7k
    }
2224
2225
12.7k
    TReportExecStatusResult res;
2226
12.7k
    Status rpc_status;
2227
2228
12.7k
    VLOG_DEBUG << "reportExecStatus params is "
2229
0
               << apache::thrift::ThriftDebugString(params).c_str();
2230
12.7k
    if (!exec_status.ok()) {
2231
144
        LOG(WARNING) << "report error status: " << exec_status.msg()
2232
144
                     << " to coordinator: " << req.coord_addr
2233
144
                     << ", query id: " << print_id(req.query_id);
2234
144
    }
2235
12.7k
    try {
2236
12.7k
        try {
2237
12.7k
            (*coord)->reportExecStatus(res, params);
2238
12.7k
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
2239
#ifndef ADDRESS_SANITIZER
2240
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
2241
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
2242
                         << req.coord_addr << ", err: " << e.what();
2243
#endif
2244
0
            rpc_status = coord->reopen();
2245
2246
0
            if (!rpc_status.ok()) {
2247
0
                req.cancel_fn(rpc_status);
2248
0
                return;
2249
0
            }
2250
0
            (*coord)->reportExecStatus(res, params);
2251
0
        }
2252
2253
12.7k
        rpc_status = Status::create<false>(res.status);
2254
12.7k
    } catch (apache::thrift::TException& e) {
2255
0
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
2256
0
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
2257
0
    }
2258
2259
12.7k
    if (!rpc_status.ok()) {
2260
0
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
2261
0
                 print_id(req.query_id), rpc_status.to_string());
2262
0
        req.cancel_fn(rpc_status);
2263
0
    }
2264
12.7k
}
2265
2266
124k
/// Schedules an execution-status report for this fragment on the fragment
/// manager's thread pool.
///
/// @param done whether the fragment has finished.
/// @return Status::OK() when no report has to be sent, Status::NeedSendAgain("")
///         when reporting is disabled and the fragment is not done yet, otherwise
///         the status returned by submitting the report job to the thread pool.
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();

    if (!_is_report_success) {
        // The plan finished successfully but nobody asked for a success report,
        // so there is nothing to send. Load sets _is_report_success to true
        // because load wants to know the progress.
        if (done && exec_status.ok()) {
            return Status::OK();
        }

        // Both _is_report_success and _is_report_on_cancel are false: no report
        // is needed whether the query succeeded or failed. This may happen when
        // the query limit is reached and an internal cancellation is being
        // processed. In that case the fragment is cancelled as well, but
        // _is_report_on_cancel is set to false to avoid sending a fault report
        // to FE.
        if (!_is_report_on_cancel) {
            if (!done) {
                return Status::NeedSendAgain("");
            }
            // done == true means the query finished successfully, so the
            // fragment instance can be closed without reporting to FE.
            return Status::OK();
        }
    }

    // Gather the RuntimeState of every task of every instance for the report.
    std::vector<RuntimeState*> task_states;
    for (auto& instance_tasks : _tasks) {
        for (auto& task_entry : instance_tasks) {
            task_states.push_back(task_entry.second.get());
        }
    }

    // Use the query-level error url / first error message when the query
    // context has one, otherwise fall back to this fragment's own value.
    std::string error_url;
    if (_query_ctx->get_load_error_url().empty()) {
        error_url = get_load_error_url();
    } else {
        error_url = _query_ctx->get_load_error_url();
    }

    std::string error_msg;
    if (_query_ctx->get_first_error_msg().empty()) {
        error_msg = get_first_error_msg();
    } else {
        error_msg = _query_ctx->get_first_error_msg();
    }

    ReportStatusRequest request {.status = exec_status,
                                 .runtime_states = task_states,
                                 // A failed execution is always reported as done.
                                 .done = done || !exec_status.ok(),
                                 .coord_addr = _query_ctx->coord_addr,
                                 .query_id = _query_id,
                                 .fragment_id = _fragment_id,
                                 .fragment_instance_id = TUniqueId(),
                                 .backend_num = -1,
                                 .runtime_state = _runtime_state.get(),
                                 .load_error_url = error_url,
                                 .first_error_msg = error_msg,
                                 .cancel_fn = [this](const Status& reason) { cancel(reason); }};

    // The job captures a shared_ptr to this context in addition to `this`, so
    // the context stays alive until the report job has run.
    auto self = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
    return _exec_env->fragment_mgr()->get_thread_pool()->submit_func([this, request, self]() {
        SCOPED_ATTACH_TASK(self->get_query_ctx()->query_mem_tracker());
        _coordinator_callback(request);
        if (request.done) {
            return;
        }
        // Not the final report: refresh the time of the next report.
        self->refresh_next_report_time();
    });
}
2326
2327
0
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2328
0
    size_t res = 0;
2329
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2330
    // here to traverse the vector.
2331
0
    for (const auto& task_instances : _tasks) {
2332
0
        for (const auto& task : task_instances) {
2333
0
            if (task.first->is_running()) {
2334
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2335
0
                                      << " is running, task: " << (void*)task.first.get()
2336
0
                                      << ", is_running: " << task.first->is_running();
2337
0
                *has_running_task = true;
2338
0
                return 0;
2339
0
            }
2340
2341
0
            size_t revocable_size = task.first->get_revocable_size();
2342
0
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2343
0
                res += revocable_size;
2344
0
            }
2345
0
        }
2346
0
    }
2347
0
    return res;
2348
0
}
2349
2350
0
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2351
0
    std::vector<PipelineTask*> revocable_tasks;
2352
0
    for (const auto& task_instances : _tasks) {
2353
0
        for (const auto& task : task_instances) {
2354
0
            size_t revocable_size_ = task.first->get_revocable_size();
2355
2356
0
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2357
0
                revocable_tasks.emplace_back(task.first.get());
2358
0
            }
2359
0
        }
2360
0
    }
2361
0
    return revocable_tasks;
2362
0
}
2363
2364
0
std::string PipelineFragmentContext::debug_string() {
2365
0
    std::lock_guard<std::mutex> l(_task_mutex);
2366
0
    fmt::memory_buffer debug_string_buffer;
2367
0
    fmt::format_to(debug_string_buffer,
2368
0
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2369
0
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2370
0
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2371
0
    for (size_t j = 0; j < _tasks.size(); j++) {
2372
0
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2373
0
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2374
0
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2375
0
                           _tasks[j][i].first->debug_string());
2376
0
        }
2377
0
    }
2378
2379
0
    return fmt::to_string(debug_string_buffer);
2380
0
}
2381
2382
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2383
456
PipelineFragmentContext::collect_realtime_profile() const {
2384
456
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2385
2386
    // we do not have mutex to protect pipeline_id_to_profile
2387
    // so we need to make sure this funciton is invoked after fragment context
2388
    // has already been prepared.
2389
456
    if (!_prepared) {
2390
0
        std::string msg =
2391
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2392
0
        DCHECK(false) << msg;
2393
0
        LOG_ERROR(msg);
2394
0
        return res;
2395
0
    }
2396
2397
    // Make sure first profile is fragment level profile
2398
456
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2399
456
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2400
456
    res.push_back(fragment_profile);
2401
2402
    // pipeline_id_to_profile is initialized in prepare stage
2403
664
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2404
664
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2405
664
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2406
664
        res.push_back(profile_ptr);
2407
664
    }
2408
2409
456
    return res;
2410
456
}
2411
2412
std::shared_ptr<TRuntimeProfileTree>
2413
456
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2414
    // we do not have mutex to protect pipeline_id_to_profile
2415
    // so we need to make sure this funciton is invoked after fragment context
2416
    // has already been prepared.
2417
456
    if (!_prepared) {
2418
0
        std::string msg =
2419
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2420
0
        DCHECK(false) << msg;
2421
0
        LOG_ERROR(msg);
2422
0
        return nullptr;
2423
0
    }
2424
2425
624
    for (const auto& tasks : _tasks) {
2426
1.08k
        for (const auto& task : tasks) {
2427
1.08k
            if (task.second->load_channel_profile() == nullptr) {
2428
0
                continue;
2429
0
            }
2430
2431
1.08k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2432
2433
1.08k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2434
1.08k
                                                           _runtime_state->profile_level());
2435
1.08k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2436
1.08k
        }
2437
624
    }
2438
2439
456
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2440
456
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2441
456
                                                      _runtime_state->profile_level());
2442
456
    return load_channel_profile;
2443
456
}
2444
2445
// Collect runtime filter IDs registered by all tasks in this PFC.
2446
// Used during recursive CTE stage transitions to know which filters to deregister
2447
// before creating the new PFC for the next recursion round.
2448
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2449
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2450
// the vector sizes do not change, and individual RuntimeState filter sets are
2451
// written only during open() which has completed by the time we reach rerun.
2452
0
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2453
0
    std::set<int> result;
2454
0
    for (const auto& _task : _tasks) {
2455
0
        for (const auto& task : _task) {
2456
0
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2457
0
            result.merge(set);
2458
0
        }
2459
0
    }
2460
0
    if (_runtime_state) {
2461
0
        auto set = _runtime_state->get_deregister_runtime_filter();
2462
0
        result.merge(set);
2463
0
    }
2464
0
    return result;
2465
0
}
2466
2467
123k
void PipelineFragmentContext::_release_resource() {
2468
123k
    std::lock_guard<std::mutex> l(_task_mutex);
2469
    // The memory released by the query end is recorded in the query mem tracker.
2470
123k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2471
123k
    auto st = _query_ctx->exec_status();
2472
269k
    for (auto& _task : _tasks) {
2473
269k
        if (!_task.empty()) {
2474
269k
            _call_back(_task.front().first->runtime_state(), &st);
2475
269k
        }
2476
269k
    }
2477
123k
    _tasks.clear();
2478
123k
    _dag.clear();
2479
123k
    _pip_id_to_pipeline.clear();
2480
123k
    _pipelines.clear();
2481
123k
    _sink.reset();
2482
123k
    _root_op.reset();
2483
123k
    _runtime_filter_mgr_map.clear();
2484
123k
    _op_id_to_shared_state.clear();
2485
123k
}
2486
2487
} // namespace doris