Coverage Report

Created: 2024-11-21 17:33

/root/doris/common/cpp/s3_rate_limiter.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "s3_rate_limiter.h"
19
20
#include <glog/logging.h> // IWYU pragma: export
21
22
#include <chrono>
23
#include <mutex>
24
#include <thread>
25
26
#if defined(__APPLE__)
27
#include <ctime>
28
#endif
29
0
#define CURRENT_TIME std::chrono::system_clock::now()
30
31
namespace doris {
32
// Just 10^6.
33
static constexpr auto NS = 1000000000UL;
34
35
class S3RateLimiter::SimpleSpinLock {
36
public:
37
2
    SimpleSpinLock() = default;
38
    ~SimpleSpinLock() = default;
39
40
0
    void lock() {
41
0
        int spin_count = 0;
42
0
        static constexpr int MAX_SPIN_COUNT = 50;
43
0
        while (_flag.test_and_set(std::memory_order_acq_rel)) {
44
0
            spin_count++;
45
0
            if (spin_count >= MAX_SPIN_COUNT) {
46
0
                LOG(WARNING) << "Warning: Excessive spinning detected while acquiring lock. Spin "
47
0
                                "count: ";
48
0
                spin_count = 0;
49
0
            }
50
            // Spin until we acquire the lock
51
0
        }
52
0
    }
53
54
0
    void unlock() { _flag.clear(std::memory_order_release); }
55
56
private:
57
    std::atomic_flag _flag = ATOMIC_FLAG_INIT;
58
};
59
60
S3RateLimiter::S3RateLimiter(size_t max_speed, size_t max_burst, size_t limit)
61
        : _max_speed(max_speed),
62
          _max_burst(max_burst),
63
          _limit(limit),
64
          _mutex(std::make_unique<S3RateLimiter::SimpleSpinLock>()),
65
2
          _remain_tokens(max_burst) {}
66
67
2
S3RateLimiter::~S3RateLimiter() = default;
68
69
2
S3RateLimiterHolder::~S3RateLimiterHolder() = default;
70
71
0
std::pair<size_t, double> S3RateLimiter::_update_remain_token(long now, size_t amount) {
72
    // Values obtained under lock to be checked after release
73
0
    size_t count_value;
74
0
    double tokens_value;
75
0
    {
76
0
        std::lock_guard<SimpleSpinLock> lock(*_mutex);
77
0
        now = (now < _prev_ns_count) ? _prev_ns_count : now;
78
0
        if (_max_speed) {
79
0
            double delta_seconds =
80
0
                    _prev_ns_count ? static_cast<double>(now - _prev_ns_count) / NS : 0;
81
0
            _remain_tokens = std::min<double>(_remain_tokens + _max_speed * delta_seconds - amount,
82
0
                                              _max_burst);
83
0
        }
84
0
        _count += amount;
85
0
        count_value = _count;
86
0
        tokens_value = _remain_tokens;
87
0
        _prev_ns_count = now;
88
0
    }
89
0
    return {count_value, tokens_value};
90
0
}
91
92
0
int64_t S3RateLimiter::add(size_t amount) {
93
    // Values obtained under lock to be checked after release
94
0
    auto time = CURRENT_TIME;
95
0
    auto time_nano_count = time.time_since_epoch().count();
96
0
    auto [count_value, tokens_value] = _update_remain_token(time_nano_count, amount);
97
98
0
    if (_limit && count_value > _limit) {
99
        // CK would throw exception
100
0
        return -1;
101
0
    }
102
103
    // Wait unless there is positive amount of remain_tokens - throttling
104
0
    int64_t sleep_time_ns = 0;
105
0
    if (_max_speed && tokens_value < 0) {
106
0
        sleep_time_ns = static_cast<int64_t>(-tokens_value / _max_speed * NS);
107
0
        std::this_thread::sleep_for(std::chrono::nanoseconds(sleep_time_ns));
108
0
    }
109
110
0
    return sleep_time_ns;
111
0
}
112
113
S3RateLimiterHolder::S3RateLimiterHolder(S3RateLimitType type, size_t max_speed, size_t max_burst,
114
                                         size_t limit, std::function<void(int64_t)> metric_func)
115
        : rate_limiter(std::make_unique<S3RateLimiter>(max_speed, max_burst, limit)),
116
2
          metric_func(std::move(metric_func)) {}
117
118
0
int64_t S3RateLimiterHolder::add(size_t amount) {
119
0
    int64_t sleep;
120
0
    {
121
0
        std::shared_lock read {rate_limiter_rw_lock};
122
0
        sleep = rate_limiter->add(amount);
123
0
    }
124
0
    metric_func(sleep);
125
0
    return sleep;
126
0
}
127
128
0
int S3RateLimiterHolder::reset(size_t max_speed, size_t max_burst, size_t limit) {
129
0
    {
130
0
        std::unique_lock write {rate_limiter_rw_lock};
131
0
        rate_limiter = std::make_unique<S3RateLimiter>(max_speed, max_burst, limit);
132
0
    }
133
0
    return 0;
134
0
}
135
136
0
std::string to_string(S3RateLimitType type) {
137
0
    switch (type) {
138
0
    case S3RateLimitType::GET:
139
0
        return "get";
140
0
    case S3RateLimitType::PUT:
141
0
        return "put";
142
0
    default:
143
0
        return std::to_string(static_cast<size_t>(type));
144
0
    }
145
0
}
146
147
0
S3RateLimitType string_to_s3_rate_limit_type(std::string_view value) {
148
0
    if (value == "get") {
149
0
        return S3RateLimitType::GET;
150
0
    } else if (value == "put") {
151
0
        return S3RateLimitType::PUT;
152
0
    }
153
0
    return S3RateLimitType::UNKNOWN;
154
0
}
155
} // namespace doris