Coverage Report

Created: 2024-11-21 23:27

/root/doris/be/src/pipeline/pipeline.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "pipeline.h"
19
20
#include <memory>
21
#include <string>
22
#include <utility>
23
24
#include "pipeline/exec/operator.h"
25
#include "pipeline/pipeline_fragment_context.h"
26
#include "pipeline/pipeline_task.h"
27
28
namespace doris::pipeline {
29
30
0
void Pipeline::_init_profile() {
31
0
    auto s = fmt::format("Pipeline (pipeline id={})", _pipeline_id);
32
0
    _pipeline_profile = std::make_unique<RuntimeProfile>(std::move(s));
33
0
}
34
35
bool Pipeline::need_to_local_exchange(const DataDistribution target_data_distribution,
36
0
                                      const int idx) const {
37
    // If serial operator exists after `idx`-th operator, we should not improve parallelism.
38
0
    if (std::any_of(_operators.begin() + idx, _operators.end(),
39
0
                    [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) {
40
0
        return false;
41
0
    }
42
    // If all operators are serial and sink is not serial, we should improve parallelism for sink.
43
0
    if (std::all_of(_operators.begin(), _operators.end(),
44
0
                    [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) {
45
0
        if (!_sink->is_serial_operator()) {
46
0
            return true;
47
0
        }
48
0
    } else if (std::any_of(_operators.begin(), _operators.end(),
49
0
                           [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) {
50
        // If non-serial operators exist, we should improve parallelism for those.
51
0
        return true;
52
0
    }
53
54
0
    if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE &&
55
0
        target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) {
56
        // Always do local exchange if non-hash-partition exchanger is required.
57
        // For example, `PASSTHROUGH` exchanger is always required to distribute data evenly.
58
0
        return true;
59
0
    } else if (_operators.front()->is_serial_operator()) {
60
0
        DCHECK(std::all_of(_operators.begin(), _operators.end(),
61
0
                           [&](OperatorPtr op) -> bool { return op->is_serial_operator(); }) &&
62
0
               _sink->is_serial_operator())
63
0
                << debug_string();
64
        // All operators and sink are serial in this path.
65
0
        return false;
66
0
    } else {
67
0
        return _data_distribution.distribution_type != target_data_distribution.distribution_type &&
68
0
               !(is_hash_exchange(_data_distribution.distribution_type) &&
69
0
                 is_hash_exchange(target_data_distribution.distribution_type));
70
0
    }
71
0
}
72
73
0
/// Appends `op` to the operator list of this pipeline.
///
/// @param op operator to append; its parallel task count is set to `num_tasks()`.
/// @param parallelism when positive and `op` is a serial operator, becomes the new
///        task count of this pipeline before it is handed to `op`.
/// @return always Status::OK().
Status Pipeline::add_operator(OperatorPtr& op, const int parallelism) {
    // `&&` keeps the short-circuit: the operator is only queried when parallelism is positive.
    const bool override_num_tasks = parallelism > 0 && op->is_serial_operator();
    if (override_num_tasks) {
        set_num_tasks(parallelism);
    }
    // Must run after the possible override above so the operator sees the final task count.
    op->set_parallel_tasks(num_tasks());
    _operators.push_back(op);

    if (!op->is_source()) {
        return Status::OK();
    }
    // A source operator was just added: flip the whole list.
    // NOTE(review): presumably operators are added starting from the sink side, so the
    // reversal leaves the source at the front -- confirm against callers.
    std::reverse(_operators.begin(), _operators.end());
    return Status::OK();
}
84
85
0
/// Opens the last operator and the sink, then appends a descriptive suffix to `_name`.
///
/// The appended text is `<pipeline id>-<node id><name>...-<sink node id><sink name>`,
/// with one `<node id><name>` pair per operator in `_operators` order.
///
/// @param state runtime state forwarded to the `open()` calls.
/// @return the first error reported by an `open()` call, otherwise Status::OK().
Status Pipeline::prepare(RuntimeState* state) {
    RETURN_IF_ERROR(_operators.back()->open(state));
    RETURN_IF_ERROR(_sink->open(state));

    // Pipeline id, followed by a separator.
    _name += std::to_string(id());
    _name += '-';

    // One "<node id><name>" entry per operator, concatenated without separators.
    for (const auto& current : _operators) {
        _name += std::to_string(current->node_id());
        _name.append(current->get_name());
    }

    // Separator, then the sink's entry.
    _name += '-';
    _name += std::to_string(_sink->node_id());
    _name.append(_sink->get_name());

    return Status::OK();
}
99
100
0
/// Installs `sink` as the sink operator of this pipeline.
///
/// @param sink operator to install; must be non-null and report `is_sink()`.
/// @return Status::OK() on success; Status::InternalError if a sink has already been
///         set, if `sink` is null, or if `sink` is not a sink operator.
Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
    if (_sink) {
        return Status::InternalError("set sink twice");
    }
    // `sink` is dereferenced below, so reject a null handle instead of crashing on it.
    if (!sink) {
        return Status::InternalError("should set a sink operator but got nullptr");
    }
    if (!sink->is_sink()) {
        // Report the type of the operator itself. `typeid(sink)` would name the pointer
        // handle type, which is the same for every call and therefore useless in the message.
        // Binding a reference first keeps the `typeid` operand free of side effects.
        const auto& rejected = *sink;
        return Status::InternalError("should set a sink operator but {}", typeid(rejected).name());
    }
    _sink = sink;
    return Status::OK();
}
110
111
0
void Pipeline::make_all_runnable() {
112
0
    if (_sink->count_down_destination()) {
113
0
        for (auto* task : _tasks) {
114
0
            if (task) {
115
0
                task->clear_blocking_state(true);
116
0
            }
117
0
        }
118
0
    }
119
0
}
120
121
} // namespace doris::pipeline