Coverage Report

Created: 2026-03-15 22:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/injection_point_action.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/injection_point_action.h"
19
20
#include <glog/logging.h>
21
22
#include <chrono>
23
#include <mutex>
24
#include <random>
25
26
#include "common/status.h"
27
#include "cpp/sync_point.h"
28
#include "io/cache/cached_remote_file_reader.h"
29
#include "service/http/http_channel.h"
30
#include "service/http/http_request.h"
31
#include "service/http/http_status.h"
32
#include "storage/rowset/rowset.h"
33
#include "storage/segment/page_io.h"
34
#include "util/stack_util.h"
35
36
namespace doris {
37
namespace {
38
39
// TODO(cyx): Provide an object pool
40
// `suite_map` won't be modified after `register_suites`
41
std::map<std::string, std::function<void()>> suite_map;
42
std::once_flag register_suites_once;
43
44
// only call once
45
0
void register_suites() {
46
0
    suite_map.emplace("test_compaction", [] {
47
0
        auto sp = SyncPoint::get_instance();
48
0
        sp->set_call_back("new_cumulative_point", [](auto&& args) {
49
0
            auto output_rowset = try_any_cast<Rowset*>(args[0]);
50
0
            auto last_cumulative_point = try_any_cast<int64_t>(args[1]);
51
0
            auto& [ret_vault, should_ret] = *try_any_cast<std::pair<int64_t, bool>*>(args.back());
52
0
            ret_vault = output_rowset->start_version() == last_cumulative_point
53
0
                                ? output_rowset->end_version() + 1
54
0
                                : last_cumulative_point;
55
0
            should_ret = true;
56
0
        });
57
0
    });
58
0
    suite_map.emplace("test_s3_file_writer", [] {
59
0
        auto* sp = SyncPoint::get_instance();
60
0
        sp->set_call_back("UploadFileBuffer::upload_to_local_file_cache", [](auto&&) {
61
0
            std::srand(static_cast<unsigned int>(std::time(nullptr)));
62
0
            int random_sleep_time_second = std::rand() % 10 + 1;
63
0
            std::this_thread::sleep_for(std::chrono::seconds(random_sleep_time_second));
64
0
        });
65
0
        sp->set_call_back("UploadFileBuffer::upload_to_local_file_cache_inject", [](auto&& args) {
66
0
            auto& [ret_status, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back());
67
0
            ret_status =
68
0
                    Status::IOError<false>("failed to write into file cache due to inject error");
69
0
            should_ret = true;
70
0
        });
71
0
    });
72
0
    suite_map.emplace("test_storage_vault", [] {
73
0
        auto* sp = SyncPoint::get_instance();
74
0
        sp->set_call_back("HdfsFileWriter::append_hdfs_file_delay", [](auto&&) {
75
0
            std::srand(static_cast<unsigned int>(std::time(nullptr)));
76
0
            int random_sleep_time_second = std::rand() % 10 + 1;
77
0
            std::this_thread::sleep_for(std::chrono::seconds(random_sleep_time_second));
78
0
        });
79
0
        sp->set_call_back("HdfsFileWriter::append_hdfs_file_error", [](auto&& args) {
80
0
            auto& [_, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back());
81
0
            should_ret = true;
82
0
        });
83
0
        sp->set_call_back("HdfsFileWriter::hdfsFlush", [](auto&& args) {
84
0
            auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back());
85
0
            ret_value = Status::InternalError("failed to flush hdfs file");
86
0
            should_ret = true;
87
0
        });
88
0
        sp->set_call_back("HdfsFileWriter::hdfsCloseFile", [](auto&& args) {
89
0
            auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back());
90
0
            ret_value = Status::InternalError("failed to flush hdfs file");
91
0
            should_ret = true;
92
0
        });
93
0
        sp->set_call_back("HdfsFileWriter::hdfeSync", [](auto&& args) {
94
0
            auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back());
95
0
            ret_value = Status::InternalError("failed to flush hdfs file");
96
0
            should_ret = true;
97
0
        });
98
0
        sp->set_call_back("HdfsFileReader:read_error", [](auto&& args) {
99
0
            auto& [ret_status, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back());
100
0
            ret_status = Status::InternalError("read hdfs error");
101
0
            should_ret = true;
102
0
        });
103
0
    });
104
0
    suite_map.emplace("test_cancel_node_channel", [] {
105
0
        auto* sp = SyncPoint::get_instance();
106
0
        sp->set_call_back("VNodeChannel::try_send_block", [](auto&& args) {
107
0
            LOG(INFO) << "injection VNodeChannel::try_send_block";
108
0
            auto* arg0 = try_any_cast<Status*>(args[0]);
109
0
            *arg0 = Status::InternalError<false>("test_cancel_node_channel injection error");
110
0
        });
111
0
        sp->set_call_back("VOlapTableSink::close",
112
0
                          [](auto&&) { std::this_thread::sleep_for(std::chrono::seconds(5)); });
113
0
    });
114
0
    suite_map.emplace("test_file_segment_cache_corruption", [] {
115
0
        auto* sp = SyncPoint::get_instance();
116
0
        sp->set_call_back("Segment::open:corruption", [](auto&& args) {
117
0
            LOG(INFO) << "injection Segment::open:corruption";
118
0
            auto* arg0 = try_any_cast<Status*>(args[0]);
119
0
            *arg0 = Status::Corruption<false>("test_file_segment_cache_corruption injection error");
120
0
        });
121
0
    });
122
0
    suite_map.emplace("test_file_segment_cache_corruption1", [] {
123
0
        auto* sp = SyncPoint::get_instance();
124
0
        sp->set_call_back("Segment::open:corruption1", [](auto&& args) {
125
0
            LOG(INFO) << "injection Segment::open:corruption1";
126
0
            auto* arg0 = try_any_cast<Status*>(args[0]);
127
0
            *arg0 = Status::Corruption<false>("test_file_segment_cache_corruption injection error");
128
0
        });
129
0
    });
130
    // curl "be_ip:http_port/api/injection_point/apply_suite/PageIO::read_and_decompress_page:crc_failure"
131
0
    suite_map.emplace("PageIO::read_and_decompress_page:crc_failure", [] {
132
0
        auto* sp = SyncPoint::get_instance();
133
0
        sp->set_call_back("PageIO::read_and_decompress_page:crc_failure_inj", [](auto&& args) {
134
0
            LOG(INFO) << "PageIO::read_and_decompress_page:crc_failure_inj";
135
0
            if (auto ctx = std::any_cast<segment_v2::InjectionContext*>(args[0])) {
136
0
                uint32_t* crc = ctx->crc;
137
0
                segment_v2::PageReadOptions* opts = ctx->opts;
138
0
                auto cached_file_reader =
139
0
                        dynamic_cast<io::CachedRemoteFileReader*>(opts->file_reader);
140
0
                if (cached_file_reader == nullptr) {
141
0
                    return; // if not cachedreader, then do nothing
142
0
                } else {
143
0
                    memset(crc, 0, 32);
144
0
                }
145
0
            } else {
146
0
                std::cerr << "Failed to cast std::any to InjectionContext*" << std::endl;
147
0
            }
148
0
        });
149
0
    });
150
    // curl be_ip:http_port/api/injection_point/apply_suite?name=test_cloud_meta_mgr_commit_txn'
151
0
    suite_map.emplace("test_cloud_meta_mgr_commit_txn", [] {
152
0
        auto* sp = SyncPoint::get_instance();
153
0
        sp->set_call_back("CloudMetaMgr::commit_txn", [](auto&& args) {
154
0
            LOG(INFO) << "injection CloudMetaMgr::commit_txn";
155
0
            auto* arg0 = try_any_cast_ret<Status>(args);
156
0
            arg0->first = Status::InternalError<false>(
157
0
                    "test_file_segment_cache_corruption injection error");
158
0
            arg0->second = true;
159
0
        });
160
0
    });
161
    // curl be_ip:http_port/api/injection_point/apply_suite?name=Segment::parse_footer:magic_number_corruption'
162
0
    suite_map.emplace("Segment::parse_footer:magic_number_corruption", [] {
163
0
        auto* sp = SyncPoint::get_instance();
164
0
        sp->set_call_back("Segment::parse_footer:magic_number_corruption_inj", [](auto&& args) {
165
0
            if (auto p = std::any_cast<uint8_t*>(args[0])) {
166
0
                memset(p, 0, 12);
167
0
            } else {
168
0
                std::cerr << "Failed to cast std::any to uint8_t*" << std::endl;
169
0
            }
170
0
        });
171
0
    });
172
0
}
173
174
0
void set_sleep(const std::string& point, HttpRequest* req) {
175
0
    int duration = 0;
176
0
    auto& duration_str = req->param("duration");
177
0
    if (!duration_str.empty()) {
178
0
        try {
179
0
            duration = std::stoi(duration_str);
180
0
        } catch (const std::exception&) {
181
0
            HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST,
182
0
                                    "invalid duration: " + duration_str);
183
0
            return;
184
0
        }
185
0
    }
186
0
    auto sp = SyncPoint::get_instance();
187
0
    sp->set_call_back(point, [point, duration](auto&& args) {
188
0
        LOG(INFO) << "injection point hit, point=" << point << " sleep milliseconds=" << duration;
189
0
        std::this_thread::sleep_for(std::chrono::milliseconds(duration));
190
0
    });
191
0
    HttpChannel::send_reply(req, HttpStatus::OK, "OK");
192
0
}
193
194
0
void set_return(const std::string& point, HttpRequest* req) {
195
0
    auto sp = SyncPoint::get_instance();
196
0
    sp->set_call_back(point, [point](auto&& args) {
197
0
        try {
198
0
            LOG(INFO) << "injection point hit, point=" << point << " return void";
199
0
            auto pred = try_any_cast<bool*>(args.back());
200
0
            *pred = true;
201
0
        } catch (const std::bad_any_cast&) {
202
0
            LOG_EVERY_N(ERROR, 10) << "failed to process `return` callback\n" << get_stack_trace();
203
0
        }
204
0
    });
205
0
    HttpChannel::send_reply(req, HttpStatus::OK, "OK");
206
0
}
207
208
0
void set_return_ok(const std::string& point, HttpRequest* req) {
209
0
    auto sp = SyncPoint::get_instance();
210
0
    sp->set_call_back(point, [point](auto&& args) {
211
0
        try {
212
0
            LOG(INFO) << "injection point hit, point=" << point << " return ok";
213
0
            auto* pair = try_any_cast_ret<Status>(args);
214
0
            pair->first = Status::OK();
215
0
            pair->second = true;
216
0
        } catch (const std::bad_any_cast&) {
217
0
            LOG_EVERY_N(ERROR, 10) << "failed to process `return_ok` callback\n"
218
0
                                   << get_stack_trace();
219
0
        }
220
0
    });
221
0
    HttpChannel::send_reply(req, HttpStatus::OK, "OK");
222
0
}
223
224
0
void set_return_error(const std::string& point, HttpRequest* req) {
225
0
    const std::string CODE_PARAM = "code";
226
0
    const std::string PROBABILITY_PARAM = "probability";
227
0
    int code = ErrorCode::INTERNAL_ERROR;
228
0
    double probability = 100.0;
229
0
    auto& code_str = req->param(CODE_PARAM);
230
0
    if (!code_str.empty()) {
231
0
        try {
232
0
            code = std::stoi(code_str);
233
0
        } catch (const std::exception& e) {
234
0
            HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST,
235
0
                                    fmt::format("convert topn failed, {}", e.what()));
236
0
            return;
237
0
        }
238
0
    }
239
0
    auto& probability_str = req->param(PROBABILITY_PARAM);
240
0
    if (!probability_str.empty()) {
241
0
        try {
242
0
            probability = std::stod(probability_str);
243
0
        } catch (const std::exception& e) {
244
0
            HttpChannel::send_reply(
245
0
                    req, HttpStatus::BAD_REQUEST,
246
0
                    fmt::format("invalid probability: {}, {}", probability_str, e.what()));
247
0
            return;
248
0
        }
249
0
        if (probability < 0.0 || probability > 100.0) {
250
0
            HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST,
251
0
                                    "probability should be between 0 and 100");
252
0
            return;
253
0
        }
254
0
    }
255
256
0
    auto sp = SyncPoint::get_instance();
257
0
    sp->set_call_back(point, [code, point, probability](auto&& args) {
258
0
        try {
259
0
            if (probability < 100.0) {
260
0
                static thread_local std::mt19937 gen(std::random_device {}());
261
0
                std::uniform_real_distribution<double> dist(0.0, 100.0);
262
0
                double dice = dist(gen);
263
0
                if (dice >= probability) {
264
0
                    LOG(INFO) << "injection point hit, point=" << point
265
0
                              << " skip error injection, probability=" << probability
266
0
                              << "%, random=" << dice;
267
0
                    return;
268
0
                }
269
0
            }
270
0
            LOG(INFO) << "injection point hit, point=" << point << " return error code=" << code;
271
0
            auto* pair = try_any_cast_ret<Status>(args);
272
0
            pair->first = Status::Error<false>(code, "injected error");
273
0
            pair->second = true;
274
0
        } catch (const std::bad_any_cast&) {
275
0
            LOG_EVERY_N(ERROR, 10) << "failed to process `return_error` callback\n"
276
0
                                   << get_stack_trace();
277
0
        }
278
0
    });
279
0
    HttpChannel::send_reply(req, HttpStatus::OK, "OK");
280
0
}
281
282
0
void set_segfault(const std::string& point, HttpRequest* req) {
283
0
    auto sp = SyncPoint::get_instance();
284
0
    sp->set_call_back(point, [point](auto&&) {
285
0
        LOG(INFO) << "injection point hit, point=" << point << " trigger segfault";
286
        // Intentional null dereference to crash the BE for testing.
287
0
        volatile int* p = nullptr;
288
0
        *p = 1;
289
0
    });
290
0
    HttpChannel::send_reply(req, HttpStatus::OK, "OK");
291
0
}
292
293
0
// Handles `op=set`: validates the `name` and `behavior` query params, then dispatches to the
// matching `set_*` installer. Unknown behaviors get 400 BAD_REQUEST.
void handle_set(HttpRequest* req) {
    using BehaviorInstaller = void (*)(const std::string&, HttpRequest*);
    static const std::map<std::string, BehaviorInstaller> kInstallers = {
            {"sleep", set_sleep},
            {"return", set_return},
            {"return_ok", set_return_ok},
            {"return_error", set_return_error},
            {"segfault", set_segfault},
    };

    const auto& point = req->param("name");
    if (point.empty()) {
        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "empty point name");
        return;
    }
    const auto& behavior = req->param("behavior");
    if (behavior.empty()) {
        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "empty behavior");
        return;
    }

    const auto installer = kInstallers.find(behavior);
    if (installer == kInstallers.end()) {
        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "unknown behavior: " + behavior);
        return;
    }
    installer->second(point, req);
}
322
323
0
// Handles `op=clear`: removes the callback registered for `name`, or every callback when
// `name` is empty, then replies "OK".
void handle_clear(HttpRequest* req) {
    const auto& point = req->param("name");
    auto* sync_point = SyncPoint::get_instance();
    const bool clear_everything = point.empty();
    LOG(INFO) << "clear injection point : " << (clear_everything ? "(all points)" : point);
    if (clear_everything) {
        // If the point name is empty, clear all callbacks.
        sync_point->clear_all_call_backs();
    } else {
        sync_point->clear_call_back(point);
    }
    HttpChannel::send_reply(req, HttpStatus::OK, "OK");
}
337
338
0
// Handles `op=apply_suite`: runs the registered suite named by `name`, which installs its
// injection callbacks. Replies 400 for an empty name and 500 for an unknown suite.
void handle_apply_suite(HttpRequest* req) {
    const auto& suite = req->param("name");
    if (suite.empty()) {
        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "empty suite name");
        return;
    }

    // The suite registry is filled lazily, exactly once, on the first apply request.
    std::call_once(register_suites_once, register_suites);
    const auto entry = suite_map.find(suite);
    if (entry == suite_map.end()) {
        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
                                "unknown suite: " + suite + "\n");
        return;
    }
    entry->second(); // set injection callbacks
    HttpChannel::send_reply(req, HttpStatus::OK, "OK apply suite " + suite + "\n");
}
354
355
0
// Handles `op=enable`: turns on SyncPoint callback processing and replies "OK".
void handle_enable(HttpRequest* req) {
    auto* sync_point = SyncPoint::get_instance();
    sync_point->enable_processing();
    HttpChannel::send_reply(req, HttpStatus::OK, "OK");
}
359
360
0
// Handles `op=disable`: turns off SyncPoint callback processing and replies "OK".
void handle_disable(HttpRequest* req) {
    auto* sync_point = SyncPoint::get_instance();
    sync_point->disable_processing();
    HttpChannel::send_reply(req, HttpStatus::OK, "OK");
}
364
365
} // namespace
366
367
0
// Defaulted constructor. Nothing in this file initializes per-instance state: the handlers
// operate on the SyncPoint singleton and the file-local suite registry.
InjectionPointAction::InjectionPointAction() = default;
368
369
//
370
// enable/disable injection point
371
// ```
372
// curl "be_ip:http_port/api/injection_point/enable"
373
// curl "be_ip:http_port/api/injection_point/disable"
374
// ```
375
//
376
// clear all injection points
377
// ```
378
// curl "be_ip:http_port/api/injection_point/clear"
379
// ```
380
//
381
// apply/activate specific suite with registered action, see `register_suites()` for more details
382
// ```
383
// curl "be_ip:http_port/api/injection_point/apply_suite?name=${suite_name}"
384
// ```
385
//
386
// set a predefined action for a specific injection point; supported actions are:
387
// * sleep: for injection point with callback, accepted param is `duration` in milliseconds
388
// * return: for injection point without return value (return void)
389
// * return_ok: for injection point with return value, always return Status::OK
390
// * return_error: for injection point with return value, accepted param is `code`,
391
//                 which is an int, valid values can be found in status.h, e.g. -235 or -230,
392
//                 if `code` is not present return Status::InternalError. Optional `probability`
393
//                 determines the percentage of times to inject the error (default 100).
394
// * segfault: dereference a null pointer to crash BE intentionally
395
// ```
396
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=sleep&duration=${x_millisec}" # sleep x milliseconds
397
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return" # return void
398
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_ok" # return ok
399
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error" # internal error
400
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error&code=${code}" # -235
401
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error&code=${code}&probability=50" # inject with 50% probability
402
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=segfault" # crash BE
403
// ```
404
0
void InjectionPointAction::handle(HttpRequest* req) {
    LOG(INFO) << "handle InjectionPointAction " << req->debug_string();

    // Maps the `op` request parameter to its handler; any other value is a 400.
    using OpHandler = void (*)(HttpRequest*);
    static const std::map<std::string, OpHandler> kOpHandlers = {
            {"set", handle_set},       {"clear", handle_clear},     {"apply_suite", handle_apply_suite},
            {"enable", handle_enable}, {"disable", handle_disable},
    };

    const auto& op = req->param("op");
    if (const auto handler = kOpHandlers.find(op); handler != kOpHandlers.end()) {
        handler->second(req);
        return;
    }
    HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "unknown op: " + op);
}
426
427
} // namespace doris