Coverage Report

Created: 2026-05-09 12:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
common/cpp/token_bucket_rate_limiter.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "token_bucket_rate_limiter.h"
19
20
#include <bthread/bthread.h>
21
#include <glog/logging.h> // IWYU pragma: export
22
23
#include <chrono>
24
#include <mutex>
25
#include <thread>
26
27
#if defined(__APPLE__)
28
#include <ctime>
29
#endif
30
31
namespace doris {
32
// Nanoseconds per second (10^9); used to convert nanosecond deltas to seconds.
33
static constexpr auto NS = 1000000000UL;
34
35
// Minimal busy-waiting lock built on std::atomic_flag.
//
// Exposes lock()/unlock() so it can be used with std::lock_guard (see
// _update_remain_token). lock() repeatedly calls test_and_set() until it
// observes the flag clear; unlock() clears the flag with release ordering.
class TokenBucketRateLimiter::SimpleSpinLock {
36
public:
37
1.61k
    SimpleSpinLock() = default;
38
    ~SimpleSpinLock() = default;
39
40
2.25k
    // Spins until the flag is acquired. Every MAX_SPIN_COUNT failed attempts a
    // warning is logged and the counter restarts from zero, so the logged
    // count is always MAX_SPIN_COUNT rather than a running total.
    // NOTE(review): the loop neither yields nor pauses between attempts, and
    // 50 attempts is a very short window, so heavy contention could produce
    // many log lines — confirm this is intended.
    void lock() {
41
2.25k
        int spin_count = 0;
42
2.25k
        static constexpr int MAX_SPIN_COUNT = 50;
43
2.25k
        // test_and_set() returns the previous value: true means another
        // holder still owns the lock, false means we just acquired it.
        while (_flag.test_and_set(std::memory_order_acq_rel)) {
44
0
            spin_count++;
45
0
            if (spin_count >= MAX_SPIN_COUNT) {
46
0
                LOG(WARNING) << "Warning: Excessive spinning detected while acquiring lock. Spin "
47
0
                                "count: "
48
0
                             << spin_count;
49
0
                spin_count = 0;
50
0
            }
51
            // Spin until we acquire the lock
52
0
        }
53
2.25k
    }
54
55
2.25k
    // Releases the lock by clearing the flag.
    void unlock() { _flag.clear(std::memory_order_release); }
56
57
private:
58
    // Set while the lock is held, clear otherwise.
    std::atomic_flag _flag = ATOMIC_FLAG_INIT;
59
};
60
61
// Builds a limiter whose bucket starts full (_remain_tokens == max_burst).
//
// max_speed: refill rate in tokens per second; 0 disables throttling (both the
//            refill in _update_remain_token and the sleep in add() are skipped).
// max_burst: upper bound on the number of tokens the bucket can hold.
// limit:     cap on the cumulative amount passed to add(); 0 disables the cap.
TokenBucketRateLimiter::TokenBucketRateLimiter(size_t max_speed, size_t max_burst, size_t limit)
62
1.61k
        : _max_speed(max_speed),
63
1.61k
          _max_burst(max_burst),
64
1.61k
          _limit(limit),
65
1.61k
          _mutex(std::make_unique<TokenBucketRateLimiter::SimpleSpinLock>()),
66
1.61k
          _remain_tokens(max_burst) {}
67
68
1.58k
// Defaulted destructor, defined in this file after SimpleSpinLock's definition.
// NOTE(review): presumably kept out of the header so the std::unique_ptr
// holding SimpleSpinLock is destroyed where that type is complete — confirm
// against the header.
TokenBucketRateLimiter::~TokenBucketRateLimiter() = default;
69
70
1.56k
// Defaulted destructor, defined out-of-line in this translation unit.
TokenBucketRateLimiterHolder::~TokenBucketRateLimiterHolder() = default;
71
72
2.25k
// Atomically (under the spin lock) charges `amount` against the bucket and
// refills it for the time elapsed since the previous call.
//
// now:    current time as a nanosecond count (add() passes a steady_clock value).
// amount: number of tokens being consumed by this call.
//
// Returns {cumulative amount consumed so far, tokens left in the bucket}. The
// token value can be negative, meaning the caller is over budget; add() turns
// that deficit into a sleep.
std::pair<size_t, double> TokenBucketRateLimiter::_update_remain_token(long now, size_t amount) {
73
    // Snapshots taken while holding the lock, returned after it is released.
74
2.25k
    size_t count_value;
75
2.25k
    double tokens_value;
76
2.25k
    {
77
2.25k
        std::lock_guard<SimpleSpinLock> lock(*_mutex);
78
2.25k
        // Never let time move backwards relative to the last recorded update.
        now = (now < _prev_ns_count) ? _prev_ns_count : now;
79
2.25k
        if (_max_speed) {
80
2.25k
            // On the very first call (_prev_ns_count == 0) no time is credited.
            double delta_seconds =
81
2.25k
                    _prev_ns_count ? static_cast<double>(now - _prev_ns_count) / NS : 0;
82
2.25k
            // Refill for the elapsed time, subtract this request, then cap at
            // the burst size; there is no lower bound, so it may go negative.
            _remain_tokens = std::min<double>(_remain_tokens + _max_speed * delta_seconds - amount,
83
2.25k
                                              _max_burst);
84
2.25k
        }
85
2.25k
        // The cumulative counter is updated even when throttling is disabled.
        _count += amount;
86
2.25k
        count_value = _count;
87
2.25k
        tokens_value = _remain_tokens;
88
2.25k
        _prev_ns_count = now;
89
2.25k
    }
90
2.25k
    return {count_value, tokens_value};
91
2.25k
}
92
93
2.25k
// Charges `amount` tokens against the limiter and, if the bucket is in
// deficit, sleeps long enough for the deficit to be refilled.
//
// Returns -1 when a non-zero _limit is set and the cumulative amount now
// exceeds it (no sleep is performed in that case, but the amount has already
// been recorded by _update_remain_token). Otherwise returns the time slept in
// nanoseconds, or 0 if no throttling was needed.
int64_t TokenBucketRateLimiter::add(size_t amount) {
94
    // Timestamp this request with the monotonic clock, in nanoseconds.
95
2.25k
    auto duration = std::chrono::steady_clock::now().time_since_epoch();
96
2.25k
    auto time_nano_count = std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count();
97
2.25k
    auto [count_value, tokens_value] = _update_remain_token(time_nano_count, amount);
98
99
2.25k
    if (_limit && count_value > _limit) {
100
        // CK would throw exception
        // NOTE(review): "CK" presumably refers to ClickHouse; here the
        // over-limit condition is reported with a -1 return value instead.
101
0
        return -1;
102
0
    }
103
104
    // Wait unless there is positive amount of remain_tokens - throttling
105
2.25k
    int64_t sleep_time_ns = 0;
106
2.25k
    if (_max_speed && tokens_value < 0) {
107
280
        // Time needed to refill the deficit: (-tokens / tokens-per-second) seconds.
        sleep_time_ns = static_cast<int64_t>(-tokens_value / _max_speed * NS);
108
280
        // The value passed to bthread_usleep is the nanosecond figure divided
        // by 1000, i.e. microseconds.
        bthread_usleep(sleep_time_ns / 1000);
109
280
    }
110
111
2.25k
    return sleep_time_ns;
112
2.25k
}
113
114
// Creates the holder with a freshly constructed TokenBucketRateLimiter and
// stores the callback that add() invokes with each call's result.
//
// max_speed, max_burst, limit: forwarded unchanged to TokenBucketRateLimiter.
// metric_func: called by add() with the value returned by the limiter.
TokenBucketRateLimiterHolder::TokenBucketRateLimiterHolder(size_t max_speed, size_t max_burst,
115
                                                           size_t limit,
116
                                                           std::function<void(int64_t)> metric_func)
117
1.60k
        : rate_limiter(std::make_unique<TokenBucketRateLimiter>(max_speed, max_burst, limit)),
118
1.60k
          metric_func(std::move(metric_func)) {}
119
120
2.25k
// Forwards `amount` to the current limiter and reports the result to
// metric_func.
//
// Returns whatever TokenBucketRateLimiter::add returned: the nanoseconds
// slept, 0, or -1 when the limiter's cumulative limit was exceeded. The same
// value (including -1) is passed to metric_func.
int64_t TokenBucketRateLimiterHolder::add(size_t amount) {
121
2.25k
    int64_t sleep;
122
2.25k
    {
123
2.25k
        // Shared lock: concurrent add() calls may proceed together, while
        // reset() (which takes the lock exclusively) cannot swap the limiter
        // out from under this call.
        // NOTE(review): any throttling sleep inside rate_limiter->add happens
        // while this shared lock is held, so reset() waits for sleepers.
        std::shared_lock read {rate_limiter_rw_lock};
124
2.25k
        sleep = rate_limiter->add(amount);
125
2.25k
    }
126
2.25k
    // Invoked after the lock is released.
    // NOTE(review): called unconditionally — if metric_func is an empty
    // std::function this would throw std::bad_function_call; verify callers
    // always supply one.
    metric_func(sleep);
127
2.25k
    return sleep;
128
2.25k
}
129
130
11
// Replaces the current limiter with a newly constructed one using the given
// parameters. Because a new object is created, the cumulative count and token
// state start over. Always returns 0.
int TokenBucketRateLimiterHolder::reset(size_t max_speed, size_t max_burst, size_t limit) {
131
11
    {
132
11
        // Exclusive lock: waits for in-flight add() calls holding the shared
        // lock before swapping the limiter.
        std::unique_lock write {rate_limiter_rw_lock};
133
11
        rate_limiter = std::make_unique<TokenBucketRateLimiter>(max_speed, max_burst, limit);
134
11
    }
135
11
    return 0;
136
11
}
137
138
0
// Returns the textual name of an S3RateLimitType: "get" for GET and "put" for
// PUT. Any other value is rendered as the decimal form of the enumerator cast
// to size_t.
std::string to_string(S3RateLimitType type) {
139
0
    switch (type) {
140
0
    case S3RateLimitType::GET:
141
0
        return "get";
142
0
    case S3RateLimitType::PUT:
143
0
        return "put";
144
0
    default:
145
0
        // No name for this value; fall back to its numeric representation.
        return std::to_string(static_cast<size_t>(type));
146
0
    }
147
0
}
148
149
0
// Parses the textual name of an S3RateLimitType. The comparison is an exact
// (case-sensitive) match: "get" -> GET, "put" -> PUT; any other input yields
// S3RateLimitType::UNKNOWN.
S3RateLimitType string_to_s3_rate_limit_type(std::string_view value) {
150
0
    if (value == "get") {
151
0
        return S3RateLimitType::GET;
152
0
    } else if (value == "put") {
153
0
        return S3RateLimitType::PUT;
154
0
    }
155
0
    return S3RateLimitType::UNKNOWN;
156
0
}
157
} // namespace doris