Coverage Report

Created: 2025-04-15 00:19

/root/doris/be/src/pipeline/task_queue.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "task_queue.h"
19
20
// IWYU pragma: no_include <bits/chrono.h>
21
#include <chrono> // IWYU pragma: keep
22
#include <memory>
23
#include <string>
24
25
#include "common/logging.h"
26
#include "pipeline/pipeline_task.h"
27
#include "runtime/workload_group/workload_group.h"
28
29
namespace doris::pipeline {
30
#include "common/compile_check_begin.h"
31
32
10
PipelineTaskSPtr SubTaskQueue::try_take(bool is_steal) {
33
10
    if (_queue.empty()) {
34
0
        return nullptr;
35
0
    }
36
10
    auto task = _queue.front();
37
10
    _queue.pop();
38
10
    return task;
39
10
}
40
41
////////////////////  PriorityTaskQueue ////////////////////
42
43
24
PriorityTaskQueue::PriorityTaskQueue() : _closed(false) {
44
24
    double factor = 1;
45
168
    for (int i = SUB_QUEUE_LEVEL - 1; i >= 0; i--) {
46
144
        _sub_queues[i].set_level_factor(factor);
47
144
        factor *= LEVEL_QUEUE_TIME_FACTOR;
48
144
    }
49
24
}
50
51
1
void PriorityTaskQueue::close() {
52
1
    std::unique_lock<std::mutex> lock(_work_size_mutex);
53
1
    _closed = true;
54
1
    _wait_task.notify_all();
55
1
}
56
57
4.55M
PipelineTaskSPtr PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
58
4.56M
    if (_total_task_size == 0 || _closed) {
59
4.56M
        return nullptr;
60
4.56M
    }
61
62
18.4E
    double min_vruntime = 0;
63
18.4E
    int level = -1;
64
18.4E
    for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) {
65
60
        double cur_queue_vruntime = _sub_queues[i].get_vruntime();
66
60
        if (!_sub_queues[i].empty()) {
67
10
            if (level == -1 || cur_queue_vruntime < min_vruntime) {
68
10
                level = i;
69
10
                min_vruntime = cur_queue_vruntime;
70
10
            }
71
10
        }
72
60
    }
73
18.4E
    DCHECK(level != -1);
74
18.4E
    _queue_level_min_vruntime = uint64_t(min_vruntime);
75
76
18.4E
    auto task = _sub_queues[level].try_take(is_steal);
77
18.4E
    if (task) {
78
10
        task->update_queue_level(level);
79
10
        _total_task_size--;
80
10
    }
81
18.4E
    return task;
82
4.55M
}
83
84
23
int PriorityTaskQueue::_compute_level(uint64_t runtime) {
85
23
    for (int i = 0; i < SUB_QUEUE_LEVEL - 1; ++i) {
86
23
        if (runtime <= _queue_level_limit[i]) {
87
23
            return i;
88
23
        }
89
23
    }
90
0
    return SUB_QUEUE_LEVEL - 1;
91
23
}
92
93
3.64M
PipelineTaskSPtr PriorityTaskQueue::try_take(bool is_steal) {
94
    // TODO other efficient lock? e.g. if get lock fail, return null_ptr
95
3.64M
    std::unique_lock<std::mutex> lock(_work_size_mutex);
96
3.64M
    return _try_take_unprotected(is_steal);
97
3.64M
}
98
99
457k
PipelineTaskSPtr PriorityTaskQueue::take(uint32_t timeout_ms) {
100
457k
    std::unique_lock<std::mutex> lock(_work_size_mutex);
101
457k
    auto task = _try_take_unprotected(false);
102
457k
    if (task) {
103
0
        return task;
104
457k
    } else {
105
457k
        if (timeout_ms > 0) {
106
457k
            _wait_task.wait_for(lock, std::chrono::milliseconds(timeout_ms));
107
18.4E
        } else {
108
18.4E
            _wait_task.wait(lock);
109
18.4E
        }
110
457k
        return _try_take_unprotected(false);
111
457k
    }
112
457k
}
113
114
23
Status PriorityTaskQueue::push(PipelineTaskSPtr task) {
115
23
    if (_closed) {
116
0
        return Status::InternalError("WorkTaskQueue closed");
117
0
    }
118
23
    auto level = _compute_level(task->get_runtime_ns());
119
23
    std::unique_lock<std::mutex> lock(_work_size_mutex);
120
121
    // update empty queue's  runtime, to avoid too high priority
122
23
    if (_sub_queues[level].empty() &&
123
23
        double(_queue_level_min_vruntime) > _sub_queues[level].get_vruntime()) {
124
0
        _sub_queues[level].adjust_runtime(_queue_level_min_vruntime);
125
0
    }
126
127
23
    _sub_queues[level].push_back(task);
128
23
    _total_task_size++;
129
23
    _wait_task.notify_one();
130
23
    return Status::OK();
131
23
}
132
133
16
MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
134
135
MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size)
136
17
        : _prio_task_queues(core_size), _closed(false), _core_size(core_size) {}
137
138
1
void MultiCoreTaskQueue::close() {
139
1
    if (_closed) {
140
0
        return;
141
0
    }
142
1
    _closed = true;
143
    // close all priority task queue
144
1
    std::ranges::for_each(_prio_task_queues,
145
1
                          [](auto& prio_task_queue) { prio_task_queue.close(); });
146
1
}
147
148
8
PipelineTaskSPtr MultiCoreTaskQueue::take(int core_id) {
149
8
    PipelineTaskSPtr task = nullptr;
150
455k
    while (!_closed) {
151
455k
        DCHECK(_prio_task_queues.size() > core_id)
152
145
                << " list size: " << _prio_task_queues.size() << " core_id: " << core_id
153
145
                << " _core_size: " << _core_size << " _next_core: " << _next_core.load();
154
455k
        task = _prio_task_queues[core_id].try_take(false);
155
455k
        if (task) {
156
0
            break;
157
0
        }
158
455k
        task = _steal_take(core_id);
159
455k
        if (task) {
160
0
            break;
161
0
        }
162
455k
        task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
163
455k
        if (task) {
164
0
            break;
165
0
        }
166
455k
    }
167
8
    if (task) {
168
0
        task->pop_out_runnable_queue();
169
0
    }
170
8
    return task;
171
8
}
172
173
456k
PipelineTaskSPtr MultiCoreTaskQueue::_steal_take(int core_id) {
174
456k
    DCHECK(core_id < _core_size);
175
456k
    int next_id = core_id;
176
3.65M
    for (int i = 1; i < _core_size; ++i) {
177
3.19M
        ++next_id;
178
3.19M
        if (next_id == _core_size) {
179
402k
            next_id = 0;
180
402k
        }
181
3.19M
        DCHECK(next_id < _core_size);
182
3.19M
        auto task = _prio_task_queues[next_id].try_take(true);
183
3.19M
        if (task) {
184
0
            return task;
185
0
        }
186
3.19M
    }
187
456k
    return nullptr;
188
456k
}
189
190
23
Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task) {
191
23
    int core_id = task->get_core_id();
192
23
    if (core_id < 0) {
193
23
        core_id = _next_core.fetch_add(1) % _core_size;
194
23
    }
195
23
    return push_back(task, core_id);
196
23
}
197
198
23
Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task, int core_id) {
199
23
    DCHECK(core_id < _core_size);
200
23
    task->put_in_runnable_queue();
201
23
    return _prio_task_queues[core_id].push(task);
202
23
}
203
204
42
void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
205
    // if the task not execute but exception early close, core_id == -1
206
    // should not do update_statistics
207
42
    if (auto core_id = task->get_core_id(); core_id >= 0) {
208
0
        task->inc_runtime_ns(time_spent);
209
0
        _prio_task_queues[core_id].inc_sub_queue_runtime(task->get_queue_level(), time_spent);
210
0
    }
211
42
}
212
213
} // namespace doris::pipeline