Coverage Report

Created: 2025-07-27 00:06

/root/doris/be/src/common/sync_point.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// Most of the code in this file is copied from RocksDB's SyncPoint.
19
// https://github.com/facebook/rocksdb
20
21
// clang-format off
22
#include "sync_point.h"
23
24
#include <atomic>
25
#include <condition_variable>
26
#include <functional>
27
#include <mutex>
28
#include <random>
29
#include <string>
30
#include <thread>
31
#include <unordered_map>
32
#include <unordered_set>
33
34
namespace doris {
35
36
// Private implementation object behind SyncPoint: holds the dependency graph,
// the registered callbacks and the synchronization primitives that guard them.
struct SyncPoint::Data {
public:
  Data() : enabled_(false) {}
  virtual ~Data() = default;

  // Passes through sync point `point`: waits for its predecessors, then runs
  // the callback registered for it (if any) with `cb_args`.
  void process(const std::string& point, std::vector<std::any>&& cb_args);

  // Replaces the current dependency graph with `dependencies`.
  void load_dependency(const std::vector<SyncPointPair>& dependencies);

  // Replaces the current dependency graph and marker table.
  void load_dependency_and_markers(
      const std::vector<SyncPointPair>& dependencies,
      const std::vector<SyncPointPair>& markers);

  // True when every predecessor of `point` has already been passed through.
  bool predecessors_all_cleared(const std::string& point);

  // Callback registration management.
  void set_call_back(
      const std::string& point,
      const std::function<void(std::vector<std::any>&&)>& callback);
  void clear_call_back(const std::string& point);
  void clear_all_call_backs();

  // Turns sync point processing on / off.
  void enable_processing();
  void disable_processing();

  // Forgets which sync points have been passed through.
  void clear_trace();

private:
  // True when `point` was marked by a thread other than `thread_id`.
  bool disable_by_marker(const std::string& point, std::thread::id thread_id);

  // successor/predecessor map loaded from load_dependency
  std::unordered_map<std::string, std::vector<std::string>> successors_;
  std::unordered_map<std::string, std::vector<std::string>> predecessors_;
  // sync point name -> callback invoked when that point is processed
  std::unordered_map<std::string, std::function<void(std::vector<std::any>&&)>> callbacks_;
  // marker point -> the points it marks
  std::unordered_map<std::string, std::vector<std::string>> markers_;
  // marked point -> id of the thread that passed through its marker
  std::unordered_map<std::string, std::thread::id> marked_thread_id_;
  std::mutex mutex_;
  std::condition_variable cv_;
  // sync points that have been passed through
  std::unordered_set<std::string> cleared_points_;
  std::atomic<bool> enabled_;
  // incremented while a callback is being invoked by process()
  int num_callbacks_running_ = 0;
};
69
70
70.2k
// Returns a pointer to the single SyncPoint object, a function-local static
// that is constructed the first time this function is called.
SyncPoint* SyncPoint::get_instance() {
  static SyncPoint instance;
  return &instance;
}
74
// Allocates the private implementation object; it is freed in ~SyncPoint().
SyncPoint::SyncPoint() : impl_(new Data) {}
77
1
// Frees the implementation object allocated by the constructor.
SyncPoint::~SyncPoint() {
  delete impl_;
}
80
0
void SyncPoint::load_dependency(const std::vector<SyncPointPair>& dependencies) {
81
0
  impl_->load_dependency(dependencies);
82
0
}
83
void SyncPoint::load_dependency_and_markers(
84
                                const std::vector<SyncPointPair>& dependencies,
85
0
                                const std::vector<SyncPointPair>& markers) {
86
0
  impl_->load_dependency_and_markers(dependencies, markers);
87
0
}
88
void SyncPoint::set_call_back(const std::string& point,
89
9
                              const std::function<void(std::vector<std::any>&&)>& callback) {
90
9
  impl_->set_call_back(point, callback);
91
9
}
92
4
void SyncPoint::clear_call_back(const std::string& point) {
93
4
  impl_->clear_call_back(point);
94
4
}
95
0
void SyncPoint::clear_all_call_backs() {
96
0
  impl_->clear_all_call_backs();
97
0
}
98
3
void SyncPoint::enable_processing() {
99
3
  impl_->enable_processing();
100
3
}
101
0
void SyncPoint::disable_processing() {
102
0
  impl_->disable_processing();
103
0
}
104
0
void SyncPoint::clear_trace() {
105
0
  impl_->clear_trace();
106
0
}
107
70.2k
void SyncPoint::process(const std::string& point, std::vector<std::any>&& cb_arg) {
108
70.2k
  impl_->process(point, std::move(cb_arg));
109
70.2k
}
110
111
// =============================================================================
112
// SyncPoint::Data implementation
113
// =============================================================================
114
115
void SyncPoint::Data::load_dependency(
116
0
                               const std::vector<SyncPointPair>& dependencies) {
117
0
  std::lock_guard lock(mutex_);
118
0
  successors_.clear();
119
0
  predecessors_.clear();
120
0
  cleared_points_.clear();
121
0
  for (const auto& dependency : dependencies) {
122
0
    successors_[dependency.predecessor].push_back(dependency.successor);
123
0
    predecessors_[dependency.successor].push_back(dependency.predecessor);
124
0
  }
125
0
  cv_.notify_all();
126
0
}
127
128
/**
129
 * Markers are also dependency descriptions
130
 */
131
void SyncPoint::Data::load_dependency_and_markers(
132
                                const std::vector<SyncPointPair>& dependencies,
133
0
                                const std::vector<SyncPointPair>& markers) {
134
0
  std::lock_guard lock(mutex_);
135
0
  successors_.clear();
136
0
  predecessors_.clear();
137
0
  cleared_points_.clear();
138
0
  markers_.clear();
139
0
  marked_thread_id_.clear();
140
0
  for (const auto& dependency : dependencies) {
141
0
    successors_[dependency.predecessor].push_back(dependency.successor);
142
0
    predecessors_[dependency.successor].push_back(dependency.predecessor);
143
0
  }
144
0
  for (const auto& marker : markers) {
145
0
    successors_[marker.predecessor].push_back(marker.successor);
146
0
    predecessors_[marker.successor].push_back(marker.predecessor);
147
0
    markers_[marker.predecessor].push_back(marker.successor);
148
0
  }
149
0
  cv_.notify_all();
150
0
}
151
152
70.0k
bool SyncPoint::Data::predecessors_all_cleared(const std::string& point) {
153
70.0k
  for (const auto& pred : predecessors_[point]) {
154
0
    if (cleared_points_.count(pred) == 0) {
155
0
      return false;
156
0
    }
157
0
  }
158
70.0k
  return true;
159
70.0k
}
160
161
4
void SyncPoint::Data::clear_call_back(const std::string& point) {
162
4
  std::unique_lock lock(mutex_);
163
4
  callbacks_.erase(point);
164
4
}
165
166
0
void SyncPoint::Data::clear_all_call_backs() {
167
0
  std::unique_lock lock(mutex_);
168
0
  callbacks_.clear();
169
0
}
170
171
70.2k
void SyncPoint::Data::process(const std::string& point, std::vector<std::any>&& cb_arg) {
172
70.2k
  if (!enabled_) {
173
248
    return;
174
248
  }
175
70.0k
  std::unique_lock lock(mutex_);
176
70.0k
  auto thread_id = std::this_thread::get_id();
177
70.0k
  auto marker_iter = markers_.find(point);
178
  // if current sync point is a marker
179
  // record it in marked_thread_id_ for all its successors
180
70.0k
  if (marker_iter != markers_.end()) {
181
0
    for (auto& marked_point : marker_iter->second) {
182
0
      marked_thread_id_.emplace(marked_point, thread_id);
183
0
    }
184
0
  }
185
  // if current point is a marker's successor 
186
70.0k
  if (disable_by_marker(point, thread_id)) {
187
0
    return;
188
0
  }
189
70.0k
  while (!predecessors_all_cleared(point)) {
190
0
    cv_.wait(lock);
191
0
    if (disable_by_marker(point, thread_id)) {
192
0
      return;
193
0
    }
194
0
  }
195
70.0k
  auto callback_pair = callbacks_.find(point);
196
70.0k
  if (callback_pair != callbacks_.end()) {
197
22
    num_callbacks_running_++;
198
22
    auto callback = callback_pair->second; 
199
22
    mutex_.unlock();
200
22
    callback(std::move(cb_arg));
201
22
    mutex_.lock();
202
22
    num_callbacks_running_--;
203
22
  }
204
70.0k
  cleared_points_.insert(point);
205
70.0k
  cv_.notify_all();
206
70.0k
}
207
208
bool SyncPoint::Data::disable_by_marker(const std::string& point,
209
70.0k
                                        std::thread::id thread_id) {
210
70.0k
  auto marked_point_iter = marked_thread_id_.find(point);
211
70.0k
  return marked_point_iter != marked_thread_id_.end() // is a successor
212
70.0k
          && thread_id != marked_point_iter->second;
213
70.0k
}
214
215
void SyncPoint::Data::set_call_back(const std::string& point,
216
9
                                  const std::function<void(std::vector<std::any>&&)>& callback) {
217
9
  std::lock_guard lock(mutex_);
218
9
  callbacks_[point] = callback;
219
9
}
220
221
0
void SyncPoint::Data::clear_trace() {
222
0
  std::lock_guard lock(mutex_);
223
0
  cleared_points_.clear();
224
0
}
225
226
3
void SyncPoint::Data::enable_processing() {
227
3
  enabled_ = true;
228
3
}
229
230
0
void SyncPoint::Data::disable_processing() {
231
0
  enabled_ = false;
232
0
}
233
234
} // namespace doris
235
// clang-format on
236
// vim: et tw=80 ts=2 sw=2 cc=80: