be/src/storage/compaction/compaction_permit_limiter.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/compaction/compaction_permit_limiter.h" |
19 | | |
20 | | #include "common/config.h" |
21 | | #include "common/metrics/doris_metrics.h" |
22 | | |
23 | | namespace doris { |
24 | | |
25 | 339 | CompactionPermitLimiter::CompactionPermitLimiter() : _used_permits(0) {} |
26 | | |
27 | 32 | void CompactionPermitLimiter::request(int64_t permits) { |
28 | 32 | DorisMetrics::instance()->compaction_waitting_permits->set_value(permits); |
29 | | // 1. config::total_permits_for_compaction_score = 20000 |
30 | | // 2. Thread-B requests permits 11000, used_permits = 11000 |
31 | | // 3. Thread-A requests permits 12000,wait for used_permits + 12000 <= 20000 |
32 | | // 4. adjust config::total_permits_for_compaction_score = 10000 |
33 | | // 5. Thread-B releases permits,used_permits = 0,notify Thread-A,used_permits + 12000 <= 10000 |
34 | | // we need to initialize total_permits instead of using the config. |
35 | 32 | int64_t total_permits = config::total_permits_for_compaction_score; |
36 | 32 | if (permits > total_permits) { |
37 | | // when tablet's compaction score is larger than "config::total_permits_for_compaction_score", |
38 | | // it's necessary to do compaction for this tablet because this tablet will not get "permits" |
39 | | // anyway. otherwise, compaction task for this tablet will not be executed forever. |
40 | 0 | std::unique_lock<std::mutex> lock(_permits_mutex); |
41 | 0 | _permits_cv.wait(lock, [permits, total_permits, this] { |
42 | 0 | return _used_permits == 0 || _used_permits + permits <= total_permits; |
43 | 0 | }); |
44 | 32 | } else { |
45 | 32 | if (_used_permits + permits > total_permits) { |
46 | 0 | std::unique_lock<std::mutex> lock(_permits_mutex); |
47 | 0 | _permits_cv.wait(lock, [permits, total_permits, this] { |
48 | 0 | return _used_permits + permits <= total_permits; |
49 | 0 | }); |
50 | 0 | } |
51 | 32 | } |
52 | 32 | _used_permits += permits; |
53 | 32 | DorisMetrics::instance()->compaction_waitting_permits->set_value(0); |
54 | 32 | DorisMetrics::instance()->compaction_used_permits->set_value(_used_permits); |
55 | 32 | } |
56 | | |
57 | 19 | void CompactionPermitLimiter::release(int64_t permits) { |
58 | 19 | std::unique_lock<std::mutex> lock(_permits_mutex); |
59 | 19 | _used_permits -= permits; |
60 | 19 | _permits_cv.notify_one(); |
61 | 19 | DorisMetrics::instance()->compaction_used_permits->set_value(_used_permits); |
62 | 19 | } |
63 | | } // namespace doris |