be/src/runtime/cdc_client_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
#include "runtime/cdc_client_mgr.h"

#include <brpc/closure_guard.h>
#include <fcntl.h>
#include <fmt/core.h>
#include <gen_cpp/internal_service.pb.h>
#include <google/protobuf/stubs/callback.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#ifndef __APPLE__
#include <sys/prctl.h>
#endif

#include <atomic>
#include <chrono>
#include <iterator>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "runtime/exec_env.h"
#include "service/http/http_client.h"
49 | | |
50 | | namespace doris { |
51 | | |
52 | | namespace { |
// SIGCHLD handler: reap every already-exited child without blocking, so that
// no zombie processes are left behind.
//
// waitpid() may overwrite errno (e.g. with ECHILD when there is no child to
// wait for). Because a signal handler can interrupt code that is about to
// inspect errno, the value seen on entry is saved and restored on exit.
//
// The signal number is not used.
void handle_sigchld(int /*sig_no*/) {
    const int saved_errno = errno;
    int status = 0;
    // WNOHANG: return immediately once no further exited child is available.
    while (waitpid(-1, &status, WNOHANG) > 0) {
        // Nothing to do with the exit status; keep reaping.
    }
    errno = saved_errno;
}
60 | | |
61 | | // Check CDC client health |
62 | | #ifndef BE_TEST |
63 | 30 | Status check_cdc_client_health(int retry_times, int sleep_time, std::string& health_response) { |
64 | 30 | const std::string cdc_health_url = |
65 | 30 | "http://127.0.0.1:" + std::to_string(doris::config::cdc_client_port) + |
66 | 30 | "/actuator/health"; |
67 | | |
68 | 31 | auto health_request = [cdc_health_url, &health_response](HttpClient* client) { |
69 | 31 | RETURN_IF_ERROR(client->init(cdc_health_url)); |
70 | 31 | client->set_timeout_ms(5000); |
71 | 31 | RETURN_IF_ERROR(client->execute(&health_response)); |
72 | 29 | return Status::OK(); |
73 | 31 | }; |
74 | | |
75 | 30 | Status status = HttpClient::execute_with_retry(retry_times, sleep_time, health_request); |
76 | | |
77 | 30 | if (!status.ok()) { |
78 | 1 | return Status::InternalError("CDC client health check failed"); |
79 | 1 | } |
80 | | |
81 | 29 | bool is_up = health_response.find("UP") != std::string::npos; |
82 | | |
83 | 29 | if (!is_up) { |
84 | 0 | return Status::InternalError(fmt::format("CDC client unhealthy: {}", health_response)); |
85 | 0 | } |
86 | | |
87 | 29 | return Status::OK(); |
88 | 29 | } |
89 | | #endif |
90 | | |
91 | | } // anonymous namespace |
92 | | |
93 | 35 | CdcClientMgr::CdcClientMgr() = default; |
94 | | |
95 | 31 | CdcClientMgr::~CdcClientMgr() { |
96 | 31 | stop(); |
97 | 31 | } |
98 | | |
99 | 46 | void CdcClientMgr::stop() { |
100 | 46 | pid_t pid = _child_pid.load(); |
101 | 46 | if (pid > 0) { |
102 | | // Check if process is still alive |
103 | 24 | if (kill(pid, 0) == 0) { |
104 | 2 | LOG(INFO) << "Stopping CDC client process, pid=" << pid; |
105 | | // Send SIGTERM for graceful shutdown |
106 | 2 | kill(pid, SIGTERM); |
107 | | // Wait a short time for graceful shutdown |
108 | 2 | std::this_thread::sleep_for(std::chrono::milliseconds(200)); |
109 | | // Force kill if still alive |
110 | 2 | if (kill(pid, 0) == 0) { |
111 | 1 | LOG(INFO) << "Force killing CDC client process, pid=" << pid; |
112 | 1 | kill(pid, SIGKILL); |
113 | 1 | int status = 0; |
114 | 1 | waitpid(pid, &status, 0); |
115 | 1 | } |
116 | 2 | } |
117 | 24 | _child_pid.store(0); |
118 | 24 | } |
119 | | |
120 | 46 | LOG(INFO) << "CdcClientMgr is stopped"; |
121 | 46 | } |
122 | | |
123 | 29 | Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) { |
124 | 29 | std::lock_guard<std::mutex> lock(_start_mutex); |
125 | | |
126 | 29 | Status st = Status::OK(); |
127 | 29 | pid_t exist_pid = _child_pid.load(); |
128 | 29 | if (exist_pid > 0) { |
129 | | #ifdef BE_TEST |
130 | | // In test mode, directly return OK if PID exists |
131 | | LOG(INFO) << "cdc client already started (BE_TEST mode), pid=" << exist_pid; |
132 | | return Status::OK(); |
133 | | #else |
134 | | // Check if process is still alive |
135 | 28 | if (kill(exist_pid, 0) == 0) { |
136 | | // Process exists, verify it's actually our CDC client by health check |
137 | 28 | std::string check_response; |
138 | 28 | auto check_st = check_cdc_client_health(3, 1, check_response); |
139 | 28 | if (check_st.ok()) { |
140 | | // Process exists and responding, CDC client is running |
141 | 28 | return Status::OK(); |
142 | 28 | } else { |
143 | | // Process exists but CDC client not responding |
144 | | // Either it's a different process (PID reused) or CDC client is unhealthy |
145 | 0 | st = Status::InternalError(fmt::format("CDC client {} unresponsive", exist_pid)); |
146 | 0 | st.to_protobuf(result->mutable_status()); |
147 | 0 | return st; |
148 | 0 | } |
149 | 28 | } else { |
150 | 0 | LOG(INFO) << "CDC client is dead, pid=" << exist_pid; |
151 | | // Process is dead, reset PID and continue to start |
152 | 0 | _child_pid.store(0); |
153 | 0 | } |
154 | 28 | #endif |
155 | 28 | } else if (!_adopted_external.load()) { |
156 | 1 | LOG(INFO) << "CDC client has never been started"; |
157 | 1 | } |
158 | | |
159 | 1 | #ifndef BE_TEST |
160 | | // Adopt an externally-managed cdc_client if the port already answers |
161 | | // healthy (e.g. one started manually for debug / hotfix). |
162 | 1 | { |
163 | 1 | std::string adopt_response; |
164 | 1 | if (check_cdc_client_health(1, 0, adopt_response).ok()) { |
165 | 0 | if (!_adopted_external.exchange(true)) { |
166 | 0 | LOG(INFO) << "Adopting external cdc client on port " |
167 | 0 | << doris::config::cdc_client_port; |
168 | 0 | } |
169 | 0 | return Status::OK(); |
170 | 0 | } |
171 | 1 | } |
172 | 1 | _adopted_external.store(false); |
173 | 1 | #endif |
174 | | |
175 | 1 | const char* doris_home = getenv("DORIS_HOME"); |
176 | 1 | const char* log_dir = getenv("LOG_DIR"); |
177 | 1 | const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar"; |
178 | 1 | const std::string cdc_jar_port = |
179 | 1 | "--server.port=" + std::to_string(doris::config::cdc_client_port); |
180 | 1 | const std::string backend_http_port = |
181 | 1 | "--backend.http.port=" + std::to_string(config::webserver_port); |
182 | 1 | const std::string cluster_token = "--cluster.token=" + ExecEnv::GetInstance()->token(); |
183 | 1 | const std::string java_opts = "-Dlog.path=" + std::string(log_dir); |
184 | | |
185 | | // check cdc jar exists |
186 | 1 | struct stat buffer; |
187 | 1 | if (stat(cdc_jar_path.c_str(), &buffer) != 0) { |
188 | 0 | st = Status::InternalError("Can not find cdc-client.jar."); |
189 | 0 | st.to_protobuf(result->mutable_status()); |
190 | 0 | return st; |
191 | 0 | } |
192 | | |
193 | | // Ready to start cdc client |
194 | 1 | LOG(INFO) << "Ready to start cdc client"; |
195 | 1 | const auto* java_home = getenv("JAVA_HOME"); |
196 | 1 | if (!java_home) { |
197 | 0 | st = Status::InternalError("Can not find JAVA_HOME"); |
198 | 0 | st.to_protobuf(result->mutable_status()); |
199 | 0 | return st; |
200 | 0 | } |
201 | 1 | std::string path(java_home); |
202 | 1 | std::string java_bin = path + "/bin/java"; |
203 | | |
204 | | // Pre-build everything the child needs before fork(): heap allocation after |
205 | | // fork() in a multi-threaded process can deadlock on inherited libc locks. |
206 | 1 | std::vector<std::string> argv_storage; |
207 | 1 | argv_storage.emplace_back("java"); |
208 | 1 | const std::string user_java_opts = doris::config::cdc_client_java_opts; |
209 | 1 | if (!user_java_opts.empty()) { |
210 | 0 | std::istringstream iss(user_java_opts); |
211 | 0 | argv_storage.insert(argv_storage.end(), std::istream_iterator<std::string>(iss), |
212 | 0 | std::istream_iterator<std::string>()); |
213 | 0 | } |
214 | 1 | argv_storage.emplace_back(java_opts); |
215 | | // OOM safety net (last-wins, user opts cannot disable). |
216 | 1 | argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError"); |
217 | 1 | argv_storage.emplace_back("-jar"); |
218 | 1 | argv_storage.emplace_back(cdc_jar_path); |
219 | 1 | argv_storage.emplace_back(cdc_jar_port); |
220 | 1 | argv_storage.emplace_back(backend_http_port); |
221 | 1 | argv_storage.emplace_back(cluster_token); |
222 | | |
223 | 1 | std::vector<char*> argv; |
224 | 1 | argv.reserve(argv_storage.size() + 1); |
225 | 8 | for (auto& s : argv_storage) { |
226 | 8 | argv.push_back(const_cast<char*>(s.c_str())); |
227 | 8 | } |
228 | 1 | argv.push_back(nullptr); |
229 | | |
230 | 1 | const std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out"; |
231 | | |
232 | 1 | struct sigaction act; |
233 | 1 | act.sa_flags = 0; |
234 | 1 | act.sa_handler = handle_sigchld; |
235 | 1 | sigaction(SIGCHLD, &act, NULL); |
236 | 1 | LOG(INFO) << "Start to fork cdc client process with " << path; |
237 | | #ifdef BE_TEST |
238 | | _child_pid.store(99999); |
239 | | st = Status::OK(); |
240 | | return st; |
241 | | #else |
242 | 1 | pid_t pid = fork(); |
243 | 1 | if (pid < 0) { |
244 | 0 | st = Status::InternalError("Fork cdc client failed."); |
245 | 0 | st.to_protobuf(result->mutable_status()); |
246 | 0 | return st; |
247 | 1 | } else if (pid == 0) { |
248 | | // Child: async-signal-safe operations only until execv(). |
249 | 0 | #ifndef __APPLE__ |
250 | 0 | prctl(PR_SET_PDEATHSIG, SIGKILL); |
251 | 0 | #endif |
252 | 0 | int out_fd = open(cdc_out_file.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644); |
253 | 0 | if (out_fd < 0) { |
254 | 0 | perror("open cdc-client.out file failed"); |
255 | 0 | _exit(1); |
256 | 0 | } |
257 | 0 | dup2(out_fd, STDOUT_FILENO); |
258 | 0 | dup2(out_fd, STDERR_FILENO); |
259 | 0 | close(out_fd); |
260 | 0 | execv(java_bin.c_str(), argv.data()); |
261 | 0 | perror("Cdc client child process error"); |
262 | 0 | _exit(1); |
263 | 1 | } else { |
264 | | // Parent process: save PID and wait for startup |
265 | 1 | _child_pid.store(pid); |
266 | | |
267 | | // Waiting for cdc to start, failed after more than 3 * 10 seconds |
268 | 1 | std::string health_response; |
269 | 1 | Status status = check_cdc_client_health(3, 10, health_response); |
270 | 1 | if (!status.ok()) { |
271 | | // Reset PID if startup failed |
272 | 0 | _child_pid.store(0); |
273 | 0 | st = Status::InternalError("Start cdc client failed."); |
274 | 0 | st.to_protobuf(result->mutable_status()); |
275 | 1 | } else if (kill(pid, 0) != 0) { |
276 | | // Port healthy but our child has exited: an external process is |
277 | | // answering. Treat as adoption instead of masking dead PID as success. |
278 | 0 | _child_pid.store(0); |
279 | 0 | if (!_adopted_external.exchange(true)) { |
280 | 0 | LOG(INFO) << "Forked cdc client " << pid << " exited but port " |
281 | 0 | << doris::config::cdc_client_port |
282 | 0 | << " is healthy, adopting external instance"; |
283 | 0 | } |
284 | 1 | } else { |
285 | 1 | _adopted_external.store(false); |
286 | 1 | LOG(INFO) << "Start cdc client success, pid=" << pid |
287 | 1 | << ", status=" << status.to_string() << ", response=" << health_response; |
288 | 1 | } |
289 | 1 | } |
290 | 1 | #endif //BE_TEST |
291 | 1 | return st; |
292 | 1 | } |
293 | | |
294 | | void CdcClientMgr::request_cdc_client_impl(const PRequestCdcClientRequest* request, |
295 | | PRequestCdcClientResult* result, |
296 | 36 | google::protobuf::Closure* done) { |
297 | 36 | brpc::ClosureGuard closure_guard(done); |
298 | | |
299 | | // Start CDC client if not started |
300 | 36 | Status start_st = start_cdc_client(result); |
301 | 36 | if (!start_st.ok()) { |
302 | 1 | LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string(); |
303 | 1 | start_st.to_protobuf(result->mutable_status()); |
304 | 1 | return; |
305 | 1 | } |
306 | | |
307 | 35 | std::string cdc_response; |
308 | 35 | Status st = send_request_to_cdc_client(request->api(), request->params(), &cdc_response); |
309 | 35 | result->set_response(cdc_response); |
310 | 35 | st.to_protobuf(result->mutable_status()); |
311 | 35 | } |
312 | | |
313 | | Status CdcClientMgr::send_request_to_cdc_client(const std::string& api, |
314 | | const std::string& params_body, |
315 | 41 | std::string* response) { |
316 | 41 | std::string remote_url_prefix = |
317 | 41 | fmt::format("http://127.0.0.1:{}{}", doris::config::cdc_client_port, api); |
318 | | |
319 | 65 | auto cdc_request = [&remote_url_prefix, response, ¶ms_body](HttpClient* client) { |
320 | 65 | RETURN_IF_ERROR(client->init(remote_url_prefix)); |
321 | 65 | client->set_timeout_ms(doris::config::request_cdc_client_timeout_ms); |
322 | 65 | if (!params_body.empty()) { |
323 | 59 | client->set_payload(params_body); |
324 | 59 | } |
325 | 65 | client->set_content_type("application/json"); |
326 | 65 | client->set_method(POST); |
327 | 65 | RETURN_IF_ERROR(client->execute(response)); |
328 | 29 | return Status::OK(); |
329 | 65 | }; |
330 | | |
331 | 41 | return HttpClient::execute_with_retry(3, 1, cdc_request); |
332 | 41 | } |
333 | | |
334 | | } // namespace doris |