Coverage Report

Created: 2025-07-23 16:39

/root/doris/be/src/pipeline/task_queue.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "task_queue.h"
19
20
// IWYU pragma: no_include <bits/chrono.h>
21
#include <chrono> // IWYU pragma: keep
22
#include <memory>
23
#include <string>
24
25
#include "common/logging.h"
26
#include "pipeline/pipeline_task.h"
27
#include "runtime/workload_group/workload_group.h"
28
29
namespace doris::pipeline {
30
31
0
TaskQueue::~TaskQueue() = default;
32
33
0
PipelineTask* SubTaskQueue::try_take(bool is_steal) {
34
0
    if (_queue.empty()) {
  Branch (34:9): [True: 0, False: 0]
35
0
        return nullptr;
36
0
    }
37
0
    auto task = _queue.front();
38
0
    _queue.pop();
39
0
    return task;
40
0
}
41
42
////////////////////  PriorityTaskQueue ////////////////////
43
44
0
PriorityTaskQueue::PriorityTaskQueue() : _closed(false) {
45
0
    double factor = 1;
46
0
    for (int i = SUB_QUEUE_LEVEL - 1; i >= 0; i--) {
  Branch (46:39): [True: 0, False: 0]
47
0
        _sub_queues[i].set_level_factor(factor);
48
0
        factor *= LEVEL_QUEUE_TIME_FACTOR;
49
0
    }
50
0
}
51
52
0
void PriorityTaskQueue::close() {
53
0
    std::unique_lock<std::mutex> lock(_work_size_mutex);
54
0
    _closed = true;
55
0
    _wait_task.notify_all();
56
0
    DorisMetrics::instance()->pipeline_task_queue_size->increment(-_total_task_size);
57
0
}
58
59
0
PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
60
0
    if (_total_task_size == 0 || _closed) {
  Branch (60:9): [True: 0, False: 0]
  Branch (60:34): [True: 0, False: 0]
61
0
        return nullptr;
62
0
    }
63
64
0
    double min_vruntime = 0;
65
0
    int level = -1;
66
0
    for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) {
  Branch (66:21): [True: 0, False: 0]
67
0
        double cur_queue_vruntime = _sub_queues[i].get_vruntime();
68
0
        if (!_sub_queues[i].empty()) {
  Branch (68:13): [True: 0, False: 0]
69
0
            if (level == -1 || cur_queue_vruntime < min_vruntime) {
  Branch (69:17): [True: 0, False: 0]
  Branch (69:32): [True: 0, False: 0]
70
0
                level = i;
71
0
                min_vruntime = cur_queue_vruntime;
72
0
            }
73
0
        }
74
0
    }
75
0
    DCHECK(level != -1);
76
0
    _queue_level_min_vruntime = uint64_t(min_vruntime);
77
78
0
    auto task = _sub_queues[level].try_take(is_steal);
79
0
    if (task) {
  Branch (79:9): [True: 0, False: 0]
80
0
        task->update_queue_level(level);
81
0
        _total_task_size--;
82
0
        DorisMetrics::instance()->pipeline_task_queue_size->increment(-1);
83
0
    }
84
0
    return task;
85
0
}
86
87
0
int PriorityTaskQueue::_compute_level(uint64_t runtime) {
88
0
    for (int i = 0; i < SUB_QUEUE_LEVEL - 1; ++i) {
  Branch (88:21): [True: 0, False: 0]
89
0
        if (runtime <= _queue_level_limit[i]) {
  Branch (89:13): [True: 0, False: 0]
90
0
            return i;
91
0
        }
92
0
    }
93
0
    return SUB_QUEUE_LEVEL - 1;
94
0
}
95
96
0
PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
97
    // TODO: consider a more efficient locking scheme, e.g. return nullptr if acquiring the lock fails
98
0
    std::unique_lock<std::mutex> lock(_work_size_mutex);
99
0
    return _try_take_unprotected(is_steal);
100
0
}
101
102
0
PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
103
0
    std::unique_lock<std::mutex> lock(_work_size_mutex);
104
0
    auto task = _try_take_unprotected(false);
105
0
    if (task) {
  Branch (105:9): [True: 0, False: 0]
106
0
        return task;
107
0
    } else {
108
0
        if (timeout_ms > 0) {
  Branch (108:13): [True: 0, False: 0]
109
0
            _wait_task.wait_for(lock, std::chrono::milliseconds(timeout_ms));
110
0
        } else {
111
0
            _wait_task.wait(lock);
112
0
        }
113
0
        return _try_take_unprotected(false);
114
0
    }
115
0
}
116
117
0
Status PriorityTaskQueue::push(PipelineTask* task) {
118
0
    if (_closed) {
  Branch (118:9): [True: 0, False: 0]
119
0
        return Status::InternalError("WorkTaskQueue closed");
120
0
    }
121
0
    auto level = _compute_level(task->get_runtime_ns());
122
0
    std::unique_lock<std::mutex> lock(_work_size_mutex);
123
124
    // Update an empty queue's runtime to avoid giving it an excessively high priority.
125
0
    if (_sub_queues[level].empty() &&
  Branch (125:9): [True: 0, False: 0]
126
0
        _queue_level_min_vruntime > _sub_queues[level].get_vruntime()) {
  Branch (126:9): [True: 0, False: 0]
127
0
        _sub_queues[level].adjust_runtime(_queue_level_min_vruntime);
128
0
    }
129
130
0
    _sub_queues[level].push_back(task);
131
0
    _total_task_size++;
132
0
    DorisMetrics::instance()->pipeline_task_queue_size->increment(1);
133
0
    _wait_task.notify_one();
134
0
    return Status::OK();
135
0
}
136
137
0
MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
138
139
0
MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size) : TaskQueue(core_size), _closed(false) {
140
0
    _prio_task_queue_list =
141
0
            std::make_shared<std::vector<std::unique_ptr<PriorityTaskQueue>>>(core_size);
142
0
    for (int i = 0; i < core_size; i++) {
  Branch (142:21): [True: 0, False: 0]
143
0
        (*_prio_task_queue_list)[i] = std::make_unique<PriorityTaskQueue>();
144
0
    }
145
0
}
146
147
0
void MultiCoreTaskQueue::close() {
148
0
    if (_closed) {
  Branch (148:9): [True: 0, False: 0]
149
0
        return;
150
0
    }
151
0
    _closed = true;
152
0
    for (int i = 0; i < _core_size; ++i) {
  Branch (152:21): [True: 0, False: 0]
153
0
        (*_prio_task_queue_list)[i]->close();
154
0
    }
155
0
}
156
157
0
PipelineTask* MultiCoreTaskQueue::take(int core_id) {
158
0
    PipelineTask* task = nullptr;
159
0
    while (!_closed) {
  Branch (159:12): [True: 0, False: 0]
160
0
        DCHECK(_prio_task_queue_list->size() > core_id)
161
0
                << " list size: " << _prio_task_queue_list->size() << " core_id: " << core_id
162
0
                << " _core_size: " << _core_size << " _next_core: " << _next_core.load();
163
0
        task = (*_prio_task_queue_list)[core_id]->try_take(false);
164
0
        if (task) {
  Branch (164:13): [True: 0, False: 0]
165
0
            task->set_core_id(core_id);
166
0
            break;
167
0
        }
168
0
        task = _steal_take(core_id, *_prio_task_queue_list);
169
0
        if (task) {
  Branch (169:13): [True: 0, False: 0]
170
0
            break;
171
0
        }
172
0
        task = (*_prio_task_queue_list)[core_id]->take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
173
0
        if (task) {
  Branch (173:13): [True: 0, False: 0]
174
0
            task->set_core_id(core_id);
175
0
            break;
176
0
        }
177
0
    }
178
0
    if (task) {
  Branch (178:9): [True: 0, False: 0]
179
0
        task->pop_out_runnable_queue();
180
0
    }
181
0
    return task;
182
0
}
183
184
PipelineTask* MultiCoreTaskQueue::_steal_take(
185
0
        int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list) {
186
0
    DCHECK(core_id < _core_size);
187
0
    int next_id = core_id;
188
0
    for (int i = 1; i < _core_size; ++i) {
  Branch (188:21): [True: 0, False: 0]
189
0
        ++next_id;
190
0
        if (next_id == _core_size) {
  Branch (190:13): [True: 0, False: 0]
191
0
            next_id = 0;
192
0
        }
193
0
        DCHECK(next_id < _core_size);
194
0
        auto task = prio_task_queue_list[next_id]->try_take(true);
195
0
        if (task) {
  Branch (195:13): [True: 0, False: 0]
196
0
            task->set_core_id(next_id);
197
0
            return task;
198
0
        }
199
0
    }
200
0
    return nullptr;
201
0
}
202
203
0
Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
204
0
    int core_id = task->get_previous_core_id();
205
0
    if (core_id < 0) {
  Branch (205:9): [True: 0, False: 0]
206
0
        core_id = _next_core.fetch_add(1) % _core_size;
207
0
    }
208
0
    return push_back(task, core_id);
209
0
}
210
211
0
Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
212
0
    DCHECK(core_id < _core_size);
213
0
    task->put_in_runnable_queue();
214
0
    return (*_prio_task_queue_list)[core_id]->push(task);
215
0
}
216
217
0
void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
218
0
    task->inc_runtime_ns(time_spent);
219
0
    (*_prio_task_queue_list)[task->get_core_id()]->inc_sub_queue_runtime(task->get_queue_level(),
220
0
                                                                         time_spent);
221
0
}
222
223
} // namespace doris::pipeline