Coverage Report

Created: 2026-06-05 20:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/cdc_client_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/cdc_client_mgr.h"

#include <brpc/closure_guard.h>
#include <fcntl.h>
#include <fmt/core.h>
#include <gen_cpp/internal_service.pb.h>
#include <google/protobuf/stubs/callback.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#ifndef __APPLE__
#include <sys/prctl.h>
#endif

#include <atomic>
#include <chrono>
#include <iterator>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "runtime/exec_env.h"
#include "service/http/http_client.h"
49
50
namespace doris {
51
52
namespace {
53
// Handle SIGCHLD signal to prevent zombie processes.
//
// Reaps every child that has already terminated, without blocking (WNOHANG),
// so an exited cdc_client process does not linger as a zombie.
//
// This runs in signal context and only calls the async-signal-safe waitpid().
// errno is saved and restored because waitpid() overwrites it (ECHILD once no
// exited children remain), and the interrupted code may be about to inspect
// errno from its own failed system call.
//
// NOTE(review): waitpid(-1, ...) reaps *any* child of this process, not only the
// cdc client; other code here that waits on its own children may then see
// ECHILD. Consider restricting reaping to the cdc client pid.
void handle_sigchld(int /*sig_no*/) {
    const int saved_errno = errno;
    int status = 0;
    while (waitpid(-1, &status, WNOHANG) > 0) {
        // Keep reaping until no more exited children are pending.
    }
    errno = saved_errno;
}
60
61
// Check CDC client health
62
#ifndef BE_TEST
63
Status check_cdc_client_health(int retry_times, int sleep_time, std::string& health_response) {
64
    const std::string cdc_health_url =
65
            "http://127.0.0.1:" + std::to_string(doris::config::cdc_client_port) +
66
            "/actuator/health";
67
68
    auto health_request = [cdc_health_url, &health_response](HttpClient* client) {
69
        RETURN_IF_ERROR(client->init(cdc_health_url));
70
        client->set_timeout_ms(5000);
71
        RETURN_IF_ERROR(client->execute(&health_response));
72
        return Status::OK();
73
    };
74
75
    Status status = HttpClient::execute_with_retry(retry_times, sleep_time, health_request);
76
77
    if (!status.ok()) {
78
        return Status::InternalError("CDC client health check failed");
79
    }
80
81
    bool is_up = health_response.find("UP") != std::string::npos;
82
83
    if (!is_up) {
84
        return Status::InternalError(fmt::format("CDC client unhealthy: {}", health_response));
85
    }
86
87
    return Status::OK();
88
}
89
#endif
90
91
} // anonymous namespace
92
93
31
// No explicit initialization here; members keep the defaults declared in the
// class definition.
CdcClientMgr::CdcClientMgr() = default;

// Destruction stops the cdc client process tracked in _child_pid (if any).
// A cdc client that was adopted rather than forked is not tracked there and is
// therefore left running.
CdcClientMgr::~CdcClientMgr() {
    this->stop();
}
98
99
46
void CdcClientMgr::stop() {
100
46
    pid_t pid = _child_pid.load();
101
46
    if (pid > 0) {
102
        // Check if process is still alive
103
24
        if (kill(pid, 0) == 0) {
104
2
            LOG(INFO) << "Stopping CDC client process, pid=" << pid;
105
            // Send SIGTERM for graceful shutdown
106
2
            kill(pid, SIGTERM);
107
            // Wait a short time for graceful shutdown
108
2
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
109
            // Force kill if still alive
110
2
            if (kill(pid, 0) == 0) {
111
1
                LOG(INFO) << "Force killing CDC client process, pid=" << pid;
112
1
                kill(pid, SIGKILL);
113
1
                int status = 0;
114
1
                waitpid(pid, &status, 0);
115
1
            }
116
2
        }
117
24
        _child_pid.store(0);
118
24
    }
119
120
46
    LOG(INFO) << "CdcClientMgr is stopped";
121
46
}
122
123
35
// Ensure a cdc client is serving on config::cdc_client_port, starting one if needed.
//
// Steps, in order:
//   1. If a previously forked child is still running and healthy, return OK. If it
//      is running but unhealthy, fail. If it is gone, forget its pid.
//   2. (non-test builds) If something already answers healthy on the port, adopt it
//      as an external instance and return OK.
//   3. Otherwise fork + exec `$JAVA_HOME/bin/java ... -jar
//      $DORIS_HOME/lib/cdc_client/cdc-client.jar`, with stdout/stderr redirected to
//      $LOG_DIR/cdc-client.out. Then wait for the health check to pass.
//
// Every failure is both returned and written into result->mutable_status().
// Calls are serialized by _start_mutex.
Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
    std::lock_guard<std::mutex> lock(_start_mutex);

    Status st = Status::OK();
    pid_t exist_pid = _child_pid.load();
    if (exist_pid > 0) {
#ifdef BE_TEST
        // In test mode, directly return OK if PID exists
        LOG(INFO) << "cdc client already started (BE_TEST mode), pid=" << exist_pid;
        return Status::OK();
#else
        // waitpid(WNOHANG) == 0 means exist_pid is still a running child of this
        // process. Unlike kill(pid, 0), this is not fooled by a recycled PID: once our
        // child has been reaped (e.g. by handle_sigchld), waitpid fails with ECHILD.
        // With kill(pid, 0), an unrelated process reusing the PID would make every
        // later call fail as "unresponsive" forever.
        int wait_status = 0;
        if (waitpid(exist_pid, &wait_status, WNOHANG) == 0) {
            // Our child is running; verify the CDC client actually responds.
            std::string check_response;
            auto check_st = check_cdc_client_health(3, 1, check_response);
            if (check_st.ok()) {
                // Process exists and responding, CDC client is running
                return Status::OK();
            }
            // Child is alive but the CDC client does not answer: report it unhealthy.
            st = Status::InternalError(fmt::format("CDC client {} unresponsive", exist_pid));
            st.to_protobuf(result->mutable_status());
            return st;
        }
        LOG(INFO) << "CDC client is dead, pid=" << exist_pid;
        // Process is dead, reset PID and continue to start
        _child_pid.store(0);
#endif
    } else if (!_adopted_external.load()) {
        LOG(INFO) << "CDC client has never been started";
    }

#ifndef BE_TEST
    // Adopt an externally-managed cdc_client if the port already answers
    // healthy (e.g. one started manually for debug / hotfix).
    {
        std::string adopt_response;
        if (check_cdc_client_health(1, 0, adopt_response).ok()) {
            if (!_adopted_external.exchange(true)) {
                LOG(INFO) << "Adopting external cdc client on port "
                          << doris::config::cdc_client_port;
            }
            return Status::OK();
        }
    }
    _adopted_external.store(false);
#endif

    // Both variables are needed to build paths below; constructing std::string
    // from a null pointer is undefined behavior, so fail cleanly instead.
    const char* doris_home = getenv("DORIS_HOME");
    if (doris_home == nullptr) {
        st = Status::InternalError("Can not find DORIS_HOME");
        st.to_protobuf(result->mutable_status());
        return st;
    }
    const char* log_dir = getenv("LOG_DIR");
    if (log_dir == nullptr) {
        st = Status::InternalError("Can not find LOG_DIR");
        st.to_protobuf(result->mutable_status());
        return st;
    }

    const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar";
    const std::string cdc_jar_port =
            "--server.port=" + std::to_string(doris::config::cdc_client_port);
    const std::string backend_http_port =
            "--backend.http.port=" + std::to_string(config::webserver_port);
    const std::string cluster_token = "--cluster.token=" + ExecEnv::GetInstance()->token();
    const std::string java_opts = "-Dlog.path=" + std::string(log_dir);

    // check cdc jar exists
    struct stat buffer;
    if (stat(cdc_jar_path.c_str(), &buffer) != 0) {
        st = Status::InternalError("Can not find cdc-client.jar.");
        st.to_protobuf(result->mutable_status());
        return st;
    }

    // Ready to start cdc client
    LOG(INFO) << "Ready to start cdc client";
    const auto* java_home = getenv("JAVA_HOME");
    if (!java_home) {
        st = Status::InternalError("Can not find JAVA_HOME");
        st.to_protobuf(result->mutable_status());
        return st;
    }
    std::string path(java_home);
    std::string java_bin = path + "/bin/java";

    // Pre-build everything the child needs before fork(): heap allocation after
    // fork() in a multi-threaded process can deadlock on inherited libc locks.
    std::vector<std::string> argv_storage;
    argv_storage.emplace_back("java");
    // Copied once so the tokenization below works on a stable snapshot.
    const std::string user_java_opts = doris::config::cdc_client_java_opts;
    if (!user_java_opts.empty()) {
        // Whitespace-split; quoting inside cdc_client_java_opts is not supported.
        std::istringstream iss(user_java_opts);
        argv_storage.insert(argv_storage.end(), std::istream_iterator<std::string>(iss),
                            std::istream_iterator<std::string>());
    }
    argv_storage.emplace_back(java_opts);
    // OOM safety net (last-wins, user opts cannot disable).
    argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError");
    argv_storage.emplace_back("-jar");
    argv_storage.emplace_back(cdc_jar_path);
    argv_storage.emplace_back(cdc_jar_port);
    argv_storage.emplace_back(backend_http_port);
    argv_storage.emplace_back(cluster_token);

    // execv() wants a null-terminated array of mutable char*; argv_storage owns the text.
    std::vector<char*> argv;
    argv.reserve(argv_storage.size() + 1);
    for (auto& arg : argv_storage) {
        argv.push_back(arg.data());
    }
    argv.push_back(nullptr);

    const std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";

    // Reap exited children asynchronously. sa_mask must be emptied explicitly.
    // SA_RESTART keeps blocking syscalls interrupted by SIGCHLD from failing with EINTR.
    struct sigaction act {};
    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_RESTART;
    act.sa_handler = handle_sigchld;
    if (sigaction(SIGCHLD, &act, nullptr) != 0) {
        LOG(WARNING) << "Failed to install SIGCHLD handler, errno=" << errno;
    }
    LOG(INFO) << "Start to fork cdc client process with " << path;
#ifdef BE_TEST
    _child_pid.store(99999);
    st = Status::OK();
    return st;
#else
    pid_t pid = fork();
    if (pid < 0) {
        st = Status::InternalError("Fork cdc client failed.");
        st.to_protobuf(result->mutable_status());
        return st;
    } else if (pid == 0) {
        // Child: async-signal-safe operations only until execv().
#ifndef __APPLE__
        prctl(PR_SET_PDEATHSIG, SIGKILL);
#endif
        int out_fd = open(cdc_out_file.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644);
        if (out_fd < 0) {
            perror("open cdc-client.out file failed");
            _exit(1);
        }
        dup2(out_fd, STDOUT_FILENO);
        dup2(out_fd, STDERR_FILENO);
        close(out_fd);
        execv(java_bin.c_str(), argv.data());
        perror("Cdc client child process error");
        _exit(1);
    } else {
        // Parent process: save PID and wait for startup
        _child_pid.store(pid);

        // Waiting for cdc to start, failed after more than 3 * 10 seconds
        std::string health_response;
        Status status = check_cdc_client_health(3, 10, health_response);
        int wait_status = 0;
        if (!status.ok()) {
            // Reset PID if startup failed.
            // NOTE(review): the child may still be running (slow start). It is not
            // killed here; a later call may adopt it once it turns healthy.
            _child_pid.store(0);
            st = Status::InternalError("Start cdc client failed.");
            st.to_protobuf(result->mutable_status());
        } else if (waitpid(pid, &wait_status, WNOHANG) != 0) {
            // Port healthy but our child has exited: an external process is
            // answering. Treat as adoption instead of masking dead PID as success.
            _child_pid.store(0);
            if (!_adopted_external.exchange(true)) {
                LOG(INFO) << "Forked cdc client " << pid << " exited but port "
                          << doris::config::cdc_client_port
                          << " is healthy, adopting external instance";
            }
        } else {
            _adopted_external.store(false);
            LOG(INFO) << "Start cdc client success, pid=" << pid
                      << ", status=" << status.to_string() << ", response=" << health_response;
        }
    }
#endif //BE_TEST
    return st;
}
293
294
void CdcClientMgr::request_cdc_client_impl(const PRequestCdcClientRequest* request,
295
                                           PRequestCdcClientResult* result,
296
7
                                           google::protobuf::Closure* done) {
297
7
    brpc::ClosureGuard closure_guard(done);
298
299
    // Start CDC client if not started
300
7
    Status start_st = start_cdc_client(result);
301
7
    if (!start_st.ok()) {
302
1
        LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string();
303
1
        start_st.to_protobuf(result->mutable_status());
304
1
        return;
305
1
    }
306
307
6
    std::string cdc_response;
308
6
    Status st = send_request_to_cdc_client(request->api(), request->params(), &cdc_response);
309
6
    result->set_response(cdc_response);
310
6
    st.to_protobuf(result->mutable_status());
311
6
}
312
313
// POST `params_body` (as application/json; no payload is set when it is empty) to
// http://127.0.0.1:<config::cdc_client_port><api>, using up to 3 attempts via
// HttpClient::execute_with_retry(3, 1, ...). Each attempt uses
// config::request_cdc_client_timeout_ms as its timeout.
//
// @param api          path appended to the local cdc client base URL
// @param params_body  JSON request body; may be empty
// @param response     receives the body of the last attempt; must be non-null
// @return status of the last attempt
Status CdcClientMgr::send_request_to_cdc_client(const std::string& api,
                                                const std::string& params_body,
                                                std::string* response) {
    const std::string url =
            fmt::format("http://127.0.0.1:{}{}", doris::config::cdc_client_port, api);

    // Captures by reference are safe: execute_with_retry runs synchronously below.
    auto cdc_request = [&url, response, &params_body](HttpClient* client) {
        // Reset per attempt so a partial body from a failed attempt is not
        // concatenated with the next attempt's body.
        // NOTE(review): assumes HttpClient::execute appends to *response.
        response->clear();
        RETURN_IF_ERROR(client->init(url));
        client->set_timeout_ms(doris::config::request_cdc_client_timeout_ms);
        if (!params_body.empty()) {
            client->set_payload(params_body);
        }
        client->set_content_type("application/json");
        client->set_method(POST);
        RETURN_IF_ERROR(client->execute(response));
        return Status::OK();
    };

    return HttpClient::execute_with_retry(3, 1, cdc_request);
}
333
334
} // namespace doris