Coverage Report

Created: 2024-11-21 14:00

/root/doris/common/cpp/sync_point.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// Most code of this file is copied from rocksdb SyncPoint.
19
// https://github.com/facebook/rocksdb
20
21
// clang-format off
22
#include "sync_point.h"
23
24
#include <atomic>
25
#include <condition_variable>
26
#include <functional>
27
#include <mutex>
28
#include <random>
29
#include <string>
30
#include <thread>
31
#include <unordered_map>
32
#include <unordered_set>
33
34
namespace doris {
35
36
struct SyncPoint::Data { // impl
37
public:
38
1
  Data() : enabled_(false) { }
39
1
  virtual ~Data() {}
40
  void process(const std::string& point, std::vector<std::any>&& cb_args);
41
  void load_dependency(const std::vector<SyncPointPair>& dependencies);
42
  void load_dependency_and_markers(
43
                                const std::vector<SyncPointPair>& dependencies,
44
                                const std::vector<SyncPointPair>& markers);
45
  bool predecessors_all_cleared(const std::string& point);
46
  void set_call_back(const std::string& point,
47
                    const std::function<void(std::vector<std::any>&&)>& callback, CallbackGuard*);
48
  void clear_call_back(const std::string& point);
49
  void clear_all_call_backs();
50
  void enable_processing();
51
  void disable_processing();
52
  void clear_trace();
53
  bool has_point(const std::string& point);
54
  bool get_enable();
55
private:
56
  bool disable_by_marker(const std::string& point, std::thread::id thread_id);
57
private:
58
  // successor/predecessor map loaded from load_dependency
59
  std::unordered_map<std::string, std::vector<std::string>> successors_;
60
  std::unordered_map<std::string, std::vector<std::string>> predecessors_;
61
  std::unordered_map<std::string, std::function<void(std::vector<std::any>&&)>> callbacks_;
62
  std::unordered_map<std::string, std::vector<std::string>> markers_;
63
  std::unordered_map<std::string, std::thread::id> marked_thread_id_;
64
  std::mutex mutex_;
65
  std::condition_variable cv_;
66
  // sync points that have been passed through
67
  std::unordered_set<std::string> cleared_points_;
68
  std::atomic<bool> enabled_;
69
  int num_callbacks_running_ = 0;
70
};
71
72
212k
SyncPoint* SyncPoint::get_instance() {
73
212k
  static SyncPoint sync_point;
74
212k
  return &sync_point;
75
212k
}
76
SyncPoint::SyncPoint() : 
77
1
  impl_(new Data) {
78
1
}
79
1
SyncPoint:: ~SyncPoint() {
80
1
  delete impl_;
81
1
}
82
0
void SyncPoint::load_dependency(const std::vector<SyncPointPair>& dependencies) {
83
0
  impl_->load_dependency(dependencies);
84
0
}
85
void SyncPoint::load_dependency_and_markers(
86
                                const std::vector<SyncPointPair>& dependencies,
87
0
                                const std::vector<SyncPointPair>& markers) {
88
0
  impl_->load_dependency_and_markers(dependencies, markers);
89
0
}
90
void SyncPoint::set_call_back(const std::string& point,
91
73
                              const std::function<void(std::vector<std::any>&&)>& callback, CallbackGuard* guard) {
92
73
  impl_->set_call_back(point, callback, guard);
93
73
}
94
64
void SyncPoint::clear_call_back(const std::string& point) {
95
64
  impl_->clear_call_back(point);
96
64
}
97
3
void SyncPoint::clear_all_call_backs() {
98
3
  impl_->clear_all_call_backs();
99
3
}
100
27
void SyncPoint::enable_processing() {
101
27
  impl_->enable_processing();
102
27
}
103
1
void SyncPoint::disable_processing() {
104
1
  impl_->disable_processing();
105
1
}
106
0
void SyncPoint::clear_trace() {
107
0
  impl_->clear_trace();
108
0
}
109
16.4k
void SyncPoint::process(const std::string& point, std::vector<std::any>&& cb_arg) {
110
16.4k
  impl_->process(point, std::move(cb_arg));
111
16.4k
}
112
192k
bool SyncPoint::has_point(const std::string& point) {
113
192k
  return impl_->has_point(point);
114
192k
}
115
116
198k
bool SyncPoint::get_enable() {
117
198k
  return impl_->get_enable();
118
198k
}
119
120
// =============================================================================
121
// SyncPoint implementation
122
// =============================================================================
123
124
void SyncPoint::Data::load_dependency(
125
0
                               const std::vector<SyncPointPair>& dependencies) {
126
0
  std::lock_guard lock(mutex_);
127
0
  successors_.clear();
128
0
  predecessors_.clear();
129
0
  cleared_points_.clear();
130
0
  for (const auto& dependency : dependencies) {
131
0
    successors_[dependency.predecessor].push_back(dependency.successor);
132
0
    predecessors_[dependency.successor].push_back(dependency.predecessor);
133
0
  }
134
0
  cv_.notify_all();
135
0
}
136
137
/**
138
 * Markers are also dependency descriptions
139
 */
140
void SyncPoint::Data::load_dependency_and_markers(
141
                                const std::vector<SyncPointPair>& dependencies,
142
0
                                const std::vector<SyncPointPair>& markers) {
143
0
  std::lock_guard lock(mutex_);
144
0
  successors_.clear();
145
0
  predecessors_.clear();
146
0
  cleared_points_.clear();
147
0
  markers_.clear();
148
0
  marked_thread_id_.clear();
149
0
  for (const auto& dependency : dependencies) {
150
0
    successors_[dependency.predecessor].push_back(dependency.successor);
151
0
    predecessors_[dependency.successor].push_back(dependency.predecessor);
152
0
  }
153
0
  for (const auto& marker : markers) {
154
0
    successors_[marker.predecessor].push_back(marker.successor);
155
0
    predecessors_[marker.successor].push_back(marker.predecessor);
156
0
    markers_[marker.predecessor].push_back(marker.successor);
157
0
  }
158
0
  cv_.notify_all();
159
0
}
160
161
16.3k
bool SyncPoint::Data::predecessors_all_cleared(const std::string& point) {
162
16.3k
  for (const auto& pred : predecessors_[point]) {
163
0
    if (cleared_points_.count(pred) == 0) {
164
0
      return false;
165
0
    }
166
0
  }
167
16.3k
  return true;
168
16.3k
}
169
170
64
void SyncPoint::Data::clear_call_back(const std::string& point) {
171
64
  std::unique_lock lock(mutex_);
172
64
  callbacks_.erase(point);
173
64
}
174
175
3
void SyncPoint::Data::clear_all_call_backs() {
176
3
  std::unique_lock lock(mutex_);
177
3
  callbacks_.clear();
178
3
}
179
180
16.4k
void SyncPoint::Data::process(const std::string& point, std::vector<std::any>&& cb_arg) {
181
16.4k
  if (!enabled_) {
182
140
    return;
183
140
  }
184
16.3k
  std::unique_lock lock(mutex_);
185
16.3k
  auto thread_id = std::this_thread::get_id();
186
16.3k
  auto marker_iter = markers_.find(point);
187
  // if current sync point is a marker
188
  // record it in marked_thread_id_ for all its successors
189
16.3k
  if (marker_iter != markers_.end()) {
190
0
    for (auto& marked_point : marker_iter->second) {
191
0
      marked_thread_id_.emplace(marked_point, thread_id);
192
0
    }
193
0
  }
194
  // if current point is a marker's successor 
195
16.3k
  if (disable_by_marker(point, thread_id)) {
196
0
    return;
197
0
  }
198
16.3k
  while (!predecessors_all_cleared(point)) {
199
0
    cv_.wait(lock);
200
0
    if (disable_by_marker(point, thread_id)) {
201
0
      return;
202
0
    }
203
0
  }
204
16.3k
  auto callback_pair = callbacks_.find(point);
205
16.3k
  if (callback_pair != callbacks_.end()) {
206
3.36k
    num_callbacks_running_++;
207
3.36k
    auto callback = callback_pair->second; 
208
3.36k
    mutex_.unlock();
209
3.36k
    callback(std::move(cb_arg));
210
3.36k
    mutex_.lock();
211
3.36k
    num_callbacks_running_--;
212
3.36k
  }
213
16.3k
  cleared_points_.insert(point);
214
16.3k
  cv_.notify_all();
215
16.3k
}
216
217
bool SyncPoint::Data::disable_by_marker(const std::string& point,
218
16.3k
                                        std::thread::id thread_id) {
219
16.3k
  auto marked_point_iter = marked_thread_id_.find(point);
220
16.3k
  return marked_point_iter != marked_thread_id_.end() // is a successor
221
16.3k
          && thread_id != marked_point_iter->second;
222
16.3k
}
223
224
void SyncPoint::Data::set_call_back(const std::string& point,
225
73
                                  const std::function<void(std::vector<std::any>&&)>& callback, CallbackGuard* guard) {
226
73
  {
227
73
    std::lock_guard lock(mutex_);
228
73
    callbacks_[point] = callback;
229
73
  }
230
231
73
  if (guard != nullptr) {
232
35
    *guard = CallbackGuard(point);
233
35
  }
234
73
}
235
236
0
void SyncPoint::Data::clear_trace() {
237
0
  std::lock_guard lock(mutex_);
238
0
  cleared_points_.clear();
239
0
}
240
241
27
void SyncPoint::Data::enable_processing() {
242
27
  enabled_ = true;
243
27
}
244
245
1
void SyncPoint::Data::disable_processing() {
246
1
  enabled_ = false;
247
1
}
248
249
192k
bool SyncPoint::Data::has_point(const std::string& point) {
250
192k
  std::unique_lock lock(mutex_);
251
192k
  return callbacks_.find(point) != callbacks_.end();
252
192k
}
253
254
198k
bool SyncPoint::Data::get_enable() {
255
198k
  return enabled_;
256
198k
}
257
258
} // namespace doris
259
// clang-format on
260
// vim: et tw=80 ts=2 sw=2 cc=80: