Coverage Report

Created: 2026-05-14 07:56

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_ms_backpressure_handler.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_ms_backpressure_handler.h"
19
20
#include <fmt/format.h>
21
#include <glog/logging.h>
22
23
#include <algorithm>
24
#include <cmath>
25
#include <queue>
26
#include <thread>
27
28
#include "cloud/config.h"
29
#include "common/status.h"
30
#include "util/thread.h"
31
32
namespace doris::cloud {
33
34
// Global bvar metrics
35
bvar::Adder<uint64_t> g_backpressure_upgrade_count("ms_rpc_backpressure_upgrade_count");
36
bvar::Window<bvar::Adder<uint64_t>> g_backpressure_upgrade_60s("ms_rpc_backpressure_upgrade_60s",
37
                                                               &g_backpressure_upgrade_count, 60);
38
bvar::Adder<uint64_t> g_backpressure_downgrade_count("ms_rpc_backpressure_downgrade_count");
39
bvar::Window<bvar::Adder<uint64_t>> g_backpressure_downgrade_60s(
40
        "ms_rpc_backpressure_downgrade_60s", &g_backpressure_downgrade_count, 60);
41
bvar::LatencyRecorder g_throttle_wait_prepare_rowset(
42
        "ms_rpc_backpressure_throttle_wait_prepare_rowset");
43
bvar::LatencyRecorder g_throttle_wait_commit_rowset(
44
        "ms_rpc_backpressure_throttle_wait_commit_rowset");
45
bvar::LatencyRecorder g_throttle_wait_update_tmp_rowset(
46
        "ms_rpc_backpressure_throttle_wait_update_tmp_rowset");
47
bvar::LatencyRecorder g_throttle_wait_update_packed_file_info(
48
        "ms_rpc_backpressure_throttle_wait_update_packed_file_info");
49
bvar::LatencyRecorder g_throttle_wait_update_delete_bitmap(
50
        "ms_rpc_backpressure_throttle_wait_update_delete_bitmap");
51
bvar::Adder<uint64_t> g_ms_busy_count("ms_rpc_backpressure_ms_busy_count");
52
bvar::Window<bvar::Adder<uint64_t>> g_ms_busy_60s("ms_rpc_backpressure_ms_busy_60s",
53
                                                  &g_ms_busy_count, 60);
54
55
static bvar::LatencyRecorder* s_throttle_wait_recorders[] = {
56
        &g_throttle_wait_prepare_rowset,       &g_throttle_wait_commit_rowset,
57
        &g_throttle_wait_update_tmp_rowset,    &g_throttle_wait_update_packed_file_info,
58
        &g_throttle_wait_update_delete_bitmap,
59
};
60
61
0
bvar::LatencyRecorder* get_throttle_wait_recorder(LoadRelatedRpc rpc) {
62
0
    size_t idx = static_cast<size_t>(rpc);
63
0
    if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) {
64
0
        return nullptr;
65
0
    }
66
0
    return s_throttle_wait_recorders[idx];
67
0
}
68
69
// ============== StrictQpsLimiter ==============
70
71
13
StrictQpsLimiter::StrictQpsLimiter(double qps) {
72
13
    if (qps <= 0) {
73
1
        qps = 1.0;
74
1
    }
75
13
    _interval_ns = static_cast<int64_t>(1e9 / qps);
76
13
    _next_allowed_time = Clock::now();
77
13
}
78
79
997
StrictQpsLimiter::Clock::time_point StrictQpsLimiter::reserve() {
80
997
    std::lock_guard lock(_mtx);
81
997
    auto now = Clock::now();
82
997
    if (_next_allowed_time <= now) {
83
4
        _next_allowed_time = now + std::chrono::nanoseconds(_interval_ns);
84
4
        return now;
85
4
    }
86
993
    auto result = _next_allowed_time;
87
993
    _next_allowed_time += std::chrono::nanoseconds(_interval_ns);
88
993
    return result;
89
997
}
90
91
1
void StrictQpsLimiter::update_qps(double new_qps) {
92
1
    if (new_qps <= 0) {
93
0
        new_qps = 1.0;
94
0
    }
95
1
    std::lock_guard lock(_mtx);
96
1
    _interval_ns = static_cast<int64_t>(1e9 / new_qps);
97
1
}
98
99
6
double StrictQpsLimiter::get_qps() const {
100
6
    std::lock_guard lock(_mtx);
101
6
    if (_interval_ns <= 0) {
102
0
        return 0;
103
0
    }
104
6
    return 1e9 / _interval_ns;
105
6
}
106
107
// ============== TableRpcQpsCounter ==============
108
109
TableRpcQpsCounter::TableRpcQpsCounter(int64_t table_id, LoadRelatedRpc rpc_type, int window_sec)
110
18
        : _table_id(table_id), _rpc_type(rpc_type) {
111
18
    _counter = std::make_unique<bvar::Adder<int64_t>>();
112
18
    _counter->hide();
113
18
    _qps = std::make_unique<bvar::PerSecond<bvar::Adder<int64_t>>>(_counter.get(), window_sec);
114
18
    _qps->hide();
115
18
}
116
117
766
void TableRpcQpsCounter::increment() {
118
766
    (*_counter) << 1;
119
766
}
120
121
15
double TableRpcQpsCounter::get_qps() const {
122
15
    return _qps->get_value();
123
15
}
124
125
// ============== TableRpcQpsRegistry ==============
126
127
19
TableRpcQpsRegistry::TableRpcQpsRegistry() = default;
128
129
766
void TableRpcQpsRegistry::record(LoadRelatedRpc rpc_type, int64_t table_id) {
130
766
    auto* counter = get_or_create_counter(rpc_type, table_id);
131
766
    if (counter) {
132
766
        counter->increment();
133
766
    }
134
766
}
135
136
TableRpcQpsCounter* TableRpcQpsRegistry::get_or_create_counter(LoadRelatedRpc rpc_type,
137
766
                                                               int64_t table_id) {
138
766
    size_t idx = static_cast<size_t>(rpc_type);
139
766
    if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) {
140
0
        return nullptr;
141
0
    }
142
143
766
    {
144
766
        std::shared_lock lock(_mutex);
145
766
        auto it = _counters[idx].find(table_id);
146
766
        if (it != _counters[idx].end()) {
147
748
            return it->second.get();
148
748
        }
149
766
    }
150
151
18
    std::unique_lock lock(_mutex);
152
    // Double check after acquiring exclusive lock
153
18
    auto it = _counters[idx].find(table_id);
154
18
    if (it != _counters[idx].end()) {
155
0
        return it->second.get();
156
0
    }
157
158
18
    auto counter = std::make_unique<TableRpcQpsCounter>(table_id, rpc_type,
159
18
                                                        config::ms_rpc_table_qps_window_sec);
160
18
    auto* ptr = counter.get();
161
18
    _counters[idx][table_id] = std::move(counter);
162
18
    return ptr;
163
18
}
164
165
std::vector<std::pair<int64_t, double>> TableRpcQpsRegistry::get_top_k_tables(
166
41
        LoadRelatedRpc rpc_type, int k) const {
167
41
    size_t idx = static_cast<size_t>(rpc_type);
168
41
    if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT) || k <= 0) {
169
4
        return {};
170
4
    }
171
172
    // Use a min-heap of size k to find top-k without allocating a vector for all tables.
173
    // The heap top is the smallest among the k largest elements seen so far.
174
37
    using Entry = std::pair<int64_t, double>; // (table_id, qps)
175
37
    auto min_cmp = [](const Entry& a, const Entry& b) { return a.second > b.second; };
176
37
    std::priority_queue<Entry, std::vector<Entry>, decltype(min_cmp)> min_heap(min_cmp);
177
178
37
    {
179
37
        std::shared_lock lock(_mutex);
180
37
        for (const auto& [table_id, counter] : _counters[idx]) {
181
14
            double qps = counter->get_qps();
182
14
            if (qps > 0) {
183
13
                if (static_cast<int>(min_heap.size()) < k) {
184
12
                    min_heap.push({table_id, qps});
185
12
                } else if (qps > min_heap.top().second) {
186
1
                    min_heap.pop();
187
1
                    min_heap.push({table_id, qps});
188
1
                }
189
13
            }
190
14
        }
191
37
    }
192
193
    // Extract results from heap (comes out in ascending order, reverse to descending)
194
37
    std::vector<Entry> result;
195
37
    result.reserve(min_heap.size());
196
49
    while (!min_heap.empty()) {
197
12
        result.push_back(min_heap.top());
198
12
        min_heap.pop();
199
12
    }
200
37
    std::reverse(result.begin(), result.end());
201
202
37
    return result;
203
41
}
204
205
1
double TableRpcQpsRegistry::get_qps(LoadRelatedRpc rpc_type, int64_t table_id) const {
206
1
    size_t idx = static_cast<size_t>(rpc_type);
207
1
    if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) {
208
0
        return 0;
209
0
    }
210
211
1
    std::shared_lock lock(_mutex);
212
1
    auto it = _counters[idx].find(table_id);
213
1
    if (it != _counters[idx].end()) {
214
1
        return it->second->get_qps();
215
1
    }
216
0
    return 0;
217
1
}
218
219
0
void TableRpcQpsRegistry::cleanup_inactive_tables() {
220
0
    std::unique_lock lock(_mutex);
221
222
0
    for (size_t idx = 0; idx < static_cast<size_t>(LoadRelatedRpc::COUNT); ++idx) {
223
0
        auto& counter_map = _counters[idx];
224
0
        for (auto it = counter_map.begin(); it != counter_map.end();) {
225
            // Remove counters with zero QPS for a long time
226
0
            if (it->second->get_qps() < 0.01) {
227
0
                it = counter_map.erase(it);
228
0
            } else {
229
0
                ++it;
230
0
            }
231
0
        }
232
0
    }
233
0
}
234
235
// ============== TableRpcThrottler ==============
236
237
15
TableRpcThrottler::TableRpcThrottler() {
238
    // Initialize bvar for throttled table counts
239
90
    for (size_t i = 0; i < static_cast<size_t>(LoadRelatedRpc::COUNT); ++i) {
240
75
        std::string bvar_name = fmt::format("ms_rpc_backpressure_throttled_tables_{}",
241
75
                                            load_related_rpc_name(static_cast<LoadRelatedRpc>(i)));
242
75
        _throttled_table_counts[i] = std::make_unique<bvar::Status<size_t>>(bvar_name, 0);
243
75
    }
244
15
}
245
246
std::chrono::steady_clock::time_point TableRpcThrottler::throttle(LoadRelatedRpc rpc_type,
247
6
                                                                  int64_t table_id) {
248
6
    std::shared_lock lock(_mutex);
249
6
    auto it = _limiters.find({rpc_type, table_id});
250
6
    if (it == _limiters.end()) {
251
2
        return std::chrono::steady_clock::now();
252
2
    }
253
4
    return it->second->reserve();
254
6
}
255
256
9
void TableRpcThrottler::set_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id, double qps_limit) {
257
9
    if (qps_limit <= 0) {
258
0
        return;
259
0
    }
260
261
9
    std::unique_lock lock(_mutex);
262
9
    auto key = std::make_pair(rpc_type, table_id);
263
9
    auto it = _limiters.find(key);
264
9
    if (it != _limiters.end()) {
265
0
        it->second->update_qps(qps_limit);
266
9
    } else {
267
9
        _limiters[key] = std::make_unique<StrictQpsLimiter>(qps_limit);
268
        // Update bvar count
269
9
        size_t idx = static_cast<size_t>(rpc_type);
270
9
        if (idx < static_cast<size_t>(LoadRelatedRpc::COUNT)) {
271
9
            size_t count = 0;
272
11
            for (const auto& [k, _] : _limiters) {
273
11
                if (k.first == rpc_type) {
274
10
                    ++count;
275
10
                }
276
11
            }
277
9
            _throttled_table_counts[idx]->set_value(count);
278
9
        }
279
9
    }
280
281
9
    LOG(INFO) << "[ms-throttle] set table QPS limit: rpc=" << load_related_rpc_name(rpc_type)
282
9
              << ", table_id=" << table_id << ", qps_limit=" << qps_limit;
283
9
}
284
285
3
void TableRpcThrottler::remove_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id) {
286
3
    std::unique_lock lock(_mutex);
287
3
    auto key = std::make_pair(rpc_type, table_id);
288
3
    auto it = _limiters.find(key);
289
3
    if (it != _limiters.end()) {
290
3
        _limiters.erase(it);
291
        // Update bvar count
292
3
        size_t idx = static_cast<size_t>(rpc_type);
293
3
        if (idx < static_cast<size_t>(LoadRelatedRpc::COUNT)) {
294
3
            size_t count = 0;
295
3
            for (const auto& [k, _] : _limiters) {
296
1
                if (k.first == rpc_type) {
297
1
                    ++count;
298
1
                }
299
1
            }
300
3
            _throttled_table_counts[idx]->set_value(count);
301
3
        }
302
303
3
        LOG(INFO) << "[ms-throttle] removed table QPS limit: rpc="
304
3
                  << load_related_rpc_name(rpc_type) << ", table_id=" << table_id;
305
3
    }
306
3
}
307
308
4
double TableRpcThrottler::get_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id) const {
309
4
    std::shared_lock lock(_mutex);
310
4
    auto it = _limiters.find({rpc_type, table_id});
311
4
    if (it != _limiters.end()) {
312
3
        return it->second->get_qps();
313
3
    }
314
1
    return 0;
315
4
}
316
317
4
bool TableRpcThrottler::has_limit(LoadRelatedRpc rpc_type, int64_t table_id) const {
318
4
    std::shared_lock lock(_mutex);
319
4
    return _limiters.find({rpc_type, table_id}) != _limiters.end();
320
4
}
321
322
3
size_t TableRpcThrottler::get_throttled_table_count(LoadRelatedRpc rpc_type) const {
323
3
    size_t idx = static_cast<size_t>(rpc_type);
324
3
    if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) {
325
0
        return 0;
326
0
    }
327
3
    return _throttled_table_counts[idx]->get_value();
328
3
}
329
330
1
std::vector<TableRpcThrottler::ThrottleEntry> TableRpcThrottler::get_all_throttled_entries() const {
331
1
    std::shared_lock lock(_mutex);
332
1
    std::vector<ThrottleEntry> entries;
333
1
    entries.reserve(_limiters.size());
334
1
    for (const auto& [key, limiter] : _limiters) {
335
0
        entries.push_back({key.first, key.second, limiter->get_qps()});
336
0
    }
337
1
    return entries;
338
1
}
339
340
// ============== MSBackpressureHandler ==============
341
342
MSBackpressureHandler::MSBackpressureHandler(TableRpcQpsRegistry* qps_registry,
343
                                             TableRpcThrottler* throttler)
344
9
        : _qps_registry(qps_registry),
345
9
          _throttler(throttler),
346
9
          _stop_latch(1),
347
9
          _last_ms_busy_time(std::chrono::steady_clock::time_point::min()) {
348
    // Initialize state machine with config values
349
9
    RpcThrottleParams throttle_params {
350
9
            .top_k = config::ms_backpressure_upgrade_top_k,
351
9
            .ratio = config::ms_backpressure_throttle_ratio,
352
9
            .floor_qps = config::ms_rpc_table_qps_limit_floor,
353
9
    };
354
9
    _state_machine = std::make_unique<RpcThrottleStateMachine>(throttle_params);
355
356
    // Initialize coordinator with config values
357
    // Coordinator uses ticks where 1 tick = 1 millisecond (fixed unit)
358
    // This allows tick_interval_ms to change at runtime without affecting correctness
359
9
    ThrottleCoordinatorParams coordinator_params {
360
9
            .upgrade_cooldown_ticks = config::ms_backpressure_upgrade_interval_ms,
361
9
            .downgrade_after_ticks = config::ms_backpressure_downgrade_interval_ms,
362
9
    };
363
9
    _coordinator = std::make_unique<RpcThrottleCoordinator>(coordinator_params);
364
365
9
    auto st = Thread::create(
366
9
            "MSBackpressureHandler", "tick_thread", [this]() { this->_tick_thread_callback(); },
367
9
            &_tick_thread);
368
9
    if (!st.ok()) {
369
0
        LOG(WARNING) << "[ms-throttle] failed to create tick thread: " << st;
370
9
    } else {
371
9
        LOG(INFO) << "[ms-throttle] handler started: upgrade_cooldown="
372
9
                  << config::ms_backpressure_upgrade_interval_ms
373
9
                  << "ms, downgrade_interval=" << config::ms_backpressure_downgrade_interval_ms
374
9
                  << "ms";
375
9
    }
376
9
}
377
378
8
MSBackpressureHandler::~MSBackpressureHandler() {
379
8
    _stop_latch.count_down();
380
8
    if (_tick_thread) {
381
8
        _tick_thread->join();
382
8
    }
383
8
}
384
385
9
void MSBackpressureHandler::_tick_thread_callback() {
386
    // Fixed tick interval: 1 second. Since 1 tick = 1 ms, advance by 1000 ticks each iteration.
387
9
    constexpr int kTickIntervalMs = 1000;
388
4.31k
    while (!_stop_latch.wait_for(std::chrono::milliseconds(kTickIntervalMs))) {
389
4.30k
        _advance_time(kTickIntervalMs);
390
4.30k
    }
391
9
}
392
393
4.30k
void MSBackpressureHandler::_advance_time(int ticks) {
394
4.30k
    if (!config::enable_ms_backpressure_handling) {
395
4.30k
        return;
396
4.30k
    }
397
398
4
    std::lock_guard lock(_transition_mutex);
399
400
    // Advance coordinator time; if downgrade is triggered, handle it
401
4
    if (_coordinator->tick(ticks)) {
402
1
        LOG(INFO) << "[ms-throttle] triggering downgrade, upgrade_level="
403
1
                  << _state_machine->upgrade_level();
404
405
1
        auto actions = _state_machine->on_downgrade();
406
1
        _apply_actions(actions);
407
1
        _coordinator->set_has_pending_upgrades(_state_machine->upgrade_level() > 0);
408
409
1
        g_backpressure_downgrade_count << 1;
410
1
    }
411
4
}
412
413
9
bool MSBackpressureHandler::on_ms_busy() {
414
9
    g_ms_busy_count << 1;
415
416
9
    if (!config::enable_ms_backpressure_handling) {
417
1
        return false;
418
1
    }
419
420
8
    {
421
8
        std::lock_guard lock(_mutex);
422
8
        _last_ms_busy_time = std::chrono::steady_clock::now();
423
8
    }
424
425
8
    std::lock_guard lock(_transition_mutex);
426
427
    // Check with coordinator if upgrade should be triggered
428
8
    if (!_coordinator->report_ms_busy()) {
429
2
        return false;
430
2
    }
431
432
8
    LOG(INFO) << "[ms-throttle] received MS_BUSY, triggering upgrade";
433
434
6
    auto snapshot = _build_qps_snapshot();
435
6
    auto actions = _state_machine->on_upgrade(snapshot);
436
6
    _apply_actions(actions);
437
6
    _coordinator->set_has_pending_upgrades(_state_machine->upgrade_level() > 0);
438
439
6
    g_backpressure_upgrade_count << 1;
440
6
    return true;
441
8
}
442
443
std::chrono::steady_clock::time_point MSBackpressureHandler::before_rpc(LoadRelatedRpc rpc_type,
444
439k
                                                                        int64_t table_id) {
445
440k
    if (!config::enable_ms_backpressure_handling) {
446
440k
        return std::chrono::steady_clock::now();
447
440k
    }
448
449
18.4E
    return _throttler->throttle(rpc_type, table_id);
450
439k
}
451
452
445k
void MSBackpressureHandler::after_rpc(LoadRelatedRpc rpc_type, int64_t table_id) {
453
445k
    if (!config::enable_ms_backpressure_handling) {
454
445k
        return;
455
445k
    }
456
457
18.4E
    _qps_registry->record(rpc_type, table_id);
458
18.4E
}
459
460
0
void MSBackpressureHandler::update_throttle_params(RpcThrottleParams params) {
461
0
    _state_machine->update_params(params);
462
0
}
463
464
0
void MSBackpressureHandler::update_coordinator_params(ThrottleCoordinatorParams params) {
465
0
    _coordinator->update_params(params);
466
0
}
467
468
3
int64_t MSBackpressureHandler::seconds_since_last_ms_busy() const {
469
3
    std::lock_guard lock(_mutex);
470
3
    if (_last_ms_busy_time == std::chrono::steady_clock::time_point::min()) {
471
1
        return -1; // Never received MS_BUSY
472
1
    }
473
2
    return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() -
474
2
                                                            _last_ms_busy_time)
475
2
            .count();
476
3
}
477
478
2
size_t MSBackpressureHandler::upgrade_level() const {
479
2
    return _state_machine->upgrade_level();
480
2
}
481
482
0
int MSBackpressureHandler::ticks_since_last_ms_busy() const {
483
0
    return _coordinator->ticks_since_last_ms_busy();
484
0
}
485
486
0
int MSBackpressureHandler::ticks_since_last_upgrade() const {
487
0
    return _coordinator->ticks_since_last_upgrade();
488
0
}
489
490
7
void MSBackpressureHandler::_apply_actions(const std::vector<RpcThrottleAction>& actions) {
491
7
    for (const auto& action : actions) {
492
2
        switch (action.type) {
493
1
        case RpcThrottleAction::Type::SET_LIMIT:
494
1
            _throttler->set_qps_limit(action.rpc_type, action.table_id, action.qps_limit);
495
1
            break;
496
1
        case RpcThrottleAction::Type::REMOVE_LIMIT:
497
1
            _throttler->remove_qps_limit(action.rpc_type, action.table_id);
498
1
            break;
499
2
        }
500
2
    }
501
7
}
502
503
6
std::vector<RpcQpsSnapshot> MSBackpressureHandler::_build_qps_snapshot() const {
504
6
    std::vector<RpcQpsSnapshot> snapshot;
505
506
    // For each RPC type, get top-k tables
507
6
    int top_k = _state_machine->get_params().top_k;
508
509
36
    for (size_t i = 0; i < static_cast<size_t>(LoadRelatedRpc::COUNT); ++i) {
510
30
        LoadRelatedRpc rpc_type = static_cast<LoadRelatedRpc>(i);
511
30
        auto top_tables = _qps_registry->get_top_k_tables(rpc_type, top_k);
512
513
30
        for (const auto& [table_id, qps] : top_tables) {
514
1
            snapshot.push_back({rpc_type, table_id, qps});
515
1
        }
516
30
    }
517
518
6
    return snapshot;
519
6
}
520
521
} // namespace doris::cloud