Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <bthread/condition_variable.h> |
21 | | #include <bthread/mutex.h> |
22 | | |
23 | | #include <climits> |
24 | | #include <mutex> |
25 | | |
26 | | namespace doris { |
27 | | |
28 | | // A reader-writer lock for bthread contexts. It is a port of libc++'s |
29 | | // std::shared_mutex (the two-gate condition-variable algorithm) onto |
30 | | // bthread::Mutex/bthread::ConditionVariable. Unlike std::shared_mutex |
31 | | // (pthread_rwlock_t), ownership carries no OS-thread identity, so it is safe to |
32 | | // lock on one bthread worker and unlock on another after a bthread migrates. |
33 | | // Satisfies the C++ SharedMutex requirements (usable with std::unique_lock / |
34 | | // std::shared_lock). Writer-preferring. |
35 | | class BthreadSharedMutex { |
36 | | public: |
37 | 286k | BthreadSharedMutex() = default; |
38 | 144k | ~BthreadSharedMutex() = default; |
39 | | |
40 | | BthreadSharedMutex(const BthreadSharedMutex&) = delete; |
41 | | BthreadSharedMutex& operator=(const BthreadSharedMutex&) = delete; |
42 | | |
43 | 978k | void lock() { |
44 | 978k | std::unique_lock<bthread::Mutex> lk(_mutex); |
45 | 1.01M | while (_state & _write_entered) { |
46 | 31.3k | _gate1.wait(lk); |
47 | 31.3k | } |
48 | 978k | _state |= _write_entered; |
49 | 982k | while (_state & _n_readers) { |
50 | 3.49k | _gate2.wait(lk); |
51 | 3.49k | } |
52 | 978k | } |
53 | | |
54 | | bool try_lock() { |
55 | | std::unique_lock<bthread::Mutex> lk(_mutex); |
56 | | if (_state == 0) { |
57 | | _state = _write_entered; |
58 | | return true; |
59 | | } |
60 | | return false; |
61 | | } |
62 | | |
63 | 978k | void unlock() { |
64 | 978k | std::lock_guard<bthread::Mutex> lk(_mutex); |
65 | 978k | _state = 0; |
66 | | // Notify while still holding `_mutex`. Releasing the mutex before |
67 | | // notifying can lose a wakeup with bthread::ConditionVariable when the |
68 | | // waiter is a pthread and the notifier is a bthread (or vice versa). |
69 | 978k | _gate1.notify_all(); |
70 | 978k | } |
71 | | |
72 | 36.3M | void lock_shared() { |
73 | 36.3M | std::unique_lock<bthread::Mutex> lk(_mutex); |
74 | 36.4M | while ((_state & _write_entered) || (_state & _n_readers) == _n_readers) { |
75 | 21.2k | _gate1.wait(lk); |
76 | 21.2k | } |
77 | 36.3M | unsigned num_readers = (_state & _n_readers) + 1; |
78 | 36.3M | _state &= ~_n_readers; |
79 | 36.3M | _state |= num_readers; |
80 | 36.3M | } |
81 | | |
82 | | bool try_lock_shared() { |
83 | | std::unique_lock<bthread::Mutex> lk(_mutex); |
84 | | unsigned num_readers = _state & _n_readers; |
85 | | if (!(_state & _write_entered) && num_readers != _n_readers) { |
86 | | ++num_readers; |
87 | | _state &= ~_n_readers; |
88 | | _state |= num_readers; |
89 | | return true; |
90 | | } |
91 | | return false; |
92 | | } |
93 | | |
94 | 36.4M | void unlock_shared() { |
95 | 36.4M | std::unique_lock<bthread::Mutex> lk(_mutex); |
96 | 36.4M | unsigned num_readers = (_state & _n_readers) - 1; |
97 | 36.4M | _state &= ~_n_readers; |
98 | 36.4M | _state |= num_readers; |
99 | | // Notify while still holding `_mutex` (do not unlock first): with |
100 | | // bthread::ConditionVariable a notify issued after releasing the mutex |
101 | | // can be lost across pthread/bthread contexts, leaving a writer parked |
102 | | // forever on a reader count that is already zero. |
103 | 36.4M | if (_state & _write_entered) { |
104 | 7.59k | if (num_readers == 0) { |
105 | 3.49k | _gate2.notify_one(); |
106 | 3.49k | } |
107 | 36.4M | } else { |
108 | 36.4M | if (num_readers == _n_readers - 1) { |
109 | 0 | _gate1.notify_one(); |
110 | 0 | } |
111 | 36.4M | } |
112 | 36.4M | } |
113 | | |
114 | | private: |
115 | | bthread::Mutex _mutex; |
116 | | bthread::ConditionVariable _gate1; |
117 | | bthread::ConditionVariable _gate2; |
118 | | unsigned _state {0}; |
119 | | |
120 | | // High bit: a writer has entered. Remaining bits: active reader count. |
121 | | static constexpr unsigned _write_entered = 1U << (sizeof(unsigned) * CHAR_BIT - 1); |
122 | | static constexpr unsigned _n_readers = ~_write_entered; |
123 | | }; |
124 | | |
125 | | } // namespace doris |