Coverage Report

Created: 2026-04-02 14:19

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/adaptive_thread_pool_controller.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bthread/unstable.h>
21
22
#include <atomic>
23
#include <cstdint>
24
#include <functional>
25
#include <map>
26
#include <mutex>
27
#include <string>
28
#include <vector>
29
30
namespace doris {
31
32
class ThreadPool; // forward declaration: only used via ThreadPool* in this header
33
class SystemMetrics; // forward declaration: only used via SystemMetrics* (see init())
34
class AdaptiveThreadPoolController; // declared early so TimerArg::ctrl can point at it
35
36
// Each pool group's timer state. Heap-allocated; shared between the controller
37
// and the brpc TimerThread callback. Owned through PoolGroup::timer_arg and freed by cancel().
38
struct TimerArg {
39
    AdaptiveThreadPoolController* ctrl; // never null; no default initializer, must be set by the creator
40
    std::string name; // presumably the pool group's name as passed to add() — confirm in the .cpp
41
    int64_t interval_ms; // no default initializer; presumably the interval_ms given to add() — confirm
42
43
    // Set by cancel() before calling bthread_timer_del. The callback checks
44
    // this flag after acquiring `mu` and skips re-registration when true.
45
    std::atomic<bool> stopped {false};
46
47
    // Tracks the most recently registered timer id (initially 0). Updated under `mu` by the
48
    // callback after each re-registration; read by cancel() to call
49
    // bthread_timer_del on the latest pending timer.
50
    std::atomic<bthread_timer_t> timer_id {0};
51
52
    // Held for the entire duration of the callback (fire + re-registration).
53
    // cancel() acquires it after bthread_timer_del to wait for any in-flight
54
    // invocation to complete before freeing this TimerArg.
55
    std::mutex mu;
56
};
57
58
// AdaptiveThreadPoolController dynamically adjusts thread pool sizes based on
59
// system load (IO utilisation, CPU load average, flush queue depth).
60
//
61
// Each registered pool group runs as a one-shot bthread_timer_add chain: the
62
// callback fires, adjusts the pool, then re-registers the next one-shot timer.
63
// All groups share the single brpc TimerThread, keeping the overhead minimal.
64
//
65
// Usage:
66
//   AdaptiveThreadPoolController ctrl;
67
//   ctrl.init(system_metrics, s3_pool);
68
//   ctrl.add("flush", {pool1, pool2},
69
//       AdaptiveThreadPoolController::make_flush_adjust_func(&ctrl, pool1),
70
//       max_per_cpu, min_per_cpu);
71
//   // ... later ...
72
//   ctrl.cancel("flush");   // or ctrl.stop()
73
class AdaptiveThreadPoolController {
74
public:
75
    using AdjustFunc = // presumably returns the target thread count and fills `reason` — confirm in .cpp
76
            std::function<int(int current, int min_threads, int max_threads, std::string& reason)>;
77
78
    static constexpr int kDefaultIntervalMs = 10000; // default for add()'s interval_ms
79
80
    static constexpr int kQueueThreshold = 10; // NOTE(review): thresholds are consumed outside this header
81
    static constexpr int kIOBusyThresholdPercent = 90;
82
    static constexpr int kCPUBusyThresholdPercent = 90;
83
    static constexpr int kS3QueueBusyThreshold = 100;
84
85
509
    AdaptiveThreadPoolController() = default;
86
509
    ~AdaptiveThreadPoolController() { stop(); } // destruction cancels every registered group
87
88
    // Initialize with system-level dependencies. Both arguments are raw pointers; presumably
89
    void init(SystemMetrics* system_metrics, ThreadPool* s3_file_upload_pool); // kept in _system_metrics / _s3_file_upload_pool — confirm
90
91
    // Cancel all registered pool groups. Must be called before the pools are destroyed.
92
    void stop(); // also invoked by the destructor
93
94
    // Register a pool group and start a recurring bthread_timer_add chain.
95
    void add(std::string name, std::vector<ThreadPool*> pools, AdjustFunc adjust_func,
96
             double max_threads_per_cpu, double min_threads_per_cpu,
97
             int64_t interval_ms = kDefaultIntervalMs); // interval between firings, in milliseconds
98
99
    // Cancel the timer chain and remove the pool group. Blocks until any
100
    // in-flight callback finishes, then returns. Safe to call before pool teardown.
101
    void cancel(const std::string& name);
102
103
    // Fire all registered groups once, ignoring the schedule. For testing.
104
    void adjust_once();
105
106
    // Get current thread count for a named group. For testing/debugging.
107
    int get_current_threads(const std::string& name) const; // behaviour for an unknown name is not visible here
108
109
    // System-state helpers; safe to call from inside an AdjustFunc.
110
    bool is_io_busy(); // non-const; presumably refreshes _last_io_busy and the disk IO sample state — confirm
111
    bool is_cpu_busy();
112
113
    // Factory: standard flush-pool adjust function.
114
    static AdjustFunc make_flush_adjust_func(AdaptiveThreadPoolController* controller,
115
                                             ThreadPool* flush_pool);
116
117
    // Callback registered with bthread_timer_add. Public only for the C linkage
118
    // requirement; do not call directly. `arg` is presumably the group's TimerArg* — confirm in .cpp.
119
    static void _on_timer(void* arg); // NOTE(review): a static member has C++ linkage; "C linkage" likely means a plain void(*)(void*)
120
121
private:
122
    struct PoolGroup {
123
        std::string name;
124
        std::vector<ThreadPool*> pools; // raw pointers; the group must be cancelled before the pools are destroyed
125
        AdjustFunc adjust_func;
126
        double max_threads_per_cpu = 4.0; // presumably scaled by CPU count in get_max_threads() — confirm
127
        double min_threads_per_cpu = 0.5; // presumably scaled by CPU count in get_min_threads() — confirm
128
        int current_threads = 0;
129
        TimerArg* timer_arg = nullptr; // owned; freed by cancel()
130
131
        int get_max_threads() const;
132
        int get_min_threads() const;
133
    };
134
135
    // Run one group's adjustment. Called from _on_timer (no lock on entry).
136
    void _fire_group(const std::string& name);
137
138
    void _apply_thread_count(PoolGroup& group, int target_threads, const std::string& reason); // `reason` matches AdjustFunc's out-parameter
139
140
private:
141
    SystemMetrics* _system_metrics = nullptr; // set via init(); null until then
142
    ThreadPool* _s3_file_upload_pool = nullptr; // set via init(); null until then
143
144
    mutable std::mutex _mutex; // mutable so const members can lock it; presumably guards _pool_groups — confirm
145
    std::map<std::string, PoolGroup> _pool_groups; // keyed by group name
146
147
    // Last successfully computed IO-busy result. Returned as-is when the
148
    // measurement interval is too short to produce a valid new delta.
149
    bool _last_io_busy = false;
150
151
    // For disk IO util calculation (used by is_io_busy).
152
    std::map<std::string, int64_t> _last_disk_io_time; // presumably keyed by disk/device name — confirm
153
    int64_t _last_check_time_sec = 0; // seconds, per the name; 0 until the first check
154
};
155
156
} // namespace doris