Coverage Report

Created: 2026-03-13 09:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/cdc_client_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/cdc_client_mgr.h"
19
20
#include <brpc/closure_guard.h>
#include <fcntl.h>
#include <fmt/core.h>
#include <gen_cpp/internal_service.pb.h>
#include <google/protobuf/stubs/callback.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#ifndef __APPLE__
#include <sys/prctl.h>
#endif

#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include <thread>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "runtime/exec_env.h"
#include "service/http/http_client.h"
47
namespace doris {
48
49
namespace {
50
// SIGCHLD handler: reap every child that has already terminated so none of
// them linger as zombies.
//
// waitpid(-1, ..., WNOHANG) never blocks and reaps *any* terminated child of
// this process, not only the CDC client.
// NOTE(review): other code in the process that forks and later waits on its
// own child may find it already reaped (ECHILD) — confirm no such caller exists.
//
// errno is saved and restored because waitpid() overwrites it (e.g. ECHILD
// once no children remain) and a signal handler may interrupt code that is
// about to inspect errno.
void handle_sigchld([[maybe_unused]] int sig_no) {
    const int saved_errno = errno;
    int status = 0;
    while (waitpid(-1, &status, WNOHANG) > 0) {
        // keep reaping until no more terminated children are pending
    }
    errno = saved_errno;
}
57
58
// Check CDC client health
59
#ifndef BE_TEST
60
836
Status check_cdc_client_health(int retry_times, int sleep_time, std::string& health_response) {
61
836
    const std::string cdc_health_url =
62
836
            "http://127.0.0.1:" + std::to_string(doris::config::cdc_client_port) +
63
836
            "/actuator/health";
64
65
838
    auto health_request = [cdc_health_url, &health_response](HttpClient* client) {
66
838
        RETURN_IF_ERROR(client->init(cdc_health_url));
67
838
        client->set_timeout_ms(5000);
68
838
        RETURN_IF_ERROR(client->execute(&health_response));
69
836
        return Status::OK();
70
838
    };
71
72
836
    Status status = HttpClient::execute_with_retry(retry_times, sleep_time, health_request);
73
74
836
    if (!status.ok()) {
75
0
        return Status::InternalError("CDC client health check failed");
76
0
    }
77
78
836
    bool is_up = health_response.find("UP") != std::string::npos;
79
80
836
    if (!is_up) {
81
0
        return Status::InternalError(fmt::format("CDC client unhealthy: {}", health_response));
82
0
    }
83
84
836
    return Status::OK();
85
836
}
86
#endif
87
88
} // anonymous namespace
89
90
36
// Construction does no work. The CDC client process is launched on demand by
// start_cdc_client().
CdcClientMgr::CdcClientMgr() = default;

// Shut down any CDC client process this manager launched before the manager
// itself goes away.
CdcClientMgr::~CdcClientMgr() {
    this->stop();
}
95
96
47
void CdcClientMgr::stop() {
97
47
    pid_t pid = _child_pid.load();
98
47
    if (pid > 0) {
99
        // Check if process is still alive
100
24
        if (kill(pid, 0) == 0) {
101
2
            LOG(INFO) << "Stopping CDC client process, pid=" << pid;
102
            // Send SIGTERM for graceful shutdown
103
2
            kill(pid, SIGTERM);
104
            // Wait a short time for graceful shutdown
105
2
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
106
            // Force kill if still alive
107
2
            if (kill(pid, 0) == 0) {
108
1
                LOG(INFO) << "Force killing CDC client process, pid=" << pid;
109
1
                kill(pid, SIGKILL);
110
1
                int status = 0;
111
1
                waitpid(pid, &status, 0);
112
1
            }
113
2
        }
114
24
        _child_pid.store(0);
115
24
    }
116
117
47
    LOG(INFO) << "CdcClientMgr is stopped";
118
47
}
119
120
836
// Make sure a CDC client process is running, launching one if necessary.
//
// Serialized by _start_mutex. Outcomes in a non-BE_TEST build:
//  * Recorded pid is alive and passes check_cdc_client_health(3, 1, ...):
//    returns OK and starts nothing.
//  * Recorded pid is alive but fails the health check: returns an error. The
//    pid may have been reused by an unrelated process, or the client may be
//    hung. Nothing is killed or restarted in this case.
//  * No live recorded pid: fork and exec
//        $JAVA_HOME/bin/java -Dlog.path=$LOG_DIR -jar
//            $DORIS_HOME/lib/cdc_client/cdc-client.jar --server.port=<cdc_client_port>
//            --backend.http.port=<webserver_port> --cluster.token=<token>
//    with stdout/stderr appended to $LOG_DIR/cdc-client.out. Then wait for
//    the new process to pass check_cdc_client_health(3, 10, ...). If it does
//    not, the new process is killed.
// Every error returned is also written into result->mutable_status().
//
// In BE_TEST builds nothing is forked. An already-recorded pid short-circuits
// to OK; otherwise the fake pid 99999 is recorded.
Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
    std::lock_guard<std::mutex> lock(_start_mutex);

    // Copy an error into the RPC result and hand it back to the caller.
    auto report_error = [result](Status err) {
        err.to_protobuf(result->mutable_status());
        return err;
    };

    const pid_t exist_pid = _child_pid.load();
    if (exist_pid > 0) {
#ifdef BE_TEST
        // In test mode, directly return OK if PID exists
        LOG(INFO) << "cdc client already started (BE_TEST mode), pid=" << exist_pid;
        return Status::OK();
#else
        // kill(pid, 0) delivers no signal; it only probes whether the pid exists.
        if (kill(exist_pid, 0) == 0) {
            // The pid exists; verify it is really our CDC client answering.
            std::string check_response;
            Status check_st = check_cdc_client_health(3, 1, check_response);
            if (check_st.ok()) {
                return Status::OK();
            }
            LOG(WARNING) << "CDC client health check failed, pid=" << exist_pid
                         << ", status=" << check_st.to_string();
            return report_error(
                    Status::InternalError(fmt::format("CDC client {} unresponsive", exist_pid)));
        }
        LOG(INFO) << "CDC client is dead, pid=" << exist_pid;
        // Process is gone: forget it and fall through to start a new one.
        _child_pid.store(0);
#endif
    } else {
        LOG(INFO) << "CDC client has never been started";
    }

    // std::string(nullptr) is undefined behavior, so validate the environment
    // before building any path from it.
    const char* doris_home = getenv("DORIS_HOME");
    if (doris_home == nullptr) {
        return report_error(Status::InternalError("Can not find DORIS_HOME"));
    }
    const char* log_dir = getenv("LOG_DIR");
    if (log_dir == nullptr) {
        return report_error(Status::InternalError("Can not find LOG_DIR"));
    }

    const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar";
    const std::string cdc_jar_port =
            "--server.port=" + std::to_string(doris::config::cdc_client_port);
    const std::string backend_http_port =
            "--backend.http.port=" + std::to_string(config::webserver_port);
    const std::string cluster_token = "--cluster.token=" + ExecEnv::GetInstance()->token();
    const std::string java_opts = "-Dlog.path=" + std::string(log_dir);
    // Built here, not in the child. After fork() in a multi-threaded process
    // the child should not allocate: malloc locks may be held by threads that
    // do not exist in the child.
    const std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";

    // check cdc jar exists
    struct stat buffer;
    if (stat(cdc_jar_path.c_str(), &buffer) != 0) {
        return report_error(Status::InternalError("Can not find cdc-client.jar."));
    }

    LOG(INFO) << "Ready to start cdc client";
    const char* java_home = getenv("JAVA_HOME");
    if (java_home == nullptr) {
        return report_error(Status::InternalError("Can not find JAVA_HOME"));
    }
    const std::string path(java_home);
    const std::string java_bin = path + "/bin/java";

    // Install the SIGCHLD reaper so an exited client does not remain a zombie.
    // sa_mask must be initialized explicitly. SA_RESTART keeps slow syscalls in
    // unrelated threads from failing with EINTR when the child exits.
    struct sigaction act {};
    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_RESTART;
    act.sa_handler = handle_sigchld;
    sigaction(SIGCHLD, &act, nullptr);
    LOG(INFO) << "Start to fork cdc client process with " << path;
#ifdef BE_TEST
    _child_pid.store(99999);
    return Status::OK();
#else
    const pid_t pid = fork();
    if (pid < 0) {
        return report_error(Status::InternalError("Fork cdc client failed."));
    }
    if (pid == 0) {
        // Child process. Until execlp() succeeds, use only data prepared before
        // fork(), and leave through _exit(). exit() would run the parent's
        // atexit handlers and static destructors in this copy of the process.
#ifndef __APPLE__
        // Ask the kernel to SIGKILL this process when its parent goes away.
        // NOTE(review): per prctl(2) the death signal is tied to the thread
        // that called fork(), not the whole process — confirm that thread is
        // long-lived.
        prctl(PR_SET_PDEATHSIG, SIGKILL);
#endif
        // Redirect stdout and stderr to log out file
        int out_fd = open(cdc_out_file.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644);
        if (out_fd < 0) {
            perror("open cdc-client.out file failed");
            _exit(1);
        }
        // dup2() clears FD_CLOEXEC on the new descriptors, so stdout/stderr
        // survive the exec while out_fd itself does not leak.
        dup2(out_fd, STDOUT_FILENO);
        dup2(out_fd, STDERR_FILENO);
        close(out_fd);

        // java -Dlog.path=xx -jar cdc-client.jar --server.port=9096 --backend.http.port=8040 --cluster.token=xx
        execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar", cdc_jar_path.c_str(),
               cdc_jar_port.c_str(), backend_http_port.c_str(), cluster_token.c_str(),
               static_cast<char*>(nullptr));
        // execlp() only returns on failure.
        perror("Cdc client child process error");
        _exit(1);
    }

    // Parent process: record the child, then wait for it to come up
    // (check_cdc_client_health(3, 10, ...): 3 attempts, sleep argument 10).
    _child_pid.store(pid);
    std::string health_response;
    Status status = check_cdc_client_health(3, 10, health_response);
    if (!status.ok()) {
        LOG(WARNING) << "CDC client did not become healthy, pid=" << pid
                     << ", status=" << status.to_string();
        // Do not leave an untracked, possibly half-started JVM behind. It would
        // keep running, and could hold the port, while the next call forks another.
        kill(pid, SIGKILL);
        _child_pid.store(0);
        return report_error(Status::InternalError("Start cdc client failed."));
    }
    LOG(INFO) << "Start cdc client success, pid=" << pid << ", status=" << status.to_string()
              << ", response=" << health_response;
    return Status::OK();
#endif //BE_TEST
}
244
245
void CdcClientMgr::request_cdc_client_impl(const PRequestCdcClientRequest* request,
246
                                           PRequestCdcClientResult* result,
247
843
                                           google::protobuf::Closure* done) {
248
843
    brpc::ClosureGuard closure_guard(done);
249
250
    // Start CDC client if not started
251
843
    Status start_st = start_cdc_client(result);
252
843
    if (!start_st.ok()) {
253
1
        LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string();
254
1
        start_st.to_protobuf(result->mutable_status());
255
1
        return;
256
1
    }
257
258
842
    std::string cdc_response;
259
842
    Status st = send_request_to_cdc_client(request->api(), request->params(), &cdc_response);
260
842
    result->set_response(cdc_response);
261
842
    st.to_protobuf(result->mutable_status());
262
842
}
263
264
// POST `params_body` as JSON to http://127.0.0.1:<cdc_client_port><api> and
// store the reply body in `*response`.
//
// Runs through HttpClient::execute_with_retry(3, 1, ...). Each attempt uses
// config::request_cdc_client_timeout_ms as its timeout, and the payload is
// only set when `params_body` is non-empty.
// NOTE(review): retrying a POST is only safe if every CDC client `api` routed
// through here is idempotent — confirm.
Status CdcClientMgr::send_request_to_cdc_client(const std::string& api,
                                                const std::string& params_body,
                                                std::string* response) {
    const std::string remote_url_prefix =
            fmt::format("http://127.0.0.1:{}{}", doris::config::cdc_client_port, api);

    auto cdc_request = [&remote_url_prefix, response, &params_body](HttpClient* client) {
        // Start each attempt from an empty buffer. execute() writes into the
        // same string on every retry, and a partial body from a failed attempt
        // must not be prefixed to the reply returned to the caller.
        response->clear();
        RETURN_IF_ERROR(client->init(remote_url_prefix));
        client->set_timeout_ms(doris::config::request_cdc_client_timeout_ms);
        if (!params_body.empty()) {
            client->set_payload(params_body);
        }
        client->set_content_type("application/json");
        client->set_method(POST);
        RETURN_IF_ERROR(client->execute(response));
        return Status::OK();
    };

    return HttpClient::execute_with_retry(3, 1, cdc_request);
}
284
285
} // namespace doris