Coverage Report

Created: 2025-10-23 08:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/common/cpp/sync_point.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// Most code of this file is copied from rocksdb SyncPoint.
19
// https://github.com/facebook/rocksdb
20
21
#pragma once
22
// clang-format off
23
#include <functional>
24
#include <iostream>
25
#include <string>
26
#include <vector>
27
#include <any>
28
29
namespace doris {
30
31
#define SYNC_POINT_HOOK_RETURN_VALUE(expr, point_name, ...)                              \
32
    [&]() mutable {                                                                      \
33
        TEST_SYNC_POINT_RETURN_WITH_VALUE(point_name, decltype((expr)) {}, __VA_ARGS__); \
34
        return (expr);                                                                   \
35
    }()
36
37
// This class provides facility to reproduce race conditions deterministically
38
// in unit tests.
39
// Developer could specify sync points in the codebase via TEST_SYNC_POINT.
40
// Each sync point represents a position in the execution stream of a thread.
41
// In the unit test, 'Happens After' relationship among sync points could be
42
// setup via SyncPoint::load_dependency, to reproduce a desired interleave of
43
// threads execution.
44
// Refer to (DBTest,TransactionLogIteratorRace), for an example use case.
45
class SyncPoint {
46
public:
47
  static SyncPoint* get_instance();
48
  SyncPoint(const SyncPoint&) = delete;
49
  SyncPoint& operator=(const SyncPoint&) = delete;
50
  ~SyncPoint();
51
  struct SyncPointPair {
52
    std::string predecessor;
53
    std::string successor;
54
  };
55
56
  // call once at the beginning of a test to setup the dependency between
57
  // sync points
58
  //
59
  // Example:
60
  // load_dependency({{"point1", "point2"},
61
  //                  {"point2", "point3"},
62
  //                  {"point3", "point4"}});
63
  //
64
  //    test case thread            thread for object being tested
65
  //        |                                  |
66
  //        |                                  |
67
  //        | \-------------0-------------\    |
68
  //        |                              \-> x  sync point1 set in code
69
  //        |    /----------1----------------/ |
70
  // point2 o <-/                          /-> x  sync point4 set in code
71
  //        |                             /    |
72
  //        z                            /     |
73
  //        z     /---------2-----------/      |  there may be nothing
74
  //        |    /                             |  between point1 point4
75
  // ponit3 o --/                              |  they are for sync
76
  //        |                                  |  between test case and object
77
  //        v                                  v
78
  //
79
  // vertical arrow means the procedure of each thread, the running order will
80
  // be:
81
  // test case thread -> point1 -> point2 -> point3 -> point4 -> object being
82
  // tested
83
  //
84
  // we may do a lot of things between point2 and point3, say, change the
85
  // object's status, call another method, propagate data race and etc.
86
  void load_dependency(const std::vector<SyncPointPair>& dependencies);
87
88
  // call once at the beginning of a test to setup the dependency between
89
  // sync points and setup markers indicating the successor is only enabled
90
  // when it is processed on the same thread as the predecessor.
91
  // When adding a marker, it implicitly adds a dependency for the marker pair.
92
  void load_dependency_and_markers(
93
                                const std::vector<SyncPointPair>& dependencies,
94
                                const std::vector<SyncPointPair>& markers);
95
96
  class CallbackGuard {
97
  public:
98
    CallbackGuard() = default;
99
35
    explicit CallbackGuard(std::string point) : _point(std::move(point)) {}
100
0
    ~CallbackGuard() {
101
0
      if (!_point.empty()) {
102
0
        get_instance()->clear_call_back(_point);
103
0
      }
104
0
    }
105
    CallbackGuard(const CallbackGuard&) = delete;
106
    CallbackGuard& operator=(const CallbackGuard&) = delete;
107
108
0
    CallbackGuard(CallbackGuard&& other) noexcept {
109
0
      if (!_point.empty() && _point != other._point) {
110
0
        get_instance()->clear_call_back(_point);
111
0
      }
112
0
      _point = std::move(other._point);
113
0
    }
114
115
35
    CallbackGuard& operator=(CallbackGuard&& other) noexcept {
116
35
      if (!_point.empty() && _point != other._point) {
117
0
        get_instance()->clear_call_back(_point);
118
0
      }
119
35
      _point = std::move(other._point);
120
35
      return *this;
121
35
    };
122
123
  private:
124
    std::string _point;
125
  };
126
127
  // The argument to the callback is passed through from
128
  // TEST_SYNC_POINT_CALLBACK(); nullptr if TEST_SYNC_POINT or
129
  // TEST_IDX_SYNC_POINT was used.
130
  // If `guard` is not nullptr, method will return a `CallbackGuard` object which will clear the
131
  // callback when it is destructed.
132
  void set_call_back(const std::string& point,
133
                     const std::function<void(std::vector<std::any>&&)>& callback,
134
                     CallbackGuard* guard = nullptr);
135
136
  // Clear callback function by point
137
  void clear_call_back(const std::string& point);
138
139
  // Clear all call back functions.
140
  void clear_all_call_backs();
141
142
  // Enable sync point processing (disabled on startup)
143
  void enable_processing();
144
145
  // Disable sync point processing
146
  void disable_processing();
147
148
  // Remove the execution trace of all sync points
149
  void clear_trace();
150
151
  // Triggered by TEST_SYNC_POINT, blocking execution until all predecessors
152
  // are executed.
153
  // And/or call registered callback function, with argument `cb_args`
154
  void process(const std::string& point, std::vector<std::any>&& cb_args = {});
155
156
  // Check if this point is registered
157
  bool has_point(const std::string& point);
158
159
  // Get this point if enabled
160
  bool get_enable();
161
  // TODO: it might be useful to provide a function that blocks until all
162
  //       sync points are cleared.
163
  // We want this to be public so we can subclass the implementation
164
  struct Data;
165
166
private:
167
   // Singleton
168
  SyncPoint();
169
  Data* impl_; // impletation which is hidden in cpp file
170
};
171
172
template <class T>
173
T try_any_cast(const std::any& a) {
174
  try {
175
    return std::any_cast<T>(a);
176
  } catch (const std::bad_any_cast& e) { 
177
    std::cerr << e.what() << " expected=" << typeid(T).name() << " actual=" << a.type().name() << std::endl;
178
    throw e;
179
  }
180
}
181
182
template <typename T>
183
auto try_any_cast_ret(std::vector<std::any>& any) {
184
    return try_any_cast<std::pair<T, bool>*>(any.back());
185
}
186
187
} // namespace doris
188
189
#define SYNC_POINT(x) doris::SyncPoint::get_instance()->process(x)
190
#define IDX_SYNC_POINT(x, index) \
191
    doris::SyncPoint::get_instance()->process(x + std::to_string(index))
192
#define SYNC_POINT_CALLBACK(x, ...) doris::SyncPoint::get_instance()->process(x, {__VA_ARGS__})
193
#define SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) \
194
{ \
195
  auto sync_point = doris::SyncPoint::get_instance(); \
196
  if (sync_point->get_enable() && sync_point->has_point(x)) { \
197
    std::pair ret {default_ret_val, false}; \
198
    std::vector<std::any> args {__VA_ARGS__}; \
199
    args.emplace_back(&ret); \
200
    sync_point->process(x, std::move(args)); \
201
    if (ret.second) return std::move(ret.first); \
202
  } \
203
}
204
#define SYNC_POINT_RETURN_WITH_VOID(x, ...) \
205
{ \
206
  bool pred = false; \
207
  std::vector<std::any> args {__VA_ARGS__}; \
208
  args.emplace_back(&pred); \
209
  doris::SyncPoint::get_instance()->process(x, std::move(args)); \
210
  if (pred) return; \
211
}
212
#define SYNC_POINT_SINGLETON() (void)doris::SyncPoint::get_instance() 
213
214
// TEST_SYNC_POINT is no op in release build.
215
// Turn on this feature by defining the macro
216
#if !defined(BE_TEST) && !defined(ENABLE_INJECTION_POINT)
217
# define TEST_SYNC_POINT(x)
218
# define TEST_IDX_SYNC_POINT(x, index)
219
# define TEST_SYNC_POINT_CALLBACK(x, ...)
220
# define TEST_SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...)
221
# define TEST_SYNC_POINT_RETURN_WITH_VOID(x, ...)
222
// seldom called
223
# define TEST_SYNC_POINT_SINGLETON()
224
#else
225
// Use TEST_SYNC_POINT to specify sync points inside code base.
226
// Sync points can have happens-after depedency on other sync points,
227
// configured at runtime via SyncPoint::load_dependency. This could be
228
// utilized to re-produce race conditions between threads.
229
# define TEST_SYNC_POINT(x) SYNC_POINT(x)
230
# define TEST_IDX_SYNC_POINT(x, index) IDX_SYNC_POINT(x, index)
231
# define TEST_SYNC_POINT_CALLBACK(x, ...) SYNC_POINT_CALLBACK(x, __VA_ARGS__)
232
# define TEST_SYNC_POINT_SINGLETON() SYNC_POINT_SINGLETON()
233
234
/**
235
 * Inject return points for testing.
236
 *
237
 * Currently we can only insert more points to get context from tested thread
238
 * and process in testing thread, e.g.
239
 *
240
 * tested thread:
241
 * ...
242
 * TEST_SYNC_POINT_RETURN_WITH_VALUE("point_ret", int(0), ctx0);
243
 * ...
244
 *
245
 * testing thread:
246
 * sync_point->add("point_ret", [](auto&& args) {
247
 *     auto ctx0 = try_any_cast<bool>(args[0]);
248
 *     auto pair = try_any_cast<std::pair<int, bool>*>(args.back());
249
 *     pair->first = ...;
250
 *     pair->second = ctx0; });
251
 *
252
 * See sync_piont_test.cpp for more details.
253
 */
254
#pragma GCC diagnostic ignored "-Waddress"
255
# define TEST_SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, __VA_ARGS__)
256
# define TEST_SYNC_POINT_RETURN_WITH_VOID(x, ...) SYNC_POINT_RETURN_WITH_VOID(x, __VA_ARGS__)
257
258
#endif // BE_TEST
259
260
// TODO: define injection point in production env.
261
//       the `if` expr can be live configure of the application
262
#ifndef ENABLE_INJECTION_POINT
263
# define TEST_INJECTION_POINT(x)
264
# define TEST_IDX_INJECTION_POINT(x, index)
265
# define TEST_INJECTION_POINT_CALLBACK(x, ...)
266
# define TEST_INJECTION_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...)
267
# define TEST_INJECTION_POINT_RETURN_WITH_VOID(x, ...)
268
# define TEST_INJECTION_POINT_SINGLETON()
269
#else
270
# define TEST_INJECTION_POINT(x) SYNC_POINT(x);
271
# define TEST_IDX_INJECTION_POINT(x, index) IDX_SYNC_POINT(x, index);
272
# define TEST_INJECTION_POINT_CALLBACK(x, ...) SYNC_POINT_CALLBACK(x, __VA_ARGS__);
273
# define TEST_INJECTION_POINT_SINGLETON() SYNC_POINT_SINGLETON();
274
# define TEST_INJECTION_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, __VA_ARGS__);
275
# define TEST_INJECTION_POINT_RETURN_WITH_VOID(x, ...) SYNC_POINT_RETURN_WITH_VOID(x, __VA_ARGS__);
276
#endif // ENABLE_INJECTION_POINT
277
278
// clang-format on
279
// vim: et tw=80 ts=2 sw=2 cc=80: