Coverage Report

Created: 2026-03-12 15:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
common/cpp/token_bucket_rate_limiter.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "token_bucket_rate_limiter.h"
19
20
#include <bthread/bthread.h>
21
#include <glog/logging.h> // IWYU pragma: export
22
23
#include <chrono>
24
#include <mutex>
25
#include <thread>
26
27
#if defined(__APPLE__)
28
#include <ctime>
29
#endif
30
31
namespace doris {
32
// Nanoseconds per second (10^9).
33
static constexpr auto NS = 1000000000UL;
34
35
class TokenBucketRateLimiter::SimpleSpinLock {
36
public:
37
1.61k
    SimpleSpinLock() = default;
38
    ~SimpleSpinLock() = default;
39
40
1.02M
    void lock() {
41
1.02M
        int spin_count = 0;
42
1.02M
        static constexpr int MAX_SPIN_COUNT = 50;
43
1.17M
        while (_flag.test_and_set(std::memory_order_acq_rel)) {
44
148k
            spin_count++;
45
148k
            if (spin_count >= MAX_SPIN_COUNT) {
46
228
                LOG(WARNING) << "Warning: Excessive spinning detected while acquiring lock. Spin "
47
228
                                "count: "
48
228
                             << spin_count;
49
228
                spin_count = 0;
50
228
            }
51
            // Spin until we acquire the lock
52
148k
        }
53
1.02M
    }
54
55
1.03M
    void unlock() { _flag.clear(std::memory_order_release); }
56
57
private:
58
    std::atomic_flag _flag = ATOMIC_FLAG_INIT;
59
};
60
61
TokenBucketRateLimiter::TokenBucketRateLimiter(size_t max_speed, size_t max_burst, size_t limit)
62
1.61k
        : _max_speed(max_speed),
63
1.61k
          _max_burst(max_burst),
64
1.61k
          _limit(limit),
65
1.61k
          _mutex(std::make_unique<TokenBucketRateLimiter::SimpleSpinLock>()),
66
1.61k
          _remain_tokens(max_burst) {}
67
68
1.58k
TokenBucketRateLimiter::~TokenBucketRateLimiter() = default;
69
70
1.57k
TokenBucketRateLimiterHolder::~TokenBucketRateLimiterHolder() = default;
71
72
1.02M
std::pair<size_t, double> TokenBucketRateLimiter::_update_remain_token(long now, size_t amount) {
73
    // Values obtained under lock to be checked after release
74
1.02M
    size_t count_value;
75
1.02M
    double tokens_value;
76
1.02M
    {
77
1.02M
        std::lock_guard<SimpleSpinLock> lock(*_mutex);
78
1.02M
        now = (now < _prev_ns_count) ? _prev_ns_count : now;
79
1.03M
        if (_max_speed) {
80
1.03M
            double delta_seconds =
81
1.03M
                    _prev_ns_count ? static_cast<double>(now - _prev_ns_count) / NS : 0;
82
1.03M
            _remain_tokens = std::min<double>(_remain_tokens + _max_speed * delta_seconds - amount,
83
1.03M
                                              _max_burst);
84
1.03M
        }
85
1.02M
        _count += amount;
86
1.02M
        count_value = _count;
87
1.02M
        tokens_value = _remain_tokens;
88
1.02M
        _prev_ns_count = now;
89
1.02M
    }
90
1.02M
    return {count_value, tokens_value};
91
1.02M
}
92
93
1.02M
int64_t TokenBucketRateLimiter::add(size_t amount) {
94
    // Sample the monotonic clock in nanoseconds; _update_remain_token() updates the bucket under the lock
95
1.02M
    auto duration = std::chrono::steady_clock::now().time_since_epoch();
96
1.02M
    auto time_nano_count = std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count();
97
1.02M
    auto [count_value, tokens_value] = _update_remain_token(time_nano_count, amount);
98
99
1.02M
    if (_limit && count_value > _limit) {
100
        // Total count exceeded the limit: return -1 here (CK would throw an exception instead)
101
0
        return -1;
102
0
    }
103
104
    // Throttling: sleep only when remain_tokens is negative, for as long as refilling the deficit takes at _max_speed
105
1.02M
    int64_t sleep_time_ns = 0;
106
1.03M
    if (_max_speed && tokens_value < 0) {
107
280
        sleep_time_ns = static_cast<int64_t>(-tokens_value / _max_speed * NS);
108
280
        bthread_usleep(sleep_time_ns / 1000);
109
280
    }
110
111
1.02M
    return sleep_time_ns;
112
1.02M
}
113
114
TokenBucketRateLimiterHolder::TokenBucketRateLimiterHolder(size_t max_speed, size_t max_burst,
115
                                                           size_t limit,
116
                                                           std::function<void(int64_t)> metric_func)
117
1.60k
        : rate_limiter(std::make_unique<TokenBucketRateLimiter>(max_speed, max_burst, limit)),
118
1.60k
          metric_func(std::move(metric_func)) {}
119
120
1.02M
int64_t TokenBucketRateLimiterHolder::add(size_t amount) {
121
1.02M
    int64_t sleep;
122
1.02M
    {
123
1.02M
        std::shared_lock read {rate_limiter_rw_lock};
124
1.02M
        sleep = rate_limiter->add(amount);
125
1.02M
    }
126
1.02M
    metric_func(sleep);
127
1.02M
    return sleep;
128
1.02M
}
129
130
13
int TokenBucketRateLimiterHolder::reset(size_t max_speed, size_t max_burst, size_t limit) {
131
13
    {
132
13
        std::unique_lock write {rate_limiter_rw_lock};
133
13
        rate_limiter = std::make_unique<TokenBucketRateLimiter>(max_speed, max_burst, limit);
134
13
    }
135
13
    return 0;
136
13
}
137
138
0
std::string to_string(S3RateLimitType type) {
139
0
    switch (type) {
140
0
    case S3RateLimitType::GET:
141
0
        return "get";
142
0
    case S3RateLimitType::PUT:
143
0
        return "put";
144
0
    default:
145
0
        return std::to_string(static_cast<size_t>(type));
146
0
    }
147
0
}
148
149
0
S3RateLimitType string_to_s3_rate_limit_type(std::string_view value) {
150
0
    if (value == "get") {
151
0
        return S3RateLimitType::GET;
152
0
    } else if (value == "put") {
153
0
        return S3RateLimitType::PUT;
154
0
    }
155
0
    return S3RateLimitType::UNKNOWN;
156
0
}
157
} // namespace doris