Coverage Report

Created: 2026-06-25 13:43

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/FrontendService.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <pthread.h>
26
27
#include <algorithm>
28
#include <cstdlib>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <fmt/format.h>
31
#include <thrift/Thrift.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
#include <thrift/transport/TTransportException.h>
34
35
#include <chrono> // IWYU pragma: keep
36
#include <map>
37
#include <memory>
38
#include <ostream>
39
#include <utility>
40
41
#include "cloud/config.h"
42
#include "common/cast_set.h"
43
#include "common/config.h"
44
#include "common/exception.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "exec/exchange/local_exchange_sink_operator.h"
48
#include "exec/exchange/local_exchange_source_operator.h"
49
#include "exec/exchange/local_exchanger.h"
50
#include "exec/exchange/vdata_stream_mgr.h"
51
#include "exec/operator/aggregation_sink_operator.h"
52
#include "exec/operator/aggregation_source_operator.h"
53
#include "exec/operator/analytic_sink_operator.h"
54
#include "exec/operator/analytic_source_operator.h"
55
#include "exec/operator/assert_num_rows_operator.h"
56
#include "exec/operator/blackhole_sink_operator.h"
57
#include "exec/operator/bucketed_aggregation_sink_operator.h"
58
#include "exec/operator/bucketed_aggregation_source_operator.h"
59
#include "exec/operator/cache_sink_operator.h"
60
#include "exec/operator/cache_source_operator.h"
61
#include "exec/operator/datagen_operator.h"
62
#include "exec/operator/dict_sink_operator.h"
63
#include "exec/operator/distinct_streaming_aggregation_operator.h"
64
#include "exec/operator/empty_set_operator.h"
65
#include "exec/operator/exchange_sink_operator.h"
66
#include "exec/operator/exchange_source_operator.h"
67
#include "exec/operator/file_scan_operator.h"
68
#include "exec/operator/group_commit_block_sink_operator.h"
69
#include "exec/operator/group_commit_scan_operator.h"
70
#include "exec/operator/hashjoin_build_sink.h"
71
#include "exec/operator/hashjoin_probe_operator.h"
72
#include "exec/operator/hive_table_sink_operator.h"
73
#include "exec/operator/iceberg_delete_sink_operator.h"
74
#include "exec/operator/iceberg_merge_sink_operator.h"
75
#include "exec/operator/iceberg_table_sink_operator.h"
76
#include "exec/operator/jdbc_scan_operator.h"
77
#include "exec/operator/jdbc_table_sink_operator.h"
78
#include "exec/operator/local_merge_sort_source_operator.h"
79
#include "exec/operator/materialization_opertor.h"
80
#include "exec/operator/maxcompute_table_sink_operator.h"
81
#include "exec/operator/memory_scratch_sink_operator.h"
82
#include "exec/operator/meta_scan_operator.h"
83
#include "exec/operator/multi_cast_data_stream_sink.h"
84
#include "exec/operator/multi_cast_data_stream_source.h"
85
#include "exec/operator/nested_loop_join_build_operator.h"
86
#include "exec/operator/nested_loop_join_probe_operator.h"
87
#include "exec/operator/olap_scan_operator.h"
88
#include "exec/operator/olap_table_sink_operator.h"
89
#include "exec/operator/olap_table_sink_v2_operator.h"
90
#include "exec/operator/partition_sort_sink_operator.h"
91
#include "exec/operator/partition_sort_source_operator.h"
92
#include "exec/operator/partitioned_aggregation_sink_operator.h"
93
#include "exec/operator/partitioned_aggregation_source_operator.h"
94
#include "exec/operator/partitioned_hash_join_probe_operator.h"
95
#include "exec/operator/partitioned_hash_join_sink_operator.h"
96
#include "exec/operator/rec_cte_anchor_sink_operator.h"
97
#include "exec/operator/rec_cte_scan_operator.h"
98
#include "exec/operator/rec_cte_sink_operator.h"
99
#include "exec/operator/rec_cte_source_operator.h"
100
#include "exec/operator/repeat_operator.h"
101
#include "exec/operator/result_file_sink_operator.h"
102
#include "exec/operator/result_sink_operator.h"
103
#include "exec/operator/schema_scan_operator.h"
104
#include "exec/operator/select_operator.h"
105
#include "exec/operator/set_probe_sink_operator.h"
106
#include "exec/operator/set_sink_operator.h"
107
#include "exec/operator/set_source_operator.h"
108
#include "exec/operator/sort_sink_operator.h"
109
#include "exec/operator/sort_source_operator.h"
110
#include "exec/operator/spill_iceberg_table_sink_operator.h"
111
#include "exec/operator/spill_sort_sink_operator.h"
112
#include "exec/operator/spill_sort_source_operator.h"
113
#include "exec/operator/streaming_aggregation_operator.h"
114
#include "exec/operator/table_function_operator.h"
115
#include "exec/operator/tvf_table_sink_operator.h"
116
#include "exec/operator/union_sink_operator.h"
117
#include "exec/operator/union_source_operator.h"
118
#include "exec/pipeline/dependency.h"
119
#include "exec/pipeline/pipeline_task.h"
120
#include "exec/pipeline/task_scheduler.h"
121
#include "exec/runtime_filter/runtime_filter_mgr.h"
122
#include "exec/sort/topn_sorter.h"
123
#include "exec/spill/spill_file.h"
124
#include "io/fs/stream_load_pipe.h"
125
#include "load/stream_load/new_load_stream_mgr.h"
126
#include "runtime/exec_env.h"
127
#include "runtime/fragment_mgr.h"
128
#include "runtime/result_buffer_mgr.h"
129
#include "runtime/runtime_state.h"
130
#include "runtime/thread_context.h"
131
#include "service/backend_options.h"
132
#include "util/client_cache.h"
133
#include "util/countdown_latch.h"
134
#include "util/debug_util.h"
135
#include "util/network_util.h"
136
#include "util/uid_util.h"
137
138
namespace doris {
139
// Creates a fragment context for one fragment of query `query_id`.
//
// `request` is copied into `_params`. Optional thrift fields fall back to defaults when unset:
// `parallel_instances` becomes 0 and `need_notify_close` becomes false. The fragment watcher is
// started immediately; `is_timeout()` measures elapsed time against it.
PipelineFragmentContext::PipelineFragmentContext(
        TUniqueId query_id, const TPipelineFragmentParams& request,
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
        const std::function<void(RuntimeState*, Status*)>& call_back)
        : _query_id(std::move(query_id)),
          _fragment_id(request.fragment_id),
          _exec_env(exec_env),
          _query_ctx(std::move(query_ctx)),
          _call_back(call_back),
          _is_report_on_cancel(true),
          _params(request),
          _parallel_instances(request.__isset.parallel_instances ? request.parallel_instances
                                                                 : 0),
          _need_notify_close(request.__isset.need_notify_close && request.need_notify_close) {
    _fragment_watcher.start();
}
155
156
27
// Logs the destruction, releases the fragment's resources, and then drops the runtime state and
// the query context.
PipelineFragmentContext::~PipelineFragmentContext() {
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
            .tag("query_id", print_id(_query_id))
            .tag("fragment_id", _fragment_id);
    _release_resource();

    // Memory freed at query end is charged to the query's memory tracker. The runtime state is
    // created with a raw pointer to the query context (see prepare()), so it is destroyed first.
    {
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
        _runtime_state = nullptr;
        _query_ctx = nullptr;
    }
}
168
169
0
bool PipelineFragmentContext::is_timeout(timespec now) const {
170
0
    if (_timeout <= 0) {
171
0
        return false;
172
0
    }
173
0
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
174
0
}
175
176
// notify_close() transitions the PFC from "waiting for external close notification" to
177
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
178
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
179
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
180
0
bool PipelineFragmentContext::notify_close() {
181
0
    bool all_closed = false;
182
0
    bool need_remove = false;
183
0
    {
184
0
        std::lock_guard<std::mutex> l(_task_mutex);
185
0
        if (_closed_tasks >= _total_tasks) {
186
0
            if (_need_notify_close) {
187
                // Fragment was cancelled and waiting for notify to close.
188
                // Record that we need to remove from fragment mgr, but do it
189
                // after releasing _task_mutex to avoid ABBA deadlock with
190
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
191
                // first, then _task_mutex via debug_string()).
192
0
                need_remove = true;
193
0
            }
194
0
            all_closed = true;
195
0
        }
196
        // make fragment release by self after cancel
197
0
        _need_notify_close = false;
198
0
    }
199
0
    if (need_remove) {
200
0
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
201
0
    }
202
0
    return all_closed;
203
0
}
204
205
// Must not add lock in this method. Because it will call query ctx cancel. And
206
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
207
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
208
// There maybe dead lock.
209
0
void PipelineFragmentContext::cancel(const Status reason) {
210
0
    LOG_INFO("PipelineFragmentContext::cancel")
211
0
            .tag("query_id", print_id(_query_id))
212
0
            .tag("fragment_id", _fragment_id)
213
0
            .tag("reason", reason.to_string());
214
0
    if (notify_close()) {
215
0
        return;
216
0
    }
217
    // Timeout is a special error code, we need print current stack to debug timeout issue.
218
0
    if (reason.is<ErrorCode::TIMEOUT>()) {
219
0
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
220
0
                                   debug_string());
221
0
        LOG_LONG_STRING(WARNING, dbg_str);
222
0
    }
223
224
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
225
0
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
226
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
227
0
                    debug_string());
228
0
    }
229
230
0
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
231
0
        print_profile("cancel pipeline, reason: " + reason.to_string());
232
0
    }
233
234
0
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
235
0
        _query_ctx->set_load_error_url(error_url);
236
0
    }
237
238
0
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
239
0
        _query_ctx->set_first_error_msg(first_error_msg);
240
0
    }
241
242
0
    _query_ctx->cancel(reason, _fragment_id);
243
0
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
244
0
        _is_report_on_cancel = false;
245
0
    } else {
246
0
        for (auto& id : _fragment_instance_ids) {
247
0
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
248
0
        }
249
0
    }
250
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
251
    // For stream load the fragment's query_id == load id, it is set in FE.
252
0
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
253
0
    if (stream_load_ctx != nullptr) {
254
0
        stream_load_ctx->pipe->cancel(reason.to_string());
255
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
256
        // We need to set the error URL at this point to ensure error information is properly
257
        // propagated to the client.
258
0
        stream_load_ctx->error_url = get_load_error_url();
259
0
        stream_load_ctx->first_error_msg = get_first_error_msg();
260
0
    }
261
262
0
    for (auto& tasks : _tasks) {
263
0
        for (auto& task : tasks) {
264
0
            task.first->unblock_all_dependencies();
265
0
        }
266
0
    }
267
0
}
268
269
0
// Creates a new pipeline with the next pipeline id and registers it in `_pipelines`.
//
// Without a parent, the pipeline gets `_num_instances` tasks. With a parent, it gets
// min(parent tasks, `_num_instances`) tasks, and the parent's task count is passed as the third
// constructor argument. A non-negative `idx` inserts the pipeline at that position; otherwise
// it is appended. When a parent is given, the new pipeline is also registered as its child.
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
    PipelineId new_id = _next_pipeline_id++;
    auto created = parent == nullptr
                           ? std::make_shared<Pipeline>(new_id, _num_instances, _num_instances)
                           : std::make_shared<Pipeline>(
                                     new_id, std::min(parent->num_tasks(), _num_instances),
                                     parent->num_tasks());

    if (idx < 0) {
        _pipelines.emplace_back(created);
    } else {
        _pipelines.insert(_pipelines.begin() + idx, created);
    }

    if (parent != nullptr) {
        parent->set_children(created);
    }
    return created;
}
284
285
0
// Builds and prepares every pipeline of this fragment, then builds its pipeline tasks.
// Step 1 (the global runtime state) is performed by prepare() before this is called.
//   2. build pipelines/operators from the plan, then fix up local-exchange task counts;
//   3. create the output sink and connect each pipeline's sink to its last operator;
//   4. plan local exchangers when `plan_local_shuffle()` is enabled;
//   5. prepare every pipeline;
//   6. build and prepare the per-instance pipeline tasks (on `thread_pool` when parallel).
// Returns the first error encountered.
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
    {
        SCOPED_TIMER(_build_pipelines_timer);
        // 2. Build pipelines with operators in this fragment.
        auto root_pipeline = add_pipeline();
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
                                         &_root_op, root_pipeline));

        // Propagate _num_instances from LOCAL_EXCHANGE pipelines to ancestor pipelines
        // that inherited reduced num_tasks from a serial operator.
        _propagate_local_exchange_num_tasks();

        // Create deferred local exchangers now that all pipelines have final num_tasks.
        RETURN_IF_ERROR(_create_deferred_local_exchangers());

        // NOTE: num_tasks is deliberately NOT raised here for pipelines whose serial source
        // (e.g. an UNPARTITIONED Exchange, or a pooling/serial scan) reduced it below
        // _num_instances. Such pipelines legitimately run one task; raising them caused crashes
        // (e.g. 4 Exchange tasks but only 1 receiving data). Serial scans fan out through
        // PassthroughExchanger(1, N). Shared-state injection across instances is handled by the
        // FE instead: it inserts local exchange nodes between serial operators and their
        // downstream consumers, creating pipeline boundaries with _num_instances tasks.

        // 3. Create sink operator
        if (!_params.fragment.__isset.output_sink) {
            return Status::InternalError("No output sink in this fragment!");
        }
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
                                          _params.fragment.output_exprs, _params,
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
                                          *_desc_tbl, root_pipeline->id()));
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));

        // Connect every pipeline's sink to the last operator of that pipeline.
        for (PipelinePtr& pipeline : _pipelines) {
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
            // DCHECK is compiled out in release builds. Fail with a status instead of
            // dereferencing a null sink or calling back() on an empty operator list (UB).
            if (pipeline->sink() == nullptr || pipeline->operators().empty()) {
                return Status::InternalError(
                        "Pipeline {} is incomplete: has_sink={}, num_operators={}",
                        pipeline->id(), pipeline->sink() != nullptr,
                        pipeline->operators().size());
            }
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
        }
    }
    // 4. Build local exchanger
    if (_runtime_state->plan_local_shuffle()) {
        SCOPED_TIMER(_plan_local_exchanger_timer);
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
                                             _params.bucket_seq_to_instance_idx,
                                             _params.shuffle_idx_to_instance_idx));
    }

    // 5. Initialize global states in pipelines.
    {
        SCOPED_TIMER(_prepare_all_pipelines_timer);
        for (PipelinePtr& pipeline : _pipelines) {
            // Parent/child links are cleared before preparing each pipeline.
            pipeline->children().clear();
            RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
        }
    }

    {
        SCOPED_TIMER(_build_tasks_timer);
        // 6. Build pipeline tasks and initialize local state.
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
    }

    return Status::OK();
}
358
359
0
// Prepares this fragment for execution. This may be done only once; a second call returns
// InternalError("Already prepared").
//
// Reads the execution timeout, creates the fragment-level profile and timers, and sets up the
// fragment-level RuntimeState. It then resolves the descriptor table (shared from the query
// context for simplified params, otherwise created from `_params.desc_tbl`) and records the
// fragment instance ids. Finally it builds and prepares all pipelines and their tasks, using
// `thread_pool` when task preparation runs in parallel.
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
    if (_prepared) {
        return Status::InternalError("Already prepared");
    }
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
        _timeout = _params.query_options.execution_timeout;
    }

    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
    SCOPED_TIMER(_prepare_timer);
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
    // NOTE(review): the counter name repeats "Local"; kept as-is because profile consumers may
    // match on the exact name.
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
    {
        SCOPED_TIMER(_init_context_timer);
        cast_set(_num_instances, _params.local_params.size());
        _total_instances =
                _params.__isset.total_instances ? _params.total_instances : _num_instances;

        if (_params.query_options.__isset.is_report_success) {
            set_is_report_success(_params.query_options.is_report_success);
        }

        // 1. Set up the global runtime state.
        _runtime_state = RuntimeState::create_unique(
                _params.query_id, _params.fragment_id, _params.query_options,
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
        _runtime_state->set_task_execution_context(shared_from_this());
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
        if (_params.__isset.backend_id) {
            _runtime_state->set_backend_id(_params.backend_id);
        }
        if (_params.__isset.import_label) {
            _runtime_state->set_import_label(_params.import_label);
        }
        if (_params.__isset.db_name) {
            _runtime_state->set_db_name(_params.db_name);
        }
        if (_params.__isset.load_job_id) {
            _runtime_state->set_load_job_id(_params.load_job_id);
        }

        if (_params.is_simplified_param) {
            // Simplified params carry no descriptor table; reuse the query-level one.
            _desc_tbl = _query_ctx->desc_tbl;
        } else {
            DCHECK(_params.__isset.desc_tbl);
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
                                                  &_desc_tbl));
        }
        _runtime_state->set_desc_tbl(_desc_tbl);
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
        _runtime_state->set_total_load_streams(_params.total_load_streams);
        _runtime_state->set_num_local_sink(_params.num_local_sink);

        // Record the fragment instance ids, in the same order as `_params.local_params`.
        _fragment_instance_ids.clear();
        _fragment_instance_ids.reserve(_params.local_params.size());
        for (const auto& local_param : _params.local_params) {
            _fragment_instance_ids.push_back(local_param.fragment_instance_id);
        }
    }

    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));

    _init_next_report_time();

    _prepared = true;
    return Status::OK();
}
435
436
// Creates and prepares the PipelineTasks of one fragment instance.
//
// Tasks are created for pipelines that run more than one task, plus single-task pipelines on
// instance 0 only. Each task gets its own RuntimeState. Shared states between dependent tasks
// are then wired along `_dag`, each task is prepared with this instance's scan ranges, and the
// instance's RuntimeFilterMgr is published into `_runtime_filter_mgr_map`.
//
// `_build_pipeline_tasks()` may run this concurrently for different `instance_idx` values.
// Fragment-level maps (`_dag`, `_op_id_to_shared_state`) are therefore read only through
// find(). std::map::operator[] is not one of the members the standard treats as const for
// data-race purposes.
//
// @param instance_idx            index into `_params.local_params` and `_tasks`.
// @param pipeline_id_to_profile  per-pipeline profiles, indexed like `_pipelines`.
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
        int instance_idx,
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
    const auto& local_params = _params.local_params[instance_idx];
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;

    // Collects the shared states registered in `_op_id_to_shared_state` for the operators of
    // `pipeline` and for the operators its sink feeds (`dests_id()`), keyed by operator id.
    auto get_shared_state = [&](const PipelinePtr& pipeline)
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                                       std::vector<std::shared_ptr<Dependency>>>> {
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                                std::vector<std::shared_ptr<Dependency>>>>
                shared_state_map;
        for (auto& op : pipeline->operators()) {
            auto source_id = op->operator_id();
            if (auto iter = _op_id_to_shared_state.find(source_id);
                iter != _op_id_to_shared_state.end()) {
                shared_state_map.insert({source_id, iter->second});
            }
        }
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
                iter != _op_id_to_shared_state.end()) {
                shared_state_map.insert({sink_to_source_id, iter->second});
            }
        }
        return shared_state_map;
    };

    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
        auto& pipeline = _pipelines[pip_idx];
        // Pipelines that run a single task are only instantiated for instance 0.
        if (pipeline->num_tasks() <= 1 && instance_idx != 0) {
            continue;
        }
        auto task_runtime_state = RuntimeState::create_unique(
                local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
                _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
        {
            // Initialize runtime state for this task
            task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());

            task_runtime_state->set_task_execution_context(shared_from_this());
            task_runtime_state->set_be_number(local_params.backend_num);

            if (_params.__isset.backend_id) {
                task_runtime_state->set_backend_id(_params.backend_id);
            }
            if (_params.__isset.import_label) {
                task_runtime_state->set_import_label(_params.import_label);
            }
            if (_params.__isset.db_name) {
                task_runtime_state->set_db_name(_params.db_name);
            }
            if (_params.__isset.load_job_id) {
                task_runtime_state->set_load_job_id(_params.load_job_id);
            }
            if (_params.__isset.wal_id) {
                task_runtime_state->set_wal_id(_params.wal_id);
            }
            if (_params.__isset.content_length) {
                task_runtime_state->set_content_length(_params.content_length);
            }

            task_runtime_state->set_desc_tbl(_desc_tbl);
            task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
            task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
            task_runtime_state->resize_op_id_to_local_state(max_operator_id());
            task_runtime_state->set_max_operator_id(max_operator_id());
            task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
            task_runtime_state->set_total_load_streams(_params.total_load_streams);
            task_runtime_state->set_num_local_sink(_params.num_local_sink);

            task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
        }
        auto cur_task_id = _total_tasks++;
        task_runtime_state->set_task_id(cur_task_id);
        task_runtime_state->set_task_num(pipeline->num_tasks());
        auto task = std::make_shared<PipelineTask>(
                pipeline, cur_task_id, task_runtime_state.get(),
                std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
                pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
                instance_idx);
        pipeline->incr_created_tasks(instance_idx, task.get());
        pipeline_id_to_task.insert({pipeline->id(), task.get()});
        _tasks[instance_idx].emplace_back(
                std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
                        std::move(task), std::move(task_runtime_state)});
    }

    /**
     * Build DAG for pipeline tasks.
     * For example, we have
     *
     *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
     *            \                      /
     *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
     *                 \                          /
     *               JoinProbeOperator2 (Pipeline1)
     *
     * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
     * To build this DAG, `_dag` manages dependencies between pipelines by pipeline ID and
     * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
     *
     * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
     * and JoinProbeOperator2.
     */
    for (const auto& pipeline : _pipelines) {
        auto task_iter = pipeline_id_to_task.find(pipeline->id());
        if (task_iter == pipeline_id_to_task.end()) {
            continue;
        }
        auto* task = task_iter->second;
        DCHECK(task != nullptr);

        // If this task has upstream dependencies, inject their shared state into this task.
        auto dag_iter = _dag.find(pipeline->id());
        if (dag_iter == _dag.end()) {
            continue;
        }
        for (auto& dep : dag_iter->second) {
            auto dep_iter = pipeline_id_to_task.find(dep);
            if (dep_iter == pipeline_id_to_task.end()) {
                continue;
            }
            auto* dep_task = dep_iter->second;
            if (auto ss = dep_task->get_sink_shared_state()) {
                task->inject_shared_state(ss);
            } else {
                dep_task->inject_shared_state(task->get_source_shared_state());
            }
        }
    }

    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
        const auto& pipeline = _pipelines[pip_idx];
        auto task_iter = pipeline_id_to_task.find(pipeline->id());
        if (task_iter == pipeline_id_to_task.end()) {
            continue;
        }
        auto* task = task_iter->second;
        DCHECK(pipeline_id_to_profile[pip_idx]);
        // Scan ranges are looked up by the node id of the pipeline's first (source) operator.
        std::vector<TScanRangeParams> scan_ranges;
        auto node_id = pipeline->operators().front()->node_id();
        if (auto ranges_iter = local_params.per_node_scan_ranges.find(node_id);
            ranges_iter != local_params.per_node_scan_ranges.end()) {
            scan_ranges = ranges_iter->second;
        }
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
                                                         _params.fragment.output_sink));
    }
    {
        std::lock_guard<std::mutex> l(_state_map_lock);
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
    }
    return Status::OK();
}
582
583
0
// Resets the task counters, sizes the per-instance containers and builds the pipeline tasks of
// every fragment instance.
//
// When there is more than one instance and the instance count exceeds the
// `parallel_prepare_threshold` query option, instances are built concurrently on `thread_pool`.
// Otherwise they are built serially. Returns the thread-pool submit error or the first
// per-instance error. On success it clears the build-time maps and adds the task count to the
// query context.
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
    _total_tasks = 0;
    _closed_tasks = 0;
    const auto target_size = _params.local_params.size();
    _tasks.resize(target_size);
    _runtime_filter_mgr_map.resize(target_size);
    for (const auto& pipeline : _pipelines) {
        _pip_id_to_pipeline[pipeline->id()] = pipeline.get();
    }
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());

    if (target_size > 1 &&
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare
        // all tasks by multi-threads.
        std::vector<Status> prepare_status(target_size);
        int submitted_tasks = 0;
        Status submit_status;
        CountDownLatch latch(static_cast<int>(target_size));
        for (int i = 0; i < target_size; i++) {
            submit_status = thread_pool->submit_func([&, i]() {
                SCOPED_ATTACH_TASK(_query_ctx.get());
                // Route the call through RETURN_IF_ERROR_OR_CATCH_EXCEPTION, the same guard
                // used around task->prepare(), so exceptions it catches become a Status here.
                // Otherwise they would escape this closure: latch.count_down() would be skipped
                // and prepare_status[i] would wrongly stay OK.
                prepare_status[i] = [&]() -> Status {
                    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
                            _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
                    return Status::OK();
                }();
                latch.count_down();
            });
            if (LIKELY(submit_status.ok())) {
                submitted_tasks++;
            } else {
                break;
            }
        }
        // Count down for the instances that were never submitted, then wait for the submitted
        // ones. They reference locals of this frame, so we must not return before they finish.
        latch.arrive_and_wait(target_size - submitted_tasks);
        if (UNLIKELY(!submit_status.ok())) {
            return submit_status;
        }
        for (int i = 0; i < submitted_tasks; i++) {
            if (!prepare_status[i].ok()) {
                return prepare_status[i];
            }
        }
    } else {
        for (int i = 0; i < target_size; i++) {
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
        }
    }
    _pipeline_parent_map.clear();
    _op_id_to_shared_state.clear();
    // Record task cardinality once when this fragment context finishes task initialization.
    _query_ctx->add_total_task_num(_total_tasks.load(std::memory_order_relaxed));

    return Status::OK();
}
635
636
0
void PipelineFragmentContext::_init_next_report_time() {
637
0
    auto interval_s = config::pipeline_status_report_interval;
638
0
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
639
0
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
640
0
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
641
        // We don't want to wait longer than it takes to run the entire fragment.
642
0
        _previous_report_time =
643
0
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
644
0
        _disable_period_report = false;
645
0
    }
646
0
}
647
648
0
void PipelineFragmentContext::refresh_next_report_time() {
649
0
    auto disable = _disable_period_report.load(std::memory_order_acquire);
650
0
    DCHECK(disable == true);
651
0
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
652
0
    _disable_period_report.compare_exchange_strong(disable, false);
653
0
}
654
655
0
void PipelineFragmentContext::trigger_report_if_necessary() {
656
0
    if (!_is_report_success) {
657
0
        return;
658
0
    }
659
0
    auto disable = _disable_period_report.load(std::memory_order_acquire);
660
0
    if (disable) {
661
0
        return;
662
0
    }
663
0
    int32_t interval_s = config::pipeline_status_report_interval;
664
0
    if (interval_s <= 0) {
665
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
666
0
                        "trigger "
667
0
                        "report.";
668
0
    }
669
0
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
670
0
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
671
0
    if (MonotonicNanos() > next_report_time) {
672
0
        if (!_disable_period_report.compare_exchange_strong(disable, true,
673
0
                                                            std::memory_order_acq_rel)) {
674
0
            return;
675
0
        }
676
0
        if (VLOG_FILE_IS_ON) {
677
0
            VLOG_FILE << "Reporting "
678
0
                      << "profile for query_id " << print_id(_query_id)
679
0
                      << ", fragment id: " << _fragment_id;
680
681
0
            std::stringstream ss;
682
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
683
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
684
0
            if (_runtime_state->load_channel_profile()) {
685
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
686
0
            }
687
688
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
689
0
                      << " profile:\n"
690
0
                      << ss.str();
691
0
        }
692
0
        auto st = send_report(false);
693
0
        if (!st.ok()) {
694
0
            disable = true;
695
0
            _disable_period_report.compare_exchange_strong(disable, false,
696
0
                                                           std::memory_order_acq_rel);
697
0
        }
698
0
    }
699
0
}
700
701
// Builds the operator tree of this fragment from its thrift plan.
//
// The plan nodes are consumed in pre-order by _create_tree_helper(), which stores the root
// operator into `*root` and attaches operators to `cur_pipe`. Every thrift node must be
// consumed; a plan with leftover nodes is rejected. An empty plan throws an
// INTERNAL_ERROR Exception.
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    // On success _create_tree_helper leaves this on the index of the last node it consumed.
    int last_consumed_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &last_consumed_idx, root,
                                        cur_pipe, 0, false, false));

    const bool consumed_all_nodes = last_consumed_idx + 1 == plan_nodes.size();
    if (!consumed_all_nodes) {
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }
    return Status::OK();
}
718
719
0
// Materializes the exchangers of FE-planned local exchanges whose construction was deferred.
//
// For every entry of `_deferred_exchangers`, builds the Exchanger matching its partition
// type and stores it into the entry's shared state. Afterwards the pending list is
// cleared. Unsupported partition types DCHECK in debug builds and return InternalError
// otherwise; on that path the list is left as-is.
Status PipelineFragmentContext::_create_deferred_local_exchangers() {
    for (auto& info : _deferred_exchangers) {
        // DANGER ZONE — do not "fix" this line without reading the history.
        //
        // sender_count seeds Exchanger::_running_sink_operators, which the source side
        // waits to reach 0 via sub_running_sink_operators on each sink LocalState close.
        // The correct value is THIS pipeline-instance's sink task count, which is exactly
        // info.upstream_pipe->num_tasks() — one PipelineTask per task, one close per task.
        //
        // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to mirror the
        //   BE-planned path in _add_local_exchange_impl.  THIS BREAKS the
        //   common FE-planned shape of `serial scan → LE(PT) → ...`: upstream_pipe
        //   genuinely has num_tasks=1, only 1 close arrives, but seed becomes
        //   _num_instances so _running_sink_operators never reaches 0 — downstream
        //   sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
        //   mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING and regressed
        //   exactly this way).  BE-planned mode uses max() because its
        //   `cur_pipe` is the source-side pipeline (always raised to _num_instances by
        //   add_pipeline) — not analogous to our `upstream_pipe` here, which is the
        //   sink-side pipeline that may legitimately stay at 1 for serial sources.
        //
        // Tempting wrong fix #2: multiply by _num_instances on the theory shared_state
        //   is shared across all instances.  Same hang — each fragment-instance
        //   PipelineFragmentContext has its OWN _op_id_to_shared_state map, so the
        //   exchanger is per-instance, not per-BE.  num_tasks() is already the right
        //   close-count for one instance.
        //
        // If a hang shows up with `_running_sink_operators < 0`, the bug is upstream:
        // _propagate_local_exchange_num_tasks left num_tasks too low (or too high) for
        // this fragment shape.  Fix THAT pass, not this seed value.
        const int sender_count = info.upstream_pipe->num_tasks();
        const auto partition_type = info.partition_type;
        const auto free_limit = info.free_blocks_limit;
        auto& exchanger = info.shared_state->exchanger;

        switch (partition_type) {
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
            exchanger = ShuffleExchanger::create_unique(sender_count, _num_instances,
                                                        info.num_partitions, free_limit,
                                                        partition_type);
            break;
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
            exchanger = BucketShuffleExchanger::create_unique(sender_count, _num_instances,
                                                              info.num_partitions, free_limit);
            break;
        case TLocalPartitionType::PASSTHROUGH:
            exchanger =
                    PassthroughExchanger::create_unique(sender_count, _num_instances, free_limit);
            break;
        case TLocalPartitionType::BROADCAST:
            exchanger = BroadcastExchanger::create_unique(sender_count, _num_instances, free_limit);
            break;
        case TLocalPartitionType::PASS_TO_ONE:
            // Without a shared broadcast hash table every task needs the full data.
            if (!_runtime_state->enable_share_hash_table_for_broadcast_join()) {
                exchanger =
                        BroadcastExchanger::create_unique(sender_count, _num_instances, free_limit);
            } else {
                exchanger =
                        PassToOneExchanger::create_unique(sender_count, _num_instances, free_limit);
            }
            break;
        case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
            exchanger = AdaptivePassthroughExchanger::create_unique(sender_count, _num_instances,
                                                                    free_limit);
            break;
        case TLocalPartitionType::NOOP:
        case TLocalPartitionType::LOCAL_MERGE_SORT:
            // FE-planned local exchanges never route NOOP ("no exchange needed", filtered
            // earlier) or LOCAL_MERGE_SORT (legacy BE planner only) through this path. Crash in
            // debug to surface a protocol violation; fail the fragment in release.
            DCHECK(false) << "FE-planned local exchange should not emit partition_type="
                          << static_cast<int>(partition_type);
            return Status::InternalError("FE-planned local exchange emitted unsupported type: " +
                                         std::to_string(static_cast<int>(partition_type)));
        default:
            // A TLocalPartitionType added on the FE side without a BE handler here.
            DCHECK(false) << "Unhandled TLocalPartitionType in deferred exchangers: "
                          << static_cast<int>(partition_type);
            return Status::InternalError("Unsupported FE-planned local exchange type: " +
                                         std::to_string(static_cast<int>(partition_type)));
        }
    }
    _deferred_exchangers.clear();
    return Status::OK();
}
805
806
0
void PipelineFragmentContext::_propagate_local_exchange_num_tasks() {
807
    // Only runs when FE has planned local exchanges and BE deferred their construction.
808
    // In legacy mode (enable_local_shuffle_planner=false) BE plans LE itself via
809
    // _plan_local_exchange and _deferred_exchangers stays empty — the legacy path
810
    // already gets its num_tasks right at construction time, so the propagate passes
811
    // would be no-ops and are skipped.  This is a transitional design: once the FE
812
    // planner is the only planner, the propagation logic itself should degrade into
813
    // a pure assertion that the FE plan already wired the right num_tasks everywhere.
814
0
    if (_deferred_exchangers.empty()) {
815
0
        return;
816
0
    }
817
    // Reconcile num_tasks across paired pipelines created by pipeline-splitting operators
818
    // (AGG, SORT, JOIN): they share state via inject_shared_state and must agree, or
819
    // instance 1+ tasks access null shared_state.  A pipeline's num_tasks is fully
820
    // determined by its source operator plus its upstreams:
821
    //   - LocalExchangeSource  -> _num_instances (the LE re-parallelizes)
822
    //   - serial source        -> its reduced count (kept as-is, typically 1)
823
    //   - otherwise (splitter) -> inherit from upstreams: raise to _num_instances if any
824
    //                             upstream was raised by an LE, then lower to a serial
825
    //                             upstream's count (lower wins).
826
    // Visiting each pipeline only after all its upstreams (topological order over _dag) lets
827
    // a single sweep reach the same fixpoint the previous two while-loops iterated to — those
828
    // only existed to reconcile the top-down build's parent-inherited num_tasks guesses.
829
0
    std::map<PipelineId, PipelinePtr> id_to_pipe;
830
0
    std::map<PipelineId, std::vector<PipelineId>> downstreams_of;
831
0
    std::map<PipelineId, int> in_degree;
832
0
    for (auto& p : _pipelines) {
833
0
        id_to_pipe[p->id()] = p;
834
0
        in_degree.try_emplace(p->id(), 0);
835
0
    }
836
0
    for (const auto& [downstream_id, upstream_ids] : _dag) {
837
0
        for (auto upstream_id : upstream_ids) {
838
0
            downstreams_of[upstream_id].push_back(downstream_id);
839
0
            in_degree[downstream_id]++;
840
0
        }
841
0
    }
842
0
    std::vector<PipelineId> ready;
843
0
    for (const auto& [id, deg] : in_degree) {
844
0
        if (deg == 0) {
845
0
            ready.push_back(id);
846
0
        }
847
0
    }
848
0
    size_t visited = 0;
849
0
    while (!ready.empty()) {
850
0
        const auto id = ready.back();
851
0
        ready.pop_back();
852
0
        visited++;
853
0
        auto pit = id_to_pipe.find(id);
854
0
        if (pit != id_to_pipe.end()) {
855
0
            auto& pipe = pit->second;
856
0
            const auto& ops = pipe->operators();
857
0
            const bool le_source =
858
0
                    !ops.empty() && dynamic_cast<LocalExchangeSourceOperatorX*>(ops.front().get());
859
0
            const bool serial_source = !ops.empty() && ops.front()->is_serial_operator();
860
0
            if (le_source) {
861
0
                pipe->set_num_tasks(_num_instances);
862
0
            } else if (!serial_source) {
863
0
                int target = pipe->num_tasks();
864
0
                const auto up_it = _dag.find(id);
865
0
                if (up_it != _dag.end()) {
866
                    // raise: any upstream already at _num_instances (e.g. an LE source)
867
0
                    for (auto upstream_id : up_it->second) {
868
0
                        auto uit = id_to_pipe.find(upstream_id);
869
0
                        if (uit != id_to_pipe.end() && uit->second->num_tasks() >= _num_instances) {
870
0
                            target = _num_instances;
871
0
                            break;
872
0
                        }
873
0
                    }
874
                    // lower: a serial upstream with fewer tasks (wins over the raise above)
875
0
                    for (auto upstream_id : up_it->second) {
876
0
                        auto uit = id_to_pipe.find(upstream_id);
877
0
                        if (uit != id_to_pipe.end() && uit->second->num_tasks() < target &&
878
0
                            !uit->second->operators().empty() &&
879
0
                            uit->second->operators().front()->is_serial_operator()) {
880
0
                            target = uit->second->num_tasks();
881
0
                        }
882
0
                    }
883
0
                }
884
0
                pipe->set_num_tasks(target);
885
0
            }
886
0
        }
887
0
        for (auto down : downstreams_of[id]) {
888
0
            if (--in_degree[down] == 0) {
889
0
                ready.push_back(down);
890
0
            }
891
0
        }
892
0
    }
893
    // The pipeline DAG is acyclic; if a future change introduces a back-edge, some pipelines
894
    // stay unvisited (in_degree never reaches 0) — fail loudly rather than silently leaving
895
    // their num_tasks unreconciled.
896
0
    DCHECK_EQ(visited, in_degree.size())
897
0
            << "pipeline num_tasks topological sweep visited " << visited << " of "
898
0
            << in_degree.size() << " pipelines (cycle in _dag?)";
899
0
}
900
901
// Recursively builds the operator for `tnodes[*node_idx]` and then its children.
//
// `tnodes` must be the pre-order serialization of the plan. On success `*node_idx` is left on
// the last node consumed by this subtree. The root operator (parent == nullptr) is stored
// into `*root`; otherwise the new operator (or `cache_op`, when _create_operator produced
// one) is attached to `parent`. The two flags are derived per node and passed to the children:
//   - followed_by_shuffled_operator: a shuffled operator above requires hash distribution.
//   - require_bucket_distribution: a colocated operator above requires bucket distribution.
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // Propagate the error case: the caller asked for a node that does not exist.
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];
    const int num_children = tnode.num_children;
    // Handed to _create_operator unchanged, then recomputed below for this node's children.
    bool current_require_bucket_distribution = require_bucket_distribution;

    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    const auto parent_node_id = parent == nullptr ? -1 : parent->node_id();
    RETURN_IF_ERROR(_create_operator(pool, tnode, descs, op, cur_pipe, parent_node_id, child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must be done here. For example, group by expressions in agg are used to
    // decide whether a local shuffle should be planned, so they must be initialized first.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
    if (parent == nullptr) {
        *root = op;
    } else {
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    }
    /**
     * `TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // While cur_pipe has no operators yet, its sink (not `op`) supplies the requirements.
    const bool decide_by_sink = cur_pipe->operators().empty();
    auto required_data_distribution =
            decide_by_sink ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                           : op->required_data_distribution(_runtime_state.get());
    const bool hash_required =
            Pipeline::is_hash_exchange(required_data_distribution.distribution_type);
    const bool noop_required =
            required_data_distribution.distribution_type == TLocalPartitionType::NOOP;

    const bool shuffled_above =
            followed_by_shuffled_operator ||
            (decide_by_sink ? cur_pipe->sink()->is_shuffled_operator()
                            : op->is_shuffled_operator());
    const bool current_followed_by_shuffled_operator =
            (shuffled_above && hash_required) || (followed_by_shuffled_operator && noop_required);

    const bool colocated_above =
            require_bucket_distribution || (decide_by_sink ? cur_pipe->sink()->is_colocated_operator()
                                                           : op->is_colocated_operator());
    current_require_bucket_distribution =
            (colocated_above && hash_required) || (require_bucket_distribution && noop_required);

    if (num_children == 0) {
        _use_serial_source = op->is_serial_operator();
    }
    // Relies on tnodes being the pre-order of the plan.
    for (int child = 0; child < num_children; ++child) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe,
                                            child, current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));

        // We are expecting a child but have used all nodes: the tree is malformed.
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }

    return Status::OK();
}
986
987
void PipelineFragmentContext::_inherit_pipeline_properties(
988
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
989
0
        PipelinePtr pipe_with_sink) {
990
0
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
991
0
    pipe_with_source->set_num_tasks(_num_instances);
992
0
    pipe_with_source->set_data_distribution(data_distribution);
993
0
}
994
995
// Splits `cur_pipe` at operator index `idx` by inserting a local exchange (BE-planned path).
//
// - Operators [0, idx) are moved into `new_pip`, which gets a LocalExchangeSink as its sink.
// - `cur_pipe` keeps operators [idx, end) and gets a LocalExchangeSource at its front.
// - The halves communicate through a LocalExchangeSharedState registered in
//   `_op_id_to_shared_state` under the new local exchange operator id.
// - `_dag` and both pipelines' children are rewired.
// - Finally `new_pip` inherits cur_pipe's previous task count, while cur_pipe is widened to
//   `_num_instances`.
// `idx == operators().size()` splits in front of cur_pipe's sink. `do_local_exchange` is
// not read here. Returns InternalError for unsupported distribution types.
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    auto local_exchange_id = next_operator_id();
    // 1. Create a new pipeline with local exchange sink.
    DataSinkOperatorPtr sink;
    auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool followed_by_shuffled_operator =
            static_cast<size_t>(idx) < operators.size()
                    ? operators[idx]->followed_by_shuffled_operator()
                    : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
    if (bucket_seq_to_instance_idx.empty() &&
        data_distribution.distribution_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
        data_distribution.distribution_type =
                use_global_hash_shuffle ? TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE
                                        : TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
    }
    if (!use_global_hash_shuffle &&
        data_distribution.distribution_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
        data_distribution.distribution_type = TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
                                          num_buckets, shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    // Value of the optional `local_exchange_free_blocks_limit` query option (0 when unset).
    // It is a lambda so the option is only read inside the cases that use it.
    const auto free_blocks_limit = [this]() -> int {
        const auto& options = _runtime_state->query_options();
        return options.__isset.local_exchange_free_blocks_limit
                       ? cast_set<int>(options.local_exchange_free_blocks_limit)
                       : 0;
    };
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);
    switch (data_distribution.distribution_type) {
    case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
    case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                use_global_hash_shuffle ? _total_instances : _num_instances, free_blocks_limit(),
                data_distribution.distribution_type);
        break;
    case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_blocks_limit());
        break;
    case TLocalPartitionType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        break;
    case TLocalPartitionType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        break;
    case TLocalPartitionType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        }
        break;
    case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                free_blocks_limit());
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Initialize new pipeline's operator list.
    std::copy(operators.begin(), operators.begin() + idx,
              std::inserter(new_pip->operators(), new_pip->operators().end()));

    // 3.2 Erase unused operators in previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op;
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
    if (!operators.empty()) {
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately. A child pipeline whose sink feeds one of
    // the operators that moved into new_pip becomes new_pip's child; the rest stay with
    // cur_pipe, and new_pip itself becomes a child of cur_pipe.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (const auto& child : cur_pipe->children()) {
        bool found = false;
        for (const auto& op : new_pip->operators()) {
            if (child->sink()->node_id() == op->node_id()) {
                new_pip->set_children(child);
                found = true;
            }
        }
        if (!found) {
            new_children.push_back(child);
            edges_with_source.push_back(child->id());
        }
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (const auto& child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
1166
1167
// Inserts a local exchange in front of operator `idx` of `cur_pipe` when needed.
//
// Nothing is done when there is at most one instance, when the parent side of `cur_pipe`
// runs at most one task, or when `cur_pipe` reports that `data_distribution` needs no
// exchange at `idx`. Otherwise `*do_local_exchange` is set and a new sink-side pipeline is
// added at `pip_idx + 1`.
//
// If the resulting sink side runs a single task while the source side runs several, and the
// exchange type does heavy work in its sink, an extra PASSTHROUGH exchange is inserted
// (pipeline at `pip_idx + 2`) to parallelize that sink:
//   op -> local sink(1) -> local source (n)
//   op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    const bool nothing_to_parallelize =
            _num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1;
    if (nothing_to_parallelize || !cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    const auto op_count_before = cur_pipe->operators().size();
    auto sink_side_pipe = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(idx, pool, cur_pipe, sink_side_pipe,
                                             data_distribution, do_local_exchange, num_buckets,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));

    // The split must conserve operators, plus the one LocalExchangeSource it adds.
    CHECK(op_count_before + 1 ==
          cur_pipe->operators().size() + sink_side_pipe->operators().size())
            << "total_op_num: " << op_count_before
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << sink_side_pipe->operators().size();

    const bool serial_heavy_sink =
            cur_pipe->num_tasks() > 1 && sink_side_pipe->num_tasks() == 1 &&
            Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type);
    if (!serial_heavy_sink) {
        return Status::OK();
    }

    // Split the sink-side pipeline right before its sink with a passthrough exchange.
    const int split_before_sink = cast_set<int>(sink_side_pipe->operators().size());
    auto passthrough_pipe = add_pipeline(sink_side_pipe, pip_idx + 2);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            split_before_sink, pool, sink_side_pipe, passthrough_pipe,
            DataDistribution(TLocalPartitionType::PASSTHROUGH), do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    return Status::OK();
}
1208
1209
// Plans local exchanges for every pipeline of the fragment.
//
// Pipelines are visited from the last to the first. Planning one pipeline may add new pipelines
// via `add_pipeline(..., pip_idx + 1 / pip_idx + 2)` (presumably inserting them right after it),
// so a backwards walk does not revisit them.
//
// @param num_buckets                 forwarded unchanged to the per-pipeline planner.
// @param bucket_seq_to_instance_idx  forwarded unchanged.
// @param shuffle_idx_to_instance_idx forwarded unchanged.
// @return OK, or the first error from planning any pipeline.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    int pip_idx = cast_set<int>(_pipelines.size());
    while (--pip_idx >= 0) {
        // Hold our own reference: the vector may grow while this pipeline is being planned.
        auto pipeline = _pipelines[pip_idx];
        pipeline->init_data_distribution(_runtime_state.get());

        // Inherit the distribution of the child pipeline whose sink feeds this pipeline's
        // source operator (i.e. not a join's build-side child); the last such child wins.
        for (const auto& child : pipeline->children()) {
            if (child->sink()->node_id() == pipeline->operators().front()->node_id()) {
                pipeline->set_data_distribution(child->data_distribution());
            }
        }

        // NOTE(review): a previous comment here claimed that `num_buckets == 0` (fragment
        // colocated by an exchange node rather than a scan node) is replaced by `_num_instances`
        // to avoid dividing by zero. This function forwards `num_buckets` unchanged — confirm
        // that the caller or the local-exchange implementation handles 0.
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, pipeline,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1233
1234
// Plans local exchanges inside a single pipeline `pip` (located at `pip_idx` in `_pipelines`).
//
// Every non-source operator is asked for its required data distribution. When one needs a local
// exchange and `_add_local_exchange` splits the pipeline, scanning resumes on the rewritten
// `pip`. Finally the pipeline's sink is checked the same way.
//
// @return OK, or the first error from `_add_local_exchange`.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // Operator 0 is the pipeline source, so scanning starts at operator 1.
    int idx = 1;
    bool do_local_exchange = false;
    for (;;) {
        do_local_exchange = false;
        auto& ops = pip->operators();
        while (idx < ops.size()) {
            auto requirement = ops[idx]->required_data_distribution(_runtime_state.get());
            if (requirement.need_local_exchange()) {
                RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                        requirement, &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                        shuffle_idx_to_instance_idx));
            }
            if (do_local_exchange) {
                break;
            }
            ++idx;
        }
        if (!do_local_exchange) {
            break;
        }
        // `pip` was split by a local exchange sink/source pair: it now starts with the local
        // exchange source (0) followed by the operator just planned (1), so resume at 2.
        idx = 2;
    }

    // Last, the pipeline's sink may itself require a local exchange in front of it; `idx` now
    // points one past the last operator.
    auto sink_requirement = pip->sink()->required_data_distribution(_runtime_state.get());
    if (sink_requirement.need_local_exchange()) {
        RETURN_IF_ERROR(_add_local_exchange(pip_idx, idx, pip->sink()->node_id(),
                                            _runtime_state->obj_pool(), pip, sink_requirement,
                                            &do_local_exchange, num_buckets,
                                            bucket_seq_to_instance_idx,
                                            shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1271
1272
// Builds the root sink operator of this fragment from `thrift_sink` and stores it in `_sink`.
//
// @param pool             object pool passed to most sink constructors; it also owns the
//                         auxiliary RowDescriptor/TDataSink objects created for multi-cast sinks.
// @param thrift_sink      sink description; for most types the matching `__isset` flag is
//                         checked and an InternalError is returned when it is missing.
// @param output_exprs     output expressions forwarded to the sink constructors.
// @param params           fragment parameters: `destinations` (stream / result-file sinks) and
//                         `parallel_instances` (multi-cast consumer sources).
// @param row_desc         row layout of the sink's input.
// @param state            runtime state of the fragment.
// @param desc_tbl         descriptor table (result-file sink with destinations).
// @param cur_pipeline_id  pipeline the sink belongs to; used to index `_pipelines` and, for
//                         multi-cast sinks, recorded in `_dag` under every consumer pipeline.
// @return OK, or InternalError for a malformed, disabled or unsupported sink.
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
                                                  const std::vector<TExpr>& output_exprs,
                                                  const TPipelineFragmentParams& params,
                                                  const RowDescriptor& row_desc,
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
                                                  PipelineId cur_pipeline_id) {
    switch (thrift_sink.type) {
    case TDataSinkType::DATA_STREAM_SINK: {
        if (!thrift_sink.__isset.stream_sink) {
            return Status::InternalError("Missing data stream sink.");
        }
        _sink = std::make_shared<ExchangeSinkOperatorX>(
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
                params.destinations, _fragment_instance_ids);
        break;
    }
    case TDataSinkType::RESULT_SINK: {
        if (!thrift_sink.__isset.result_sink) {
            return Status::InternalError("Missing data buffer sink.");
        }

        auto& pipeline = _pipelines[cur_pipeline_id];
        int child_node_id = pipeline->operators().back()->node_id();
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), child_node_id + 1,
                                                      row_desc, output_exprs,
                                                      thrift_sink.result_sink);
        break;
    }
    case TDataSinkType::DICTIONARY_SINK: {
        if (!thrift_sink.__isset.dictionary_sink) {
            return Status::InternalError("Missing dict sink.");
        }

        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
                                                    thrift_sink.dictionary_sink);
        break;
    }
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
    case TDataSinkType::OLAP_TABLE_SINK: {
        auto& pipeline = _pipelines[cur_pipeline_id];
        int child_node_id = pipeline->operators().back()->node_id();
        // The V2 sink (memtable on sink node) is used only when enabled by the query options,
        // the table has no inverted index v1 / partial update, no row binlog, and the BE is
        // not in cloud mode; otherwise fall back to the classic OLAP table sink.
        if (state->query_options().enable_memtable_on_sink_node &&
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
            !_has_row_binlog(thrift_sink.olap_table_sink) && !config::is_cloud_mode()) {
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
        } else {
            _sink = std::make_shared<OlapTableSinkOperatorX>(
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
        }
        break;
    }
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
        DCHECK(thrift_sink.__isset.olap_table_sink);
        DCHECK(state->get_query_ctx() != nullptr);
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
                                                                output_exprs);
        break;
    }
    case TDataSinkType::HIVE_TABLE_SINK: {
        if (!thrift_sink.__isset.hive_table_sink) {
            return Status::InternalError("Missing hive table sink.");
        }
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                         output_exprs);
        break;
    }
    case TDataSinkType::ICEBERG_TABLE_SINK: {
        if (!thrift_sink.__isset.iceberg_table_sink) {
            return Status::InternalError("Missing iceberg table sink.");
        }
        // A sort_info on the Iceberg sink selects the spill-capable variant.
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
                                                                     row_desc, output_exprs);
        } else {
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
                                                                row_desc, output_exprs);
        }
        break;
    }
    case TDataSinkType::ICEBERG_DELETE_SINK: {
        if (!thrift_sink.__isset.iceberg_delete_sink) {
            return Status::InternalError("Missing iceberg delete sink.");
        }
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
                                                             row_desc, output_exprs);
        break;
    }
    case TDataSinkType::ICEBERG_MERGE_SINK: {
        if (!thrift_sink.__isset.iceberg_merge_sink) {
            return Status::InternalError("Missing iceberg merge sink.");
        }
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                            output_exprs);
        break;
    }
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
        if (!thrift_sink.__isset.max_compute_table_sink) {
            return Status::InternalError("Missing max compute table sink.");
        }
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                       output_exprs);
        break;
    }
    case TDataSinkType::JDBC_TABLE_SINK: {
        if (!thrift_sink.__isset.jdbc_table_sink) {
            return Status::InternalError("Missing data jdbc sink.");
        }
        if (config::enable_java_support) {
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
                                                             output_exprs);
        } else {
            return Status::InternalError(
                    "Jdbc table sink is not enabled, you can change be config "
                    "enable_java_support to true and restart be.");
        }
        break;
    }
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
        if (!thrift_sink.__isset.memory_scratch_sink) {
            return Status::InternalError("Missing data buffer sink.");
        }

        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
                                                             output_exprs);
        break;
    }
    case TDataSinkType::RESULT_FILE_SINK: {
        if (!thrift_sink.__isset.result_file_sink) {
            return Status::InternalError("Missing result file sink.");
        }

        // With destinations set, the result file sink is not the top sink of the query.
        if (params.__isset.destinations && !params.destinations.empty()) {
            _sink = std::make_shared<ResultFileSinkOperatorX>(
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
                    params.destinations, output_exprs, desc_tbl);
        } else {
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
                                                              output_exprs);
        }
        break;
    }
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
        const auto& multi_cast_sink = thrift_sink.multi_cast_stream_sink;

        // Validate before allocating operator ids or pipelines. Consumer `i` reads both
        // `sinks[i]` and `destinations[i]`, so a shorter `destinations` list would otherwise be
        // indexed out of bounds.
        if (multi_cast_sink.sinks.empty()) {
            return Status::InternalError("size of sources must be greater than 0");
        }
        if (multi_cast_sink.destinations.size() < multi_cast_sink.sinks.size()) {
            return Status::InternalError(
                    "Multi cast stream sink has {} sinks but only {} destination lists",
                    multi_cast_sink.sinks.size(), multi_cast_sink.destinations.size());
        }

        auto sink_id = next_sink_operator_id();
        const int multi_cast_node_id = sink_id;
        const int sender_size = cast_set<int>(multi_cast_sink.sinks.size());

        // One sink has multiple sources: reserve one source operator id per consumer.
        std::vector<int> sources;
        sources.reserve(multi_cast_sink.sinks.size());
        for (int i = 0; i < sender_size; ++i) {
            sources.push_back(next_operator_id());
        }

        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
                sink_id, multi_cast_node_id, sources, pool, multi_cast_sink);
        for (int i = 0; i < sender_size; ++i) {
            const auto& consumer_sink = multi_cast_sink.sinks[i];
            auto new_pipeline = add_pipeline();

            // Row layout for this consumer's exchange sink: the consumer's own output tuple when
            // it projects (`output_exprs` non-empty), otherwise the multi-cast input layout.
            // Built directly in the pool to avoid an intermediate RowDescriptor copy.
            RowDescriptor* exchange_row_desc = nullptr;
            if (!consumer_sink.output_exprs.empty()) {
                exchange_row_desc = pool->add(
                        new RowDescriptor(state->desc_tbl(), {consumer_sink.output_tuple_id}));
            } else {
                exchange_row_desc = pool->add(new RowDescriptor(row_desc));
            }

            auto source_id = sources[i];
            // 1. Create and set the multi-cast source operator of the new pipeline.
            OperatorPtr source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
                    /*node_id*/ source_id, /*consumer_id*/ i, pool, consumer_sink, row_desc,
                    /*operator_id=*/source_id);
            RETURN_IF_ERROR(new_pipeline->add_operator(
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));

            // 2. Create and set the exchange sink (data stream sender) of the new pipeline.
            DataSinkOperatorPtr sink_op = std::make_shared<ExchangeSinkOperatorX>(
                    state, *exchange_row_desc, next_sink_operator_id(), consumer_sink,
                    multi_cast_sink.destinations[i], _fragment_instance_ids);
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
            {
                TDataSink* t = pool->add(new TDataSink());
                t->stream_sink = consumer_sink;
                RETURN_IF_ERROR(sink_op->init(*t));
            }

            // 3. Record the multi-cast pipeline under the consumer pipeline in the DAG.
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
        }
        break;
    }
    case TDataSinkType::BLACKHOLE_SINK: {
        if (!thrift_sink.__isset.blackhole_sink) {
            return Status::InternalError("Missing blackhole sink.");
        }

        _sink = std::make_shared<BlackholeSinkOperatorX>(next_sink_operator_id());
        break;
    }
    case TDataSinkType::TVF_TABLE_SINK: {
        if (!thrift_sink.__isset.tvf_table_sink) {
            return Status::InternalError("Missing TVF table sink.");
        }
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                        output_exprs);
        break;
    }
    default:
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
    }
    return Status::OK();
}
1497
1498
// NOLINTBEGIN(readability-function-size)
1499
// NOLINTBEGIN(readability-function-cognitive-complexity)
1500
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1501
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1502
                                                 PipelinePtr& cur_pipe, int parent_idx,
1503
                                                 int child_idx,
1504
                                                 const bool followed_by_shuffled_operator,
1505
                                                 const bool require_bucket_distribution,
1506
0
                                                 OperatorPtr& cache_op) {
1507
0
    std::vector<DataSinkOperatorPtr> sink_ops;
1508
0
    Defer defer = Defer([&]() {
1509
0
        if (op) {
1510
0
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1511
0
        }
1512
0
        for (auto& s : sink_ops) {
1513
0
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1514
0
        }
1515
0
    });
1516
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1517
    // Therefore, here we need to use a stack-like structure.
1518
0
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1519
0
    std::stringstream error_msg;
1520
0
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1521
1522
0
    bool fe_with_old_version = false;
1523
0
    switch (tnode.node_type) {
1524
0
    case TPlanNodeType::OLAP_SCAN_NODE: {
1525
0
        op = std::make_shared<OlapScanOperatorX>(
1526
0
                pool, tnode, next_operator_id(), descs, _num_instances,
1527
0
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1528
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1529
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1530
0
        break;
1531
0
    }
1532
0
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1533
0
        DCHECK(_query_ctx != nullptr);
1534
0
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1535
0
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1536
0
                                                    _num_instances);
1537
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1538
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1539
0
        break;
1540
0
    }
1541
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1542
0
        if (config::enable_java_support) {
1543
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1544
0
                                                     _num_instances);
1545
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1546
0
        } else {
1547
0
            return Status::InternalError(
1548
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1549
0
                    "to true and restart be.");
1550
0
        }
1551
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1552
0
        break;
1553
0
    }
1554
0
    case TPlanNodeType::FILE_SCAN_NODE: {
1555
0
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1556
0
                                                 _num_instances);
1557
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1558
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1559
0
        break;
1560
0
    }
1561
0
    case TPlanNodeType::EXCHANGE_NODE: {
1562
0
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1563
0
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1564
0
                                  : 0;
1565
0
        DCHECK_GT(num_senders, 0);
1566
0
        auto exchange_op = std::make_shared<ExchangeSourceOperatorX>(
1567
0
                pool, tnode, next_operator_id(), descs, num_senders);
1568
0
        if (!_params.bucket_seq_to_instance_idx.empty()) {
1569
            // Lets bucket-routed exchanges detect orphan instances (owning no bucket) that
1570
            // no sender channel will ever address — their receivers must start at EOS.
1571
0
            exchange_op->set_bucket_dest_instances(_params.bucket_seq_to_instance_idx);
1572
0
        }
1573
0
        op = exchange_op;
1574
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1575
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1576
0
        break;
1577
0
    }
1578
0
    case TPlanNodeType::AGGREGATION_NODE: {
1579
0
        if (tnode.agg_node.grouping_exprs.empty() &&
1580
0
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1581
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1582
0
                                         ": group by and output is empty");
1583
0
        }
1584
0
        bool need_create_cache_op =
1585
0
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1586
0
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1587
0
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1588
0
            auto cache_source_id = next_operator_id();
1589
0
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1590
0
                                                        _params.fragment.query_cache_param);
1591
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1592
1593
0
            const auto downstream_pipeline_id = cur_pipe->id();
1594
0
            if (!_dag.contains(downstream_pipeline_id)) {
1595
0
                _dag.insert({downstream_pipeline_id, {}});
1596
0
            }
1597
0
            new_pipe = add_pipeline(cur_pipe);
1598
0
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1599
1600
0
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1601
0
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1602
0
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1603
0
            return Status::OK();
1604
0
        };
1605
0
        const bool group_by_limit_opt =
1606
0
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1607
1608
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1609
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1610
0
        const bool enable_spill = _runtime_state->enable_spill() &&
1611
0
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1612
0
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1613
0
                                      tnode.agg_node.use_streaming_preaggregation &&
1614
0
                                      !tnode.agg_node.grouping_exprs.empty();
1615
        // TODO: distinct streaming agg does not support spill.
1616
0
        const bool can_use_distinct_streaming_agg =
1617
0
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1618
0
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1619
0
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1620
0
                _params.query_options.enable_distinct_streaming_aggregation;
1621
1622
0
        if (can_use_distinct_streaming_agg) {
1623
0
            if (need_create_cache_op) {
1624
0
                PipelinePtr new_pipe;
1625
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1626
1627
0
                cache_op = op;
1628
0
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1629
0
                                                                     tnode, descs);
1630
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1631
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1632
0
                cur_pipe = new_pipe;
1633
0
            } else {
1634
0
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1635
0
                                                                     tnode, descs);
1636
0
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1637
0
            }
1638
0
        } else if (is_streaming_agg) {
1639
0
            if (need_create_cache_op) {
1640
0
                PipelinePtr new_pipe;
1641
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1642
0
                cache_op = op;
1643
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1644
0
                                                             descs);
1645
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1646
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1647
0
                cur_pipe = new_pipe;
1648
0
            } else {
1649
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1650
0
                                                             descs);
1651
0
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1652
0
            }
1653
0
        } else {
1654
            // create new pipeline to add query cache operator
1655
0
            PipelinePtr new_pipe;
1656
0
            if (need_create_cache_op) {
1657
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1658
0
                cache_op = op;
1659
0
            }
1660
1661
0
            if (enable_spill) {
1662
0
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1663
0
                                                                     next_operator_id(), descs);
1664
0
            } else {
1665
0
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1666
0
            }
1667
0
            if (need_create_cache_op) {
1668
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1669
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1670
0
                cur_pipe = new_pipe;
1671
0
            } else {
1672
0
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1673
0
            }
1674
1675
0
            const auto downstream_pipeline_id = cur_pipe->id();
1676
0
            if (!_dag.contains(downstream_pipeline_id)) {
1677
0
                _dag.insert({downstream_pipeline_id, {}});
1678
0
            }
1679
0
            cur_pipe = add_pipeline(cur_pipe);
1680
0
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1681
1682
0
            if (enable_spill) {
1683
0
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1684
0
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1685
0
            } else {
1686
0
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1687
0
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1688
0
            }
1689
0
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1690
0
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1691
0
        }
1692
0
        break;
1693
0
    }
1694
0
    case TPlanNodeType::BUCKETED_AGGREGATION_NODE: {
1695
0
        if (tnode.bucketed_agg_node.grouping_exprs.empty()) {
1696
0
            return Status::InternalError(
1697
0
                    "Bucketed aggregation node {} should not be used without group by keys",
1698
0
                    tnode.node_id);
1699
0
        }
1700
1701
        // Create source operator (goes on the current / downstream pipeline).
1702
0
        op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1703
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1704
1705
        // Create a new pipeline for the sink side.
1706
0
        const auto downstream_pipeline_id = cur_pipe->id();
1707
0
        if (!_dag.contains(downstream_pipeline_id)) {
1708
0
            _dag.insert({downstream_pipeline_id, {}});
1709
0
        }
1710
0
        cur_pipe = add_pipeline(cur_pipe);
1711
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1712
1713
        // Create sink operator.
1714
0
        sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>(
1715
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1716
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1717
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1718
1719
        // Pre-register a single shared state for ALL instances so that every
1720
        // sink instance writes its per-instance hash table into the same
1721
        // BucketedAggSharedState and every source instance can merge across
1722
        // all of them.
1723
0
        {
1724
0
            auto shared_state = BucketedAggSharedState::create_shared();
1725
0
            shared_state->id = op->operator_id();
1726
0
            shared_state->related_op_ids.insert(op->operator_id());
1727
1728
0
            for (int i = 0; i < _num_instances; i++) {
1729
0
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1730
0
                                                             "BUCKETED_AGG_SINK_DEPENDENCY");
1731
0
                sink_dep->set_shared_state(shared_state.get());
1732
0
                shared_state->sink_deps.push_back(sink_dep);
1733
0
            }
1734
0
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1735
0
                                                     op->node_id(), "BUCKETED_AGG_SOURCE");
1736
0
            _op_id_to_shared_state.insert(
1737
0
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1738
0
        }
1739
0
        break;
1740
0
    }
1741
0
    case TPlanNodeType::HASH_JOIN_NODE: {
1742
0
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1743
0
                                       tnode.hash_join_node.is_broadcast_join;
1744
0
        const auto enable_spill = _runtime_state->enable_spill();
1745
0
        if (enable_spill && !is_broadcast_join) {
1746
0
            auto tnode_ = tnode;
1747
0
            tnode_.runtime_filters.clear();
1748
0
            auto inner_probe_operator =
1749
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1750
1751
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1752
            // So here use `tnode_` which has no runtime filters.
1753
0
            auto probe_side_inner_sink_operator =
1754
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1755
1756
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1757
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1758
1759
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1760
0
                    pool, tnode_, next_operator_id(), descs);
1761
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1762
0
                                                inner_probe_operator);
1763
0
            op = std::move(probe_operator);
1764
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1765
1766
0
            const auto downstream_pipeline_id = cur_pipe->id();
1767
0
            if (!_dag.contains(downstream_pipeline_id)) {
1768
0
                _dag.insert({downstream_pipeline_id, {}});
1769
0
            }
1770
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1771
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1772
1773
0
            auto inner_sink_operator =
1774
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1775
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1776
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1777
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1778
1779
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1780
0
            sink_ops.push_back(std::move(sink_operator));
1781
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1782
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1783
1784
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1785
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1786
0
        } else {
1787
0
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1788
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1789
1790
0
            const auto downstream_pipeline_id = cur_pipe->id();
1791
0
            if (!_dag.contains(downstream_pipeline_id)) {
1792
0
                _dag.insert({downstream_pipeline_id, {}});
1793
0
            }
1794
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1795
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1796
1797
0
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1798
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1799
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1800
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1801
1802
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1803
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1804
0
        }
1805
0
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1806
0
            std::shared_ptr<HashJoinSharedState> shared_state =
1807
0
                    HashJoinSharedState::create_shared(_num_instances);
1808
0
            for (int i = 0; i < _num_instances; i++) {
1809
0
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1810
0
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1811
0
                sink_dep->set_shared_state(shared_state.get());
1812
0
                shared_state->sink_deps.push_back(sink_dep);
1813
0
            }
1814
0
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1815
0
                                                     op->node_id(), "HASH_JOIN_PROBE");
1816
0
            _op_id_to_shared_state.insert(
1817
0
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1818
0
        }
1819
0
        break;
1820
0
    }
1821
0
    case TPlanNodeType::CROSS_JOIN_NODE: {
1822
0
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1823
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1824
1825
0
        const auto downstream_pipeline_id = cur_pipe->id();
1826
0
        if (!_dag.contains(downstream_pipeline_id)) {
1827
0
            _dag.insert({downstream_pipeline_id, {}});
1828
0
        }
1829
0
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1830
0
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1831
1832
0
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1833
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1834
0
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1835
0
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1836
0
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1837
0
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1838
0
        break;
1839
0
    }
1840
0
    case TPlanNodeType::UNION_NODE: {
1841
0
        int child_count = tnode.num_children;
1842
0
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1843
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1844
1845
0
        const auto downstream_pipeline_id = cur_pipe->id();
1846
0
        if (!_dag.contains(downstream_pipeline_id)) {
1847
0
            _dag.insert({downstream_pipeline_id, {}});
1848
0
        }
1849
0
        for (int i = 0; i < child_count; i++) {
1850
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1851
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1852
0
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1853
0
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1854
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1855
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1856
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1857
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1858
0
        }
1859
0
        break;
1860
0
    }
1861
0
    case TPlanNodeType::SORT_NODE: {
1862
0
        const auto should_spill = _runtime_state->enable_spill() &&
1863
0
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1864
0
        const bool use_local_merge =
1865
0
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1866
0
        if (should_spill) {
1867
0
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1868
0
        } else if (use_local_merge) {
1869
0
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1870
0
                                                                 descs);
1871
0
        } else {
1872
0
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1873
0
        }
1874
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1875
1876
0
        const auto downstream_pipeline_id = cur_pipe->id();
1877
0
        if (!_dag.contains(downstream_pipeline_id)) {
1878
0
            _dag.insert({downstream_pipeline_id, {}});
1879
0
        }
1880
0
        cur_pipe = add_pipeline(cur_pipe);
1881
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1882
1883
0
        if (should_spill) {
1884
0
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1885
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1886
0
        } else {
1887
0
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1888
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1889
0
        }
1890
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1891
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1892
0
        break;
1893
0
    }
1894
0
    case TPlanNodeType::PARTITION_SORT_NODE: {
1895
0
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1896
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1897
1898
0
        const auto downstream_pipeline_id = cur_pipe->id();
1899
0
        if (!_dag.contains(downstream_pipeline_id)) {
1900
0
            _dag.insert({downstream_pipeline_id, {}});
1901
0
        }
1902
0
        cur_pipe = add_pipeline(cur_pipe);
1903
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1904
1905
0
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1906
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1907
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1908
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1909
0
        break;
1910
0
    }
1911
0
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1912
0
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1913
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1914
1915
0
        const auto downstream_pipeline_id = cur_pipe->id();
1916
0
        if (!_dag.contains(downstream_pipeline_id)) {
1917
0
            _dag.insert({downstream_pipeline_id, {}});
1918
0
        }
1919
0
        cur_pipe = add_pipeline(cur_pipe);
1920
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1921
1922
0
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1923
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1924
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1925
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1926
0
        break;
1927
0
    }
1928
0
    case TPlanNodeType::MATERIALIZATION_NODE: {
1929
0
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1930
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1931
0
        break;
1932
0
    }
1933
0
    case TPlanNodeType::INTERSECT_NODE: {
1934
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1935
0
                                                                      cur_pipe, sink_ops));
1936
0
        break;
1937
0
    }
1938
0
    case TPlanNodeType::EXCEPT_NODE: {
1939
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1940
0
                                                                       cur_pipe, sink_ops));
1941
0
        break;
1942
0
    }
1943
0
    case TPlanNodeType::REPEAT_NODE: {
1944
0
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1945
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1946
0
        break;
1947
0
    }
1948
0
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1949
0
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1950
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1951
0
        break;
1952
0
    }
1953
0
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1954
0
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1955
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1956
0
        break;
1957
0
    }
1958
0
    case TPlanNodeType::EMPTY_SET_NODE: {
1959
0
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1960
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1961
0
        break;
1962
0
    }
1963
0
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1964
0
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1965
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1966
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1967
0
        break;
1968
0
    }
1969
0
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1970
0
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1971
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1972
0
        break;
1973
0
    }
1974
0
    case TPlanNodeType::META_SCAN_NODE: {
1975
0
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1976
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1977
0
        break;
1978
0
    }
1979
0
    case TPlanNodeType::SELECT_NODE: {
1980
0
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1981
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1982
0
        break;
1983
0
    }
1984
0
    case TPlanNodeType::REC_CTE_NODE: {
1985
0
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1986
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1987
1988
0
        const auto downstream_pipeline_id = cur_pipe->id();
1989
0
        if (!_dag.contains(downstream_pipeline_id)) {
1990
0
            _dag.insert({downstream_pipeline_id, {}});
1991
0
        }
1992
1993
0
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1994
0
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1995
1996
0
        DataSinkOperatorPtr anchor_sink;
1997
0
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1998
0
                                                                  op->operator_id(), tnode, descs);
1999
0
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
2000
0
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
2001
0
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
2002
2003
0
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
2004
0
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
2005
2006
0
        DataSinkOperatorPtr rec_sink;
2007
0
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
2008
0
                                                         tnode, descs);
2009
0
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
2010
0
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
2011
0
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
2012
2013
0
        break;
2014
0
    }
2015
0
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
2016
0
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
2017
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2018
0
        break;
2019
0
    }
2020
0
    case TPlanNodeType::LOCAL_EXCHANGE_NODE: {
2021
0
        op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs);
2022
        // The downstream pipeline (containing LocalExchangeSource) must have
2023
        // _num_instances tasks — matching BE-native _inherit_pipeline_properties
2024
        // which sets pipe_with_source.set_num_tasks(_num_instances).
2025
        // Without this, when the parent pipeline was reduced by a serial operator
2026
        // (e.g., serial Exchange with use_serial_exchange=true, or UNPARTITIONED
2027
        // Exchange), the downstream inherits the reduced num_tasks via
2028
        // add_pipeline(parent).  The deferred exchanger creates _num_instances
2029
        // channels but only fewer source tasks initialize mem_counters — the
2030
        // sink round-robins to all channels and crashes on uninitialized ones.
2031
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2032
        // Restore downstream pipeline's num_tasks (mirroring _inherit_pipeline_properties:
2033
        // downstream keeps _num_instances, upstream gets the serial/reduced count)
2034
0
        cur_pipe->set_num_tasks(_num_instances);
2035
2036
0
        const auto downstream_pipeline_id = cur_pipe->id();
2037
0
        if (!_dag.contains(downstream_pipeline_id)) {
2038
0
            _dag.insert({downstream_pipeline_id, {}});
2039
0
        }
2040
0
        cur_pipe = add_pipeline(cur_pipe);
2041
        // If this local exchange was inserted because of a serial scan (is_serial_operator),
2042
        // the upstream pipeline (cur_pipe) should have num_tasks=1 (only 1 scan task).
2043
        // We set this now so the exchanger is created with the correct sender count.
2044
        // Child operators added later (serial scan) will also set num_tasks=1, which is
2045
        // consistent with this.
2046
0
        if (op->is_serial_operator() && _parallel_instances > 0) {
2047
0
            cur_pipe->set_num_tasks(_parallel_instances);
2048
0
        }
2049
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
2050
0
        int num_partitions = 0;
2051
0
        std::map<int, int> shuffle_id_to_instance_idx;
2052
0
        auto partition_type = tnode.local_exchange_node.partition_type;
2053
0
        switch (partition_type) {
2054
0
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
2055
0
            num_partitions = _params.num_buckets;
2056
0
            shuffle_id_to_instance_idx = _params.bucket_seq_to_instance_idx;
2057
0
            break;
2058
0
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
2059
0
            for (int i = 0; i < _num_instances; i++) {
2060
0
                shuffle_id_to_instance_idx[i] = i;
2061
0
            }
2062
0
            num_partitions = _num_instances;
2063
0
            break;
2064
0
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
2065
0
            num_partitions = _total_instances;
2066
0
            shuffle_id_to_instance_idx = _params.shuffle_idx_to_instance_idx;
2067
0
            break;
2068
0
        default:
2069
0
            break;
2070
0
        }
2071
0
        auto local_exchange_id = op->operator_id();
2072
0
        auto sink_id = next_sink_operator_id();
2073
0
        DataSinkOperatorPtr sink = std::make_shared<LocalExchangeSinkOperatorX>(
2074
0
                sink_id, local_exchange_id, tnode, num_partitions, shuffle_id_to_instance_idx);
2075
0
        sink_ops.push_back(sink);
2076
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
2077
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
2078
2079
        // For FE-planned local exchange, we need to:
2080
        // 1. Initialize the partitioner for hash shuffle types
2081
        // 2. Defer exchanger creation until after the full plan tree is built
2082
        //    (child operators like serial ExchangeNode may change cur_pipe->num_tasks())
2083
        // 3. Register shared state so pipeline tasks can find it
2084
0
        RETURN_IF_ERROR(static_cast<LocalExchangeSinkOperatorX*>(cur_pipe->sink())
2085
0
                                ->init_partitioner(_runtime_state.get()));
2086
2087
0
        int free_blocks_limit =
2088
0
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
2089
0
                        ? cast_set<int>(
2090
0
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
2091
0
                        : 0;
2092
0
        auto shared_state = LocalExchangeSharedState::create_shared(_num_instances);
2093
0
        shared_state->create_source_dependencies(_num_instances, local_exchange_id,
2094
0
                                                 local_exchange_id, "LOCAL_EXCHANGE_OPERATOR");
2095
0
        shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
2096
0
        _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
2097
        // Defer exchanger creation: sender count depends on final upstream num_tasks
2098
0
        _deferred_exchangers.push_back({shared_state, cur_pipe, partition_type, num_partitions,
2099
0
                                        free_blocks_limit, local_exchange_id, sink_id});
2100
0
        break;
2101
0
    }
2102
0
    default:
2103
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
2104
0
                                     print_plan_node_type(tnode.node_type));
2105
0
    }
2106
0
    if (_params.__isset.parallel_instances && fe_with_old_version) {
2107
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
2108
0
        op->set_serial_operator();
2109
0
    }
2110
2111
0
    return Status::OK();
2112
0
}
2113
// NOLINTEND(readability-function-cognitive-complexity)
2114
// NOLINTEND(readability-function-size)
2115
2116
template <bool is_intersect>
2117
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
2118
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
2119
0
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2120
0
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2121
0
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2122
2123
0
    const auto downstream_pipeline_id = cur_pipe->id();
2124
0
    if (!_dag.contains(downstream_pipeline_id)) {
2125
0
        _dag.insert({downstream_pipeline_id, {}});
2126
0
    }
2127
2128
0
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2129
0
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2130
0
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2131
2132
0
        if (child_id == 0) {
2133
0
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2134
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2135
0
        } else {
2136
0
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2137
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2138
0
        }
2139
0
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2140
0
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2141
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2142
0
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2143
0
    }
2144
2145
0
    return Status::OK();
2146
0
}
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
2147
2148
0
// Submits every pipeline task of this fragment to the query's pipeline scheduler.
//
// May only be called once; a second call returns InternalError("submitted").
//
// If a submission fails:
//   - the fragment is cancelled and no further tasks are submitted;
//   - `_total_tasks` is set (under `_task_mutex`) to the number of tasks that were
//     actually submitted;
//   - if all of those have already closed, the fragment instance is closed here;
//   - an InternalError wrapping the scheduler's status and this BE's host is returned.
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        // The `break` above only leaves the inner loop, so stop the outer loop too.
        // Without this, tasks of the remaining instances would still be submitted after
        // the fragment was cancelled. `st` could then be overwritten with OK, hiding the
        // failure, and `_total_tasks` would undercount the tasks that actually run.
        if (!st.ok()) {
            break;
        }
    }
    if (!st.ok()) {
        bool need_remove = false;
        {
            std::lock_guard<std::mutex> l(_task_mutex);
            if (_closed_tasks >= _total_tasks) {
                need_remove = _close_fragment_instance();
            }
        }
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
        if (need_remove) {
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
2189
2190
0
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
2191
0
    if (_runtime_state->enable_profile()) {
2192
0
        std::stringstream ss;
2193
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2194
0
            runtime_profile_ptr->pretty_print(&ss);
2195
0
        }
2196
2197
0
        if (_runtime_state->load_channel_profile()) {
2198
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2199
0
        }
2200
2201
0
        auto profile_str =
2202
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
2203
0
                            this->_fragment_id, extra_info, ss.str());
2204
0
        LOG_LONG_STRING(INFO, profile_str);
2205
0
    }
2206
0
}
2207
// If all pipeline tasks binded to the fragment instance are finished, then we could
2208
// close the fragment instance.
2209
// Returns true if the caller should call remove_pipeline_context() **after** releasing
2210
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
2211
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
2212
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
2213
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
2214
// (via debug_string()).
2215
0
bool PipelineFragmentContext::_close_fragment_instance() {
2216
0
    if (_is_fragment_instance_closed) {
2217
0
        return false;
2218
0
    }
2219
0
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
2220
0
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
2221
0
    if (!_need_notify_close) {
2222
0
        auto st = send_report(true);
2223
0
        if (!st) {
2224
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
2225
0
                                        print_id(_query_id), _fragment_id, st.to_string());
2226
0
        }
2227
0
    }
2228
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
2229
    // Since stream load does not have someting like coordinator on FE, so
2230
    // backend can not report profile to FE, ant its profile can not be shown
2231
    // in the same way with other query. So we print the profile content to info log.
2232
2233
0
    if (_runtime_state->enable_profile() &&
2234
0
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
2235
0
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
2236
0
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
2237
0
        std::stringstream ss;
2238
        // Compute the _local_time_percent before pretty_print the runtime_profile
2239
        // Before add this operation, the print out like that:
2240
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
2241
        // After add the operation, the print out like that:
2242
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
2243
        // We can easily know the exec node execute time without child time consumed.
2244
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2245
0
            runtime_profile_ptr->pretty_print(&ss);
2246
0
        }
2247
2248
0
        if (_runtime_state->load_channel_profile()) {
2249
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2250
0
        }
2251
2252
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
2253
0
    }
2254
2255
0
    if (_query_ctx->enable_profile()) {
2256
0
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
2257
0
                                         collect_realtime_load_channel_profile());
2258
0
    }
2259
2260
    // Return whether the caller needs to remove from the pipeline map.
2261
    // The caller must do this after releasing _task_mutex.
2262
0
    return !_need_notify_close;
2263
0
}
2264
2265
0
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
2266
    // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here
2267
0
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
2268
0
    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
2269
0
        if (_dag.contains(pipeline_id)) {
2270
0
            for (auto dep : _dag[pipeline_id]) {
2271
0
                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
2272
0
            }
2273
0
        }
2274
0
    }
2275
0
    bool need_remove = false;
2276
0
    {
2277
0
        std::lock_guard<std::mutex> l(_task_mutex);
2278
0
        ++_closed_tasks;
2279
        // Update query-level finished task progress in real time.
2280
0
        _query_ctx->inc_finished_task_num();
2281
0
        if (_closed_tasks >= _total_tasks) {
2282
0
            need_remove = _close_fragment_instance();
2283
0
        }
2284
0
    }
2285
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
2286
0
    if (need_remove) {
2287
0
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
2288
0
    }
2289
0
}
2290
2291
0
std::string PipelineFragmentContext::get_load_error_url() {
2292
0
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
2293
0
        return to_load_error_http_path(str);
2294
0
    }
2295
0
    for (auto& tasks : _tasks) {
2296
0
        for (auto& task : tasks) {
2297
0
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
2298
0
                return to_load_error_http_path(str);
2299
0
            }
2300
0
        }
2301
0
    }
2302
0
    return "";
2303
0
}
2304
2305
0
std::string PipelineFragmentContext::get_first_error_msg() {
2306
0
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
2307
0
        return str;
2308
0
    }
2309
0
    for (auto& tasks : _tasks) {
2310
0
        for (auto& task : tasks) {
2311
0
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
2312
0
                return str;
2313
0
            }
2314
0
        }
2315
0
    }
2316
0
    return "";
2317
0
}
2318
2319
0
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
2320
0
    std::stringstream url;
2321
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
2322
0
        << "/api/_download_load?"
2323
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
2324
0
    return url.str();
2325
0
}
2326
2327
0
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
2328
0
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
2329
0
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
2330
0
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
2331
0
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
2332
0
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
2333
0
    });
2334
2335
0
    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
2336
0
    if (req.coord_addr.hostname == "external") {
2337
        // External query (flink/spark read tablets) not need to report to FE.
2338
0
        return;
2339
0
    }
2340
0
    int callback_retries = 10;
2341
0
    const int sleep_ms = 1000;
2342
0
    Status exec_status = req.status;
2343
0
    Status coord_status;
2344
0
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
2345
0
    do {
2346
0
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
2347
0
                                                            req.coord_addr, &coord_status);
2348
0
        if (!coord_status.ok()) {
2349
0
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
2350
0
        }
2351
0
    } while (!coord_status.ok() && callback_retries-- > 0);
2352
2353
0
    if (!coord_status.ok()) {
2354
0
        UniqueId uid(req.query_id.hi, req.query_id.lo);
2355
0
        static_cast<void>(req.cancel_fn(Status::InternalError(
2356
0
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
2357
0
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
2358
0
        return;
2359
0
    }
2360
2361
0
    TReportExecStatusParams params;
2362
0
    params.protocol_version = FrontendServiceVersion::V1;
2363
0
    params.__set_query_id(req.query_id);
2364
0
    params.__set_backend_num(req.backend_num);
2365
0
    params.__set_fragment_instance_id(req.fragment_instance_id);
2366
0
    params.__set_fragment_id(req.fragment_id);
2367
0
    params.__set_status(exec_status.to_thrift());
2368
0
    params.__set_done(req.done);
2369
0
    params.__set_query_type(req.runtime_state->query_type());
2370
0
    params.__isset.profile = false;
2371
2372
0
    DCHECK(req.runtime_state != nullptr);
2373
2374
0
    if (req.runtime_state->query_type() == TQueryType::LOAD) {
2375
0
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2376
0
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2377
0
    } else {
2378
0
        DCHECK(!req.runtime_states.empty());
2379
0
        if (!req.runtime_state->output_files().empty()) {
2380
0
            params.__isset.delta_urls = true;
2381
0
            for (auto& it : req.runtime_state->output_files()) {
2382
0
                params.delta_urls.push_back(_to_http_path(it));
2383
0
            }
2384
0
        }
2385
0
        if (!params.delta_urls.empty()) {
2386
0
            params.__isset.delta_urls = true;
2387
0
        }
2388
0
    }
2389
2390
0
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
2391
0
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
2392
0
    static std::string s_unselected_rows = "unselected.rows";
2393
0
    int64_t num_rows_load_success = 0;
2394
0
    int64_t num_rows_load_filtered = 0;
2395
0
    int64_t num_rows_load_unselected = 0;
2396
0
    if (req.runtime_state->num_rows_load_total() > 0 ||
2397
0
        req.runtime_state->num_rows_load_filtered() > 0 ||
2398
0
        req.runtime_state->num_finished_range() > 0) {
2399
0
        params.__isset.load_counters = true;
2400
2401
0
        num_rows_load_success = req.runtime_state->num_rows_load_success();
2402
0
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
2403
0
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
2404
0
        params.__isset.fragment_instance_reports = true;
2405
0
        TFragmentInstanceReport t;
2406
0
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
2407
0
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
2408
0
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2409
0
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2410
0
        params.fragment_instance_reports.push_back(t);
2411
0
    } else if (!req.runtime_states.empty()) {
2412
0
        for (auto* rs : req.runtime_states) {
2413
0
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
2414
0
                rs->num_finished_range() > 0) {
2415
0
                params.__isset.load_counters = true;
2416
0
                num_rows_load_success += rs->num_rows_load_success();
2417
0
                num_rows_load_filtered += rs->num_rows_load_filtered();
2418
0
                num_rows_load_unselected += rs->num_rows_load_unselected();
2419
0
                params.__isset.fragment_instance_reports = true;
2420
0
                TFragmentInstanceReport t;
2421
0
                t.__set_fragment_instance_id(rs->fragment_instance_id());
2422
0
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
2423
0
                t.__set_loaded_rows(rs->num_rows_load_total());
2424
0
                t.__set_loaded_bytes(rs->num_bytes_load_total());
2425
0
                params.fragment_instance_reports.push_back(t);
2426
0
            }
2427
0
        }
2428
0
    }
2429
0
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
2430
0
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
2431
0
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
2432
2433
0
    if (!req.load_error_url.empty()) {
2434
0
        params.__set_tracking_url(req.load_error_url);
2435
0
    }
2436
0
    if (!req.first_error_msg.empty()) {
2437
0
        params.__set_first_error_msg(req.first_error_msg);
2438
0
    }
2439
0
    for (auto* rs : req.runtime_states) {
2440
0
        if (rs->wal_id() > 0) {
2441
0
            params.__set_txn_id(rs->wal_id());
2442
0
            params.__set_label(rs->import_label());
2443
0
        }
2444
0
    }
2445
0
    if (!req.runtime_state->export_output_files().empty()) {
2446
0
        params.__isset.export_files = true;
2447
0
        params.export_files = req.runtime_state->export_output_files();
2448
0
    } else if (!req.runtime_states.empty()) {
2449
0
        for (auto* rs : req.runtime_states) {
2450
0
            if (!rs->export_output_files().empty()) {
2451
0
                params.__isset.export_files = true;
2452
0
                params.export_files.insert(params.export_files.end(),
2453
0
                                           rs->export_output_files().begin(),
2454
0
                                           rs->export_output_files().end());
2455
0
            }
2456
0
        }
2457
0
    }
2458
0
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
2459
0
        params.__isset.commitInfos = true;
2460
0
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
2461
0
    } else if (!req.runtime_states.empty()) {
2462
0
        for (auto* rs : req.runtime_states) {
2463
0
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
2464
0
                params.__isset.commitInfos = true;
2465
0
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
2466
0
            }
2467
0
        }
2468
0
    }
2469
0
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
2470
0
        params.__isset.errorTabletInfos = true;
2471
0
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
2472
0
    } else if (!req.runtime_states.empty()) {
2473
0
        for (auto* rs : req.runtime_states) {
2474
0
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
2475
0
                params.__isset.errorTabletInfos = true;
2476
0
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
2477
0
                                               rs_eti.end());
2478
0
            }
2479
0
        }
2480
0
    }
2481
0
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
2482
0
        params.__isset.hive_partition_updates = true;
2483
0
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
2484
0
                                             hpu.end());
2485
0
    } else if (!req.runtime_states.empty()) {
2486
0
        for (auto* rs : req.runtime_states) {
2487
0
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
2488
0
                params.__isset.hive_partition_updates = true;
2489
0
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
2490
0
                                                     rs_hpu.begin(), rs_hpu.end());
2491
0
            }
2492
0
        }
2493
0
    }
2494
0
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
2495
0
        params.__isset.iceberg_commit_datas = true;
2496
0
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
2497
0
                                           icd.end());
2498
0
    } else if (!req.runtime_states.empty()) {
2499
0
        for (auto* rs : req.runtime_states) {
2500
0
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
2501
0
                params.__isset.iceberg_commit_datas = true;
2502
0
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
2503
0
                                                   rs_icd.begin(), rs_icd.end());
2504
0
            }
2505
0
        }
2506
0
    }
2507
2508
0
    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
2509
0
        params.__isset.mc_commit_datas = true;
2510
0
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
2511
0
    } else if (!req.runtime_states.empty()) {
2512
0
        for (auto* rs : req.runtime_states) {
2513
0
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
2514
0
                params.__isset.mc_commit_datas = true;
2515
0
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
2516
0
                                              rs_mcd.end());
2517
0
            }
2518
0
        }
2519
0
    }
2520
2521
0
    req.runtime_state->get_unreported_errors(&(params.error_log));
2522
0
    params.__isset.error_log = (!params.error_log.empty());
2523
2524
0
    if (_exec_env->cluster_info()->backend_id != 0) {
2525
0
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
2526
0
    }
2527
2528
0
    TReportExecStatusResult res;
2529
0
    Status rpc_status;
2530
2531
0
    VLOG_DEBUG << "reportExecStatus params is "
2532
0
               << apache::thrift::ThriftDebugString(params).c_str();
2533
0
    if (!exec_status.ok()) {
2534
0
        LOG(WARNING) << "report error status: " << exec_status.msg()
2535
0
                     << " to coordinator: " << req.coord_addr
2536
0
                     << ", query id: " << print_id(req.query_id);
2537
0
    }
2538
0
    try {
2539
0
        try {
2540
0
            (*coord)->reportExecStatus(res, params);
2541
0
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
2542
#ifndef ADDRESS_SANITIZER
2543
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
2544
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
2545
                         << req.coord_addr << ", err: " << e.what();
2546
#endif
2547
0
            rpc_status = coord->reopen();
2548
2549
0
            if (!rpc_status.ok()) {
2550
0
                req.cancel_fn(rpc_status);
2551
0
                return;
2552
0
            }
2553
0
            (*coord)->reportExecStatus(res, params);
2554
0
        }
2555
2556
0
        rpc_status = Status::create<false>(res.status);
2557
0
    } catch (apache::thrift::TException& e) {
2558
0
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
2559
0
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
2560
0
    }
2561
2562
0
    if (!rpc_status.ok()) {
2563
0
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
2564
0
                 print_id(req.query_id), rpc_status.to_string());
2565
0
        req.cancel_fn(rpc_status);
2566
0
    }
2567
0
}
2568
2569
0
Status PipelineFragmentContext::send_report(bool done) {
2570
0
    Status exec_status = _query_ctx->exec_status();
2571
    // If plan is done successfully, but _is_report_success is false,
2572
    // no need to send report.
2573
    // Load will set _is_report_success to true because load wants to know
2574
    // the process.
2575
0
    if (!_is_report_success && done && exec_status.ok()) {
2576
0
        return Status::OK();
2577
0
    }
2578
2579
    // If both _is_report_success and _is_report_on_cancel are false,
2580
    // which means no matter query is success or failed, no report is needed.
2581
    // This may happen when the query limit reached and
2582
    // a internal cancellation being processed
2583
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
2584
    // be set to false, to avoid sending fault report to FE.
2585
0
    if (!_is_report_success && !_is_report_on_cancel) {
2586
0
        if (done) {
2587
            // if done is true, which means the query is finished successfully, we can safely close the fragment instance without sending report to FE, and just return OK status here.
2588
0
            return Status::OK();
2589
0
        }
2590
0
        return Status::NeedSendAgain("");
2591
0
    }
2592
2593
0
    std::vector<RuntimeState*> runtime_states;
2594
2595
0
    for (auto& tasks : _tasks) {
2596
0
        for (auto& task : tasks) {
2597
0
            runtime_states.push_back(task.second.get());
2598
0
        }
2599
0
    }
2600
2601
0
    std::string load_eror_url = _query_ctx->get_load_error_url().empty()
2602
0
                                        ? get_load_error_url()
2603
0
                                        : _query_ctx->get_load_error_url();
2604
0
    std::string first_error_msg = _query_ctx->get_first_error_msg().empty()
2605
0
                                          ? get_first_error_msg()
2606
0
                                          : _query_ctx->get_first_error_msg();
2607
2608
0
    ReportStatusRequest req {.status = exec_status,
2609
0
                             .runtime_states = runtime_states,
2610
0
                             .done = done || !exec_status.ok(),
2611
0
                             .coord_addr = _query_ctx->coord_addr,
2612
0
                             .query_id = _query_id,
2613
0
                             .fragment_id = _fragment_id,
2614
0
                             .fragment_instance_id = TUniqueId(),
2615
0
                             .backend_num = -1,
2616
0
                             .runtime_state = _runtime_state.get(),
2617
0
                             .load_error_url = load_eror_url,
2618
0
                             .first_error_msg = first_error_msg,
2619
0
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};
2620
0
    auto ctx = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
2621
0
    return _exec_env->fragment_mgr()->get_thread_pool()->submit_func([this, req, ctx]() {
2622
0
        SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
2623
0
        _coordinator_callback(req);
2624
0
        if (!req.done) {
2625
0
            ctx->refresh_next_report_time();
2626
0
        }
2627
0
    });
2628
0
}
2629
2630
0
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2631
0
    size_t res = 0;
2632
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2633
    // here to traverse the vector.
2634
0
    for (const auto& task_instances : _tasks) {
2635
0
        for (const auto& task : task_instances) {
2636
0
            if (task.first->is_running()) {
2637
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2638
0
                                      << " is running, task: " << (void*)task.first.get()
2639
0
                                      << ", is_running: " << task.first->is_running();
2640
0
                *has_running_task = true;
2641
0
                return 0;
2642
0
            }
2643
2644
0
            size_t revocable_size = task.first->get_revocable_size();
2645
0
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2646
0
                res += revocable_size;
2647
0
            }
2648
0
        }
2649
0
    }
2650
0
    return res;
2651
0
}
2652
2653
0
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2654
0
    std::vector<PipelineTask*> revocable_tasks;
2655
0
    for (const auto& task_instances : _tasks) {
2656
0
        for (const auto& task : task_instances) {
2657
0
            size_t revocable_size_ = task.first->get_revocable_size();
2658
2659
0
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2660
0
                revocable_tasks.emplace_back(task.first.get());
2661
0
            }
2662
0
        }
2663
0
    }
2664
0
    return revocable_tasks;
2665
0
}
2666
2667
0
std::string PipelineFragmentContext::debug_string() {
2668
0
    std::lock_guard<std::mutex> l(_task_mutex);
2669
0
    fmt::memory_buffer debug_string_buffer;
2670
0
    fmt::format_to(debug_string_buffer,
2671
0
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2672
0
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2673
0
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2674
0
    for (size_t j = 0; j < _tasks.size(); j++) {
2675
0
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2676
0
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2677
0
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2678
0
                           _tasks[j][i].first->debug_string());
2679
0
        }
2680
0
    }
2681
2682
0
    return fmt::to_string(debug_string_buffer);
2683
0
}
2684
2685
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2686
0
PipelineFragmentContext::collect_realtime_profile() const {
2687
0
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2688
2689
    // we do not have mutex to protect pipeline_id_to_profile
2690
    // so we need to make sure this funciton is invoked after fragment context
2691
    // has already been prepared.
2692
0
    if (!_prepared) {
2693
0
        std::string msg =
2694
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2695
0
        DCHECK(false) << msg;
2696
0
        LOG_ERROR(msg);
2697
0
        return res;
2698
0
    }
2699
2700
    // Make sure first profile is fragment level profile
2701
0
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2702
0
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2703
0
    res.push_back(fragment_profile);
2704
2705
    // pipeline_id_to_profile is initialized in prepare stage
2706
0
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2707
0
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2708
0
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2709
0
        res.push_back(profile_ptr);
2710
0
    }
2711
2712
0
    return res;
2713
0
}
2714
2715
std::shared_ptr<TRuntimeProfileTree>
2716
0
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2717
    // we do not have mutex to protect pipeline_id_to_profile
2718
    // so we need to make sure this funciton is invoked after fragment context
2719
    // has already been prepared.
2720
0
    if (!_prepared) {
2721
0
        std::string msg =
2722
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2723
0
        DCHECK(false) << msg;
2724
0
        LOG_ERROR(msg);
2725
0
        return nullptr;
2726
0
    }
2727
2728
0
    for (const auto& tasks : _tasks) {
2729
0
        for (const auto& task : tasks) {
2730
0
            if (task.second->load_channel_profile() == nullptr) {
2731
0
                continue;
2732
0
            }
2733
2734
0
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2735
2736
0
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2737
0
                                                           _runtime_state->profile_level());
2738
0
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2739
0
        }
2740
0
    }
2741
2742
0
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2743
0
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2744
0
                                                      _runtime_state->profile_level());
2745
0
    return load_channel_profile;
2746
0
}
2747
2748
// Collect runtime filter IDs registered by all tasks in this PFC.
2749
// Used during recursive CTE stage transitions to know which filters to deregister
2750
// before creating the new PFC for the next recursion round.
2751
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2752
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2753
// the vector sizes do not change, and individual RuntimeState filter sets are
2754
// written only during open() which has completed by the time we reach rerun.
2755
0
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2756
0
    std::set<int> result;
2757
0
    for (const auto& _task : _tasks) {
2758
0
        for (const auto& task : _task) {
2759
0
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2760
0
            result.merge(set);
2761
0
        }
2762
0
    }
2763
0
    if (_runtime_state) {
2764
0
        auto set = _runtime_state->get_deregister_runtime_filter();
2765
0
        result.merge(set);
2766
0
    }
2767
0
    return result;
2768
0
}
2769
2770
27
void PipelineFragmentContext::_release_resource() {
2771
27
    std::lock_guard<std::mutex> l(_task_mutex);
2772
    // The memory released by the query end is recorded in the query mem tracker.
2773
27
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2774
27
    auto st = _query_ctx->exec_status();
2775
27
    for (auto& _task : _tasks) {
2776
0
        if (!_task.empty()) {
2777
0
            _call_back(_task.front().first->runtime_state(), &st);
2778
0
        }
2779
0
    }
2780
27
    _tasks.clear();
2781
27
    _dag.clear();
2782
27
    _pip_id_to_pipeline.clear();
2783
27
    _pipelines.clear();
2784
27
    _sink.reset();
2785
27
    _root_op.reset();
2786
27
    _runtime_filter_mgr_map.clear();
2787
27
    _op_id_to_shared_state.clear();
2788
27
}
2789
2790
} // namespace doris