Coverage Report

Created: 2026-02-27 22:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/pipeline/pipeline.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <glog/logging.h>

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "common/cast_set.h"
#include "common/status.h"
#include "pipeline/exec/operator.h"
#include "util/runtime_profile.h"
32
33
namespace doris::pipeline {
34
#include "common/compile_check_begin.h"
35
// Forward declarations; PipelineFragmentContext is defined in another header.
class Pipeline;
class PipelineFragmentContext;

// Numeric identifier of a pipeline (the type returned by Pipeline::id()).
using PipelineId = uint32_t;
// Shared-ownership handle to a pipeline, and an ordered list of such handles.
using PipelinePtr = std::shared_ptr<Pipeline>;
using Pipelines = std::vector<PipelinePtr>;
41
42
class Pipeline : public std::enable_shared_from_this<Pipeline> {
43
    friend class PipelineTask;
44
    friend class PipelineFragmentContext;
45
46
public:
47
    explicit Pipeline(PipelineId pipeline_id, int num_tasks, int num_tasks_of_parent)
48
144k
            : _pipeline_id(pipeline_id),
49
144k
              _num_tasks(num_tasks),
50
144k
              _num_tasks_of_parent(num_tasks_of_parent) {
51
144k
        _init_profile();
52
144k
        _tasks.resize(_num_tasks, nullptr);
53
144k
    }
54
55
    // Add operators for pipelineX
56
    Status add_operator(OperatorPtr& op, const int parallelism);
57
    // prepare operators for pipelineX
58
    Status prepare(RuntimeState* state);
59
60
    Status set_sink(DataSinkOperatorPtr& sink_operator);
61
62
10
    DataSinkOperatorXBase* sink() { return _sink.get(); }
63
72.1k
    Operators& operators() { return _operators; }
64
72.1k
    DataSinkOperatorPtr sink_shared_pointer() { return _sink; }
65
66
0
    [[nodiscard]] const RowDescriptor& output_row_desc() const {
67
0
        return _operators.back()->row_desc();
68
0
    }
69
70
58
    [[nodiscard]] PipelineId id() const { return _pipeline_id; }
71
72
0
    static bool is_hash_exchange(TLocalPartitionType::type idx) {
73
0
        return is_shuffled_exchange(idx);
74
0
    }
75
76
    // For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH,
77
    // data is processed and shuffled on the sink.
78
    // Compared to PASSTHROUGH, this is a relatively heavy operation.
79
0
    static bool heavy_operations_on_the_sink(TLocalPartitionType::type idx) {
80
0
        return is_shuffled_exchange(idx) || idx == TLocalPartitionType::ADAPTIVE_PASSTHROUGH;
81
0
    }
82
83
    bool need_to_local_exchange(const DataDistribution target_data_distribution,
84
                                const int idx) const;
85
4
    void init_data_distribution(RuntimeState* state) {
86
4
        set_data_distribution(_operators.front()->required_data_distribution(state));
87
4
    }
88
5
    void set_data_distribution(const DataDistribution& data_distribution) {
89
5
        _data_distribution = data_distribution;
90
5
    }
91
5
    const DataDistribution& data_distribution() const { return _data_distribution; }
92
93
3
    std::vector<std::shared_ptr<Pipeline>>& children() { return _children; }
94
1
    void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
95
0
    void set_children(std::vector<std::shared_ptr<Pipeline>> children) {
96
0
        _children = std::move(children);
97
0
    }
98
99
5
    void incr_created_tasks(int i, PipelineTask* task) {
100
5
        _num_tasks_created++;
101
5
        _num_tasks_running++;
102
5
        DCHECK_LT(i, _tasks.size());
103
5
        _tasks[i] = task;
104
5
    }
105
106
    void make_all_runnable(PipelineId wake_by);
107
108
0
    void set_num_tasks(int num_tasks) {
109
0
        _num_tasks = num_tasks;
110
0
        _tasks.resize(_num_tasks, nullptr);
111
0
        for (auto& op : _operators) {
112
0
            op->set_parallel_tasks(_num_tasks);
113
0
        }
114
115
0
#ifndef NDEBUG
116
0
        if (num_tasks > 1 &&
117
0
            std::any_of(_operators.begin(), _operators.end(),
118
0
                        [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) {
119
0
            DCHECK(false) << debug_string();
120
0
        }
121
0
#endif
122
0
    }
123
144k
    int num_tasks() const { return _num_tasks; }
124
0
    bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }
125
126
0
    std::string debug_string() const {
127
0
        fmt::memory_buffer debug_string_buffer;
128
0
        fmt::format_to(debug_string_buffer,
129
0
                       "Pipeline [id: {}, _num_tasks: {}, _num_tasks_created: {}]", _pipeline_id,
130
0
                       _num_tasks, _num_tasks_created);
131
0
        for (int i = 0; i < _operators.size(); i++) {
132
0
            fmt::format_to(debug_string_buffer, "\n{}", _operators[i]->debug_string(i));
133
0
        }
134
0
        fmt::format_to(debug_string_buffer, "\n{}",
135
0
                       _sink ? _sink->debug_string(cast_set<int>(_operators.size())) : "null");
136
0
        return fmt::to_string(debug_string_buffer);
137
0
    }
138
139
0
    int num_tasks_of_parent() const { return _num_tasks_of_parent; }
140
72.1k
    std::string& name() { return _name; }
141
142
private:
143
    void _init_profile();
144
145
    std::vector<std::pair<int, std::weak_ptr<Pipeline>>> _parents;
146
    std::vector<std::pair<int, std::shared_ptr<Pipeline>>> _dependencies;
147
148
    std::vector<std::shared_ptr<Pipeline>> _children;
149
150
    PipelineId _pipeline_id;
151
152
    // pipline id + operator names. init when:
153
    //  build_operators(), if pipeline;
154
    //  _build_pipelines() and _create_tree_helper(), if pipelineX.
155
    std::string _name;
156
157
    std::unique_ptr<RuntimeProfile> _pipeline_profile;
158
159
    // Operators for pipelineX. All pipeline tasks share operators from this.
160
    // [SourceOperator -> ... -> SinkOperator]
161
    Operators _operators;
162
    DataSinkOperatorPtr _sink = nullptr;
163
164
    std::shared_ptr<ObjectPool> _obj_pool;
165
166
    // Input data distribution of this pipeline. We do local exchange when input data distribution
167
    // does not match the target data distribution.
168
    DataDistribution _data_distribution {TLocalPartitionType::NOOP};
169
170
    // How many tasks should be created ?
171
    int _num_tasks = 1;
172
    // How many tasks are already created?
173
    std::atomic<int> _num_tasks_created = 0;
174
    // How many tasks are already created and not finished?
175
    std::atomic<int> _num_tasks_running = 0;
176
    // Tasks in this pipeline.
177
    std::vector<PipelineTask*> _tasks;
178
    // Parallelism of parent pipeline.
179
    const int _num_tasks_of_parent;
180
};
181
#include "common/compile_check_end.h"
182
} // namespace doris::pipeline