Coverage Report

Created: 2026-06-09 14:23

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_udf_runtime.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "udf/python/python_udf_runtime.h"

#include <butil/fd_utility.h>
#include <signal.h>
#include <string.h>
#include <sys/wait.h>
#include <unistd.h>

#include <algorithm>
#include <boost/process.hpp>
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <cstring>
#include <deque>
#include <mutex>
#include <system_error>
#include <thread>
#ifdef BE_TEST
#include <atomic>
#endif

#include "common/logging.h"
#include "runtime/thread_context.h"
40
41
namespace doris {
42
43
#ifdef BE_TEST
// Unit tests use a short grace period so shutdown paths finish quickly.
static constexpr std::chrono::milliseconds PROCESS_TERMINATE_TIMEOUT {100};
#else
// Grace period given to the Python child after SIGTERM, and again after SIGKILL.
static constexpr std::chrono::milliseconds PROCESS_TERMINATE_TIMEOUT {1000};
#endif
// How often the background reaper re-polls pids that are still not reaped.
static constexpr std::chrono::milliseconds BACKGROUND_REAP_INTERVAL {1000};

#ifdef BE_TEST
// Number of upcoming wait_child_exit() calls that should report TIMEOUT without waiting.
static std::atomic<int> FORCED_CHILD_EXIT_TIMEOUTS {0};

// Atomically decrements FORCED_CHILD_EXIT_TIMEOUTS when it is positive.
// Returns true if one forced timeout was consumed, false if none remain.
static bool consume_forced_child_exit_timeout() {
    int observed = FORCED_CHILD_EXIT_TIMEOUTS.load(std::memory_order_relaxed);
    for (;;) {
        if (observed <= 0) {
            return false;
        }
        // On failure compare_exchange_weak reloads `observed`, so the guard above re-checks it.
        if (FORCED_CHILD_EXIT_TIMEOUTS.compare_exchange_weak(observed, observed - 1,
                                                             std::memory_order_relaxed)) {
            return true;
        }
    }
}
#endif
64
65
// Shared state of the process-wide background reaper.
// `pids` is the hand-off queue from enqueue_child_for_reap() to the reaper thread. `mutex`
// guards the queues. `cv` is notified when pids are queued (and, in tests, when one is reaped).
struct BackgroundChildReaper {
    std::mutex mutex {};
    std::condition_variable cv {};
    std::deque<pid_t> pids {};
#ifdef BE_TEST
    // Pids the reaper thread has reaped; lets tests wait for a specific pid.
    std::deque<pid_t> reaped_pids {};
#endif
    // Reaper thread, started lazily on the first enqueue; it is not joined anywhere in this file.
    std::thread thread {};
};
74
75
0
// Returns the process-wide reaper state.
// The object is heap-allocated and deliberately never deleted: its std::thread member is never
// joined, and destroying a joinable std::thread during static destruction would call
// std::terminate.
static BackgroundChildReaper& background_child_reaper() {
    static BackgroundChildReaper* const instance = new BackgroundChildReaper();
    return *instance;
}
79
80
0
void PythonUDFProcess::enqueue_child_for_reap(pid_t pid) {
81
0
    if (pid <= 0) [[unlikely]] {
82
0
        return;
83
0
    }
84
85
0
    auto& reaper = background_child_reaper();
86
0
    {
87
0
        std::lock_guard<std::mutex> lock(reaper.mutex);
88
0
        if (std::find(reaper.pids.begin(), reaper.pids.end(), pid) != reaper.pids.end()) {
89
0
            return;
90
0
        }
91
0
        reaper.pids.push_back(pid);
92
0
        if (!reaper.thread.joinable()) {
93
            // This thread only owns pids that were already SIGKILLed but could not be reaped within
94
            // the bounded shutdown wait. Such processes may be stuck in uninterruptible I/O; if they
95
            // exit later and nobody calls waitpid(), they stay as zombies under BE. Keep reaping them
96
            // asynchronously so foreground shutdown remains bounded without dropping wait ownership.
97
0
            reaper.thread = std::thread([]() {
98
0
                SCOPED_INIT_THREAD_CONTEXT();
99
0
                std::deque<pid_t> pending_pids;
100
0
                while (true) {
101
0
                    auto& reaper_ref = background_child_reaper();
102
0
                    std::unique_lock<std::mutex> lock(reaper_ref.mutex);
103
0
                    if (pending_pids.empty()) {
104
0
                        reaper_ref.cv.wait(lock,
105
0
                                           [&reaper_ref]() { return !reaper_ref.pids.empty(); });
106
0
                    } else {
107
0
                        reaper_ref.cv.wait_for(lock, BACKGROUND_REAP_INTERVAL);
108
0
                    }
109
0
                    pending_pids.insert(pending_pids.end(), reaper_ref.pids.begin(),
110
0
                                        reaper_ref.pids.end());
111
0
                    reaper_ref.pids.clear();
112
0
                    std::deque<pid_t> pids;
113
0
                    pids.swap(pending_pids);
114
0
                    lock.unlock();
115
116
0
                    for (pid_t pending_pid : pids) {
117
0
                        int exit_status = 0;
118
0
                        auto wait_result = PythonUDFProcess::wait_child_exit(
119
0
                                pending_pid, std::chrono::milliseconds(0), &exit_status);
120
0
                        if (wait_result == ChildExitWaitResult::EXITED ||
121
0
                            wait_result == ChildExitWaitResult::ALREADY_REAPED) {
122
0
                            LOG(INFO) << "Background reaped Python process pid=" << pending_pid;
123
#ifdef BE_TEST
124
                            {
125
                                std::lock_guard<std::mutex> reaped_lock(reaper_ref.mutex);
126
                                reaper_ref.reaped_pids.push_back(pending_pid);
127
                            }
128
                            reaper_ref.cv.notify_all();
129
#endif
130
0
                        } else if (wait_result == ChildExitWaitResult::TIMEOUT) {
131
0
                            pending_pids.push_back(pending_pid);
132
0
                        } else {
133
0
                            LOG(WARNING) << "Background failed to reap Python process pid="
134
0
                                         << pending_pid;
135
0
                        }
136
0
                    }
137
0
                }
138
0
            });
139
0
        }
140
0
    }
141
0
    reaper.cv.notify_one();
142
0
}
143
144
#ifdef BE_TEST
// Test hook: blocks until the background reaper has recorded `pid` as reaped, or until
// `timeout` elapses. Returns true if the pid was observed as reaped.
bool PythonUDFProcess::wait_background_reaped_for_test(pid_t pid,
                                                       std::chrono::milliseconds timeout) {
    auto& state = background_child_reaper();
    // Evaluated by wait_for() while the mutex is held.
    const auto was_reaped = [&state, pid]() {
        return std::any_of(state.reaped_pids.begin(), state.reaped_pids.end(),
                           [pid](pid_t reaped) { return reaped == pid; });
    };
    std::unique_lock<std::mutex> guard(state.mutex);
    return state.cv.wait_for(guard, timeout, was_reaped);
}

// Test hook: the next `count` wait_child_exit() calls return TIMEOUT immediately.
void PythonUDFProcess::force_child_exit_timeouts_for_test(int count) {
    FORCED_CHILD_EXIT_TIMEOUTS.store(count, std::memory_order_relaxed);
}
#endif
159
160
// Polls waitpid(pid, WNOHANG) until the child is reaped or `timeout` elapses. A zero timeout
// performs a single non-blocking check.
//
// Returns:
//   EXITED         - this call reaped the child; `exit_status` holds the raw wait status.
//   ALREADY_REAPED - waitpid() reported ECHILD (someone else already waited for it).
//   TIMEOUT        - the child was still running at the deadline.
//   ERROR          - any other waitpid() failure (logged as a warning).
PythonUDFProcess::ChildExitWaitResult PythonUDFProcess::wait_child_exit(
        pid_t pid, std::chrono::milliseconds timeout, int* exit_status) {
#ifdef BE_TEST
    if (consume_forced_child_exit_timeout()) {
        return ChildExitWaitResult::TIMEOUT;
    }
#endif
    static constexpr std::chrono::milliseconds POLL_INTERVAL {10};
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (true) {
        const pid_t ret = waitpid(pid, exit_status, WNOHANG);
        if (ret == pid) {
            return ChildExitWaitResult::EXITED;
        }
        if (ret < 0) {
            // Capture errno immediately. Constructing the log message below may overwrite it
            // before strerror() is evaluated.
            const int err = errno;
            if (err == EINTR) {
                // retry if interrupted
                continue;
            }
            // Another owner may already have observed the child exit through boost::process.
            if (err == ECHILD) {
                return ChildExitWaitResult::ALREADY_REAPED;
            }
            LOG(WARNING) << "Failed to wait Python process pid=" << pid << ": " << strerror(err);
            return ChildExitWaitResult::ERROR;
        }
        const auto now = std::chrono::steady_clock::now();
        if (now >= deadline) {
            return ChildExitWaitResult::TIMEOUT;
        }
        // Never sleep past the deadline.
        std::this_thread::sleep_for(
                std::min<std::chrono::steady_clock::duration>(POLL_INTERVAL, deadline - now));
    }
}
191
192
0
void PythonUDFProcess::remove_unix_socket() {
193
0
    if (_uri.empty() || _unix_socket_file_path.empty()) return;
194
195
0
    if (unlink(_unix_socket_file_path.c_str()) == 0) {
196
0
        LOG(INFO) << "Successfully removed unix socket: " << _unix_socket_file_path;
197
0
        return;
198
0
    }
199
200
0
    if (errno == ENOENT) {
201
        // File does not exist, this is fine, no need to warn
202
0
        LOG(INFO) << "Unix socket not found (already removed): " << _uri;
203
0
    } else {
204
0
        LOG(WARNING) << "Failed to remove unix socket " << _uri << ": " << std::strerror(errno)
205
0
                     << " (errno=" << errno << ")";
206
0
    }
207
0
}
208
209
0
void PythonUDFProcess::shutdown() {
210
0
    if (!_child.valid() || _is_shutdown) return;
211
212
0
    int exit_status = 0;
213
0
    bool exited = !_child.running();
214
0
    bool status_available = false;
215
0
    bool already_reaped = false;
216
0
    if (!exited) {
217
0
        ::kill(_child_pid, SIGTERM);
218
0
        auto wait_result = wait_child_exit(_child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status);
219
0
        exited = wait_result == ChildExitWaitResult::EXITED ||
220
0
                 wait_result == ChildExitWaitResult::ALREADY_REAPED;
221
0
        status_available = wait_result == ChildExitWaitResult::EXITED;
222
0
        already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
223
0
    } else {
224
0
        auto wait_result = wait_child_exit(_child_pid, std::chrono::milliseconds(0), &exit_status);
225
0
        status_available = wait_result == ChildExitWaitResult::EXITED;
226
0
        already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
227
0
    }
228
229
0
    if (!exited) {
230
0
        LOG(WARNING) << "Python process did not terminate gracefully, sending SIGKILL, pid="
231
0
                     << _child_pid;
232
0
        ::kill(_child_pid, SIGKILL);
233
0
        auto wait_result = wait_child_exit(_child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status);
234
0
        exited = wait_result == ChildExitWaitResult::EXITED ||
235
0
                 wait_result == ChildExitWaitResult::ALREADY_REAPED;
236
0
        status_available = wait_result == ChildExitWaitResult::EXITED;
237
0
        already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
238
0
    }
239
0
    _child.detach();
240
241
0
    if (!exited) [[unlikely]] {
242
0
        LOG(WARNING) << "Python process did not exit after SIGKILL, enqueue background reap, pid="
243
0
                     << _child_pid;
244
0
        enqueue_child_for_reap(_child_pid);
245
0
    } else if (already_reaped) {
246
0
        LOG(INFO) << "Python process already reaped by another owner, pid=" << _child_pid;
247
0
    } else if (!status_available) {
248
0
        LOG(INFO) << "Python process exited but exit status is unavailable, pid=" << _child_pid;
249
0
    } else {
250
0
        if (WIFSIGNALED(exit_status)) {
251
0
            LOG(INFO) << "Python process was killed by signal " << WTERMSIG(exit_status);
252
0
        } else if (WIFEXITED(exit_status)) {
253
0
            LOG(INFO) << "Python process exited normally with code: " << WEXITSTATUS(exit_status);
254
0
        } else {
255
0
            LOG(INFO) << "Python process exited";
256
0
        }
257
0
    }
258
259
0
    _output_stream.close();
260
0
    remove_unix_socket();
261
0
    _is_shutdown = true;
262
0
}
263
264
1.33k
std::string PythonUDFProcess::to_string() const {
265
1.33k
    return fmt::format(
266
1.33k
            "PythonUDFProcess(child_pid={}, uri={}, "
267
1.33k
            "unix_socket_file_path={}, is_shutdown={})",
268
1.33k
            _child_pid, _uri, _unix_socket_file_path, _is_shutdown);
269
1.33k
}
270
271
} // namespace doris