Coverage Report

Created: 2025-03-13 11:28

/root/doris/be/src/pipeline/pipeline.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <glog/logging.h>
21
22
#include <cstdint>
23
#include <memory>
24
#include <string_view>
25
#include <utility>
26
#include <vector>
27
28
#include "common/status.h"
29
#include "pipeline/exec/operator.h"
30
#include "pipeline/pipeline_x/operator.h"
31
#include "util/runtime_profile.h"
32
33
namespace doris::pipeline {
34
35
class PipelineFragmentContext;
36
class Pipeline;
37
38
using PipelinePtr = std::shared_ptr<Pipeline>;
39
using Pipelines = std::vector<PipelinePtr>;
40
using PipelineId = uint32_t;
41
42
class Pipeline : public std::enable_shared_from_this<Pipeline> {
43
    friend class PipelineTask;
44
    friend class PipelineXTask;
45
    friend class PipelineXFragmentContext;
46
47
public:
48
    Pipeline() = delete;
49
    explicit Pipeline(PipelineId pipeline_id, int num_tasks,
50
                      std::weak_ptr<PipelineFragmentContext> context)
51
0
            : _pipeline_id(pipeline_id), _context(std::move(context)), _num_tasks(num_tasks) {
52
0
        _init_profile();
53
0
        _tasks.resize(_num_tasks, nullptr);
54
0
    }
55
56
0
    void add_dependency(std::shared_ptr<Pipeline>& pipeline) {
57
0
        pipeline->_parents.emplace_back(_operator_builders.size(), weak_from_this());
58
0
        _dependencies.emplace_back(_operator_builders.size(), pipeline);
59
0
    }
60
61
    // If all dependencies are finished, this pipeline task should be scheduled.
62
    // e.g. Hash join probe task will be scheduled once Hash join build task is finished.
63
0
    void finish_one_dependency(int dep_opr, int dependency_core_id) {
64
0
        std::lock_guard l(_depend_mutex);
65
0
        if (!_operators.empty() && _operators[dep_opr - 1]->can_terminate_early()) {
66
0
            _always_can_read = true;
67
0
            _always_can_write = (dep_opr == _operators.size());
68
69
0
            for (int i = 0; i < _dependencies.size(); ++i) {
70
0
                if (dep_opr == _dependencies[i].first) {
71
0
                    _dependencies.erase(_dependencies.begin(), _dependencies.begin() + i + 1);
72
0
                    break;
73
0
                }
74
0
            }
75
0
        } else {
76
0
            for (int i = 0; i < _dependencies.size(); ++i) {
77
0
                if (dep_opr == _dependencies[i].first) {
78
0
                    _dependencies.erase(_dependencies.begin() + i);
79
0
                    break;
80
0
                }
81
0
            }
82
0
        }
83
84
0
        if (_dependencies.empty()) {
85
0
            _previous_schedule_id = dependency_core_id;
86
0
        }
87
0
    }
88
89
0
    bool has_dependency() {
90
0
        std::lock_guard l(_depend_mutex);
91
0
        return !_dependencies.empty();
92
0
    }
93
94
    Status add_operator(OperatorBuilderPtr& op);
95
96
    // Add operators for pipelineX
97
    Status add_operator(OperatorXPtr& op);
98
    // prepare operators for pipelineX
99
    Status prepare(RuntimeState* state);
100
101
    Status set_sink_builder(OperatorBuilderPtr& sink_operator_builder);
102
    Status set_sink(DataSinkOperatorXPtr& sink_operator);
103
104
0
    OperatorBuilderBase* get_sink_builder() { return _sink_builder.get(); }
105
0
    DataSinkOperatorXBase* sink_x() { return _sink_x.get(); }
106
0
    OperatorXs& operator_xs() { return operatorXs; }
107
0
    DataSinkOperatorXPtr sink_shared_pointer() { return _sink_x; }
108
109
    Status build_operators();
110
111
0
    RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }
112
113
0
    [[nodiscard]] const RowDescriptor& output_row_desc() const {
114
0
        return operatorXs.back()->row_desc();
115
0
    }
116
117
0
    [[nodiscard]] PipelineId id() const { return _pipeline_id; }
118
0
    void set_is_root_pipeline() { _is_root_pipeline = true; }
119
0
    bool is_root_pipeline() const { return _is_root_pipeline; }
120
121
0
    static bool is_hash_exchange(ExchangeType idx) {
122
0
        return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE;
123
0
    }
124
125
    // For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH,
126
    // data is processed and shuffled on the sink.
127
    // Compared to PASSTHROUGH, this is a relatively heavy operation.
128
0
    static bool heavy_operations_on_the_sink(ExchangeType idx) {
129
0
        return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE ||
130
0
               idx == ExchangeType::ADAPTIVE_PASSTHROUGH;
131
0
    }
132
133
0
    bool need_to_local_exchange(const DataDistribution target_data_distribution) const {
134
0
        if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE &&
135
0
            target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) {
136
0
            return true;
137
0
        } else if (operatorXs.front()->ignore_data_hash_distribution()) {
138
0
            if (_data_distribution.distribution_type ==
139
0
                        target_data_distribution.distribution_type &&
140
0
                (_data_distribution.partition_exprs.empty() ||
141
0
                 target_data_distribution.partition_exprs.empty())) {
142
0
                return true;
143
0
            }
144
0
            return _data_distribution.distribution_type !=
145
0
                           target_data_distribution.distribution_type &&
146
0
                   !(is_hash_exchange(_data_distribution.distribution_type) &&
147
0
                     is_hash_exchange(target_data_distribution.distribution_type));
148
0
        } else {
149
0
            return _data_distribution.distribution_type !=
150
0
                           target_data_distribution.distribution_type &&
151
0
                   !(is_hash_exchange(_data_distribution.distribution_type) &&
152
0
                     is_hash_exchange(target_data_distribution.distribution_type));
153
0
        }
154
0
    }
155
0
    void init_data_distribution() {
156
0
        set_data_distribution(operatorXs.front()->required_data_distribution());
157
0
    }
158
0
    void set_data_distribution(const DataDistribution& data_distribution) {
159
0
        _data_distribution = data_distribution;
160
0
    }
161
0
    const DataDistribution& data_distribution() const { return _data_distribution; }
162
163
0
    std::vector<std::shared_ptr<Pipeline>>& children() { return _children; }
164
0
    void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
165
0
    void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }
166
167
0
    void incr_created_tasks(int i, PipelineTask* task) {
168
0
        _num_tasks_created++;
169
0
        _num_tasks_running++;
170
0
        DCHECK_LT(i, _tasks.size());
171
0
        _tasks[i] = task;
172
0
    }
173
174
    void make_all_runnable();
175
176
0
    void set_num_tasks(int num_tasks) {
177
0
        _num_tasks = num_tasks;
178
0
        _tasks.resize(_num_tasks, nullptr);
179
0
        for (auto& op : operatorXs) {
180
0
            op->set_parallel_tasks(_num_tasks);
181
0
        }
182
0
    }
183
0
    int num_tasks() const { return _num_tasks; }
184
0
    bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }
185
186
0
    std::string debug_string() {
187
0
        fmt::memory_buffer debug_string_buffer;
188
0
        fmt::format_to(debug_string_buffer,
189
0
                       "Pipeline [id: {}, _num_tasks: {}, _num_tasks_created: {}]", _pipeline_id,
190
0
                       _num_tasks, _num_tasks_created);
191
0
        for (size_t i = 0; i < operatorXs.size(); i++) {
192
0
            fmt::format_to(debug_string_buffer, "\n{}", operatorXs[i]->debug_string(i));
193
0
        }
194
0
        fmt::format_to(debug_string_buffer, "\n{}", _sink_x->debug_string(operatorXs.size()));
195
0
        return fmt::to_string(debug_string_buffer);
196
0
    }
197
198
private:
199
    void _init_profile();
200
201
    OperatorBuilders _operator_builders; // left is _source, right is _root
202
    OperatorBuilderPtr _sink_builder;    // put block to sink
203
204
    std::mutex _depend_mutex;
205
    std::vector<std::pair<int, std::weak_ptr<Pipeline>>> _parents;
206
    std::vector<std::pair<int, std::shared_ptr<Pipeline>>> _dependencies;
207
208
    std::vector<std::shared_ptr<Pipeline>> _children;
209
210
    PipelineId _pipeline_id;
211
    std::weak_ptr<PipelineFragmentContext> _context;
212
    int _previous_schedule_id = -1;
213
214
    // pipeline id + operator names. init when:
215
    //  build_operators(), if pipeline;
216
    //  _build_pipelines() and _create_tree_helper(), if pipelineX.
217
    std::string _name;
218
219
    std::unique_ptr<RuntimeProfile> _pipeline_profile;
220
221
    // Operators for pipelineX. All pipeline tasks share operators from this.
222
    // [SourceOperator -> ... -> SinkOperator]
223
    OperatorXs operatorXs;
224
    DataSinkOperatorXPtr _sink_x = nullptr;
225
226
    std::shared_ptr<ObjectPool> _obj_pool;
227
228
    Operators _operators;
229
    /**
230
     * Consider the query plan below:
231
     *
232
     *      ExchangeSource     JoinBuild1
233
     *            \              /
234
     *         JoinProbe1 (Right Outer)    JoinBuild2
235
     *                   \                   /
236
     *                 JoinProbe2 (Right Outer)
237
     *                          |
238
     *                        Sink
239
     *
240
     * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should not be blocked by ExchangeSource
241
     * because we have a determined conclusion that JoinProbe1/JoinProbe2 will also output 0 rows.
242
     *
243
     * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked by Sink because JoinProbe2 will
244
     * produce more data.
245
     *
246
     * Assume both JoinBuild1 and JoinBuild2 output 0 rows, this pipeline task should not be blocked by ExchangeSource
247
     * and Sink because JoinProbe2 will always produce 0 rows and terminate early.
248
     *
249
     * In a nutshell, we should follow the rules:
250
     * 1. if any operator in pipeline can terminate early, this task should never be blocked by source operator.
251
     * 2. if the last operator (except sink) can terminate early, this task should never be blocked by sink operator.
252
     */
253
    bool _always_can_read = false;
254
    bool _always_can_write = false;
255
    bool _is_root_pipeline = false;
256
257
    // Input data distribution of this pipeline. We do local exchange when input data distribution
258
    // does not match the target data distribution.
259
    DataDistribution _data_distribution {ExchangeType::NOOP};
260
261
    // How many tasks should be created?
262
    int _num_tasks = 1;
263
    // How many tasks are already created?
264
    std::atomic<int> _num_tasks_created = 0;
265
    // How many tasks are already created and not finished?
266
    std::atomic<int> _num_tasks_running = 0;
267
    // Tasks in this pipeline.
268
    std::vector<PipelineTask*> _tasks;
269
};
270
271
} // namespace doris::pipeline