Coverage Report

Created: 2025-03-12 00:38

/root/doris/common/cpp/s3_rate_limiter.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "s3_rate_limiter.h"
19
20
#include <glog/logging.h> // IWYU pragma: export
21
22
#include <chrono>
23
#include <mutex>
24
#include <thread>
25
26
#if defined(__APPLE__)
27
#include <ctime>
28
#endif
29
30
namespace doris {
31
// Just 10^6.
32
static constexpr auto NS = 1000000000UL;
33
34
class S3RateLimiter::SimpleSpinLock {
35
public:
36
2
    SimpleSpinLock() = default;
37
    ~SimpleSpinLock() = default;
38
39
0
    void lock() {
40
0
        int spin_count = 0;
41
0
        static constexpr int MAX_SPIN_COUNT = 50;
42
0
        while (_flag.test_and_set(std::memory_order_acq_rel)) {
43
0
            spin_count++;
44
0
            if (spin_count >= MAX_SPIN_COUNT) {
45
0
                LOG(WARNING) << "Warning: Excessive spinning detected while acquiring lock. Spin "
46
0
                                "count: " << spin_count;
47
0
                spin_count = 0;
48
0
            }
49
            // Spin until we acquire the lock
50
0
        }
51
0
    }
52
53
0
    void unlock() { _flag.clear(std::memory_order_release); }
54
55
private:
56
    std::atomic_flag _flag = ATOMIC_FLAG_INIT;
57
};
58
59
S3RateLimiter::S3RateLimiter(size_t max_speed, size_t max_burst, size_t limit)
60
        : _max_speed(max_speed),
61
          _max_burst(max_burst),
62
          _limit(limit),
63
          _mutex(std::make_unique<S3RateLimiter::SimpleSpinLock>()),
64
2
          _remain_tokens(max_burst) {}
65
66
2
S3RateLimiter::~S3RateLimiter() = default;
67
68
2
S3RateLimiterHolder::~S3RateLimiterHolder() = default;
69
70
0
std::pair<size_t, double> S3RateLimiter::_update_remain_token(long now, size_t amount) {
71
    // Values obtained under lock to be checked after release
72
0
    size_t count_value;
73
0
    double tokens_value;
74
0
    {
75
0
        std::lock_guard<SimpleSpinLock> lock(*_mutex);
76
0
        now = (now < _prev_ns_count) ? _prev_ns_count : now;
77
0
        if (_max_speed) {
78
0
            double delta_seconds =
79
0
                    _prev_ns_count ? static_cast<double>(now - _prev_ns_count) / NS : 0;
80
0
            _remain_tokens = std::min<double>(_remain_tokens + _max_speed * delta_seconds - amount,
81
0
                                              _max_burst);
82
0
        }
83
0
        _count += amount;
84
0
        count_value = _count;
85
0
        tokens_value = _remain_tokens;
86
0
        _prev_ns_count = now;
87
0
    }
88
0
    return {count_value, tokens_value};
89
0
}
90
91
0
int64_t S3RateLimiter::add(size_t amount) {
92
    // Values obtained under lock to be checked after release
93
0
    auto duration = std::chrono::steady_clock::now().time_since_epoch();
94
0
    auto time_nano_count = std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count();
95
0
    auto [count_value, tokens_value] = _update_remain_token(time_nano_count, amount);
96
97
0
    if (_limit && count_value > _limit) {
98
        // CK would throw exception
99
0
        return -1;
100
0
    }
101
102
    // Wait unless there is positive amount of remain_tokens - throttling
103
0
    int64_t sleep_time_ns = 0;
104
0
    if (_max_speed && tokens_value < 0) {
105
0
        sleep_time_ns = static_cast<int64_t>(-tokens_value / _max_speed * NS);
106
0
        std::this_thread::sleep_for(std::chrono::nanoseconds(sleep_time_ns));
107
0
    }
108
109
0
    return sleep_time_ns;
110
0
}
111
112
S3RateLimiterHolder::S3RateLimiterHolder(size_t max_speed, size_t max_burst, size_t limit,
113
                                         std::function<void(int64_t)> metric_func)
114
        : rate_limiter(std::make_unique<S3RateLimiter>(max_speed, max_burst, limit)),
115
2
          metric_func(std::move(metric_func)) {}
116
117
0
int64_t S3RateLimiterHolder::add(size_t amount) {
118
0
    int64_t sleep;
119
0
    {
120
0
        std::shared_lock read {rate_limiter_rw_lock};
121
0
        sleep = rate_limiter->add(amount);
122
0
    }
123
0
    metric_func(sleep);
124
0
    return sleep;
125
0
}
126
127
0
int S3RateLimiterHolder::reset(size_t max_speed, size_t max_burst, size_t limit) {
128
0
    {
129
0
        std::unique_lock write {rate_limiter_rw_lock};
130
0
        rate_limiter = std::make_unique<S3RateLimiter>(max_speed, max_burst, limit);
131
0
    }
132
0
    return 0;
133
0
}
134
135
0
std::string to_string(S3RateLimitType type) {
136
0
    switch (type) {
137
0
    case S3RateLimitType::GET:
138
0
        return "get";
139
0
    case S3RateLimitType::PUT:
140
0
        return "put";
141
0
    default:
142
0
        return std::to_string(static_cast<size_t>(type));
143
0
    }
144
0
}
145
146
0
S3RateLimitType string_to_s3_rate_limit_type(std::string_view value) {
147
0
    if (value == "get") {
148
0
        return S3RateLimitType::GET;
149
0
    } else if (value == "put") {
150
0
        return S3RateLimitType::PUT;
151
0
    }
152
0
    return S3RateLimitType::UNKNOWN;
153
0
}
154
} // namespace doris