Coverage Report

Created: 2026-04-17 11:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/FrontendService.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <pthread.h>
26
27
#include <algorithm>
28
#include <cstdlib>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <fmt/format.h>
31
#include <thrift/Thrift.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
#include <thrift/transport/TTransportException.h>
34
35
#include <chrono> // IWYU pragma: keep
36
#include <map>
37
#include <memory>
38
#include <ostream>
39
#include <utility>
40
41
#include "cloud/config.h"
42
#include "common/cast_set.h"
43
#include "common/config.h"
44
#include "common/exception.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "exec/exchange/local_exchange_sink_operator.h"
48
#include "exec/exchange/local_exchange_source_operator.h"
49
#include "exec/exchange/local_exchanger.h"
50
#include "exec/exchange/vdata_stream_mgr.h"
51
#include "exec/operator/aggregation_sink_operator.h"
52
#include "exec/operator/aggregation_source_operator.h"
53
#include "exec/operator/analytic_sink_operator.h"
54
#include "exec/operator/analytic_source_operator.h"
55
#include "exec/operator/assert_num_rows_operator.h"
56
#include "exec/operator/blackhole_sink_operator.h"
57
#include "exec/operator/bucketed_aggregation_sink_operator.h"
58
#include "exec/operator/bucketed_aggregation_source_operator.h"
59
#include "exec/operator/cache_sink_operator.h"
60
#include "exec/operator/cache_source_operator.h"
61
#include "exec/operator/datagen_operator.h"
62
#include "exec/operator/dict_sink_operator.h"
63
#include "exec/operator/distinct_streaming_aggregation_operator.h"
64
#include "exec/operator/empty_set_operator.h"
65
#include "exec/operator/es_scan_operator.h"
66
#include "exec/operator/exchange_sink_operator.h"
67
#include "exec/operator/exchange_source_operator.h"
68
#include "exec/operator/file_scan_operator.h"
69
#include "exec/operator/group_commit_block_sink_operator.h"
70
#include "exec/operator/group_commit_scan_operator.h"
71
#include "exec/operator/hashjoin_build_sink.h"
72
#include "exec/operator/hashjoin_probe_operator.h"
73
#include "exec/operator/hive_table_sink_operator.h"
74
#include "exec/operator/iceberg_delete_sink_operator.h"
75
#include "exec/operator/iceberg_merge_sink_operator.h"
76
#include "exec/operator/iceberg_table_sink_operator.h"
77
#include "exec/operator/jdbc_scan_operator.h"
78
#include "exec/operator/jdbc_table_sink_operator.h"
79
#include "exec/operator/local_merge_sort_source_operator.h"
80
#include "exec/operator/materialization_opertor.h"
81
#include "exec/operator/maxcompute_table_sink_operator.h"
82
#include "exec/operator/memory_scratch_sink_operator.h"
83
#include "exec/operator/meta_scan_operator.h"
84
#include "exec/operator/multi_cast_data_stream_sink.h"
85
#include "exec/operator/multi_cast_data_stream_source.h"
86
#include "exec/operator/nested_loop_join_build_operator.h"
87
#include "exec/operator/nested_loop_join_probe_operator.h"
88
#include "exec/operator/olap_scan_operator.h"
89
#include "exec/operator/olap_table_sink_operator.h"
90
#include "exec/operator/olap_table_sink_v2_operator.h"
91
#include "exec/operator/partition_sort_sink_operator.h"
92
#include "exec/operator/partition_sort_source_operator.h"
93
#include "exec/operator/partitioned_aggregation_sink_operator.h"
94
#include "exec/operator/partitioned_aggregation_source_operator.h"
95
#include "exec/operator/partitioned_hash_join_probe_operator.h"
96
#include "exec/operator/partitioned_hash_join_sink_operator.h"
97
#include "exec/operator/rec_cte_anchor_sink_operator.h"
98
#include "exec/operator/rec_cte_scan_operator.h"
99
#include "exec/operator/rec_cte_sink_operator.h"
100
#include "exec/operator/rec_cte_source_operator.h"
101
#include "exec/operator/repeat_operator.h"
102
#include "exec/operator/result_file_sink_operator.h"
103
#include "exec/operator/result_sink_operator.h"
104
#include "exec/operator/schema_scan_operator.h"
105
#include "exec/operator/select_operator.h"
106
#include "exec/operator/set_probe_sink_operator.h"
107
#include "exec/operator/set_sink_operator.h"
108
#include "exec/operator/set_source_operator.h"
109
#include "exec/operator/sort_sink_operator.h"
110
#include "exec/operator/sort_source_operator.h"
111
#include "exec/operator/spill_iceberg_table_sink_operator.h"
112
#include "exec/operator/spill_sort_sink_operator.h"
113
#include "exec/operator/spill_sort_source_operator.h"
114
#include "exec/operator/streaming_aggregation_operator.h"
115
#include "exec/operator/table_function_operator.h"
116
#include "exec/operator/tvf_table_sink_operator.h"
117
#include "exec/operator/union_sink_operator.h"
118
#include "exec/operator/union_source_operator.h"
119
#include "exec/pipeline/dependency.h"
120
#include "exec/pipeline/pipeline_task.h"
121
#include "exec/pipeline/task_scheduler.h"
122
#include "exec/runtime_filter/runtime_filter_mgr.h"
123
#include "exec/sort/topn_sorter.h"
124
#include "exec/spill/spill_file.h"
125
#include "io/fs/stream_load_pipe.h"
126
#include "load/stream_load/new_load_stream_mgr.h"
127
#include "runtime/exec_env.h"
128
#include "runtime/fragment_mgr.h"
129
#include "runtime/result_buffer_mgr.h"
130
#include "runtime/runtime_state.h"
131
#include "runtime/thread_context.h"
132
#include "service/backend_options.h"
133
#include "util/client_cache.h"
134
#include "util/countdown_latch.h"
135
#include "util/debug_util.h"
136
#include "util/network_util.h"
137
#include "util/uid_util.h"
138
139
namespace doris {
140
PipelineFragmentContext::PipelineFragmentContext(
141
        TUniqueId query_id, const TPipelineFragmentParams& request,
142
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
143
        const std::function<void(RuntimeState*, Status*)>& call_back)
144
55.1k
        : _query_id(std::move(query_id)),
145
55.1k
          _fragment_id(request.fragment_id),
146
55.1k
          _exec_env(exec_env),
147
55.1k
          _query_ctx(std::move(query_ctx)),
148
55.1k
          _call_back(call_back),
149
55.1k
          _is_report_on_cancel(true),
150
55.1k
          _params(request),
151
55.1k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
152
55.1k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
153
55.1k
                                                               : false) {
154
55.1k
    _fragment_watcher.start();
155
55.1k
}
156
157
55.1k
PipelineFragmentContext::~PipelineFragmentContext() {
158
55.1k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
159
55.1k
            .tag("query_id", print_id(_query_id))
160
55.1k
            .tag("fragment_id", _fragment_id);
161
55.1k
    _release_resource();
162
55.1k
    {
163
        // The memory released by the query end is recorded in the query mem tracker.
164
55.1k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
165
55.1k
        _runtime_state.reset();
166
55.1k
        _query_ctx.reset();
167
55.1k
    }
168
55.1k
}
169
170
0
bool PipelineFragmentContext::is_timeout(timespec now) const {
171
0
    if (_timeout <= 0) {
172
0
        return false;
173
0
    }
174
0
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
175
0
}
176
177
// notify_close() transitions the PFC from "waiting for external close notification" to
178
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
179
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
180
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
181
417
bool PipelineFragmentContext::notify_close() {
182
417
    bool all_closed = false;
183
417
    bool need_remove = false;
184
417
    {
185
417
        std::lock_guard<std::mutex> l(_task_mutex);
186
417
        if (_closed_tasks >= _total_tasks) {
187
9
            if (_need_notify_close) {
188
                // Fragment was cancelled and waiting for notify to close.
189
                // Record that we need to remove from fragment mgr, but do it
190
                // after releasing _task_mutex to avoid ABBA deadlock with
191
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
192
                // first, then _task_mutex via debug_string()).
193
0
                need_remove = true;
194
0
            }
195
9
            all_closed = true;
196
9
        }
197
// Allow the fragment to release itself after cancellation.
198
417
        _need_notify_close = false;
199
417
    }
200
417
    if (need_remove) {
201
0
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
202
0
    }
203
417
    return all_closed;
204
417
}
205
206
// Must not take a lock in this method, because it calls QueryContext cancel, and
207
// QueryContext cancel in turn calls fragment context cancel. In addition, code
208
// running inside the fragment (e.g. the exchange sink buffer) may also call
209
// QueryContext cancel, so taking a lock here could dead-lock.
210
417
void PipelineFragmentContext::cancel(const Status reason) {
211
417
    LOG_INFO("PipelineFragmentContext::cancel")
212
417
            .tag("query_id", print_id(_query_id))
213
417
            .tag("fragment_id", _fragment_id)
214
417
            .tag("reason", reason.to_string());
215
417
    if (notify_close()) {
216
9
        return;
217
9
    }
218
// Timeout is a special error code; we need to print the current stack to debug timeout issues.
219
408
    if (reason.is<ErrorCode::TIMEOUT>()) {
220
0
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
221
0
                                   debug_string());
222
0
        LOG_LONG_STRING(WARNING, dbg_str);
223
0
    }
224
225
// `ILLEGAL_STATE` means the query this fragment belongs to was not found in FE (maybe already finished)
226
408
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
227
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
228
0
                    debug_string());
229
0
    }
230
231
408
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
232
21
        print_profile("cancel pipeline, reason: " + reason.to_string());
233
21
    }
234
235
408
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
236
0
        _query_ctx->set_load_error_url(error_url);
237
0
    }
238
239
408
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
240
0
        _query_ctx->set_first_error_msg(first_error_msg);
241
0
    }
242
243
408
    _query_ctx->cancel(reason, _fragment_id);
244
408
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
245
102
        _is_report_on_cancel = false;
246
306
    } else {
247
1.78k
        for (auto& id : _fragment_instance_ids) {
248
1.78k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
249
1.78k
        }
250
306
    }
251
// Fetch the pipe from the new load stream manager and send cancel to it, or the fragment may hang waiting to read from the pipe.
252
// For stream load, the fragment's query_id equals the load id; it is set by the FE.
253
408
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
254
408
    if (stream_load_ctx != nullptr) {
255
0
        stream_load_ctx->pipe->cancel(reason.to_string());
256
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
257
        // We need to set the error URL at this point to ensure error information is properly
258
        // propagated to the client.
259
0
        stream_load_ctx->error_url = get_load_error_url();
260
0
        stream_load_ctx->first_error_msg = get_first_error_msg();
261
0
    }
262
263
1.92k
    for (auto& tasks : _tasks) {
264
4.80k
        for (auto& task : tasks) {
265
4.80k
            task.first->unblock_all_dependencies();
266
4.80k
        }
267
1.92k
    }
268
408
}
269
270
84.6k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
271
84.6k
    PipelineId id = _next_pipeline_id++;
272
84.6k
    auto pipeline = std::make_shared<Pipeline>(
273
84.6k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
274
84.6k
            parent ? parent->num_tasks() : _num_instances);
275
84.6k
    if (idx >= 0) {
276
8.14k
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
277
76.4k
    } else {
278
76.4k
        _pipelines.emplace_back(pipeline);
279
76.4k
    }
280
84.6k
    if (parent) {
281
27.1k
        parent->set_children(pipeline);
282
27.1k
    }
283
84.6k
    return pipeline;
284
84.6k
}
285
286
55.1k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
287
55.1k
    {
288
55.1k
        SCOPED_TIMER(_build_pipelines_timer);
289
        // 2. Build pipelines with operators in this fragment.
290
55.1k
        auto root_pipeline = add_pipeline();
291
55.1k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
292
55.1k
                                         &_root_op, root_pipeline));
293
294
        // 3. Create sink operator
295
55.1k
        if (!_params.fragment.__isset.output_sink) {
296
0
            return Status::InternalError("No output sink in this fragment!");
297
0
        }
298
55.1k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
299
55.1k
                                          _params.fragment.output_exprs, _params,
300
55.1k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
301
55.1k
                                          *_desc_tbl, root_pipeline->id()));
302
55.1k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
303
55.1k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
304
305
76.4k
        for (PipelinePtr& pipeline : _pipelines) {
306
18.4E
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
307
76.4k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
308
76.4k
        }
309
55.1k
    }
310
    // 4. Build local exchanger
311
55.1k
    if (_runtime_state->enable_local_shuffle()) {
312
54.9k
        SCOPED_TIMER(_plan_local_exchanger_timer);
313
54.9k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
314
54.9k
                                             _params.bucket_seq_to_instance_idx,
315
54.9k
                                             _params.shuffle_idx_to_instance_idx));
316
54.9k
    }
317
318
    // 5. Initialize global states in pipelines.
319
84.6k
    for (PipelinePtr& pipeline : _pipelines) {
320
84.6k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
321
84.6k
        pipeline->children().clear();
322
84.6k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
323
84.6k
    }
324
325
55.1k
    {
326
55.1k
        SCOPED_TIMER(_build_tasks_timer);
327
        // 6. Build pipeline tasks and initialize local state.
328
55.1k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
329
55.1k
    }
330
331
55.1k
    return Status::OK();
332
55.1k
}
333
334
55.1k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
335
55.1k
    if (_prepared) {
336
0
        return Status::InternalError("Already prepared");
337
0
    }
338
55.1k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
339
55.1k
        _timeout = _params.query_options.execution_timeout;
340
55.1k
    }
341
342
55.1k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
343
55.1k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
344
55.1k
    SCOPED_TIMER(_prepare_timer);
345
55.1k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
346
55.1k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
347
55.1k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
348
55.1k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
349
55.1k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
350
55.1k
    {
351
55.1k
        SCOPED_TIMER(_init_context_timer);
352
55.1k
        cast_set(_num_instances, _params.local_params.size());
353
55.1k
        _total_instances =
354
55.1k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
355
356
55.1k
        auto* fragment_context = this;
357
358
55.1k
        if (_params.query_options.__isset.is_report_success) {
359
55.1k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
360
55.1k
        }
361
362
        // 1. Set up the global runtime state.
363
55.1k
        _runtime_state = RuntimeState::create_unique(
364
55.1k
                _params.query_id, _params.fragment_id, _params.query_options,
365
55.1k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
366
55.1k
        _runtime_state->set_task_execution_context(shared_from_this());
367
55.1k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
368
55.1k
        if (_params.__isset.backend_id) {
369
55.0k
            _runtime_state->set_backend_id(_params.backend_id);
370
55.0k
        }
371
55.1k
        if (_params.__isset.import_label) {
372
0
            _runtime_state->set_import_label(_params.import_label);
373
0
        }
374
55.1k
        if (_params.__isset.db_name) {
375
0
            _runtime_state->set_db_name(_params.db_name);
376
0
        }
377
55.1k
        if (_params.__isset.load_job_id) {
378
0
            _runtime_state->set_load_job_id(_params.load_job_id);
379
0
        }
380
381
55.1k
        if (_params.is_simplified_param) {
382
17.1k
            _desc_tbl = _query_ctx->desc_tbl;
383
38.0k
        } else {
384
38.0k
            DCHECK(_params.__isset.desc_tbl);
385
38.0k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
386
38.0k
                                                  &_desc_tbl));
387
38.0k
        }
388
55.1k
        _runtime_state->set_desc_tbl(_desc_tbl);
389
55.1k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
390
55.1k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
391
55.1k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
392
55.1k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
393
394
        // init fragment_instance_ids
395
55.1k
        const auto target_size = _params.local_params.size();
396
55.1k
        _fragment_instance_ids.resize(target_size);
397
167k
        for (size_t i = 0; i < _params.local_params.size(); i++) {
398
111k
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
399
111k
            _fragment_instance_ids[i] = fragment_instance_id;
400
111k
        }
401
55.1k
    }
402
403
55.1k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
404
405
55.1k
    _init_next_report_time();
406
407
55.1k
    _prepared = true;
408
55.1k
    return Status::OK();
409
55.1k
}
410
411
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
412
        int instance_idx,
413
111k
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
414
111k
    const auto& local_params = _params.local_params[instance_idx];
415
111k
    auto fragment_instance_id = local_params.fragment_instance_id;
416
111k
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
417
111k
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
418
111k
    auto get_shared_state = [&](PipelinePtr pipeline)
419
111k
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
420
198k
                                       std::vector<std::shared_ptr<Dependency>>>> {
421
198k
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
422
198k
                                std::vector<std::shared_ptr<Dependency>>>>
423
198k
                shared_state_map;
424
208k
        for (auto& op : pipeline->operators()) {
425
208k
            auto source_id = op->operator_id();
426
208k
            if (auto iter = _op_id_to_shared_state.find(source_id);
427
208k
                iter != _op_id_to_shared_state.end()) {
428
68.0k
                shared_state_map.insert({source_id, iter->second});
429
68.0k
            }
430
208k
        }
431
200k
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
432
200k
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
433
200k
                iter != _op_id_to_shared_state.end()) {
434
21.2k
                shared_state_map.insert({sink_to_source_id, iter->second});
435
21.2k
            }
436
200k
        }
437
198k
        return shared_state_map;
438
198k
    };
439
440
357k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
441
245k
        auto& pipeline = _pipelines[pip_idx];
442
245k
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
443
198k
            auto task_runtime_state = RuntimeState::create_unique(
444
198k
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
445
198k
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
446
198k
            {
447
                // Initialize runtime state for this task
448
198k
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
449
450
198k
                task_runtime_state->set_task_execution_context(shared_from_this());
451
198k
                task_runtime_state->set_be_number(local_params.backend_num);
452
453
198k
                if (_params.__isset.backend_id) {
454
198k
                    task_runtime_state->set_backend_id(_params.backend_id);
455
198k
                }
456
198k
                if (_params.__isset.import_label) {
457
0
                    task_runtime_state->set_import_label(_params.import_label);
458
0
                }
459
198k
                if (_params.__isset.db_name) {
460
0
                    task_runtime_state->set_db_name(_params.db_name);
461
0
                }
462
198k
                if (_params.__isset.load_job_id) {
463
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
464
0
                }
465
198k
                if (_params.__isset.wal_id) {
466
0
                    task_runtime_state->set_wal_id(_params.wal_id);
467
0
                }
468
198k
                if (_params.__isset.content_length) {
469
0
                    task_runtime_state->set_content_length(_params.content_length);
470
0
                }
471
472
198k
                task_runtime_state->set_desc_tbl(_desc_tbl);
473
198k
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
474
198k
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
475
198k
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
476
198k
                task_runtime_state->set_max_operator_id(max_operator_id());
477
198k
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
478
198k
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
479
198k
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
480
481
198k
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
482
198k
            }
483
198k
            auto cur_task_id = _total_tasks++;
484
198k
            task_runtime_state->set_task_id(cur_task_id);
485
198k
            task_runtime_state->set_task_num(pipeline->num_tasks());
486
198k
            auto task = std::make_shared<PipelineTask>(
487
198k
                    pipeline, cur_task_id, task_runtime_state.get(),
488
198k
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
489
198k
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
490
198k
                    instance_idx);
491
198k
            pipeline->incr_created_tasks(instance_idx, task.get());
492
198k
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
493
198k
            _tasks[instance_idx].emplace_back(
494
198k
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
495
198k
                            std::move(task), std::move(task_runtime_state)});
496
198k
        }
497
245k
    }
498
499
    /**
500
         * Build DAG for pipeline tasks.
501
         * For example, we have
502
         *
503
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
504
         *            \                      /
505
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
506
         *                 \                          /
507
         *               JoinProbeOperator2 (Pipeline1)
508
         *
509
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
510
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
511
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
512
         *
513
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
514
         * and JoinProbeOperator2.
515
         */
516
245k
    for (auto& _pipeline : _pipelines) {
517
245k
        if (pipeline_id_to_task.contains(_pipeline->id())) {
518
198k
            auto* task = pipeline_id_to_task[_pipeline->id()];
519
198k
            DCHECK(task != nullptr);
520
521
            // If this task has upstream dependency, then inject it into this task.
522
198k
            if (_dag.contains(_pipeline->id())) {
523
131k
                auto& deps = _dag[_pipeline->id()];
524
199k
                for (auto& dep : deps) {
525
199k
                    if (pipeline_id_to_task.contains(dep)) {
526
105k
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
527
105k
                        if (ss) {
528
65.5k
                            task->inject_shared_state(ss);
529
65.5k
                        } else {
530
39.5k
                            pipeline_id_to_task[dep]->inject_shared_state(
531
39.5k
                                    task->get_source_shared_state());
532
39.5k
                        }
533
105k
                    }
534
199k
                }
535
131k
            }
536
198k
        }
537
245k
    }
538
357k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
539
245k
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
540
198k
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
541
198k
            DCHECK(pipeline_id_to_profile[pip_idx]);
542
198k
            std::vector<TScanRangeParams> scan_ranges;
543
198k
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
544
198k
            if (local_params.per_node_scan_ranges.contains(node_id)) {
545
42.3k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
546
42.3k
            }
547
198k
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
548
198k
                                                             _params.fragment.output_sink));
549
198k
        }
550
245k
    }
551
111k
    {
552
111k
        std::lock_guard<std::mutex> l(_state_map_lock);
553
111k
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
554
111k
    }
555
111k
    return Status::OK();
556
111k
}
557
558
55.1k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
559
55.1k
    _total_tasks = 0;
560
55.1k
    _closed_tasks = 0;
561
55.1k
    const auto target_size = _params.local_params.size();
562
55.1k
    _tasks.resize(target_size);
563
55.1k
    _runtime_filter_mgr_map.resize(target_size);
564
139k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
565
84.6k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
566
84.6k
    }
567
55.1k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
568
569
55.1k
    if (target_size > 1 &&
570
55.1k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
571
8.10k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
572
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
573
0
        std::vector<Status> prepare_status(target_size);
574
0
        int submitted_tasks = 0;
575
0
        Status submit_status;
576
0
        CountDownLatch latch((int)target_size);
577
0
        for (int i = 0; i < target_size; i++) {
578
0
            submit_status = thread_pool->submit_func([&, i]() {
579
0
                SCOPED_ATTACH_TASK(_query_ctx.get());
580
0
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
581
0
                latch.count_down();
582
0
            });
583
0
            if (LIKELY(submit_status.ok())) {
584
0
                submitted_tasks++;
585
0
            } else {
586
0
                break;
587
0
            }
588
0
        }
589
0
        latch.arrive_and_wait(target_size - submitted_tasks);
590
0
        if (UNLIKELY(!submit_status.ok())) {
591
0
            return submit_status;
592
0
        }
593
0
        for (int i = 0; i < submitted_tasks; i++) {
594
0
            if (!prepare_status[i].ok()) {
595
0
                return prepare_status[i];
596
0
            }
597
0
        }
598
55.1k
    } else {
599
167k
        for (int i = 0; i < target_size; i++) {
600
111k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
601
111k
        }
602
55.1k
    }
603
55.1k
    _pipeline_parent_map.clear();
604
55.1k
    _op_id_to_shared_state.clear();
605
606
55.1k
    return Status::OK();
607
55.1k
}
608
609
55.1k
void PipelineFragmentContext::_init_next_report_time() {
610
55.1k
    auto interval_s = config::pipeline_status_report_interval;
611
55.1k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
612
5.09k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
613
5.09k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
614
        // We don't want to wait longer than it takes to run the entire fragment.
615
5.09k
        _previous_report_time =
616
5.09k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
617
5.09k
        _disable_period_report = false;
618
5.09k
    }
619
55.1k
}
620
621
605
void PipelineFragmentContext::refresh_next_report_time() {
622
605
    auto disable = _disable_period_report.load(std::memory_order_acquire);
623
605
    DCHECK(disable == true);
624
605
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
625
605
    _disable_period_report.compare_exchange_strong(disable, false);
626
605
}
627
628
777k
void PipelineFragmentContext::trigger_report_if_necessary() {
629
777k
    if (!_is_report_success) {
630
719k
        return;
631
719k
    }
632
57.9k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
633
57.9k
    if (disable) {
634
1.10k
        return;
635
1.10k
    }
636
56.8k
    int32_t interval_s = config::pipeline_status_report_interval;
637
56.8k
    if (interval_s <= 0) {
638
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
639
0
                        "trigger "
640
0
                        "report.";
641
0
    }
642
56.8k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
643
56.8k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
644
56.8k
    if (MonotonicNanos() > next_report_time) {
645
606
        if (!_disable_period_report.compare_exchange_strong(disable, true,
646
606
                                                            std::memory_order_acq_rel)) {
647
1
            return;
648
1
        }
649
605
        if (VLOG_FILE_IS_ON) {
650
0
            VLOG_FILE << "Reporting "
651
0
                      << "profile for query_id " << print_id(_query_id)
652
0
                      << ", fragment id: " << _fragment_id;
653
654
0
            std::stringstream ss;
655
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
656
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
657
0
            if (_runtime_state->load_channel_profile()) {
658
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
659
0
            }
660
661
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
662
0
                      << " profile:\n"
663
0
                      << ss.str();
664
0
        }
665
605
        auto st = send_report(false);
666
605
        if (!st.ok()) {
667
0
            disable = true;
668
0
            _disable_period_report.compare_exchange_strong(disable, false,
669
0
                                                           std::memory_order_acq_rel);
670
0
        }
671
605
    }
672
56.8k
}
673
674
// Rebuilds the operator tree from the thrift plan (preorder node list) and
// verifies every thrift node was consumed exactly once.
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    int node_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &node_idx, root,
                                        cur_pipe, 0, false, false));

    // The recursion must have visited exactly all nodes of the preorder list.
    if (node_idx + 1 != plan_nodes.size()) {
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }
    return Status::OK();
}
691
692
// Recursively reconstructs one operator (and its subtree) from the preorder
// thrift node list `tnodes`, advancing *node_idx as nodes are consumed.
// Also propagates two planning flags down the tree:
//   - followed_by_shuffled_operator: whether a downstream operator needs
//     hash-shuffled input (decides HASH_SHUFFLE local exchange).
//   - require_bucket_distribution: whether a downstream operator needs
//     bucket/colocated distribution.
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // propagate error case
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];

    int num_children = tnodes[*node_idx].num_children;
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
    bool current_require_bucket_distribution = require_bucket_distribution;
    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must be done here. For example, group by expressions in agg will be used to
    // decide if a local shuffle should be planed, so it must be initialized here.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
    if (parent != nullptr) {
        // add to parent's child(s)
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    } else {
        *root = op;
    }
    /**
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // When the current pipeline has no operators yet, the current node's
    // requirement comes from the pipeline's sink, otherwise from `op` itself.
    auto required_data_distribution =
            cur_pipe->operators().empty()
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                    : op->required_data_distribution(_runtime_state.get());
    // Stay "followed by shuffled" only through hash exchanges (or NOOP, which
    // passes the upstream requirement through unchanged).
    current_followed_by_shuffled_operator =
            ((followed_by_shuffled_operator ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
                                             : op->is_shuffled_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (followed_by_shuffled_operator &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Same propagation rule for the bucket/colocated requirement.
    current_require_bucket_distribution =
            ((require_bucket_distribution ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
                                             : op->is_colocated_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (require_bucket_distribution &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Leaf nodes are the fragment's data sources; record whether the source
    // must run serially.
    if (num_children == 0) {
        _use_serial_source = op->is_serial_operator();
    }
    // rely on that tnodes is preorder of the plan
    for (int i = 0; i < num_children; i++) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
                                            current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }

    return Status::OK();
}
777
778
void PipelineFragmentContext::_inherit_pipeline_properties(
779
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
780
8.14k
        PipelinePtr pipe_with_sink) {
781
8.14k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
782
8.14k
    pipe_with_source->set_num_tasks(_num_instances);
783
8.14k
    pipe_with_source->set_data_distribution(data_distribution);
784
8.14k
}
785
786
// Splits `cur_pipe` at operator index `idx` into two pipelines connected by a
// local exchange: cur_pipe keeps [idx, end) with a LocalExchangeSource
// prepended, while `new_pip` receives [0, idx) plus a LocalExchangeSink.
// For example, [Scan - AggSink] becomes
// pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    auto local_exchange_id = next_operator_id();

    // Hoisted out of the switch below: the free-block limit is identical for
    // every exchanger type (previously this ternary was repeated per case).
    const int free_block_limit =
            _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                    ? cast_set<int>(
                              _runtime_state->query_options().local_exchange_free_blocks_limit)
                    : 0;

    // 1. Create a new pipeline with local exchange sink.
    DataSinkOperatorPtr sink;
    auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool followed_by_shuffled_operator =
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
    // Without bucket information, downgrade BUCKET_HASH_SHUFFLE to HASH_SHUFFLE.
    if (bucket_seq_to_instance_idx.empty() &&
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
                                          num_buckets, use_global_hash_shuffle,
                                          shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState with the exchanger
    // matching the requested distribution type.
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);
    switch (data_distribution.distribution_type) {
    case ExchangeType::HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                use_global_hash_shuffle ? _total_instances : _num_instances, free_block_limit);
        break;
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_block_limit);
        break;
    case ExchangeType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        }
        break;
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, free_block_limit);
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Initialize new pipeline's operator list.
    std::copy(operators.begin(), operators.begin() + idx,
              std::inserter(new_pip->operators(), new_pip->operators().end()));

    // 3.2 Erase unused operators in previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op;
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
    if (!operators.empty()) {
        // Reset the old child link before pointing it at the new source.
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately: children feeding an
    // operator that moved into new_pip follow it; the rest stay on cur_pipe.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (auto child : cur_pipe->children()) {
        bool found = false;
        for (auto op : new_pip->operators()) {
            if (child->sink()->node_id() == op->node_id()) {
                new_pip->set_children(child);
                found = true;
            };
        }
        if (!found) {
            new_children.push_back(child);
            edges_with_source.push_back(child->id());
        }
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (auto child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
950
951
// Inserts a local exchange at operator index `idx` of `cur_pipe` if the
// requested data distribution demands one; sets *do_local_exchange when a
// split actually happened.
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // A local exchange only makes sense with real intra-fragment parallelism.
    const bool parallel_enough = _num_instances > 1 && cur_pipe->num_tasks_of_parent() > 1;
    if (!parallel_enough) {
        return Status::OK();
    }

    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    const auto op_count_before = cur_pipe->operators().size();
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));

    // The split must add exactly one operator (the local exchange source).
    CHECK(op_count_before + 1 == cur_pipe->operators().size() + new_pip->operators().size())
            << "total_op_num: " << op_count_before
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << new_pip->operators().size();

    // There are some local shuffles with relatively heavy operations on the sink.
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
    // Therefore, local passthrough is used to increase the concurrency of the sink.
    // op -> local sink(1) -> local source (n)
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
    const bool sink_is_bottleneck =
            cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
            Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type);
    if (sink_is_bottleneck) {
        RETURN_IF_ERROR(_add_local_exchange_impl(
                cast_set<int>(new_pip->operators().size()), pool, new_pip,
                add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
992
993
// Plans local exchanges for every pipeline in this fragment, walking pipelines
// from last-created to first so downstream distributions are settled before
// upstream ones are inherited.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
        auto& pipeline = _pipelines[pip_idx];
        pipeline->init_data_distribution(_runtime_state.get());
        // Set property if child pipeline is not join operator's child: inherit
        // the distribution from the child feeding this pipeline's first operator.
        for (auto& child : pipeline->children()) {
            if (child->sink()->node_id() == pipeline->operators().front()->node_id()) {
                pipeline->set_data_distribution(child->data_distribution());
            }
        }

        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
        // still keep colocate plan after local shuffle
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, pipeline,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1017
1018
// Plans local exchanges for a single pipeline: scans its operators (and
// finally its sink) for any required data distribution that needs a local
// exchange, splitting the pipeline each time one is inserted. The outer
// do/while restarts the scan after every split because `pip` then contains a
// different operator list.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // Start at 1: operator 0 is the pipeline's source and cannot need a local
    // exchange in front of it.
    int idx = 1;
    bool do_local_exchange = false;
    do {
        auto& ops = pip->operators();
        do_local_exchange = false;
        // Plan local exchange for each operator.
        for (; idx < ops.size();) {
            if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
                RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                        ops[idx]->required_data_distribution(_runtime_state.get()),
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                        shuffle_idx_to_instance_idx));
            }
            if (do_local_exchange) {
                // If local exchange is needed for current operator, we will split this pipeline to
                // two pipelines by local exchange sink/source. And then we need to process remaining
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
                // is current operator was already processed) and continue to plan local exchange.
                idx = 2;
                break;
            }
            idx++;
        }
    } while (do_local_exchange);
    // The sink may also demand a particular distribution (e.g. agg/join build).
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
        RETURN_IF_ERROR(_add_local_exchange(
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1055
1056
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1057
                                                  const std::vector<TExpr>& output_exprs,
1058
                                                  const TPipelineFragmentParams& params,
1059
                                                  const RowDescriptor& row_desc,
1060
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1061
55.1k
                                                  PipelineId cur_pipeline_id) {
1062
55.1k
    switch (thrift_sink.type) {
1063
16.3k
    case TDataSinkType::DATA_STREAM_SINK: {
1064
16.3k
        if (!thrift_sink.__isset.stream_sink) {
1065
0
            return Status::InternalError("Missing data stream sink.");
1066
0
        }
1067
16.3k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1068
16.3k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1069
16.3k
                params.destinations, _fragment_instance_ids);
1070
16.3k
        break;
1071
16.3k
    }
1072
34.9k
    case TDataSinkType::RESULT_SINK: {
1073
34.9k
        if (!thrift_sink.__isset.result_sink) {
1074
0
            return Status::InternalError("Missing data buffer sink.");
1075
0
        }
1076
1077
34.9k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), row_desc,
1078
34.9k
                                                      output_exprs, thrift_sink.result_sink);
1079
34.9k
        break;
1080
34.9k
    }
1081
4
    case TDataSinkType::DICTIONARY_SINK: {
1082
4
        if (!thrift_sink.__isset.dictionary_sink) {
1083
0
            return Status::InternalError("Missing dict sink.");
1084
0
        }
1085
1086
4
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1087
4
                                                    thrift_sink.dictionary_sink);
1088
4
        break;
1089
4
    }
1090
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1091
1.25k
    case TDataSinkType::OLAP_TABLE_SINK: {
1092
1.25k
        if (state->query_options().enable_memtable_on_sink_node &&
1093
1.25k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1094
1.25k
            !config::is_cloud_mode()) {
1095
1.06k
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(pool, next_sink_operator_id(),
1096
1.06k
                                                               row_desc, output_exprs);
1097
1.06k
        } else {
1098
194
            _sink = std::make_shared<OlapTableSinkOperatorX>(pool, next_sink_operator_id(),
1099
194
                                                             row_desc, output_exprs);
1100
194
        }
1101
1.25k
        break;
1102
0
    }
1103
0
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1104
0
        DCHECK(thrift_sink.__isset.olap_table_sink);
1105
0
        DCHECK(state->get_query_ctx() != nullptr);
1106
0
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1107
0
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1108
0
                                                                output_exprs);
1109
0
        break;
1110
0
    }
1111
733
    case TDataSinkType::HIVE_TABLE_SINK: {
1112
733
        if (!thrift_sink.__isset.hive_table_sink) {
1113
0
            return Status::InternalError("Missing hive table sink.");
1114
0
        }
1115
733
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1116
733
                                                         output_exprs);
1117
733
        break;
1118
733
    }
1119
866
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1120
866
        if (!thrift_sink.__isset.iceberg_table_sink) {
1121
0
            return Status::InternalError("Missing iceberg table sink.");
1122
0
        }
1123
866
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1124
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1125
0
                                                                     row_desc, output_exprs);
1126
866
        } else {
1127
866
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1128
866
                                                                row_desc, output_exprs);
1129
866
        }
1130
866
        break;
1131
866
    }
1132
10
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1133
10
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1134
0
            return Status::InternalError("Missing iceberg delete sink.");
1135
0
        }
1136
10
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1137
10
                                                             row_desc, output_exprs);
1138
10
        break;
1139
10
    }
1140
40
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1141
40
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1142
0
            return Status::InternalError("Missing iceberg merge sink.");
1143
0
        }
1144
40
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1145
40
                                                            output_exprs);
1146
40
        break;
1147
40
    }
1148
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1149
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1150
0
            return Status::InternalError("Missing max compute table sink.");
1151
0
        }
1152
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1153
0
                                                       output_exprs);
1154
0
        break;
1155
0
    }
1156
40
    case TDataSinkType::JDBC_TABLE_SINK: {
1157
40
        if (!thrift_sink.__isset.jdbc_table_sink) {
1158
0
            return Status::InternalError("Missing data jdbc sink.");
1159
0
        }
1160
40
        if (config::enable_java_support) {
1161
40
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1162
40
                                                             output_exprs);
1163
40
        } else {
1164
0
            return Status::InternalError(
1165
0
                    "Jdbc table sink is not enabled, you can change be config "
1166
0
                    "enable_java_support to true and restart be.");
1167
0
        }
1168
40
        break;
1169
40
    }
1170
40
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1171
0
        if (!thrift_sink.__isset.memory_scratch_sink) {
1172
0
            return Status::InternalError("Missing data buffer sink.");
1173
0
        }
1174
1175
0
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1176
0
                                                             output_exprs);
1177
0
        break;
1178
0
    }
1179
82
    case TDataSinkType::RESULT_FILE_SINK: {
1180
82
        if (!thrift_sink.__isset.result_file_sink) {
1181
0
            return Status::InternalError("Missing result file sink.");
1182
0
        }
1183
1184
        // Result file sink is not the top sink
1185
82
        if (params.__isset.destinations && !params.destinations.empty()) {
1186
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1187
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1188
0
                    params.destinations, output_exprs, desc_tbl);
1189
82
        } else {
1190
82
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1191
82
                                                              output_exprs);
1192
82
        }
1193
82
        break;
1194
82
    }
1195
766
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1196
766
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1197
766
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1198
766
        auto sink_id = next_sink_operator_id();
1199
766
        const int multi_cast_node_id = sink_id;
1200
766
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1201
        // one sink has multiple sources.
1202
766
        std::vector<int> sources;
1203
3.06k
        for (int i = 0; i < sender_size; ++i) {
1204
2.29k
            auto source_id = next_operator_id();
1205
2.29k
            sources.push_back(source_id);
1206
2.29k
        }
1207
1208
766
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1209
766
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1210
3.06k
        for (int i = 0; i < sender_size; ++i) {
1211
2.29k
            auto new_pipeline = add_pipeline();
1212
            // use to exchange sink
1213
2.29k
            RowDescriptor* exchange_row_desc = nullptr;
1214
2.29k
            {
1215
2.29k
                const auto& tmp_row_desc =
1216
2.29k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1217
2.29k
                                ? RowDescriptor(state->desc_tbl(),
1218
2.29k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1219
2.29k
                                                         .output_tuple_id})
1220
2.29k
                                : row_desc;
1221
2.29k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1222
2.29k
            }
1223
2.29k
            auto source_id = sources[i];
1224
2.29k
            OperatorPtr source_op;
1225
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1226
2.29k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1227
2.29k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1228
2.29k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1229
2.29k
                    /*operator_id=*/source_id);
1230
2.29k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1231
2.29k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1232
            // 2. create and set sink operator of data stream sender for new pipeline
1233
1234
2.29k
            DataSinkOperatorPtr sink_op;
1235
2.29k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1236
2.29k
                    state, *exchange_row_desc, next_sink_operator_id(),
1237
2.29k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1238
2.29k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1239
1240
2.29k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1241
2.29k
            {
1242
2.29k
                TDataSink* t = pool->add(new TDataSink());
1243
2.29k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1244
2.29k
                RETURN_IF_ERROR(sink_op->init(*t));
1245
2.29k
            }
1246
1247
            // 3. set dependency dag
1248
2.29k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1249
2.29k
        }
1250
766
        if (sources.empty()) {
1251
0
            return Status::InternalError("size of sources must be greater than 0");
1252
0
        }
1253
766
        break;
1254
766
    }
1255
766
    case TDataSinkType::BLACKHOLE_SINK: {
1256
5
        if (!thrift_sink.__isset.blackhole_sink) {
1257
0
            return Status::InternalError("Missing blackhole sink.");
1258
0
        }
1259
1260
5
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1261
5
        break;
1262
5
    }
1263
78
    case TDataSinkType::TVF_TABLE_SINK: {
1264
78
        if (!thrift_sink.__isset.tvf_table_sink) {
1265
0
            return Status::InternalError("Missing TVF table sink.");
1266
0
        }
1267
78
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1268
78
                                                        output_exprs);
1269
78
        break;
1270
78
    }
1271
0
    default:
1272
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1273
55.1k
    }
1274
55.1k
    return Status::OK();
1275
55.1k
}
1276
1277
// NOLINTBEGIN(readability-function-size)
1278
// NOLINTBEGIN(readability-function-cognitive-complexity)
1279
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1280
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1281
                                                 PipelinePtr& cur_pipe, int parent_idx,
1282
                                                 int child_idx,
1283
                                                 const bool followed_by_shuffled_operator,
1284
                                                 const bool require_bucket_distribution,
1285
78.5k
                                                 OperatorPtr& cache_op) {
1286
78.5k
    std::vector<DataSinkOperatorPtr> sink_ops;
1287
78.5k
    Defer defer = Defer([&]() {
1288
78.5k
        if (op) {
1289
78.5k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1290
78.5k
        }
1291
78.4k
        for (auto& s : sink_ops) {
1292
19.0k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1293
19.0k
        }
1294
78.4k
    });
1295
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1296
    // Therefore, here we need to use a stack-like structure.
1297
78.5k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1298
78.5k
    std::stringstream error_msg;
1299
78.5k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1300
1301
78.5k
    bool fe_with_old_version = false;
1302
78.5k
    switch (tnode.node_type) {
1303
24.9k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1304
24.9k
        op = std::make_shared<OlapScanOperatorX>(
1305
24.9k
                pool, tnode, next_operator_id(), descs, _num_instances,
1306
24.9k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1307
24.9k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1308
24.9k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1309
24.9k
        break;
1310
24.9k
    }
1311
0
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1312
0
        DCHECK(_query_ctx != nullptr);
1313
0
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1314
0
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1315
0
                                                    _num_instances);
1316
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1317
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1318
0
        break;
1319
0
    }
1320
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1321
0
        if (config::enable_java_support) {
1322
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1323
0
                                                     _num_instances);
1324
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1325
0
        } else {
1326
0
            return Status::InternalError(
1327
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1328
0
                    "to true and restart be.");
1329
0
        }
1330
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1331
0
        break;
1332
0
    }
1333
10.2k
    case TPlanNodeType::FILE_SCAN_NODE: {
1334
10.2k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1335
10.2k
                                                 _num_instances);
1336
10.2k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1337
10.2k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1338
10.2k
        break;
1339
10.2k
    }
1340
0
    case TPlanNodeType::ES_SCAN_NODE:
1341
296
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
1342
296
        op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs,
1343
296
                                               _num_instances);
1344
296
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1345
296
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1346
296
        break;
1347
296
    }
1348
18.6k
    case TPlanNodeType::EXCHANGE_NODE: {
1349
18.6k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1350
18.6k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1351
18.4E
                                  : 0;
1352
18.6k
        DCHECK_GT(num_senders, 0);
1353
18.6k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1354
18.6k
                                                       num_senders);
1355
18.6k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1356
18.6k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1357
18.6k
        break;
1358
18.6k
    }
1359
11.8k
    case TPlanNodeType::AGGREGATION_NODE: {
1360
11.8k
        if (tnode.agg_node.grouping_exprs.empty() &&
1361
11.8k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1362
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1363
0
                                         ": group by and output is empty");
1364
0
        }
1365
11.8k
        bool need_create_cache_op =
1366
11.8k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1367
11.8k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1368
0
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1369
0
            auto cache_source_id = next_operator_id();
1370
0
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1371
0
                                                        _params.fragment.query_cache_param);
1372
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1373
1374
0
            const auto downstream_pipeline_id = cur_pipe->id();
1375
0
            if (!_dag.contains(downstream_pipeline_id)) {
1376
0
                _dag.insert({downstream_pipeline_id, {}});
1377
0
            }
1378
0
            new_pipe = add_pipeline(cur_pipe);
1379
0
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1380
1381
0
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1382
0
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1383
0
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1384
0
            return Status::OK();
1385
0
        };
1386
11.8k
        const bool group_by_limit_opt =
1387
11.8k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1388
1389
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1390
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1391
11.8k
        const bool enable_spill = _runtime_state->enable_spill() &&
1392
11.8k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1393
11.8k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1394
11.8k
                                      tnode.agg_node.use_streaming_preaggregation &&
1395
11.8k
                                      !tnode.agg_node.grouping_exprs.empty();
1396
        // TODO: distinct streaming agg does not support spill.
1397
11.8k
        const bool can_use_distinct_streaming_agg =
1398
11.8k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1399
11.8k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1400
11.8k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1401
11.8k
                _params.query_options.enable_distinct_streaming_aggregation;
1402
1403
11.8k
        if (can_use_distinct_streaming_agg) {
1404
18
            if (need_create_cache_op) {
1405
0
                PipelinePtr new_pipe;
1406
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1407
1408
0
                cache_op = op;
1409
0
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1410
0
                                                                     tnode, descs);
1411
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1412
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1413
0
                cur_pipe = new_pipe;
1414
18
            } else {
1415
18
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1416
18
                                                                     tnode, descs);
1417
18
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1418
18
            }
1419
11.8k
        } else if (is_streaming_agg) {
1420
1.00k
            if (need_create_cache_op) {
1421
0
                PipelinePtr new_pipe;
1422
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1423
0
                cache_op = op;
1424
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1425
0
                                                             descs);
1426
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1427
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1428
0
                cur_pipe = new_pipe;
1429
1.00k
            } else {
1430
1.00k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1431
1.00k
                                                             descs);
1432
1.00k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1433
1.00k
            }
1434
10.8k
        } else {
1435
            // create new pipeline to add query cache operator
1436
10.8k
            PipelinePtr new_pipe;
1437
10.8k
            if (need_create_cache_op) {
1438
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1439
0
                cache_op = op;
1440
0
            }
1441
1442
10.8k
            if (enable_spill) {
1443
94
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1444
94
                                                                     next_operator_id(), descs);
1445
10.7k
            } else {
1446
10.7k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1447
10.7k
            }
1448
10.8k
            if (need_create_cache_op) {
1449
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1450
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1451
0
                cur_pipe = new_pipe;
1452
10.8k
            } else {
1453
10.8k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1454
10.8k
            }
1455
1456
10.8k
            const auto downstream_pipeline_id = cur_pipe->id();
1457
10.8k
            if (!_dag.contains(downstream_pipeline_id)) {
1458
10.0k
                _dag.insert({downstream_pipeline_id, {}});
1459
10.0k
            }
1460
10.8k
            cur_pipe = add_pipeline(cur_pipe);
1461
10.8k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1462
1463
10.8k
            if (enable_spill) {
1464
94
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1465
94
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1466
10.7k
            } else {
1467
10.7k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1468
10.7k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1469
10.7k
            }
1470
10.8k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1471
10.8k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1472
10.8k
        }
1473
11.8k
        break;
1474
11.8k
    }
1475
11.8k
    case TPlanNodeType::BUCKETED_AGGREGATION_NODE: {
1476
0
        if (tnode.bucketed_agg_node.grouping_exprs.empty()) {
1477
0
            return Status::InternalError(
1478
0
                    "Bucketed aggregation node {} should not be used without group by keys",
1479
0
                    tnode.node_id);
1480
0
        }
1481
1482
        // Create source operator (goes on the current / downstream pipeline).
1483
0
        op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1484
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1485
1486
        // Create a new pipeline for the sink side.
1487
0
        const auto downstream_pipeline_id = cur_pipe->id();
1488
0
        if (!_dag.contains(downstream_pipeline_id)) {
1489
0
            _dag.insert({downstream_pipeline_id, {}});
1490
0
        }
1491
0
        cur_pipe = add_pipeline(cur_pipe);
1492
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1493
1494
        // Create sink operator.
1495
0
        sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>(
1496
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1497
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1498
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1499
1500
        // Pre-register a single shared state for ALL instances so that every
1501
        // sink instance writes its per-instance hash table into the same
1502
        // BucketedAggSharedState and every source instance can merge across
1503
        // all of them.
1504
0
        {
1505
0
            auto shared_state = BucketedAggSharedState::create_shared();
1506
0
            shared_state->id = op->operator_id();
1507
0
            shared_state->related_op_ids.insert(op->operator_id());
1508
1509
0
            for (int i = 0; i < _num_instances; i++) {
1510
0
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1511
0
                                                             "BUCKETED_AGG_SINK_DEPENDENCY");
1512
0
                sink_dep->set_shared_state(shared_state.get());
1513
0
                shared_state->sink_deps.push_back(sink_dep);
1514
0
            }
1515
0
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1516
0
                                                     op->node_id(), "BUCKETED_AGG_SOURCE");
1517
0
            _op_id_to_shared_state.insert(
1518
0
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1519
0
        }
1520
0
        break;
1521
0
    }
1522
412
    case TPlanNodeType::HASH_JOIN_NODE: {
1523
412
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1524
412
                                       tnode.hash_join_node.is_broadcast_join;
1525
412
        const auto enable_spill = _runtime_state->enable_spill();
1526
412
        if (enable_spill && !is_broadcast_join) {
1527
0
            auto tnode_ = tnode;
1528
0
            tnode_.runtime_filters.clear();
1529
0
            auto inner_probe_operator =
1530
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1531
1532
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1533
            // So here use `tnode_` which has no runtime filters.
1534
0
            auto probe_side_inner_sink_operator =
1535
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1536
1537
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1538
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1539
1540
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1541
0
                    pool, tnode_, next_operator_id(), descs);
1542
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1543
0
                                                inner_probe_operator);
1544
0
            op = std::move(probe_operator);
1545
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1546
1547
0
            const auto downstream_pipeline_id = cur_pipe->id();
1548
0
            if (!_dag.contains(downstream_pipeline_id)) {
1549
0
                _dag.insert({downstream_pipeline_id, {}});
1550
0
            }
1551
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1552
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1553
1554
0
            auto inner_sink_operator =
1555
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1556
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1557
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1558
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1559
1560
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1561
0
            sink_ops.push_back(std::move(sink_operator));
1562
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1563
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1564
1565
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1566
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1567
412
        } else {
1568
412
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1569
412
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1570
1571
412
            const auto downstream_pipeline_id = cur_pipe->id();
1572
412
            if (!_dag.contains(downstream_pipeline_id)) {
1573
406
                _dag.insert({downstream_pipeline_id, {}});
1574
406
            }
1575
412
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1576
412
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1577
1578
412
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1579
412
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1580
412
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1581
412
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1582
1583
412
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1584
412
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1585
412
        }
1586
412
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1587
374
            std::shared_ptr<HashJoinSharedState> shared_state =
1588
374
                    HashJoinSharedState::create_shared(_num_instances);
1589
3.26k
            for (int i = 0; i < _num_instances; i++) {
1590
2.89k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1591
2.89k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1592
2.89k
                sink_dep->set_shared_state(shared_state.get());
1593
2.89k
                shared_state->sink_deps.push_back(sink_dep);
1594
2.89k
            }
1595
374
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1596
374
                                                     op->node_id(), "HASH_JOIN_PROBE");
1597
374
            _op_id_to_shared_state.insert(
1598
374
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1599
374
        }
1600
412
        break;
1601
412
    }
1602
1.56k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1603
1.56k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1604
1.56k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1605
1606
1.56k
        const auto downstream_pipeline_id = cur_pipe->id();
1607
1.56k
        if (!_dag.contains(downstream_pipeline_id)) {
1608
1.56k
            _dag.insert({downstream_pipeline_id, {}});
1609
1.56k
        }
1610
1.56k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1611
1.56k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1612
1613
1.56k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1614
1.56k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1615
1.56k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1616
1.56k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1617
1.56k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1618
1.56k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1619
1.56k
        break;
1620
1.56k
    }
1621
2.09k
    case TPlanNodeType::UNION_NODE: {
1622
2.09k
        int child_count = tnode.num_children;
1623
2.09k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1624
2.09k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1625
1626
2.09k
        const auto downstream_pipeline_id = cur_pipe->id();
1627
2.09k
        if (!_dag.contains(downstream_pipeline_id)) {
1628
2.06k
            _dag.insert({downstream_pipeline_id, {}});
1629
2.06k
        }
1630
2.42k
        for (int i = 0; i < child_count; i++) {
1631
338
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1632
338
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1633
338
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1634
338
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1635
338
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1636
338
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1637
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1638
338
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1639
338
        }
1640
2.09k
        break;
1641
2.09k
    }
1642
5.87k
    case TPlanNodeType::SORT_NODE: {
1643
5.87k
        const auto should_spill = _runtime_state->enable_spill() &&
1644
5.87k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1645
5.87k
        const bool use_local_merge =
1646
5.87k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1647
5.87k
        if (should_spill) {
1648
0
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1649
5.87k
        } else if (use_local_merge) {
1650
5.64k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1651
5.64k
                                                                 descs);
1652
5.64k
        } else {
1653
232
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1654
232
        }
1655
5.87k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1656
1657
5.87k
        const auto downstream_pipeline_id = cur_pipe->id();
1658
5.87k
        if (!_dag.contains(downstream_pipeline_id)) {
1659
5.87k
            _dag.insert({downstream_pipeline_id, {}});
1660
5.87k
        }
1661
5.87k
        cur_pipe = add_pipeline(cur_pipe);
1662
5.87k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1663
1664
5.87k
        if (should_spill) {
1665
0
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1666
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1667
5.87k
        } else {
1668
5.87k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1669
5.87k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1670
5.87k
        }
1671
5.87k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1672
5.87k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1673
5.87k
        break;
1674
5.87k
    }
1675
5.87k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1676
0
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1677
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1678
1679
0
        const auto downstream_pipeline_id = cur_pipe->id();
1680
0
        if (!_dag.contains(downstream_pipeline_id)) {
1681
0
            _dag.insert({downstream_pipeline_id, {}});
1682
0
        }
1683
0
        cur_pipe = add_pipeline(cur_pipe);
1684
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1685
1686
0
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1687
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1688
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1689
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1690
0
        break;
1691
0
    }
1692
22
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1693
22
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1694
22
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1695
1696
22
        const auto downstream_pipeline_id = cur_pipe->id();
1697
22
        if (!_dag.contains(downstream_pipeline_id)) {
1698
22
            _dag.insert({downstream_pipeline_id, {}});
1699
22
        }
1700
22
        cur_pipe = add_pipeline(cur_pipe);
1701
22
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1702
1703
22
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1704
22
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1705
22
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1706
22
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1707
22
        break;
1708
22
    }
1709
468
    case TPlanNodeType::MATERIALIZATION_NODE: {
1710
468
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1711
468
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1712
468
        break;
1713
468
    }
1714
468
    case TPlanNodeType::INTERSECT_NODE: {
1715
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1716
0
                                                                      cur_pipe, sink_ops));
1717
0
        break;
1718
0
    }
1719
0
    case TPlanNodeType::EXCEPT_NODE: {
1720
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1721
0
                                                                       cur_pipe, sink_ops));
1722
0
        break;
1723
0
    }
1724
0
    case TPlanNodeType::REPEAT_NODE: {
1725
0
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1726
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1727
0
        break;
1728
0
    }
1729
2
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1730
2
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1731
2
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1732
2
        break;
1733
2
    }
1734
100
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1735
100
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1736
100
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1737
100
        break;
1738
100
    }
1739
100
    case TPlanNodeType::EMPTY_SET_NODE: {
1740
69
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1741
69
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1742
69
        break;
1743
69
    }
1744
97
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1745
97
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1746
97
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1747
97
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1748
97
        break;
1749
97
    }
1750
392
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1751
392
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1752
392
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1753
392
        break;
1754
392
    }
1755
734
    case TPlanNodeType::META_SCAN_NODE: {
1756
734
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1757
734
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1758
734
        break;
1759
734
    }
1760
754
    case TPlanNodeType::SELECT_NODE: {
1761
754
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1762
754
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1763
754
        break;
1764
754
    }
1765
754
    case TPlanNodeType::REC_CTE_NODE: {
1766
0
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1767
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1768
1769
0
        const auto downstream_pipeline_id = cur_pipe->id();
1770
0
        if (!_dag.contains(downstream_pipeline_id)) {
1771
0
            _dag.insert({downstream_pipeline_id, {}});
1772
0
        }
1773
1774
0
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1775
0
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1776
1777
0
        DataSinkOperatorPtr anchor_sink;
1778
0
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1779
0
                                                                  op->operator_id(), tnode, descs);
1780
0
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1781
0
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1782
0
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1783
1784
0
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1785
0
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1786
1787
0
        DataSinkOperatorPtr rec_sink;
1788
0
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1789
0
                                                         tnode, descs);
1790
0
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1791
0
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1792
0
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1793
1794
0
        break;
1795
0
    }
1796
0
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1797
0
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1798
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1799
0
        break;
1800
0
    }
1801
0
    default:
1802
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1803
0
                                     print_plan_node_type(tnode.node_type));
1804
78.5k
    }
1805
78.5k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1806
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
1807
0
        op->set_serial_operator();
1808
0
    }
1809
1810
78.5k
    return Status::OK();
1811
78.5k
}
1812
// NOLINTEND(readability-function-cognitive-complexity)
1813
// NOLINTEND(readability-function-size)
1814
1815
template <bool is_intersect>
1816
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
1817
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
1818
0
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1819
0
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1820
0
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1821
1822
0
    const auto downstream_pipeline_id = cur_pipe->id();
1823
0
    if (!_dag.contains(downstream_pipeline_id)) {
1824
0
        _dag.insert({downstream_pipeline_id, {}});
1825
0
    }
1826
1827
0
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1828
0
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1829
0
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1830
1831
0
        if (child_id == 0) {
1832
0
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1833
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1834
0
        } else {
1835
0
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1836
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1837
0
        }
1838
0
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1839
0
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1840
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1841
0
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1842
0
    }
1843
1844
0
    return Status::OK();
1845
0
}
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
1846
1847
55.0k
Status PipelineFragmentContext::submit() {
    // Submit every pipeline task of this fragment to the scheduler.
    // Idempotence guard: a fragment may only be submitted once.
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    // Stop at the FIRST failed submit. The inner `break` alone only left the
    // inner loop, so the outer loop kept submitting and a later successful
    // submit overwrote `st`, silently dropping the failure (even though
    // cancel() had already run and _total_tasks was truncated). Break out of
    // both loops so `st` still holds the failing status below.
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                // Only the tasks submitted so far will ever close; shrink the
                // expected total so _close_fragment_instance() can trigger.
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        if (!st.ok()) {
            break;
        }
    }
    if (!st.ok()) {
        bool need_remove = false;
        {
            std::lock_guard<std::mutex> l(_task_mutex);
            if (_closed_tasks >= _total_tasks) {
                need_remove = _close_fragment_instance();
            }
        }
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
        if (need_remove) {
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
1888
1889
21
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1890
21
    if (_runtime_state->enable_profile()) {
1891
0
        std::stringstream ss;
1892
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1893
0
            runtime_profile_ptr->pretty_print(&ss);
1894
0
        }
1895
1896
0
        if (_runtime_state->load_channel_profile()) {
1897
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1898
0
        }
1899
1900
0
        auto profile_str =
1901
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1902
0
                            this->_fragment_id, extra_info, ss.str());
1903
0
        LOG_LONG_STRING(INFO, profile_str);
1904
0
    }
1905
21
}
1906
// If all pipeline tasks binded to the fragment instance are finished, then we could
1907
// close the fragment instance.
1908
// Returns true if the caller should call remove_pipeline_context() **after** releasing
1909
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
1910
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
1911
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
1912
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
1913
// (via debug_string()).
1914
55.1k
bool PipelineFragmentContext::_close_fragment_instance() {
1915
55.1k
    if (_is_fragment_instance_closed) {
1916
0
        return false;
1917
0
    }
1918
55.1k
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
1919
55.1k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
1920
55.1k
    if (!_need_notify_close) {
1921
55.1k
        auto st = send_report(true);
1922
55.1k
        if (!st) {
1923
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
1924
0
                                        print_id(_query_id), _fragment_id, st.to_string());
1925
0
        }
1926
55.1k
    }
1927
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
1928
    // Since stream load does not have someting like coordinator on FE, so
1929
    // backend can not report profile to FE, ant its profile can not be shown
1930
    // in the same way with other query. So we print the profile content to info log.
1931
1932
55.1k
    if (_runtime_state->enable_profile() &&
1933
55.1k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
1934
220
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
1935
220
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
1936
0
        std::stringstream ss;
1937
        // Compute the _local_time_percent before pretty_print the runtime_profile
1938
        // Before add this operation, the print out like that:
1939
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
1940
        // After add the operation, the print out like that:
1941
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
1942
        // We can easily know the exec node execute time without child time consumed.
1943
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1944
0
            runtime_profile_ptr->pretty_print(&ss);
1945
0
        }
1946
1947
0
        if (_runtime_state->load_channel_profile()) {
1948
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1949
0
        }
1950
1951
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
1952
0
    }
1953
1954
55.1k
    if (_query_ctx->enable_profile()) {
1955
220
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
1956
220
                                         collect_realtime_load_channel_profile());
1957
220
    }
1958
1959
    // Return whether the caller needs to remove from the pipeline map.
1960
    // The caller must do this after releasing _task_mutex.
1961
55.1k
    return !_need_notify_close;
1962
55.1k
}
1963
1964
198k
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    // Once every task of this pipeline has closed, its upstream pipelines will
    // never be consumed again, so make all of their tasks runnable here.
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
    const bool pipeline_fully_closed = _pip_id_to_pipeline[pipeline_id]->close_task();
    if (pipeline_fully_closed && _dag.contains(pipeline_id)) {
        for (auto dep : _dag[pipeline_id]) {
            _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
        }
    }

    bool need_remove = false;
    {
        std::lock_guard<std::mutex> l(_task_mutex);
        ++_closed_tasks;
        if (_closed_tasks >= _total_tasks) {
            need_remove = _close_fragment_instance();
        }
    }
    // remove_pipeline_context() must run outside _task_mutex to avoid an ABBA
    // deadlock (see _close_fragment_instance()).
    if (need_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
}
1987
1988
6.16k
std::string PipelineFragmentContext::get_load_error_url() {
1989
6.16k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1990
0
        return to_load_error_http_path(str);
1991
0
    }
1992
18.8k
    for (auto& tasks : _tasks) {
1993
24.6k
        for (auto& task : tasks) {
1994
24.6k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
1995
6
                return to_load_error_http_path(str);
1996
6
            }
1997
24.6k
        }
1998
18.8k
    }
1999
6.15k
    return "";
2000
6.16k
}
2001
2002
6.15k
std::string PipelineFragmentContext::get_first_error_msg() {
2003
6.15k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
2004
0
        return str;
2005
0
    }
2006
18.8k
    for (auto& tasks : _tasks) {
2007
24.6k
        for (auto& task : tasks) {
2008
24.6k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
2009
6
                return str;
2010
6
            }
2011
24.6k
        }
2012
18.8k
    }
2013
6.15k
    return "";
2014
6.15k
}
2015
2016
0
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
2017
0
    std::stringstream url;
2018
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
2019
0
        << "/api/_download_load?"
2020
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
2021
0
    return url.str();
2022
0
}
2023
2024
5.75k
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
2025
5.75k
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
2026
5.75k
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
2027
5.75k
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
2028
5.75k
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
2029
5.75k
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
2030
5.75k
    });
2031
2032
5.75k
    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
2033
5.75k
    if (req.coord_addr.hostname == "external") {
2034
        // External query (flink/spark read tablets) not need to report to FE.
2035
0
        return;
2036
0
    }
2037
5.75k
    int callback_retries = 10;
2038
5.75k
    const int sleep_ms = 1000;
2039
5.75k
    Status exec_status = req.status;
2040
5.75k
    Status coord_status;
2041
5.75k
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
2042
5.75k
    do {
2043
5.75k
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
2044
5.75k
                                                            req.coord_addr, &coord_status);
2045
5.75k
        if (!coord_status.ok()) {
2046
0
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
2047
0
        }
2048
5.75k
    } while (!coord_status.ok() && callback_retries-- > 0);
2049
2050
5.75k
    if (!coord_status.ok()) {
2051
0
        UniqueId uid(req.query_id.hi, req.query_id.lo);
2052
0
        static_cast<void>(req.cancel_fn(Status::InternalError(
2053
0
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
2054
0
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
2055
0
        return;
2056
0
    }
2057
2058
5.75k
    TReportExecStatusParams params;
2059
5.75k
    params.protocol_version = FrontendServiceVersion::V1;
2060
5.75k
    params.__set_query_id(req.query_id);
2061
5.75k
    params.__set_backend_num(req.backend_num);
2062
5.75k
    params.__set_fragment_instance_id(req.fragment_instance_id);
2063
5.75k
    params.__set_fragment_id(req.fragment_id);
2064
5.75k
    params.__set_status(exec_status.to_thrift());
2065
5.75k
    params.__set_done(req.done);
2066
5.75k
    params.__set_query_type(req.runtime_state->query_type());
2067
5.75k
    params.__isset.profile = false;
2068
2069
5.75k
    DCHECK(req.runtime_state != nullptr);
2070
2071
5.75k
    if (req.runtime_state->query_type() == TQueryType::LOAD) {
2072
5.26k
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2073
5.26k
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2074
5.26k
    } else {
2075
486
        DCHECK(!req.runtime_states.empty());
2076
486
        if (!req.runtime_state->output_files().empty()) {
2077
0
            params.__isset.delta_urls = true;
2078
0
            for (auto& it : req.runtime_state->output_files()) {
2079
0
                params.delta_urls.push_back(_to_http_path(it));
2080
0
            }
2081
0
        }
2082
486
        if (!params.delta_urls.empty()) {
2083
0
            params.__isset.delta_urls = true;
2084
0
        }
2085
486
    }
2086
2087
5.75k
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
2088
5.75k
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
2089
5.75k
    static std::string s_unselected_rows = "unselected.rows";
2090
5.75k
    int64_t num_rows_load_success = 0;
2091
5.75k
    int64_t num_rows_load_filtered = 0;
2092
5.75k
    int64_t num_rows_load_unselected = 0;
2093
5.75k
    if (req.runtime_state->num_rows_load_total() > 0 ||
2094
5.75k
        req.runtime_state->num_rows_load_filtered() > 0 ||
2095
5.75k
        req.runtime_state->num_finished_range() > 0) {
2096
0
        params.__isset.load_counters = true;
2097
2098
0
        num_rows_load_success = req.runtime_state->num_rows_load_success();
2099
0
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
2100
0
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
2101
0
        params.__isset.fragment_instance_reports = true;
2102
0
        TFragmentInstanceReport t;
2103
0
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
2104
0
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
2105
0
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2106
0
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2107
0
        params.fragment_instance_reports.push_back(t);
2108
5.75k
    } else if (!req.runtime_states.empty()) {
2109
19.8k
        for (auto* rs : req.runtime_states) {
2110
19.8k
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
2111
19.8k
                rs->num_finished_range() > 0) {
2112
2.79k
                params.__isset.load_counters = true;
2113
2.79k
                num_rows_load_success += rs->num_rows_load_success();
2114
2.79k
                num_rows_load_filtered += rs->num_rows_load_filtered();
2115
2.79k
                num_rows_load_unselected += rs->num_rows_load_unselected();
2116
2.79k
                params.__isset.fragment_instance_reports = true;
2117
2.79k
                TFragmentInstanceReport t;
2118
2.79k
                t.__set_fragment_instance_id(rs->fragment_instance_id());
2119
2.79k
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
2120
2.79k
                t.__set_loaded_rows(rs->num_rows_load_total());
2121
2.79k
                t.__set_loaded_bytes(rs->num_bytes_load_total());
2122
2.79k
                params.fragment_instance_reports.push_back(t);
2123
2.79k
            }
2124
19.8k
        }
2125
5.75k
    }
2126
5.75k
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
2127
5.75k
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
2128
5.75k
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
2129
2130
5.75k
    if (!req.load_error_url.empty()) {
2131
6
        params.__set_tracking_url(req.load_error_url);
2132
6
    }
2133
5.75k
    if (!req.first_error_msg.empty()) {
2134
6
        params.__set_first_error_msg(req.first_error_msg);
2135
6
    }
2136
19.8k
    for (auto* rs : req.runtime_states) {
2137
19.8k
        if (rs->wal_id() > 0) {
2138
0
            params.__set_txn_id(rs->wal_id());
2139
0
            params.__set_label(rs->import_label());
2140
0
        }
2141
19.8k
    }
2142
5.75k
    if (!req.runtime_state->export_output_files().empty()) {
2143
0
        params.__isset.export_files = true;
2144
0
        params.export_files = req.runtime_state->export_output_files();
2145
5.75k
    } else if (!req.runtime_states.empty()) {
2146
19.8k
        for (auto* rs : req.runtime_states) {
2147
19.8k
            if (!rs->export_output_files().empty()) {
2148
0
                params.__isset.export_files = true;
2149
0
                params.export_files.insert(params.export_files.end(),
2150
0
                                           rs->export_output_files().begin(),
2151
0
                                           rs->export_output_files().end());
2152
0
            }
2153
19.8k
        }
2154
5.75k
    }
2155
5.75k
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
2156
0
        params.__isset.commitInfos = true;
2157
0
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
2158
5.75k
    } else if (!req.runtime_states.empty()) {
2159
19.8k
        for (auto* rs : req.runtime_states) {
2160
19.8k
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
2161
1.08k
                params.__isset.commitInfos = true;
2162
1.08k
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
2163
1.08k
            }
2164
19.8k
        }
2165
5.75k
    }
2166
5.75k
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
2167
0
        params.__isset.errorTabletInfos = true;
2168
0
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
2169
5.75k
    } else if (!req.runtime_states.empty()) {
2170
19.8k
        for (auto* rs : req.runtime_states) {
2171
19.8k
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
2172
0
                params.__isset.errorTabletInfos = true;
2173
0
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
2174
0
                                               rs_eti.end());
2175
0
            }
2176
19.8k
        }
2177
5.75k
    }
2178
5.75k
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
2179
0
        params.__isset.hive_partition_updates = true;
2180
0
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
2181
0
                                             hpu.end());
2182
5.75k
    } else if (!req.runtime_states.empty()) {
2183
19.8k
        for (auto* rs : req.runtime_states) {
2184
19.8k
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
2185
1.04k
                params.__isset.hive_partition_updates = true;
2186
1.04k
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
2187
1.04k
                                                     rs_hpu.begin(), rs_hpu.end());
2188
1.04k
            }
2189
19.8k
        }
2190
5.75k
    }
2191
5.75k
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
2192
0
        params.__isset.iceberg_commit_datas = true;
2193
0
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
2194
0
                                           icd.end());
2195
5.75k
    } else if (!req.runtime_states.empty()) {
2196
19.8k
        for (auto* rs : req.runtime_states) {
2197
19.8k
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
2198
1.03k
                params.__isset.iceberg_commit_datas = true;
2199
1.03k
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
2200
1.03k
                                                   rs_icd.begin(), rs_icd.end());
2201
1.03k
            }
2202
19.8k
        }
2203
5.75k
    }
2204
2205
5.75k
    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
2206
0
        params.__isset.mc_commit_datas = true;
2207
0
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
2208
5.75k
    } else if (!req.runtime_states.empty()) {
2209
19.8k
        for (auto* rs : req.runtime_states) {
2210
19.8k
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
2211
0
                params.__isset.mc_commit_datas = true;
2212
0
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
2213
0
                                              rs_mcd.end());
2214
0
            }
2215
19.8k
        }
2216
5.75k
    }
2217
2218
5.75k
    req.runtime_state->get_unreported_errors(&(params.error_log));
2219
5.75k
    params.__isset.error_log = (!params.error_log.empty());
2220
2221
5.75k
    if (_exec_env->cluster_info()->backend_id != 0) {
2222
5.75k
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
2223
5.75k
    }
2224
2225
5.75k
    TReportExecStatusResult res;
2226
5.75k
    Status rpc_status;
2227
2228
5.75k
    VLOG_DEBUG << "reportExecStatus params is "
2229
0
               << apache::thrift::ThriftDebugString(params).c_str();
2230
5.75k
    if (!exec_status.ok()) {
2231
55
        LOG(WARNING) << "report error status: " << exec_status.msg()
2232
55
                     << " to coordinator: " << req.coord_addr
2233
55
                     << ", query id: " << print_id(req.query_id);
2234
55
    }
2235
5.75k
    try {
2236
5.75k
        try {
2237
5.75k
            (*coord)->reportExecStatus(res, params);
2238
5.75k
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
2239
#ifndef ADDRESS_SANITIZER
2240
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
2241
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
2242
                         << req.coord_addr << ", err: " << e.what();
2243
#endif
2244
0
            rpc_status = coord->reopen();
2245
2246
0
            if (!rpc_status.ok()) {
2247
0
                req.cancel_fn(rpc_status);
2248
0
                return;
2249
0
            }
2250
0
            (*coord)->reportExecStatus(res, params);
2251
0
        }
2252
2253
5.75k
        rpc_status = Status::create<false>(res.status);
2254
5.75k
    } catch (apache::thrift::TException& e) {
2255
0
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
2256
0
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
2257
0
    }
2258
2259
5.75k
    if (!rpc_status.ok()) {
2260
0
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
2261
0
                 print_id(req.query_id), rpc_status.to_string());
2262
0
        req.cancel_fn(rpc_status);
2263
0
    }
2264
5.75k
}
2265
2266
55.7k
// Report execution status of this fragment to the FE coordinator.
//
// The report is skipped entirely when nobody needs it (query finished
// successfully and _is_report_success is false), and the actual RPC work is
// submitted asynchronously to the fragment manager's thread pool so this
// call never blocks on the coordinator.
//
// @param done true when the fragment has finished executing (successfully or not).
// @return Status::OK() on successful submission (or when no report is needed),
//         Status::NeedSendAgain("") when the caller should retry later, or the
//         thread-pool submission error.
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();
    // If plan is done successfully, but _is_report_success is false,
    // no need to send report.
    // Load will set _is_report_success to true because load wants to know
    // the process.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::OK();
    }

    // If both _is_report_success and _is_report_on_cancel are false,
    // which means no matter query is success or failed, no report is needed.
    // This may happen when the query limit reached and
    // a internal cancellation being processed
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
    // be set to false, to avoid sending fault report to FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        if (done) {
            // if done is true, which means the query is finished successfully, we can safely close the fragment instance without sending report to FE, and just return OK status here.
            return Status::OK();
        }
        return Status::NeedSendAgain("");
    }

    // Collect the per-task runtime states so the report can aggregate
    // task-level counters (rows produced, commit infos, ...).
    std::vector<RuntimeState*> runtime_states;

    for (auto& tasks : _tasks) {
        for (auto& task : tasks) {
            runtime_states.push_back(task.second.get());
        }
    }

    // Prefer the query-level error url / first error message; fall back to the
    // fragment-level ones when the query context has none. Each getter is
    // evaluated only once.
    std::string load_error_url = _query_ctx->get_load_error_url();
    if (load_error_url.empty()) {
        load_error_url = get_load_error_url();
    }
    std::string first_error_msg = _query_ctx->get_first_error_msg();
    if (first_error_msg.empty()) {
        first_error_msg = get_first_error_msg();
    }

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = runtime_states,
                             // A failed fragment is treated as done: there is nothing more to run.
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = load_error_url,
                             .first_error_msg = first_error_msg,
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};
    // Capture a shared_ptr to ourselves so the context stays alive until the
    // asynchronous report task has run.
    auto ctx = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
    return _exec_env->fragment_mgr()->get_thread_pool()->submit_func([this, req, ctx]() {
        SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
        _coordinator_callback(req);
        if (!req.done) {
            // More reports will follow; schedule the next periodic report.
            ctx->refresh_next_report_time();
        }
    });
}
2326
2327
0
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2328
0
    size_t res = 0;
2329
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2330
    // here to traverse the vector.
2331
0
    for (const auto& task_instances : _tasks) {
2332
0
        for (const auto& task : task_instances) {
2333
0
            if (task.first->is_running()) {
2334
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2335
0
                                      << " is running, task: " << (void*)task.first.get()
2336
0
                                      << ", is_running: " << task.first->is_running();
2337
0
                *has_running_task = true;
2338
0
                return 0;
2339
0
            }
2340
2341
0
            size_t revocable_size = task.first->get_revocable_size();
2342
0
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2343
0
                res += revocable_size;
2344
0
            }
2345
0
        }
2346
0
    }
2347
0
    return res;
2348
0
}
2349
2350
0
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2351
0
    std::vector<PipelineTask*> revocable_tasks;
2352
0
    for (const auto& task_instances : _tasks) {
2353
0
        for (const auto& task : task_instances) {
2354
0
            size_t revocable_size_ = task.first->get_revocable_size();
2355
2356
0
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2357
0
                revocable_tasks.emplace_back(task.first.get());
2358
0
            }
2359
0
        }
2360
0
    }
2361
0
    return revocable_tasks;
2362
0
}
2363
2364
0
std::string PipelineFragmentContext::debug_string() {
2365
0
    std::lock_guard<std::mutex> l(_task_mutex);
2366
0
    fmt::memory_buffer debug_string_buffer;
2367
0
    fmt::format_to(debug_string_buffer,
2368
0
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2369
0
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2370
0
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2371
0
    for (size_t j = 0; j < _tasks.size(); j++) {
2372
0
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2373
0
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2374
0
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2375
0
                           _tasks[j][i].first->debug_string());
2376
0
        }
2377
0
    }
2378
2379
0
    return fmt::to_string(debug_string_buffer);
2380
0
}
2381
2382
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2383
220
PipelineFragmentContext::collect_realtime_profile() const {
2384
220
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2385
2386
    // we do not have mutex to protect pipeline_id_to_profile
2387
    // so we need to make sure this funciton is invoked after fragment context
2388
    // has already been prepared.
2389
220
    if (!_prepared) {
2390
0
        std::string msg =
2391
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2392
0
        DCHECK(false) << msg;
2393
0
        LOG_ERROR(msg);
2394
0
        return res;
2395
0
    }
2396
2397
    // Make sure first profile is fragment level profile
2398
220
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2399
220
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2400
220
    res.push_back(fragment_profile);
2401
2402
    // pipeline_id_to_profile is initialized in prepare stage
2403
324
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2404
324
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2405
324
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2406
324
        res.push_back(profile_ptr);
2407
324
    }
2408
2409
220
    return res;
2410
220
}
2411
2412
std::shared_ptr<TRuntimeProfileTree>
2413
220
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2414
    // we do not have mutex to protect pipeline_id_to_profile
2415
    // so we need to make sure this funciton is invoked after fragment context
2416
    // has already been prepared.
2417
220
    if (!_prepared) {
2418
0
        std::string msg =
2419
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2420
0
        DCHECK(false) << msg;
2421
0
        LOG_ERROR(msg);
2422
0
        return nullptr;
2423
0
    }
2424
2425
304
    for (const auto& tasks : _tasks) {
2426
534
        for (const auto& task : tasks) {
2427
534
            if (task.second->load_channel_profile() == nullptr) {
2428
0
                continue;
2429
0
            }
2430
2431
534
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2432
2433
534
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2434
534
                                                           _runtime_state->profile_level());
2435
534
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2436
534
        }
2437
304
    }
2438
2439
220
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2440
220
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2441
220
                                                      _runtime_state->profile_level());
2442
220
    return load_channel_profile;
2443
220
}
2444
2445
// Collect runtime filter IDs registered by all tasks in this PFC.
2446
// Used during recursive CTE stage transitions to know which filters to deregister
2447
// before creating the new PFC for the next recursion round.
2448
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2449
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2450
// the vector sizes do not change, and individual RuntimeState filter sets are
2451
// written only during open() which has completed by the time we reach rerun.
2452
0
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2453
0
    std::set<int> result;
2454
0
    for (const auto& _task : _tasks) {
2455
0
        for (const auto& task : _task) {
2456
0
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2457
0
            result.merge(set);
2458
0
        }
2459
0
    }
2460
0
    if (_runtime_state) {
2461
0
        auto set = _runtime_state->get_deregister_runtime_filter();
2462
0
        result.merge(set);
2463
0
    }
2464
0
    return result;
2465
0
}
2466
2467
55.1k
void PipelineFragmentContext::_release_resource() {
2468
55.1k
    std::lock_guard<std::mutex> l(_task_mutex);
2469
    // The memory released by the query end is recorded in the query mem tracker.
2470
55.1k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2471
55.1k
    auto st = _query_ctx->exec_status();
2472
111k
    for (auto& _task : _tasks) {
2473
111k
        if (!_task.empty()) {
2474
111k
            _call_back(_task.front().first->runtime_state(), &st);
2475
111k
        }
2476
111k
    }
2477
55.1k
    _tasks.clear();
2478
55.1k
    _dag.clear();
2479
55.1k
    _pip_id_to_pipeline.clear();
2480
55.1k
    _pipelines.clear();
2481
55.1k
    _sink.reset();
2482
55.1k
    _root_op.reset();
2483
55.1k
    _runtime_filter_mgr_map.clear();
2484
55.1k
    _op_id_to_shared_state.clear();
2485
55.1k
}
2486
2487
} // namespace doris