Coverage Report

Created: 2026-06-05 04:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/cdc_client_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/cdc_client_mgr.h"
19
20
#include <brpc/closure_guard.h>
21
#include <fcntl.h>
22
#include <fmt/core.h>
23
#include <gen_cpp/internal_service.pb.h>
24
#include <google/protobuf/stubs/callback.h>
25
#include <signal.h>
26
#include <sys/stat.h>
27
#include <sys/wait.h>
28
#include <unistd.h>
29
30
#include <cerrno>
#include <cstdio>
31
#ifndef __APPLE__
32
#include <sys/prctl.h>
33
#endif
34
35
#include <atomic>
36
#include <chrono>
37
#include <iterator>
38
#include <mutex>
39
#include <sstream>
40
#include <string>
41
#include <thread>
42
#include <vector>
43
44
#include "common/config.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "runtime/exec_env.h"
48
#include "service/http/http_client.h"
49
50
namespace doris {
51
52
namespace {
53
// Handle SIGCHLD signal to prevent zombie processes
54
96
// SIGCHLD handler: reaps every child process that has already terminated so
// that none of them is left behind as a zombie.
//
// waitpid() overwrites errno (for example with ECHILD once no child is left),
// and a signal handler may run between a failing system call and the
// interrupted code's errno check. errno is therefore saved on entry and
// restored on exit so the interrupted code never sees a value set here.
//
// The signal number is not needed and is intentionally left unnamed.
void handle_sigchld(int /*sig_no*/) {
    const int saved_errno = errno;
    int status = 0;
    // WNOHANG: never block inside the handler; the loop ends as soon as no
    // further terminated child is available.
    while (waitpid(-1, &status, WNOHANG) > 0) {
    }
    errno = saved_errno;
}
60
61
// Check CDC client health
62
#ifndef BE_TEST
63
30
// Probes the CDC client's "/actuator/health" endpoint on 127.0.0.1 at
// config::cdc_client_port.
//
// retry_times and sleep_time are forwarded unchanged to
// HttpClient::execute_with_retry. health_response is handed to
// HttpClient::execute as its output string.
//
// Returns OK only when the request succeeds and health_response contains the
// substring "UP"; every other outcome is reported as an InternalError.
Status check_cdc_client_health(int retry_times, int sleep_time, std::string& health_response) {
    std::string url = "http://127.0.0.1:";
    url += std::to_string(doris::config::cdc_client_port);
    url += "/actuator/health";

    // One probe attempt; the retry helper invokes it with the client to use.
    auto probe = [url, &health_response](HttpClient* client) {
        RETURN_IF_ERROR(client->init(url));
        client->set_timeout_ms(5000);
        RETURN_IF_ERROR(client->execute(&health_response));
        return Status::OK();
    };

    const Status request_st = HttpClient::execute_with_retry(retry_times, sleep_time, probe);
    if (!request_st.ok()) {
        return Status::InternalError("CDC client health check failed");
    }

    // The endpoint answered; it only counts as healthy when it reports "UP".
    if (health_response.find("UP") == std::string::npos) {
        return Status::InternalError(fmt::format("CDC client unhealthy: {}", health_response));
    }
    return Status::OK();
}
89
#endif
90
91
} // anonymous namespace
92
93
35
// Defaulted constructor: no CDC client process is launched here; that only
// happens in start_cdc_client().
CdcClientMgr::CdcClientMgr() = default;
94
95
31
// Destroying the manager shuts down the CDC client process it tracks, if any,
// by delegating to stop().
CdcClientMgr::~CdcClientMgr() {
    this->stop();
}
98
99
46
void CdcClientMgr::stop() {
100
46
    pid_t pid = _child_pid.load();
101
46
    if (pid > 0) {
102
        // Check if process is still alive
103
24
        if (kill(pid, 0) == 0) {
104
2
            LOG(INFO) << "Stopping CDC client process, pid=" << pid;
105
            // Send SIGTERM for graceful shutdown
106
2
            kill(pid, SIGTERM);
107
            // Wait a short time for graceful shutdown
108
2
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
109
            // Force kill if still alive
110
2
            if (kill(pid, 0) == 0) {
111
1
                LOG(INFO) << "Force killing CDC client process, pid=" << pid;
112
1
                kill(pid, SIGKILL);
113
1
                int status = 0;
114
1
                waitpid(pid, &status, 0);
115
1
            }
116
2
        }
117
24
        _child_pid.store(0);
118
24
    }
119
120
46
    LOG(INFO) << "CdcClientMgr is stopped";
121
46
}
122
123
29
// Makes sure a CDC client is answering on config::cdc_client_port, forking a
// new "java -jar cdc-client.jar" process when none is available. The whole
// function runs under _start_mutex.
//
// Steps:
//   1. If a child pid is recorded: in BE_TEST builds return OK immediately.
//      Otherwise reuse the child when it is alive and passes the health check,
//      fail when it is alive but does not answer, and forget the pid when the
//      process is gone.
//   2. (non-test builds) If the health endpoint already answers, adopt that
//      externally started instance and return OK.
//   3. Otherwise validate the environment (DORIS_HOME, LOG_DIR, the jar file,
//      JAVA_HOME), fork, exec java in the child, and wait in the parent for
//      the health endpoint to come up.
//
// result: every failure path of this function also writes its error into
//         result->status; success paths leave result untouched.
// Returns OK when a usable CDC client exists, InternalError otherwise.
Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
    std::lock_guard<std::mutex> lock(_start_mutex);

    // Mirrors a failure into the RPC result and hands the same status back so
    // that error paths can simply `return fail(...)`.
    auto fail = [result](Status error) {
        error.to_protobuf(result->mutable_status());
        return error;
    };

    const pid_t exist_pid = _child_pid.load();
    if (exist_pid > 0) {
#ifdef BE_TEST
        // In test mode, directly return OK if PID exists
        LOG(INFO) << "cdc client already started (BE_TEST mode), pid=" << exist_pid;
        return Status::OK();
#else
        // Signal 0 only checks that the process still exists.
        if (kill(exist_pid, 0) == 0) {
            // The pid exists; the health check verifies that it is actually a
            // responsive CDC client and not an unrelated process that reused the pid.
            std::string check_response;
            if (check_cdc_client_health(3, 1, check_response).ok()) {
                return Status::OK();
            }
            return fail(
                    Status::InternalError(fmt::format("CDC client {} unresponsive", exist_pid)));
        }
        // Process is dead: reset the pid and fall through to start a new one.
        LOG(INFO) << "CDC client is dead, pid=" << exist_pid;
        _child_pid.store(0);
#endif
    } else if (!_adopted_external.load()) {
        LOG(INFO) << "CDC client has never been started";
    }

#ifndef BE_TEST
    // Adopt an externally-managed cdc_client if the port already answers
    // healthy (e.g. one started manually for debug / hotfix).
    {
        std::string adopt_response;
        if (check_cdc_client_health(1, 0, adopt_response).ok()) {
            // exchange() returns the previous value, so the adoption is logged
            // only on the transition from "not adopted" to "adopted".
            if (!_adopted_external.exchange(true)) {
                LOG(INFO) << "Adopting external cdc client on port "
                          << doris::config::cdc_client_port;
            }
            return Status::OK();
        }
    }
    _adopted_external.store(false);
#endif

    // getenv() returns a null pointer for an unset variable, and constructing a
    // std::string from a null pointer is undefined behaviour, so both
    // variables are validated before they are used.
    const char* doris_home = getenv("DORIS_HOME");
    if (doris_home == nullptr) {
        return fail(Status::InternalError("Can not find DORIS_HOME"));
    }
    const char* log_dir = getenv("LOG_DIR");
    if (log_dir == nullptr) {
        return fail(Status::InternalError("Can not find LOG_DIR"));
    }

    const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar";
    const std::string cdc_jar_port =
            "--server.port=" + std::to_string(doris::config::cdc_client_port);
    const std::string backend_http_port =
            "--backend.http.port=" + std::to_string(config::webserver_port);
    const std::string cluster_token = "--cluster.token=" + ExecEnv::GetInstance()->token();
    const std::string java_opts = "-Dlog.path=" + std::string(log_dir);

    // check cdc jar exists
    struct stat buffer;
    if (stat(cdc_jar_path.c_str(), &buffer) != 0) {
        return fail(Status::InternalError("Can not find cdc-client.jar."));
    }

    // Ready to start cdc client
    LOG(INFO) << "Ready to start cdc client";
    const auto* java_home = getenv("JAVA_HOME");
    if (!java_home) {
        return fail(Status::InternalError("Can not find JAVA_HOME"));
    }
    std::string path(java_home);
    std::string java_bin = path + "/bin/java";

    // Pre-build everything the child needs before fork(): heap allocation after
    // fork() in a multi-threaded process can deadlock on inherited libc locks.
    std::vector<std::string> argv_storage;
    argv_storage.emplace_back("java");
    const std::string user_java_opts = doris::config::cdc_client_java_opts;
    if (!user_java_opts.empty()) {
        // Split the user supplied options on whitespace, one argv entry each.
        std::istringstream iss(user_java_opts);
        argv_storage.insert(argv_storage.end(), std::istream_iterator<std::string>(iss),
                            std::istream_iterator<std::string>());
    }
    argv_storage.emplace_back(java_opts);
    // OOM safety net (last-wins, user opts cannot disable).
    argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError");
    argv_storage.emplace_back("-jar");
    argv_storage.emplace_back(cdc_jar_path);
    argv_storage.emplace_back(cdc_jar_port);
    argv_storage.emplace_back(backend_http_port);
    argv_storage.emplace_back(cluster_token);

    // execv() wants a null-terminated array of char*; the pointers stay valid
    // because argv_storage is not modified after this point.
    std::vector<char*> argv;
    argv.reserve(argv_storage.size() + 1);
    for (auto& s : argv_storage) {
        argv.push_back(const_cast<char*>(s.c_str()));
    }
    argv.push_back(nullptr);

    const std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";

    // Install the SIGCHLD reaper. The struct is zero-initialized and sa_mask is
    // explicitly emptied so that no indeterminate signal mask is installed.
    // SA_RESTART keeps this process-wide handler from surfacing EINTR in
    // unrelated blocking system calls; SA_NOCLDSTOP limits notifications to
    // terminated children, which is all the handler reaps.
    struct sigaction act {};
    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_RESTART | SA_NOCLDSTOP;
    act.sa_handler = handle_sigchld;
    sigaction(SIGCHLD, &act, nullptr);
    LOG(INFO) << "Start to fork cdc client process with " << path;

    Status st = Status::OK();
#ifdef BE_TEST
    // Tests never fork: record a placeholder pid and report success.
    _child_pid.store(99999);
    return Status::OK();
#else
    pid_t pid = fork();
    if (pid < 0) {
        return fail(Status::InternalError("Fork cdc client failed."));
    } else if (pid == 0) {
        // Child: async-signal-safe operations only until execv().
        // NOTE(review): perror() below is not on the POSIX list of
        // async-signal-safe functions; it is kept for its errno diagnostics
        // on paths that terminate the child immediately afterwards.
#ifndef __APPLE__
        // Have the kernel send SIGKILL to this child when its parent dies.
        prctl(PR_SET_PDEATHSIG, SIGKILL);
#endif
        int out_fd = open(cdc_out_file.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644);
        if (out_fd < 0) {
            perror("open cdc-client.out file failed");
            _exit(1);
        }
        // Redirect stdout and stderr of the java process to the .out file.
        dup2(out_fd, STDOUT_FILENO);
        dup2(out_fd, STDERR_FILENO);
        close(out_fd);
        execv(java_bin.c_str(), argv.data());
        // execv() only returns on failure.
        perror("Cdc client child process error");
        _exit(1);
    } else {
        // Parent process: save PID and wait for startup
        _child_pid.store(pid);

        // Wait for the cdc client to come up (health check with retry_times=3,
        // sleep_time=10).
        std::string health_response;
        Status status = check_cdc_client_health(3, 10, health_response);
        if (!status.ok()) {
            // Reset PID if startup failed
            _child_pid.store(0);
            st = fail(Status::InternalError("Start cdc client failed."));
        } else if (kill(pid, 0) != 0) {
            // Port healthy but our child has exited: an external process is
            // answering. Treat as adoption instead of masking dead PID as success.
            _child_pid.store(0);
            if (!_adopted_external.exchange(true)) {
                LOG(INFO) << "Forked cdc client " << pid << " exited but port "
                          << doris::config::cdc_client_port
                          << " is healthy, adopting external instance";
            }
        } else {
            _adopted_external.store(false);
            LOG(INFO) << "Start cdc client success, pid=" << pid
                      << ", status=" << status.to_string() << ", response=" << health_response;
        }
    }
#endif //BE_TEST
    return st;
}
293
294
void CdcClientMgr::request_cdc_client_impl(const PRequestCdcClientRequest* request,
295
                                           PRequestCdcClientResult* result,
296
36
                                           google::protobuf::Closure* done) {
297
36
    brpc::ClosureGuard closure_guard(done);
298
299
    // Start CDC client if not started
300
36
    Status start_st = start_cdc_client(result);
301
36
    if (!start_st.ok()) {
302
1
        LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string();
303
1
        start_st.to_protobuf(result->mutable_status());
304
1
        return;
305
1
    }
306
307
35
    std::string cdc_response;
308
35
    Status st = send_request_to_cdc_client(request->api(), request->params(), &cdc_response);
309
35
    result->set_response(cdc_response);
310
35
    st.to_protobuf(result->mutable_status());
311
35
}
312
313
// Sends a JSON POST request to the local CDC client.
//
// api:         path appended directly after "http://127.0.0.1:<cdc_client_port>".
// params_body: request payload; no payload is set when it is empty.
// response:    passed to HttpClient::execute as its output string.
//
// Returns whatever HttpClient::execute_with_retry(3, 1, ...) returns.
Status CdcClientMgr::send_request_to_cdc_client(const std::string& api,
                                                const std::string& params_body,
                                                std::string* response) {
    const std::string target_url =
            fmt::format("http://127.0.0.1:{}{}", doris::config::cdc_client_port, api);

    // A single attempt; the retry helper invokes it with the client to use.
    auto post_once = [&target_url, &params_body, response](HttpClient* client) {
        RETURN_IF_ERROR(client->init(target_url));
        client->set_timeout_ms(doris::config::request_cdc_client_timeout_ms);
        const bool has_payload = !params_body.empty();
        if (has_payload) {
            client->set_payload(params_body);
        }
        client->set_content_type("application/json");
        client->set_method(POST);
        RETURN_IF_ERROR(client->execute(response));
        return Status::OK();
    };

    return HttpClient::execute_with_retry(3, 1, post_once);
}
333
334
} // namespace doris