Coverage Report

Created: 2026-03-17 08:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Types_types.h>
21
#include <gen_cpp/types.pb.h>
22
23
#include <atomic>
24
#include <cstddef>
25
#include <cstdint>
26
#include <functional>
27
#include <memory>
28
#include <mutex>
29
#include <string>
30
#include <vector>
31
32
#include "common/status.h"
33
#include "exec/pipeline/pipeline.h"
34
#include "exec/pipeline/pipeline_task.h"
35
#include "runtime/query_context.h"
36
#include "runtime/runtime_profile.h"
37
#include "runtime/runtime_state.h"
38
#include "runtime/task_execution_context.h"
39
#include "util/stopwatch.hpp"
40
#include "util/uid_util.h"
41
namespace doris {
42
struct ReportStatusRequest;
43
class ExecEnv;
44
class RuntimeFilterMergeControllerEntity;
45
class TDataSink;
46
class TPipelineFragmentParams;
47
48
class Dependency;
49
50
class PipelineFragmentContext : public TaskExecutionContext {
51
public:
52
    ENABLE_FACTORY_CREATOR(PipelineFragmentContext);
53
    // Callback to report execution status of plan fragment.
54
    // 'profile' is the cumulative profile, 'done' indicates whether the execution
55
    // is done or still continuing.
56
    // Note: this does not take a const RuntimeProfile&, because it might need to call
57
    // functions like PrettyPrint() or to_thrift(), neither of which is const
58
    // because they take locks.
59
    using report_status_callback = std::function<Status(
60
            const ReportStatusRequest, std::shared_ptr<PipelineFragmentContext>&&)>;
61
    PipelineFragmentContext(TUniqueId query_id, const TPipelineFragmentParams& request,
62
                            std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
63
                            const std::function<void(RuntimeState*, Status*)>& call_back,
64
                            report_status_callback report_status_cb);
65
66
    ~PipelineFragmentContext() override;
67
68
    void print_profile(const std::string& extra_info);
69
70
    std::vector<std::shared_ptr<TRuntimeProfileTree>> collect_realtime_profile() const;
71
    std::shared_ptr<TRuntimeProfileTree> collect_realtime_load_channel_profile() const;
72
73
    bool is_timeout(timespec now) const;
74
75
52.4k
    uint64_t elapsed_time() const { return _fragment_watcher.elapsed_time(); }
76
77
436
    int timeout_second() const { return _timeout; }
78
79
    PipelinePtr add_pipeline(PipelinePtr parent = nullptr, int idx = -1);
80
81
16.6M
    QueryContext* get_query_ctx() { return _query_ctx.get(); }
82
30.5M
    [[nodiscard]] bool is_canceled() const { return _query_ctx->is_cancelled(); }
83
84
    Status prepare(ThreadPool* thread_pool);
85
86
    Status submit();
87
88
434k
    void set_is_report_success(bool is_report_success) { _is_report_success = is_report_success; }
89
90
    void cancel(const Status reason);
91
92
2.33M
    TUniqueId get_query_id() const { return _query_id; }
93
94
6
    [[nodiscard]] int get_fragment_id() const { return _fragment_id; }
95
96
482k
    [[nodiscard]] int get_total_instances() const { return (int)_tasks.size(); }
97
98
482k
    [[nodiscard]] int get_finished_instances() const { return _finished_instances_count.load(); }
99
100
    void decrement_running_task(PipelineId pipeline_id, int instance_idx);
101
102
    Status send_report(bool);
103
104
    void trigger_report_if_necessary();
105
    void refresh_next_report_time();
106
107
    std::string debug_string();
108
109
810k
    [[nodiscard]] int next_operator_id() { return _operator_id--; }
110
111
4.66M
    [[nodiscard]] int max_operator_id() const { return _operator_id; }
112
113
689k
    [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }
114
115
    [[nodiscard]] size_t get_revocable_size(bool* has_running_task) const;
116
117
    [[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const;
118
119
111k
    void clear_finished_tasks() {
120
111k
        if (_need_notify_close) {
121
23.9k
            return;
122
23.9k
        }
123
516k
        for (size_t j = 0; j < _tasks.size(); j++) {
124
1.46M
            for (size_t i = 0; i < _tasks[j].size(); i++) {
125
1.03M
                _tasks[j][i].first->stop_if_finished();
126
1.03M
            }
127
428k
        }
128
87.8k
    }
129
130
    std::string get_load_error_url();
131
    std::string get_first_error_msg();
132
133
    Status wait_close(bool close);
134
    Status rebuild(ThreadPool* thread_pool);
135
    Status set_to_rerun();
136
137
0
    bool need_notify_close() const { return _need_notify_close; }
138
139
private:
140
    void _release_resource();
141
142
    Status _build_and_prepare_full_pipeline(ThreadPool* thread_pool);
143
144
    Status _build_pipelines(ObjectPool* pool, const DescriptorTbl& descs, OperatorPtr* root,
145
                            PipelinePtr cur_pipe);
146
    Status _create_tree_helper(ObjectPool* pool, const std::vector<TPlanNode>& tnodes,
147
                               const DescriptorTbl& descs, OperatorPtr parent, int* node_idx,
148
                               OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
149
                               const bool followed_by_shuffled_join,
150
                               const bool require_bucket_distribution);
151
152
    Status _create_operator(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs,
153
                            OperatorPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx,
154
                            const bool followed_by_shuffled_join,
155
                            const bool require_bucket_distribution);
156
    template <bool is_intersect>
157
    Status _build_operators_for_set_operation_node(ObjectPool* pool, const TPlanNode& tnode,
158
                                                   const DescriptorTbl& descs, OperatorPtr& op,
159
                                                   PipelinePtr& cur_pipe,
160
                                                   std::vector<DataSinkOperatorPtr>& sink_ops);
161
162
    Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
163
                             const std::vector<TExpr>& output_exprs,
164
                             const TPipelineFragmentParams& params, const RowDescriptor& row_desc,
165
                             RuntimeState* state, DescriptorTbl& desc_tbl,
166
                             PipelineId cur_pipeline_id);
167
    Status _plan_local_exchange(int num_buckets,
168
                                const std::map<int, int>& bucket_seq_to_instance_idx,
169
                                const std::map<int, int>& shuffle_idx_to_instance_idx);
170
    Status _plan_local_exchange(int num_buckets, int pip_idx, PipelinePtr pip,
171
                                const std::map<int, int>& bucket_seq_to_instance_idx,
172
                                const std::map<int, int>& shuffle_idx_to_instance_idx);
173
    void _inherit_pipeline_properties(const DataDistribution& data_distribution,
174
                                      PipelinePtr pipe_with_source, PipelinePtr pipe_with_sink);
175
    Status _add_local_exchange(int pip_idx, int idx, int node_id, ObjectPool* pool,
176
                               PipelinePtr cur_pipe, DataDistribution data_distribution,
177
                               bool* do_local_exchange, int num_buckets,
178
                               const std::map<int, int>& bucket_seq_to_instance_idx,
179
                               const std::map<int, int>& shuffle_idx_to_instance_idx);
180
    Status _add_local_exchange_impl(int idx, ObjectPool* pool, PipelinePtr cur_pipe,
181
                                    PipelinePtr new_pip, DataDistribution data_distribution,
182
                                    bool* do_local_exchange, int num_buckets,
183
                                    const std::map<int, int>& bucket_seq_to_instance_idx,
184
                                    const std::map<int, int>& shuffle_idx_to_instance_idx);
185
186
    Status _build_pipeline_tasks(ThreadPool* thread_pool);
187
    Status _build_pipeline_tasks_for_instance(
188
            int instance_idx,
189
            const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile);
190
    void _close_fragment_instance();
191
    void _init_next_report_time();
192
193
    // Id of this query
194
    TUniqueId _query_id;
195
    int _fragment_id;
196
197
    ExecEnv* _exec_env = nullptr;
198
199
    std::atomic_bool _prepared = false;
200
    bool _submitted = false;
201
202
    Pipelines _pipelines;
203
    PipelineId _next_pipeline_id = 0;
204
    std::mutex _task_mutex;
205
    std::atomic<int> _finished_instances_count {0};
206
    std::vector<int> _instance_tasks_count;
207
    std::unique_ptr<std::atomic<int>[]> _instance_closed_tasks_count;
208
    int _closed_tasks = 0;
209
    // After prepared, `_total_tasks` is equal to the size of `_tasks`.
210
    // When submit fail, `_total_tasks` is equal to the number of tasks submitted.
211
    std::atomic<int> _total_tasks = 0;
212
213
    std::unique_ptr<RuntimeProfile> _fragment_level_profile;
214
    bool _is_report_success = false;
215
216
    std::unique_ptr<RuntimeState> _runtime_state;
217
218
    std::shared_ptr<QueryContext> _query_ctx;
219
220
    MonotonicStopWatch _fragment_watcher;
221
    RuntimeProfile::Counter* _prepare_timer = nullptr;
222
    RuntimeProfile::Counter* _init_context_timer = nullptr;
223
    RuntimeProfile::Counter* _build_pipelines_timer = nullptr;
224
    RuntimeProfile::Counter* _plan_local_exchanger_timer = nullptr;
225
    RuntimeProfile::Counter* _prepare_all_pipelines_timer = nullptr;
226
    RuntimeProfile::Counter* _build_tasks_timer = nullptr;
227
228
    std::function<void(RuntimeState*, Status*)> _call_back;
229
    std::atomic_bool _is_fragment_instance_closed = false;
230
231
    // If this is set to false, and '_is_report_success' is false as well,
232
    // This executor will not report status to FE on being cancelled.
233
    bool _is_report_on_cancel;
234
235
    // 0 indicates reporting is in progress or not required
236
    std::atomic_bool _disable_period_report = true;
237
    std::atomic_uint64_t _previous_report_time = 0;
238
239
    // This callback is used to notify the FE of the status of the fragment.
240
    // For example:
241
    // 1. when the fragment is cancelled, it will be called.
242
    // 2. when the fragment is finished, it will be called. especially, when the fragment is
243
// an insert into select statement, it should notify FE of every fragment's status.
244
// And also, this callback is called periodically to notify FE of the load process.
245
    report_status_callback _report_status_cb;
246
247
    DescriptorTbl* _desc_tbl = nullptr;
248
    int _num_instances = 1;
249
250
    int _timeout = -1;
251
    bool _use_serial_source = false;
252
253
    OperatorPtr _root_op = nullptr;
254
    //
255
    /**
256
     * Matrix stores tasks with local runtime states.
257
     * This is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
258
     *
259
     * 2-D matrix:
260
     * +-------------------------+------------+-------+
261
     * |            | Pipeline 0 | Pipeline 1 |  ...  |
262
     * +------------+------------+------------+-------+
263
     * | Instance 0 |  task 0-0  |  task 0-1  |  ...  |
264
     * +------------+------------+------------+-------+
265
     * | Instance 1 |  task 1-0  |  task 1-1  |  ...  |
266
     * +------------+------------+------------+-------+
267
     * | ...                                          |
268
     * +--------------------------------------+-------+
269
     */
270
    std::vector<
271
            std::vector<std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>>>>
272
            _tasks;
273
274
    // TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
275
    // of it in pipeline task not the fragment_context
276
#ifdef __clang__
277
#pragma clang diagnostic push
278
#pragma clang diagnostic ignored "-Wshadow-field"
279
#endif
280
    DataSinkOperatorPtr _sink = nullptr;
281
#ifdef __clang__
282
#pragma clang diagnostic pop
283
#endif
284
285
    // `_dag` manages dependencies between pipelines by pipeline ID. The indices will be blocked by members
286
    std::map<PipelineId, std::vector<PipelineId>> _dag;
287
288
    // We use preorder traversal to create an operator tree. When we meet a join node, we should
289
    // build probe operator and build operator in separate pipelines. To do this, we should build
290
    // ProbeSide first, and use `_pipelines_to_build` to store which pipeline the build operator
291
    // is in, so we can build BuildSide once we complete probe side.
292
    struct pipeline_parent_map {
293
        std::map<int, std::vector<PipelinePtr>> _build_side_pipelines;
294
31.0k
        void push(int parent_node_id, PipelinePtr pipeline) {
295
31.0k
            if (!_build_side_pipelines.contains(parent_node_id)) {
296
15.4k
                _build_side_pipelines.insert({parent_node_id, {pipeline}});
297
15.6k
            } else {
298
15.6k
                _build_side_pipelines[parent_node_id].push_back(pipeline);
299
15.6k
            }
300
31.0k
        }
301
687k
        void pop(PipelinePtr& cur_pipe, int parent_node_id, int child_idx) {
302
687k
            if (!_build_side_pipelines.contains(parent_node_id)) {
303
657k
                return;
304
657k
            }
305
687k
            DCHECK(_build_side_pipelines.contains(parent_node_id));
306
30.1k
            auto& child_pipeline = _build_side_pipelines[parent_node_id];
307
30.1k
            DCHECK(child_idx < child_pipeline.size());
308
30.1k
            cur_pipe = child_pipeline[child_idx];
309
30.1k
        }
310
436k
        void clear() { _build_side_pipelines.clear(); }
311
    } _pipeline_parent_map;
312
313
    std::mutex _state_map_lock;
314
315
    int _operator_id = 0;
316
    int _sink_operator_id = 0;
317
    /**
318
     * Some states are shared by tasks in different pipeline task (e.g. local exchange , broadcast join).
319
     *
320
     * local exchange sink 0 ->                               -> local exchange source 0
321
     *                            LocalExchangeSharedState
322
     * local exchange sink 1 ->                               -> local exchange source 1
323
     *
324
     * hash join build sink 0 ->                               -> hash join build source 0
325
     *                              HashJoinSharedState
326
     * hash join build sink 1 ->                               -> hash join build source 1
327
     *
328
     * So we should keep states here.
329
     */
330
    std::map<int,
331
             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
332
            _op_id_to_shared_state;
333
334
    std::map<PipelineId, Pipeline*> _pip_id_to_pipeline;
335
    std::vector<std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map;
336
337
    //Here are two types of runtime states:
338
    //    - _runtime_state is at the Fragment level.
339
    //    - _task_runtime_states is at the task level, unique to each task.
340
341
    std::vector<TUniqueId> _fragment_instance_ids;
342
343
    // Total instance num running on all BEs
344
    int _total_instances = -1;
345
346
    TPipelineFragmentParams _params;
347
    int32_t _parallel_instances = 0;
348
349
    bool _need_notify_close = false;
350
};
351
} // namespace doris