Coverage Report

Created: 2026-04-18 03:38

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/task_queue.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/task_queue.h"
19
20
// IWYU pragma: no_include <bits/chrono.h>
21
#include <chrono> // IWYU pragma: keep
22
#include <memory>
23
#include <string>
24
25
#include "common/logging.h"
26
#include "exec/pipeline/pipeline_task.h"
27
#include "runtime/workload_group/workload_group.h"
28
29
namespace doris {
30
31
1.65M
PipelineTaskSPtr SubTaskQueue::try_take(bool is_steal) {
32
1.65M
    if (_queue.empty()) {
33
0
        return nullptr;
34
0
    }
35
1.65M
    auto task = _queue.front();
36
1.65M
    _queue.pop();
37
1.65M
    return task;
38
1.65M
}
39
40
////////////////////  PriorityTaskQueue ////////////////////
41
42
1.51k
PriorityTaskQueue::PriorityTaskQueue() : _closed(false) {
43
1.51k
    double factor = 1;
44
10.5k
    for (int i = SUB_QUEUE_LEVEL - 1; i >= 0; i--) {
45
9.08k
        _sub_queues[i].set_level_factor(factor);
46
9.08k
        factor *= LEVEL_QUEUE_TIME_FACTOR;
47
9.08k
    }
48
1.51k
}
49
50
744
void PriorityTaskQueue::close() {
51
744
    std::unique_lock<std::mutex> lock(_work_size_mutex);
52
744
    _closed = true;
53
744
    _wait_task.notify_all();
54
744
    DorisMetrics::instance()->pipeline_task_queue_size->increment(-_total_task_size);
55
744
}
56
57
310M
PipelineTaskSPtr PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
58
310M
    if (_total_task_size == 0 || _closed) {
59
308M
        return nullptr;
60
308M
    }
61
62
1.26M
    double min_vruntime = 0;
63
1.26M
    int level = -1;
64
11.2M
    for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) {
65
9.95M
        double cur_queue_vruntime = _sub_queues[i].get_vruntime();
66
9.95M
        if (!_sub_queues[i].empty()) {
67
1.66M
            if (level == -1 || cur_queue_vruntime < min_vruntime) {
68
1.66M
                level = i;
69
1.66M
                min_vruntime = cur_queue_vruntime;
70
1.66M
            }
71
1.66M
        }
72
9.95M
    }
73
1.26M
    DCHECK(level != -1);
74
1.26M
    _queue_level_min_vruntime = uint64_t(min_vruntime);
75
76
1.26M
    auto task = _sub_queues[level].try_take(is_steal);
77
1.66M
    if (task) {
78
1.66M
        task->update_queue_level(level);
79
1.66M
        _total_task_size--;
80
1.66M
        DorisMetrics::instance()->pipeline_task_queue_size->increment(-1);
81
1.66M
    }
82
1.26M
    return task;
83
310M
}
84
85
1.66M
int PriorityTaskQueue::_compute_level(uint64_t runtime) {
86
18.4E
    for (int i = 0; i < SUB_QUEUE_LEVEL - 1; ++i) {
87
1.66M
        if (runtime <= _queue_level_limit[i]) {
88
1.66M
            return i;
89
1.66M
        }
90
1.66M
    }
91
18.4E
    return SUB_QUEUE_LEVEL - 1;
92
1.66M
}
93
94
296M
PipelineTaskSPtr PriorityTaskQueue::try_take(bool is_steal) {
95
    // TODO other efficient lock? e.g. if get lock fail, return null_ptr
96
296M
    std::unique_lock<std::mutex> lock(_work_size_mutex);
97
296M
    return _try_take_unprotected(is_steal);
98
296M
}
99
100
8.32M
PipelineTaskSPtr PriorityTaskQueue::take(uint32_t timeout_ms) {
101
8.32M
    std::unique_lock<std::mutex> lock(_work_size_mutex);
102
8.32M
    auto task = _try_take_unprotected(false);
103
8.32M
    if (task) {
104
2.12k
        return task;
105
8.32M
    } else {
106
8.32M
        if (timeout_ms > 0) {
107
8.32M
            _wait_task.wait_for(lock, std::chrono::milliseconds(timeout_ms));
108
18.4E
        } else {
109
18.4E
            _wait_task.wait(lock);
110
18.4E
        }
111
8.32M
        return _try_take_unprotected(false);
112
8.32M
    }
113
8.32M
}
114
115
1.66M
Status PriorityTaskQueue::push(PipelineTaskSPtr task) {
116
1.66M
    if (_closed) {
117
0
        return Status::InternalError("WorkTaskQueue closed");
118
0
    }
119
1.66M
    auto level = _compute_level(task->get_runtime_ns());
120
1.66M
    std::unique_lock<std::mutex> lock(_work_size_mutex);
121
122
    // update empty queue's  runtime, to avoid too high priority
123
1.66M
    if (_sub_queues[level].empty() &&
124
1.66M
        double(_queue_level_min_vruntime) > _sub_queues[level].get_vruntime()) {
125
0
        _sub_queues[level].adjust_runtime(_queue_level_min_vruntime);
126
0
    }
127
128
1.66M
    _sub_queues[level].push_back(task);
129
1.66M
    _total_task_size++;
130
1.66M
    DorisMetrics::instance()->pipeline_task_queue_size->increment(1);
131
1.66M
    _wait_task.notify_one();
132
1.66M
    return Status::OK();
133
1.66M
}
134
135
79
MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
136
137
MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size)
138
106
        : _prio_task_queues(core_size), _closed(false), _core_size(core_size) {}
139
140
53
void MultiCoreTaskQueue::close() {
141
53
    if (_closed) {
142
0
        return;
143
0
    }
144
53
    _closed = true;
145
    // close all priority task queue
146
53
    std::ranges::for_each(_prio_task_queues,
147
744
                          [](auto& prio_task_queue) { prio_task_queue.close(); });
148
53
}
149
150
1.99M
PipelineTaskSPtr MultiCoreTaskQueue::take(int core_id) {
151
1.99M
    PipelineTaskSPtr task = nullptr;
152
9.39M
    while (!_closed) {
153
18.4E
        DCHECK(_prio_task_queues.size() > core_id)
154
18.4E
                << " list size: " << _prio_task_queues.size() << " core_id: " << core_id
155
18.4E
                << " _core_size: " << _core_size << " _next_core: " << _next_core.load();
156
9.06M
        task = _prio_task_queues[core_id].try_take(false);
157
9.06M
        if (task) {
158
247k
            break;
159
247k
        }
160
8.81M
        task = _steal_take(core_id);
161
8.81M
        if (task) {
162
479k
            break;
163
479k
        }
164
8.33M
        task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
165
8.33M
        if (task) {
166
933k
            break;
167
933k
        }
168
8.33M
    }
169
1.99M
    if (task) {
170
1.66M
        task->pop_out_runnable_queue();
171
1.66M
    }
172
1.99M
    return task;
173
1.99M
}
174
175
8.80M
PipelineTaskSPtr MultiCoreTaskQueue::_steal_take(int core_id) {
176
8.80M
    DCHECK(core_id < _core_size);
177
8.80M
    int next_id = core_id;
178
298M
    for (int i = 1; i < _core_size; ++i) {
179
290M
        ++next_id;
180
290M
        if (next_id == _core_size) {
181
8.21M
            next_id = 0;
182
8.21M
        }
183
290M
        DCHECK(next_id < _core_size);
184
290M
        auto task = _prio_task_queues[next_id].try_take(true);
185
290M
        if (task) {
186
480k
            return task;
187
480k
        }
188
290M
    }
189
8.32M
    return nullptr;
190
8.80M
}
191
192
1.54M
Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task) {
193
1.54M
    int thread_id = task->get_thread_id(_core_size);
194
1.54M
    if (thread_id < 0) {
195
395k
        thread_id = _next_core.fetch_add(1) % _core_size;
196
395k
    }
197
1.54M
    return push_back(task, thread_id);
198
1.54M
}
199
200
1.65M
Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task, int core_id) {
201
1.65M
    DCHECK(core_id < _core_size);
202
1.65M
    task->put_in_runnable_queue();
203
1.65M
    return _prio_task_queues[core_id].push(task);
204
1.65M
}
205
206
1.54M
void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
207
    // if the task not execute but exception early close, core_id == -1
208
    // should not do update_statistics
209
1.54M
    if (auto core_id = task->get_thread_id(_core_size); core_id >= 0) {
210
1.54M
        task->inc_runtime_ns(time_spent);
211
1.54M
        _prio_task_queues[core_id].inc_sub_queue_runtime(task->get_queue_level(), time_spent);
212
1.54M
    }
213
1.54M
}
214
215
} // namespace doris