Coverage Report

Created: 2026-06-11 15:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_udf_runtime.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "udf/python/python_udf_runtime.h"
19
20
#include <butil/fd_utility.h>
21
#include <signal.h>
22
#include <string.h>
23
#include <sys/wait.h>
24
#include <unistd.h>
25
26
#include <algorithm>
27
#include <cerrno>
28
#include <chrono>
29
#include <condition_variable>
30
#include <deque>
31
#include <mutex>
32
#include <thread>
33
34
#include "udf/python/boost_process_compat.h"
35
#ifdef BE_TEST
36
#include <atomic>
37
#endif
38
39
#include "common/logging.h"
40
#include "runtime/thread_context.h"
41
42
namespace doris {
43
44
// Upper bound for each bounded foreground wait in shutdown(): one after SIGTERM and one after
// SIGKILL. Test builds (BE_TEST) use 100 ms instead of 1 s.
static constexpr std::chrono::milliseconds PROCESS_TERMINATE_TIMEOUT {
#ifdef BE_TEST
        100
#else
        1000
#endif
};

// Period between non-blocking waitpid() sweeps of the background reaper while it still owns
// unreaped pids (a newly enqueued pid wakes it earlier).
static constexpr std::chrono::milliseconds BACKGROUND_REAP_INTERVAL {1000};
50
51
#ifdef BE_TEST
// Test-only budget: how many upcoming wait_child_exit() calls should report TIMEOUT without
// polling waitpid(). Set through force_child_exit_timeouts_for_test().
static std::atomic<int> FORCED_CHILD_EXIT_TIMEOUTS {0};

// Atomically takes one unit from FORCED_CHILD_EXIT_TIMEOUTS.
// Returns true if a unit was taken (the caller should pretend the wait timed out) and false once
// the budget is zero or negative; the counter never goes below zero through this function.
static bool consume_forced_child_exit_timeout() {
    for (int budget = FORCED_CHILD_EXIT_TIMEOUTS.load(std::memory_order_relaxed); budget > 0;) {
        // On failure compare_exchange_weak writes the current value back into `budget`, so the
        // loop condition re-checks the fresh value; spurious failures simply retry.
        if (FORCED_CHILD_EXIT_TIMEOUTS.compare_exchange_weak(budget, budget - 1,
                                                             std::memory_order_relaxed)) {
            return true;
        }
    }
    return false;
}
#endif
65
66
// Process-wide hand-off point for Python children that shutdown() could not reap within its
// bounded wait. The fields are read and written while holding `mutex`.
struct BackgroundChildReaper {
    std::mutex mutex;
    // Signalled when new pids are queued; in test builds also after a pid has been reaped.
    std::condition_variable cv;
    // Pids handed over by enqueue_child_for_reap() that the reaper thread has not picked up yet.
    std::deque<pid_t> pids;
#ifdef BE_TEST
    // Pids the reaper thread successfully collected, observed by wait_background_reaped_for_test().
    std::deque<pid_t> reaped_pids;
#endif
    // Reaper thread, started lazily on the first enqueue and never joined.
    std::thread thread;
};

// Returns the one BackgroundChildReaper of this process.
// The instance lives on the heap and is never destroyed on purpose: once `thread` is started it
// is never joined, and running ~std::thread on a joinable thread calls std::terminate().
static BackgroundChildReaper& background_child_reaper() {
    static BackgroundChildReaper* const instance = new BackgroundChildReaper();
    return *instance;
}
80
81
0
void PythonUDFProcess::enqueue_child_for_reap(pid_t pid) {
82
0
    if (pid <= 0) [[unlikely]] {
83
0
        return;
84
0
    }
85
86
0
    auto& reaper = background_child_reaper();
87
0
    {
88
0
        std::lock_guard<std::mutex> lock(reaper.mutex);
89
0
        if (std::find(reaper.pids.begin(), reaper.pids.end(), pid) != reaper.pids.end()) {
90
0
            return;
91
0
        }
92
0
        reaper.pids.push_back(pid);
93
0
        if (!reaper.thread.joinable()) {
94
            // This thread only owns pids that were already SIGKILLed but could not be reaped within
95
            // the bounded shutdown wait. Such processes may be stuck in uninterruptible I/O; if they
96
            // exit later and nobody calls waitpid(), they stay as zombies under BE. Keep reaping them
97
            // asynchronously so foreground shutdown remains bounded without dropping wait ownership.
98
0
            reaper.thread = std::thread([]() {
99
0
                SCOPED_INIT_THREAD_CONTEXT();
100
0
                std::deque<pid_t> pending_pids;
101
0
                while (true) {
102
0
                    auto& reaper_ref = background_child_reaper();
103
0
                    std::unique_lock<std::mutex> lock(reaper_ref.mutex);
104
0
                    if (pending_pids.empty()) {
105
0
                        reaper_ref.cv.wait(lock,
106
0
                                           [&reaper_ref]() { return !reaper_ref.pids.empty(); });
107
0
                    } else {
108
0
                        reaper_ref.cv.wait_for(lock, BACKGROUND_REAP_INTERVAL);
109
0
                    }
110
0
                    pending_pids.insert(pending_pids.end(), reaper_ref.pids.begin(),
111
0
                                        reaper_ref.pids.end());
112
0
                    reaper_ref.pids.clear();
113
0
                    std::deque<pid_t> pids;
114
0
                    pids.swap(pending_pids);
115
0
                    lock.unlock();
116
117
0
                    for (pid_t pending_pid : pids) {
118
0
                        int exit_status = 0;
119
0
                        auto wait_result = PythonUDFProcess::wait_child_exit(
120
0
                                pending_pid, std::chrono::milliseconds(0), &exit_status);
121
0
                        if (wait_result == ChildExitWaitResult::EXITED ||
122
0
                            wait_result == ChildExitWaitResult::ALREADY_REAPED) {
123
0
                            LOG(INFO) << "Background reaped Python process pid=" << pending_pid;
124
#ifdef BE_TEST
125
                            {
126
                                std::lock_guard<std::mutex> reaped_lock(reaper_ref.mutex);
127
                                reaper_ref.reaped_pids.push_back(pending_pid);
128
                            }
129
                            reaper_ref.cv.notify_all();
130
#endif
131
0
                        } else if (wait_result == ChildExitWaitResult::TIMEOUT) {
132
0
                            pending_pids.push_back(pending_pid);
133
0
                        } else {
134
0
                            LOG(WARNING) << "Background failed to reap Python process pid="
135
0
                                         << pending_pid;
136
0
                        }
137
0
                    }
138
0
                }
139
0
            });
140
0
        }
141
0
    }
142
0
    reaper.cv.notify_one();
143
0
}
144
145
#ifdef BE_TEST
// Test hook: blocks until the background reaper has recorded `pid` as reaped, or until `timeout`
// elapses. Returns true if the pid was seen in the reaped list in time.
bool PythonUDFProcess::wait_background_reaped_for_test(pid_t pid,
                                                       std::chrono::milliseconds timeout) {
    auto& reaper = background_child_reaper();
    // Evaluated under reaper.mutex by wait_for().
    const auto pid_was_reaped = [&reaper, pid]() {
        const auto& reaped = reaper.reaped_pids;
        return std::find(reaped.begin(), reaped.end(), pid) != reaped.end();
    };
    std::unique_lock<std::mutex> lock(reaper.mutex);
    return reaper.cv.wait_for(lock, timeout, pid_was_reaped);
}

// Test hook: makes the next `count` wait_child_exit() calls report TIMEOUT without waiting.
void PythonUDFProcess::force_child_exit_timeouts_for_test(int count) {
    FORCED_CHILD_EXIT_TIMEOUTS.store(count, std::memory_order_relaxed);
}
#endif
160
161
// Polls waitpid(pid, WNOHANG) until the child is collected or `timeout` expires.
//
// @param pid          the exact child to wait for; must be > 0.
// @param timeout      how long to keep polling; 0 means a single non-blocking check.
// @param exit_status  receives the raw wait status when the result is EXITED.
// @return EXITED when this call reaped the child; ALREADY_REAPED when waitpid() reports ECHILD
//         (no such child any more, e.g. collected by another owner); TIMEOUT when the child is
//         still not collectable at the deadline; ERROR for any other waitpid() failure or an
//         invalid pid.
PythonUDFProcess::ChildExitWaitResult PythonUDFProcess::wait_child_exit(
        pid_t pid, std::chrono::milliseconds timeout, int* exit_status) {
#ifdef BE_TEST
    if (consume_forced_child_exit_timeout()) {
        return ChildExitWaitResult::TIMEOUT;
    }
#endif
    // waitpid() treats 0 and negative pids as "any child in a process group" and -1 as "any
    // child at all". Polling such a value could silently reap a child that belongs to another
    // owner, so refuse it instead.
    if (pid <= 0) [[unlikely]] {
        LOG(WARNING) << "Refuse to wait Python process with invalid pid=" << pid;
        return ChildExitWaitResult::ERROR;
    }

    static constexpr std::chrono::milliseconds POLL_INTERVAL {10};
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (true) {
        const pid_t ret = waitpid(pid, exit_status, WNOHANG);
        if (ret == pid) {
            return ChildExitWaitResult::EXITED;
        }
        if (ret < 0) {
            // Snapshot errno immediately; logging below may overwrite it.
            const int err = errno;
            if (err == EINTR) {
                // retry if interrupted
                continue;
            }
            // Another owner may already have observed the child exit through boost::process.
            if (err == ECHILD) {
                return ChildExitWaitResult::ALREADY_REAPED;
            }
            LOG(WARNING) << "Failed to wait Python process pid=" << pid << ": " << strerror(err);
            return ChildExitWaitResult::ERROR;
        }
        // ret == 0: the child exists but has not exited yet.
        const auto now = std::chrono::steady_clock::now();
        if (now >= deadline) {
            return ChildExitWaitResult::TIMEOUT;
        }
        // Never sleep past the deadline.
        std::this_thread::sleep_for(
                std::min<std::chrono::steady_clock::duration>(POLL_INTERVAL, deadline - now));
    }
}
192
193
0
void PythonUDFProcess::remove_unix_socket() {
194
0
    if (_uri.empty() || _unix_socket_file_path.empty()) return;
195
196
0
    if (unlink(_unix_socket_file_path.c_str()) == 0) {
197
0
        LOG(INFO) << "Successfully removed unix socket: " << _unix_socket_file_path;
198
0
        return;
199
0
    }
200
201
0
    if (errno == ENOENT) {
202
        // File does not exist, this is fine, no need to warn
203
0
        LOG(INFO) << "Unix socket not found (already removed): " << _uri;
204
0
    } else {
205
0
        LOG(WARNING) << "Failed to remove unix socket " << _uri << ": " << std::strerror(errno)
206
0
                     << " (errno=" << errno << ")";
207
0
    }
208
0
}
209
210
0
void PythonUDFProcess::shutdown() {
211
0
    if (!_child.valid() || _is_shutdown) return;
212
213
0
    int exit_status = 0;
214
0
    bool exited = !_child.running();
215
0
    bool status_available = false;
216
0
    bool already_reaped = false;
217
0
    if (!exited) {
218
0
        ::kill(_child_pid, SIGTERM);
219
0
        auto wait_result = wait_child_exit(_child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status);
220
0
        exited = wait_result == ChildExitWaitResult::EXITED ||
221
0
                 wait_result == ChildExitWaitResult::ALREADY_REAPED;
222
0
        status_available = wait_result == ChildExitWaitResult::EXITED;
223
0
        already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
224
0
    } else {
225
0
        auto wait_result = wait_child_exit(_child_pid, std::chrono::milliseconds(0), &exit_status);
226
0
        status_available = wait_result == ChildExitWaitResult::EXITED;
227
0
        already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
228
0
    }
229
230
0
    if (!exited) {
231
0
        LOG(WARNING) << "Python process did not terminate gracefully, sending SIGKILL, pid="
232
0
                     << _child_pid;
233
0
        ::kill(_child_pid, SIGKILL);
234
0
        auto wait_result = wait_child_exit(_child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status);
235
0
        exited = wait_result == ChildExitWaitResult::EXITED ||
236
0
                 wait_result == ChildExitWaitResult::ALREADY_REAPED;
237
0
        status_available = wait_result == ChildExitWaitResult::EXITED;
238
0
        already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
239
0
    }
240
0
    _child.detach();
241
242
0
    if (!exited) [[unlikely]] {
243
0
        LOG(WARNING) << "Python process did not exit after SIGKILL, enqueue background reap, pid="
244
0
                     << _child_pid;
245
0
        enqueue_child_for_reap(_child_pid);
246
0
    } else if (already_reaped) {
247
0
        LOG(INFO) << "Python process already reaped by another owner, pid=" << _child_pid;
248
0
    } else if (!status_available) {
249
0
        LOG(INFO) << "Python process exited but exit status is unavailable, pid=" << _child_pid;
250
0
    } else {
251
0
        if (WIFSIGNALED(exit_status)) {
252
0
            LOG(INFO) << "Python process was killed by signal " << WTERMSIG(exit_status);
253
0
        } else if (WIFEXITED(exit_status)) {
254
0
            LOG(INFO) << "Python process exited normally with code: " << WEXITSTATUS(exit_status);
255
0
        } else {
256
0
            LOG(INFO) << "Python process exited";
257
0
        }
258
0
    }
259
260
0
    _output_stream.close();
261
0
    remove_unix_socket();
262
0
    _is_shutdown = true;
263
0
}
264
265
0
std::string PythonUDFProcess::to_string() const {
266
0
    return fmt::format(
267
0
            "PythonUDFProcess(child_pid={}, uri={}, "
268
0
            "unix_socket_file_path={}, is_shutdown={})",
269
0
            _child_pid, _uri, _unix_socket_file_path, _is_shutdown);
270
0
}
271
272
} // namespace doris