be/src/udf/python/python_udf_runtime.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "udf/python/python_udf_runtime.h" |
19 | | |
20 | | #include <butil/fd_utility.h> |
21 | | #include <signal.h> |
22 | | #include <string.h> |
23 | | #include <sys/wait.h> |
24 | | #include <unistd.h> |
25 | | |
26 | | #include <algorithm> |
27 | | #include <cerrno> |
28 | | #include <chrono> |
29 | | #include <condition_variable> |
30 | | #include <deque> |
31 | | #include <mutex> |
32 | | #include <thread> |
33 | | |
34 | | #include "udf/python/boost_process_compat.h" |
35 | | #ifdef BE_TEST |
36 | | #include <atomic> |
37 | | #endif |
38 | | |
39 | | #include "common/logging.h" |
40 | | #include "runtime/thread_context.h" |
41 | | |
42 | | namespace doris { |
43 | | |
// Upper bound on how long shutdown() waits for the child to exit after each
// signal it sends (SIGTERM first, then SIGKILL). Test builds use a shorter wait.
#ifdef BE_TEST
static constexpr auto PROCESS_TERMINATE_TIMEOUT = std::chrono::milliseconds(100);
#else
static constexpr auto PROCESS_TERMINATE_TIMEOUT = std::chrono::milliseconds(1000);
#endif
// How long the background reaper thread sleeps between polls while it still
// holds children that have not exited yet.
static constexpr auto BACKGROUND_REAP_INTERVAL = std::chrono::milliseconds(1000);
50 | | |
51 | | #ifdef BE_TEST |
// Number of upcoming wait_child_exit() calls that must report TIMEOUT without
// touching the real child. Set by force_child_exit_timeouts_for_test().
static std::atomic<int> FORCED_CHILD_EXIT_TIMEOUTS {0};

// Atomically takes one unit from the forced-timeout budget.
// Returns true if a unit was consumed, false if the budget was zero or negative.
static bool consume_forced_child_exit_timeout() {
    constexpr auto order = std::memory_order_relaxed;
    // A failed compare_exchange_weak reloads `budget` with the current value,
    // so the loop condition is re-evaluated against fresh state on every retry.
    for (int budget = FORCED_CHILD_EXIT_TIMEOUTS.load(order); budget > 0;) {
        if (FORCED_CHILD_EXIT_TIMEOUTS.compare_exchange_weak(budget, budget - 1, order)) {
            return true;
        }
    }
    return false;
}
64 | | #endif |
65 | | |
// State shared between enqueue_child_for_reap() callers and the background
// reaper thread.
struct BackgroundChildReaper {
    // Guards `pids` (and `reaped_pids` in test builds).
    std::mutex mutex;
    // Signalled when a pid is enqueued; test builds also signal it after a reap.
    std::condition_variable cv;
    // Pids handed over by callers that the reaper thread has not picked up yet.
    std::deque<pid_t> pids;
#ifdef BE_TEST
    // Pids the reaper thread has finished with, recorded for test observation.
    std::deque<pid_t> reaped_pids;
#endif
    // The reaper thread itself; started lazily on the first enqueue.
    std::thread thread;
};

// Returns the process-wide reaper state. The instance is heap-allocated on
// first use and never deleted in this file.
static BackgroundChildReaper& background_child_reaper() {
    static BackgroundChildReaper* const instance = new BackgroundChildReaper();
    return *instance;
}
80 | | |
81 | 0 | void PythonUDFProcess::enqueue_child_for_reap(pid_t pid) { |
82 | 0 | if (pid <= 0) [[unlikely]] { |
83 | 0 | return; |
84 | 0 | } |
85 | | |
86 | 0 | auto& reaper = background_child_reaper(); |
87 | 0 | { |
88 | 0 | std::lock_guard<std::mutex> lock(reaper.mutex); |
89 | 0 | if (std::find(reaper.pids.begin(), reaper.pids.end(), pid) != reaper.pids.end()) { |
90 | 0 | return; |
91 | 0 | } |
92 | 0 | reaper.pids.push_back(pid); |
93 | 0 | if (!reaper.thread.joinable()) { |
94 | | // This thread only owns pids that were already SIGKILLed but could not be reaped within |
95 | | // the bounded shutdown wait. Such processes may be stuck in uninterruptible I/O; if they |
96 | | // exit later and nobody calls waitpid(), they stay as zombies under BE. Keep reaping them |
97 | | // asynchronously so foreground shutdown remains bounded without dropping wait ownership. |
98 | 0 | reaper.thread = std::thread([]() { |
99 | 0 | SCOPED_INIT_THREAD_CONTEXT(); |
100 | 0 | std::deque<pid_t> pending_pids; |
101 | 0 | while (true) { |
102 | 0 | auto& reaper_ref = background_child_reaper(); |
103 | 0 | std::unique_lock<std::mutex> lock(reaper_ref.mutex); |
104 | 0 | if (pending_pids.empty()) { |
105 | 0 | reaper_ref.cv.wait(lock, |
106 | 0 | [&reaper_ref]() { return !reaper_ref.pids.empty(); }); |
107 | 0 | } else { |
108 | 0 | reaper_ref.cv.wait_for(lock, BACKGROUND_REAP_INTERVAL); |
109 | 0 | } |
110 | 0 | pending_pids.insert(pending_pids.end(), reaper_ref.pids.begin(), |
111 | 0 | reaper_ref.pids.end()); |
112 | 0 | reaper_ref.pids.clear(); |
113 | 0 | std::deque<pid_t> pids; |
114 | 0 | pids.swap(pending_pids); |
115 | 0 | lock.unlock(); |
116 | |
|
117 | 0 | for (pid_t pending_pid : pids) { |
118 | 0 | int exit_status = 0; |
119 | 0 | auto wait_result = PythonUDFProcess::wait_child_exit( |
120 | 0 | pending_pid, std::chrono::milliseconds(0), &exit_status); |
121 | 0 | if (wait_result == ChildExitWaitResult::EXITED || |
122 | 0 | wait_result == ChildExitWaitResult::ALREADY_REAPED) { |
123 | 0 | LOG(INFO) << "Background reaped Python process pid=" << pending_pid; |
124 | | #ifdef BE_TEST |
125 | | { |
126 | | std::lock_guard<std::mutex> reaped_lock(reaper_ref.mutex); |
127 | | reaper_ref.reaped_pids.push_back(pending_pid); |
128 | | } |
129 | | reaper_ref.cv.notify_all(); |
130 | | #endif |
131 | 0 | } else if (wait_result == ChildExitWaitResult::TIMEOUT) { |
132 | 0 | pending_pids.push_back(pending_pid); |
133 | 0 | } else { |
134 | 0 | LOG(WARNING) << "Background failed to reap Python process pid=" |
135 | 0 | << pending_pid; |
136 | 0 | } |
137 | 0 | } |
138 | 0 | } |
139 | 0 | }); |
140 | 0 | } |
141 | 0 | } |
142 | 0 | reaper.cv.notify_one(); |
143 | 0 | } |
144 | | |
145 | | #ifdef BE_TEST |
146 | | bool PythonUDFProcess::wait_background_reaped_for_test(pid_t pid, |
147 | | std::chrono::milliseconds timeout) { |
148 | | auto& reaper = background_child_reaper(); |
149 | | std::unique_lock<std::mutex> lock(reaper.mutex); |
150 | | return reaper.cv.wait_for(lock, timeout, [&reaper, pid]() { |
151 | | return std::find(reaper.reaped_pids.begin(), reaper.reaped_pids.end(), pid) != |
152 | | reaper.reaped_pids.end(); |
153 | | }); |
154 | | } |
155 | | |
156 | | void PythonUDFProcess::force_child_exit_timeouts_for_test(int count) { |
157 | | FORCED_CHILD_EXIT_TIMEOUTS.store(count, std::memory_order_relaxed); |
158 | | } |
159 | | #endif |
160 | | |
161 | | PythonUDFProcess::ChildExitWaitResult PythonUDFProcess::wait_child_exit( |
162 | 0 | pid_t pid, std::chrono::milliseconds timeout, int* exit_status) { |
163 | | #ifdef BE_TEST |
164 | | if (consume_forced_child_exit_timeout()) { |
165 | | return ChildExitWaitResult::TIMEOUT; |
166 | | } |
167 | | #endif |
168 | 0 | const auto deadline = std::chrono::steady_clock::now() + timeout; |
169 | 0 | while (true) { |
170 | 0 | pid_t ret = waitpid(pid, exit_status, WNOHANG); |
171 | 0 | if (ret == pid) { |
172 | 0 | return ChildExitWaitResult::EXITED; |
173 | 0 | } |
174 | 0 | if (ret < 0) { |
175 | 0 | if (errno == EINTR) { |
176 | | // retry if interrupted |
177 | 0 | continue; |
178 | 0 | } |
179 | | // Another owner may already have observed the child exit through boost::process. |
180 | 0 | if (errno == ECHILD) { |
181 | 0 | return ChildExitWaitResult::ALREADY_REAPED; |
182 | 0 | } |
183 | 0 | LOG(WARNING) << "Failed to wait Python process pid=" << pid << ": " << strerror(errno); |
184 | 0 | return ChildExitWaitResult::ERROR; |
185 | 0 | } |
186 | 0 | if (std::chrono::steady_clock::now() >= deadline) { |
187 | 0 | return ChildExitWaitResult::TIMEOUT; |
188 | 0 | } |
189 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
190 | 0 | } |
191 | 0 | } |
192 | | |
193 | 0 | void PythonUDFProcess::remove_unix_socket() { |
194 | 0 | if (_uri.empty() || _unix_socket_file_path.empty()) return; |
195 | | |
196 | 0 | if (unlink(_unix_socket_file_path.c_str()) == 0) { |
197 | 0 | LOG(INFO) << "Successfully removed unix socket: " << _unix_socket_file_path; |
198 | 0 | return; |
199 | 0 | } |
200 | | |
201 | 0 | if (errno == ENOENT) { |
202 | | // File does not exist, this is fine, no need to warn |
203 | 0 | LOG(INFO) << "Unix socket not found (already removed): " << _uri; |
204 | 0 | } else { |
205 | 0 | LOG(WARNING) << "Failed to remove unix socket " << _uri << ": " << std::strerror(errno) |
206 | 0 | << " (errno=" << errno << ")"; |
207 | 0 | } |
208 | 0 | } |
209 | | |
210 | 0 | void PythonUDFProcess::shutdown() { |
211 | 0 | if (!_child.valid() || _is_shutdown) return; |
212 | | |
213 | 0 | int exit_status = 0; |
214 | 0 | bool exited = !_child.running(); |
215 | 0 | bool status_available = false; |
216 | 0 | bool already_reaped = false; |
217 | 0 | if (!exited) { |
218 | 0 | ::kill(_child_pid, SIGTERM); |
219 | 0 | auto wait_result = wait_child_exit(_child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status); |
220 | 0 | exited = wait_result == ChildExitWaitResult::EXITED || |
221 | 0 | wait_result == ChildExitWaitResult::ALREADY_REAPED; |
222 | 0 | status_available = wait_result == ChildExitWaitResult::EXITED; |
223 | 0 | already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED; |
224 | 0 | } else { |
225 | 0 | auto wait_result = wait_child_exit(_child_pid, std::chrono::milliseconds(0), &exit_status); |
226 | 0 | status_available = wait_result == ChildExitWaitResult::EXITED; |
227 | 0 | already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED; |
228 | 0 | } |
229 | |
|
230 | 0 | if (!exited) { |
231 | 0 | LOG(WARNING) << "Python process did not terminate gracefully, sending SIGKILL, pid=" |
232 | 0 | << _child_pid; |
233 | 0 | ::kill(_child_pid, SIGKILL); |
234 | 0 | auto wait_result = wait_child_exit(_child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status); |
235 | 0 | exited = wait_result == ChildExitWaitResult::EXITED || |
236 | 0 | wait_result == ChildExitWaitResult::ALREADY_REAPED; |
237 | 0 | status_available = wait_result == ChildExitWaitResult::EXITED; |
238 | 0 | already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED; |
239 | 0 | } |
240 | 0 | _child.detach(); |
241 | |
|
242 | 0 | if (!exited) [[unlikely]] { |
243 | 0 | LOG(WARNING) << "Python process did not exit after SIGKILL, enqueue background reap, pid=" |
244 | 0 | << _child_pid; |
245 | 0 | enqueue_child_for_reap(_child_pid); |
246 | 0 | } else if (already_reaped) { |
247 | 0 | LOG(INFO) << "Python process already reaped by another owner, pid=" << _child_pid; |
248 | 0 | } else if (!status_available) { |
249 | 0 | LOG(INFO) << "Python process exited but exit status is unavailable, pid=" << _child_pid; |
250 | 0 | } else { |
251 | 0 | if (WIFSIGNALED(exit_status)) { |
252 | 0 | LOG(INFO) << "Python process was killed by signal " << WTERMSIG(exit_status); |
253 | 0 | } else if (WIFEXITED(exit_status)) { |
254 | 0 | LOG(INFO) << "Python process exited normally with code: " << WEXITSTATUS(exit_status); |
255 | 0 | } else { |
256 | 0 | LOG(INFO) << "Python process exited"; |
257 | 0 | } |
258 | 0 | } |
259 | |
|
260 | 0 | _output_stream.close(); |
261 | 0 | remove_unix_socket(); |
262 | 0 | _is_shutdown = true; |
263 | 0 | } |
264 | | |
265 | 0 | std::string PythonUDFProcess::to_string() const { |
266 | 0 | return fmt::format( |
267 | 0 | "PythonUDFProcess(child_pid={}, uri={}, " |
268 | 0 | "unix_socket_file_path={}, is_shutdown={})", |
269 | 0 | _child_pid, _uri, _unix_socket_file_path, _is_shutdown); |
270 | 0 | } |
271 | | |
272 | | } // namespace doris |