Coverage Report

Created: 2026-07-03 18:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/task_queue.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/task_queue.h"
19
20
// IWYU pragma: no_include <bits/chrono.h>
21
#include <chrono> // IWYU pragma: keep
22
#include <memory>
23
#include <string>
24
#include <vector>
25
26
#include "common/logging.h"
27
#include "exec/pipeline/pipeline_task.h"
28
#include "runtime/workload_group/workload_group.h"
29
30
namespace doris {
31
32
8.53M
PipelineTaskSPtr SubTaskQueue::try_take(bool is_steal) {
33
8.53M
    if (_queue.empty()) {
34
0
        return nullptr;
35
0
    }
36
8.53M
    auto task = _queue.front();
37
8.53M
    _queue.pop();
38
8.53M
    return task;
39
8.53M
}
40
41
8.53k
std::queue<PipelineTaskSPtr> SubTaskQueue::take_all() {
42
8.53k
    std::queue<PipelineTaskSPtr> tasks;
43
8.53k
    tasks.swap(_queue);
44
8.53k
    return tasks;
45
8.53k
}
46
47
////////////////////  PriorityTaskQueue ////////////////////
48
49
3.45k
PriorityTaskQueue::PriorityTaskQueue() : _closed(false) {
50
3.45k
    double factor = 1;
51
24.1k
    for (int i = SUB_QUEUE_LEVEL - 1; i >= 0; i--) {
52
20.7k
        _sub_queues[i].set_level_factor(factor);
53
20.7k
        factor *= LEVEL_QUEUE_TIME_FACTOR;
54
20.7k
    }
55
3.45k
}
56
57
1.42k
void PriorityTaskQueue::close() {
58
1.42k
    std::vector<std::queue<PipelineTaskSPtr>> tasks_to_release;
59
1.42k
    {
60
1.42k
        std::unique_lock<std::mutex> lock(_work_size_mutex);
61
1.42k
        _closed = true;
62
1.42k
        const auto total_task_size = _total_task_size.exchange(0);
63
1.42k
        _wait_task.notify_all();
64
1.42k
        DorisMetrics::instance()->pipeline_task_queue_size->increment(
65
1.42k
                -static_cast<int64_t>(total_task_size));
66
8.53k
        for (auto& sub_queue : _sub_queues) {
67
8.53k
            tasks_to_release.emplace_back(sub_queue.take_all());
68
8.53k
        }
69
1.42k
    }
70
8.53k
    for (auto& tasks : tasks_to_release) {
71
8.53k
        while (!tasks.empty()) {
72
0
            tasks.front()->pop_out_runnable_queue();
73
0
            tasks.pop();
74
0
        }
75
8.53k
    }
76
1.42k
}
77
78
1.80G
PipelineTaskSPtr PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
79
1.80G
    if (_total_task_size == 0 || _closed) {
80
1.80G
        return nullptr;
81
1.80G
    }
82
83
18.4E
    double min_vruntime = 0;
84
18.4E
    int level = -1;
85
18.4E
    for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) {
86
51.1M
        double cur_queue_vruntime = _sub_queues[i].get_vruntime();
87
51.1M
        if (!_sub_queues[i].empty()) {
88
8.53M
            if (level == -1 || cur_queue_vruntime < min_vruntime) {
89
8.53M
                level = i;
90
8.53M
                min_vruntime = cur_queue_vruntime;
91
8.53M
            }
92
8.53M
        }
93
51.1M
    }
94
18.4E
    DCHECK(level != -1);
95
18.4E
    _queue_level_min_vruntime = uint64_t(min_vruntime);
96
97
18.4E
    auto task = _sub_queues[level].try_take(is_steal);
98
18.4E
    if (task) {
99
8.54M
        task->update_queue_level(level);
100
8.54M
        _total_task_size--;
101
8.54M
        DorisMetrics::instance()->pipeline_task_queue_size->increment(-1);
102
8.54M
    }
103
18.4E
    return task;
104
1.80G
}
105
106
8.53M
int PriorityTaskQueue::_compute_level(uint64_t runtime) {
107
18.4E
    for (int i = 0; i < SUB_QUEUE_LEVEL - 1; ++i) {
108
8.54M
        if (runtime <= _queue_level_limit[i]) {
109
8.54M
            return i;
110
8.54M
        }
111
8.53M
    }
112
18.4E
    return SUB_QUEUE_LEVEL - 1;
113
8.53M
}
114
115
1.78G
PipelineTaskSPtr PriorityTaskQueue::try_take(bool is_steal) {
116
    // TODO other efficient lock? e.g. if get lock fail, return null_ptr
117
1.78G
    std::unique_lock<std::mutex> lock(_work_size_mutex);
118
1.78G
    return _try_take_unprotected(is_steal);
119
1.78G
}
120
121
37.7M
PipelineTaskSPtr PriorityTaskQueue::take(uint32_t timeout_ms) {
122
37.7M
    std::unique_lock<std::mutex> lock(_work_size_mutex);
123
37.7M
    auto task = _try_take_unprotected(false);
124
37.7M
    if (task) {
125
7.39k
        return task;
126
37.7M
    } else {
127
37.7M
        if (timeout_ms > 0) {
128
37.7M
            _wait_task.wait_for(lock, std::chrono::milliseconds(timeout_ms));
129
18.4E
        } else {
130
18.4E
            _wait_task.wait(lock);
131
18.4E
        }
132
37.7M
        return _try_take_unprotected(false);
133
37.7M
    }
134
37.7M
}
135
136
8.54M
Status PriorityTaskQueue::push(PipelineTaskSPtr task) {
137
8.54M
    auto level = _compute_level(task->get_runtime_ns());
138
8.54M
    std::unique_lock<std::mutex> lock(_work_size_mutex);
139
8.54M
    if (_closed) {
140
0
        return Status::InternalError("WorkTaskQueue closed");
141
0
    }
142
143
    // update empty queue's  runtime, to avoid too high priority
144
8.54M
    if (_sub_queues[level].empty() &&
145
8.54M
        double(_queue_level_min_vruntime) > _sub_queues[level].get_vruntime()) {
146
0
        _sub_queues[level].adjust_runtime(_queue_level_min_vruntime);
147
0
    }
148
149
8.54M
    task->put_in_runnable_queue();
150
8.54M
    _sub_queues[level].push_back(task);
151
8.54M
    _total_task_size++;
152
8.54M
    DorisMetrics::instance()->pipeline_task_queue_size->increment(1);
153
8.54M
    _wait_task.notify_one();
154
8.54M
    return Status::OK();
155
8.54M
}
156
157
138
MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
158
159
MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size)
160
204
        : _prio_task_queues(core_size), _closed(false), _core_size(core_size) {}
161
162
99
void MultiCoreTaskQueue::close() {
163
99
    if (_closed) {
164
0
        return;
165
0
    }
166
99
    _closed = true;
167
    // close all priority task queue
168
99
    std::ranges::for_each(_prio_task_queues,
169
1.42k
                          [](auto& prio_task_queue) { prio_task_queue.close(); });
170
99
}
171
172
15.5M
PipelineTaskSPtr MultiCoreTaskQueue::take(int core_id) {
173
15.5M
    PipelineTaskSPtr task = nullptr;
174
49.7M
    while (!_closed) {
175
18.4E
        DCHECK(_prio_task_queues.size() > core_id)
176
18.4E
                << " list size: " << _prio_task_queues.size() << " core_id: " << core_id
177
18.4E
                << " _core_size: " << _core_size << " _next_core: " << _next_core.load();
178
42.7M
        task = _prio_task_queues[core_id].try_take(false);
179
42.7M
        if (task) {
180
1.62M
            break;
181
1.62M
        }
182
41.0M
        task = _steal_take(core_id);
183
41.0M
        if (task) {
184
3.30M
            break;
185
3.30M
        }
186
37.7M
        task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
187
37.7M
        if (task) {
188
3.60M
            break;
189
3.60M
        }
190
37.7M
    }
191
15.5M
    if (task) {
192
8.54M
        task->pop_out_runnable_queue();
193
8.54M
    }
194
15.5M
    return task;
195
15.5M
}
196
197
40.9M
PipelineTaskSPtr MultiCoreTaskQueue::_steal_take(int core_id) {
198
40.9M
    DCHECK(core_id < _core_size);
199
40.9M
    int next_id = core_id;
200
1.80G
    for (int i = 1; i < _core_size; ++i) {
201
1.76G
        ++next_id;
202
1.76G
        if (next_id == _core_size) {
203
37.8M
            next_id = 0;
204
37.8M
        }
205
1.76G
        DCHECK(next_id < _core_size);
206
1.76G
        auto task = _prio_task_queues[next_id].try_take(true);
207
1.76G
        if (task) {
208
3.30M
            return task;
209
3.30M
        }
210
1.76G
    }
211
37.6M
    return nullptr;
212
40.9M
}
213
214
7.88M
Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task) {
215
7.88M
    int thread_id = task->get_thread_id(_core_size);
216
7.88M
    if (thread_id < 0) {
217
2.20M
        thread_id = _next_core.fetch_add(1) % _core_size;
218
2.20M
    }
219
7.88M
    return push_back(task, thread_id);
220
7.88M
}
221
222
8.52M
Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task, int core_id) {
223
8.52M
    DCHECK(core_id < _core_size);
224
8.52M
    return _prio_task_queues[core_id].push(task);
225
8.52M
}
226
227
7.88M
void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
228
    // if the task not execute but exception early close, core_id == -1
229
    // should not do update_statistics
230
7.89M
    if (auto core_id = task->get_thread_id(_core_size); core_id >= 0) {
231
7.89M
        task->inc_runtime_ns(time_spent);
232
7.89M
        _prio_task_queues[core_id].inc_sub_queue_runtime(task->get_queue_level(), time_spent);
233
7.89M
    }
234
7.88M
}
235
236
} // namespace doris