Coverage Report

Created: 2026-06-06 10:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/cdc_client_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/cdc_client_mgr.h"

#include <brpc/closure_guard.h>
#include <fcntl.h>
#include <fmt/core.h>
#include <gen_cpp/internal_service.pb.h>
#include <google/protobuf/stubs/callback.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#ifndef __APPLE__
#include <sys/prctl.h>
#endif

#include <atomic>
#include <chrono>
#include <iterator>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "runtime/exec_env.h"
#include "service/http/http_client.h"
49
50
namespace doris {
51
52
namespace {
53
// Handle SIGCHLD signal to prevent zombie processes.
//
// Reaps every already-terminated child without blocking (WNOHANG) until none
// are left. NOTE(review): waitpid(-1, ...) reaps *any* child of this process,
// not only the CDC client, so other code that waits on its own children may
// observe ECHILD.
//
// waitpid() overwrites errno; a signal handler must restore it, otherwise the
// code it interrupted can see a spurious errno (e.g. ECHILD) change.
void handle_sigchld(int /*sig_no*/) {
    const int saved_errno = errno;
    int status = 0;
    while (waitpid(-1, &status, WNOHANG) > 0) {
    }
    errno = saved_errno;
}
60
61
// Check CDC client health
62
#ifndef BE_TEST
63
4.63k
Status check_cdc_client_health(int retry_times, int sleep_time, std::string& health_response) {
64
4.63k
    const std::string cdc_health_url =
65
4.63k
            "http://127.0.0.1:" + std::to_string(doris::config::cdc_client_port) +
66
4.63k
            "/actuator/health";
67
68
4.64k
    auto health_request = [cdc_health_url, &health_response](HttpClient* client) {
69
4.64k
        RETURN_IF_ERROR(client->init(cdc_health_url));
70
4.64k
        client->set_timeout_ms(5000);
71
4.64k
        RETURN_IF_ERROR(client->execute(&health_response));
72
4.63k
        return Status::OK();
73
4.64k
    };
74
75
4.63k
    Status status = HttpClient::execute_with_retry(retry_times, sleep_time, health_request);
76
77
4.63k
    if (!status.ok()) {
78
2
        return Status::InternalError("CDC client health check failed");
79
2
    }
80
81
4.63k
    bool is_up = health_response.find("UP") != std::string::npos;
82
83
4.63k
    if (!is_up) {
84
0
        return Status::InternalError(fmt::format("CDC client unhealthy: {}", health_response));
85
0
    }
86
87
4.63k
    return Status::OK();
88
4.63k
}
89
#endif
90
91
} // anonymous namespace
92
93
38
// Defaulted constructor; the manager does no work until start_cdc_client().
CdcClientMgr::CdcClientMgr() = default;

// Destruction stops the CDC client process recorded in _child_pid, if any.
CdcClientMgr::~CdcClientMgr() {
    this->stop();
}
98
99
49
void CdcClientMgr::stop() {
100
49
    pid_t pid = _child_pid.load();
101
49
    if (pid > 0) {
102
        // Check if process is still alive
103
25
        if (kill(pid, 0) == 0) {
104
2
            LOG(INFO) << "Stopping CDC client process, pid=" << pid;
105
            // Send SIGTERM for graceful shutdown
106
2
            kill(pid, SIGTERM);
107
            // Wait a short time for graceful shutdown
108
2
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
109
            // Force kill if still alive
110
2
            if (kill(pid, 0) == 0) {
111
1
                LOG(INFO) << "Force killing CDC client process, pid=" << pid;
112
1
                kill(pid, SIGKILL);
113
1
                int status = 0;
114
1
                waitpid(pid, &status, 0);
115
1
            }
116
2
        }
117
25
        _child_pid.store(0);
118
25
    }
119
120
49
    LOG(INFO) << "CdcClientMgr is stopped";
121
49
}
122
123
4.63k
// Make sure a CDC client answers on config::cdc_client_port, starting one if needed.
//
// Serialized by _start_mutex. Steps, in order:
//   1. If a child PID is recorded: in BE_TEST builds return OK; otherwise, if the
//      process is alive, health-check it and return OK when healthy or an error
//      when it exists but does not answer (PID reuse or a hung client). A dead
//      recorded PID is cleared and startup continues.
//   2. (non-BE_TEST) If the port already answers healthy, adopt that instance as
//      externally managed and return OK without forking.
//   3. Otherwise fork/exec `$JAVA_HOME/bin/java ... -jar
//      $DORIS_HOME/lib/cdc_client/cdc-client.jar` with stdout/stderr appended to
//      `$LOG_DIR/cdc-client.out`, and wait for it to become healthy.
// Every error is also written to result->mutable_status(); success paths leave
// `result` untouched.
Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
    std::lock_guard<std::mutex> lock(_start_mutex);

    // Record `status` in the RPC result and hand it back (error paths only).
    auto fail = [result](Status status) {
        status.to_protobuf(result->mutable_status());
        return status;
    };

    const pid_t exist_pid = _child_pid.load();
    if (exist_pid > 0) {
#ifdef BE_TEST
        // In test mode, directly return OK if PID exists
        LOG(INFO) << "cdc client already started (BE_TEST mode), pid=" << exist_pid;
        return Status::OK();
#else
        if (kill(exist_pid, 0) == 0) {
            // Process exists, verify it's actually our CDC client by health check
            std::string check_response;
            if (check_cdc_client_health(3, 1, check_response).ok()) {
                return Status::OK();
            }
            // Process exists but CDC client not responding: either the PID was
            // reused by another process or the CDC client is unhealthy.
            return fail(Status::InternalError(
                    fmt::format("CDC client {} unresponsive", exist_pid)));
        }
        LOG(INFO) << "CDC client is dead, pid=" << exist_pid;
        // Process is dead, reset PID and continue to start
        _child_pid.store(0);
#endif
    } else if (!_adopted_external.load()) {
        LOG(INFO) << "CDC client has never been started";
    }

#ifndef BE_TEST
    // Adopt an externally-managed cdc_client if the port already answers
    // healthy (e.g. one started manually for debug / hotfix).
    {
        std::string adopt_response;
        if (check_cdc_client_health(1, 0, adopt_response).ok()) {
            if (!_adopted_external.exchange(true)) {
                LOG(INFO) << "Adopting external cdc client on port "
                          << doris::config::cdc_client_port;
            }
            return Status::OK();
        }
    }
    _adopted_external.store(false);
#endif

    // Constructing std::string from a null pointer is undefined behavior, so a
    // missing environment variable must be rejected before it is used.
    const char* doris_home = getenv("DORIS_HOME");
    if (doris_home == nullptr) {
        return fail(Status::InternalError("Can not find DORIS_HOME"));
    }
    const char* log_dir = getenv("LOG_DIR");
    if (log_dir == nullptr) {
        return fail(Status::InternalError("Can not find LOG_DIR"));
    }

    const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar";
    const std::string cdc_jar_port =
            "--server.port=" + std::to_string(doris::config::cdc_client_port);
    const std::string backend_http_port =
            "--backend.http.port=" + std::to_string(config::webserver_port);
    const std::string cluster_token = "--cluster.token=" + ExecEnv::GetInstance()->token();
    const std::string java_opts = "-Dlog.path=" + std::string(log_dir);

    // check cdc jar exists
    struct stat buffer;
    if (stat(cdc_jar_path.c_str(), &buffer) != 0) {
        return fail(Status::InternalError("Can not find cdc-client.jar."));
    }

    // Ready to start cdc client
    LOG(INFO) << "Ready to start cdc client";
    const auto* java_home = getenv("JAVA_HOME");
    if (!java_home) {
        return fail(Status::InternalError("Can not find JAVA_HOME"));
    }
    std::string path(java_home);
    std::string java_bin = path + "/bin/java";

    // Pre-build everything the child needs before fork(): heap allocation after
    // fork() in a multi-threaded process can deadlock on inherited libc locks.
    std::vector<std::string> argv_storage;
    argv_storage.emplace_back("java");
    const std::string user_java_opts = doris::config::cdc_client_java_opts;
    if (!user_java_opts.empty()) {
        // Split on whitespace only; quoting inside the option string is not honored.
        std::istringstream iss(user_java_opts);
        argv_storage.insert(argv_storage.end(), std::istream_iterator<std::string>(iss),
                            std::istream_iterator<std::string>());
    }
    argv_storage.emplace_back(java_opts);
    // OOM safety net (last-wins, user opts cannot disable).
    argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError");
    argv_storage.emplace_back("-jar");
    argv_storage.emplace_back(cdc_jar_path);
    argv_storage.emplace_back(cdc_jar_port);
    argv_storage.emplace_back(backend_http_port);
    argv_storage.emplace_back(cluster_token);

    std::vector<char*> argv;
    argv.reserve(argv_storage.size() + 1);
    for (auto& arg : argv_storage) {
        argv.push_back(arg.data());
    }
    argv.push_back(nullptr);

    const std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";

    // Zero-initialize so sa_mask (and any platform-specific fields) is not garbage.
    struct sigaction act {};
    sigemptyset(&act.sa_mask);
    act.sa_handler = handle_sigchld;
    // SA_RESTART keeps unrelated blocking syscalls in other threads from failing
    // with EINTR whenever a child exits; SA_NOCLDSTOP skips stop/continue events.
    act.sa_flags = SA_RESTART | SA_NOCLDSTOP;
    if (sigaction(SIGCHLD, &act, nullptr) != 0) {
        LOG(WARNING) << "Failed to install SIGCHLD handler for cdc client";
    }
    LOG(INFO) << "Start to fork cdc client process with " << path;
#ifdef BE_TEST
    _child_pid.store(99999);
    return Status::OK();
#else
    // Captured before fork() so the child can detect that the parent already died
    // before PR_SET_PDEATHSIG was armed.
    [[maybe_unused]] const pid_t parent_pid = getpid();
    pid_t pid = fork();
    if (pid < 0) {
        return fail(Status::InternalError("Fork cdc client failed."));
    }
    if (pid == 0) {
        // Child: async-signal-safe operations only until execv().
#ifndef __APPLE__
        // NOTE: per prctl(2) the death signal tracks the *thread* that forked.
        prctl(PR_SET_PDEATHSIG, SIGKILL);
        // If the parent exited between fork() and prctl(), the death signal will
        // never arrive; bail out instead of running as an orphan.
        if (getppid() != parent_pid) {
            _exit(1);
        }
#endif
        int out_fd = open(cdc_out_file.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644);
        if (out_fd < 0) {
            perror("open cdc-client.out file failed");
            _exit(1);
        }
        dup2(out_fd, STDOUT_FILENO);
        dup2(out_fd, STDERR_FILENO);
        close(out_fd);
        execv(java_bin.c_str(), argv.data());
        perror("Cdc client child process error");
        _exit(1);
    }

    // Parent process: save PID and wait for startup
    _child_pid.store(pid);

    // Waiting for cdc to start, failed after more than 3 * 10 seconds
    std::string health_response;
    Status status = check_cdc_client_health(3, 10, health_response);
    if (!status.ok()) {
        // Don't leave a half-started client behind: it could keep the port busy or
        // be adopted as an "external" instance by a later call. Only signal the PID
        // while waitpid() confirms it is still our running child.
        int wait_status = 0;
        if (waitpid(pid, &wait_status, WNOHANG) == 0) {
            kill(pid, SIGKILL);
            waitpid(pid, &wait_status, 0);
        }
        // Reset PID if startup failed
        _child_pid.store(0);
        return fail(Status::InternalError("Start cdc client failed."));
    }
    if (kill(pid, 0) != 0) {
        // Port healthy but our child has exited: an external process is
        // answering. Treat as adoption instead of masking dead PID as success.
        _child_pid.store(0);
        if (!_adopted_external.exchange(true)) {
            LOG(INFO) << "Forked cdc client " << pid << " exited but port "
                      << doris::config::cdc_client_port
                      << " is healthy, adopting external instance";
        }
        return Status::OK();
    }
    _adopted_external.store(false);
    LOG(INFO) << "Start cdc client success, pid=" << pid
              << ", status=" << status.to_string() << ", response=" << health_response;
    return Status::OK();
#endif //BE_TEST
}
293
294
void CdcClientMgr::request_cdc_client_impl(const PRequestCdcClientRequest* request,
295
                                           PRequestCdcClientResult* result,
296
4.21k
                                           google::protobuf::Closure* done) {
297
4.21k
    brpc::ClosureGuard closure_guard(done);
298
299
    // Start CDC client if not started
300
4.21k
    Status start_st = start_cdc_client(result);
301
4.21k
    if (!start_st.ok()) {
302
1
        LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string();
303
1
        start_st.to_protobuf(result->mutable_status());
304
1
        return;
305
1
    }
306
307
4.21k
    std::string cdc_response;
308
4.21k
    Status st = send_request_to_cdc_client(request->api(), request->params(), &cdc_response);
309
4.21k
    result->set_response(cdc_response);
310
4.21k
    st.to_protobuf(result->mutable_status());
311
4.21k
}
312
313
// POST `params_body` as JSON to http://127.0.0.1:<config::cdc_client_port><api>
// and store the reply body in `*response`.
//
// Uses HttpClient::execute_with_retry with 3 attempts and 1 as the sleep
// argument, with config::request_cdc_client_timeout_ms per attempt. An empty
// `params_body` sends no payload. NOTE(review): a retry re-sends the POST, so
// the CDC client's `api` handlers are assumed to tolerate duplicates — confirm.
Status CdcClientMgr::send_request_to_cdc_client(const std::string& api,
                                                const std::string& params_body,
                                                std::string* response) {
    const std::string remote_url =
            fmt::format("http://127.0.0.1:{}{}", doris::config::cdc_client_port, api);

    auto cdc_request = [&remote_url, response, &params_body](HttpClient* client) {
        // Start every attempt from an empty buffer so a partial body from a failed
        // attempt is never concatenated with the body of the retry.
        response->clear();
        RETURN_IF_ERROR(client->init(remote_url));
        client->set_timeout_ms(doris::config::request_cdc_client_timeout_ms);
        if (!params_body.empty()) {
            client->set_payload(params_body);
        }
        client->set_content_type("application/json");
        client->set_method(POST);
        RETURN_IF_ERROR(client->execute(response));
        return Status::OK();
    };

    return HttpClient::execute_with_retry(3, 1, cdc_request);
}
333
334
} // namespace doris