/root/doris/be/src/common/sync_point.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // Most code of this file is copied from rocksdb SyncPoint. |
19 | | // https://github.com/facebook/rocksdb |
20 | | |
21 | | // clang-format off |
22 | | #include "sync_point.h" |
23 | | |
24 | | #include <atomic> |
25 | | #include <condition_variable> |
26 | | #include <functional> |
27 | | #include <mutex> |
28 | | #include <random> |
29 | | #include <string> |
30 | | #include <thread> |
31 | | #include <unordered_map> |
32 | | #include <unordered_set> |
33 | | |
34 | | namespace doris { |
35 | | |
36 | | struct SyncPoint::Data { // impl |
37 | | public: |
38 | 1 | Data() : enabled_(false) { } |
39 | 1 | virtual ~Data() {} |
40 | | void process(const std::string& point, std::vector<std::any>&& cb_args); |
41 | | void load_dependency(const std::vector<SyncPointPair>& dependencies); |
42 | | void load_dependency_and_markers( |
43 | | const std::vector<SyncPointPair>& dependencies, |
44 | | const std::vector<SyncPointPair>& markers); |
45 | | bool predecessors_all_cleared(const std::string& point); |
46 | | void set_call_back(const std::string& point, |
47 | | const std::function<void(std::vector<std::any>&&)>& callback); |
48 | | void clear_call_back(const std::string& point); |
49 | | void clear_all_call_backs(); |
50 | | void enable_processing(); |
51 | | void disable_processing(); |
52 | | void clear_trace(); |
53 | | private: |
54 | | bool disable_by_marker(const std::string& point, std::thread::id thread_id); |
55 | | private: |
56 | | // successor/predecessor map loaded from load_dependency |
57 | | std::unordered_map<std::string, std::vector<std::string>> successors_; |
58 | | std::unordered_map<std::string, std::vector<std::string>> predecessors_; |
59 | | std::unordered_map<std::string, std::function<void(std::vector<std::any>&&)>> callbacks_; |
60 | | std::unordered_map<std::string, std::vector<std::string>> markers_; |
61 | | std::unordered_map<std::string, std::thread::id> marked_thread_id_; |
62 | | std::mutex mutex_; |
63 | | std::condition_variable cv_; |
64 | | // sync points that have been passed through |
65 | | std::unordered_set<std::string> cleared_points_; |
66 | | std::atomic<bool> enabled_; |
67 | | int num_callbacks_running_ = 0; |
68 | | }; |
69 | | |
70 | 70.2k | SyncPoint* SyncPoint::get_instance() { |
71 | 70.2k | static SyncPoint sync_point; |
72 | 70.2k | return &sync_point; |
73 | 70.2k | } |
74 | | SyncPoint::SyncPoint() : |
75 | 1 | impl_(new Data) { |
76 | 1 | } |
77 | 1 | SyncPoint:: ~SyncPoint() { |
78 | 1 | delete impl_; |
79 | 1 | } |
80 | 0 | void SyncPoint::load_dependency(const std::vector<SyncPointPair>& dependencies) { |
81 | 0 | impl_->load_dependency(dependencies); |
82 | 0 | } |
83 | | void SyncPoint::load_dependency_and_markers( |
84 | | const std::vector<SyncPointPair>& dependencies, |
85 | 0 | const std::vector<SyncPointPair>& markers) { |
86 | 0 | impl_->load_dependency_and_markers(dependencies, markers); |
87 | 0 | } |
88 | | void SyncPoint::set_call_back(const std::string& point, |
89 | 9 | const std::function<void(std::vector<std::any>&&)>& callback) { |
90 | 9 | impl_->set_call_back(point, callback); |
91 | 9 | } |
92 | 4 | void SyncPoint::clear_call_back(const std::string& point) { |
93 | 4 | impl_->clear_call_back(point); |
94 | 4 | } |
95 | 0 | void SyncPoint::clear_all_call_backs() { |
96 | 0 | impl_->clear_all_call_backs(); |
97 | 0 | } |
98 | 3 | void SyncPoint::enable_processing() { |
99 | 3 | impl_->enable_processing(); |
100 | 3 | } |
101 | 0 | void SyncPoint::disable_processing() { |
102 | 0 | impl_->disable_processing(); |
103 | 0 | } |
104 | 0 | void SyncPoint::clear_trace() { |
105 | 0 | impl_->clear_trace(); |
106 | 0 | } |
107 | 70.2k | void SyncPoint::process(const std::string& point, std::vector<std::any>&& cb_arg) { |
108 | 70.2k | impl_->process(point, std::move(cb_arg)); |
109 | 70.2k | } |
110 | | |
111 | | // ============================================================================= |
112 | | // SyncPoint::Data implementation |
113 | | // ============================================================================= |
114 | | |
115 | | void SyncPoint::Data::load_dependency( |
116 | 0 | const std::vector<SyncPointPair>& dependencies) { |
117 | 0 | std::lock_guard lock(mutex_); |
118 | 0 | successors_.clear(); |
119 | 0 | predecessors_.clear(); |
120 | 0 | cleared_points_.clear(); |
121 | 0 | for (const auto& dependency : dependencies) { |
122 | 0 | successors_[dependency.predecessor].push_back(dependency.successor); |
123 | 0 | predecessors_[dependency.successor].push_back(dependency.predecessor); |
124 | 0 | } |
125 | 0 | cv_.notify_all(); |
126 | 0 | } |
127 | | |
128 | | /** |
129 | | * Markers are also dependency descriptions |
130 | | */ |
131 | | void SyncPoint::Data::load_dependency_and_markers( |
132 | | const std::vector<SyncPointPair>& dependencies, |
133 | 0 | const std::vector<SyncPointPair>& markers) { |
134 | 0 | std::lock_guard lock(mutex_); |
135 | 0 | successors_.clear(); |
136 | 0 | predecessors_.clear(); |
137 | 0 | cleared_points_.clear(); |
138 | 0 | markers_.clear(); |
139 | 0 | marked_thread_id_.clear(); |
140 | 0 | for (const auto& dependency : dependencies) { |
141 | 0 | successors_[dependency.predecessor].push_back(dependency.successor); |
142 | 0 | predecessors_[dependency.successor].push_back(dependency.predecessor); |
143 | 0 | } |
144 | 0 | for (const auto& marker : markers) { |
145 | 0 | successors_[marker.predecessor].push_back(marker.successor); |
146 | 0 | predecessors_[marker.successor].push_back(marker.predecessor); |
147 | 0 | markers_[marker.predecessor].push_back(marker.successor); |
148 | 0 | } |
149 | 0 | cv_.notify_all(); |
150 | 0 | } |
151 | | |
152 | 70.0k | bool SyncPoint::Data::predecessors_all_cleared(const std::string& point) { |
153 | 70.0k | for (const auto& pred : predecessors_[point]) { |
154 | 0 | if (cleared_points_.count(pred) == 0) { |
155 | 0 | return false; |
156 | 0 | } |
157 | 0 | } |
158 | 70.0k | return true; |
159 | 70.0k | } |
160 | | |
161 | 4 | void SyncPoint::Data::clear_call_back(const std::string& point) { |
162 | 4 | std::unique_lock lock(mutex_); |
163 | 4 | callbacks_.erase(point); |
164 | 4 | } |
165 | | |
166 | 0 | void SyncPoint::Data::clear_all_call_backs() { |
167 | 0 | std::unique_lock lock(mutex_); |
168 | 0 | callbacks_.clear(); |
169 | 0 | } |
170 | | |
171 | 70.2k | void SyncPoint::Data::process(const std::string& point, std::vector<std::any>&& cb_arg) { |
172 | 70.2k | if (!enabled_) { |
173 | 248 | return; |
174 | 248 | } |
175 | 70.0k | std::unique_lock lock(mutex_); |
176 | 70.0k | auto thread_id = std::this_thread::get_id(); |
177 | 70.0k | auto marker_iter = markers_.find(point); |
178 | | // if current sync point is a marker |
179 | | // record it in marked_thread_id_ for all its successors |
180 | 70.0k | if (marker_iter != markers_.end()) { |
181 | 0 | for (auto& marked_point : marker_iter->second) { |
182 | 0 | marked_thread_id_.emplace(marked_point, thread_id); |
183 | 0 | } |
184 | 0 | } |
185 | | // if current point is a marker's successor |
186 | 70.0k | if (disable_by_marker(point, thread_id)) { |
187 | 0 | return; |
188 | 0 | } |
189 | 70.0k | while (!predecessors_all_cleared(point)) { |
190 | 0 | cv_.wait(lock); |
191 | 0 | if (disable_by_marker(point, thread_id)) { |
192 | 0 | return; |
193 | 0 | } |
194 | 0 | } |
195 | 70.0k | auto callback_pair = callbacks_.find(point); |
196 | 70.0k | if (callback_pair != callbacks_.end()) { |
197 | 22 | num_callbacks_running_++; |
198 | 22 | auto callback = callback_pair->second; |
199 | 22 | mutex_.unlock(); |
200 | 22 | callback(std::move(cb_arg)); |
201 | 22 | mutex_.lock(); |
202 | 22 | num_callbacks_running_--; |
203 | 22 | } |
204 | 70.0k | cleared_points_.insert(point); |
205 | 70.0k | cv_.notify_all(); |
206 | 70.0k | } |
207 | | |
208 | | bool SyncPoint::Data::disable_by_marker(const std::string& point, |
209 | 70.0k | std::thread::id thread_id) { |
210 | 70.0k | auto marked_point_iter = marked_thread_id_.find(point); |
211 | 70.0k | return marked_point_iter != marked_thread_id_.end() // is a successor |
212 | 70.0k | && thread_id != marked_point_iter->second; |
213 | 70.0k | } |
214 | | |
215 | | void SyncPoint::Data::set_call_back(const std::string& point, |
216 | 9 | const std::function<void(std::vector<std::any>&&)>& callback) { |
217 | 9 | std::lock_guard lock(mutex_); |
218 | 9 | callbacks_[point] = callback; |
219 | 9 | } |
220 | | |
221 | 0 | void SyncPoint::Data::clear_trace() { |
222 | 0 | std::lock_guard lock(mutex_); |
223 | 0 | cleared_points_.clear(); |
224 | 0 | } |
225 | | |
226 | 3 | void SyncPoint::Data::enable_processing() { |
227 | 3 | enabled_ = true; |
228 | 3 | } |
229 | | |
230 | 0 | void SyncPoint::Data::disable_processing() { |
231 | 0 | enabled_ = false; |
232 | 0 | } |
233 | | |
234 | | } // namespace doris |
235 | | // clang-format on |
236 | | // vim: et tw=80 ts=2 sw=2 cc=80: |