be/src/runtime/cdc_client_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
#include "runtime/cdc_client_mgr.h"

#include <brpc/closure_guard.h>
#include <fcntl.h>
#include <fmt/core.h>
#include <gen_cpp/internal_service.pb.h>
#include <google/protobuf/stubs/callback.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#include <cstdlib>
#ifndef __APPLE__
#include <sys/prctl.h>
#endif

#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include <thread>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "runtime/exec_env.h"
#include "service/http/http_client.h"
46 | | |
47 | | namespace doris { |
48 | | |
49 | | namespace { |
// SIGCHLD handler: reaps every child that has already exited so that none is
// left behind as a zombie.
//
// The handler runs asynchronously on top of whatever code was interrupted, so
// it restricts itself to waitpid() and errno. waitpid() may overwrite errno
// (for example with ECHILD once no child is left), therefore the value the
// interrupted code was relying on is saved on entry and restored on exit.
//
// The signal number is not needed; the parameter exists only to match the
// signature required for sa_handler.
void handle_sigchld(int /*sig_no*/) {
    const int saved_errno = errno;
    int status = 0;
    // WNOHANG: never block inside the handler; stop as soon as there is no
    // exited child left to collect.
    while (waitpid(-1, &status, WNOHANG) > 0) {
    }
    errno = saved_errno;
}
57 | | |
58 | | // Check CDC client health |
59 | | #ifndef BE_TEST |
60 | 836 | Status check_cdc_client_health(int retry_times, int sleep_time, std::string& health_response) { |
61 | 836 | const std::string cdc_health_url = |
62 | 836 | "http://127.0.0.1:" + std::to_string(doris::config::cdc_client_port) + |
63 | 836 | "/actuator/health"; |
64 | | |
65 | 838 | auto health_request = [cdc_health_url, &health_response](HttpClient* client) { |
66 | 838 | RETURN_IF_ERROR(client->init(cdc_health_url)); |
67 | 838 | client->set_timeout_ms(5000); |
68 | 838 | RETURN_IF_ERROR(client->execute(&health_response)); |
69 | 836 | return Status::OK(); |
70 | 838 | }; |
71 | | |
72 | 836 | Status status = HttpClient::execute_with_retry(retry_times, sleep_time, health_request); |
73 | | |
74 | 836 | if (!status.ok()) { |
75 | 0 | return Status::InternalError("CDC client health check failed"); |
76 | 0 | } |
77 | | |
78 | 836 | bool is_up = health_response.find("UP") != std::string::npos; |
79 | | |
80 | 836 | if (!is_up) { |
81 | 0 | return Status::InternalError(fmt::format("CDC client unhealthy: {}", health_response)); |
82 | 0 | } |
83 | | |
84 | 836 | return Status::OK(); |
85 | 836 | } |
86 | | #endif |
87 | | |
88 | | } // anonymous namespace |
89 | | |
// Defaulted constructor: performs no work of its own; no process is started
// here (that happens in start_cdc_client()).
CdcClientMgr::CdcClientMgr() = default;
91 | | |
// Calls stop() so that a CDC client child process recorded in _child_pid is
// terminated when the manager is destroyed.
CdcClientMgr::~CdcClientMgr() {
    stop();
}
95 | | |
96 | 47 | void CdcClientMgr::stop() { |
97 | 47 | pid_t pid = _child_pid.load(); |
98 | 47 | if (pid > 0) { |
99 | | // Check if process is still alive |
100 | 24 | if (kill(pid, 0) == 0) { |
101 | 2 | LOG(INFO) << "Stopping CDC client process, pid=" << pid; |
102 | | // Send SIGTERM for graceful shutdown |
103 | 2 | kill(pid, SIGTERM); |
104 | | // Wait a short time for graceful shutdown |
105 | 2 | std::this_thread::sleep_for(std::chrono::milliseconds(200)); |
106 | | // Force kill if still alive |
107 | 2 | if (kill(pid, 0) == 0) { |
108 | 1 | LOG(INFO) << "Force killing CDC client process, pid=" << pid; |
109 | 1 | kill(pid, SIGKILL); |
110 | 1 | int status = 0; |
111 | 1 | waitpid(pid, &status, 0); |
112 | 1 | } |
113 | 2 | } |
114 | 24 | _child_pid.store(0); |
115 | 24 | } |
116 | | |
117 | 47 | LOG(INFO) << "CdcClientMgr is stopped"; |
118 | 47 | } |
119 | | |
120 | 836 | Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) { |
121 | 836 | std::lock_guard<std::mutex> lock(_start_mutex); |
122 | | |
123 | 836 | Status st = Status::OK(); |
124 | 836 | pid_t exist_pid = _child_pid.load(); |
125 | 836 | if (exist_pid > 0) { |
126 | | #ifdef BE_TEST |
127 | | // In test mode, directly return OK if PID exists |
128 | | LOG(INFO) << "cdc client already started (BE_TEST mode), pid=" << exist_pid; |
129 | | return Status::OK(); |
130 | | #else |
131 | | // Check if process is still alive |
132 | 834 | if (kill(exist_pid, 0) == 0) { |
133 | | // Process exists, verify it's actually our CDC client by health check |
134 | 834 | std::string check_response; |
135 | 834 | auto check_st = check_cdc_client_health(3, 1, check_response); |
136 | 834 | if (check_st.ok()) { |
137 | | // Process exists and responding, CDC client is running |
138 | 834 | return Status::OK(); |
139 | 834 | } else { |
140 | | // Process exists but CDC client not responding |
141 | | // Either it's a different process (PID reused) or CDC client is unhealthy |
142 | 0 | st = Status::InternalError(fmt::format("CDC client {} unresponsive", exist_pid)); |
143 | 0 | st.to_protobuf(result->mutable_status()); |
144 | 0 | return st; |
145 | 0 | } |
146 | 834 | } else { |
147 | 0 | LOG(INFO) << "CDC client is dead, pid=" << exist_pid; |
148 | | // Process is dead, reset PID and continue to start |
149 | 0 | _child_pid.store(0); |
150 | 0 | } |
151 | 834 | #endif |
152 | 834 | } else { |
153 | 2 | LOG(INFO) << "CDC client has never been started"; |
154 | 2 | } |
155 | | |
156 | 2 | const char* doris_home = getenv("DORIS_HOME"); |
157 | 2 | const char* log_dir = getenv("LOG_DIR"); |
158 | 2 | const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar"; |
159 | 2 | const std::string cdc_jar_port = |
160 | 2 | "--server.port=" + std::to_string(doris::config::cdc_client_port); |
161 | 2 | const std::string backend_http_port = |
162 | 2 | "--backend.http.port=" + std::to_string(config::webserver_port); |
163 | 2 | const std::string cluster_token = "--cluster.token=" + ExecEnv::GetInstance()->token(); |
164 | 2 | const std::string java_opts = "-Dlog.path=" + std::string(log_dir); |
165 | | |
166 | | // check cdc jar exists |
167 | 2 | struct stat buffer; |
168 | 2 | if (stat(cdc_jar_path.c_str(), &buffer) != 0) { |
169 | 0 | st = Status::InternalError("Can not find cdc-client.jar."); |
170 | 0 | st.to_protobuf(result->mutable_status()); |
171 | 0 | return st; |
172 | 0 | } |
173 | | |
174 | | // Ready to start cdc client |
175 | 2 | LOG(INFO) << "Ready to start cdc client"; |
176 | 2 | const auto* java_home = getenv("JAVA_HOME"); |
177 | 2 | if (!java_home) { |
178 | 0 | st = Status::InternalError("Can not find JAVA_HOME"); |
179 | 0 | st.to_protobuf(result->mutable_status()); |
180 | 0 | return st; |
181 | 0 | } |
182 | 2 | std::string path(java_home); |
183 | 2 | std::string java_bin = path + "/bin/java"; |
184 | | // Capture signal to prevent child process from becoming a zombie process |
185 | 2 | struct sigaction act; |
186 | 2 | act.sa_flags = 0; |
187 | 2 | act.sa_handler = handle_sigchld; |
188 | 2 | sigaction(SIGCHLD, &act, NULL); |
189 | 2 | LOG(INFO) << "Start to fork cdc client process with " << path; |
190 | | #ifdef BE_TEST |
191 | | _child_pid.store(99999); |
192 | | st = Status::OK(); |
193 | | return st; |
194 | | #else |
195 | 2 | pid_t pid = fork(); |
196 | 2 | if (pid < 0) { |
197 | | // Fork failed |
198 | 0 | st = Status::InternalError("Fork cdc client failed."); |
199 | 0 | st.to_protobuf(result->mutable_status()); |
200 | 0 | return st; |
201 | 2 | } else if (pid == 0) { |
202 | | // Child process |
203 | | // When the parent process is killed, the child process also needs to exit |
204 | 0 | #ifndef __APPLE__ |
205 | 0 | prctl(PR_SET_PDEATHSIG, SIGKILL); |
206 | 0 | #endif |
207 | | // Redirect stdout and stderr to log out file |
208 | 0 | std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out"; |
209 | 0 | int out_fd = open(cdc_out_file.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644); |
210 | 0 | if (out_fd < 0) { |
211 | 0 | perror("open cdc-client.out file failed"); |
212 | 0 | exit(1); |
213 | 0 | } |
214 | 0 | dup2(out_fd, STDOUT_FILENO); |
215 | 0 | dup2(out_fd, STDERR_FILENO); |
216 | 0 | close(out_fd); |
217 | | |
218 | | // java -jar -Dlog.path=xx cdc-client.jar --server.port=9096 --backend.http.port=8040 |
219 | 0 | execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar", cdc_jar_path.c_str(), |
220 | 0 | cdc_jar_port.c_str(), backend_http_port.c_str(), cluster_token.c_str(), (char*)NULL); |
221 | | // If execlp returns, it means it failed |
222 | 0 | perror("Cdc client child process error"); |
223 | 0 | exit(1); |
224 | 2 | } else { |
225 | | // Parent process: save PID and wait for startup |
226 | 2 | _child_pid.store(pid); |
227 | | |
228 | | // Waiting for cdc to start, failed after more than 3 * 10 seconds |
229 | 2 | std::string health_response; |
230 | 2 | Status status = check_cdc_client_health(3, 10, health_response); |
231 | 2 | if (!status.ok()) { |
232 | | // Reset PID if startup failed |
233 | 0 | _child_pid.store(0); |
234 | 0 | st = Status::InternalError("Start cdc client failed."); |
235 | 0 | st.to_protobuf(result->mutable_status()); |
236 | 2 | } else { |
237 | 2 | LOG(INFO) << "Start cdc client success, pid=" << pid |
238 | 2 | << ", status=" << status.to_string() << ", response=" << health_response; |
239 | 2 | } |
240 | 2 | } |
241 | 2 | #endif //BE_TEST |
242 | 2 | return st; |
243 | 2 | } |
244 | | |
245 | | void CdcClientMgr::request_cdc_client_impl(const PRequestCdcClientRequest* request, |
246 | | PRequestCdcClientResult* result, |
247 | 843 | google::protobuf::Closure* done) { |
248 | 843 | brpc::ClosureGuard closure_guard(done); |
249 | | |
250 | | // Start CDC client if not started |
251 | 843 | Status start_st = start_cdc_client(result); |
252 | 843 | if (!start_st.ok()) { |
253 | 1 | LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string(); |
254 | 1 | start_st.to_protobuf(result->mutable_status()); |
255 | 1 | return; |
256 | 1 | } |
257 | | |
258 | 842 | std::string cdc_response; |
259 | 842 | Status st = send_request_to_cdc_client(request->api(), request->params(), &cdc_response); |
260 | 842 | result->set_response(cdc_response); |
261 | 842 | st.to_protobuf(result->mutable_status()); |
262 | 842 | } |
263 | | |
264 | | Status CdcClientMgr::send_request_to_cdc_client(const std::string& api, |
265 | | const std::string& params_body, |
266 | 848 | std::string* response) { |
267 | 848 | std::string remote_url_prefix = |
268 | 848 | fmt::format("http://127.0.0.1:{}{}", doris::config::cdc_client_port, api); |
269 | | |
270 | 871 | auto cdc_request = [&remote_url_prefix, response, ¶ms_body](HttpClient* client) { |
271 | 871 | RETURN_IF_ERROR(client->init(remote_url_prefix)); |
272 | 871 | client->set_timeout_ms(doris::config::request_cdc_client_timeout_ms); |
273 | 871 | if (!params_body.empty()) { |
274 | 865 | client->set_payload(params_body); |
275 | 865 | } |
276 | 871 | client->set_content_type("application/json"); |
277 | 871 | client->set_method(POST); |
278 | 871 | RETURN_IF_ERROR(client->execute(response)); |
279 | 835 | return Status::OK(); |
280 | 871 | }; |
281 | | |
282 | 848 | return HttpClient::execute_with_retry(3, 1, cdc_request); |
283 | 848 | } |
284 | | |
285 | | } // namespace doris |