Coverage Report

Created: 2026-06-18 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/bthread_shared_mutex.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bthread/condition_variable.h>
21
#include <bthread/mutex.h>
22
23
#include <climits>
24
#include <mutex>
25
26
namespace doris {
27
28
// A reader-writer lock for bthread contexts. It is a port of libc++'s
29
// std::shared_mutex (the two-gate condition-variable algorithm) onto
30
// bthread::Mutex/bthread::ConditionVariable. Unlike std::shared_mutex
31
// (pthread_rwlock_t), ownership carries no OS-thread identity, so it is safe to
32
// lock on one bthread worker and unlock on another after a bthread migrates.
33
// Satisfies the C++ SharedMutex requirements (usable with std::unique_lock /
34
// std::shared_lock). Writer-preferring: new readers block once a writer has entered.
35
class BthreadSharedMutex {
36
public:
37
286k
    BthreadSharedMutex() = default;
38
144k
    ~BthreadSharedMutex() = default;
39
40
    // Non-copyable: copy construction and copy assignment are both deleted.
    BthreadSharedMutex(const BthreadSharedMutex&) = delete;
41
    BthreadSharedMutex& operator=(const BthreadSharedMutex&) = delete;
42
43
978k
    // Acquires exclusive (writer) ownership, blocking until it is available.
    // Phase 1: wait on `_gate1` until no other writer has the write-entered
    // bit set. Phase 2: set the bit, then wait on `_gate2` until the reader
    // count has dropped to zero.
    void lock() {
44
978k
        std::unique_lock<bthread::Mutex> lk(_mutex);
45
1.01M
        while (_state & _write_entered) {
46
31.3k
            _gate1.wait(lk);
47
31.3k
        }
48
978k
        // Claim the writer bit before draining readers: once it is set,
        // lock_shared() waits and try_lock_shared() fails, so only the readers
        // that are already inside remain to be waited for.
        _state |= _write_entered;
49
982k
        while (_state & _n_readers) {
50
3.49k
            _gate2.wait(lk);
51
3.49k
        }
52
978k
    }
53
54
    // Attempts to take exclusive ownership without waiting on either gate.
    // Succeeds only when `_state` is 0 (no writer bit, no readers). Returns
    // true if the lock was acquired, false otherwise.
    bool try_lock() {
55
        std::unique_lock<bthread::Mutex> lk(_mutex);
56
        if (_state == 0) {
57
            _state = _write_entered;
58
            return true;
59
        }
60
        return false;
61
    }
62
63
978k
    // Releases exclusive ownership: resets the whole state word to 0 and wakes
    // every waiter on `_gate1` (writers queued in lock() and readers blocked in
    // lock_shared() both wait there).
    void unlock() {
64
978k
        std::lock_guard<bthread::Mutex> lk(_mutex);
65
978k
        _state = 0;
66
        // Notify while still holding `_mutex`. Releasing the mutex before
67
        // notifying can lose a wakeup with bthread::ConditionVariable when the
68
        // waiter is a pthread and the notifier is a bthread (or vice versa).
69
978k
        _gate1.notify_all();
70
978k
    }
71
72
36.3M
    // Acquires shared (reader) ownership, blocking until it is available.
    // Waits on `_gate1` while a writer has entered or the reader count is
    // already at its maximum (`_n_readers`), then increments the reader count.
    void lock_shared() {
73
36.3M
        std::unique_lock<bthread::Mutex> lk(_mutex);
74
36.4M
        while ((_state & _write_entered) || (_state & _n_readers) == _n_readers) {
75
21.2k
            _gate1.wait(lk);
76
21.2k
        }
77
36.3M
        // Replace only the reader-count bits with the incremented value; the
        // writer bit is left untouched by the mask-and-or below.
        unsigned num_readers = (_state & _n_readers) + 1;
78
36.3M
        _state &= ~_n_readers;
79
36.3M
        _state |= num_readers;
80
36.3M
    }
81
82
    // Attempts to take shared ownership without waiting on `_gate1`. Succeeds
    // only when no writer has entered and the reader count is below its
    // maximum. Returns true if the reader count was incremented, else false.
    bool try_lock_shared() {
83
        std::unique_lock<bthread::Mutex> lk(_mutex);
84
        unsigned num_readers = _state & _n_readers;
85
        if (!(_state & _write_entered) && num_readers != _n_readers) {
86
            ++num_readers;
87
            _state &= ~_n_readers;
88
            _state |= num_readers;
89
            return true;
90
        }
91
        return false;
92
    }
93
94
36.4M
    // Releases one shared ownership: decrements the reader count, then wakes
    // whoever that decrement may have unblocked (see the branches below).
    // NOTE(review): nothing here checks that a shared lock is actually held;
    // with a reader count of zero the unsigned subtraction wraps around.
    void unlock_shared() {
95
36.4M
        std::unique_lock<bthread::Mutex> lk(_mutex);
96
36.4M
        unsigned num_readers = (_state & _n_readers) - 1;
97
36.4M
        _state &= ~_n_readers;
98
36.4M
        _state |= num_readers;
99
        // Notify while still holding `_mutex` (do not unlock first): with
100
        // bthread::ConditionVariable a notify issued after releasing the mutex
101
        // can be lost across pthread/bthread contexts, leaving a writer parked
102
        // forever on a reader count that is already zero.
103
36.4M
        if (_state & _write_entered) {
104
7.59k
            // A writer has entered and waits on `_gate2` for the readers to
            // drain; only the last reader out needs to wake it.
            if (num_readers == 0) {
105
3.49k
                _gate2.notify_one();
106
3.49k
            }
107
36.4M
        } else {
108
36.4M
            // The count just left its saturated value, so a lock_shared()
            // caller waiting on `_gate1` because the count was full can retry.
            if (num_readers == _n_readers - 1) {
109
0
                _gate1.notify_one();
110
0
            }
111
36.4M
        }
112
36.4M
    }
113
114
private:
115
    // Guards `_state`; every public method takes it before touching the state.
    bthread::Mutex _mutex;
116
    // Waited on in lock() (another writer has entered) and in lock_shared()
    // (a writer has entered or the reader count is full).
    bthread::ConditionVariable _gate1;
117
    // Waited on in lock() by the writer that has set the write-entered bit
    // while readers are still present.
    bthread::ConditionVariable _gate2;
118
    // Packed lock state: writer flag plus reader count (see the masks below).
    unsigned _state {0};
119
120
    // High bit: a writer has entered. Remaining bits: active reader count.
121
    static constexpr unsigned _write_entered = 1U << (sizeof(unsigned) * CHAR_BIT - 1);
122
    // Mask selecting the reader-count bits; also the largest reader count the
    // state word can hold.
    static constexpr unsigned _n_readers = ~_write_entered;
123
};
124
125
} // namespace doris