be/src/cloud/injection_point_action.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/injection_point_action.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <chrono> |
23 | | #include <mutex> |
24 | | #include <random> |
25 | | |
26 | | #include "common/status.h" |
27 | | #include "cpp/sync_point.h" |
28 | | #include "io/cache/cached_remote_file_reader.h" |
29 | | #include "service/http/http_channel.h" |
30 | | #include "service/http/http_request.h" |
31 | | #include "service/http/http_status.h" |
32 | | #include "storage/rowset/rowset.h" |
33 | | #include "storage/segment/page_io.h" |
34 | | #include "util/stack_util.h" |
35 | | |
36 | | namespace doris { |
37 | | namespace { |
38 | | |
// TODO(cyx): Provide an object pool
// Registry of predefined injection suites: suite name -> function that installs the suite's
// SyncPoint callbacks. It is filled once by `register_suites` (guarded by
// `register_suites_once`) and is not modified afterwards.
using SuiteInstaller = std::function<void()>;
std::map<std::string, SuiteInstaller> suite_map;
std::once_flag register_suites_once;
43 | | |
44 | | // only call once |
45 | 0 | void register_suites() { |
46 | 0 | suite_map.emplace("test_compaction", [] { |
47 | 0 | auto sp = SyncPoint::get_instance(); |
48 | 0 | sp->set_call_back("new_cumulative_point", [](auto&& args) { |
49 | 0 | auto output_rowset = try_any_cast<Rowset*>(args[0]); |
50 | 0 | auto last_cumulative_point = try_any_cast<int64_t>(args[1]); |
51 | 0 | auto& [ret_vault, should_ret] = *try_any_cast<std::pair<int64_t, bool>*>(args.back()); |
52 | 0 | ret_vault = output_rowset->start_version() == last_cumulative_point |
53 | 0 | ? output_rowset->end_version() + 1 |
54 | 0 | : last_cumulative_point; |
55 | 0 | should_ret = true; |
56 | 0 | }); |
57 | 0 | }); |
58 | 0 | suite_map.emplace("test_s3_file_writer", [] { |
59 | 0 | auto* sp = SyncPoint::get_instance(); |
60 | 0 | sp->set_call_back("UploadFileBuffer::upload_to_local_file_cache", [](auto&&) { |
61 | 0 | std::srand(static_cast<unsigned int>(std::time(nullptr))); |
62 | 0 | int random_sleep_time_second = std::rand() % 10 + 1; |
63 | 0 | std::this_thread::sleep_for(std::chrono::seconds(random_sleep_time_second)); |
64 | 0 | }); |
65 | 0 | sp->set_call_back("UploadFileBuffer::upload_to_local_file_cache_inject", [](auto&& args) { |
66 | 0 | auto& [ret_status, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back()); |
67 | 0 | ret_status = |
68 | 0 | Status::IOError<false>("failed to write into file cache due to inject error"); |
69 | 0 | should_ret = true; |
70 | 0 | }); |
71 | 0 | }); |
72 | 0 | suite_map.emplace("test_storage_vault", [] { |
73 | 0 | auto* sp = SyncPoint::get_instance(); |
74 | 0 | sp->set_call_back("HdfsFileWriter::append_hdfs_file_delay", [](auto&&) { |
75 | 0 | std::srand(static_cast<unsigned int>(std::time(nullptr))); |
76 | 0 | int random_sleep_time_second = std::rand() % 10 + 1; |
77 | 0 | std::this_thread::sleep_for(std::chrono::seconds(random_sleep_time_second)); |
78 | 0 | }); |
79 | 0 | sp->set_call_back("HdfsFileWriter::append_hdfs_file_error", [](auto&& args) { |
80 | 0 | auto& [_, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back()); |
81 | 0 | should_ret = true; |
82 | 0 | }); |
83 | 0 | sp->set_call_back("HdfsFileWriter::hdfsFlush", [](auto&& args) { |
84 | 0 | auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back()); |
85 | 0 | ret_value = Status::InternalError("failed to flush hdfs file"); |
86 | 0 | should_ret = true; |
87 | 0 | }); |
88 | 0 | sp->set_call_back("HdfsFileWriter::hdfsCloseFile", [](auto&& args) { |
89 | 0 | auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back()); |
90 | 0 | ret_value = Status::InternalError("failed to flush hdfs file"); |
91 | 0 | should_ret = true; |
92 | 0 | }); |
93 | 0 | sp->set_call_back("HdfsFileWriter::hdfeSync", [](auto&& args) { |
94 | 0 | auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back()); |
95 | 0 | ret_value = Status::InternalError("failed to flush hdfs file"); |
96 | 0 | should_ret = true; |
97 | 0 | }); |
98 | 0 | sp->set_call_back("HdfsFileReader:read_error", [](auto&& args) { |
99 | 0 | auto& [ret_status, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back()); |
100 | 0 | ret_status = Status::InternalError("read hdfs error"); |
101 | 0 | should_ret = true; |
102 | 0 | }); |
103 | 0 | }); |
104 | 0 | suite_map.emplace("test_cancel_node_channel", [] { |
105 | 0 | auto* sp = SyncPoint::get_instance(); |
106 | 0 | sp->set_call_back("VNodeChannel::try_send_block", [](auto&& args) { |
107 | 0 | LOG(INFO) << "injection VNodeChannel::try_send_block"; |
108 | 0 | auto* arg0 = try_any_cast<Status*>(args[0]); |
109 | 0 | *arg0 = Status::InternalError<false>("test_cancel_node_channel injection error"); |
110 | 0 | }); |
111 | 0 | sp->set_call_back("VOlapTableSink::close", |
112 | 0 | [](auto&&) { std::this_thread::sleep_for(std::chrono::seconds(5)); }); |
113 | 0 | }); |
114 | 0 | suite_map.emplace("test_file_segment_cache_corruption", [] { |
115 | 0 | auto* sp = SyncPoint::get_instance(); |
116 | 0 | sp->set_call_back("Segment::open:corruption", [](auto&& args) { |
117 | 0 | LOG(INFO) << "injection Segment::open:corruption"; |
118 | 0 | auto* arg0 = try_any_cast<Status*>(args[0]); |
119 | 0 | *arg0 = Status::Corruption<false>("test_file_segment_cache_corruption injection error"); |
120 | 0 | }); |
121 | 0 | }); |
122 | 0 | suite_map.emplace("test_file_segment_cache_corruption1", [] { |
123 | 0 | auto* sp = SyncPoint::get_instance(); |
124 | 0 | sp->set_call_back("Segment::open:corruption1", [](auto&& args) { |
125 | 0 | LOG(INFO) << "injection Segment::open:corruption1"; |
126 | 0 | auto* arg0 = try_any_cast<Status*>(args[0]); |
127 | 0 | *arg0 = Status::Corruption<false>("test_file_segment_cache_corruption injection error"); |
128 | 0 | }); |
129 | 0 | }); |
130 | | // curl "be_ip:http_port/api/injection_point/apply_suite/PageIO::read_and_decompress_page:crc_failure" |
131 | 0 | suite_map.emplace("PageIO::read_and_decompress_page:crc_failure", [] { |
132 | 0 | auto* sp = SyncPoint::get_instance(); |
133 | 0 | sp->set_call_back("PageIO::read_and_decompress_page:crc_failure_inj", [](auto&& args) { |
134 | 0 | LOG(INFO) << "PageIO::read_and_decompress_page:crc_failure_inj"; |
135 | 0 | if (auto ctx = std::any_cast<segment_v2::InjectionContext*>(args[0])) { |
136 | 0 | uint32_t* crc = ctx->crc; |
137 | 0 | segment_v2::PageReadOptions* opts = ctx->opts; |
138 | 0 | auto cached_file_reader = |
139 | 0 | dynamic_cast<io::CachedRemoteFileReader*>(opts->file_reader); |
140 | 0 | if (cached_file_reader == nullptr) { |
141 | 0 | return; // if not cachedreader, then do nothing |
142 | 0 | } else { |
143 | 0 | memset(crc, 0, 32); |
144 | 0 | } |
145 | 0 | } else { |
146 | 0 | std::cerr << "Failed to cast std::any to InjectionContext*" << std::endl; |
147 | 0 | } |
148 | 0 | }); |
149 | 0 | }); |
150 | | // curl be_ip:http_port/api/injection_point/apply_suite?name=test_cloud_meta_mgr_commit_txn' |
151 | 0 | suite_map.emplace("test_cloud_meta_mgr_commit_txn", [] { |
152 | 0 | auto* sp = SyncPoint::get_instance(); |
153 | 0 | sp->set_call_back("CloudMetaMgr::commit_txn", [](auto&& args) { |
154 | 0 | LOG(INFO) << "injection CloudMetaMgr::commit_txn"; |
155 | 0 | auto* arg0 = try_any_cast_ret<Status>(args); |
156 | 0 | arg0->first = Status::InternalError<false>( |
157 | 0 | "test_file_segment_cache_corruption injection error"); |
158 | 0 | arg0->second = true; |
159 | 0 | }); |
160 | 0 | }); |
161 | | // curl be_ip:http_port/api/injection_point/apply_suite?name=Segment::parse_footer:magic_number_corruption' |
162 | 0 | suite_map.emplace("Segment::parse_footer:magic_number_corruption", [] { |
163 | 0 | auto* sp = SyncPoint::get_instance(); |
164 | 0 | sp->set_call_back("Segment::parse_footer:magic_number_corruption_inj", [](auto&& args) { |
165 | 0 | if (auto p = std::any_cast<uint8_t*>(args[0])) { |
166 | 0 | memset(p, 0, 12); |
167 | 0 | } else { |
168 | 0 | std::cerr << "Failed to cast std::any to uint8_t*" << std::endl; |
169 | 0 | } |
170 | 0 | }); |
171 | 0 | }); |
172 | 0 | } |
173 | | |
174 | 0 | void set_sleep(const std::string& point, HttpRequest* req) { |
175 | 0 | int duration = 0; |
176 | 0 | auto& duration_str = req->param("duration"); |
177 | 0 | if (!duration_str.empty()) { |
178 | 0 | try { |
179 | 0 | duration = std::stoi(duration_str); |
180 | 0 | } catch (const std::exception&) { |
181 | 0 | HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, |
182 | 0 | "invalid duration: " + duration_str); |
183 | 0 | return; |
184 | 0 | } |
185 | 0 | } |
186 | 0 | auto sp = SyncPoint::get_instance(); |
187 | 0 | sp->set_call_back(point, [point, duration](auto&& args) { |
188 | 0 | LOG(INFO) << "injection point hit, point=" << point << " sleep milliseconds=" << duration; |
189 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(duration)); |
190 | 0 | }); |
191 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, "OK"); |
192 | 0 | } |
193 | | |
194 | 0 | void set_return(const std::string& point, HttpRequest* req) { |
195 | 0 | auto sp = SyncPoint::get_instance(); |
196 | 0 | sp->set_call_back(point, [point](auto&& args) { |
197 | 0 | try { |
198 | 0 | LOG(INFO) << "injection point hit, point=" << point << " return void"; |
199 | 0 | auto pred = try_any_cast<bool*>(args.back()); |
200 | 0 | *pred = true; |
201 | 0 | } catch (const std::bad_any_cast&) { |
202 | 0 | LOG_EVERY_N(ERROR, 10) << "failed to process `return` callback\n" << get_stack_trace(); |
203 | 0 | } |
204 | 0 | }); |
205 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, "OK"); |
206 | 0 | } |
207 | | |
208 | 0 | void set_return_ok(const std::string& point, HttpRequest* req) { |
209 | 0 | auto sp = SyncPoint::get_instance(); |
210 | 0 | sp->set_call_back(point, [point](auto&& args) { |
211 | 0 | try { |
212 | 0 | LOG(INFO) << "injection point hit, point=" << point << " return ok"; |
213 | 0 | auto* pair = try_any_cast_ret<Status>(args); |
214 | 0 | pair->first = Status::OK(); |
215 | 0 | pair->second = true; |
216 | 0 | } catch (const std::bad_any_cast&) { |
217 | 0 | LOG_EVERY_N(ERROR, 10) << "failed to process `return_ok` callback\n" |
218 | 0 | << get_stack_trace(); |
219 | 0 | } |
220 | 0 | }); |
221 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, "OK"); |
222 | 0 | } |
223 | | |
224 | 0 | void set_return_error(const std::string& point, HttpRequest* req) { |
225 | 0 | const std::string CODE_PARAM = "code"; |
226 | 0 | const std::string PROBABILITY_PARAM = "probability"; |
227 | 0 | int code = ErrorCode::INTERNAL_ERROR; |
228 | 0 | double probability = 100.0; |
229 | 0 | auto& code_str = req->param(CODE_PARAM); |
230 | 0 | if (!code_str.empty()) { |
231 | 0 | try { |
232 | 0 | code = std::stoi(code_str); |
233 | 0 | } catch (const std::exception& e) { |
234 | 0 | HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, |
235 | 0 | fmt::format("convert topn failed, {}", e.what())); |
236 | 0 | return; |
237 | 0 | } |
238 | 0 | } |
239 | 0 | auto& probability_str = req->param(PROBABILITY_PARAM); |
240 | 0 | if (!probability_str.empty()) { |
241 | 0 | try { |
242 | 0 | probability = std::stod(probability_str); |
243 | 0 | } catch (const std::exception& e) { |
244 | 0 | HttpChannel::send_reply( |
245 | 0 | req, HttpStatus::BAD_REQUEST, |
246 | 0 | fmt::format("invalid probability: {}, {}", probability_str, e.what())); |
247 | 0 | return; |
248 | 0 | } |
249 | 0 | if (probability < 0.0 || probability > 100.0) { |
250 | 0 | HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, |
251 | 0 | "probability should be between 0 and 100"); |
252 | 0 | return; |
253 | 0 | } |
254 | 0 | } |
255 | | |
256 | 0 | auto sp = SyncPoint::get_instance(); |
257 | 0 | sp->set_call_back(point, [code, point, probability](auto&& args) { |
258 | 0 | try { |
259 | 0 | if (probability < 100.0) { |
260 | 0 | static thread_local std::mt19937 gen(std::random_device {}()); |
261 | 0 | std::uniform_real_distribution<double> dist(0.0, 100.0); |
262 | 0 | double dice = dist(gen); |
263 | 0 | if (dice >= probability) { |
264 | 0 | LOG(INFO) << "injection point hit, point=" << point |
265 | 0 | << " skip error injection, probability=" << probability |
266 | 0 | << "%, random=" << dice; |
267 | 0 | return; |
268 | 0 | } |
269 | 0 | } |
270 | 0 | LOG(INFO) << "injection point hit, point=" << point << " return error code=" << code; |
271 | 0 | auto* pair = try_any_cast_ret<Status>(args); |
272 | 0 | pair->first = Status::Error<false>(code, "injected error"); |
273 | 0 | pair->second = true; |
274 | 0 | } catch (const std::bad_any_cast&) { |
275 | 0 | LOG_EVERY_N(ERROR, 10) << "failed to process `return_error` callback\n" |
276 | 0 | << get_stack_trace(); |
277 | 0 | } |
278 | 0 | }); |
279 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, "OK"); |
280 | 0 | } |
281 | | |
282 | 0 | void set_segfault(const std::string& point, HttpRequest* req) { |
283 | 0 | auto sp = SyncPoint::get_instance(); |
284 | 0 | sp->set_call_back(point, [point](auto&&) { |
285 | 0 | LOG(INFO) << "injection point hit, point=" << point << " trigger segfault"; |
286 | | // Intentional null dereference to crash the BE for testing. |
287 | 0 | volatile int* p = nullptr; |
288 | 0 | *p = 1; |
289 | 0 | }); |
290 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, "OK"); |
291 | 0 | } |
292 | | |
293 | 0 | void handle_set(HttpRequest* req) { |
294 | 0 | auto& point = req->param("name"); |
295 | 0 | if (point.empty()) { |
296 | 0 | HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "empty point name"); |
297 | 0 | return; |
298 | 0 | } |
299 | 0 | auto& behavior = req->param("behavior"); |
300 | 0 | if (behavior.empty()) { |
301 | 0 | HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "empty behavior"); |
302 | 0 | return; |
303 | 0 | } |
304 | 0 | if (behavior == "sleep") { |
305 | 0 | set_sleep(point, req); |
306 | 0 | return; |
307 | 0 | } else if (behavior == "return") { |
308 | 0 | set_return(point, req); |
309 | 0 | return; |
310 | 0 | } else if (behavior == "return_ok") { |
311 | 0 | set_return_ok(point, req); |
312 | 0 | return; |
313 | 0 | } else if (behavior == "return_error") { |
314 | 0 | set_return_error(point, req); |
315 | 0 | return; |
316 | 0 | } else if (behavior == "segfault") { |
317 | 0 | set_segfault(point, req); |
318 | 0 | return; |
319 | 0 | } |
320 | 0 | HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "unknown behavior: " + behavior); |
321 | 0 | } |
322 | | |
323 | 0 | void handle_clear(HttpRequest* req) { |
324 | 0 | const auto& point = req->param("name"); |
325 | 0 | auto* sp = SyncPoint::get_instance(); |
326 | 0 | LOG(INFO) << "clear injection point : " << (point.empty() ? "(all points)" : point); |
327 | 0 | if (point.empty()) { |
328 | | // If point name is emtpy, clear all |
329 | 0 | sp->clear_all_call_backs(); |
330 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, "OK"); |
331 | 0 | return; |
332 | 0 | } |
333 | | |
334 | 0 | sp->clear_call_back(point); |
335 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, "OK"); |
336 | 0 | } |
337 | | |
338 | 0 | void handle_apply_suite(HttpRequest* req) { |
339 | 0 | auto& suite = req->param("name"); |
340 | 0 | if (suite.empty()) { |
341 | 0 | HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "empty suite name"); |
342 | 0 | return; |
343 | 0 | } |
344 | | |
345 | 0 | std::call_once(register_suites_once, register_suites); |
346 | 0 | if (auto it = suite_map.find(suite); it != suite_map.end()) { |
347 | 0 | it->second(); // set injection callbacks |
348 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, "OK apply suite " + suite + "\n"); |
349 | 0 | return; |
350 | 0 | } |
351 | 0 | HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, |
352 | 0 | "unknown suite: " + suite + "\n"); |
353 | 0 | } |
354 | | |
355 | 0 | void handle_enable(HttpRequest* req) { |
356 | 0 | SyncPoint::get_instance()->enable_processing(); |
357 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, "OK"); |
358 | 0 | } |
359 | | |
360 | 0 | void handle_disable(HttpRequest* req) { |
361 | 0 | SyncPoint::get_instance()->disable_processing(); |
362 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, "OK"); |
363 | 0 | } |
364 | | |
365 | | } // namespace |
366 | | |
// Defaulted constructor. The handlers in this file keep their state outside the action: in the
// SyncPoint singleton and the file-local `suite_map`.
InjectionPointAction::InjectionPointAction() = default;
368 | | |
369 | | // |
370 | | // enable/disable injection point |
371 | | // ``` |
372 | | // curl "be_ip:http_port/api/injection_point/enable" |
373 | | // curl "be_ip:http_port/api/injection_point/disable" |
374 | | // ``` |
375 | | // |
376 | | // clear all injection points |
377 | | // ``` |
378 | | // curl "be_ip:http_port/api/injection_point/clear" |
379 | | // ``` |
380 | | // |
381 | | // apply/activate specific suite with registered action, see `register_suites()` for more details |
382 | | // ``` |
383 | | // curl "be_ip:http_port/api/injection_point/apply_suite?name=${suite_name}" |
384 | | // ``` |
385 | | // |
386 | | // set predifined action for specific injection point, supported actions are: |
387 | | // * sleep: for injection point with callback, accepted param is `duration` in milliseconds |
388 | | // * return: for injection point without return value (return void) |
389 | | // * return_ok: for injection point with return value, always return Status::OK |
390 | | // * return_error: for injection point with return value, accepted param is `code`, |
391 | | // which is an int, valid values can be found in status.h, e.g. -235 or -230, |
392 | | // if `code` is not present return Status::InternalError. Optional `probability` |
393 | | // determines the percentage of times to inject the error (default 100). |
394 | | // * segfault: dereference a null pointer to crash BE intentionally |
395 | | // ``` |
396 | | // curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=sleep&duration=${x_millsec}" # sleep x millisecs |
397 | | // curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return" # return void |
398 | | // curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_ok" # return ok |
399 | | // curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error" # internal error |
400 | | // curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error&code=${code}" # -235 |
401 | | // curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error&code=${code}&probability=50" # inject with 50% probability |
402 | | // curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=segfault" # crash BE |
403 | | // ``` |
404 | 0 | void InjectionPointAction::handle(HttpRequest* req) { |
405 | 0 | LOG(INFO) << "handle InjectionPointAction " << req->debug_string(); |
406 | 0 | auto& op = req->param("op"); |
407 | 0 | if (op == "set") { |
408 | 0 | handle_set(req); |
409 | 0 | return; |
410 | 0 | } else if (op == "clear") { |
411 | 0 | handle_clear(req); |
412 | 0 | return; |
413 | 0 | } else if (op == "apply_suite") { |
414 | 0 | handle_apply_suite(req); |
415 | 0 | return; |
416 | 0 | } else if (op == "enable") { |
417 | 0 | handle_enable(req); |
418 | 0 | return; |
419 | 0 | } else if (op == "disable") { |
420 | 0 | handle_disable(req); |
421 | 0 | return; |
422 | 0 | } |
423 | | |
424 | 0 | HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "unknown op: " + op); |
425 | 0 | } |
426 | | |
427 | | } // namespace doris |