Coverage Report

Created: 2026-06-13 18:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_udf_runtime.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "udf/python/python_udf_runtime.h"
19
20
#include <butil/fd_utility.h>
21
#include <signal.h>
22
#include <string.h>
23
#include <sys/wait.h>
24
#include <unistd.h>
25
26
#include <algorithm>
27
#include <boost/process.hpp>
28
#include <cerrno>
29
#include <chrono>
30
#include <condition_variable>
31
#include <deque>
32
#include <mutex>
33
#include <thread>
34
#ifdef BE_TEST
35
#include <atomic>
36
#endif
37
38
#include "common/logging.h"
39
#include "runtime/thread_context.h"
40
41
namespace doris {
42
43
// Upper bound for each foreground wait on the child in shutdown() (once after SIGTERM and once
// more after SIGKILL). Unit-test builds use a shorter bound.
#ifdef BE_TEST
static constexpr auto PROCESS_TERMINATE_TIMEOUT = std::chrono::milliseconds(100);
#else
static constexpr auto PROCESS_TERMINATE_TIMEOUT = std::chrono::milliseconds(1000);
#endif
// How long the background reaper sleeps between polls while it still owns unreaped pids.
static constexpr auto BACKGROUND_REAP_INTERVAL = std::chrono::milliseconds(1000);
49
50
#ifdef BE_TEST
51
// Number of upcoming wait_child_exit() calls that must report TIMEOUT without touching the
// child. Only exists in unit-test builds.
static std::atomic<int> FORCED_CHILD_EXIT_TIMEOUTS {0};

// Atomically takes one unit from the forced-timeout budget.
// Returns true when a unit was taken, false when the budget is zero or negative.
static bool consume_forced_child_exit_timeout() {
    for (int budget = FORCED_CHILD_EXIT_TIMEOUTS.load(std::memory_order_relaxed); budget > 0;) {
        // On failure compare_exchange_weak refreshes `budget` with the current value, so the
        // loop condition re-checks whether anything is left before retrying.
        const bool taken = FORCED_CHILD_EXIT_TIMEOUTS.compare_exchange_weak(
                budget, budget - 1, std::memory_order_relaxed);
        if (taken) {
            return true;
        }
    }
    return false;
}
63
#endif
64
65
// Shared state between PythonUDFProcess::enqueue_child_for_reap() and the background thread
// that keeps calling waitpid() on children which outlived the bounded shutdown wait.
struct BackgroundChildReaper {
    // Guards `pids` (and `reaped_pids` in test builds).
    std::mutex mutex;
    // Signalled when a pid is enqueued; test builds also signal it after a pid is reaped.
    std::condition_variable cv;
    // Pids handed over by enqueue_child_for_reap() and not yet picked up by the reaper thread.
    std::deque<pid_t> pids;
#ifdef BE_TEST
    // Pids the reaper thread has finished with; observed by wait_background_reaped_for_test().
    std::deque<pid_t> reaped_pids;
#endif
    // The reaper thread itself; started lazily on the first enqueue.
    std::thread thread;
};

// Returns the single process-wide reaper state.
// The instance is heap-allocated on first use and never deleted in this file — presumably so
// the std::thread member is never destroyed while still joinable at process exit.
static BackgroundChildReaper& background_child_reaper() {
    static BackgroundChildReaper* const instance = new BackgroundChildReaper();
    return *instance;
}
79
80
3
void PythonUDFProcess::enqueue_child_for_reap(pid_t pid) {
81
3
    if (pid <= 0) [[unlikely]] {
82
0
        return;
83
0
    }
84
85
3
    auto& reaper = background_child_reaper();
86
3
    {
87
3
        std::lock_guard<std::mutex> lock(reaper.mutex);
88
3
        if (std::find(reaper.pids.begin(), reaper.pids.end(), pid) != reaper.pids.end()) {
89
0
            return;
90
0
        }
91
3
        reaper.pids.push_back(pid);
92
3
        if (!reaper.thread.joinable()) {
93
            // This thread only owns pids that were already SIGKILLed but could not be reaped within
94
            // the bounded shutdown wait. Such processes may be stuck in uninterruptible I/O; if they
95
            // exit later and nobody calls waitpid(), they stay as zombies under BE. Keep reaping them
96
            // asynchronously so foreground shutdown remains bounded without dropping wait ownership.
97
1
            reaper.thread = std::thread([]() {
98
1
                SCOPED_INIT_THREAD_CONTEXT();
99
1
                std::deque<pid_t> pending_pids;
100
6
                while (true) {
101
5
                    auto& reaper_ref = background_child_reaper();
102
5
                    std::unique_lock<std::mutex> lock(reaper_ref.mutex);
103
5
                    if (pending_pids.empty()) {
104
4
                        reaper_ref.cv.wait(lock,
105
6
                                           [&reaper_ref]() { return !reaper_ref.pids.empty(); });
106
4
                    } else {
107
1
                        reaper_ref.cv.wait_for(lock, BACKGROUND_REAP_INTERVAL);
108
1
                    }
109
5
                    pending_pids.insert(pending_pids.end(), reaper_ref.pids.begin(),
110
5
                                        reaper_ref.pids.end());
111
5
                    reaper_ref.pids.clear();
112
5
                    std::deque<pid_t> pids;
113
5
                    pids.swap(pending_pids);
114
5
                    lock.unlock();
115
116
5
                    for (pid_t pending_pid : pids) {
117
4
                        int exit_status = 0;
118
4
                        auto wait_result = PythonUDFProcess::wait_child_exit(
119
4
                                pending_pid, std::chrono::milliseconds(0), &exit_status);
120
4
                        if (wait_result == ChildExitWaitResult::EXITED ||
121
4
                            wait_result == ChildExitWaitResult::ALREADY_REAPED) {
122
3
                            LOG(INFO) << "Background reaped Python process pid=" << pending_pid;
123
#ifdef BE_TEST
124
                            {
125
                                std::lock_guard<std::mutex> reaped_lock(reaper_ref.mutex);
126
                                reaper_ref.reaped_pids.push_back(pending_pid);
127
                            }
128
                            reaper_ref.cv.notify_all();
129
#endif
130
3
                        } else if (wait_result == ChildExitWaitResult::TIMEOUT) {
131
1
                            pending_pids.push_back(pending_pid);
132
1
                        } else {
133
0
                            LOG(WARNING) << "Background failed to reap Python process pid="
134
0
                                         << pending_pid;
135
0
                        }
136
4
                    }
137
5
                }
138
1
            });
139
1
        }
140
3
    }
141
0
    reaper.cv.notify_one();
142
3
}
143
144
#ifdef BE_TEST
145
bool PythonUDFProcess::wait_background_reaped_for_test(pid_t pid,
146
                                                       std::chrono::milliseconds timeout) {
147
    auto& reaper = background_child_reaper();
148
    std::unique_lock<std::mutex> lock(reaper.mutex);
149
    return reaper.cv.wait_for(lock, timeout, [&reaper, pid]() {
150
        return std::find(reaper.reaped_pids.begin(), reaper.reaped_pids.end(), pid) !=
151
               reaper.reaped_pids.end();
152
    });
153
}
154
155
void PythonUDFProcess::force_child_exit_timeouts_for_test(int count) {
156
    FORCED_CHILD_EXIT_TIMEOUTS.store(count, std::memory_order_relaxed);
157
}
158
#endif
159
160
/// Polls a child process with non-blocking waitpid() until it exits or `timeout` elapses.
///
/// The child is always polled at least once, so a zero timeout performs a single non-blocking
/// check. Between polls the calling thread sleeps for 10 ms.
///
/// @param pid         child process id to wait for.
/// @param timeout     maximum time to keep polling.
/// @param exit_status passed straight to waitpid(); receives the wait status on EXITED.
/// @return EXITED         when this call reaped the child,
///         ALREADY_REAPED when waitpid() reports ECHILD,
///         TIMEOUT        when the child was still alive at the deadline (or, in test builds,
///                        when a forced timeout was consumed),
///         ERROR          on any other waitpid() failure.
PythonUDFProcess::ChildExitWaitResult PythonUDFProcess::wait_child_exit(
        pid_t pid, std::chrono::milliseconds timeout, int* exit_status) {
#ifdef BE_TEST
    if (consume_forced_child_exit_timeout()) {
        return ChildExitWaitResult::TIMEOUT;
    }
#endif
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (true) {
        pid_t ret = waitpid(pid, exit_status, WNOHANG);
        if (ret == pid) {
            return ChildExitWaitResult::EXITED;
        }
        if (ret < 0) {
            // Snapshot errno immediately: building the log message below may itself make calls
            // that overwrite errno before it is formatted.
            const int wait_errno = errno;
            if (wait_errno == EINTR) {
                // retry if interrupted
                continue;
            }
            // Another owner may already have observed the child exit through boost::process.
            if (wait_errno == ECHILD) {
                return ChildExitWaitResult::ALREADY_REAPED;
            }
            LOG(WARNING) << "Failed to wait Python process pid=" << pid << ": "
                         << strerror(wait_errno);
            return ChildExitWaitResult::ERROR;
        }
        // ret == 0: the child exists but has not changed state yet.
        if (std::chrono::steady_clock::now() >= deadline) {
            return ChildExitWaitResult::TIMEOUT;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}
191
192
43
void PythonUDFProcess::remove_unix_socket() {
193
43
    if (_uri.empty() || _unix_socket_file_path.empty()) return;
194
195
43
    if (unlink(_unix_socket_file_path.c_str()) == 0) {
196
29
        LOG(INFO) << "Successfully removed unix socket: " << _unix_socket_file_path;
197
29
        return;
198
29
    }
199
200
14
    if (errno == ENOENT) {
201
        // File does not exist, this is fine, no need to warn
202
13
        LOG(INFO) << "Unix socket not found (already removed): " << _uri;
203
13
    } else {
204
1
        LOG(WARNING) << "Failed to remove unix socket " << _uri << ": " << std::strerror(errno)
205
1
                     << " (errno=" << errno << ")";
206
1
    }
207
14
}
208
209
92
void PythonUDFProcess::shutdown() {
210
92
    if (!_child.valid() || _is_shutdown) return;
211
212
43
    int exit_status = 0;
213
43
    bool exited = !_child.running();
214
43
    bool status_available = false;
215
43
    bool already_reaped = false;
216
43
    if (!exited) {
217
43
        ::kill(_child_pid, SIGTERM);
218
43
        auto wait_result = wait_child_exit(_child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status);
219
43
        exited = wait_result == ChildExitWaitResult::EXITED ||
220
43
                 wait_result == ChildExitWaitResult::ALREADY_REAPED;
221
43
        status_available = wait_result == ChildExitWaitResult::EXITED;
222
43
        already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
223
43
    } else {
224
0
        auto wait_result = wait_child_exit(_child_pid, std::chrono::milliseconds(0), &exit_status);
225
0
        status_available = wait_result == ChildExitWaitResult::EXITED;
226
0
        already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
227
0
    }
228
229
43
    if (!exited) {
230
29
        LOG(WARNING) << "Python process did not terminate gracefully, sending SIGKILL, pid="
231
29
                     << _child_pid;
232
29
        ::kill(_child_pid, SIGKILL);
233
29
        auto wait_result = wait_child_exit(_child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status);
234
29
        exited = wait_result == ChildExitWaitResult::EXITED ||
235
29
                 wait_result == ChildExitWaitResult::ALREADY_REAPED;
236
29
        status_available = wait_result == ChildExitWaitResult::EXITED;
237
29
        already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
238
29
    }
239
43
    _child.detach();
240
241
43
    if (!exited) [[unlikely]] {
242
1
        LOG(WARNING) << "Python process did not exit after SIGKILL, enqueue background reap, pid="
243
1
                     << _child_pid;
244
1
        enqueue_child_for_reap(_child_pid);
245
42
    } else if (already_reaped) {
246
42
        LOG(INFO) << "Python process already reaped by another owner, pid=" << _child_pid;
247
42
    } else if (!status_available) {
248
0
        LOG(INFO) << "Python process exited but exit status is unavailable, pid=" << _child_pid;
249
0
    } else {
250
0
        if (WIFSIGNALED(exit_status)) {
251
0
            LOG(INFO) << "Python process was killed by signal " << WTERMSIG(exit_status);
252
0
        } else if (WIFEXITED(exit_status)) {
253
0
            LOG(INFO) << "Python process exited normally with code: " << WEXITSTATUS(exit_status);
254
0
        } else {
255
0
            LOG(INFO) << "Python process exited";
256
0
        }
257
0
    }
258
259
43
    _output_stream.close();
260
43
    remove_unix_socket();
261
43
    _is_shutdown = true;
262
43
}
263
264
1.30k
std::string PythonUDFProcess::to_string() const {
265
1.30k
    return fmt::format(
266
1.30k
            "PythonUDFProcess(child_pid={}, uri={}, "
267
1.30k
            "unix_socket_file_path={}, is_shutdown={})",
268
1.30k
            _child_pid, _uri, _unix_socket_file_path, _is_shutdown);
269
1.30k
}
270
271
} // namespace doris