Coverage Report

Created: 2024-11-22 12:06

/root/doris/be/src/common/sync_point.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// Most code of this file is copied from rocksdb SyncPoint.
19
// https://github.com/facebook/rocksdb
20
21
#pragma once
22
// clang-format off
23
#include <functional>
24
#include <iostream>
25
#include <string>
26
#include <vector>
27
#include <any>
28
29
namespace doris {
30
31
#define SYNC_POINT_HOOK_RETURN_VALUE(expr, point_name, ...)                              \
32
25.8k
    [&]() mutable {                                                                      \
33
25.8k
        TEST_SYNC_POINT_RETURN_WITH_VALUE(point_name, decltype((expr)) {}, __VA_ARGS__); \
34
25.8k
        return (expr);                                                                   \
35
25.8k
    }()
36
37
// This class provides facility to reproduce race conditions deterministically
38
// in unit tests.
39
// Developer could specify sync points in the codebase via TEST_SYNC_POINT.
40
// Each sync point represents a position in the execution stream of a thread.
41
// In the unit test, 'Happens After' relationship among sync points could be
42
// setup via SyncPoint::load_dependency, to reproduce a desired interleave of
43
// threads execution.
44
// Refer to (DBTest,TransactionLogIteratorRace), for an example use case.
45
class SyncPoint {
46
public:
47
  static SyncPoint* get_instance();
48
  SyncPoint(const SyncPoint&) = delete;
49
  SyncPoint& operator=(const SyncPoint&) = delete;
50
  ~SyncPoint();
51
  struct SyncPointPair {
52
    std::string predecessor;
53
    std::string successor;
54
  };
55
56
  // call once at the beginning of a test to setup the dependency between
57
  // sync points
58
  //
59
  // Example:
60
  // load_dependency({{"point1", "point2"},
61
  //                  {"point2", "point3"},
62
  //                  {"point3", "point4"}});
63
  //
64
  //    test case thread            thread for object being tested
65
  //        |                                  |
66
  //        |                                  |
67
  //        | \-------------0-------------\    |
68
  //        |                              \-> x  sync point1 set in code
69
  //        |    /----------1----------------/ |
70
  // point2 o <-/                          /-> x  sync point4 set in code
71
  //        |                             /    |
72
  //        z                            /     |
73
  //        z     /---------2-----------/      |  there may be nothing
74
  //        |    /                             |  between point1 point4
75
  // point3 o --/                              |  they are for sync
76
  //        |                                  |  between test case and object
77
  //        v                                  v
78
  //
79
  // vertical arrow means the procedure of each thread, the running order will
80
  // be:
81
  // test case thread -> point1 -> point2 -> point3 -> point4 -> object being
82
  // tested
83
  //
84
  // we may do a lot of things between point2 and point3, say, change the
85
  // object's status, call another method, propagate data race and etc.
86
  void load_dependency(const std::vector<SyncPointPair>& dependencies);
87
88
  // call once at the beginning of a test to setup the dependency between
89
  // sync points and setup markers indicating the successor is only enabled
90
  // when it is processed on the same thread as the predecessor.
91
  // When adding a marker, it implicitly adds a dependency for the marker pair.
92
  void load_dependency_and_markers(
93
                                const std::vector<SyncPointPair>& dependencies,
94
                                const std::vector<SyncPointPair>& markers);
95
96
  // The argument to the callback is passed through from
97
  // TEST_SYNC_POINT_CALLBACK(); the vector is empty if TEST_SYNC_POINT or
98
  // TEST_IDX_SYNC_POINT was used.
99
  void set_call_back(const std::string& point,
100
                     const std::function<void(std::vector<std::any>&&)>& callback);
101
102
  // Clear callback function by point
103
  void clear_call_back(const std::string& point);
104
105
  // Clear all call back functions.
106
  void clear_all_call_backs();
107
108
  // Enable sync point processing (disabled on startup)
109
  void enable_processing();
110
111
  // Disable sync point processing
112
  void disable_processing();
113
114
  // Remove the execution trace of all sync points
115
  void clear_trace();
116
117
  // Triggered by TEST_SYNC_POINT, blocking execution until all predecessors
118
  // are executed.
119
  // And/or call registered callback function, with argument `cb_args`
120
  void process(const std::string& point, std::vector<std::any>&& cb_args = {});
121
122
  // TODO: it might be useful to provide a function that blocks until all
123
  //       sync points are cleared.
124
  // We want this to be public so we can subclass the implementation
125
  struct Data;
126
127
private:
128
   // Singleton
129
  SyncPoint();
130
  Data* impl_; // implementation which is hidden in cpp file
131
};
132
133
// Casts `a` to T with std::any_cast<T> and returns the result.
// If the cast fails with std::bad_any_cast, writes the exception message plus
// the expected (typeid(T)) and actual (a.type()) type names to std::cerr and
// then throws a std::bad_any_cast to the caller.
template <class T>
134
38
T try_any_cast(const std::any& a) {
135
38
  try {
136
38
    return std::any_cast<T>(a);
137
38
  } catch (const std::bad_any_cast& e) { 
138
0
    std::cerr << e.what() << " expected=" << typeid(T).name() << " actual=" << a.type().name() << std::endl;
139
0
    throw e; // NOTE(review): throws a copy of `e`; a bare `throw;` would re-raise the original exception object
140
0
  }
141
38
}
_ZN5doris12try_any_castIPSt4pairIlbEEET_RKSt3any
Line
Count
Source
134
18
T try_any_cast(const std::any& a) {
135
18
  try {
136
18
    return std::any_cast<T>(a);
137
18
  } catch (const std::bad_any_cast& e) { 
138
0
    std::cerr << e.what() << " expected=" << typeid(T).name() << " actual=" << a.type().name() << std::endl;
139
0
    throw e;
140
0
  }
141
18
}
_ZN5doris12try_any_castIiEET_RKSt3any
Line
Count
Source
134
10
T try_any_cast(const std::any& a) {
135
10
  try {
136
10
    return std::any_cast<T>(a);
137
10
  } catch (const std::bad_any_cast& e) { 
138
0
    std::cerr << e.what() << " expected=" << typeid(T).name() << " actual=" << a.type().name() << std::endl;
139
0
    throw e;
140
0
  }
141
10
}
_ZN5doris12try_any_castIPcEET_RKSt3any
Line
Count
Source
134
6
T try_any_cast(const std::any& a) {
135
6
  try {
136
6
    return std::any_cast<T>(a);
137
6
  } catch (const std::bad_any_cast& e) { 
138
0
    std::cerr << e.what() << " expected=" << typeid(T).name() << " actual=" << a.type().name() << std::endl;
139
0
    throw e;
140
0
  }
141
6
}
_ZN5doris12try_any_castIPbEET_RKSt3any
Line
Count
Source
134
2
T try_any_cast(const std::any& a) {
135
2
  try {
136
2
    return std::any_cast<T>(a);
137
2
  } catch (const std::bad_any_cast& e) { 
138
0
    std::cerr << e.what() << " expected=" << typeid(T).name() << " actual=" << a.type().name() << std::endl;
139
0
    throw e;
140
0
  }
141
2
}
_ZN5doris12try_any_castIPlEET_RKSt3any
Line
Count
Source
134
1
T try_any_cast(const std::any& a) {
135
1
  try {
136
1
    return std::any_cast<T>(a);
137
1
  } catch (const std::bad_any_cast& e) { 
138
0
    std::cerr << e.what() << " expected=" << typeid(T).name() << " actual=" << a.type().name() << std::endl;
139
0
    throw e;
140
0
  }
141
1
}
_ZN5doris12try_any_castIPmEET_RKSt3any
Line
Count
Source
134
1
T try_any_cast(const std::any& a) {
135
1
  try {
136
1
    return std::any_cast<T>(a);
137
1
  } catch (const std::bad_any_cast& e) { 
138
0
    std::cerr << e.what() << " expected=" << typeid(T).name() << " actual=" << a.type().name() << std::endl;
139
0
    throw e;
140
0
  }
141
1
}
142
143
// Returns the last element of `any` cast to `std::pair<T, bool>*` via
// try_any_cast. This matches the pointer that SYNC_POINT_RETURN_WITH_VALUE
// appends to its argument vector (`args.push_back(&ret)`).
// `any` must be non-empty: `any.back()` is called without a size check.
template <typename T>
144
18
auto try_any_cast_ret(std::vector<std::any>& any) {
145
18
    return try_any_cast<std::pair<T, bool>*>(any.back());
146
18
}
147
148
} // namespace doris
149
150
#define SYNC_POINT(x) doris::SyncPoint::get_instance()->process(x)
151
#define IDX_SYNC_POINT(x, index) \
152
    doris::SyncPoint::get_instance()->process(x + std::to_string(index))
153
6
#define SYNC_POINT_CALLBACK(x, ...) doris::SyncPoint::get_instance()->process(x, {__VA_ARGS__})
154
36.7k
#define SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) \
155
36.7k
{ \
156
36.7k
  std::pair ret {default_ret_val, false}; \
157
36.7k
  std::vector<std::any> args {__VA_ARGS__}; \
158
36.7k
  args.push_back(&ret); \
159
36.7k
  doris::SyncPoint::get_instance()->process(x, std::move(args)); \
160
36.7k
  if (ret.second) return std::move(ret.first); \
161
36.7k
}
162
2
#define SYNC_POINT_RETURN_WITH_VOID(x, ...) \
163
2
{ \
164
2
  bool pred = false; \
165
2
  std::vector<std::any> args {__VA_ARGS__}; \
166
2
  args.push_back(&pred); \
167
2
  doris::SyncPoint::get_instance()->process(x, std::move(args)); \
168
2
  if (pred) return; \
169
2
}
170
#define SYNC_POINT_SINGLETON() (void)doris::SyncPoint::get_instance() 
171
172
// TEST_SYNC_POINT is no op in release build.
173
// Turn on this feature by defining the macro BE_TEST
174
#ifndef BE_TEST
175
# define TEST_SYNC_POINT(x)
176
# define TEST_IDX_SYNC_POINT(x, index)
177
# define TEST_SYNC_POINT_CALLBACK(x, ...)
178
# define TEST_SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...)
179
# define TEST_SYNC_POINT_RETURN_WITH_VOID(x, ...)
180
// seldom called
181
# define TEST_SYNC_POINT_SINGLETON()
182
#else
183
// Use TEST_SYNC_POINT to specify sync points inside code base.
184
// Sync points can have happens-after dependency on other sync points,
185
// configured at runtime via SyncPoint::load_dependency. This could be
186
// utilized to re-produce race conditions between threads.
187
# define TEST_SYNC_POINT(x) SYNC_POINT(x)
188
# define TEST_IDX_SYNC_POINT(x, index) IDX_SYNC_POINT(x, index)
189
6
# define TEST_SYNC_POINT_CALLBACK(x, ...) SYNC_POINT_CALLBACK(x, __VA_ARGS__)
190
# define TEST_SYNC_POINT_SINGLETON() SYNC_POINT_SINGLETON()
191
192
/**
193
 * Inject return points for testing.
194
 *
195
 * Currently we can only insert more points to get context from tested thread
196
 * and process in testing thread, e.g.
197
 *
198
 * tested thread:
199
 * ...
200
 * TEST_SYNC_POINT_RETURN_WITH_VALUE("point_ret", int(0), ctx0);
201
 * ...
202
 *
203
 * testing thread:
204
 * sync_point->set_call_back("point_ret", [](auto&& args) {
205
 *     auto ctx0 = try_any_cast<bool>(args[0]);
206
 *     auto pair = try_any_cast<std::pair<int, bool>*>(args.back());
207
 *     pair->first = ...;
208
 *     pair->second = ctx0; });
209
 *
210
 * See sync_point_test.cpp for more details.
211
 */
212
#pragma GCC diagnostic ignored "-Waddress"
213
36.7k
# define TEST_SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, __VA_ARGS__)
214
2
# define TEST_SYNC_POINT_RETURN_WITH_VOID(x, ...) SYNC_POINT_RETURN_WITH_VOID(x, __VA_ARGS__)
215
216
#endif // BE_TEST
217
218
// TODO: define injection point in production env.
219
//       the `if` expr can be live configure of the application
220
#ifndef ENABLE_INJECTION_POINT
221
# define TEST_INJECTION_POINT(x)
222
# define TEST_IDX_INJECTION_POINT(x, index)
223
# define TEST_INJECTION_POINT_CALLBACK(x, ...)
224
# define TEST_INJECTION_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...)
225
# define TEST_INJECTION_POINT_RETURN_WITH_VOID(x, ...)
226
# define TEST_INJECTION_POINT_SINGLETON()
227
#else
228
namespace doris::config {
229
extern bool enable_injection_point;
230
}
231
# define TEST_INJECTION_POINT(x) if (doris::config::enable_injection_point) { SYNC_POINT(x); }
232
# define TEST_IDX_INJECTION_POINT(x, index) if (doris::config::enable_injection_point) { IDX_SYNC_POINT(x, index); }
233
# define TEST_INJECTION_POINT_CALLBACK(x, ...) if (doris::config::enable_injection_point) { SYNC_POINT_CALLBACK(x, __VA_ARGS__); }
234
# define TEST_INJECTION_POINT_SINGLETON() if (doris::config::enable_injection_point) { SYNC_POINT_SINGLETON(); }
235
# define TEST_INJECTION_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) if (doris::config::enable_injection_point) { SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, __VA_ARGS__); }
236
# define TEST_INJECTION_POINT_RETURN_WITH_VOID(x, ...) if (doris::config::enable_injection_point) { SYNC_POINT_RETURN_WITH_VOID(x, __VA_ARGS__); }
237
#endif // ENABLE_INJECTION_POINT
238
239
// clang-format on
240
// vim: et tw=80 ts=2 sw=2 cc=80: