Coverage Report

Created: 2026-06-17 11:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/cdc_client_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/cdc_client_mgr.h"

#include <brpc/closure_guard.h>
#include <fcntl.h>
#include <fmt/core.h>
#include <gen_cpp/internal_service.pb.h>
#include <google/protobuf/stubs/callback.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#ifndef __APPLE__
#include <sys/prctl.h>
#endif

#include <atomic>
#include <chrono>
#include <iterator>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "runtime/exec_env.h"
#include "service/http/http_client.h"
50
namespace doris {
51
52
namespace {
53
// SIGCHLD handler: reap every child that has already exited so none of them
// lingers as a zombie.
//
// Runs in signal context, so it only calls async-signal-safe functions
// (waitpid) and restores errno before returning. Otherwise the failing
// waitpid() that can end the loop (-1 with ECHILD once no children remain)
// would overwrite the errno of whatever code the signal interrupted.
//
// NOTE(review): waitpid(-1, ...) reaps *any* child of this process, not only
// the CDC client; other code that waits on its own children may then observe
// ECHILD. Confirm no other subsystem relies on waitpid for its children.
void handle_sigchld(int /*sig_no*/) {
    const int saved_errno = errno;
    int status = 0;
    while (waitpid(-1, &status, WNOHANG) > 0) {
        // Keep reaping until no already-exited child remains.
    }
    errno = saved_errno;
}
60
61
// Check CDC client health
62
#ifndef BE_TEST
63
Status check_cdc_client_health(int retry_times, int sleep_time, std::string& health_response) {
64
    const std::string cdc_health_url =
65
            "http://127.0.0.1:" + std::to_string(doris::config::cdc_client_port) +
66
            "/actuator/health";
67
68
    auto health_request = [cdc_health_url, &health_response](HttpClient* client) {
69
        RETURN_IF_ERROR(client->init(cdc_health_url));
70
        client->set_timeout_ms(5000);
71
        RETURN_IF_ERROR(client->execute(&health_response));
72
        return Status::OK();
73
    };
74
75
    Status status = HttpClient::execute_with_retry(retry_times, sleep_time, health_request);
76
77
    if (!status.ok()) {
78
        return Status::InternalError("CDC client health check failed");
79
    }
80
81
    bool is_up = health_response.find("UP") != std::string::npos;
82
83
    if (!is_up) {
84
        return Status::InternalError(fmt::format("CDC client unhealthy: {}", health_response));
85
    }
86
87
    return Status::OK();
88
}
89
#endif
90
91
} // anonymous namespace
92
93
31
CdcClientMgr::CdcClientMgr() = default;

// Tear-down reuses stop(): a CDC client whose pid is still recorded in
// _child_pid is terminated before the manager goes away.
CdcClientMgr::~CdcClientMgr() {
    this->stop();
}
98
99
46
void CdcClientMgr::stop() {
100
46
    pid_t pid = _child_pid.load();
101
46
    if (pid > 0) {
102
        // Check if process is still alive
103
24
        if (kill(pid, 0) == 0) {
104
2
            LOG(INFO) << "Stopping CDC client process, pid=" << pid;
105
            // Send SIGTERM for graceful shutdown
106
2
            kill(pid, SIGTERM);
107
            // Wait a short time for graceful shutdown
108
2
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
109
            // Force kill if still alive
110
2
            if (kill(pid, 0) == 0) {
111
1
                LOG(INFO) << "Force killing CDC client process, pid=" << pid;
112
1
                kill(pid, SIGKILL);
113
1
                int status = 0;
114
1
                waitpid(pid, &status, 0);
115
1
            }
116
2
        }
117
24
        _child_pid.store(0);
118
24
    }
119
120
46
    LOG(INFO) << "CdcClientMgr is stopped";
121
46
}
122
123
35
#ifndef BE_TEST
// Returns true iff `pid` is a child of this process that has not exited yet.
//
// waitpid(WNOHANG) returns 0 only for one of our children that is still
// running. An exited child is reaped by the call itself (unless the SIGCHLD
// handler already did). A pid that is not our child at all makes waitpid fail;
// that includes a pid the kernel recycled for an unrelated process after our
// child was reaped. Unlike kill(pid, 0), neither of those cases reads as "alive".
static bool cdc_child_running(pid_t pid) {
    int status = 0;
    return waitpid(pid, &status, WNOHANG) == 0;
}
#endif

// Make sure a CDC client is serving on config::cdc_client_port, launching one
// if necessary. Calls are serialized by _start_mutex.
//
//  * A pid is already recorded:
//      - BE_TEST: return OK immediately.
//      - otherwise, if that child is still running, return OK when it passes a
//        health check and an "unresponsive" error when it does not. If it has
//        exited, forget the pid and fall through to a fresh launch.
//  * (non-BE_TEST) Something already answers healthy on the port: adopt it as
//    an externally managed client and return OK without forking.
//  * Otherwise build the command line
//    "$JAVA_HOME/bin/java [cdc_client_java_opts] ... -jar
//    $DORIS_HOME/lib/cdc_client/cdc-client.jar ...".
//    Run it in a forked child whose stdout/stderr are appended to
//    $LOG_DIR/cdc-client.out, then health-check it (3 attempts) before
//    reporting success. Under BE_TEST the fork is skipped and a placeholder
//    pid (99999) is recorded instead.
//
// @param result  on failure, receives the returned status in mutable_status().
// @return OK when a healthy CDC client is (or now is) available.
Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
    std::lock_guard<std::mutex> lock(_start_mutex);

    Status st = Status::OK();
    pid_t exist_pid = _child_pid.load();
    if (exist_pid > 0) {
#ifdef BE_TEST
        // In test mode, directly return OK if PID exists
        LOG(INFO) << "cdc client already started (BE_TEST mode), pid=" << exist_pid;
        return Status::OK();
#else
        if (cdc_child_running(exist_pid)) {
            // Our child is still running; verify it is actually serving.
            std::string check_response;
            auto check_st = check_cdc_client_health(3, 1, check_response);
            if (check_st.ok()) {
                return Status::OK();
            }
            // Running but not answering health checks.
            // NOTE(review): a hung child keeps this path failing until it dies;
            // consider killing and relaunching it.
            st = Status::InternalError(fmt::format("CDC client {} unresponsive", exist_pid));
            st.to_protobuf(result->mutable_status());
            return st;
        }
        LOG(INFO) << "CDC client is dead, pid=" << exist_pid;
        // Process has exited (and is reaped): forget it and start a new one.
        _child_pid.store(0);
#endif
    } else if (!_adopted_external.load()) {
        LOG(INFO) << "CDC client has never been started";
    }

#ifndef BE_TEST
    // Adopt an externally-managed cdc_client if the port already answers
    // healthy (e.g. one started manually for debug / hotfix).
    {
        std::string adopt_response;
        if (check_cdc_client_health(1, 0, adopt_response).ok()) {
            if (!_adopted_external.exchange(true)) {
                LOG(INFO) << "Adopting external cdc client on port "
                          << doris::config::cdc_client_port;
            }
            return Status::OK();
        }
    }
    _adopted_external.store(false);
#endif

    // getenv() returns nullptr for unset variables, and constructing a
    // std::string from nullptr is undefined behavior: validate before use.
    const char* doris_home = getenv("DORIS_HOME");
    if (doris_home == nullptr) {
        st = Status::InternalError("Can not find DORIS_HOME");
        st.to_protobuf(result->mutable_status());
        return st;
    }
    const char* log_dir = getenv("LOG_DIR");
    if (log_dir == nullptr) {
        st = Status::InternalError("Can not find LOG_DIR");
        st.to_protobuf(result->mutable_status());
        return st;
    }
    const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar";
    const std::string cdc_jar_port =
            "--server.port=" + std::to_string(doris::config::cdc_client_port);
    const std::string backend_http_port =
            "--backend.http.port=" + std::to_string(config::webserver_port);
    const std::string cluster_token = "--cluster.token=" + ExecEnv::GetInstance()->token();
    const std::string java_opts = "-Dlog.path=" + std::string(log_dir);

    // check cdc jar exists
    struct stat buffer;
    if (stat(cdc_jar_path.c_str(), &buffer) != 0) {
        st = Status::InternalError("Can not find cdc-client.jar.");
        st.to_protobuf(result->mutable_status());
        return st;
    }

    // Ready to start cdc client
    LOG(INFO) << "Ready to start cdc client";
    const auto* java_home = getenv("JAVA_HOME");
    if (!java_home) {
        st = Status::InternalError("Can not find JAVA_HOME");
        st.to_protobuf(result->mutable_status());
        return st;
    }
    std::string path(java_home);
    std::string java_bin = path + "/bin/java";

    // Pre-build everything the child needs before fork(): heap allocation after
    // fork() in a multi-threaded process can deadlock on inherited libc locks.
    std::vector<std::string> argv_storage;
    argv_storage.emplace_back("java");
    const std::string user_java_opts = doris::config::cdc_client_java_opts;
    if (!user_java_opts.empty()) {
        // Whitespace-separated; no shell quoting is honored.
        std::istringstream iss(user_java_opts);
        argv_storage.insert(argv_storage.end(), std::istream_iterator<std::string>(iss),
                            std::istream_iterator<std::string>());
    }
    argv_storage.emplace_back(java_opts);
    // OOM safety net (last-wins, user opts cannot disable).
    argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError");
    // JDK17 opens for debezium ObjectSizeCalculator reflection.
    argv_storage.emplace_back("--add-opens=java.base/java.lang=ALL-UNNAMED");
    argv_storage.emplace_back("--add-opens=java.base/java.util=ALL-UNNAMED");
    argv_storage.emplace_back("--add-opens=java.base/java.math=ALL-UNNAMED");
    argv_storage.emplace_back("--add-opens=java.base/java.nio=ALL-UNNAMED");
    argv_storage.emplace_back("-jar");
    argv_storage.emplace_back(cdc_jar_path);
    argv_storage.emplace_back(cdc_jar_port);
    argv_storage.emplace_back(backend_http_port);
    argv_storage.emplace_back(cluster_token);

    std::vector<char*> argv;
    argv.reserve(argv_storage.size() + 1);
    for (auto& s : argv_storage) {
        argv.push_back(const_cast<char*>(s.c_str()));
    }
    argv.push_back(nullptr);

    const std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";

    // Install the process-wide zombie reaper. sa_mask must be initialized
    // explicitly. SA_RESTART lets blocking syscalls that a SIGCHLD interrupts
    // in other threads resume instead of failing with EINTR.
    struct sigaction act {};
    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_RESTART;
    act.sa_handler = handle_sigchld;
    sigaction(SIGCHLD, &act, nullptr);
    LOG(INFO) << "Start to fork cdc client process with " << path;
#ifdef BE_TEST
    _child_pid.store(99999);
    st = Status::OK();
    return st;
#else
    pid_t pid = fork();
    if (pid < 0) {
        st = Status::InternalError("Fork cdc client failed.");
        st.to_protobuf(result->mutable_status());
        return st;
    } else if (pid == 0) {
        // Child: async-signal-safe operations only until execv().
#ifndef __APPLE__
        prctl(PR_SET_PDEATHSIG, SIGKILL);
#endif
        int out_fd = open(cdc_out_file.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644);
        if (out_fd < 0) {
            perror("open cdc-client.out file failed");
            _exit(1);
        }
        dup2(out_fd, STDOUT_FILENO);
        dup2(out_fd, STDERR_FILENO);
        close(out_fd);
        execv(java_bin.c_str(), argv.data());
        perror("Cdc client child process error");
        _exit(1);
    } else {
        // Parent process: save PID and wait for startup
        _child_pid.store(pid);

        // Waiting for cdc to start, failed after more than 3 * 10 seconds
        std::string health_response;
        Status status = check_cdc_client_health(3, 10, health_response);
        if (!status.ok()) {
            // Reset PID if startup failed.
            // NOTE(review): the forked child is not killed here; if it is merely
            // slow it keeps running untracked and may be adopted as "external"
            // by a later call.
            _child_pid.store(0);
            st = Status::InternalError("Start cdc client failed.");
            st.to_protobuf(result->mutable_status());
        } else if (!cdc_child_running(pid)) {
            // Port healthy but our child has exited: an external process is
            // answering. Treat as adoption instead of masking dead PID as success.
            _child_pid.store(0);
            if (!_adopted_external.exchange(true)) {
                LOG(INFO) << "Forked cdc client " << pid << " exited but port "
                          << doris::config::cdc_client_port
                          << " is healthy, adopting external instance";
            }
        } else {
            _adopted_external.store(false);
            LOG(INFO) << "Start cdc client success, pid=" << pid
                      << ", status=" << status.to_string() << ", response=" << health_response;
        }
    }
#endif //BE_TEST
    return st;
}
298
299
void CdcClientMgr::request_cdc_client_impl(const PRequestCdcClientRequest* request,
300
                                           PRequestCdcClientResult* result,
301
7
                                           google::protobuf::Closure* done) {
302
7
    brpc::ClosureGuard closure_guard(done);
303
304
    // Start CDC client if not started
305
7
    Status start_st = start_cdc_client(result);
306
7
    if (!start_st.ok()) {
307
1
        LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string();
308
1
        start_st.to_protobuf(result->mutable_status());
309
1
        return;
310
1
    }
311
312
6
    std::string cdc_response;
313
6
    Status st = send_request_to_cdc_client(request->api(), request->params(), &cdc_response);
314
6
    result->set_response(cdc_response);
315
6
    st.to_protobuf(result->mutable_status());
316
6
}
317
318
// Forward a JSON request to the local CDC client.
//
// POSTs `params_body` (no payload is set when it is empty) with Content-Type
// application/json to http://127.0.0.1:<config::cdc_client_port><api>. Each
// attempt has a timeout of config::request_cdc_client_timeout_ms, and the call
// goes through HttpClient::execute_with_retry(3, 1, ...).
//
// @param api          path appended directly after host:port (presumably begins
//                     with '/' — TODO confirm with callers).
// @param params_body  request payload; omitted when empty.
// @param response     out: response body; cleared at the start of every attempt
//                     so it never mixes bodies from different attempts.
// @return the status reported by HttpClient::execute_with_retry.
//
// NOTE(review): the request is retried even though it is a POST; confirm every
// CDC client API reached through here is idempotent.
Status CdcClientMgr::send_request_to_cdc_client(const std::string& api,
                                                const std::string& params_body,
                                                std::string* response) {
    const std::string remote_url =
            fmt::format("http://127.0.0.1:{}{}", doris::config::cdc_client_port, api);

    auto cdc_request = [&remote_url, response, &params_body](HttpClient* client) {
        // Assumes HttpClient::execute appends to its output (confirm). Clearing
        // here keeps a retry after a failed or non-200 attempt from returning
        // several bodies concatenated.
        response->clear();
        RETURN_IF_ERROR(client->init(remote_url));
        client->set_timeout_ms(doris::config::request_cdc_client_timeout_ms);
        if (!params_body.empty()) {
            client->set_payload(params_body);
        }
        client->set_content_type("application/json");
        client->set_method(POST);
        RETURN_IF_ERROR(client->execute(response));
        return Status::OK();
    };

    return HttpClient::execute_with_retry(3, 1, cdc_request);
}
338
339
} // namespace doris