Coverage Report

Created: 2026-04-22 11:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/FrontendService.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <pthread.h>
26
27
#include <algorithm>
28
#include <cstdlib>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <fmt/format.h>
31
#include <thrift/Thrift.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
#include <thrift/transport/TTransportException.h>
34
35
#include <chrono> // IWYU pragma: keep
36
#include <map>
37
#include <memory>
38
#include <ostream>
39
#include <utility>
40
41
#include "cloud/config.h"
42
#include "common/cast_set.h"
43
#include "common/config.h"
44
#include "common/exception.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "exec/exchange/local_exchange_sink_operator.h"
48
#include "exec/exchange/local_exchange_source_operator.h"
49
#include "exec/exchange/local_exchanger.h"
50
#include "exec/exchange/vdata_stream_mgr.h"
51
#include "exec/operator/aggregation_sink_operator.h"
52
#include "exec/operator/aggregation_source_operator.h"
53
#include "exec/operator/analytic_sink_operator.h"
54
#include "exec/operator/analytic_source_operator.h"
55
#include "exec/operator/assert_num_rows_operator.h"
56
#include "exec/operator/blackhole_sink_operator.h"
57
#include "exec/operator/bucketed_aggregation_sink_operator.h"
58
#include "exec/operator/bucketed_aggregation_source_operator.h"
59
#include "exec/operator/cache_sink_operator.h"
60
#include "exec/operator/cache_source_operator.h"
61
#include "exec/operator/datagen_operator.h"
62
#include "exec/operator/dict_sink_operator.h"
63
#include "exec/operator/distinct_streaming_aggregation_operator.h"
64
#include "exec/operator/empty_set_operator.h"
65
#include "exec/operator/es_scan_operator.h"
66
#include "exec/operator/exchange_sink_operator.h"
67
#include "exec/operator/exchange_source_operator.h"
68
#include "exec/operator/file_scan_operator.h"
69
#include "exec/operator/group_commit_block_sink_operator.h"
70
#include "exec/operator/group_commit_scan_operator.h"
71
#include "exec/operator/hashjoin_build_sink.h"
72
#include "exec/operator/hashjoin_probe_operator.h"
73
#include "exec/operator/hive_table_sink_operator.h"
74
#include "exec/operator/iceberg_delete_sink_operator.h"
75
#include "exec/operator/iceberg_merge_sink_operator.h"
76
#include "exec/operator/iceberg_table_sink_operator.h"
77
#include "exec/operator/jdbc_scan_operator.h"
78
#include "exec/operator/jdbc_table_sink_operator.h"
79
#include "exec/operator/local_merge_sort_source_operator.h"
80
#include "exec/operator/materialization_opertor.h"
81
#include "exec/operator/maxcompute_table_sink_operator.h"
82
#include "exec/operator/memory_scratch_sink_operator.h"
83
#include "exec/operator/meta_scan_operator.h"
84
#include "exec/operator/multi_cast_data_stream_sink.h"
85
#include "exec/operator/multi_cast_data_stream_source.h"
86
#include "exec/operator/nested_loop_join_build_operator.h"
87
#include "exec/operator/nested_loop_join_probe_operator.h"
88
#include "exec/operator/olap_scan_operator.h"
89
#include "exec/operator/olap_table_sink_operator.h"
90
#include "exec/operator/olap_table_sink_v2_operator.h"
91
#include "exec/operator/partition_sort_sink_operator.h"
92
#include "exec/operator/partition_sort_source_operator.h"
93
#include "exec/operator/partitioned_aggregation_sink_operator.h"
94
#include "exec/operator/partitioned_aggregation_source_operator.h"
95
#include "exec/operator/partitioned_hash_join_probe_operator.h"
96
#include "exec/operator/partitioned_hash_join_sink_operator.h"
97
#include "exec/operator/rec_cte_anchor_sink_operator.h"
98
#include "exec/operator/rec_cte_scan_operator.h"
99
#include "exec/operator/rec_cte_sink_operator.h"
100
#include "exec/operator/rec_cte_source_operator.h"
101
#include "exec/operator/repeat_operator.h"
102
#include "exec/operator/result_file_sink_operator.h"
103
#include "exec/operator/result_sink_operator.h"
104
#include "exec/operator/schema_scan_operator.h"
105
#include "exec/operator/select_operator.h"
106
#include "exec/operator/set_probe_sink_operator.h"
107
#include "exec/operator/set_sink_operator.h"
108
#include "exec/operator/set_source_operator.h"
109
#include "exec/operator/sort_sink_operator.h"
110
#include "exec/operator/sort_source_operator.h"
111
#include "exec/operator/spill_iceberg_table_sink_operator.h"
112
#include "exec/operator/spill_sort_sink_operator.h"
113
#include "exec/operator/spill_sort_source_operator.h"
114
#include "exec/operator/streaming_aggregation_operator.h"
115
#include "exec/operator/table_function_operator.h"
116
#include "exec/operator/tvf_table_sink_operator.h"
117
#include "exec/operator/union_sink_operator.h"
118
#include "exec/operator/union_source_operator.h"
119
#include "exec/pipeline/dependency.h"
120
#include "exec/pipeline/pipeline_task.h"
121
#include "exec/pipeline/task_scheduler.h"
122
#include "exec/runtime_filter/runtime_filter_mgr.h"
123
#include "exec/sort/topn_sorter.h"
124
#include "exec/spill/spill_file.h"
125
#include "io/fs/stream_load_pipe.h"
126
#include "load/stream_load/new_load_stream_mgr.h"
127
#include "runtime/exec_env.h"
128
#include "runtime/fragment_mgr.h"
129
#include "runtime/result_buffer_mgr.h"
130
#include "runtime/runtime_state.h"
131
#include "runtime/thread_context.h"
132
#include "service/backend_options.h"
133
#include "util/client_cache.h"
134
#include "util/countdown_latch.h"
135
#include "util/debug_util.h"
136
#include "util/network_util.h"
137
#include "util/uid_util.h"
138
139
namespace doris {
140
PipelineFragmentContext::PipelineFragmentContext(
141
        TUniqueId query_id, const TPipelineFragmentParams& request,
142
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
143
        const std::function<void(RuntimeState*, Status*)>& call_back)
144
431k
        : _query_id(std::move(query_id)),
145
431k
          _fragment_id(request.fragment_id),
146
431k
          _exec_env(exec_env),
147
431k
          _query_ctx(std::move(query_ctx)),
148
431k
          _call_back(call_back),
149
431k
          _is_report_on_cancel(true),
150
431k
          _params(request),
151
431k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
152
431k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
153
431k
                                                               : false) {
154
431k
    _fragment_watcher.start();
155
431k
}
156
157
431k
PipelineFragmentContext::~PipelineFragmentContext() {
158
431k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
159
431k
            .tag("query_id", print_id(_query_id))
160
431k
            .tag("fragment_id", _fragment_id);
161
431k
    _release_resource();
162
431k
    {
163
        // The memory released by the query end is recorded in the query mem tracker.
164
431k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
165
431k
        _runtime_state.reset();
166
431k
        _query_ctx.reset();
167
431k
    }
168
431k
}
169
170
19
bool PipelineFragmentContext::is_timeout(timespec now) const {
171
19
    if (_timeout <= 0) {
172
0
        return false;
173
0
    }
174
19
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
175
19
}
176
177
// notify_close() transitions the PFC from "waiting for external close notification" to
178
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
179
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
180
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
181
9.16k
bool PipelineFragmentContext::notify_close() {
182
9.16k
    bool all_closed = false;
183
9.16k
    bool need_remove = false;
184
9.16k
    {
185
9.16k
        std::lock_guard<std::mutex> l(_task_mutex);
186
9.16k
        if (_closed_tasks >= _total_tasks) {
187
3.47k
            if (_need_notify_close) {
188
                // Fragment was cancelled and waiting for notify to close.
189
                // Record that we need to remove from fragment mgr, but do it
190
                // after releasing _task_mutex to avoid ABBA deadlock with
191
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
192
                // first, then _task_mutex via debug_string()).
193
3.40k
                need_remove = true;
194
3.40k
            }
195
3.47k
            all_closed = true;
196
3.47k
        }
197
        // make fragment release by self after cancel
198
9.16k
        _need_notify_close = false;
199
9.16k
    }
200
9.16k
    if (need_remove) {
201
3.40k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
202
3.40k
    }
203
9.16k
    return all_closed;
204
9.16k
}
205
206
// Must not add lock in this method. Because it will call query ctx cancel. And
207
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
208
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
209
// There maybe dead lock.
210
5.68k
void PipelineFragmentContext::cancel(const Status reason) {
211
5.68k
    LOG_INFO("PipelineFragmentContext::cancel")
212
5.68k
            .tag("query_id", print_id(_query_id))
213
5.68k
            .tag("fragment_id", _fragment_id)
214
5.68k
            .tag("reason", reason.to_string());
215
5.68k
    if (notify_close()) {
216
90
        return;
217
90
    }
218
    // Timeout is a special error code, we need print current stack to debug timeout issue.
219
5.59k
    if (reason.is<ErrorCode::TIMEOUT>()) {
220
1
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
221
1
                                   debug_string());
222
1
        LOG_LONG_STRING(WARNING, dbg_str);
223
1
    }
224
225
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
226
5.59k
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
227
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
228
0
                    debug_string());
229
0
    }
230
231
5.59k
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
232
42
        print_profile("cancel pipeline, reason: " + reason.to_string());
233
42
    }
234
235
5.59k
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
236
24
        _query_ctx->set_load_error_url(error_url);
237
24
    }
238
239
5.59k
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
240
24
        _query_ctx->set_first_error_msg(first_error_msg);
241
24
    }
242
243
5.59k
    _query_ctx->cancel(reason, _fragment_id);
244
5.59k
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
245
493
        _is_report_on_cancel = false;
246
5.10k
    } else {
247
16.0k
        for (auto& id : _fragment_instance_ids) {
248
16.0k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
249
16.0k
        }
250
5.10k
    }
251
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
252
    // For stream load the fragment's query_id == load id, it is set in FE.
253
5.59k
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
254
5.59k
    if (stream_load_ctx != nullptr) {
255
30
        stream_load_ctx->pipe->cancel(reason.to_string());
256
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
257
        // We need to set the error URL at this point to ensure error information is properly
258
        // propagated to the client.
259
30
        stream_load_ctx->error_url = get_load_error_url();
260
30
        stream_load_ctx->first_error_msg = get_first_error_msg();
261
30
    }
262
263
18.2k
    for (auto& tasks : _tasks) {
264
41.1k
        for (auto& task : tasks) {
265
41.1k
            task.first->unblock_all_dependencies();
266
41.1k
        }
267
18.2k
    }
268
5.59k
}
269
270
664k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
271
664k
    PipelineId id = _next_pipeline_id++;
272
664k
    auto pipeline = std::make_shared<Pipeline>(
273
664k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
274
664k
            parent ? parent->num_tasks() : _num_instances);
275
664k
    if (idx >= 0) {
276
104k
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
277
559k
    } else {
278
559k
        _pipelines.emplace_back(pipeline);
279
559k
    }
280
664k
    if (parent) {
281
228k
        parent->set_children(pipeline);
282
228k
    }
283
664k
    return pipeline;
284
664k
}
285
286
430k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
287
430k
    {
288
430k
        SCOPED_TIMER(_build_pipelines_timer);
289
        // 2. Build pipelines with operators in this fragment.
290
430k
        auto root_pipeline = add_pipeline();
291
430k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
292
430k
                                         &_root_op, root_pipeline));
293
294
        // 3. Create sink operator
295
430k
        if (!_params.fragment.__isset.output_sink) {
296
0
            return Status::InternalError("No output sink in this fragment!");
297
0
        }
298
430k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
299
430k
                                          _params.fragment.output_exprs, _params,
300
430k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
301
430k
                                          *_desc_tbl, root_pipeline->id()));
302
430k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
303
430k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
304
305
559k
        for (PipelinePtr& pipeline : _pipelines) {
306
559k
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
307
559k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
308
559k
        }
309
430k
    }
310
    // 4. Build local exchanger
311
430k
    if (_runtime_state->enable_local_shuffle()) {
312
428k
        SCOPED_TIMER(_plan_local_exchanger_timer);
313
428k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
314
428k
                                             _params.bucket_seq_to_instance_idx,
315
428k
                                             _params.shuffle_idx_to_instance_idx));
316
428k
    }
317
318
    // 5. Initialize global states in pipelines.
319
666k
    for (PipelinePtr& pipeline : _pipelines) {
320
666k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
321
666k
        pipeline->children().clear();
322
666k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
323
666k
    }
324
325
429k
    {
326
429k
        SCOPED_TIMER(_build_tasks_timer);
327
        // 6. Build pipeline tasks and initialize local state.
328
429k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
329
429k
    }
330
331
429k
    return Status::OK();
332
429k
}
333
334
430k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
335
430k
    if (_prepared) {
336
0
        return Status::InternalError("Already prepared");
337
0
    }
338
431k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
339
431k
        _timeout = _params.query_options.execution_timeout;
340
431k
    }
341
342
430k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
343
430k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
344
430k
    SCOPED_TIMER(_prepare_timer);
345
430k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
346
430k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
347
430k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
348
430k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
349
430k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
350
430k
    {
351
430k
        SCOPED_TIMER(_init_context_timer);
352
430k
        cast_set(_num_instances, _params.local_params.size());
353
430k
        _total_instances =
354
430k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
355
356
430k
        auto* fragment_context = this;
357
358
430k
        if (_params.query_options.__isset.is_report_success) {
359
429k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
360
429k
        }
361
362
        // 1. Set up the global runtime state.
363
430k
        _runtime_state = RuntimeState::create_unique(
364
430k
                _params.query_id, _params.fragment_id, _params.query_options,
365
430k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
366
430k
        _runtime_state->set_task_execution_context(shared_from_this());
367
430k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
368
430k
        if (_params.__isset.backend_id) {
369
428k
            _runtime_state->set_backend_id(_params.backend_id);
370
428k
        }
371
430k
        if (_params.__isset.import_label) {
372
239
            _runtime_state->set_import_label(_params.import_label);
373
239
        }
374
430k
        if (_params.__isset.db_name) {
375
191
            _runtime_state->set_db_name(_params.db_name);
376
191
        }
377
430k
        if (_params.__isset.load_job_id) {
378
0
            _runtime_state->set_load_job_id(_params.load_job_id);
379
0
        }
380
381
430k
        if (_params.is_simplified_param) {
382
145k
            _desc_tbl = _query_ctx->desc_tbl;
383
284k
        } else {
384
284k
            DCHECK(_params.__isset.desc_tbl);
385
284k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
386
284k
                                                  &_desc_tbl));
387
284k
        }
388
430k
        _runtime_state->set_desc_tbl(_desc_tbl);
389
430k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
390
430k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
391
430k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
392
430k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
393
394
        // init fragment_instance_ids
395
430k
        const auto target_size = _params.local_params.size();
396
430k
        _fragment_instance_ids.resize(target_size);
397
1.67M
        for (size_t i = 0; i < _params.local_params.size(); i++) {
398
1.24M
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
399
1.24M
            _fragment_instance_ids[i] = fragment_instance_id;
400
1.24M
        }
401
430k
    }
402
403
430k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
404
405
429k
    _init_next_report_time();
406
407
429k
    _prepared = true;
408
429k
    return Status::OK();
409
430k
}
410
411
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
412
        int instance_idx,
413
1.24M
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
414
1.24M
    const auto& local_params = _params.local_params[instance_idx];
415
1.24M
    auto fragment_instance_id = local_params.fragment_instance_id;
416
1.24M
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
417
1.24M
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
418
1.24M
    auto get_shared_state = [&](PipelinePtr pipeline)
419
1.24M
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
420
2.04M
                                       std::vector<std::shared_ptr<Dependency>>>> {
421
2.04M
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
422
2.04M
                                std::vector<std::shared_ptr<Dependency>>>>
423
2.04M
                shared_state_map;
424
2.68M
        for (auto& op : pipeline->operators()) {
425
2.68M
            auto source_id = op->operator_id();
426
2.68M
            if (auto iter = _op_id_to_shared_state.find(source_id);
427
2.68M
                iter != _op_id_to_shared_state.end()) {
428
762k
                shared_state_map.insert({source_id, iter->second});
429
762k
            }
430
2.68M
        }
431
2.04M
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
432
2.04M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
433
2.04M
                iter != _op_id_to_shared_state.end()) {
434
290k
                shared_state_map.insert({sink_to_source_id, iter->second});
435
290k
            }
436
2.04M
        }
437
2.04M
        return shared_state_map;
438
2.04M
    };
439
440
3.74M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
441
2.50M
        auto& pipeline = _pipelines[pip_idx];
442
2.50M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
443
2.03M
            auto task_runtime_state = RuntimeState::create_unique(
444
2.03M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
445
2.03M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
446
2.03M
            {
447
                // Initialize runtime state for this task
448
2.03M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
449
450
2.03M
                task_runtime_state->set_task_execution_context(shared_from_this());
451
2.03M
                task_runtime_state->set_be_number(local_params.backend_num);
452
453
2.04M
                if (_params.__isset.backend_id) {
454
2.04M
                    task_runtime_state->set_backend_id(_params.backend_id);
455
2.04M
                }
456
2.03M
                if (_params.__isset.import_label) {
457
240
                    task_runtime_state->set_import_label(_params.import_label);
458
240
                }
459
2.03M
                if (_params.__isset.db_name) {
460
192
                    task_runtime_state->set_db_name(_params.db_name);
461
192
                }
462
2.03M
                if (_params.__isset.load_job_id) {
463
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
464
0
                }
465
2.03M
                if (_params.__isset.wal_id) {
466
114
                    task_runtime_state->set_wal_id(_params.wal_id);
467
114
                }
468
2.03M
                if (_params.__isset.content_length) {
469
31
                    task_runtime_state->set_content_length(_params.content_length);
470
31
                }
471
472
2.03M
                task_runtime_state->set_desc_tbl(_desc_tbl);
473
2.03M
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
474
2.03M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
475
2.03M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
476
2.03M
                task_runtime_state->set_max_operator_id(max_operator_id());
477
2.03M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
478
2.03M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
479
2.03M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
480
481
2.03M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
482
2.03M
            }
483
2.03M
            auto cur_task_id = _total_tasks++;
484
2.03M
            task_runtime_state->set_task_id(cur_task_id);
485
2.03M
            task_runtime_state->set_task_num(pipeline->num_tasks());
486
2.03M
            auto task = std::make_shared<PipelineTask>(
487
2.03M
                    pipeline, cur_task_id, task_runtime_state.get(),
488
2.03M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
489
2.03M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
490
2.03M
                    instance_idx);
491
2.03M
            pipeline->incr_created_tasks(instance_idx, task.get());
492
2.03M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
493
2.03M
            _tasks[instance_idx].emplace_back(
494
2.03M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
495
2.03M
                            std::move(task), std::move(task_runtime_state)});
496
2.03M
        }
497
2.50M
    }
498
499
    /**
500
         * Build DAG for pipeline tasks.
501
         * For example, we have
502
         *
503
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
504
         *            \                      /
505
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
506
         *                 \                          /
507
         *               JoinProbeOperator2 (Pipeline1)
508
         *
509
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
510
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
511
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
512
         *
513
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
514
         * and JoinProbeOperator2.
515
         */
516
2.50M
    for (auto& _pipeline : _pipelines) {
517
2.50M
        if (pipeline_id_to_task.contains(_pipeline->id())) {
518
2.03M
            auto* task = pipeline_id_to_task[_pipeline->id()];
519
2.03M
            DCHECK(task != nullptr);
520
521
            // If this task has upstream dependency, then inject it into this task.
522
2.03M
            if (_dag.contains(_pipeline->id())) {
523
1.26M
                auto& deps = _dag[_pipeline->id()];
524
2.00M
                for (auto& dep : deps) {
525
2.00M
                    if (pipeline_id_to_task.contains(dep)) {
526
1.06M
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
527
1.06M
                        if (ss) {
528
493k
                            task->inject_shared_state(ss);
529
572k
                        } else {
530
572k
                            pipeline_id_to_task[dep]->inject_shared_state(
531
572k
                                    task->get_source_shared_state());
532
572k
                        }
533
1.06M
                    }
534
2.00M
                }
535
1.26M
            }
536
2.03M
        }
537
2.50M
    }
538
3.75M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
539
2.50M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
540
2.03M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
541
2.03M
            DCHECK(pipeline_id_to_profile[pip_idx]);
542
2.03M
            std::vector<TScanRangeParams> scan_ranges;
543
2.03M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
544
2.03M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
545
355k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
546
355k
            }
547
2.03M
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
548
2.03M
                                                             _params.fragment.output_sink));
549
2.03M
        }
550
2.50M
    }
551
1.25M
    {
552
1.25M
        std::lock_guard<std::mutex> l(_state_map_lock);
553
1.25M
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
554
1.25M
    }
555
1.25M
    return Status::OK();
556
1.24M
}
557
558
430k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
559
430k
    _total_tasks = 0;
560
430k
    _closed_tasks = 0;
561
430k
    const auto target_size = _params.local_params.size();
562
430k
    _tasks.resize(target_size);
563
430k
    _runtime_filter_mgr_map.resize(target_size);
564
1.09M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
565
664k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
566
664k
    }
567
430k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
568
569
430k
    if (target_size > 1 &&
570
430k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
571
137k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
572
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
573
22.1k
        std::vector<Status> prepare_status(target_size);
574
22.1k
        int submitted_tasks = 0;
575
22.1k
        Status submit_status;
576
22.1k
        CountDownLatch latch((int)target_size);
577
311k
        for (int i = 0; i < target_size; i++) {
578
289k
            submit_status = thread_pool->submit_func([&, i]() {
579
289k
                SCOPED_ATTACH_TASK(_query_ctx.get());
580
289k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
581
289k
                latch.count_down();
582
289k
            });
583
289k
            if (LIKELY(submit_status.ok())) {
584
289k
                submitted_tasks++;
585
18.4E
            } else {
586
18.4E
                break;
587
18.4E
            }
588
289k
        }
589
22.1k
        latch.arrive_and_wait(target_size - submitted_tasks);
590
22.1k
        if (UNLIKELY(!submit_status.ok())) {
591
0
            return submit_status;
592
0
        }
593
311k
        for (int i = 0; i < submitted_tasks; i++) {
594
289k
            if (!prepare_status[i].ok()) {
595
0
                return prepare_status[i];
596
0
            }
597
289k
        }
598
407k
    } else {
599
1.36M
        for (int i = 0; i < target_size; i++) {
600
957k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
601
957k
        }
602
407k
    }
603
430k
    _pipeline_parent_map.clear();
604
430k
    _op_id_to_shared_state.clear();
605
606
430k
    return Status::OK();
607
430k
}
608
609
428k
void PipelineFragmentContext::_init_next_report_time() {
610
428k
    auto interval_s = config::pipeline_status_report_interval;
611
428k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
612
41.3k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
613
41.3k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
614
        // We don't want to wait longer than it takes to run the entire fragment.
615
41.3k
        _previous_report_time =
616
41.3k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
617
41.3k
        _disable_period_report = false;
618
41.3k
    }
619
428k
}
620
621
4.69k
void PipelineFragmentContext::refresh_next_report_time() {
622
4.69k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
623
4.69k
    DCHECK(disable == true);
624
4.69k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
625
4.69k
    _disable_period_report.compare_exchange_strong(disable, false);
626
4.69k
}
627
628
7.37M
void PipelineFragmentContext::trigger_report_if_necessary() {
629
7.37M
    if (!_is_report_success) {
630
6.82M
        return;
631
6.82M
    }
632
552k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
633
552k
    if (disable) {
634
10.9k
        return;
635
10.9k
    }
636
541k
    int32_t interval_s = config::pipeline_status_report_interval;
637
541k
    if (interval_s <= 0) {
638
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
639
0
                        "trigger "
640
0
                        "report.";
641
0
    }
642
541k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
643
541k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
644
541k
    if (MonotonicNanos() > next_report_time) {
645
4.70k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
646
4.70k
                                                            std::memory_order_acq_rel)) {
647
8
            return;
648
8
        }
649
4.69k
        if (VLOG_FILE_IS_ON) {
650
0
            VLOG_FILE << "Reporting "
651
0
                      << "profile for query_id " << print_id(_query_id)
652
0
                      << ", fragment id: " << _fragment_id;
653
654
0
            std::stringstream ss;
655
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
656
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
657
0
            if (_runtime_state->load_channel_profile()) {
658
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
659
0
            }
660
661
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
662
0
                      << " profile:\n"
663
0
                      << ss.str();
664
0
        }
665
4.69k
        auto st = send_report(false);
666
4.69k
        if (!st.ok()) {
667
0
            disable = true;
668
0
            _disable_period_report.compare_exchange_strong(disable, false,
669
0
                                                           std::memory_order_acq_rel);
670
0
        }
671
4.69k
    }
672
541k
}
673
674
// Reconstruct the operator tree for this fragment from the thrift plan.
// `root` receives the top operator; `cur_pipe` is the pipeline being filled.
// Fails if the preorder-encoded node list is not consumed exactly.
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    // Recursively consume the preorder node list starting at index 0.
    int node_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &node_idx, root, cur_pipe,
                                        0, false, false));

    // After a full reconstruction node_idx points at the last node used.
    if (node_idx + 1 != plan_nodes.size()) {
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }
    return Status::OK();
}
691
692
// Recursively build one operator (and its subtree) from the preorder thrift
// node list. `*node_idx` is advanced as nodes are consumed. The two boolean
// flags propagate, top-down, whether descendants must produce a hash-shuffled
// (resp. bucket/colocated) data distribution so local exchanges planned later
// stay compatible with shuffled joins and colocated aggregations.
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // propagate error case
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];

    int num_children = tnodes[*node_idx].num_children;
    // Values passed to children; refined below once this operator's own
    // distribution requirements are known.
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
    bool current_require_bucket_distribution = require_bucket_distribution;
    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must be done here. For example, group by expressions in agg will be used to
    // decide if a local shuffle should be planed, so it must be initialized here.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
    if (parent != nullptr) {
        // add to parent's child(s); if a cache operator wraps this one, the
        // parent links to the cache operator instead.
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    } else {
        *root = op;
    }
    /**
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // If this operator is the first in the pipeline, the pipeline's sink (not an
    // upstream operator) determines the required distribution.
    auto required_data_distribution =
            cur_pipe->operators().empty()
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                    : op->required_data_distribution(_runtime_state.get());
    // Keep the "followed by shuffled operator" flag only while the exchange chain
    // stays hash-shuffled; a NOOP exchange passes the flag through unchanged.
    current_followed_by_shuffled_operator =
            ((followed_by_shuffled_operator ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
                                             : op->is_shuffled_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (followed_by_shuffled_operator &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Same propagation rule for the bucket/colocated requirement.
    current_require_bucket_distribution =
            ((require_bucket_distribution ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
                                             : op->is_colocated_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (require_bucket_distribution &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    if (num_children == 0) {
        // NOTE(review): plain assignment — each leaf overwrites the flag, so it
        // reflects the most recently visited leaf. Presumably intentional for
        // single-source fragments; confirm whether `|=` was meant for multi-leaf plans.
        _use_serial_source = op->is_serial_operator();
    }
    // rely on that tnodes is preorder of the plan
    for (int i = 0; i < num_children; i++) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
                                            current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }

    return Status::OK();
}
777
778
void PipelineFragmentContext::_inherit_pipeline_properties(
779
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
780
104k
        PipelinePtr pipe_with_sink) {
781
104k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
782
104k
    pipe_with_source->set_num_tasks(_num_instances);
783
104k
    pipe_with_source->set_data_distribution(data_distribution);
784
104k
}
785
786
// Split `cur_pipe` at operator index `idx` by inserting a local exchange:
// operators [0, idx) move into `new_pip` which gets a LocalExchangeSink, and a
// LocalExchangeSource becomes the new head of `cur_pipe`. The exchanger kind is
// chosen from `data_distribution.distribution_type`. Children/DAG edges are
// re-wired so pipelines feeding the moved operators now feed `new_pip`.
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    auto local_exchange_id = next_operator_id();
    // 1. Create a new pipeline with local exchange sink.
    DataSinkOperatorPtr sink;
    auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool followed_by_shuffled_operator =
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
    // Without a bucket mapping a bucket-hash shuffle cannot be honored; degrade
    // to a plain hash shuffle.
    if (bucket_seq_to_instance_idx.empty() &&
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
                                          num_buckets, use_global_hash_shuffle,
                                          shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    // All exchanger kinds share the same free-blocks cap; compute it once instead
    // of repeating the ternary in every switch case (pure reads, no side effects).
    const int free_block_limit =
            _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                    ? cast_set<int>(
                              _runtime_state->query_options().local_exchange_free_blocks_limit)
                    : 0;
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);
    switch (data_distribution.distribution_type) {
    case ExchangeType::HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                use_global_hash_shuffle ? _total_instances : _num_instances, free_block_limit);
        break;
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_block_limit);
        break;
    case ExchangeType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        }
        break;
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, free_block_limit);
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Initialize new pipeline's operator list.
    std::copy(operators.begin(), operators.begin() + idx,
              std::inserter(new_pip->operators(), new_pip->operators().end()));

    // 3.2 Erase unused operators in previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op;
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
    if (!operators.empty()) {
        // Reset the old child link before pointing the first remaining operator
        // at the new local exchange source.
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately.
    // Iterate by const reference: copying the shared_ptrs per iteration only
    // churned refcounts.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (const auto& child : cur_pipe->children()) {
        bool found = false;
        for (const auto& op : new_pip->operators()) {
            // A child pipeline that feeds one of the moved operators now belongs
            // to the new (sink-side) pipeline.
            if (child->sink()->node_id() == op->node_id()) {
                new_pip->set_children(child);
                found = true;
            }
        }
        if (!found) {
            new_children.push_back(child);
            edges_with_source.push_back(child->id());
        }
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (const auto& child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
950
951
// Insert a local exchange in front of operator `idx` of `cur_pipe` when the
// required data distribution demands it. Sets `*do_local_exchange` when a split
// actually happened. May insert a second passthrough exchange when the sink side
// would otherwise become a single-task bottleneck.
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // Nothing to do with a single instance, a single-task parent, or when the
    // pipeline already satisfies the requested distribution at `idx`.
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1 ||
        !cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    const auto op_count_before = cur_pipe->operators().size();
    auto split_pip = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            idx, pool, cur_pipe, split_pip, data_distribution, do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));

    // The split adds exactly one operator (the local exchange source) overall.
    CHECK(op_count_before + 1 == cur_pipe->operators().size() + split_pip->operators().size())
            << "total_op_num: " << op_count_before
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << split_pip->operators().size();

    // There are some local shuffles with relatively heavy operations on the sink.
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
    // Therefore, local passthrough is used to increase the concurrency of the sink.
    // op -> local sink(1) -> local source (n)
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
    if (cur_pipe->num_tasks() > 1 && split_pip->num_tasks() == 1 &&
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
        RETURN_IF_ERROR(_add_local_exchange_impl(
                cast_set<int>(split_pip->operators().size()), pool, split_pip,
                add_pipeline(split_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
992
993
// Plan local exchanges for every pipeline of the fragment, walking from the
// last-created pipeline back to the first so child pipelines are finalized
// before their parents consult them.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
        auto& pipeline = _pipelines[pip_idx];
        pipeline->init_data_distribution(_runtime_state.get());
        // Set property if child pipeline is not join operator's child: a child
        // whose sink feeds this pipeline's first operator hands down its
        // distribution. (Looping an empty children list is a no-op.)
        for (auto& child : pipeline->children()) {
            if (child->sink()->node_id() == pipeline->operators().front()->node_id()) {
                pipeline->set_data_distribution(child->data_distribution());
            }
        }

        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
        // still keep colocate plan after local shuffle
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, pipeline,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1017
1018
// Plan local exchanges inside a single pipeline `pip` (index `pip_idx`).
// Scans the operator list for operators whose required distribution demands a
// local exchange; each successful insertion splits the pipeline, so the scan
// restarts on the (now shorter) operator list until a full pass inserts
// nothing. Finally the sink's requirement is checked the same way.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // Start at 1: operator 0 is the pipeline's source, which never needs a
    // local exchange planted in front of it.
    int idx = 1;
    bool do_local_exchange = false;
    do {
        // Re-fetch the operator list each round: _add_local_exchange mutates it.
        auto& ops = pip->operators();
        do_local_exchange = false;
        // Plan local exchange for each operator.
        for (; idx < ops.size();) {
            if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
                RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                        ops[idx]->required_data_distribution(_runtime_state.get()),
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                        shuffle_idx_to_instance_idx));
            }
            if (do_local_exchange) {
                // If local exchange is needed for current operator, we will split this pipeline to
                // two pipelines by local exchange sink/source. And then we need to process remaining
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
                // is current operator was already processed) and continue to plan local exchange.
                idx = 2;
                break;
            }
            idx++;
        }
    } while (do_local_exchange);
    // The sink may also require a particular distribution (e.g. a colocated or
    // hash-partitioned sink); plan one final local exchange in front of it if so.
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
        RETURN_IF_ERROR(_add_local_exchange(
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1055
1056
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1057
                                                  const std::vector<TExpr>& output_exprs,
1058
                                                  const TPipelineFragmentParams& params,
1059
                                                  const RowDescriptor& row_desc,
1060
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1061
429k
                                                  PipelineId cur_pipeline_id) {
1062
429k
    switch (thrift_sink.type) {
1063
144k
    case TDataSinkType::DATA_STREAM_SINK: {
1064
144k
        if (!thrift_sink.__isset.stream_sink) {
1065
0
            return Status::InternalError("Missing data stream sink.");
1066
0
        }
1067
144k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1068
144k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1069
144k
                params.destinations, _fragment_instance_ids);
1070
144k
        break;
1071
144k
    }
1072
248k
    case TDataSinkType::RESULT_SINK: {
1073
248k
        if (!thrift_sink.__isset.result_sink) {
1074
0
            return Status::InternalError("Missing data buffer sink.");
1075
0
        }
1076
1077
248k
        auto& pipeline = _pipelines[cur_pipeline_id];
1078
248k
        int child_node_id = pipeline->operators().back()->node_id();
1079
248k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), child_node_id + 1,
1080
248k
                                                      row_desc, output_exprs,
1081
248k
                                                      thrift_sink.result_sink);
1082
248k
        break;
1083
248k
    }
1084
105
    case TDataSinkType::DICTIONARY_SINK: {
1085
105
        if (!thrift_sink.__isset.dictionary_sink) {
1086
0
            return Status::InternalError("Missing dict sink.");
1087
0
        }
1088
1089
105
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1090
105
                                                    thrift_sink.dictionary_sink);
1091
105
        break;
1092
105
    }
1093
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1094
30.9k
    case TDataSinkType::OLAP_TABLE_SINK: {
1095
30.9k
        auto& pipeline = _pipelines[cur_pipeline_id];
1096
30.9k
        int child_node_id = pipeline->operators().back()->node_id();
1097
30.9k
        if (state->query_options().enable_memtable_on_sink_node &&
1098
30.9k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1099
30.9k
            !config::is_cloud_mode()) {
1100
2.09k
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(
1101
2.09k
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
1102
28.8k
        } else {
1103
28.8k
            _sink = std::make_shared<OlapTableSinkOperatorX>(
1104
28.8k
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
1105
28.8k
        }
1106
30.9k
        break;
1107
0
    }
1108
165
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1109
165
        DCHECK(thrift_sink.__isset.olap_table_sink);
1110
165
        DCHECK(state->get_query_ctx() != nullptr);
1111
165
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1112
165
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1113
165
                                                                output_exprs);
1114
165
        break;
1115
0
    }
1116
1.46k
    case TDataSinkType::HIVE_TABLE_SINK: {
1117
1.46k
        if (!thrift_sink.__isset.hive_table_sink) {
1118
0
            return Status::InternalError("Missing hive table sink.");
1119
0
        }
1120
1.46k
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1121
1.46k
                                                         output_exprs);
1122
1.46k
        break;
1123
1.46k
    }
1124
1.73k
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1125
1.73k
        if (!thrift_sink.__isset.iceberg_table_sink) {
1126
0
            return Status::InternalError("Missing iceberg table sink.");
1127
0
        }
1128
1.73k
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1129
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1130
0
                                                                     row_desc, output_exprs);
1131
1.73k
        } else {
1132
1.73k
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1133
1.73k
                                                                row_desc, output_exprs);
1134
1.73k
        }
1135
1.73k
        break;
1136
1.73k
    }
1137
20
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1138
20
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1139
0
            return Status::InternalError("Missing iceberg delete sink.");
1140
0
        }
1141
20
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1142
20
                                                             row_desc, output_exprs);
1143
20
        break;
1144
20
    }
1145
80
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1146
80
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1147
0
            return Status::InternalError("Missing iceberg merge sink.");
1148
0
        }
1149
80
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1150
80
                                                            output_exprs);
1151
80
        break;
1152
80
    }
1153
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1154
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1155
0
            return Status::InternalError("Missing max compute table sink.");
1156
0
        }
1157
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1158
0
                                                       output_exprs);
1159
0
        break;
1160
0
    }
1161
88
    case TDataSinkType::JDBC_TABLE_SINK: {
1162
88
        if (!thrift_sink.__isset.jdbc_table_sink) {
1163
0
            return Status::InternalError("Missing data jdbc sink.");
1164
0
        }
1165
88
        if (config::enable_java_support) {
1166
88
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1167
88
                                                             output_exprs);
1168
88
        } else {
1169
0
            return Status::InternalError(
1170
0
                    "Jdbc table sink is not enabled, you can change be config "
1171
0
                    "enable_java_support to true and restart be.");
1172
0
        }
1173
88
        break;
1174
88
    }
1175
88
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1176
3
        if (!thrift_sink.__isset.memory_scratch_sink) {
1177
0
            return Status::InternalError("Missing data buffer sink.");
1178
0
        }
1179
1180
3
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1181
3
                                                             output_exprs);
1182
3
        break;
1183
3
    }
1184
502
    case TDataSinkType::RESULT_FILE_SINK: {
1185
502
        if (!thrift_sink.__isset.result_file_sink) {
1186
0
            return Status::InternalError("Missing result file sink.");
1187
0
        }
1188
1189
        // Result file sink is not the top sink
1190
502
        if (params.__isset.destinations && !params.destinations.empty()) {
1191
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1192
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1193
0
                    params.destinations, output_exprs, desc_tbl);
1194
502
        } else {
1195
502
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1196
502
                                                              output_exprs);
1197
502
        }
1198
502
        break;
1199
502
    }
1200
1.96k
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1201
1.96k
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1202
1.96k
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1203
1.96k
        auto sink_id = next_sink_operator_id();
1204
1.96k
        const int multi_cast_node_id = sink_id;
1205
1.96k
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1206
        // one sink has multiple sources.
1207
1.96k
        std::vector<int> sources;
1208
7.64k
        for (int i = 0; i < sender_size; ++i) {
1209
5.67k
            auto source_id = next_operator_id();
1210
5.67k
            sources.push_back(source_id);
1211
5.67k
        }
1212
1213
1.96k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1214
1.96k
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1215
7.64k
        for (int i = 0; i < sender_size; ++i) {
1216
5.67k
            auto new_pipeline = add_pipeline();
1217
            // use to exchange sink
1218
5.67k
            RowDescriptor* exchange_row_desc = nullptr;
1219
5.67k
            {
1220
5.67k
                const auto& tmp_row_desc =
1221
5.67k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1222
5.67k
                                ? RowDescriptor(state->desc_tbl(),
1223
5.67k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1224
5.67k
                                                         .output_tuple_id})
1225
5.67k
                                : row_desc;
1226
5.67k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1227
5.67k
            }
1228
5.67k
            auto source_id = sources[i];
1229
5.67k
            OperatorPtr source_op;
1230
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1231
5.67k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1232
5.67k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1233
5.67k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1234
5.67k
                    /*operator_id=*/source_id);
1235
5.67k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1236
5.67k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1237
            // 2. create and set sink operator of data stream sender for new pipeline
1238
1239
5.67k
            DataSinkOperatorPtr sink_op;
1240
5.67k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1241
5.67k
                    state, *exchange_row_desc, next_sink_operator_id(),
1242
5.67k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1243
5.67k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1244
1245
5.67k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1246
5.67k
            {
1247
5.67k
                TDataSink* t = pool->add(new TDataSink());
1248
5.67k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1249
5.67k
                RETURN_IF_ERROR(sink_op->init(*t));
1250
5.67k
            }
1251
1252
            // 3. set dependency dag
1253
5.67k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1254
5.67k
        }
1255
1.96k
        if (sources.empty()) {
1256
0
            return Status::InternalError("size of sources must be greater than 0");
1257
0
        }
1258
1.96k
        break;
1259
1.96k
    }
1260
1.96k
    case TDataSinkType::BLACKHOLE_SINK: {
1261
13
        if (!thrift_sink.__isset.blackhole_sink) {
1262
0
            return Status::InternalError("Missing blackhole sink.");
1263
0
        }
1264
1265
13
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1266
13
        break;
1267
13
    }
1268
156
    case TDataSinkType::TVF_TABLE_SINK: {
1269
156
        if (!thrift_sink.__isset.tvf_table_sink) {
1270
0
            return Status::InternalError("Missing TVF table sink.");
1271
0
        }
1272
156
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1273
156
                                                        output_exprs);
1274
156
        break;
1275
156
    }
1276
0
    default:
1277
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1278
429k
    }
1279
429k
    return Status::OK();
1280
429k
}
1281
1282
// NOLINTBEGIN(readability-function-size)
1283
// NOLINTBEGIN(readability-function-cognitive-complexity)
1284
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1285
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1286
                                                 PipelinePtr& cur_pipe, int parent_idx,
1287
                                                 int child_idx,
1288
                                                 const bool followed_by_shuffled_operator,
1289
                                                 const bool require_bucket_distribution,
1290
666k
                                                 OperatorPtr& cache_op) {
1291
666k
    std::vector<DataSinkOperatorPtr> sink_ops;
1292
666k
    Defer defer = Defer([&]() {
1293
666k
        if (op) {
1294
665k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1295
665k
        }
1296
666k
        for (auto& s : sink_ops) {
1297
123k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1298
123k
        }
1299
666k
    });
1300
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1301
    // Therefore, here we need to use a stack-like structure.
1302
666k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1303
666k
    std::stringstream error_msg;
1304
666k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1305
1306
666k
    bool fe_with_old_version = false;
1307
666k
    switch (tnode.node_type) {
1308
210k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1309
210k
        op = std::make_shared<OlapScanOperatorX>(
1310
210k
                pool, tnode, next_operator_id(), descs, _num_instances,
1311
210k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1312
210k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1313
210k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1314
210k
        break;
1315
210k
    }
1316
78
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1317
78
        DCHECK(_query_ctx != nullptr);
1318
78
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1319
78
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1320
78
                                                    _num_instances);
1321
78
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1322
78
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1323
78
        break;
1324
78
    }
1325
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1326
0
        if (config::enable_java_support) {
1327
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1328
0
                                                     _num_instances);
1329
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1330
0
        } else {
1331
0
            return Status::InternalError(
1332
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1333
0
                    "to true and restart be.");
1334
0
        }
1335
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1336
0
        break;
1337
0
    }
1338
22.5k
    case TPlanNodeType::FILE_SCAN_NODE: {
1339
22.5k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1340
22.5k
                                                 _num_instances);
1341
22.5k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1342
22.5k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1343
22.5k
        break;
1344
22.5k
    }
1345
0
    case TPlanNodeType::ES_SCAN_NODE:
1346
620
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
1347
620
        op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs,
1348
620
                                               _num_instances);
1349
620
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1350
620
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1351
620
        break;
1352
620
    }
1353
147k
    case TPlanNodeType::EXCHANGE_NODE: {
1354
147k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1355
147k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1356
18.4E
                                  : 0;
1357
147k
        DCHECK_GT(num_senders, 0);
1358
147k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1359
147k
                                                       num_senders);
1360
147k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1361
147k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1362
147k
        break;
1363
147k
    }
1364
156k
    case TPlanNodeType::AGGREGATION_NODE: {
1365
156k
        if (tnode.agg_node.grouping_exprs.empty() &&
1366
156k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1367
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1368
0
                                         ": group by and output is empty");
1369
0
        }
1370
156k
        bool need_create_cache_op =
1371
156k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1372
156k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1373
10
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1374
10
            auto cache_source_id = next_operator_id();
1375
10
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1376
10
                                                        _params.fragment.query_cache_param);
1377
10
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1378
1379
10
            const auto downstream_pipeline_id = cur_pipe->id();
1380
10
            if (!_dag.contains(downstream_pipeline_id)) {
1381
10
                _dag.insert({downstream_pipeline_id, {}});
1382
10
            }
1383
10
            new_pipe = add_pipeline(cur_pipe);
1384
10
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1385
1386
10
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1387
10
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1388
10
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1389
10
            return Status::OK();
1390
10
        };
1391
156k
        const bool group_by_limit_opt =
1392
156k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1393
1394
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1395
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1396
156k
        const bool enable_spill = _runtime_state->enable_spill() &&
1397
156k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1398
156k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1399
156k
                                      tnode.agg_node.use_streaming_preaggregation &&
1400
156k
                                      !tnode.agg_node.grouping_exprs.empty();
1401
        // TODO: distinct streaming agg does not support spill.
1402
156k
        const bool can_use_distinct_streaming_agg =
1403
156k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1404
156k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1405
156k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1406
156k
                _params.query_options.enable_distinct_streaming_aggregation;
1407
1408
156k
        if (can_use_distinct_streaming_agg) {
1409
90.6k
            if (need_create_cache_op) {
1410
8
                PipelinePtr new_pipe;
1411
8
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1412
1413
8
                cache_op = op;
1414
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1415
8
                                                                     tnode, descs);
1416
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1417
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1418
8
                cur_pipe = new_pipe;
1419
90.6k
            } else {
1420
90.6k
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1421
90.6k
                                                                     tnode, descs);
1422
90.6k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1423
90.6k
            }
1424
90.6k
        } else if (is_streaming_agg) {
1425
3.01k
            if (need_create_cache_op) {
1426
0
                PipelinePtr new_pipe;
1427
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1428
0
                cache_op = op;
1429
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1430
0
                                                             descs);
1431
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1432
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1433
0
                cur_pipe = new_pipe;
1434
3.01k
            } else {
1435
3.01k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1436
3.01k
                                                             descs);
1437
3.01k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1438
3.01k
            }
1439
63.1k
        } else {
1440
            // create new pipeline to add query cache operator
1441
63.1k
            PipelinePtr new_pipe;
1442
63.1k
            if (need_create_cache_op) {
1443
2
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1444
2
                cache_op = op;
1445
2
            }
1446
1447
63.1k
            if (enable_spill) {
1448
178
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1449
178
                                                                     next_operator_id(), descs);
1450
62.9k
            } else {
1451
62.9k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1452
62.9k
            }
1453
63.1k
            if (need_create_cache_op) {
1454
2
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1455
2
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1456
2
                cur_pipe = new_pipe;
1457
63.1k
            } else {
1458
63.1k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1459
63.1k
            }
1460
1461
63.1k
            const auto downstream_pipeline_id = cur_pipe->id();
1462
63.1k
            if (!_dag.contains(downstream_pipeline_id)) {
1463
60.2k
                _dag.insert({downstream_pipeline_id, {}});
1464
60.2k
            }
1465
63.1k
            cur_pipe = add_pipeline(cur_pipe);
1466
63.1k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1467
1468
63.1k
            if (enable_spill) {
1469
178
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1470
178
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1471
62.9k
            } else {
1472
62.9k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1473
62.9k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1474
62.9k
            }
1475
63.1k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1476
63.1k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1477
63.1k
        }
1478
156k
        break;
1479
156k
    }
1480
156k
    case TPlanNodeType::BUCKETED_AGGREGATION_NODE: {
1481
83
        if (tnode.bucketed_agg_node.grouping_exprs.empty()) {
1482
0
            return Status::InternalError(
1483
0
                    "Bucketed aggregation node {} should not be used without group by keys",
1484
0
                    tnode.node_id);
1485
0
        }
1486
1487
        // Create source operator (goes on the current / downstream pipeline).
1488
83
        op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1489
83
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1490
1491
        // Create a new pipeline for the sink side.
1492
83
        const auto downstream_pipeline_id = cur_pipe->id();
1493
83
        if (!_dag.contains(downstream_pipeline_id)) {
1494
83
            _dag.insert({downstream_pipeline_id, {}});
1495
83
        }
1496
83
        cur_pipe = add_pipeline(cur_pipe);
1497
83
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1498
1499
        // Create sink operator.
1500
83
        sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>(
1501
83
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1502
83
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1503
83
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1504
1505
        // Pre-register a single shared state for ALL instances so that every
1506
        // sink instance writes its per-instance hash table into the same
1507
        // BucketedAggSharedState and every source instance can merge across
1508
        // all of them.
1509
83
        {
1510
83
            auto shared_state = BucketedAggSharedState::create_shared();
1511
83
            shared_state->id = op->operator_id();
1512
83
            shared_state->related_op_ids.insert(op->operator_id());
1513
1514
732
            for (int i = 0; i < _num_instances; i++) {
1515
649
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1516
649
                                                             "BUCKETED_AGG_SINK_DEPENDENCY");
1517
649
                sink_dep->set_shared_state(shared_state.get());
1518
649
                shared_state->sink_deps.push_back(sink_dep);
1519
649
            }
1520
83
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1521
83
                                                     op->node_id(), "BUCKETED_AGG_SOURCE");
1522
83
            _op_id_to_shared_state.insert(
1523
83
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1524
83
        }
1525
83
        break;
1526
83
    }
1527
9.75k
    case TPlanNodeType::HASH_JOIN_NODE: {
1528
9.75k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1529
9.75k
                                       tnode.hash_join_node.is_broadcast_join;
1530
9.75k
        const auto enable_spill = _runtime_state->enable_spill();
1531
9.75k
        if (enable_spill && !is_broadcast_join) {
1532
0
            auto tnode_ = tnode;
1533
0
            tnode_.runtime_filters.clear();
1534
0
            auto inner_probe_operator =
1535
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1536
1537
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1538
            // So here use `tnode_` which has no runtime filters.
1539
0
            auto probe_side_inner_sink_operator =
1540
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1541
1542
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1543
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1544
1545
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1546
0
                    pool, tnode_, next_operator_id(), descs);
1547
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1548
0
                                                inner_probe_operator);
1549
0
            op = std::move(probe_operator);
1550
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1551
1552
0
            const auto downstream_pipeline_id = cur_pipe->id();
1553
0
            if (!_dag.contains(downstream_pipeline_id)) {
1554
0
                _dag.insert({downstream_pipeline_id, {}});
1555
0
            }
1556
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1557
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1558
1559
0
            auto inner_sink_operator =
1560
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1561
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1562
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1563
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1564
1565
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1566
0
            sink_ops.push_back(std::move(sink_operator));
1567
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1568
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1569
1570
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1571
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1572
9.75k
        } else {
1573
9.75k
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1574
9.75k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1575
1576
9.75k
            const auto downstream_pipeline_id = cur_pipe->id();
1577
9.75k
            if (!_dag.contains(downstream_pipeline_id)) {
1578
8.04k
                _dag.insert({downstream_pipeline_id, {}});
1579
8.04k
            }
1580
9.75k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1581
9.75k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1582
1583
9.75k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1584
9.75k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1585
9.75k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1586
9.75k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1587
1588
9.75k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1589
9.75k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1590
9.75k
        }
1591
9.75k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1592
3.20k
            std::shared_ptr<HashJoinSharedState> shared_state =
1593
3.20k
                    HashJoinSharedState::create_shared(_num_instances);
1594
20.5k
            for (int i = 0; i < _num_instances; i++) {
1595
17.3k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1596
17.3k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1597
17.3k
                sink_dep->set_shared_state(shared_state.get());
1598
17.3k
                shared_state->sink_deps.push_back(sink_dep);
1599
17.3k
            }
1600
3.20k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1601
3.20k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1602
3.20k
            _op_id_to_shared_state.insert(
1603
3.20k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1604
3.20k
        }
1605
9.75k
        break;
1606
9.75k
    }
1607
4.85k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1608
4.85k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1609
4.85k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1610
1611
4.85k
        const auto downstream_pipeline_id = cur_pipe->id();
1612
4.85k
        if (!_dag.contains(downstream_pipeline_id)) {
1613
4.62k
            _dag.insert({downstream_pipeline_id, {}});
1614
4.62k
        }
1615
4.85k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1616
4.85k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1617
1618
4.85k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1619
4.85k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1620
4.85k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1621
4.85k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1622
4.85k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1623
4.85k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1624
4.85k
        break;
1625
4.85k
    }
1626
52.9k
    case TPlanNodeType::UNION_NODE: {
1627
52.9k
        int child_count = tnode.num_children;
1628
52.9k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1629
52.9k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1630
1631
52.9k
        const auto downstream_pipeline_id = cur_pipe->id();
1632
52.9k
        if (!_dag.contains(downstream_pipeline_id)) {
1633
52.6k
            _dag.insert({downstream_pipeline_id, {}});
1634
52.6k
        }
1635
54.3k
        for (int i = 0; i < child_count; i++) {
1636
1.40k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1637
1.40k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1638
1.40k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1639
1.40k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1640
1.40k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1641
1.40k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1642
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1643
1.40k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1644
1.40k
        }
1645
52.9k
        break;
1646
52.9k
    }
1647
52.9k
    case TPlanNodeType::SORT_NODE: {
1648
43.2k
        const auto should_spill = _runtime_state->enable_spill() &&
1649
43.2k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1650
43.2k
        const bool use_local_merge =
1651
43.2k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1652
43.2k
        if (should_spill) {
1653
9
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1654
43.2k
        } else if (use_local_merge) {
1655
41.0k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1656
41.0k
                                                                 descs);
1657
41.0k
        } else {
1658
2.24k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1659
2.24k
        }
1660
43.2k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1661
1662
43.2k
        const auto downstream_pipeline_id = cur_pipe->id();
1663
43.2k
        if (!_dag.contains(downstream_pipeline_id)) {
1664
43.2k
            _dag.insert({downstream_pipeline_id, {}});
1665
43.2k
        }
1666
43.2k
        cur_pipe = add_pipeline(cur_pipe);
1667
43.2k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1668
1669
43.2k
        if (should_spill) {
1670
9
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1671
9
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1672
43.2k
        } else {
1673
43.2k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1674
43.2k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1675
43.2k
        }
1676
43.2k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1677
43.2k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1678
43.2k
        break;
1679
43.2k
    }
1680
43.2k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1681
60
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1682
60
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1683
1684
60
        const auto downstream_pipeline_id = cur_pipe->id();
1685
60
        if (!_dag.contains(downstream_pipeline_id)) {
1686
60
            _dag.insert({downstream_pipeline_id, {}});
1687
60
        }
1688
60
        cur_pipe = add_pipeline(cur_pipe);
1689
60
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1690
1691
60
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1692
60
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1693
60
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1694
60
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1695
60
        break;
1696
60
    }
1697
1.55k
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1698
1.55k
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1699
1.55k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1700
1701
1.55k
        const auto downstream_pipeline_id = cur_pipe->id();
1702
1.55k
        if (!_dag.contains(downstream_pipeline_id)) {
1703
1.53k
            _dag.insert({downstream_pipeline_id, {}});
1704
1.53k
        }
1705
1.55k
        cur_pipe = add_pipeline(cur_pipe);
1706
1.55k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1707
1708
1.55k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1709
1.55k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1710
1.55k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1711
1.55k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1712
1.55k
        break;
1713
1.55k
    }
1714
1.61k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1715
1.61k
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1716
1.61k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1717
1.61k
        break;
1718
1.61k
    }
1719
1.61k
    case TPlanNodeType::INTERSECT_NODE: {
1720
118
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1721
118
                                                                      cur_pipe, sink_ops));
1722
118
        break;
1723
118
    }
1724
122
    case TPlanNodeType::EXCEPT_NODE: {
1725
122
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1726
122
                                                                       cur_pipe, sink_ops));
1727
122
        break;
1728
122
    }
1729
314
    case TPlanNodeType::REPEAT_NODE: {
1730
314
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1731
314
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1732
314
        break;
1733
314
    }
1734
640
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1735
640
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1736
640
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1737
640
        break;
1738
640
    }
1739
640
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1740
218
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1741
218
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1742
218
        break;
1743
218
    }
1744
1.58k
    case TPlanNodeType::EMPTY_SET_NODE: {
1745
1.58k
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1746
1.58k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1747
1.58k
        break;
1748
1.58k
    }
1749
1.58k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1750
458
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1751
458
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1752
458
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1753
458
        break;
1754
458
    }
1755
2.04k
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1756
2.04k
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1757
2.04k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1758
2.04k
        break;
1759
2.04k
    }
1760
6.13k
    case TPlanNodeType::META_SCAN_NODE: {
1761
6.13k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1762
6.13k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1763
6.13k
        break;
1764
6.13k
    }
1765
6.13k
    case TPlanNodeType::SELECT_NODE: {
1766
1.93k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1767
1.93k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1768
1.93k
        break;
1769
1.93k
    }
1770
1.93k
    case TPlanNodeType::REC_CTE_NODE: {
1771
151
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1772
151
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1773
1774
151
        const auto downstream_pipeline_id = cur_pipe->id();
1775
151
        if (!_dag.contains(downstream_pipeline_id)) {
1776
148
            _dag.insert({downstream_pipeline_id, {}});
1777
148
        }
1778
1779
151
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1780
151
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1781
1782
151
        DataSinkOperatorPtr anchor_sink;
1783
151
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1784
151
                                                                  op->operator_id(), tnode, descs);
1785
151
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1786
151
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1787
151
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1788
1789
151
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1790
151
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1791
1792
151
        DataSinkOperatorPtr rec_sink;
1793
151
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1794
151
                                                         tnode, descs);
1795
151
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1796
151
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1797
151
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1798
1799
151
        break;
1800
151
    }
1801
1.95k
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1802
1.95k
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1803
1.95k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1804
1.95k
        break;
1805
1.95k
    }
1806
1.95k
    default:
1807
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1808
0
                                     print_plan_node_type(tnode.node_type));
1809
666k
    }
1810
664k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1811
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
1812
0
        op->set_serial_operator();
1813
0
    }
1814
1815
664k
    return Status::OK();
1816
666k
}
1817
// NOLINTEND(readability-function-cognitive-complexity)
1818
// NOLINTEND(readability-function-size)
1819
1820
template <bool is_intersect>
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
    // Builds the pipeline topology for a set-operation plan node:
    // INTERSECT when is_intersect == true, EXCEPT when false.
    // One SetSourceOperatorX is appended to the current (downstream) pipeline,
    // and one new upstream pipeline per child is created, each terminated by a
    // set sink wired to the shared source through op->operator_id().
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));

    // Make sure the downstream pipeline has a DAG entry so the child pipelines
    // created below can be recorded as its dependencies.
    const auto downstream_pipeline_id = cur_pipe->id();
    if (!_dag.contains(downstream_pipeline_id)) {
        _dag.insert({downstream_pipeline_id, {}});
    }

    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());

        // Child 0 gets SetSinkOperatorX, every other child gets
        // SetProbeSinkOperatorX; all of them feed the same source operator.
        if (child_id == 0) {
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
        } else {
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
        }
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
    }

    return Status::OK();
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1823
118
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1824
118
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1825
118
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1826
1827
118
    const auto downstream_pipeline_id = cur_pipe->id();
1828
118
    if (!_dag.contains(downstream_pipeline_id)) {
1829
102
        _dag.insert({downstream_pipeline_id, {}});
1830
102
    }
1831
1832
432
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1833
314
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1834
314
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1835
1836
314
        if (child_id == 0) {
1837
118
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1838
118
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1839
196
        } else {
1840
196
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1841
196
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1842
196
        }
1843
314
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1844
314
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1845
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1846
314
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1847
314
    }
1848
1849
118
    return Status::OK();
1850
118
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1823
122
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1824
122
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1825
122
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1826
1827
122
    const auto downstream_pipeline_id = cur_pipe->id();
1828
122
    if (!_dag.contains(downstream_pipeline_id)) {
1829
113
        _dag.insert({downstream_pipeline_id, {}});
1830
113
    }
1831
1832
380
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1833
258
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1834
258
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1835
1836
258
        if (child_id == 0) {
1837
122
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1838
122
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1839
136
        } else {
1840
136
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1841
136
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1842
136
        }
1843
258
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1844
258
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1845
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1846
258
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1847
258
    }
1848
1849
122
    return Status::OK();
1850
122
}
1851
1852
427k
Status PipelineFragmentContext::submit() {
    // Submit every pipeline task of this fragment to the query's task scheduler.
    // May only be called once per fragment context.
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                // Shrink the task count to what was actually submitted, so the
                // already-submitted tasks closing can still trigger
                // _close_fragment_instance().
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        // BUG FIX: the inner `break` above only exits the inner loop. Previously
        // the outer loop kept submitting the remaining task groups after a
        // failure, overwriting `st` (possibly back to OK) and submitting more
        // tasks than the adjusted `_total_tasks` accounts for. Stop here instead.
        if (!st.ok()) {
            break;
        }
    }
    if (!st.ok()) {
        bool need_remove = false;
        {
            std::lock_guard<std::mutex> l(_task_mutex);
            // The submitted tasks may all have closed already; if so, this call
            // path is responsible for closing the fragment instance.
            if (_closed_tasks >= _total_tasks) {
                need_remove = _close_fragment_instance();
            }
        }
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
        if (need_remove) {
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
1893
1894
42
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1895
42
    if (_runtime_state->enable_profile()) {
1896
0
        std::stringstream ss;
1897
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1898
0
            runtime_profile_ptr->pretty_print(&ss);
1899
0
        }
1900
1901
0
        if (_runtime_state->load_channel_profile()) {
1902
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1903
0
        }
1904
1905
0
        auto profile_str =
1906
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1907
0
                            this->_fragment_id, extra_info, ss.str());
1908
0
        LOG_LONG_STRING(INFO, profile_str);
1909
0
    }
1910
42
}
1911
// If all pipeline tasks binded to the fragment instance are finished, then we could
1912
// close the fragment instance.
1913
// Returns true if the caller should call remove_pipeline_context() **after** releasing
1914
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
1915
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
1916
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
1917
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
1918
// (via debug_string()).
1919
430k
bool PipelineFragmentContext::_close_fragment_instance() {
1920
430k
    if (_is_fragment_instance_closed) {
1921
0
        return false;
1922
0
    }
1923
430k
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
1924
430k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
1925
430k
    if (!_need_notify_close) {
1926
426k
        auto st = send_report(true);
1927
426k
        if (!st) {
1928
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
1929
0
                                        print_id(_query_id), _fragment_id, st.to_string());
1930
0
        }
1931
426k
    }
1932
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
1933
    // Since stream load does not have someting like coordinator on FE, so
1934
    // backend can not report profile to FE, ant its profile can not be shown
1935
    // in the same way with other query. So we print the profile content to info log.
1936
1937
430k
    if (_runtime_state->enable_profile() &&
1938
430k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
1939
2.53k
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
1940
2.53k
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
1941
0
        std::stringstream ss;
1942
        // Compute the _local_time_percent before pretty_print the runtime_profile
1943
        // Before add this operation, the print out like that:
1944
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
1945
        // After add the operation, the print out like that:
1946
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
1947
        // We can easily know the exec node execute time without child time consumed.
1948
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1949
0
            runtime_profile_ptr->pretty_print(&ss);
1950
0
        }
1951
1952
0
        if (_runtime_state->load_channel_profile()) {
1953
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1954
0
        }
1955
1956
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
1957
0
    }
1958
1959
430k
    if (_query_ctx->enable_profile()) {
1960
2.53k
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
1961
2.53k
                                         collect_realtime_load_channel_profile());
1962
2.53k
    }
1963
1964
    // Return whether the caller needs to remove from the pipeline map.
1965
    // The caller must do this after releasing _task_mutex.
1966
430k
    return !_need_notify_close;
1967
430k
}
1968
1969
2.03M
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    // Called when one task of `pipeline_id` finishes.
    // If all tasks of this pipeline have been closed, its upstream tasks are never
    // needed any more, so we make the tasks of the dependent pipelines runnable here.
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
        if (_dag.contains(pipeline_id)) {
            for (auto dep : _dag[pipeline_id]) {
                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
            }
        }
    }
    // Once the last task of the whole fragment closes, close the fragment
    // instance while holding _task_mutex; _close_fragment_instance() tells us
    // whether we still need to deregister from the fragment manager.
    bool need_remove = false;
    {
        std::lock_guard<std::mutex> l(_task_mutex);
        ++_closed_tasks;
        if (_closed_tasks >= _total_tasks) {
            need_remove = _close_fragment_instance();
        }
    }
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
    if (need_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
}
1992
1993
53.1k
std::string PipelineFragmentContext::get_load_error_url() {
1994
53.1k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1995
0
        return to_load_error_http_path(str);
1996
0
    }
1997
141k
    for (auto& tasks : _tasks) {
1998
224k
        for (auto& task : tasks) {
1999
224k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
2000
170
                return to_load_error_http_path(str);
2001
170
            }
2002
224k
        }
2003
141k
    }
2004
52.9k
    return "";
2005
53.1k
}
2006
2007
53.1k
std::string PipelineFragmentContext::get_first_error_msg() {
2008
53.1k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
2009
0
        return str;
2010
0
    }
2011
141k
    for (auto& tasks : _tasks) {
2012
224k
        for (auto& task : tasks) {
2013
224k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
2014
170
                return str;
2015
170
            }
2016
224k
        }
2017
141k
    }
2018
52.9k
    return "";
2019
53.1k
}
2020
2021
0
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
2022
0
    std::stringstream url;
2023
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
2024
0
        << "/api/_download_load?"
2025
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
2026
0
    return url.str();
2027
0
}
2028
2029
47.5k
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
    // Reports this fragment's execution status to the coordinating FE via the
    // reportExecStatus thrift RPC: builds TReportExecStatusParams from the
    // fragment/task runtime states, then sends it (with one reconnect retry).
    // On unrecoverable RPC failure the query is cancelled via req.cancel_fn.
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
    });

    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
    if (req.coord_addr.hostname == "external") {
        // External query (flink/spark read tablets) not need to report to FE.
        return;
    }
    // Try up to 10 times (1s apart) to obtain a client connection to the
    // coordinator before giving up and cancelling the query.
    int callback_retries = 10;
    const int sleep_ms = 1000;
    Status exec_status = req.status;
    Status coord_status;
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
    do {
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
                                                            req.coord_addr, &coord_status);
        if (!coord_status.ok()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
        }
    } while (!coord_status.ok() && callback_retries-- > 0);

    if (!coord_status.ok()) {
        UniqueId uid(req.query_id.hi, req.query_id.lo);
        static_cast<void>(req.cancel_fn(Status::InternalError(
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
        return;
    }

    // --- Assemble the report parameters ---------------------------------
    TReportExecStatusParams params;
    params.protocol_version = FrontendServiceVersion::V1;
    params.__set_query_id(req.query_id);
    params.__set_backend_num(req.backend_num);
    params.__set_fragment_instance_id(req.fragment_instance_id);
    params.__set_fragment_id(req.fragment_id);
    params.__set_status(exec_status.to_thrift());
    params.__set_done(req.done);
    params.__set_query_type(req.runtime_state->query_type());
    params.__isset.profile = false;

    DCHECK(req.runtime_state != nullptr);

    // LOAD queries report row/byte progress; other query types report output
    // files (delta_urls) converted to downloadable HTTP paths.
    if (req.runtime_state->query_type() == TQueryType::LOAD) {
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
    } else {
        DCHECK(!req.runtime_states.empty());
        if (!req.runtime_state->output_files().empty()) {
            params.__isset.delta_urls = true;
            for (auto& it : req.runtime_state->output_files()) {
                params.delta_urls.push_back(_to_http_path(it));
            }
        }
        if (!params.delta_urls.empty()) {
            params.__isset.delta_urls = true;
        }
    }

    // --- Load counters: fragment-level state wins, otherwise aggregate the
    // per-instance runtime states -----------------------------------------
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
    static std::string s_unselected_rows = "unselected.rows";
    int64_t num_rows_load_success = 0;
    int64_t num_rows_load_filtered = 0;
    int64_t num_rows_load_unselected = 0;
    if (req.runtime_state->num_rows_load_total() > 0 ||
        req.runtime_state->num_rows_load_filtered() > 0 ||
        req.runtime_state->num_finished_range() > 0) {
        params.__isset.load_counters = true;

        num_rows_load_success = req.runtime_state->num_rows_load_success();
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
        params.__isset.fragment_instance_reports = true;
        TFragmentInstanceReport t;
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
        params.fragment_instance_reports.push_back(t);
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
                rs->num_finished_range() > 0) {
                params.__isset.load_counters = true;
                num_rows_load_success += rs->num_rows_load_success();
                num_rows_load_filtered += rs->num_rows_load_filtered();
                num_rows_load_unselected += rs->num_rows_load_unselected();
                params.__isset.fragment_instance_reports = true;
                TFragmentInstanceReport t;
                t.__set_fragment_instance_id(rs->fragment_instance_id());
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
                t.__set_loaded_rows(rs->num_rows_load_total());
                t.__set_loaded_bytes(rs->num_bytes_load_total());
                params.fragment_instance_reports.push_back(t);
            }
        }
    }
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));

    if (!req.load_error_url.empty()) {
        params.__set_tracking_url(req.load_error_url);
    }
    if (!req.first_error_msg.empty()) {
        params.__set_first_error_msg(req.first_error_msg);
    }
    // Note: if several instances carry a wal_id, the last one wins.
    for (auto* rs : req.runtime_states) {
        if (rs->wal_id() > 0) {
            params.__set_txn_id(rs->wal_id());
            params.__set_label(rs->import_label());
        }
    }
    // The following sections share one pattern: take the fragment-level value
    // if non-empty, otherwise concatenate the per-instance values.
    if (!req.runtime_state->export_output_files().empty()) {
        params.__isset.export_files = true;
        params.export_files = req.runtime_state->export_output_files();
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (!rs->export_output_files().empty()) {
                params.__isset.export_files = true;
                params.export_files.insert(params.export_files.end(),
                                           rs->export_output_files().begin(),
                                           rs->export_output_files().end());
            }
        }
    }
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
        params.__isset.commitInfos = true;
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
                params.__isset.commitInfos = true;
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
            }
        }
    }
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
        params.__isset.errorTabletInfos = true;
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
                params.__isset.errorTabletInfos = true;
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
                                               rs_eti.end());
            }
        }
    }
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
        params.__isset.hive_partition_updates = true;
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
                                             hpu.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
                params.__isset.hive_partition_updates = true;
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
                                                     rs_hpu.begin(), rs_hpu.end());
            }
        }
    }
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
        params.__isset.iceberg_commit_datas = true;
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
                                           icd.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
                params.__isset.iceberg_commit_datas = true;
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
                                                   rs_icd.begin(), rs_icd.end());
            }
        }
    }

    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
        params.__isset.mc_commit_datas = true;
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
                params.__isset.mc_commit_datas = true;
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
                                              rs_mcd.end());
            }
        }
    }

    req.runtime_state->get_unreported_errors(&(params.error_log));
    params.__isset.error_log = (!params.error_log.empty());

    if (_exec_env->cluster_info()->backend_id != 0) {
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
    }

    // --- Send the RPC, retrying once through a reopened connection on
    // transport errors ----------------------------------------------------
    TReportExecStatusResult res;
    Status rpc_status;

    VLOG_DEBUG << "reportExecStatus params is "
               << apache::thrift::ThriftDebugString(params).c_str();
    if (!exec_status.ok()) {
        LOG(WARNING) << "report error status: " << exec_status.msg()
                     << " to coordinator: " << req.coord_addr
                     << ", query id: " << print_id(req.query_id);
    }
    try {
        try {
            (*coord)->reportExecStatus(res, params);
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
#ifndef ADDRESS_SANITIZER
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
                         << req.coord_addr << ", err: " << e.what();
#endif
            rpc_status = coord->reopen();

            if (!rpc_status.ok()) {
                req.cancel_fn(rpc_status);
                return;
            }
            (*coord)->reportExecStatus(res, params);
        }

        rpc_status = Status::create<false>(res.status);
    } catch (apache::thrift::TException& e) {
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
    }

    if (!rpc_status.ok()) {
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
                 print_id(req.query_id), rpc_status.to_string());
        req.cancel_fn(rpc_status);
    }
}
2270
2271
431k
// Report this fragment's execution status to the coordinator (FE).
// Returns OK when no report is needed, NeedSendAgain when the caller should
// retry later, otherwise the status of submitting the async report task.
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();
    // If plan is done successfully, but _is_report_success is false,
    // no need to send report.
    // Load will set _is_report_success to true because load wants to know
    // the process.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::OK();
    }

    // If both _is_report_success and _is_report_on_cancel are false,
    // which means no matter query is success or failed, no report is needed.
    // This may happen when the query limit reached and
    // a internal cancellation being processed
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
    // be set to false, to avoid sending fault report to FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        if (done) {
            // if done is true, which means the query is finished successfully, we can safely
            // close the fragment instance without sending report to FE, and just return OK
            // status here.
            return Status::OK();
        }
        return Status::NeedSendAgain("");
    }

    // Collect the runtime state of every task in this fragment so the report
    // carries per-task information.
    std::vector<RuntimeState*> runtime_states;
    for (auto& tasks : _tasks) {
        for (auto& task : tasks) {
            runtime_states.push_back(task.second.get());
        }
    }

    // Prefer the query-level error url / first error message; fall back to the
    // fragment-local values when the query context has none.
    std::string load_error_url = _query_ctx->get_load_error_url().empty()
                                         ? get_load_error_url()
                                         : _query_ctx->get_load_error_url();
    std::string first_error_msg = _query_ctx->get_first_error_msg().empty()
                                          ? get_first_error_msg()
                                          : _query_ctx->get_first_error_msg();

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = runtime_states,
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = load_error_url,
                             .first_error_msg = first_error_msg,
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};
    // Capture a shared_ptr to this context so it stays alive until the async
    // report task has finished running on the thread pool.
    auto ctx = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
    return _exec_env->fragment_mgr()->get_thread_pool()->submit_func([this, req, ctx]() {
        SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
        _coordinator_callback(req);
        if (!req.done) {
            // More reports will follow; schedule the next one.
            ctx->refresh_next_report_time();
        }
    });
}
2331
2332
0
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2333
0
    size_t res = 0;
2334
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2335
    // here to traverse the vector.
2336
0
    for (const auto& task_instances : _tasks) {
2337
0
        for (const auto& task : task_instances) {
2338
0
            if (task.first->is_running()) {
2339
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2340
0
                                      << " is running, task: " << (void*)task.first.get()
2341
0
                                      << ", is_running: " << task.first->is_running();
2342
0
                *has_running_task = true;
2343
0
                return 0;
2344
0
            }
2345
2346
0
            size_t revocable_size = task.first->get_revocable_size();
2347
0
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2348
0
                res += revocable_size;
2349
0
            }
2350
0
        }
2351
0
    }
2352
0
    return res;
2353
0
}
2354
2355
0
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2356
0
    std::vector<PipelineTask*> revocable_tasks;
2357
0
    for (const auto& task_instances : _tasks) {
2358
0
        for (const auto& task : task_instances) {
2359
0
            size_t revocable_size_ = task.first->get_revocable_size();
2360
2361
0
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2362
0
                revocable_tasks.emplace_back(task.first.get());
2363
0
            }
2364
0
        }
2365
0
    }
2366
0
    return revocable_tasks;
2367
0
}
2368
2369
20
std::string PipelineFragmentContext::debug_string() {
2370
20
    std::lock_guard<std::mutex> l(_task_mutex);
2371
20
    fmt::memory_buffer debug_string_buffer;
2372
20
    fmt::format_to(debug_string_buffer,
2373
20
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2374
20
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2375
20
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2376
109
    for (size_t j = 0; j < _tasks.size(); j++) {
2377
89
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2378
277
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2379
188
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2380
188
                           _tasks[j][i].first->debug_string());
2381
188
        }
2382
89
    }
2383
2384
20
    return fmt::to_string(debug_string_buffer);
2385
20
}
2386
2387
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2388
2.53k
PipelineFragmentContext::collect_realtime_profile() const {
2389
2.53k
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2390
2391
    // we do not have mutex to protect pipeline_id_to_profile
2392
    // so we need to make sure this funciton is invoked after fragment context
2393
    // has already been prepared.
2394
2.53k
    if (!_prepared) {
2395
0
        std::string msg =
2396
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2397
0
        DCHECK(false) << msg;
2398
0
        LOG_ERROR(msg);
2399
0
        return res;
2400
0
    }
2401
2402
    // Make sure first profile is fragment level profile
2403
2.53k
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2404
2.53k
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2405
2.53k
    res.push_back(fragment_profile);
2406
2407
    // pipeline_id_to_profile is initialized in prepare stage
2408
4.65k
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2409
4.65k
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2410
4.65k
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2411
4.65k
        res.push_back(profile_ptr);
2412
4.65k
    }
2413
2414
2.53k
    return res;
2415
2.53k
}
2416
2417
std::shared_ptr<TRuntimeProfileTree>
2418
2.53k
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2419
    // we do not have mutex to protect pipeline_id_to_profile
2420
    // so we need to make sure this funciton is invoked after fragment context
2421
    // has already been prepared.
2422
2.53k
    if (!_prepared) {
2423
0
        std::string msg =
2424
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2425
0
        DCHECK(false) << msg;
2426
0
        LOG_ERROR(msg);
2427
0
        return nullptr;
2428
0
    }
2429
2430
9.45k
    for (const auto& tasks : _tasks) {
2431
18.7k
        for (const auto& task : tasks) {
2432
18.7k
            if (task.second->load_channel_profile() == nullptr) {
2433
0
                continue;
2434
0
            }
2435
2436
18.7k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2437
2438
18.7k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2439
18.7k
                                                           _runtime_state->profile_level());
2440
18.7k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2441
18.7k
        }
2442
9.45k
    }
2443
2444
2.53k
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2445
2.53k
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2446
2.53k
                                                      _runtime_state->profile_level());
2447
2.53k
    return load_channel_profile;
2448
2.53k
}
2449
2450
// Collect runtime filter IDs registered by all tasks in this PFC.
2451
// Used during recursive CTE stage transitions to know which filters to deregister
2452
// before creating the new PFC for the next recursion round.
2453
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2454
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2455
// the vector sizes do not change, and individual RuntimeState filter sets are
2456
// written only during open() which has completed by the time we reach rerun.
2457
3.28k
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2458
3.28k
    std::set<int> result;
2459
7.19k
    for (const auto& _task : _tasks) {
2460
14.7k
        for (const auto& task : _task) {
2461
14.7k
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2462
14.7k
            result.merge(set);
2463
14.7k
        }
2464
7.19k
    }
2465
3.28k
    if (_runtime_state) {
2466
3.28k
        auto set = _runtime_state->get_deregister_runtime_filter();
2467
3.28k
        result.merge(set);
2468
3.28k
    }
2469
3.28k
    return result;
2470
3.28k
}
2471
2472
431k
void PipelineFragmentContext::_release_resource() {
2473
431k
    std::lock_guard<std::mutex> l(_task_mutex);
2474
    // The memory released by the query end is recorded in the query mem tracker.
2475
431k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2476
431k
    auto st = _query_ctx->exec_status();
2477
1.24M
    for (auto& _task : _tasks) {
2478
1.24M
        if (!_task.empty()) {
2479
1.24M
            _call_back(_task.front().first->runtime_state(), &st);
2480
1.24M
        }
2481
1.24M
    }
2482
431k
    _tasks.clear();
2483
431k
    _dag.clear();
2484
431k
    _pip_id_to_pipeline.clear();
2485
431k
    _pipelines.clear();
2486
431k
    _sink.reset();
2487
431k
    _root_op.reset();
2488
431k
    _runtime_filter_mgr_map.clear();
2489
431k
    _op_id_to_shared_state.clear();
2490
431k
}
2491
2492
} // namespace doris