Coverage Report

Created: 2026-04-02 18:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/adaptive_thread_pool_controller.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/adaptive_thread_pool_controller.h"
19
20
#include <butil/time.h>
21
22
#include <algorithm>
23
#include <thread>
24
25
#include "cloud/config.h"
26
#include "common/config.h"
27
#include "common/logging.h"
28
#include "common/metrics/system_metrics.h"
29
#include "common/status.h"
30
#include "util/threadpool.h"
31
#include "util/time.h"
32
33
namespace doris {
34
35
40
int AdaptiveThreadPoolController::PoolGroup::get_max_threads() const {
36
40
    int num_cpus = std::thread::hardware_concurrency();
37
40
    if (num_cpus <= 0) num_cpus = 1;
38
40
    return static_cast<int>(num_cpus * max_threads_per_cpu);
39
40
}
40
41
26
int AdaptiveThreadPoolController::PoolGroup::get_min_threads() const {
42
26
    int num_cpus = std::thread::hardware_concurrency();
43
26
    if (num_cpus <= 0) num_cpus = 1;
44
26
    return std::max(1, static_cast<int>(num_cpus * min_threads_per_cpu));
45
26
}
46
47
// Static callback registered with bthread_timer_add.
48
// Runs in brpc TimerThread. Must be fast and non-blocking.
49
0
void AdaptiveThreadPoolController::_on_timer(void* raw) {
50
0
    auto* arg = static_cast<TimerArg*>(raw);
51
52
    // Hold mu for the entire callback (fire + re-registration).
53
    // cancel() acquires mu after bthread_timer_del, so this provides
54
    // cancel-with-wait semantics without a dedicated thread.
55
0
    std::lock_guard<std::mutex> lk(arg->mu);
56
57
0
    if (arg->stopped.load(std::memory_order_acquire)) {
58
        // cancel() set stopped before we took the lock.
59
        // cancel() owns arg and will delete it after taking mu.
60
0
        return;
61
0
    }
62
63
0
    arg->ctrl->_fire_group(arg->name);
64
65
0
    if (arg->stopped.load(std::memory_order_acquire)) {
66
0
        return; // cancel() will clean up
67
0
    }
68
69
    // Re-register the next one-shot timer.
70
0
    bthread_timer_t tid;
71
0
    if (bthread_timer_add(&tid, butil::milliseconds_from_now(arg->interval_ms), _on_timer, arg) ==
72
0
        0) {
73
0
        arg->timer_id.store(tid, std::memory_order_release);
74
0
    } else {
75
0
        LOG(WARNING) << "Adaptive: failed to re-register timer for group '" << arg->name << "'";
76
0
    }
77
0
}
78
79
void AdaptiveThreadPoolController::init(SystemMetrics* system_metrics,
80
12
                                        ThreadPool* s3_file_upload_pool) {
81
12
    _system_metrics = system_metrics;
82
12
    _s3_file_upload_pool = s3_file_upload_pool;
83
12
}
84
85
1.00k
void AdaptiveThreadPoolController::stop() {
86
1.00k
    std::vector<std::string> names;
87
1.00k
    {
88
1.00k
        std::lock_guard<std::mutex> lk(_mutex);
89
1.00k
        for (const auto& [name, _] : _pool_groups) {
90
13
            names.push_back(name);
91
13
        }
92
1.00k
    }
93
1.00k
    for (const auto& name : names) {
94
13
        cancel(name);
95
13
    }
96
1.00k
}
97
98
void AdaptiveThreadPoolController::add(std::string name, std::vector<ThreadPool*> pools,
99
                                       AdjustFunc adjust_func, double max_threads_per_cpu,
100
14
                                       double min_threads_per_cpu, int64_t interval_ms) {
101
14
    PoolGroup group;
102
14
    group.name = name;
103
14
    group.pools = std::move(pools);
104
14
    group.adjust_func = std::move(adjust_func);
105
14
    group.max_threads_per_cpu = max_threads_per_cpu;
106
14
    group.min_threads_per_cpu = min_threads_per_cpu;
107
14
    group.current_threads = group.get_max_threads();
108
109
14
    int log_max = group.get_max_threads();
110
14
    int log_min = group.get_min_threads();
111
112
14
    auto* arg = new TimerArg();
113
14
    arg->ctrl = this;
114
14
    arg->name = name;
115
14
    arg->interval_ms = interval_ms;
116
117
14
    bthread_timer_t tid;
118
14
    if (bthread_timer_add(&tid, butil::milliseconds_from_now(interval_ms), _on_timer, arg) == 0) {
119
14
        arg->timer_id.store(tid, std::memory_order_release);
120
14
    } else {
121
0
        LOG(WARNING) << "Adaptive: failed to register timer for pool group '" << name << "'";
122
0
    }
123
14
    group.timer_arg = arg;
124
125
14
    {
126
14
        std::lock_guard<std::mutex> lk(_mutex);
127
14
        _pool_groups[name] = std::move(group);
128
14
    }
129
130
14
    LOG(INFO) << "Adaptive: added pool group '" << name << "'"
131
14
              << ", max_threads=" << log_max << ", min_threads=" << log_min
132
14
              << ", interval_ms=" << interval_ms;
133
14
}
134
135
14
void AdaptiveThreadPoolController::cancel(const std::string& name) {
136
14
    TimerArg* arg = nullptr;
137
14
    {
138
14
        std::lock_guard<std::mutex> lk(_mutex);
139
14
        auto it = _pool_groups.find(name);
140
14
        if (it != _pool_groups.end()) {
141
14
            arg = it->second.timer_arg;
142
14
            _pool_groups.erase(it);
143
14
        }
144
14
    }
145
146
14
    if (arg == nullptr) {
147
0
        return;
148
0
    }
149
150
    // Signal the callback to stop re-registering.
151
14
    arg->stopped.store(true, std::memory_order_release);
152
153
    // Try to cancel a pending (not yet fired) timer. Read timer_id after
154
    // setting stopped so any re-registration in a concurrent callback has
155
    // already stored the latest id by now (it holds mu, which we haven't
156
    // taken yet).
157
14
    bthread_timer_t tid = arg->timer_id.load(std::memory_order_acquire);
158
14
    bthread_timer_del(tid); // returns non-zero if already fired; that's fine
159
160
    // Wait for any in-flight callback to finish. The callback holds mu while
161
    // running _fire_group and re-registering, so acquiring mu here ensures
162
    // we don't free arg while the callback is still executing.
163
14
    { std::lock_guard<std::mutex> lk(arg->mu); }
164
165
14
    delete arg;
166
14
    LOG(INFO) << "Adaptive: cancelled pool group '" << name << "'";
167
14
}
168
169
// Called from _on_timer. No lock held on entry.
170
6
void AdaptiveThreadPoolController::_fire_group(const std::string& name) {
171
6
    if (!config::enable_adaptive_flush_threads) {
172
0
        return;
173
0
    }
174
    // Phase 1: snapshot parameters under the lock.
175
6
    AdjustFunc fn;
176
6
    int current, min_t, max_t;
177
6
    {
178
6
        std::lock_guard<std::mutex> lk(_mutex);
179
6
        auto it = _pool_groups.find(name);
180
6
        if (it == _pool_groups.end()) return;
181
6
        const PoolGroup& g = it->second;
182
6
        fn = g.adjust_func;
183
6
        current = g.current_threads;
184
6
        min_t = g.get_min_threads();
185
6
        max_t = g.get_max_threads();
186
6
    }
187
188
    // Phase 2: compute target — no lock held (adjust_func may call is_io_busy etc.).
189
0
    std::string reason;
190
6
    int target = fn(current, min_t, max_t, reason);
191
192
    // Phase 3: apply under lock; recheck in case cancel() raced with us.
193
6
    std::lock_guard<std::mutex> lk(_mutex);
194
6
    auto it = _pool_groups.find(name);
195
6
    if (it == _pool_groups.end()) return;
196
6
    _apply_thread_count(it->second, target, reason);
197
6
}
198
199
// Fire all groups once regardless of schedule. For testing.
200
5
void AdaptiveThreadPoolController::adjust_once() {
201
5
    std::vector<std::string> names;
202
5
    {
203
5
        std::lock_guard<std::mutex> lk(_mutex);
204
6
        for (const auto& [name, _] : _pool_groups) {
205
6
            names.push_back(name);
206
6
        }
207
5
    }
208
6
    for (const auto& name : names) {
209
6
        _fire_group(name);
210
6
    }
211
5
}
212
213
void AdaptiveThreadPoolController::_apply_thread_count(PoolGroup& group, int target_threads,
214
6
                                                       const std::string& reason) {
215
6
    int max_threads = group.get_max_threads();
216
6
    int min_threads = group.get_min_threads();
217
6
    target_threads = std::max(min_threads, std::min(max_threads, target_threads));
218
6
    if (target_threads == group.current_threads) return;
219
220
6
    LOG(INFO) << "Adaptive[" << group.name << "]: adjusting threads from " << group.current_threads
221
2
              << " to " << target_threads << " (min=" << min_threads << ", max=" << max_threads
222
2
              << ")" << (reason.empty() ? "" : " reason=[" + reason + "]");
223
224
2
    bool all_success = true;
225
2
    for (auto* pool : group.pools) {
226
2
        if (pool == nullptr) continue;
227
        // Always sync min_threads to guard against races with update_memtable_flush_threads().
228
        // Order matters: when increasing, set max first so max >= min is always satisfied;
229
        // when decreasing, set min first so the new max is never below min.
230
2
        Status st;
231
2
        if (target_threads >= group.current_threads) {
232
0
            st = pool->set_max_threads(target_threads);
233
0
            if (st.ok()) static_cast<void>(pool->set_min_threads(min_threads));
234
2
        } else {
235
2
            st = pool->set_min_threads(min_threads);
236
2
            if (st.ok()) st = pool->set_max_threads(target_threads);
237
2
        }
238
2
        if (!st.ok()) {
239
0
            all_success = false;
240
0
            LOG(WARNING) << "Adaptive[" << group.name << "]: failed to set threads: " << st;
241
0
        }
242
2
    }
243
2
    if (all_success) {
244
2
        group.current_threads = target_threads;
245
2
    }
246
2
}
247
248
15
int AdaptiveThreadPoolController::get_current_threads(const std::string& name) const {
249
15
    std::lock_guard<std::mutex> lk(_mutex);
250
15
    auto it = _pool_groups.find(name);
251
15
    return it != _pool_groups.end() ? it->second.current_threads : 0;
252
15
}
253
254
2
bool AdaptiveThreadPoolController::is_io_busy() {
255
2
    if (config::is_cloud_mode()) {
256
0
        if (_s3_file_upload_pool == nullptr) return false;
257
0
        int queue_size = _s3_file_upload_pool->get_queue_size();
258
0
        return queue_size > kS3QueueBusyThreshold;
259
0
    }
260
261
2
    if (_system_metrics == nullptr) return false;
262
263
0
    int64_t current_time_sec = MonotonicSeconds();
264
0
    int64_t interval_sec = current_time_sec - _last_check_time_sec;
265
0
    if (interval_sec <= 0) {
266
0
        return _last_io_busy;
267
0
    }
268
269
0
    int64_t max_io_util = _system_metrics->get_max_io_util(_last_disk_io_time, interval_sec);
270
0
    _system_metrics->get_disks_io_time(&_last_disk_io_time);
271
0
    _last_check_time_sec = current_time_sec;
272
273
0
    _last_io_busy = max_io_util > kIOBusyThresholdPercent;
274
0
    return _last_io_busy;
275
0
}
276
277
2
bool AdaptiveThreadPoolController::is_cpu_busy() {
278
2
    if (_system_metrics == nullptr) return false;
279
280
0
    double load_avg = _system_metrics->get_load_average_1_min();
281
0
    int num_cpus = std::thread::hardware_concurrency();
282
0
    if (num_cpus <= 0) return false;
283
284
0
    double cpu_usage_percent = (load_avg / num_cpus) * 100.0;
285
0
    return cpu_usage_percent > kCPUBusyThresholdPercent;
286
0
}
287
288
// Builds the default AdjustFunc for a flush pool. Starting from `current`:
//  - +1 thread (capped at max_t) when flush_pool's queue exceeds kQueueThreshold;
//  - -2 threads (floored at min_t) if controller->is_io_busy();
//  - -2 threads (floored at min_t) if controller->is_cpu_busy().
// Each applied step appends a note to `reason`.
// The returned function captures both raw pointers, so they must outlive it.
AdaptiveThreadPoolController::AdjustFunc AdaptiveThreadPoolController::make_flush_adjust_func(
        AdaptiveThreadPoolController* controller, ThreadPool* flush_pool) {
    return [controller, flush_pool](int current, int min_t, int max_t, std::string& reason) {
        int next = current;

        const int pending = flush_pool->get_queue_size();
        if (pending > kQueueThreshold) {
            next = std::min(max_t, next + 1);
            reason += "queue_size=" + std::to_string(pending) + ">" +
                      std::to_string(kQueueThreshold) + " -> target=" + std::to_string(next) +
                      "; ";
        }

        const auto back_off = [&next, &reason, min_t](const char* why) {
            next = std::max(min_t, next - 2);
            reason += std::string(why) + " -> target=" + std::to_string(next) + "; ";
        };
        if (controller->is_io_busy()) {
            back_off("io_busy");
        }
        if (controller->is_cpu_busy()) {
            back_off("cpu_busy");
        }
        return next;
    };
}
310
311
} // namespace doris