Coverage Report

Created: 2026-03-13 03:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/task_queue.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/task_queue.h"
19
20
// IWYU pragma: no_include <bits/chrono.h>
21
#include <chrono> // IWYU pragma: keep
22
#include <memory>
23
#include <string>
24
25
#include "common/logging.h"
26
#include "exec/pipeline/pipeline_task.h"
27
#include "runtime/workload_group/workload_group.h"
28
29
namespace doris {
30
#include "common/compile_check_begin.h"
31
32
7.79M
PipelineTaskSPtr SubTaskQueue::try_take(bool is_steal) {
33
7.79M
    if (_queue.empty()) {
34
0
        return nullptr;
35
0
    }
36
7.79M
    auto task = _queue.front();
37
7.79M
    _queue.pop();
38
7.79M
    return task;
39
7.79M
}
40
41
////////////////////  PriorityTaskQueue ////////////////////
42
43
3.14k
PriorityTaskQueue::PriorityTaskQueue() : _closed(false) {
44
3.14k
    double factor = 1;
45
21.9k
    for (int i = SUB_QUEUE_LEVEL - 1; i >= 0; i--) {
46
18.8k
        _sub_queues[i].set_level_factor(factor);
47
18.8k
        factor *= LEVEL_QUEUE_TIME_FACTOR;
48
18.8k
    }
49
3.14k
}
50
51
1.12k
void PriorityTaskQueue::close() {
52
1.12k
    std::unique_lock<std::mutex> lock(_work_size_mutex);
53
1.12k
    _closed = true;
54
1.12k
    _wait_task.notify_all();
55
1.12k
    DorisMetrics::instance()->pipeline_task_queue_size->increment(-_total_task_size);
56
1.12k
}
57
58
1.54G
PipelineTaskSPtr PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
59
1.54G
    if (_total_task_size == 0 || _closed) {
60
1.52G
        return nullptr;
61
1.52G
    }
62
63
18.7M
    double min_vruntime = 0;
64
18.7M
    int level = -1;
65
65.5M
    for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) {
66
46.7M
        double cur_queue_vruntime = _sub_queues[i].get_vruntime();
67
46.7M
        if (!_sub_queues[i].empty()) {
68
7.80M
            if (level == -1 || cur_queue_vruntime < min_vruntime) {
69
7.80M
                level = i;
70
7.80M
                min_vruntime = cur_queue_vruntime;
71
7.80M
            }
72
7.79M
        }
73
46.7M
    }
74
18.7M
    DCHECK(level != -1);
75
18.7M
    _queue_level_min_vruntime = uint64_t(min_vruntime);
76
77
18.7M
    auto task = _sub_queues[level].try_take(is_steal);
78
18.7M
    if (task) {
79
7.80M
        task->update_queue_level(level);
80
7.80M
        _total_task_size--;
81
7.80M
        DorisMetrics::instance()->pipeline_task_queue_size->increment(-1);
82
7.80M
    }
83
18.7M
    return task;
84
1.54G
}
85
86
7.80M
int PriorityTaskQueue::_compute_level(uint64_t runtime) {
87
18.4E
    for (int i = 0; i < SUB_QUEUE_LEVEL - 1; ++i) {
88
7.80M
        if (runtime <= _queue_level_limit[i]) {
89
7.80M
            return i;
90
7.80M
        }
91
7.80M
    }
92
18.4E
    return SUB_QUEUE_LEVEL - 1;
93
7.80M
}
94
95
1.50G
PipelineTaskSPtr PriorityTaskQueue::try_take(bool is_steal) {
96
    // TODO other efficient lock? e.g. if get lock fail, return null_ptr
97
1.50G
    std::unique_lock<std::mutex> lock(_work_size_mutex);
98
1.50G
    return _try_take_unprotected(is_steal);
99
1.50G
}
100
101
29.2M
PipelineTaskSPtr PriorityTaskQueue::take(uint32_t timeout_ms) {
102
29.2M
    std::unique_lock<std::mutex> lock(_work_size_mutex);
103
29.2M
    auto task = _try_take_unprotected(false);
104
29.2M
    if (task) {
105
8.20k
        return task;
106
29.2M
    } else {
107
29.2M
        if (timeout_ms > 0) {
108
29.2M
            _wait_task.wait_for(lock, std::chrono::milliseconds(timeout_ms));
109
18.4E
        } else {
110
18.4E
            _wait_task.wait(lock);
111
18.4E
        }
112
29.2M
        return _try_take_unprotected(false);
113
29.2M
    }
114
29.2M
}
115
116
7.80M
Status PriorityTaskQueue::push(PipelineTaskSPtr task) {
117
7.80M
    if (_closed) {
118
0
        return Status::InternalError("WorkTaskQueue closed");
119
0
    }
120
7.80M
    auto level = _compute_level(task->get_runtime_ns());
121
7.80M
    std::unique_lock<std::mutex> lock(_work_size_mutex);
122
123
    // update empty queue's  runtime, to avoid too high priority
124
7.80M
    if (_sub_queues[level].empty() &&
125
7.80M
        double(_queue_level_min_vruntime) > _sub_queues[level].get_vruntime()) {
126
0
        _sub_queues[level].adjust_runtime(_queue_level_min_vruntime);
127
0
    }
128
129
7.80M
    _sub_queues[level].push_back(task);
130
7.80M
    _total_task_size++;
131
7.80M
    DorisMetrics::instance()->pipeline_task_queue_size->increment(1);
132
7.80M
    _wait_task.notify_one();
133
7.80M
    return Status::OK();
134
7.80M
}
135
136
81
MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
137
138
MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size)
139
147
        : _prio_task_queues(core_size), _closed(false), _core_size(core_size) {}
140
141
60
void MultiCoreTaskQueue::close() {
142
60
    if (_closed) {
143
0
        return;
144
0
    }
145
60
    _closed = true;
146
    // close all priority task queue
147
60
    std::ranges::for_each(_prio_task_queues,
148
1.12k
                          [](auto& prio_task_queue) { prio_task_queue.close(); });
149
60
}
150
151
9.81M
PipelineTaskSPtr MultiCoreTaskQueue::take(int core_id) {
152
9.81M
    PipelineTaskSPtr task = nullptr;
153
36.7M
    while (!_closed) {
154
18.4E
        DCHECK(_prio_task_queues.size() > core_id)
155
18.4E
                << " list size: " << _prio_task_queues.size() << " core_id: " << core_id
156
18.4E
                << " _core_size: " << _core_size << " _next_core: " << _next_core.load();
157
34.7M
        task = _prio_task_queues[core_id].try_take(false);
158
34.7M
        if (task) {
159
2.11M
            break;
160
2.11M
        }
161
32.6M
        task = _steal_take(core_id);
162
32.6M
        if (task) {
163
3.37M
            break;
164
3.37M
        }
165
29.2M
        task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
166
29.2M
        if (task) {
167
2.31M
            break;
168
2.31M
        }
169
29.2M
    }
170
9.81M
    if (task) {
171
7.80M
        task->pop_out_runnable_queue();
172
7.80M
    }
173
9.81M
    return task;
174
9.81M
}
175
176
32.5M
PipelineTaskSPtr MultiCoreTaskQueue::_steal_take(int core_id) {
177
32.5M
    DCHECK(core_id < _core_size);
178
32.5M
    int next_id = core_id;
179
1.51G
    for (int i = 1; i < _core_size; ++i) {
180
1.48G
        ++next_id;
181
1.48G
        if (next_id == _core_size) {
182
29.5M
            next_id = 0;
183
29.5M
        }
184
1.48G
        DCHECK(next_id < _core_size);
185
1.48G
        auto task = _prio_task_queues[next_id].try_take(true);
186
1.48G
        if (task) {
187
3.37M
            return task;
188
3.37M
        }
189
1.48G
    }
190
29.2M
    return nullptr;
191
32.5M
}
192
193
6.89M
Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task) {
194
6.89M
    int thread_id = task->get_thread_id(_core_size);
195
6.89M
    if (thread_id < 0) {
196
2.03M
        thread_id = _next_core.fetch_add(1) % _core_size;
197
2.03M
    }
198
6.89M
    return push_back(task, thread_id);
199
6.89M
}
200
201
7.79M
Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task, int core_id) {
202
7.79M
    DCHECK(core_id < _core_size);
203
7.79M
    task->put_in_runnable_queue();
204
7.79M
    return _prio_task_queues[core_id].push(task);
205
7.79M
}
206
207
6.88M
void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
208
    // if the task not execute but exception early close, core_id == -1
209
    // should not do update_statistics
210
6.89M
    if (auto core_id = task->get_thread_id(_core_size); core_id >= 0) {
211
6.89M
        task->inc_runtime_ns(time_spent);
212
6.89M
        _prio_task_queues[core_id].inc_sub_queue_runtime(task->get_queue_level(), time_spent);
213
6.89M
    }
214
6.88M
}
215
216
} // namespace doris