Coverage Report

Created: 2026-04-18 03:38

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/FrontendService.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <pthread.h>
26
27
#include <algorithm>
28
#include <cstdlib>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <fmt/format.h>
31
#include <thrift/Thrift.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
#include <thrift/transport/TTransportException.h>
34
35
#include <chrono> // IWYU pragma: keep
36
#include <map>
37
#include <memory>
38
#include <ostream>
39
#include <utility>
40
41
#include "cloud/config.h"
42
#include "common/cast_set.h"
43
#include "common/config.h"
44
#include "common/exception.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "exec/exchange/local_exchange_sink_operator.h"
48
#include "exec/exchange/local_exchange_source_operator.h"
49
#include "exec/exchange/local_exchanger.h"
50
#include "exec/exchange/vdata_stream_mgr.h"
51
#include "exec/operator/aggregation_sink_operator.h"
52
#include "exec/operator/aggregation_source_operator.h"
53
#include "exec/operator/analytic_sink_operator.h"
54
#include "exec/operator/analytic_source_operator.h"
55
#include "exec/operator/assert_num_rows_operator.h"
56
#include "exec/operator/blackhole_sink_operator.h"
57
#include "exec/operator/bucketed_aggregation_sink_operator.h"
58
#include "exec/operator/bucketed_aggregation_source_operator.h"
59
#include "exec/operator/cache_sink_operator.h"
60
#include "exec/operator/cache_source_operator.h"
61
#include "exec/operator/datagen_operator.h"
62
#include "exec/operator/dict_sink_operator.h"
63
#include "exec/operator/distinct_streaming_aggregation_operator.h"
64
#include "exec/operator/empty_set_operator.h"
65
#include "exec/operator/es_scan_operator.h"
66
#include "exec/operator/exchange_sink_operator.h"
67
#include "exec/operator/exchange_source_operator.h"
68
#include "exec/operator/file_scan_operator.h"
69
#include "exec/operator/group_commit_block_sink_operator.h"
70
#include "exec/operator/group_commit_scan_operator.h"
71
#include "exec/operator/hashjoin_build_sink.h"
72
#include "exec/operator/hashjoin_probe_operator.h"
73
#include "exec/operator/hive_table_sink_operator.h"
74
#include "exec/operator/iceberg_delete_sink_operator.h"
75
#include "exec/operator/iceberg_merge_sink_operator.h"
76
#include "exec/operator/iceberg_table_sink_operator.h"
77
#include "exec/operator/jdbc_scan_operator.h"
78
#include "exec/operator/jdbc_table_sink_operator.h"
79
#include "exec/operator/local_merge_sort_source_operator.h"
80
#include "exec/operator/materialization_opertor.h"
81
#include "exec/operator/maxcompute_table_sink_operator.h"
82
#include "exec/operator/memory_scratch_sink_operator.h"
83
#include "exec/operator/meta_scan_operator.h"
84
#include "exec/operator/multi_cast_data_stream_sink.h"
85
#include "exec/operator/multi_cast_data_stream_source.h"
86
#include "exec/operator/nested_loop_join_build_operator.h"
87
#include "exec/operator/nested_loop_join_probe_operator.h"
88
#include "exec/operator/olap_scan_operator.h"
89
#include "exec/operator/olap_table_sink_operator.h"
90
#include "exec/operator/olap_table_sink_v2_operator.h"
91
#include "exec/operator/partition_sort_sink_operator.h"
92
#include "exec/operator/partition_sort_source_operator.h"
93
#include "exec/operator/partitioned_aggregation_sink_operator.h"
94
#include "exec/operator/partitioned_aggregation_source_operator.h"
95
#include "exec/operator/partitioned_hash_join_probe_operator.h"
96
#include "exec/operator/partitioned_hash_join_sink_operator.h"
97
#include "exec/operator/rec_cte_anchor_sink_operator.h"
98
#include "exec/operator/rec_cte_scan_operator.h"
99
#include "exec/operator/rec_cte_sink_operator.h"
100
#include "exec/operator/rec_cte_source_operator.h"
101
#include "exec/operator/repeat_operator.h"
102
#include "exec/operator/result_file_sink_operator.h"
103
#include "exec/operator/result_sink_operator.h"
104
#include "exec/operator/schema_scan_operator.h"
105
#include "exec/operator/select_operator.h"
106
#include "exec/operator/set_probe_sink_operator.h"
107
#include "exec/operator/set_sink_operator.h"
108
#include "exec/operator/set_source_operator.h"
109
#include "exec/operator/sort_sink_operator.h"
110
#include "exec/operator/sort_source_operator.h"
111
#include "exec/operator/spill_iceberg_table_sink_operator.h"
112
#include "exec/operator/spill_sort_sink_operator.h"
113
#include "exec/operator/spill_sort_source_operator.h"
114
#include "exec/operator/streaming_aggregation_operator.h"
115
#include "exec/operator/table_function_operator.h"
116
#include "exec/operator/tvf_table_sink_operator.h"
117
#include "exec/operator/union_sink_operator.h"
118
#include "exec/operator/union_source_operator.h"
119
#include "exec/pipeline/dependency.h"
120
#include "exec/pipeline/pipeline_task.h"
121
#include "exec/pipeline/task_scheduler.h"
122
#include "exec/runtime_filter/runtime_filter_mgr.h"
123
#include "exec/sort/topn_sorter.h"
124
#include "exec/spill/spill_file.h"
125
#include "io/fs/stream_load_pipe.h"
126
#include "load/stream_load/new_load_stream_mgr.h"
127
#include "runtime/exec_env.h"
128
#include "runtime/fragment_mgr.h"
129
#include "runtime/result_buffer_mgr.h"
130
#include "runtime/runtime_state.h"
131
#include "runtime/thread_context.h"
132
#include "service/backend_options.h"
133
#include "util/client_cache.h"
134
#include "util/countdown_latch.h"
135
#include "util/debug_util.h"
136
#include "util/network_util.h"
137
#include "util/uid_util.h"
138
139
namespace doris {
140
// Constructs the fragment context for one plan fragment of a query.
// Only copies/moves the request parameters and starts the fragment stopwatch;
// all heavy setup (runtime state, pipelines, tasks) happens later in prepare().
//
// @param query_id   id of the owning query (moved in).
// @param request    full fragment parameters from the FE; copied into _params.
// @param query_ctx  shared query-level context (moved in).
// @param exec_env   process-wide execution environment (not owned).
// @param call_back  invoked with the runtime state and status (stored by copy).
PipelineFragmentContext::PipelineFragmentContext(
        TUniqueId query_id, const TPipelineFragmentParams& request,
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
        const std::function<void(RuntimeState*, Status*)>& call_back)
        : _query_id(std::move(query_id)),
          _fragment_id(request.fragment_id),
          _exec_env(exec_env),
          _query_ctx(std::move(query_ctx)),
          _call_back(call_back),
          _is_report_on_cancel(true),
          _params(request),
          // Optional thrift fields fall back to a safe default when unset.
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
                                                               : false) {
    // Start measuring fragment lifetime immediately; is_timeout() compares
    // elapsed time on this watch against _timeout.
    _fragment_watcher.start();
}
156
157
108k
// Destructor: logs the teardown, releases fragment resources, then drops the
// runtime state and query context. Order matters: _release_resource() runs
// before _runtime_state/_query_ctx are reset, and the resets are performed
// under the query's mem tracker so the freed memory is attributed correctly.
PipelineFragmentContext::~PipelineFragmentContext() {
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
            .tag("query_id", print_id(_query_id))
            .tag("fragment_id", _fragment_id);
    _release_resource();
    {
        // The memory released by the query end is recorded in the query mem tracker.
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
        _runtime_state.reset();
        // _query_ctx must be reset last within this scope: the tracker scope
        // above was obtained from it.
        _query_ctx.reset();
    }
}
169
170
0
bool PipelineFragmentContext::is_timeout(timespec now) const {
171
0
    if (_timeout <= 0) {
172
0
        return false;
173
0
    }
174
0
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
175
0
}
176
177
// notify_close() transitions the PFC from "waiting for external close notification" to
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
bool PipelineFragmentContext::notify_close() {
    bool all_closed = false;
    bool need_remove = false;
    {
        // _task_mutex guards _closed_tasks/_total_tasks/_need_notify_close.
        std::lock_guard<std::mutex> l(_task_mutex);
        if (_closed_tasks >= _total_tasks) {
            if (_need_notify_close) {
                // Fragment was cancelled and waiting for notify to close.
                // Record that we need to remove from fragment mgr, but do it
                // after releasing _task_mutex to avoid ABBA deadlock with
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
                // first, then _task_mutex via debug_string()).
                need_remove = true;
            }
            all_closed = true;
        }
        // make fragment release by self after cancel
        _need_notify_close = false;
    }
    // Deferred removal — performed outside the lock, see ABBA note above.
    if (need_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
    return all_closed;
}
205
206
// Must not add lock in this method. Because it will call query ctx cancel. And
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
// There maybe dead lock.
//
// Cancels this fragment with the given reason: logs diagnostics for notable
// error codes, propagates the cancellation to the query context, cancels any
// associated stream-load pipe, and finally unblocks all task dependencies so
// blocked tasks can observe the cancellation.
void PipelineFragmentContext::cancel(const Status reason) {
    LOG_INFO("PipelineFragmentContext::cancel")
            .tag("query_id", print_id(_query_id))
            .tag("fragment_id", _fragment_id)
            .tag("reason", reason.to_string());
    // If every task already closed there is nothing left to cancel.
    if (notify_close()) {
        return;
    }
    // Timeout is a special error code, we need print current stack to debug timeout issue.
    if (reason.is<ErrorCode::TIMEOUT>()) {
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
                                   debug_string());
        LOG_LONG_STRING(WARNING, dbg_str);
    }

    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
                    debug_string());
    }

    // Dump the profile on memory-pressure cancellations to aid OOM diagnosis.
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
        print_profile("cancel pipeline, reason: " + reason.to_string());
    }

    // Forward load error details (if any) to the query context before cancelling it.
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
        _query_ctx->set_load_error_url(error_url);
    }

    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
        _query_ctx->set_first_error_msg(first_error_msg);
    }

    _query_ctx->cancel(reason, _fragment_id);
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
        // LIMIT_REACH is a normal early termination, not an error — suppress
        // the cancel report.
        _is_report_on_cancel = false;
    } else {
        for (auto& id : _fragment_instance_ids) {
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
        }
    }
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
    // For stream load the fragment's query_id == load id, it is set in FE.
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
    if (stream_load_ctx != nullptr) {
        stream_load_ctx->pipe->cancel(reason.to_string());
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
        // We need to set the error URL at this point to ensure error information is properly
        // propagated to the client.
        stream_load_ctx->error_url = get_load_error_url();
        stream_load_ctx->first_error_msg = get_first_error_msg();
    }

    // Wake every task blocked on a dependency so it can notice the
    // cancellation and run to completion instead of hanging.
    for (auto& tasks : _tasks) {
        for (auto& task : tasks) {
            task.first->unblock_all_dependencies();
        }
    }
}
269
270
166k
// Creates a new pipeline and registers it in _pipelines.
//
// @param parent  optional parent pipeline; when present, the child's task
//                count is inherited from the parent and its concurrency is
//                capped at _num_instances, and the child is linked via
//                parent->set_children().
// @param idx     when non-negative, the insertion position inside _pipelines;
//                otherwise the pipeline is appended.
// @return the newly created pipeline.
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
    const PipelineId pipeline_id = _next_pipeline_id++;
    const auto concurrency =
            parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances;
    const auto task_count = parent ? parent->num_tasks() : _num_instances;
    auto new_pipeline = std::make_shared<Pipeline>(pipeline_id, concurrency, task_count);
    if (idx < 0) {
        _pipelines.emplace_back(new_pipeline);
    } else {
        _pipelines.insert(_pipelines.begin() + idx, new_pipeline);
    }
    if (parent) {
        parent->set_children(new_pipeline);
    }
    return new_pipeline;
}
285
286
108k
// Builds the complete pipeline graph for this fragment and prepares it for
// execution. Steps (numbering continues from prepare(), which does step 1):
//   2. build operator pipelines, 3. create + wire the output sink,
//   4. plan local exchange (if enabled), 5. prepare each pipeline's global
//   state, 6. build the per-instance pipeline tasks.
// Returns the first error encountered; each step is timed into the
// fragment-level profile counters created in prepare().
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
    {
        SCOPED_TIMER(_build_pipelines_timer);
        // 2. Build pipelines with operators in this fragment.
        auto root_pipeline = add_pipeline();
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
                                         &_root_op, root_pipeline));

        // 3. Create sink operator
        if (!_params.fragment.__isset.output_sink) {
            return Status::InternalError("No output sink in this fragment!");
        }
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
                                          _params.fragment.output_exprs, _params,
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
                                          *_desc_tbl, root_pipeline->id()));
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));

        // Connect every pipeline's sink to its last operator; by now each
        // pipeline must have a sink assigned.
        for (PipelinePtr& pipeline : _pipelines) {
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
        }
    }
    // 4. Build local exchanger
    if (_runtime_state->enable_local_shuffle()) {
        SCOPED_TIMER(_plan_local_exchanger_timer);
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
                                             _params.bucket_seq_to_instance_idx,
                                             _params.shuffle_idx_to_instance_idx));
    }

    // 5. Initialize global states in pipelines.
    for (PipelinePtr& pipeline : _pipelines) {
        SCOPED_TIMER(_prepare_all_pipelines_timer);
        // Child links were only needed while building; drop them before prepare.
        pipeline->children().clear();
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
    }

    {
        SCOPED_TIMER(_build_tasks_timer);
        // 6. Build pipeline tasks and initialize local state.
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
    }

    return Status::OK();
}
333
334
108k
// Prepares the fragment context for execution. Must be called exactly once
// (guarded by _prepared): sets the execution timeout, creates the
// fragment-level profile and its timers, builds the global RuntimeState
// (step 1), resolves the descriptor table, records the fragment instance
// ids, then delegates steps 2-6 to _build_and_prepare_full_pipeline().
//
// @param thread_pool  pool used for parallel task preparation downstream.
// @return OK on success, or the first error from any preparation step.
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
    if (_prepared) {
        return Status::InternalError("Already prepared");
    }
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
        _timeout = _params.query_options.execution_timeout;
    }

    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
    SCOPED_TIMER(_prepare_timer);
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
    {
        SCOPED_TIMER(_init_context_timer);
        // One local_param per fragment instance on this backend.
        cast_set(_num_instances, _params.local_params.size());
        _total_instances =
                _params.__isset.total_instances ? _params.total_instances : _num_instances;

        auto* fragment_context = this;

        if (_params.query_options.__isset.is_report_success) {
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
        }

        // 1. Set up the global runtime state.
        _runtime_state = RuntimeState::create_unique(
                _params.query_id, _params.fragment_id, _params.query_options,
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
        _runtime_state->set_task_execution_context(shared_from_this());
        // Attribute the remaining preparation allocations to the query.
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
        if (_params.__isset.backend_id) {
            _runtime_state->set_backend_id(_params.backend_id);
        }
        // The following optional fields are only set for load jobs.
        if (_params.__isset.import_label) {
            _runtime_state->set_import_label(_params.import_label);
        }
        if (_params.__isset.db_name) {
            _runtime_state->set_db_name(_params.db_name);
        }
        if (_params.__isset.load_job_id) {
            _runtime_state->set_load_job_id(_params.load_job_id);
        }

        // Simplified params reuse the query-level descriptor table; otherwise
        // the fragment carries its own and we must deserialize it here.
        if (_params.is_simplified_param) {
            _desc_tbl = _query_ctx->desc_tbl;
        } else {
            DCHECK(_params.__isset.desc_tbl);
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
                                                  &_desc_tbl));
        }
        _runtime_state->set_desc_tbl(_desc_tbl);
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
        _runtime_state->set_total_load_streams(_params.total_load_streams);
        _runtime_state->set_num_local_sink(_params.num_local_sink);

        // init fragment_instance_ids
        const auto target_size = _params.local_params.size();
        _fragment_instance_ids.resize(target_size);
        for (size_t i = 0; i < _params.local_params.size(); i++) {
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
            _fragment_instance_ids[i] = fragment_instance_id;
        }
    }

    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));

    _init_next_report_time();

    _prepared = true;
    return Status::OK();
}
410
411
// Builds and prepares every pipeline task belonging to one fragment instance:
//   1. creates one task (with its own RuntimeState) per pipeline — pipelines
//      with a single task are only materialized for instance 0;
//   2. wires shared state between dependent tasks following _dag;
//   3. calls task->prepare() with this instance's scan ranges;
//   4. publishes the instance's RuntimeFilterMgr under _state_map_lock.
//
// @param instance_idx           index into _params.local_params.
// @param pipeline_id_to_profile per-pipeline profiles, indexed like _pipelines.
// @return OK on success, or the first error from task preparation.
//
// Fix vs. previous revision: removed the unused local copy
// `auto fragment_instance_id = local_params.fragment_instance_id;` — all uses
// below read local_params.fragment_instance_id directly.
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
        int instance_idx,
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
    const auto& local_params = _params.local_params[instance_idx];
    // Runtime filters for this instance; ownership is transferred to
    // _runtime_filter_mgr_map at the end of this function.
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
    // Collects, for one pipeline, the shared state registered for any of its
    // operators or for any destination of its sink.
    auto get_shared_state = [&](PipelinePtr pipeline)
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                                       std::vector<std::shared_ptr<Dependency>>>> {
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                                std::vector<std::shared_ptr<Dependency>>>>
                shared_state_map;
        for (auto& op : pipeline->operators()) {
            auto source_id = op->operator_id();
            if (auto iter = _op_id_to_shared_state.find(source_id);
                iter != _op_id_to_shared_state.end()) {
                shared_state_map.insert({source_id, iter->second});
            }
        }
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
                iter != _op_id_to_shared_state.end()) {
                shared_state_map.insert({sink_to_source_id, iter->second});
            }
        }
        return shared_state_map;
    };

    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
        auto& pipeline = _pipelines[pip_idx];
        // Single-task pipelines are only instantiated for the first instance.
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
            auto task_runtime_state = RuntimeState::create_unique(
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
            {
                // Initialize runtime state for this task
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());

                task_runtime_state->set_task_execution_context(shared_from_this());
                task_runtime_state->set_be_number(local_params.backend_num);

                if (_params.__isset.backend_id) {
                    task_runtime_state->set_backend_id(_params.backend_id);
                }
                // Load-job-only optional fields.
                if (_params.__isset.import_label) {
                    task_runtime_state->set_import_label(_params.import_label);
                }
                if (_params.__isset.db_name) {
                    task_runtime_state->set_db_name(_params.db_name);
                }
                if (_params.__isset.load_job_id) {
                    task_runtime_state->set_load_job_id(_params.load_job_id);
                }
                if (_params.__isset.wal_id) {
                    task_runtime_state->set_wal_id(_params.wal_id);
                }
                if (_params.__isset.content_length) {
                    task_runtime_state->set_content_length(_params.content_length);
                }

                task_runtime_state->set_desc_tbl(_desc_tbl);
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
                task_runtime_state->set_max_operator_id(max_operator_id());
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
                task_runtime_state->set_num_local_sink(_params.num_local_sink);

                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
            }
            auto cur_task_id = _total_tasks++;
            task_runtime_state->set_task_id(cur_task_id);
            task_runtime_state->set_task_num(pipeline->num_tasks());
            auto task = std::make_shared<PipelineTask>(
                    pipeline, cur_task_id, task_runtime_state.get(),
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
                    instance_idx);
            pipeline->incr_created_tasks(instance_idx, task.get());
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
            // The task keeps a raw pointer to its RuntimeState, so both are
            // stored together to tie their lifetimes.
            _tasks[instance_idx].emplace_back(
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
                            std::move(task), std::move(task_runtime_state)});
        }
    }

    /**
         * Build DAG for pipeline tasks.
         * For example, we have
         *
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
         *            \                      /
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
         *                 \                          /
         *               JoinProbeOperator2 (Pipeline1)
         *
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
         *
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
         * and JoinProbeOperator2.
         */
    for (auto& _pipeline : _pipelines) {
        if (pipeline_id_to_task.contains(_pipeline->id())) {
            auto* task = pipeline_id_to_task[_pipeline->id()];
            DCHECK(task != nullptr);

            // If this task has upstream dependency, then inject it into this task.
            if (_dag.contains(_pipeline->id())) {
                auto& deps = _dag[_pipeline->id()];
                for (auto& dep : deps) {
                    if (pipeline_id_to_task.contains(dep)) {
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
                        if (ss) {
                            task->inject_shared_state(ss);
                        } else {
                            // Upstream has no sink state yet: share this
                            // task's source state in the other direction.
                            pipeline_id_to_task[dep]->inject_shared_state(
                                    task->get_source_shared_state());
                        }
                    }
                }
            }
        }
    }
    // Prepare each task with the scan ranges assigned to this instance (if
    // the pipeline's source node has any).
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
            DCHECK(pipeline_id_to_profile[pip_idx]);
            std::vector<TScanRangeParams> scan_ranges;
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
            if (local_params.per_node_scan_ranges.contains(node_id)) {
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
            }
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
                                                             _params.fragment.output_sink));
        }
    }
    {
        std::lock_guard<std::mutex> l(_state_map_lock);
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
    }
    return Status::OK();
}
557
558
108k
// Builds pipeline tasks for every fragment instance. When the instance count
// exceeds the configured parallel_prepare_threshold, instances are prepared
// concurrently on `thread_pool`; otherwise they are prepared serially.
// Clears the transient build-time maps (_pipeline_parent_map,
// _op_id_to_shared_state) once all tasks exist.
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
    _total_tasks = 0;
    _closed_tasks = 0;
    const auto target_size = _params.local_params.size();
    _tasks.resize(target_size);
    _runtime_filter_mgr_map.resize(target_size);
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
    }
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());

    if (target_size > 1 &&
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
        std::vector<Status> prepare_status(target_size);
        int submitted_tasks = 0;
        Status submit_status;
        CountDownLatch latch((int)target_size);
        for (int i = 0; i < target_size; i++) {
            submit_status = thread_pool->submit_func([&, i]() {
                SCOPED_ATTACH_TASK(_query_ctx.get());
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
                latch.count_down();
            });
            if (LIKELY(submit_status.ok())) {
                submitted_tasks++;
            } else {
                // Stop submitting on the first failure; already-submitted
                // lambdas keep running and still count down the latch.
                break;
            }
        }
        // The latch was sized for target_size; account here for the
        // (target_size - submitted_tasks) instances that were never
        // submitted, then wait for all submitted ones to finish. This must
        // happen before returning so the lambdas' captured references
        // (prepare_status, latch, this) stay valid.
        latch.arrive_and_wait(target_size - submitted_tasks);
        if (UNLIKELY(!submit_status.ok())) {
            return submit_status;
        }
        for (int i = 0; i < submitted_tasks; i++) {
            if (!prepare_status[i].ok()) {
                return prepare_status[i];
            }
        }
    } else {
        for (int i = 0; i < target_size; i++) {
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
        }
    }
    // Build-time bookkeeping is no longer needed once tasks are created.
    _pipeline_parent_map.clear();
    _op_id_to_shared_state.clear();

    return Status::OK();
}
608
609
108k
void PipelineFragmentContext::_init_next_report_time() {
610
108k
    auto interval_s = config::pipeline_status_report_interval;
611
108k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
612
10.0k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
613
10.0k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
614
        // We don't want to wait longer than it takes to run the entire fragment.
615
10.0k
        _previous_report_time =
616
10.0k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
617
10.0k
        _disable_period_report = false;
618
10.0k
    }
619
108k
}
620
621
1.19k
// Re-arms periodic reporting after a report was sent: resets the reference timestamp
// and flips `_disable_period_report` back to false.
void PipelineFragmentContext::refresh_next_report_time() {
    auto disable = _disable_period_report.load(std::memory_order_acquire);
    // Callers are expected to invoke this only while reporting is disabled, i.e. right
    // after trigger_report_if_necessary() claimed the report slot via CAS.
    DCHECK(disable == true);
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
    // CAS instead of a plain store so we only re-enable if the flag still holds the
    // value we observed above (a concurrent writer is not clobbered).
    _disable_period_report.compare_exchange_strong(disable, false);
}
627
628
1.54M
void PipelineFragmentContext::trigger_report_if_necessary() {
629
1.54M
    if (!_is_report_success) {
630
1.42M
        return;
631
1.42M
    }
632
115k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
633
115k
    if (disable) {
634
2.54k
        return;
635
2.54k
    }
636
113k
    int32_t interval_s = config::pipeline_status_report_interval;
637
113k
    if (interval_s <= 0) {
638
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
639
0
                        "trigger "
640
0
                        "report.";
641
0
    }
642
113k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
643
113k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
644
113k
    if (MonotonicNanos() > next_report_time) {
645
1.19k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
646
1.19k
                                                            std::memory_order_acq_rel)) {
647
2
            return;
648
2
        }
649
1.19k
        if (VLOG_FILE_IS_ON) {
650
0
            VLOG_FILE << "Reporting "
651
0
                      << "profile for query_id " << print_id(_query_id)
652
0
                      << ", fragment id: " << _fragment_id;
653
654
0
            std::stringstream ss;
655
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
656
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
657
0
            if (_runtime_state->load_channel_profile()) {
658
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
659
0
            }
660
661
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
662
0
                      << " profile:\n"
663
0
                      << ss.str();
664
0
        }
665
1.19k
        auto st = send_report(false);
666
1.19k
        if (!st.ok()) {
667
0
            disable = true;
668
0
            _disable_period_report.compare_exchange_strong(disable, false,
669
0
                                                           std::memory_order_acq_rel);
670
0
        }
671
1.19k
    }
672
113k
}
673
674
// Rebuilds the operator tree from the preorder-serialized thrift plan nodes and
// verifies that every serialized node was consumed exactly once.
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    int visited_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &visited_idx, root,
                                        cur_pipe, 0, false, false));

    // The helper leaves the index on the last node it consumed, so a fully consumed
    // plan satisfies visited_idx + 1 == plan_nodes.size().
    if (visited_idx + 1 != plan_nodes.size()) {
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }
    return Status::OK();
}
691
692
// Recursively materializes the operator described by tnodes[*node_idx] (a preorder
// traversal of the thrift plan) into `cur_pipe`, attaching it to `parent` — or
// publishing it through `root` when parent == nullptr. The two boolean flags carry
// downstream distribution requirements (shuffled / bucket-colocated) so the same
// flags can be recomputed for this node's children; they drive local-exchange
// planning later. On return, *node_idx points at the last node this subtree consumed.
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // propagate error case
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];

    int num_children = tnodes[*node_idx].num_children;
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
    bool current_require_bucket_distribution = require_bucket_distribution;
    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must be done here. For example, group by expressions in agg will be used to
    // decide if a local shuffle should be planed, so it must be initialized here.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
    if (parent != nullptr) {
        // add to parent's child(s)
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    } else {
        *root = op;
    }
    /**
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // An empty operator list means `op` became the pipeline's sink, so the sink's
    // requirement is the one to consult here; otherwise ask the operator itself.
    auto required_data_distribution =
            cur_pipe->operators().empty()
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                    : op->required_data_distribution(_runtime_state.get());
    // Propagate "followed by a shuffled operator" downward: true when either this node
    // or something above it is shuffled AND the required distribution is a hash
    // exchange, or when the flag was already set and this node imposes no exchange.
    current_followed_by_shuffled_operator =
            ((followed_by_shuffled_operator ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
                                             : op->is_shuffled_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (followed_by_shuffled_operator &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Same propagation rule for the bucket/colocated-distribution requirement.
    current_require_bucket_distribution =
            ((require_bucket_distribution ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
                                             : op->is_colocated_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (require_bucket_distribution &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Leaf nodes are sources. NOTE(review): every leaf overwrites `_use_serial_source`,
    // so the last-visited leaf wins — presumably each fragment has a single serial
    // source candidate; verify against callers.
    if (num_children == 0) {
        _use_serial_source = op->is_serial_operator();
    }
    // rely on that tnodes is preorder of the plan
    for (int i = 0; i < num_children; i++) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
                                            current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }

    return Status::OK();
}
777
778
void PipelineFragmentContext::_inherit_pipeline_properties(
779
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
780
16.2k
        PipelinePtr pipe_with_sink) {
781
16.2k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
782
16.2k
    pipe_with_source->set_num_tasks(_num_instances);
783
16.2k
    pipe_with_source->set_data_distribution(data_distribution);
784
16.2k
}
785
786
// Splits `cur_pipe` at operator index `idx` by inserting a local exchange:
// operators [0, idx) move into `new_pip` which gets a LocalExchangeSinkOperatorX,
// while `cur_pipe` gets a LocalExchangeSourceOperatorX at the front. An exchanger
// matching `data_distribution` is created in a shared state connecting the two, and
// the pipeline DAG/children edges are rewired accordingly.
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    auto local_exchange_id = next_operator_id();
    // 1. Create a new pipeline with local exchange sink.
    DataSinkOperatorPtr sink;
    auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool followed_by_shuffled_operator =
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
    if (bucket_seq_to_instance_idx.empty() &&
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
                                          num_buckets, use_global_hash_shuffle,
                                          shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);
    // Every exchanger below takes the same free-blocks-limit option; compute it once
    // instead of repeating the ternary in each case.
    const int free_block_limit =
            _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                    ? cast_set<int>(
                              _runtime_state->query_options().local_exchange_free_blocks_limit)
                    : 0;
    switch (data_distribution.distribution_type) {
    case ExchangeType::HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                use_global_hash_shuffle ? _total_instances : _num_instances, free_block_limit);
        break;
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_block_limit);
        break;
    case ExchangeType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        }
        break;
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, free_block_limit);
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Initialize new pipeline's operator list.
    std::copy(operators.begin(), operators.begin() + idx,
              std::inserter(new_pip->operators(), new_pip->operators().end()));

    // 3.2 Erase unused operators in previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op;
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
    if (!operators.empty()) {
        // Detach the old child first, then attach the new local exchange source.
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (auto child : cur_pipe->children()) {
        // A child whose sink feeds an operator that moved into `new_pip` follows it;
        // everything else stays a child of `cur_pipe`.
        bool found = false;
        for (auto op : new_pip->operators()) {
            if (child->sink()->node_id() == op->node_id()) {
                new_pip->set_children(child);
                found = true;
            };
        }
        if (!found) {
            new_children.push_back(child);
            edges_with_source.push_back(child->id());
        }
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (auto child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
950
951
// Decides whether a local exchange is needed at operator index `idx` of `cur_pipe`
// and, if so, performs the split via _add_local_exchange_impl. May insert a second
// passthrough exchange when the sink side would otherwise run single-tasked with a
// heavy sink operation.
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // A local exchange only makes sense with multiple instances and when the parent
    // pipeline actually runs more than one task; otherwise skip, and also skip when
    // the current distribution already satisfies the requirement.
    const bool parallel_enough = _num_instances > 1 && cur_pipe->num_tasks_of_parent() > 1;
    if (!parallel_enough || !cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    auto& ops = cur_pipe->operators();
    const auto op_count_before = ops.size();
    auto split_pip = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            idx, pool, cur_pipe, split_pip, data_distribution, do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));

    // The split adds exactly one operator (the local exchange source) overall.
    CHECK(op_count_before + 1 == cur_pipe->operators().size() + split_pip->operators().size())
            << "total_op_num: " << op_count_before
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << split_pip->operators().size();

    // There are some local shuffles with relatively heavy operations on the sink.
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
    // Therefore, local passthrough is used to increase the concurrency of the sink.
    // op -> local sink(1) -> local source (n)
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
    if (cur_pipe->num_tasks() > 1 && split_pip->num_tasks() == 1 &&
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
        RETURN_IF_ERROR(_add_local_exchange_impl(
                cast_set<int>(split_pip->operators().size()), pool, split_pip,
                add_pipeline(split_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
992
993
// Plans local exchanges for every pipeline, walking from the last (most-downstream)
// pipeline back to the first so that a child's data distribution is already settled
// when its parent is processed.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
        auto& pip = _pipelines[pip_idx];
        pip->init_data_distribution(_runtime_state.get());
        // Set property if child pipeline is not join operator's child: inherit the
        // distribution from the child whose sink feeds this pipeline's first operator.
        for (auto& child : pip->children()) {
            if (child->sink()->node_id() == pip->operators().front()->node_id()) {
                pip->set_data_distribution(child->data_distribution());
            }
        }

        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
        // still keep colocate plan after local shuffle
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, pip, bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1017
1018
// Plans local exchanges within a single pipeline: scans its operator list for any
// operator whose required data distribution forces a local exchange, splits the
// pipeline there, and repeats until no more splits are needed; finally checks the
// pipeline's sink the same way. Scanning starts at idx 1 because operator 0 is the
// pipeline's source and needs no exchange in front of it.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    int idx = 1;
    bool do_local_exchange = false;
    do {
        // Re-fetch the operator list each pass: a split in the previous pass mutates it.
        auto& ops = pip->operators();
        do_local_exchange = false;
        // Plan local exchange for each operator.
        for (; idx < ops.size();) {
            if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
                RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                        ops[idx]->required_data_distribution(_runtime_state.get()),
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                        shuffle_idx_to_instance_idx));
            }
            if (do_local_exchange) {
                // If local exchange is needed for current operator, we will split this pipeline to
                // two pipelines by local exchange sink/source. And then we need to process remaining
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
                // is current operator was already processed) and continue to plan local exchange.
                idx = 2;
                break;
            }
            idx++;
        }
    } while (do_local_exchange);
    // The sink may also demand a particular distribution (e.g. hash input for an agg sink).
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
        RETURN_IF_ERROR(_add_local_exchange(
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1055
1056
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1057
                                                  const std::vector<TExpr>& output_exprs,
1058
                                                  const TPipelineFragmentParams& params,
1059
                                                  const RowDescriptor& row_desc,
1060
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1061
108k
                                                  PipelineId cur_pipeline_id) {
1062
108k
    switch (thrift_sink.type) {
1063
31.9k
    case TDataSinkType::DATA_STREAM_SINK: {
1064
31.9k
        if (!thrift_sink.__isset.stream_sink) {
1065
0
            return Status::InternalError("Missing data stream sink.");
1066
0
        }
1067
31.9k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1068
31.9k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1069
31.9k
                params.destinations, _fragment_instance_ids);
1070
31.9k
        break;
1071
31.9k
    }
1072
69.3k
    case TDataSinkType::RESULT_SINK: {
1073
69.3k
        if (!thrift_sink.__isset.result_sink) {
1074
0
            return Status::InternalError("Missing data buffer sink.");
1075
0
        }
1076
1077
69.3k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), row_desc,
1078
69.3k
                                                      output_exprs, thrift_sink.result_sink);
1079
69.3k
        break;
1080
69.3k
    }
1081
4
    case TDataSinkType::DICTIONARY_SINK: {
1082
4
        if (!thrift_sink.__isset.dictionary_sink) {
1083
0
            return Status::InternalError("Missing dict sink.");
1084
0
        }
1085
1086
4
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1087
4
                                                    thrift_sink.dictionary_sink);
1088
4
        break;
1089
4
    }
1090
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1091
2.41k
    case TDataSinkType::OLAP_TABLE_SINK: {
1092
2.41k
        if (state->query_options().enable_memtable_on_sink_node &&
1093
2.41k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1094
2.41k
            !config::is_cloud_mode()) {
1095
2.04k
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(pool, next_sink_operator_id(),
1096
2.04k
                                                               row_desc, output_exprs);
1097
2.04k
        } else {
1098
368
            _sink = std::make_shared<OlapTableSinkOperatorX>(pool, next_sink_operator_id(),
1099
368
                                                             row_desc, output_exprs);
1100
368
        }
1101
2.41k
        break;
1102
0
    }
1103
0
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1104
0
        DCHECK(thrift_sink.__isset.olap_table_sink);
1105
0
        DCHECK(state->get_query_ctx() != nullptr);
1106
0
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1107
0
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1108
0
                                                                output_exprs);
1109
0
        break;
1110
0
    }
1111
1.46k
    case TDataSinkType::HIVE_TABLE_SINK: {
1112
1.46k
        if (!thrift_sink.__isset.hive_table_sink) {
1113
0
            return Status::InternalError("Missing hive table sink.");
1114
0
        }
1115
1.46k
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1116
1.46k
                                                         output_exprs);
1117
1.46k
        break;
1118
1.46k
    }
1119
1.72k
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1120
1.72k
        if (!thrift_sink.__isset.iceberg_table_sink) {
1121
0
            return Status::InternalError("Missing iceberg table sink.");
1122
0
        }
1123
1.72k
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1124
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1125
0
                                                                     row_desc, output_exprs);
1126
1.72k
        } else {
1127
1.72k
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1128
1.72k
                                                                row_desc, output_exprs);
1129
1.72k
        }
1130
1.72k
        break;
1131
1.72k
    }
1132
20
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1133
20
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1134
0
            return Status::InternalError("Missing iceberg delete sink.");
1135
0
        }
1136
20
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1137
20
                                                             row_desc, output_exprs);
1138
20
        break;
1139
20
    }
1140
80
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1141
80
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1142
0
            return Status::InternalError("Missing iceberg merge sink.");
1143
0
        }
1144
80
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1145
80
                                                            output_exprs);
1146
80
        break;
1147
80
    }
1148
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1149
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1150
0
            return Status::InternalError("Missing max compute table sink.");
1151
0
        }
1152
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1153
0
                                                       output_exprs);
1154
0
        break;
1155
0
    }
1156
80
    case TDataSinkType::JDBC_TABLE_SINK: {
1157
80
        if (!thrift_sink.__isset.jdbc_table_sink) {
1158
0
            return Status::InternalError("Missing data jdbc sink.");
1159
0
        }
1160
80
        if (config::enable_java_support) {
1161
80
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1162
80
                                                             output_exprs);
1163
80
        } else {
1164
0
            return Status::InternalError(
1165
0
                    "Jdbc table sink is not enabled, you can change be config "
1166
0
                    "enable_java_support to true and restart be.");
1167
0
        }
1168
80
        break;
1169
80
    }
1170
80
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1171
0
        if (!thrift_sink.__isset.memory_scratch_sink) {
1172
0
            return Status::InternalError("Missing data buffer sink.");
1173
0
        }
1174
1175
0
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1176
0
                                                             output_exprs);
1177
0
        break;
1178
0
    }
1179
164
    case TDataSinkType::RESULT_FILE_SINK: {
1180
164
        if (!thrift_sink.__isset.result_file_sink) {
1181
0
            return Status::InternalError("Missing result file sink.");
1182
0
        }
1183
1184
        // Result file sink is not the top sink
1185
164
        if (params.__isset.destinations && !params.destinations.empty()) {
1186
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1187
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1188
0
                    params.destinations, output_exprs, desc_tbl);
1189
164
        } else {
1190
164
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1191
164
                                                              output_exprs);
1192
164
        }
1193
164
        break;
1194
164
    }
1195
1.44k
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1196
1.44k
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1197
1.44k
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1198
1.44k
        auto sink_id = next_sink_operator_id();
1199
1.44k
        const int multi_cast_node_id = sink_id;
1200
1.44k
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1201
        // one sink has multiple sources.
1202
1.44k
        std::vector<int> sources;
1203
5.76k
        for (int i = 0; i < sender_size; ++i) {
1204
4.32k
            auto source_id = next_operator_id();
1205
4.32k
            sources.push_back(source_id);
1206
4.32k
        }
1207
1208
1.44k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1209
1.44k
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1210
5.76k
        for (int i = 0; i < sender_size; ++i) {
1211
4.32k
            auto new_pipeline = add_pipeline();
1212
            // use to exchange sink
1213
4.32k
            RowDescriptor* exchange_row_desc = nullptr;
1214
4.32k
            {
1215
4.32k
                const auto& tmp_row_desc =
1216
4.32k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1217
4.32k
                                ? RowDescriptor(state->desc_tbl(),
1218
4.32k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1219
4.32k
                                                         .output_tuple_id})
1220
4.32k
                                : row_desc;
1221
4.32k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1222
4.32k
            }
1223
4.32k
            auto source_id = sources[i];
1224
4.32k
            OperatorPtr source_op;
1225
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1226
4.32k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1227
4.32k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1228
4.32k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1229
4.32k
                    /*operator_id=*/source_id);
1230
4.32k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1231
4.32k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1232
            // 2. create and set sink operator of data stream sender for new pipeline
1233
1234
4.32k
            DataSinkOperatorPtr sink_op;
1235
4.32k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1236
4.32k
                    state, *exchange_row_desc, next_sink_operator_id(),
1237
4.32k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1238
4.32k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1239
1240
4.32k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1241
4.32k
            {
1242
4.32k
                TDataSink* t = pool->add(new TDataSink());
1243
4.32k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1244
4.32k
                RETURN_IF_ERROR(sink_op->init(*t));
1245
4.32k
            }
1246
1247
            // 3. set dependency dag
1248
4.32k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1249
4.32k
        }
1250
1.44k
        if (sources.empty()) {
1251
0
            return Status::InternalError("size of sources must be greater than 0");
1252
0
        }
1253
1.44k
        break;
1254
1.44k
    }
1255
1.44k
    case TDataSinkType::BLACKHOLE_SINK: {
1256
10
        if (!thrift_sink.__isset.blackhole_sink) {
1257
0
            return Status::InternalError("Missing blackhole sink.");
1258
0
        }
1259
1260
10
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1261
10
        break;
1262
10
    }
1263
156
    case TDataSinkType::TVF_TABLE_SINK: {
1264
156
        if (!thrift_sink.__isset.tvf_table_sink) {
1265
0
            return Status::InternalError("Missing TVF table sink.");
1266
0
        }
1267
156
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1268
156
                                                        output_exprs);
1269
156
        break;
1270
156
    }
1271
0
    default:
1272
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1273
108k
    }
1274
108k
    return Status::OK();
1275
108k
}
1276
1277
// NOLINTBEGIN(readability-function-size)
1278
// NOLINTBEGIN(readability-function-cognitive-complexity)
1279
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1280
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1281
                                                 PipelinePtr& cur_pipe, int parent_idx,
1282
                                                 int child_idx,
1283
                                                 const bool followed_by_shuffled_operator,
1284
                                                 const bool require_bucket_distribution,
1285
154k
                                                 OperatorPtr& cache_op) {
1286
154k
    std::vector<DataSinkOperatorPtr> sink_ops;
1287
154k
    Defer defer = Defer([&]() {
1288
154k
        if (op) {
1289
154k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1290
154k
        }
1291
154k
        for (auto& s : sink_ops) {
1292
36.9k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1293
36.9k
        }
1294
154k
    });
1295
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1296
    // Therefore, here we need to use a stack-like structure.
1297
154k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1298
154k
    std::stringstream error_msg;
1299
154k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1300
1301
154k
    bool fe_with_old_version = false;
1302
154k
    switch (tnode.node_type) {
1303
49.3k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1304
49.3k
        op = std::make_shared<OlapScanOperatorX>(
1305
49.3k
                pool, tnode, next_operator_id(), descs, _num_instances,
1306
49.3k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1307
49.3k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1308
49.3k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1309
49.3k
        break;
1310
49.3k
    }
1311
0
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1312
0
        DCHECK(_query_ctx != nullptr);
1313
0
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1314
0
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1315
0
                                                    _num_instances);
1316
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1317
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1318
0
        break;
1319
0
    }
1320
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1321
0
        if (config::enable_java_support) {
1322
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1323
0
                                                     _num_instances);
1324
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1325
0
        } else {
1326
0
            return Status::InternalError(
1327
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1328
0
                    "to true and restart be.");
1329
0
        }
1330
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1331
0
        break;
1332
0
    }
1333
20.3k
    case TPlanNodeType::FILE_SCAN_NODE: {
1334
20.3k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1335
20.3k
                                                 _num_instances);
1336
20.3k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1337
20.3k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1338
20.3k
        break;
1339
20.3k
    }
1340
0
    case TPlanNodeType::ES_SCAN_NODE:
1341
592
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
1342
592
        op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs,
1343
592
                                               _num_instances);
1344
592
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1345
592
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1346
592
        break;
1347
592
    }
1348
36.2k
    case TPlanNodeType::EXCHANGE_NODE: {
1349
36.2k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1350
36.2k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1351
36.2k
                                  : 0;
1352
36.2k
        DCHECK_GT(num_senders, 0);
1353
36.2k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1354
36.2k
                                                       num_senders);
1355
36.2k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1356
36.2k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1357
36.2k
        break;
1358
36.2k
    }
1359
22.8k
    case TPlanNodeType::AGGREGATION_NODE: {
1360
22.8k
        if (tnode.agg_node.grouping_exprs.empty() &&
1361
22.8k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1362
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1363
0
                                         ": group by and output is empty");
1364
0
        }
1365
22.8k
        bool need_create_cache_op =
1366
22.8k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1367
22.8k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1368
0
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1369
0
            auto cache_source_id = next_operator_id();
1370
0
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1371
0
                                                        _params.fragment.query_cache_param);
1372
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1373
1374
0
            const auto downstream_pipeline_id = cur_pipe->id();
1375
0
            if (!_dag.contains(downstream_pipeline_id)) {
1376
0
                _dag.insert({downstream_pipeline_id, {}});
1377
0
            }
1378
0
            new_pipe = add_pipeline(cur_pipe);
1379
0
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1380
1381
0
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1382
0
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1383
0
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1384
0
            return Status::OK();
1385
0
        };
1386
22.8k
        const bool group_by_limit_opt =
1387
22.8k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1388
1389
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1390
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1391
22.8k
        const bool enable_spill = _runtime_state->enable_spill() &&
1392
22.8k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1393
22.8k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1394
22.8k
                                      tnode.agg_node.use_streaming_preaggregation &&
1395
22.8k
                                      !tnode.agg_node.grouping_exprs.empty();
1396
        // TODO: distinct streaming agg does not support spill.
1397
22.8k
        const bool can_use_distinct_streaming_agg =
1398
22.8k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1399
22.8k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1400
22.8k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1401
22.8k
                _params.query_options.enable_distinct_streaming_aggregation;
1402
1403
22.8k
        if (can_use_distinct_streaming_agg) {
1404
36
            if (need_create_cache_op) {
1405
0
                PipelinePtr new_pipe;
1406
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1407
1408
0
                cache_op = op;
1409
0
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1410
0
                                                                     tnode, descs);
1411
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1412
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1413
0
                cur_pipe = new_pipe;
1414
36
            } else {
1415
36
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1416
36
                                                                     tnode, descs);
1417
36
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1418
36
            }
1419
22.8k
        } else if (is_streaming_agg) {
1420
2.00k
            if (need_create_cache_op) {
1421
0
                PipelinePtr new_pipe;
1422
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1423
0
                cache_op = op;
1424
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1425
0
                                                             descs);
1426
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1427
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1428
0
                cur_pipe = new_pipe;
1429
2.00k
            } else {
1430
2.00k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1431
2.00k
                                                             descs);
1432
2.00k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1433
2.00k
            }
1434
20.8k
        } else {
1435
            // create new pipeline to add query cache operator
1436
20.8k
            PipelinePtr new_pipe;
1437
20.8k
            if (need_create_cache_op) {
1438
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1439
0
                cache_op = op;
1440
0
            }
1441
1442
20.8k
            if (enable_spill) {
1443
0
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1444
0
                                                                     next_operator_id(), descs);
1445
20.8k
            } else {
1446
20.8k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1447
20.8k
            }
1448
20.8k
            if (need_create_cache_op) {
1449
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1450
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1451
0
                cur_pipe = new_pipe;
1452
20.8k
            } else {
1453
20.8k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1454
20.8k
            }
1455
1456
20.8k
            const auto downstream_pipeline_id = cur_pipe->id();
1457
20.8k
            if (!_dag.contains(downstream_pipeline_id)) {
1458
19.3k
                _dag.insert({downstream_pipeline_id, {}});
1459
19.3k
            }
1460
20.8k
            cur_pipe = add_pipeline(cur_pipe);
1461
20.8k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1462
1463
20.8k
            if (enable_spill) {
1464
0
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1465
0
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1466
20.8k
            } else {
1467
20.8k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1468
20.8k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1469
20.8k
            }
1470
20.8k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1471
20.8k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1472
20.8k
        }
1473
22.8k
        break;
1474
22.8k
    }
1475
22.8k
    case TPlanNodeType::BUCKETED_AGGREGATION_NODE: {
1476
0
        if (tnode.bucketed_agg_node.grouping_exprs.empty()) {
1477
0
            return Status::InternalError(
1478
0
                    "Bucketed aggregation node {} should not be used without group by keys",
1479
0
                    tnode.node_id);
1480
0
        }
1481
1482
        // Create source operator (goes on the current / downstream pipeline).
1483
0
        op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1484
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1485
1486
        // Create a new pipeline for the sink side.
1487
0
        const auto downstream_pipeline_id = cur_pipe->id();
1488
0
        if (!_dag.contains(downstream_pipeline_id)) {
1489
0
            _dag.insert({downstream_pipeline_id, {}});
1490
0
        }
1491
0
        cur_pipe = add_pipeline(cur_pipe);
1492
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1493
1494
        // Create sink operator.
1495
0
        sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>(
1496
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1497
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1498
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1499
1500
        // Pre-register a single shared state for ALL instances so that every
1501
        // sink instance writes its per-instance hash table into the same
1502
        // BucketedAggSharedState and every source instance can merge across
1503
        // all of them.
1504
0
        {
1505
0
            auto shared_state = BucketedAggSharedState::create_shared();
1506
0
            shared_state->id = op->operator_id();
1507
0
            shared_state->related_op_ids.insert(op->operator_id());
1508
1509
0
            for (int i = 0; i < _num_instances; i++) {
1510
0
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1511
0
                                                             "BUCKETED_AGG_SINK_DEPENDENCY");
1512
0
                sink_dep->set_shared_state(shared_state.get());
1513
0
                shared_state->sink_deps.push_back(sink_dep);
1514
0
            }
1515
0
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1516
0
                                                     op->node_id(), "BUCKETED_AGG_SOURCE");
1517
0
            _op_id_to_shared_state.insert(
1518
0
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1519
0
        }
1520
0
        break;
1521
0
    }
1522
824
    case TPlanNodeType::HASH_JOIN_NODE: {
1523
824
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1524
824
                                       tnode.hash_join_node.is_broadcast_join;
1525
824
        const auto enable_spill = _runtime_state->enable_spill();
1526
824
        if (enable_spill && !is_broadcast_join) {
1527
0
            auto tnode_ = tnode;
1528
0
            tnode_.runtime_filters.clear();
1529
0
            auto inner_probe_operator =
1530
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1531
1532
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1533
            // So here use `tnode_` which has no runtime filters.
1534
0
            auto probe_side_inner_sink_operator =
1535
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1536
1537
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1538
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1539
1540
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1541
0
                    pool, tnode_, next_operator_id(), descs);
1542
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1543
0
                                                inner_probe_operator);
1544
0
            op = std::move(probe_operator);
1545
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1546
1547
0
            const auto downstream_pipeline_id = cur_pipe->id();
1548
0
            if (!_dag.contains(downstream_pipeline_id)) {
1549
0
                _dag.insert({downstream_pipeline_id, {}});
1550
0
            }
1551
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1552
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1553
1554
0
            auto inner_sink_operator =
1555
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1556
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1557
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1558
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1559
1560
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1561
0
            sink_ops.push_back(std::move(sink_operator));
1562
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1563
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1564
1565
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1566
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1567
824
        } else {
1568
824
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1569
824
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1570
1571
824
            const auto downstream_pipeline_id = cur_pipe->id();
1572
824
            if (!_dag.contains(downstream_pipeline_id)) {
1573
812
                _dag.insert({downstream_pipeline_id, {}});
1574
812
            }
1575
824
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1576
824
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1577
1578
824
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1579
824
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1580
824
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1581
824
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1582
1583
824
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1584
824
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1585
824
        }
1586
824
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1587
748
            std::shared_ptr<HashJoinSharedState> shared_state =
1588
748
                    HashJoinSharedState::create_shared(_num_instances);
1589
6.53k
            for (int i = 0; i < _num_instances; i++) {
1590
5.78k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1591
5.78k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1592
5.78k
                sink_dep->set_shared_state(shared_state.get());
1593
5.78k
                shared_state->sink_deps.push_back(sink_dep);
1594
5.78k
            }
1595
748
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1596
748
                                                     op->node_id(), "HASH_JOIN_PROBE");
1597
748
            _op_id_to_shared_state.insert(
1598
748
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1599
748
        }
1600
824
        break;
1601
824
    }
1602
2.94k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1603
2.94k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1604
2.94k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1605
1606
2.94k
        const auto downstream_pipeline_id = cur_pipe->id();
1607
2.94k
        if (!_dag.contains(downstream_pipeline_id)) {
1608
2.94k
            _dag.insert({downstream_pipeline_id, {}});
1609
2.94k
        }
1610
2.94k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1611
2.94k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1612
1613
2.94k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1614
2.94k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1615
2.94k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1616
2.94k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1617
2.94k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1618
2.94k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1619
2.94k
        break;
1620
2.94k
    }
1621
4.09k
    case TPlanNodeType::UNION_NODE: {
1622
4.09k
        int child_count = tnode.num_children;
1623
4.09k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1624
4.09k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1625
1626
4.09k
        const auto downstream_pipeline_id = cur_pipe->id();
1627
4.09k
        if (!_dag.contains(downstream_pipeline_id)) {
1628
4.04k
            _dag.insert({downstream_pipeline_id, {}});
1629
4.04k
        }
1630
4.76k
        for (int i = 0; i < child_count; i++) {
1631
676
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1632
676
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1633
676
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1634
676
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1635
676
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1636
676
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1637
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1638
676
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1639
676
        }
1640
4.09k
        break;
1641
4.09k
    }
1642
11.6k
    case TPlanNodeType::SORT_NODE: {
1643
11.6k
        const auto should_spill = _runtime_state->enable_spill() &&
1644
11.6k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1645
11.6k
        const bool use_local_merge =
1646
11.6k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1647
11.6k
        if (should_spill) {
1648
2
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1649
11.6k
        } else if (use_local_merge) {
1650
11.2k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1651
11.2k
                                                                 descs);
1652
11.2k
        } else {
1653
464
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1654
464
        }
1655
11.6k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1656
1657
11.6k
        const auto downstream_pipeline_id = cur_pipe->id();
1658
11.6k
        if (!_dag.contains(downstream_pipeline_id)) {
1659
11.6k
            _dag.insert({downstream_pipeline_id, {}});
1660
11.6k
        }
1661
11.6k
        cur_pipe = add_pipeline(cur_pipe);
1662
11.6k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1663
1664
11.6k
        if (should_spill) {
1665
2
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1666
2
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1667
11.6k
        } else {
1668
11.6k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1669
11.6k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1670
11.6k
        }
1671
11.6k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1672
11.6k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1673
11.6k
        break;
1674
11.6k
    }
1675
11.6k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1676
0
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1677
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1678
1679
0
        const auto downstream_pipeline_id = cur_pipe->id();
1680
0
        if (!_dag.contains(downstream_pipeline_id)) {
1681
0
            _dag.insert({downstream_pipeline_id, {}});
1682
0
        }
1683
0
        cur_pipe = add_pipeline(cur_pipe);
1684
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1685
1686
0
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1687
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1688
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1689
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1690
0
        break;
1691
0
    }
1692
44
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1693
44
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1694
44
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1695
1696
44
        const auto downstream_pipeline_id = cur_pipe->id();
1697
44
        if (!_dag.contains(downstream_pipeline_id)) {
1698
44
            _dag.insert({downstream_pipeline_id, {}});
1699
44
        }
1700
44
        cur_pipe = add_pipeline(cur_pipe);
1701
44
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1702
1703
44
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1704
44
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1705
44
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1706
44
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1707
44
        break;
1708
44
    }
1709
936
    case TPlanNodeType::MATERIALIZATION_NODE: {
1710
936
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1711
936
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1712
936
        break;
1713
936
    }
1714
936
    case TPlanNodeType::INTERSECT_NODE: {
1715
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1716
0
                                                                      cur_pipe, sink_ops));
1717
0
        break;
1718
0
    }
1719
0
    case TPlanNodeType::EXCEPT_NODE: {
1720
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1721
0
                                                                       cur_pipe, sink_ops));
1722
0
        break;
1723
0
    }
1724
0
    case TPlanNodeType::REPEAT_NODE: {
1725
0
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1726
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1727
0
        break;
1728
0
    }
1729
4
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1730
4
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1731
4
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1732
4
        break;
1733
4
    }
1734
200
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1735
200
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1736
200
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1737
200
        break;
1738
200
    }
1739
200
    case TPlanNodeType::EMPTY_SET_NODE: {
1740
142
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1741
142
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1742
142
        break;
1743
142
    }
1744
194
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1745
194
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1746
194
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1747
194
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1748
194
        break;
1749
194
    }
1750
784
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1751
784
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1752
784
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1753
784
        break;
1754
784
    }
1755
1.48k
    case TPlanNodeType::META_SCAN_NODE: {
1756
1.48k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1757
1.48k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1758
1.48k
        break;
1759
1.48k
    }
1760
1.48k
    case TPlanNodeType::SELECT_NODE: {
1761
1.44k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1762
1.44k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1763
1.44k
        break;
1764
1.44k
    }
1765
1.44k
    case TPlanNodeType::REC_CTE_NODE: {
1766
0
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1767
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1768
1769
0
        const auto downstream_pipeline_id = cur_pipe->id();
1770
0
        if (!_dag.contains(downstream_pipeline_id)) {
1771
0
            _dag.insert({downstream_pipeline_id, {}});
1772
0
        }
1773
1774
0
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1775
0
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1776
1777
0
        DataSinkOperatorPtr anchor_sink;
1778
0
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1779
0
                                                                  op->operator_id(), tnode, descs);
1780
0
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1781
0
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1782
0
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1783
1784
0
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1785
0
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1786
1787
0
        DataSinkOperatorPtr rec_sink;
1788
0
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1789
0
                                                         tnode, descs);
1790
0
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1791
0
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1792
0
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1793
1794
0
        break;
1795
0
    }
1796
0
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1797
0
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1798
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1799
0
        break;
1800
0
    }
1801
0
    default:
1802
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1803
0
                                     print_plan_node_type(tnode.node_type));
1804
154k
    }
1805
154k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1806
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
1807
0
        op->set_serial_operator();
1808
0
    }
1809
1810
154k
    return Status::OK();
1811
154k
}
1812
// NOLINTEND(readability-function-cognitive-complexity)
1813
// NOLINTEND(readability-function-size)
1814
1815
template <bool is_intersect>
1816
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
1817
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
1818
0
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1819
0
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1820
0
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1821
1822
0
    const auto downstream_pipeline_id = cur_pipe->id();
1823
0
    if (!_dag.contains(downstream_pipeline_id)) {
1824
0
        _dag.insert({downstream_pipeline_id, {}});
1825
0
    }
1826
1827
0
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1828
0
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1829
0
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1830
1831
0
        if (child_id == 0) {
1832
0
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1833
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1834
0
        } else {
1835
0
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1836
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1837
0
        }
1838
0
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1839
0
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1840
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1841
0
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1842
0
    }
1843
1844
0
    return Status::OK();
1845
0
}
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
1846
1847
108k
// Submits every prepared pipeline task to the query's scheduler.
//
// On the first failed submission the fragment is cancelled, `_total_tasks` is
// clamped to the number of tasks actually submitted, and submission stops.
//
// @return OK on success, InternalError if already submitted or if any task
//         could not be handed to the scheduler.
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                // Only `submit_tasks` tasks will ever call decrement_running_task();
                // shrink the expected total so the fragment can still close.
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        // BUGFIX: the inner `break` only left the inner loop; the outer loop
        // kept submitting later pipelines' tasks and overwrote `st`, which
        // could hide the failure and invalidate the `_total_tasks` clamp above.
        // Stop submitting entirely once a submission has failed.
        if (!st.ok()) {
            break;
        }
    }
    if (!st.ok()) {
        bool need_remove = false;
        {
            std::lock_guard<std::mutex> l(_task_mutex);
            if (_closed_tasks >= _total_tasks) {
                need_remove = _close_fragment_instance();
            }
        }
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
        if (need_remove) {
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
1888
1889
12
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1890
12
    if (_runtime_state->enable_profile()) {
1891
0
        std::stringstream ss;
1892
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1893
0
            runtime_profile_ptr->pretty_print(&ss);
1894
0
        }
1895
1896
0
        if (_runtime_state->load_channel_profile()) {
1897
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1898
0
        }
1899
1900
0
        auto profile_str =
1901
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1902
0
                            this->_fragment_id, extra_info, ss.str());
1903
0
        LOG_LONG_STRING(INFO, profile_str);
1904
0
    }
1905
12
}
1906
// If all pipeline tasks binded to the fragment instance are finished, then we could
1907
// close the fragment instance.
1908
// Returns true if the caller should call remove_pipeline_context() **after** releasing
1909
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
1910
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
1911
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
1912
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
1913
// (via debug_string()).
1914
108k
bool PipelineFragmentContext::_close_fragment_instance() {
1915
108k
    if (_is_fragment_instance_closed) {
1916
0
        return false;
1917
0
    }
1918
108k
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
1919
108k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
1920
108k
    if (!_need_notify_close) {
1921
108k
        auto st = send_report(true);
1922
108k
        if (!st) {
1923
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
1924
0
                                        print_id(_query_id), _fragment_id, st.to_string());
1925
0
        }
1926
108k
    }
1927
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
1928
    // Since stream load does not have someting like coordinator on FE, so
1929
    // backend can not report profile to FE, ant its profile can not be shown
1930
    // in the same way with other query. So we print the profile content to info log.
1931
1932
108k
    if (_runtime_state->enable_profile() &&
1933
108k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
1934
440
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
1935
440
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
1936
0
        std::stringstream ss;
1937
        // Compute the _local_time_percent before pretty_print the runtime_profile
1938
        // Before add this operation, the print out like that:
1939
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
1940
        // After add the operation, the print out like that:
1941
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
1942
        // We can easily know the exec node execute time without child time consumed.
1943
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1944
0
            runtime_profile_ptr->pretty_print(&ss);
1945
0
        }
1946
1947
0
        if (_runtime_state->load_channel_profile()) {
1948
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1949
0
        }
1950
1951
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
1952
0
    }
1953
1954
108k
    if (_query_ctx->enable_profile()) {
1955
440
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
1956
440
                                         collect_realtime_load_channel_profile());
1957
440
    }
1958
1959
    // Return whether the caller needs to remove from the pipeline map.
1960
    // The caller must do this after releasing _task_mutex.
1961
108k
    return !_need_notify_close;
1962
108k
}
1963
1964
393k
// Called when one task of pipeline `pipeline_id` finishes. When the whole
// pipeline is closed, its upstream pipelines (recorded in _dag) are made
// runnable so they can drain; once every task of the fragment has closed,
// the fragment instance itself is closed and removed from the fragment map.
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
    const bool pipeline_fully_closed = _pip_id_to_pipeline[pipeline_id]->close_task();
    // Once all tasks of this pipeline are closed its upstream producers are no
    // longer needed, so wake all of their tasks up.
    if (pipeline_fully_closed && _dag.contains(pipeline_id)) {
        for (auto upstream_id : _dag[pipeline_id]) {
            _pip_id_to_pipeline[upstream_id]->make_all_runnable(pipeline_id);
        }
    }

    bool should_remove = false;
    {
        std::lock_guard<std::mutex> guard(_task_mutex);
        ++_closed_tasks;
        should_remove = _closed_tasks >= _total_tasks && _close_fragment_instance();
    }
    // remove_pipeline_context() must run outside _task_mutex to avoid an ABBA deadlock.
    if (should_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
}
1987
1988
12.1k
std::string PipelineFragmentContext::get_load_error_url() {
1989
12.1k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1990
0
        return to_load_error_http_path(str);
1991
0
    }
1992
37.7k
    for (auto& tasks : _tasks) {
1993
49.0k
        for (auto& task : tasks) {
1994
49.0k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
1995
14
                return to_load_error_http_path(str);
1996
14
            }
1997
49.0k
        }
1998
37.7k
    }
1999
12.1k
    return "";
2000
12.1k
}
2001
2002
12.1k
std::string PipelineFragmentContext::get_first_error_msg() {
2003
12.1k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
2004
0
        return str;
2005
0
    }
2006
37.8k
    for (auto& tasks : _tasks) {
2007
49.0k
        for (auto& task : tasks) {
2008
49.0k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
2009
14
                return str;
2010
14
            }
2011
49.0k
        }
2012
37.8k
    }
2013
12.1k
    return "";
2014
12.1k
}
2015
2016
0
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
2017
0
    std::stringstream url;
2018
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
2019
0
        << "/api/_download_load?"
2020
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
2021
0
    return url.str();
2022
0
}
2023
2024
11.3k
// Reports this fragment's execution status back to the coordinating FE over
// Thrift. Builds a TReportExecStatusParams from `req` (status, load counters,
// commit/error tablet infos, export files, table-format commit data, error
// logs) and calls reportExecStatus(), retrying once through a reopened
// connection on transport failure. Any unrecoverable RPC failure cancels the
// query via req.cancel_fn. Runs on the report thread, not a pipeline task.
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
    // Test hook: artificially delay the report to exercise FE-side timeouts.
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
    });

    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
    if (req.coord_addr.hostname == "external") {
        // External query (flink/spark read tablets) not need to report to FE.
        return;
    }
    // Acquire an FE client, retrying up to 10 times with a 1s pause between
    // attempts before giving up and cancelling the query.
    int callback_retries = 10;
    const int sleep_ms = 1000;
    Status exec_status = req.status;
    Status coord_status;
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
    do {
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
                                                            req.coord_addr, &coord_status);
        if (!coord_status.ok()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
        }
    } while (!coord_status.ok() && callback_retries-- > 0);

    if (!coord_status.ok()) {
        // Could not reach the coordinator at all: cancel the query locally.
        UniqueId uid(req.query_id.hi, req.query_id.lo);
        static_cast<void>(req.cancel_fn(Status::InternalError(
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
        return;
    }

    // ---- Assemble the report payload ----
    TReportExecStatusParams params;
    params.protocol_version = FrontendServiceVersion::V1;
    params.__set_query_id(req.query_id);
    params.__set_backend_num(req.backend_num);
    params.__set_fragment_instance_id(req.fragment_instance_id);
    params.__set_fragment_id(req.fragment_id);
    params.__set_status(exec_status.to_thrift());
    params.__set_done(req.done);
    params.__set_query_type(req.runtime_state->query_type());
    params.__isset.profile = false;

    DCHECK(req.runtime_state != nullptr);

    if (req.runtime_state->query_type() == TQueryType::LOAD) {
        // Load jobs report aggregate progress from the fragment-level state.
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
    } else {
        DCHECK(!req.runtime_states.empty());
        // Non-load queries: expose output files as downloadable delta URLs.
        if (!req.runtime_state->output_files().empty()) {
            params.__isset.delta_urls = true;
            for (auto& it : req.runtime_state->output_files()) {
                params.delta_urls.push_back(_to_http_path(it));
            }
        }
        if (!params.delta_urls.empty()) {
            params.__isset.delta_urls = true;
        }
    }

    // Load counter keys expected by the FE side.
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
    static std::string s_unselected_rows = "unselected.rows";
    int64_t num_rows_load_success = 0;
    int64_t num_rows_load_filtered = 0;
    int64_t num_rows_load_unselected = 0;
    // Counters come either from the fragment-level runtime state, or — when it
    // recorded nothing — summed across the per-instance runtime states.
    if (req.runtime_state->num_rows_load_total() > 0 ||
        req.runtime_state->num_rows_load_filtered() > 0 ||
        req.runtime_state->num_finished_range() > 0) {
        params.__isset.load_counters = true;

        num_rows_load_success = req.runtime_state->num_rows_load_success();
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
        params.__isset.fragment_instance_reports = true;
        TFragmentInstanceReport t;
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
        params.fragment_instance_reports.push_back(t);
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
                rs->num_finished_range() > 0) {
                params.__isset.load_counters = true;
                num_rows_load_success += rs->num_rows_load_success();
                num_rows_load_filtered += rs->num_rows_load_filtered();
                num_rows_load_unselected += rs->num_rows_load_unselected();
                params.__isset.fragment_instance_reports = true;
                TFragmentInstanceReport t;
                t.__set_fragment_instance_id(rs->fragment_instance_id());
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
                t.__set_loaded_rows(rs->num_rows_load_total());
                t.__set_loaded_bytes(rs->num_bytes_load_total());
                params.fragment_instance_reports.push_back(t);
            }
        }
    }
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));

    if (!req.load_error_url.empty()) {
        params.__set_tracking_url(req.load_error_url);
    }
    if (!req.first_error_msg.empty()) {
        params.__set_first_error_msg(req.first_error_msg);
    }
    // WAL (group-commit) bookkeeping: the last instance with a wal_id wins.
    for (auto* rs : req.runtime_states) {
        if (rs->wal_id() > 0) {
            params.__set_txn_id(rs->wal_id());
            params.__set_label(rs->import_label());
        }
    }
    // The remaining sections all follow the same pattern: prefer the
    // fragment-level runtime state; if it is empty, merge from the
    // per-instance runtime states.
    if (!req.runtime_state->export_output_files().empty()) {
        params.__isset.export_files = true;
        params.export_files = req.runtime_state->export_output_files();
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (!rs->export_output_files().empty()) {
                params.__isset.export_files = true;
                params.export_files.insert(params.export_files.end(),
                                           rs->export_output_files().begin(),
                                           rs->export_output_files().end());
            }
        }
    }
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
        params.__isset.commitInfos = true;
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
                params.__isset.commitInfos = true;
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
            }
        }
    }
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
        params.__isset.errorTabletInfos = true;
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
                params.__isset.errorTabletInfos = true;
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
                                               rs_eti.end());
            }
        }
    }
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
        params.__isset.hive_partition_updates = true;
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
                                             hpu.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
                params.__isset.hive_partition_updates = true;
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
                                                     rs_hpu.begin(), rs_hpu.end());
            }
        }
    }
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
        params.__isset.iceberg_commit_datas = true;
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
                                           icd.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
                params.__isset.iceberg_commit_datas = true;
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
                                                   rs_icd.begin(), rs_icd.end());
            }
        }
    }

    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
        params.__isset.mc_commit_datas = true;
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
                params.__isset.mc_commit_datas = true;
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
                                              rs_mcd.end());
            }
        }
    }

    req.runtime_state->get_unreported_errors(&(params.error_log));
    params.__isset.error_log = (!params.error_log.empty());

    // backend_id == 0 means the cluster info has not been initialized yet.
    if (_exec_env->cluster_info()->backend_id != 0) {
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
    }

    // ---- Send the report ----
    TReportExecStatusResult res;
    Status rpc_status;

    VLOG_DEBUG << "reportExecStatus params is "
               << apache::thrift::ThriftDebugString(params).c_str();
    if (!exec_status.ok()) {
        LOG(WARNING) << "report error status: " << exec_status.msg()
                     << " to coordinator: " << req.coord_addr
                     << ", query id: " << print_id(req.query_id);
    }
    try {
        try {
            (*coord)->reportExecStatus(res, params);
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
            // Transport-level failure: reopen the connection and retry once.
#ifndef ADDRESS_SANITIZER
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
                         << req.coord_addr << ", err: " << e.what();
#endif
            rpc_status = coord->reopen();

            if (!rpc_status.ok()) {
                req.cancel_fn(rpc_status);
                return;
            }
            (*coord)->reportExecStatus(res, params);
        }

        rpc_status = Status::create<false>(res.status);
    } catch (apache::thrift::TException& e) {
        // Any other Thrift failure (including the retry's) becomes an error status.
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
    }

    if (!rpc_status.ok()) {
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
                 print_id(req.query_id), rpc_status.to_string());
        req.cancel_fn(rpc_status);
    }
}
2265
2266
110k
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();
    // If plan is done successfully, but _is_report_success is false,
    // no need to send report.
    // Load will set _is_report_success to true because load wants to know
    // the process.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::OK();
    }

    // If both _is_report_success and _is_report_on_cancel are false,
    // which means no matter query is success or failed, no report is needed.
    // This may happen when the query limit reached and
    // a internal cancellation being processed
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
    // be set to false, to avoid sending fault report to FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        if (done) {
            // if done is true, which means the query is finished successfully, we can safely
            // close the fragment instance without sending report to FE, and just return OK
            // status here.
            return Status::OK();
        }
        return Status::NeedSendAgain("");
    }

    // Collect every task-level RuntimeState so the report carries per-task info.
    std::vector<RuntimeState*> runtime_states;
    for (auto& tasks : _tasks) {
        for (auto& task : tasks) {
            runtime_states.push_back(task.second.get());
        }
    }

    // Prefer the query-level error url / first error message; fall back to the
    // fragment-level values when the query context has none.
    // NOTE: fixed local-variable typo (was `load_eror_url`).
    std::string load_error_url = _query_ctx->get_load_error_url().empty()
                                         ? get_load_error_url()
                                         : _query_ctx->get_load_error_url();
    std::string first_error_msg = _query_ctx->get_first_error_msg().empty()
                                          ? get_first_error_msg()
                                          : _query_ctx->get_first_error_msg();

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = runtime_states,
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = load_error_url,
                             .first_error_msg = first_error_msg,
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};
    // `ctx` keeps this fragment context alive while the report runs on the pool,
    // so capturing `this` inside the lambda below is safe.
    auto ctx = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
    return _exec_env->fragment_mgr()->get_thread_pool()->submit_func([this, req, ctx]() {
        SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
        _coordinator_callback(req);
        if (!req.done) {
            // Not the final report: schedule the next periodic report.
            ctx->refresh_next_report_time();
        }
    });
}
2326
2327
0
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
    size_t total = 0;
    // _tasks is only cleared in ~PipelineFragmentContext, so traversing the
    // vector here is safe.
    for (const auto& instance_tasks : _tasks) {
        for (const auto& [task, state] : instance_tasks) {
            if (task->is_running()) {
                // A running task means nothing can be revoked right now.
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
                                      << " is running, task: " << (void*)task.get()
                                      << ", is_running: " << task->is_running();
                *has_running_task = true;
                return 0;
            }

            // Only count tasks whose revocable memory is large enough to be
            // worth spilling.
            const size_t revocable = task->get_revocable_size();
            if (revocable >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
                total += revocable;
            }
        }
    }
    return total;
}
2349
2350
0
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
    std::vector<PipelineTask*> result;
    for (const auto& instance_tasks : _tasks) {
        for (const auto& [task, state] : instance_tasks) {
            // Skip tasks whose revocable memory is below the minimum spill
            // batch size — spilling them would not be worthwhile.
            if (task->get_revocable_size() >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
                result.emplace_back(task.get());
            }
        }
    }
    return result;
}
2363
2364
0
std::string PipelineFragmentContext::debug_string() {
    // Hold _task_mutex so the task list cannot change while we format it.
    std::lock_guard<std::mutex> l(_task_mutex);
    fmt::memory_buffer buf;
    fmt::format_to(buf,
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
    for (size_t inst_idx = 0; inst_idx < _tasks.size(); inst_idx++) {
        fmt::format_to(buf, "Tasks in instance {}:\n", inst_idx);
        for (size_t task_idx = 0; task_idx < _tasks[inst_idx].size(); task_idx++) {
            fmt::format_to(buf, "Task {}: {}\n", task_idx,
                           _tasks[inst_idx][task_idx].first->debug_string());
        }
    }

    return fmt::to_string(buf);
}
2381
2382
std::vector<std::shared_ptr<TRuntimeProfileTree>>
PipelineFragmentContext::collect_realtime_profile() const {
    std::vector<std::shared_ptr<TRuntimeProfileTree>> profiles;

    // There is no mutex protecting pipeline_id_to_profile, so this function
    // must only be invoked after the fragment context has been prepared.
    if (!_prepared) {
        std::string msg =
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
        DCHECK(false) << msg;
        LOG_ERROR(msg);
        return profiles;
    }

    // The fragment-level profile must always come first in the result.
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
    profiles.push_back(fragment_profile);

    // pipeline_id_to_profile is initialized in the prepare stage.
    for (const auto& pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
        auto thrift_profile = std::make_shared<TRuntimeProfileTree>();
        pipeline_profile->to_thrift(thrift_profile.get(), _runtime_state->profile_level());
        profiles.push_back(thrift_profile);
    }

    return profiles;
}
2411
2412
std::shared_ptr<TRuntimeProfileTree>
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
    // There is no mutex protecting pipeline_id_to_profile, so this function
    // must only be invoked after the fragment context has been prepared.
    if (!_prepared) {
        std::string msg =
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
        DCHECK(false) << msg;
        LOG_ERROR(msg);
        return nullptr;
    }

    // Fold every task's load-channel profile into the fragment-level one.
    for (const auto& tasks : _tasks) {
        for (const auto& [task, state] : tasks) {
            auto task_profile = state->load_channel_profile();
            if (task_profile == nullptr) {
                continue;
            }

            auto thrift_profile = std::make_shared<TRuntimeProfileTree>();
            task_profile->to_thrift(thrift_profile.get(), _runtime_state->profile_level());
            _runtime_state->load_channel_profile()->update(*thrift_profile);
        }
    }

    // Serialize the merged profile for the caller.
    auto merged = std::make_shared<TRuntimeProfileTree>();
    _runtime_state->load_channel_profile()->to_thrift(merged.get(),
                                                      _runtime_state->profile_level());
    return merged;
}
2444
2445
// Collect runtime filter IDs registered by all tasks in this PFC.
2446
// Used during recursive CTE stage transitions to know which filters to deregister
2447
// before creating the new PFC for the next recursion round.
2448
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2449
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2450
// the vector sizes do not change, and individual RuntimeState filter sets are
2451
// written only during open() which has completed by the time we reach rerun.
2452
0
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2453
0
    std::set<int> result;
2454
0
    for (const auto& _task : _tasks) {
2455
0
        for (const auto& task : _task) {
2456
0
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2457
0
            result.merge(set);
2458
0
        }
2459
0
    }
2460
0
    if (_runtime_state) {
2461
0
        auto set = _runtime_state->get_deregister_runtime_filter();
2462
0
        result.merge(set);
2463
0
    }
2464
0
    return result;
2465
0
}
2466
2467
108k
void PipelineFragmentContext::_release_resource() {
2468
108k
    std::lock_guard<std::mutex> l(_task_mutex);
2469
    // The memory released by the query end is recorded in the query mem tracker.
2470
108k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2471
108k
    auto st = _query_ctx->exec_status();
2472
222k
    for (auto& _task : _tasks) {
2473
222k
        if (!_task.empty()) {
2474
222k
            _call_back(_task.front().first->runtime_state(), &st);
2475
222k
        }
2476
222k
    }
2477
108k
    _tasks.clear();
2478
108k
    _dag.clear();
2479
108k
    _pip_id_to_pipeline.clear();
2480
108k
    _pipelines.clear();
2481
108k
    _sink.reset();
2482
108k
    _root_op.reset();
2483
108k
    _runtime_filter_mgr_map.clear();
2484
108k
    _op_id_to_shared_state.clear();
2485
108k
}
2486
2487
} // namespace doris