Coverage Report

Created: 2026-06-18 17:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/bthread_shared_mutex.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bthread/condition_variable.h>
21
#include <bthread/mutex.h>
22
23
#include <climits>
24
#include <mutex>
25
26
namespace doris {
27
28
// A reader-writer lock for bthread contexts. It is a port of libc++'s
29
// std::shared_mutex (the two-gate condition-variable algorithm) onto
30
// bthread::Mutex/bthread::ConditionVariable. Unlike std::shared_mutex
31
// (pthread_rwlock_t), ownership carries no OS-thread identity, so it is safe to
32
// lock on one bthread worker and unlock on another after a bthread migrates.
33
// Satisfies the C++ SharedMutex requirements (usable with std::unique_lock /
34
// std::shared_lock). Writer-preferring.
35
class BthreadSharedMutex {
36
public:
37
720
    BthreadSharedMutex() = default;
38
720
    ~BthreadSharedMutex() = default;
39
40
    // Non-copyable: both copy construction and copy assignment are deleted.
    BthreadSharedMutex(const BthreadSharedMutex&) = delete;
41
    BthreadSharedMutex& operator=(const BthreadSharedMutex&) = delete;
42
43
9.71k
    // Acquires exclusive (writer) ownership, blocking until it is available.
    // Two phases, both under `_mutex`:
    //   1. wait on `_gate1` until no other writer has set `_write_entered`,
    //      then set it -- from this point lock_shared()/try_lock_shared()
    //      refuse new readers because they test the same bit;
    //   2. wait on `_gate2` until the reader count in the low bits is zero.
    void lock() {
44
9.71k
        std::unique_lock<bthread::Mutex> lk(_mutex);
45
30.0k
        while (_state & _write_entered) {
46
20.3k
            _gate1.wait(lk);
47
20.3k
        }
48
9.71k
        _state |= _write_entered;
49
11.4k
        while (_state & _n_readers) {
50
1.75k
            _gate2.wait(lk);
51
1.75k
        }
52
9.71k
    }
53
54
9
    // Non-blocking exclusive acquire. Succeeds only when `_state` is exactly
    // zero, i.e. the writer bit is clear and the reader count is zero.
    // Returns true if the lock was taken, false otherwise.
    bool try_lock() {
55
9
        std::unique_lock<bthread::Mutex> lk(_mutex);
56
9
        if (_state == 0) {
57
5
            _state = _write_entered;
58
5
            return true;
59
5
        }
60
4
        return false;
61
9
    }
62
63
9.72k
    // Releases exclusive ownership: resets `_state` to zero and wakes every
    // waiter parked on `_gate1` (writers in phase 1 of lock() and readers in
    // lock_shared() both wait there), letting them re-check the state.
    void unlock() {
64
9.72k
        std::lock_guard<bthread::Mutex> lk(_mutex);
65
9.72k
        _state = 0;
66
        // Notify while still holding `_mutex`. Releasing the mutex before
67
        // notifying can lose a wakeup with bthread::ConditionVariable when the
68
        // waiter is a pthread and the notifier is a bthread (or vice versa).
69
9.72k
        _gate1.notify_all();
70
9.72k
    }
71
72
14.6k
    // Acquires shared (reader) ownership. Blocks on `_gate1` while a writer
    // has entered or while the reader count is saturated at `_n_readers`,
    // then stores the incremented reader count back into the low bits.
    void lock_shared() {
73
14.6k
        std::unique_lock<bthread::Mutex> lk(_mutex);
74
42.0k
        while ((_state & _write_entered) || (_state & _n_readers) == _n_readers) {
75
27.3k
            _gate1.wait(lk);
76
27.3k
        }
77
14.6k
        unsigned num_readers = (_state & _n_readers) + 1;
78
14.6k
        _state &= ~_n_readers;
79
14.6k
        _state |= num_readers;
80
14.6k
    }
81
82
9
    // Non-blocking shared acquire. Succeeds when the writer bit is clear and
    // the reader count is below its maximum; on success the count is
    // incremented. Returns true if the lock was taken, false otherwise.
    bool try_lock_shared() {
83
9
        std::unique_lock<bthread::Mutex> lk(_mutex);
84
9
        unsigned num_readers = _state & _n_readers;
85
9
        if (!(_state & _write_entered) && num_readers != _n_readers) {
86
6
            ++num_readers;
87
6
            _state &= ~_n_readers;
88
6
            _state |= num_readers;
89
6
            return true;
90
6
        }
91
3
        return false;
92
9
    }
93
94
14.6k
    // Releases shared ownership by decrementing the reader count, then wakes
    // whoever that decrement may have unblocked:
    //   - a writer has entered and the count reached zero -> one waiter on
    //     `_gate2` (the writer in phase 2 of lock());
    //   - no writer and the count just left its maximum -> one waiter on
    //     `_gate1` (a reader that was blocked on saturation).
    // No check is made that a shared lock is actually held: the unsigned
    // subtraction below would wrap if the count were already zero.
    void unlock_shared() {
95
14.6k
        std::unique_lock<bthread::Mutex> lk(_mutex);
96
14.6k
        unsigned num_readers = (_state & _n_readers) - 1;
97
14.6k
        _state &= ~_n_readers;
98
14.6k
        _state |= num_readers;
99
        // Notify while still holding `_mutex` (do not unlock first): with
100
        // bthread::ConditionVariable a notify issued after releasing the mutex
101
        // can be lost across pthread/bthread contexts, leaving a writer parked
102
        // forever on a reader count that is already zero.
103
14.6k
        if (_state & _write_entered) {
104
6.98k
            if (num_readers == 0) {
105
1.75k
                _gate2.notify_one();
106
1.75k
            }
107
7.71k
        } else {
108
7.71k
            if (num_readers == _n_readers - 1) {
109
0
                _gate1.notify_one();
110
0
            }
111
7.71k
        }
112
14.6k
    }
113
114
private:
115
    // Held by every member function while it reads or writes `_state`.
    bthread::Mutex _mutex;
116
    // Gate 1: waited on in lock() until the writer bit is clear, and in
    // lock_shared() until the writer bit is clear and a reader slot is free.
    bthread::ConditionVariable _gate1;
117
    // Gate 2: waited on in lock() by the writer that already set the writer
    // bit, until the reader count reaches zero.
    bthread::ConditionVariable _gate2;
118
    // Packed lock state; see the two masks below for the bit layout.
    unsigned _state {0};
119
120
    // High bit: a writer has entered. Remaining bits: active reader count.
121
    static constexpr unsigned _write_entered = 1U << (sizeof(unsigned) * CHAR_BIT - 1);
122
    // Mask of the reader-count bits; also used as the saturation value that
    // lock_shared()/try_lock_shared() compare the count against.
    static constexpr unsigned _n_readers = ~_write_entered;
123
};
124
125
} // namespace doris