/root/doris/common/cpp/sync_point.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // Most code of this file is copied from rocksdb SyncPoint. |
19 | | // https://github.com/facebook/rocksdb |
20 | | |
21 | | // clang-format off |
22 | | #include "sync_point.h" |
23 | | |
24 | | #include <atomic> |
25 | | #include <condition_variable> |
26 | | #include <functional> |
27 | | #include <mutex> |
28 | | #include <random> |
29 | | #include <string> |
30 | | #include <thread> |
31 | | #include <unordered_map> |
32 | | #include <unordered_set> |
33 | | |
34 | | namespace doris { |
35 | | |
36 | | struct SyncPoint::Data { // impl |
37 | | public: |
38 | 1 | Data() : enabled_(false) { } |
39 | 1 | virtual ~Data() {} |
40 | | void process(const std::string& point, std::vector<std::any>&& cb_args); |
41 | | void load_dependency(const std::vector<SyncPointPair>& dependencies); |
42 | | void load_dependency_and_markers( |
43 | | const std::vector<SyncPointPair>& dependencies, |
44 | | const std::vector<SyncPointPair>& markers); |
45 | | bool predecessors_all_cleared(const std::string& point); |
46 | | void set_call_back(const std::string& point, |
47 | | const std::function<void(std::vector<std::any>&&)>& callback, CallbackGuard*); |
48 | | void clear_call_back(const std::string& point); |
49 | | void clear_all_call_backs(); |
50 | | void enable_processing(); |
51 | | void disable_processing(); |
52 | | void clear_trace(); |
53 | | bool has_point(const std::string& point); |
54 | | bool get_enable(); |
55 | | private: |
56 | | bool disable_by_marker(const std::string& point, std::thread::id thread_id); |
57 | | private: |
58 | | // successor/predecessor map loaded from load_dependency |
59 | | std::unordered_map<std::string, std::vector<std::string>> successors_; |
60 | | std::unordered_map<std::string, std::vector<std::string>> predecessors_; |
61 | | std::unordered_map<std::string, std::function<void(std::vector<std::any>&&)>> callbacks_; |
62 | | std::unordered_map<std::string, std::vector<std::string>> markers_; |
63 | | std::unordered_map<std::string, std::thread::id> marked_thread_id_; |
64 | | std::mutex mutex_; |
65 | | std::condition_variable cv_; |
66 | | // sync points that have been passed through |
67 | | std::unordered_set<std::string> cleared_points_; |
68 | | std::atomic<bool> enabled_; |
69 | | int num_callbacks_running_ = 0; |
70 | | }; |
71 | | |
72 | 212k | SyncPoint* SyncPoint::get_instance() { |
73 | 212k | static SyncPoint sync_point; |
74 | 212k | return &sync_point; |
75 | 212k | } |
76 | | SyncPoint::SyncPoint() : |
77 | 1 | impl_(new Data) { |
78 | 1 | } |
79 | 1 | SyncPoint:: ~SyncPoint() { |
80 | 1 | delete impl_; |
81 | 1 | } |
82 | 0 | void SyncPoint::load_dependency(const std::vector<SyncPointPair>& dependencies) { |
83 | 0 | impl_->load_dependency(dependencies); |
84 | 0 | } |
85 | | void SyncPoint::load_dependency_and_markers( |
86 | | const std::vector<SyncPointPair>& dependencies, |
87 | 0 | const std::vector<SyncPointPair>& markers) { |
88 | 0 | impl_->load_dependency_and_markers(dependencies, markers); |
89 | 0 | } |
90 | | void SyncPoint::set_call_back(const std::string& point, |
91 | 73 | const std::function<void(std::vector<std::any>&&)>& callback, CallbackGuard* guard) { |
92 | 73 | impl_->set_call_back(point, callback, guard); |
93 | 73 | } |
94 | 64 | void SyncPoint::clear_call_back(const std::string& point) { |
95 | 64 | impl_->clear_call_back(point); |
96 | 64 | } |
97 | 3 | void SyncPoint::clear_all_call_backs() { |
98 | 3 | impl_->clear_all_call_backs(); |
99 | 3 | } |
100 | 27 | void SyncPoint::enable_processing() { |
101 | 27 | impl_->enable_processing(); |
102 | 27 | } |
103 | 1 | void SyncPoint::disable_processing() { |
104 | 1 | impl_->disable_processing(); |
105 | 1 | } |
106 | 0 | void SyncPoint::clear_trace() { |
107 | 0 | impl_->clear_trace(); |
108 | 0 | } |
109 | 16.4k | void SyncPoint::process(const std::string& point, std::vector<std::any>&& cb_arg) { |
110 | 16.4k | impl_->process(point, std::move(cb_arg)); |
111 | 16.4k | } |
112 | 192k | bool SyncPoint::has_point(const std::string& point) { |
113 | 192k | return impl_->has_point(point); |
114 | 192k | } |
115 | | |
116 | 198k | bool SyncPoint::get_enable() { |
117 | 198k | return impl_->get_enable(); |
118 | 198k | } |
119 | | |
120 | | // ============================================================================= |
121 | | // SyncPoint implementation |
122 | | // ============================================================================= |
123 | | |
124 | | void SyncPoint::Data::load_dependency( |
125 | 0 | const std::vector<SyncPointPair>& dependencies) { |
126 | 0 | std::lock_guard lock(mutex_); |
127 | 0 | successors_.clear(); |
128 | 0 | predecessors_.clear(); |
129 | 0 | cleared_points_.clear(); |
130 | 0 | for (const auto& dependency : dependencies) { |
131 | 0 | successors_[dependency.predecessor].push_back(dependency.successor); |
132 | 0 | predecessors_[dependency.successor].push_back(dependency.predecessor); |
133 | 0 | } |
134 | 0 | cv_.notify_all(); |
135 | 0 | } |
136 | | |
137 | | /** |
138 | | * Markers are also dependency descriptions |
139 | | */ |
140 | | void SyncPoint::Data::load_dependency_and_markers( |
141 | | const std::vector<SyncPointPair>& dependencies, |
142 | 0 | const std::vector<SyncPointPair>& markers) { |
143 | 0 | std::lock_guard lock(mutex_); |
144 | 0 | successors_.clear(); |
145 | 0 | predecessors_.clear(); |
146 | 0 | cleared_points_.clear(); |
147 | 0 | markers_.clear(); |
148 | 0 | marked_thread_id_.clear(); |
149 | 0 | for (const auto& dependency : dependencies) { |
150 | 0 | successors_[dependency.predecessor].push_back(dependency.successor); |
151 | 0 | predecessors_[dependency.successor].push_back(dependency.predecessor); |
152 | 0 | } |
153 | 0 | for (const auto& marker : markers) { |
154 | 0 | successors_[marker.predecessor].push_back(marker.successor); |
155 | 0 | predecessors_[marker.successor].push_back(marker.predecessor); |
156 | 0 | markers_[marker.predecessor].push_back(marker.successor); |
157 | 0 | } |
158 | 0 | cv_.notify_all(); |
159 | 0 | } |
160 | | |
161 | 16.3k | bool SyncPoint::Data::predecessors_all_cleared(const std::string& point) { |
162 | 16.3k | for (const auto& pred : predecessors_[point]) { |
163 | 0 | if (cleared_points_.count(pred) == 0) { |
164 | 0 | return false; |
165 | 0 | } |
166 | 0 | } |
167 | 16.3k | return true; |
168 | 16.3k | } |
169 | | |
170 | 64 | void SyncPoint::Data::clear_call_back(const std::string& point) { |
171 | 64 | std::unique_lock lock(mutex_); |
172 | 64 | callbacks_.erase(point); |
173 | 64 | } |
174 | | |
175 | 3 | void SyncPoint::Data::clear_all_call_backs() { |
176 | 3 | std::unique_lock lock(mutex_); |
177 | 3 | callbacks_.clear(); |
178 | 3 | } |
179 | | |
180 | 16.4k | void SyncPoint::Data::process(const std::string& point, std::vector<std::any>&& cb_arg) { |
181 | 16.4k | if (!enabled_) { |
182 | 140 | return; |
183 | 140 | } |
184 | 16.3k | std::unique_lock lock(mutex_); |
185 | 16.3k | auto thread_id = std::this_thread::get_id(); |
186 | 16.3k | auto marker_iter = markers_.find(point); |
187 | | // if current sync point is a marker |
188 | | // record it in marked_thread_id_ for all its successors |
189 | 16.3k | if (marker_iter != markers_.end()) { |
190 | 0 | for (auto& marked_point : marker_iter->second) { |
191 | 0 | marked_thread_id_.emplace(marked_point, thread_id); |
192 | 0 | } |
193 | 0 | } |
194 | | // if current point is a marker's successor |
195 | 16.3k | if (disable_by_marker(point, thread_id)) { |
196 | 0 | return; |
197 | 0 | } |
198 | 16.3k | while (!predecessors_all_cleared(point)) { |
199 | 0 | cv_.wait(lock); |
200 | 0 | if (disable_by_marker(point, thread_id)) { |
201 | 0 | return; |
202 | 0 | } |
203 | 0 | } |
204 | 16.3k | auto callback_pair = callbacks_.find(point); |
205 | 16.3k | if (callback_pair != callbacks_.end()) { |
206 | 3.36k | num_callbacks_running_++; |
207 | 3.36k | auto callback = callback_pair->second; |
208 | 3.36k | mutex_.unlock(); |
209 | 3.36k | callback(std::move(cb_arg)); |
210 | 3.36k | mutex_.lock(); |
211 | 3.36k | num_callbacks_running_--; |
212 | 3.36k | } |
213 | 16.3k | cleared_points_.insert(point); |
214 | 16.3k | cv_.notify_all(); |
215 | 16.3k | } |
216 | | |
217 | | bool SyncPoint::Data::disable_by_marker(const std::string& point, |
218 | 16.3k | std::thread::id thread_id) { |
219 | 16.3k | auto marked_point_iter = marked_thread_id_.find(point); |
220 | 16.3k | return marked_point_iter != marked_thread_id_.end() // is a successor |
221 | 16.3k | && thread_id != marked_point_iter->second; |
222 | 16.3k | } |
223 | | |
224 | | void SyncPoint::Data::set_call_back(const std::string& point, |
225 | 73 | const std::function<void(std::vector<std::any>&&)>& callback, CallbackGuard* guard) { |
226 | 73 | { |
227 | 73 | std::lock_guard lock(mutex_); |
228 | 73 | callbacks_[point] = callback; |
229 | 73 | } |
230 | | |
231 | 73 | if (guard != nullptr) { |
232 | 35 | *guard = CallbackGuard(point); |
233 | 35 | } |
234 | 73 | } |
235 | | |
236 | 0 | void SyncPoint::Data::clear_trace() { |
237 | 0 | std::lock_guard lock(mutex_); |
238 | 0 | cleared_points_.clear(); |
239 | 0 | } |
240 | | |
241 | 27 | void SyncPoint::Data::enable_processing() { |
242 | 27 | enabled_ = true; |
243 | 27 | } |
244 | | |
245 | 1 | void SyncPoint::Data::disable_processing() { |
246 | 1 | enabled_ = false; |
247 | 1 | } |
248 | | |
249 | 192k | bool SyncPoint::Data::has_point(const std::string& point) { |
250 | 192k | std::unique_lock lock(mutex_); |
251 | 192k | return callbacks_.find(point) != callbacks_.end(); |
252 | 192k | } |
253 | | |
254 | 198k | bool SyncPoint::Data::get_enable() { |
255 | 198k | return enabled_; |
256 | 198k | } |
257 | | |
258 | | } // namespace doris |
259 | | // clang-format on |
260 | | // vim: et tw=80 ts=2 sw=2 cc=80: |