Coverage Report

Created: 2026-05-19 18:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <brpc/closure_guard.h>
21
#include <gen_cpp/Partitions_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/types.pb.h>
24
25
#include <atomic>
26
#include <cstddef>
27
#include <cstdint>
28
#include <functional>
29
#include <memory>
30
#include <mutex>
31
#include <set>
32
#include <string>
33
#include <vector>
34
35
#include "common/status.h"
36
#include "exec/pipeline/pipeline.h"
37
#include "exec/pipeline/pipeline_task.h"
38
#include "runtime/query_context.h"
39
#include "runtime/runtime_profile.h"
40
#include "runtime/runtime_state.h"
41
#include "runtime/task_execution_context.h"
42
#include "util/stopwatch.hpp"
43
#include "util/uid_util.h"
44
45
namespace doris {
46
struct ReportStatusRequest;
47
class ExecEnv;
48
class RuntimeFilterMergeControllerEntity;
49
class TDataSink;
50
class TPipelineFragmentParams;
51
52
class Dependency;
53
struct LocalExchangeSharedState;
54
55
class PipelineFragmentContext : public TaskExecutionContext {
56
public:
57
    ENABLE_FACTORY_CREATOR(PipelineFragmentContext);
58
    PipelineFragmentContext(TUniqueId query_id, const TPipelineFragmentParams& request,
59
                            std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
60
                            const std::function<void(RuntimeState*, Status*)>& call_back);
61
62
    ~PipelineFragmentContext() override;
63
64
    void print_profile(const std::string& extra_info);
65
66
    std::vector<std::shared_ptr<TRuntimeProfileTree>> collect_realtime_profile() const;
67
    std::shared_ptr<TRuntimeProfileTree> collect_realtime_load_channel_profile() const;
68
69
    bool is_timeout(timespec now) const;
70
71
20.9k
    uint64_t elapsed_time() const { return _fragment_watcher.elapsed_time(); }
72
73
0
    int timeout_second() const { return _timeout; }
74
75
    PipelinePtr add_pipeline(PipelinePtr parent = nullptr, int idx = -1);
76
77
169k
    QueryContext* get_query_ctx() { return _query_ctx.get(); }
78
3.32M
    [[nodiscard]] bool is_canceled() const { return _query_ctx->is_cancelled(); }
79
80
    Status prepare(ThreadPool* thread_pool);
81
82
    Status submit();
83
84
0
    void set_is_report_success(bool is_report_success) { _is_report_success = is_report_success; }
85
86
    void cancel(const Status reason);
87
88
    bool notify_close();
89
90
26
    TUniqueId get_query_id() const { return _query_id; }
91
92
6
    [[nodiscard]] int get_fragment_id() const { return _fragment_id; }
93
94
    void decrement_running_task(PipelineId pipeline_id);
95
96
2
    uint32_t rec_cte_stage() const { return _rec_cte_stage; }
97
0
    void set_rec_cte_stage(uint32_t stage) { _rec_cte_stage = stage; }
98
99
    Status send_report(bool);
100
101
    void trigger_report_if_necessary();
102
    void refresh_next_report_time();
103
104
    std::string debug_string();
105
106
0
    // Returns the current value of _operator_id and then decrements it, so successive
    // calls hand out descending ids (the counter's in-class initial value is -1).
    [[nodiscard]] int next_operator_id() { return _operator_id--; }
107
108
0
    // Returns the current value of the operator id counter without modifying it. This is
    // the id the next next_operator_id() call would return, not one already handed out.
    [[nodiscard]] int max_operator_id() const { return _operator_id; }
109
110
0
    // Returns the current value of _sink_operator_id and then decrements it, so successive
    // calls hand out descending ids (the counter's in-class initial value is -1).
    [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }
111
112
    [[nodiscard]] size_t get_revocable_size(bool* has_running_task) const;
113
114
    [[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const;
115
116
0
    void clear_finished_tasks() {
117
0
        if (_need_notify_close) {
118
0
            return;
119
0
        }
120
0
        for (size_t j = 0; j < _tasks.size(); j++) {
121
0
            for (size_t i = 0; i < _tasks[j].size(); i++) {
122
0
                _tasks[j][i].first->stop_if_finished();
123
0
            }
124
0
        }
125
0
    }
126
127
    std::string get_load_error_url();
128
    std::string get_first_error_msg();
129
130
    std::set<int> get_deregister_runtime_filter() const;
131
132
    // Store the brpc ClosureGuard so the RPC response is deferred until this PFC is destroyed.
133
    // When need_send_report_on_destruction is true (final_close), send the report immediately
134
    // and do not store the guard (let it fire on return to complete the RPC).
135
    //
136
    // Thread safety: This method is NOT thread-safe. It reads/writes _wait_close_guard without
137
    // synchronization. Currently it is only called from rerun_fragment() which is invoked
138
    // sequentially by RecCTESourceOperatorX (a serial operator) — one opcode at a time per
139
    // fragment. Do NOT call this concurrently from multiple threads.
140
    Status listen_wait_close(const std::shared_ptr<brpc::ClosureGuard>& guard,
141
0
                             bool need_send_report_on_destruction) {
142
0
        if (_wait_close_guard) {
143
0
            return Status::InternalError("Already listening wait close");
144
0
        }
145
0
        if (need_send_report_on_destruction) {
146
0
            return send_report(true);
147
0
        } else {
148
0
            _wait_close_guard = guard;
149
0
        }
150
0
        return Status::OK();
151
0
    }
152
153
private:
154
    void _coordinator_callback(const ReportStatusRequest& req);
155
    std::string _to_http_path(const std::string& file_name) const;
156
157
    void _release_resource();
158
159
    Status _build_and_prepare_full_pipeline(ThreadPool* thread_pool);
160
161
    Status _build_pipelines(ObjectPool* pool, const DescriptorTbl& descs, OperatorPtr* root,
162
                            PipelinePtr cur_pipe);
163
    Status _create_tree_helper(ObjectPool* pool, const std::vector<TPlanNode>& tnodes,
164
                               const DescriptorTbl& descs, OperatorPtr parent, int* node_idx,
165
                               OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
166
                               const bool followed_by_shuffled_join,
167
                               const bool require_bucket_distribution);
168
169
    Status _create_operator(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs,
170
                            OperatorPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx,
171
                            const bool followed_by_shuffled_join,
172
                            const bool require_bucket_distribution, OperatorPtr& cache_op);
173
    template <bool is_intersect>
174
    Status _build_operators_for_set_operation_node(ObjectPool* pool, const TPlanNode& tnode,
175
                                                   const DescriptorTbl& descs, OperatorPtr& op,
176
                                                   PipelinePtr& cur_pipe,
177
                                                   std::vector<DataSinkOperatorPtr>& sink_ops);
178
179
    Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
180
                             const std::vector<TExpr>& output_exprs,
181
                             const TPipelineFragmentParams& params, const RowDescriptor& row_desc,
182
                             RuntimeState* state, DescriptorTbl& desc_tbl,
183
                             PipelineId cur_pipeline_id);
184
    Status _plan_local_exchange(int num_buckets,
185
                                const std::map<int, int>& bucket_seq_to_instance_idx,
186
                                const std::map<int, int>& shuffle_idx_to_instance_idx);
187
    Status _plan_local_exchange(int num_buckets, int pip_idx, PipelinePtr pip,
188
                                const std::map<int, int>& bucket_seq_to_instance_idx,
189
                                const std::map<int, int>& shuffle_idx_to_instance_idx);
190
    void _inherit_pipeline_properties(const DataDistribution& data_distribution,
191
                                      PipelinePtr pipe_with_source, PipelinePtr pipe_with_sink);
192
    Status _add_local_exchange(int pip_idx, int idx, int node_id, ObjectPool* pool,
193
                               PipelinePtr cur_pipe, DataDistribution data_distribution,
194
                               bool* do_local_exchange, int num_buckets,
195
                               const std::map<int, int>& bucket_seq_to_instance_idx,
196
                               const std::map<int, int>& shuffle_idx_to_instance_idx);
197
    Status _add_local_exchange_impl(int idx, ObjectPool* pool, PipelinePtr cur_pipe,
198
                                    PipelinePtr new_pip, DataDistribution data_distribution,
199
                                    bool* do_local_exchange, int num_buckets,
200
                                    const std::map<int, int>& bucket_seq_to_instance_idx,
201
                                    const std::map<int, int>& shuffle_idx_to_instance_idx);
202
203
    Status _build_pipeline_tasks(ThreadPool* thread_pool);
204
    Status _build_pipeline_tasks_for_instance(
205
            int instance_idx,
206
            const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile);
207
    // Close the fragment instance and return true if the caller should call
208
    // remove_pipeline_context() **after** releasing _task_mutex. This avoids
209
    // holding _task_mutex while acquiring _pipeline_map's shard lock, which
210
    // would create an ABBA deadlock with dump_pipeline_tasks().
211
    bool _close_fragment_instance();
212
    void _init_next_report_time();
213
214
    // Id of this query
215
    TUniqueId _query_id;
216
    int _fragment_id;
217
218
    ExecEnv* _exec_env = nullptr;
219
220
    std::atomic_bool _prepared = false;
221
    bool _submitted = false;
222
223
    Pipelines _pipelines;
224
    PipelineId _next_pipeline_id = 0;
225
    std::mutex _task_mutex;
226
    int _closed_tasks = 0;
227
    // After prepared, `_total_tasks` is equal to the size of `_tasks`.
228
    // When submit fails, `_total_tasks` is equal to the number of tasks submitted.
229
    std::atomic<int> _total_tasks = 0;
230
231
    std::unique_ptr<RuntimeProfile> _fragment_level_profile;
232
    bool _is_report_success = false;
233
234
    std::unique_ptr<RuntimeState> _runtime_state;
235
236
    std::shared_ptr<QueryContext> _query_ctx;
237
238
    MonotonicStopWatch _fragment_watcher;
239
    RuntimeProfile::Counter* _prepare_timer = nullptr;
240
    RuntimeProfile::Counter* _init_context_timer = nullptr;
241
    RuntimeProfile::Counter* _build_pipelines_timer = nullptr;
242
    RuntimeProfile::Counter* _plan_local_exchanger_timer = nullptr;
243
    RuntimeProfile::Counter* _prepare_all_pipelines_timer = nullptr;
244
    RuntimeProfile::Counter* _build_tasks_timer = nullptr;
245
246
    std::function<void(RuntimeState*, Status*)> _call_back;
247
    std::atomic_bool _is_fragment_instance_closed = false;
248
249
    // If this is set to false, and '_is_report_success' is false as well,
250
    // this executor will not report status to FE on being cancelled.
251
    bool _is_report_on_cancel;
252
253
    // 0 indicates reporting is in progress or not required
254
    std::atomic_bool _disable_period_report = true;
255
    std::atomic_uint64_t _previous_report_time = 0;
256
257
    DescriptorTbl* _desc_tbl = nullptr;
258
    int _num_instances = 1;
259
260
    int _timeout = -1;
261
    bool _use_serial_source = false;
262
263
    OperatorPtr _root_op = nullptr;
264
    //
265
    /**
266
     * Matrix stores tasks with local runtime states.
267
     * This is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
268
     *
269
     * 2-D matrix:
270
     * +-------------------------+------------+-------+
271
     * |            | Pipeline 0 | Pipeline 1 |  ...  |
272
     * +------------+------------+------------+-------+
273
     * | Instance 0 |  task 0-0  |  task 0-1  |  ...  |
274
     * +------------+------------+------------+-------+
275
     * | Instance 1 |  task 1-0  |  task 1-1  |  ...  |
276
     * +------------+------------+------------+-------+
277
     * | ...                                          |
278
     * +--------------------------------------+-------+
279
     */
280
    std::vector<
281
            std::vector<std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>>>>
282
            _tasks;
283
284
    // TODO: move _sink and _multi_cast_stream_sink_senders out of the fragment context
285
    // and set both of them in the pipeline task instead.
286
#ifdef __clang__
287
#pragma clang diagnostic push
288
#pragma clang diagnostic ignored "-Wshadow-field"
289
#endif
290
    DataSinkOperatorPtr _sink = nullptr;
291
#ifdef __clang__
292
#pragma clang diagnostic pop
293
#endif
294
295
    // `_dag` manages dependencies between pipelines by pipeline ID: the pipeline at each key is blocked by the pipelines listed in its value.
296
    std::map<PipelineId, std::vector<PipelineId>> _dag;
297
298
    // We use preorder traversal to create an operator tree. When we meet a join node, we should
299
    // build probe operator and build operator in separate pipelines. To do this, we should build
300
    // ProbeSide first, and use `_pipelines_to_build` to store which pipeline the build operator
301
    // is in, so we can build BuildSide once we complete probe side.
302
    struct pipeline_parent_map {
303
        std::map<int, std::vector<PipelinePtr>> _build_side_pipelines;
304
0
        void push(int parent_node_id, PipelinePtr pipeline) {
305
0
            if (!_build_side_pipelines.contains(parent_node_id)) {
306
0
                _build_side_pipelines.insert({parent_node_id, {pipeline}});
307
0
            } else {
308
0
                _build_side_pipelines[parent_node_id].push_back(pipeline);
309
0
            }
310
0
        }
311
0
        void pop(PipelinePtr& cur_pipe, int parent_node_id, int child_idx) {
312
0
            if (!_build_side_pipelines.contains(parent_node_id)) {
313
0
                return;
314
0
            }
315
0
            DCHECK(_build_side_pipelines.contains(parent_node_id));
316
0
            auto& child_pipeline = _build_side_pipelines[parent_node_id];
317
0
            DCHECK(child_idx < child_pipeline.size());
318
0
            cur_pipe = child_pipeline[child_idx];
319
0
        }
320
0
        void clear() { _build_side_pipelines.clear(); }
321
    } _pipeline_parent_map;
322
323
    std::mutex _state_map_lock;
324
325
    // Start from -1 so all operator IDs are negative. This avoids collision with
326
    // unpaired sinks (OlapTableSink etc.) whose hardcoded dest_id=0 would otherwise
327
    // match the first operator's ID when FE-planned LocalExchangeNode is the root.
328
    int _operator_id = -1;
329
    int _sink_operator_id = -1;
330
    /**
331
     * Some states are shared by different pipeline tasks (e.g. local exchange, broadcast join).
332
     *
333
     * local exchange sink 0 ->                               -> local exchange source 0
334
     *                            LocalExchangeSharedState
335
     * local exchange sink 1 ->                               -> local exchange source 1
336
     *
337
     * hash join build sink 0 ->                               -> hash join build source 0
338
     *                              HashJoinSharedState
339
     * hash join build sink 1 ->                               -> hash join build source 1
340
     *
341
     * So we should keep states here.
342
     */
343
    std::map<int,
344
             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
345
            _op_id_to_shared_state;
346
347
    std::map<PipelineId, Pipeline*> _pip_id_to_pipeline;
348
    std::vector<std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map;
349
350
    // Deferred exchanger creation info for FE-planned local exchanges.
351
    // Exchanger sender count depends on the upstream pipeline's final num_tasks,
352
    // which is only known after the full plan tree is built (child operators like
353
    // serial ExchangeNode may reduce num_tasks). So we defer exchanger creation
354
    // until after _build_pipelines completes.
355
    struct DeferredExchangerInfo {
356
        std::shared_ptr<LocalExchangeSharedState> shared_state;
357
        PipelinePtr upstream_pipe;
358
        TLocalPartitionType::type partition_type;
359
        int num_partitions;
360
        int free_blocks_limit;
361
        int local_exchange_id;
362
        int sink_id;
363
    };
364
    std::vector<DeferredExchangerInfo> _deferred_exchangers;
365
    Status _create_deferred_local_exchangers();
366
    // After _build_pipelines, propagate _num_instances from FE-planned LOCAL_EXCHANGE
367
    // pipelines upward through the DAG to ancestor pipelines that inherited reduced
368
    // num_tasks from a serial operator.
369
    void _propagate_local_exchange_num_tasks();
370
371
    //Here are two types of runtime states:
372
    //    - _runtime_state is at the Fragment level.
373
    //    - the RuntimeState paired with each task in _tasks is at the task level, unique to each task.
374
375
    std::vector<TUniqueId> _fragment_instance_ids;
376
377
    // Total instance num running on all BEs
378
    int _total_instances = -1;
379
380
    TPipelineFragmentParams _params;
381
    int32_t _parallel_instances = 0;
382
383
    std::atomic<bool> _need_notify_close = false;
384
    // Holds the brpc ClosureGuard for async wait-close during recursive CTE rerun.
385
    // When the PFC finishes closing and is destroyed, the shared_ptr destructor fires
386
    // the ClosureGuard, which completes the brpc response to the RecCTESourceOperatorX.
387
    // Only written by listen_wait_close() from a single rerun_fragment RPC thread.
388
    std::shared_ptr<brpc::ClosureGuard> _wait_close_guard = nullptr;
389
390
    // The recursion round number for recursive CTE fragments.
391
    // Incremented each time the fragment is rebuilt via rerun_fragment(rebuild).
392
    // Used to stamp runtime filter RPCs so stale messages from old rounds are discarded.
393
    uint32_t _rec_cte_stage = 0;
394
};
395
} // namespace doris