Coverage Report

Created: 2026-06-10 02:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "udf/python/python_server.h"
19
20
#include <arrow/type_fwd.h>
21
#include <butil/fd_utility.h>
22
#include <dirent.h>
23
#include <fmt/core.h>
24
#include <signal.h>
25
#include <sys/poll.h>
26
#include <sys/stat.h>
27
#include <unistd.h>
28
29
#include <algorithm>
30
#include <boost/asio.hpp>
31
#include <boost/process.hpp>
32
#include <chrono>
33
#include <fstream>
34
#include <thread>
35
36
#include "arrow/flight/client.h"
37
#include "common/config.h"
38
#include "common/status.h"
39
#include "runtime/thread_context.h"
40
#include "udf/python/python_udaf_client.h"
41
#include "udf/python/python_udf_client.h"
42
#include "udf/python/python_udtf_client.h"
43
#include "util/cpu_info.h"
44
45
namespace doris {
46
47
// Looks up the process pool registered for `version`, registering a fresh empty pool the first
// time a version is requested. Returns SERVICE_UNAVAILABLE once the shutdown flag is set.
Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
PythonServerManager::_get_or_create_process_pool(const PythonVersion& version) {
    std::lock_guard<std::mutex> pools_guard(_pools_mutex);

    // shutdown() owns the manager lifecycle. After it has started, a newly created pool would let
    // detached init workers publish Python processes that the manager no longer tracks.
    const bool shutting_down = _shutdown_flag.load(std::memory_order_acquire);
    if (shutting_down) {
        return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
                "Python server manager is shutting down"));
    }

    // operator[] default-inserts an empty slot for an unseen version; fill it in right away while
    // the map lock is still held.
    auto& slot = _process_pools[version];
    if (slot == nullptr) {
        slot = std::make_shared<VersionedProcessPool>();
    }
    return slot;
}
62
63
// Copies every (version, pool) entry out of the pool map while holding the map lock, so callers
// can iterate over the pools without keeping _pools_mutex locked.
std::vector<std::pair<PythonVersion, std::shared_ptr<PythonServerManager::VersionedProcessPool>>>
PythonServerManager::_snapshot_process_pools() {
    using PoolEntry = std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>;

    std::lock_guard<std::mutex> pools_guard(_pools_mutex);
    // Each map entry is copied into a PoolEntry; the shared_ptr copy keeps the pool alive for the
    // caller even if the map entry is removed afterwards.
    return std::vector<PoolEntry>(_process_pools.begin(), _process_pools.end());
}
73
74
#ifdef BE_TEST
75
void PythonServerManager::set_process_pool_for_test(const PythonVersion& version,
76
                                                    std::vector<ProcessPtr> processes,
77
                                                    bool initialized) {
78
    auto versioned_pool = _get_or_create_process_pool(version).value();
79
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
80
    versioned_pool->processes = std::move(processes);
81
    versioned_pool->state = initialized ? PoolState::INITIALIZED : PoolState::UNINITIALIZED;
82
    versioned_pool->has_available_process =
83
            std::any_of(versioned_pool->processes.begin(), versioned_pool->processes.end(),
84
                        [](const ProcessPtr& process) { return process && process->is_alive(); });
85
}
86
87
std::vector<ProcessPtr> PythonServerManager::process_pool_snapshot_for_test(
88
        const PythonVersion& version) {
89
    auto versioned_pool = _get_or_create_process_pool(version).value();
90
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
91
    return versioned_pool->processes;
92
}
93
94
bool PythonServerManager::process_pool_is_initializing_for_test(const PythonVersion& version) {
95
    auto versioned_pool = _get_or_create_process_pool(version).value();
96
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
97
    return versioned_pool->state == PoolState::INITIALIZING;
98
}
99
100
bool PythonServerManager::process_pool_is_initialized_for_test(const PythonVersion& version) {
101
    auto versioned_pool = _get_or_create_process_pool(version).value();
102
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
103
    return versioned_pool->state == PoolState::INITIALIZED;
104
}
105
#endif
106
107
bool PythonServerManager::_select_alive_process_from_pool(const std::vector<ProcessPtr>& pool,
108
5.78k
                                                          ProcessPtr* process) {
109
5.78k
    auto alive_iter = std::min_element(pool.begin(), pool.end(),
110
86.7k
                                       [](const ProcessPtr& a, const ProcessPtr& b) {
111
86.7k
                                           const bool a_alive = a && a->is_alive();
112
86.7k
                                           const bool b_alive = b && b->is_alive();
113
86.7k
                                           if (a_alive != b_alive) {
114
61
                                               return a_alive > b_alive;
115
61
                                           }
116
86.6k
                                           return a.use_count() < b.use_count();
117
86.7k
                                       });
118
5.78k
    if (alive_iter == pool.end() || !*alive_iter || !(*alive_iter)->is_alive()) {
119
0
        return false;
120
0
    }
121
5.78k
    *process = *alive_iter;
122
5.78k
    return true;
123
5.78k
}
124
125
template <typename ClientType>
126
Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
127
                                       std::shared_ptr<ClientType>* client,
128
5.76k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
129
5.76k
    std::shared_ptr<VersionedProcessPool> versioned_pool =
130
5.76k
            DORIS_TRY(_ensure_pool_initialized(version));
131
132
5.76k
    ProcessPtr process;
133
5.76k
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
134
135
5.76k
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
136
2.88k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
137
2.88k
    } else {
138
2.87k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
139
2.87k
    }
140
141
5.76k
    return Status::OK();
142
5.76k
}
_ZN5doris19PythonServerManager10get_clientINS_15PythonUDFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Line
Count
Source
128
1.33k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
129
1.33k
    std::shared_ptr<VersionedProcessPool> versioned_pool =
130
1.33k
            DORIS_TRY(_ensure_pool_initialized(version));
131
132
1.33k
    ProcessPtr process;
133
1.33k
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
134
135
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
136
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
137
1.33k
    } else {
138
1.33k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
139
1.33k
    }
140
141
1.33k
    return Status::OK();
142
1.33k
}
_ZN5doris19PythonServerManager10get_clientINS_16PythonUDAFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Line
Count
Source
128
2.88k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
129
2.88k
    std::shared_ptr<VersionedProcessPool> versioned_pool =
130
2.88k
            DORIS_TRY(_ensure_pool_initialized(version));
131
132
2.88k
    ProcessPtr process;
133
2.88k
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
134
135
2.88k
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
136
2.88k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
137
    } else {
138
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
139
    }
140
141
2.88k
    return Status::OK();
142
2.88k
}
_ZN5doris19PythonServerManager10get_clientINS_16PythonUDTFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Line
Count
Source
128
1.54k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
129
1.54k
    std::shared_ptr<VersionedProcessPool> versioned_pool =
130
1.54k
            DORIS_TRY(_ensure_pool_initialized(version));
131
132
1.54k
    ProcessPtr process;
133
1.54k
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
134
135
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
136
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
137
1.54k
    } else {
138
1.54k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
139
1.54k
    }
140
141
1.54k
    return Status::OK();
142
1.54k
}
143
144
// Returns the process pool for `version`, kicking off asynchronous initialization when the pool
// has not been initialized yet.
//
// The first caller that finds the pool neither INITIALIZED, INITIALIZING nor STOPPED switches it to
// INITIALIZING and spawns detached threads: one worker per slot that forks a Python process, plus
// one monitor that finalizes the pool state. Every caller then waits up to
// PROCESS_POOL_INIT_TIMEOUT for the first usable process.
//
// Errors (all SERVICE_UNAVAILABLE): the manager is shutting down, the pool has been stopped, or no
// process became available before the wait ended.
Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
PythonServerManager::_ensure_pool_initialized(const PythonVersion& version) {
    auto versioned_pool_result = _get_or_create_process_pool(version);
    if (!versioned_pool_result.has_value()) {
        return ResultError(versioned_pool_result.error());
    }
    auto versioned_pool = versioned_pool_result.value();
    // A non-positive config value means "one process per CPU core".
    const int max_pool_size = config::max_python_process_num > 0 ? config::max_python_process_num
                                                                 : CpuInfo::num_cores();

    std::unique_lock<std::mutex> lock(versioned_pool->mutex);
    if (versioned_pool->state == PoolState::INITIALIZED) {
        return versioned_pool;
    }

    if (versioned_pool->state != PoolState::STOPPED) {
        if (versioned_pool->state != PoolState::INITIALIZING) {
            versioned_pool->state = PoolState::INITIALIZING;
            versioned_pool->has_available_process = false;
            versioned_pool->processes.resize(max_pool_size);
            // Counts init workers that have finished (successfully or not); shared with the
            // detached threads so it outlives this call.
            auto init_finished_count = std::make_shared<std::atomic<int>>(0);

            LOG(INFO) << "Initializing Python process pool for version " << version.to_string()
                      << " with " << max_pool_size << " processes (config::max_python_process_num="
                      << config::max_python_process_num << ", CPU cores=" << CpuInfo::num_cores()
                      << ")";

            // Monitor thread: waits until every worker has reported, the state changes, or the
            // timeout expires, then moves the pool out of INITIALIZING.
            std::thread([this, versioned_pool, init_finished_count, max_pool_size]() {
                SCOPED_INIT_THREAD_CONTEXT();
                std::unique_lock<std::mutex> lock(versioned_pool->mutex);
                versioned_pool->cv.wait_for(
                        lock, PROCESS_POOL_INIT_TIMEOUT,
                        [&versioned_pool, init_finished_count, max_pool_size]() {
                            return versioned_pool->state != PoolState::INITIALIZING ||
                                   init_finished_count->load(std::memory_order_acquire) >=
                                           max_pool_size;
                        });
                if (versioned_pool->state == PoolState::INITIALIZING) {
                    if (versioned_pool->has_available_process) {
                        // Keep this under the pool lock. shutdown() must acquire the same lock
                        // before it can destroy manager-owned health-check state.
                        _start_health_check_thread();
                        versioned_pool->state = PoolState::INITIALIZED;
                    } else {
                        // Nothing usable was produced; allow a later request to retry.
                        versioned_pool->state = PoolState::UNINITIALIZED;
                    }
                }
                versioned_pool->cv.notify_all();
            }).detach();

            for (int i = 0; i < max_pool_size; ++i) {
                std::thread([version, versioned_pool, i, max_pool_size, init_finished_count]() {
                    SCOPED_INIT_THREAD_CONTEXT();
                    ProcessPtr process;
                    Status status = PythonServerManager::fork(version, &process);
                    const bool ok = status.ok() && process;
                    ProcessPtr process_to_shutdown;
                    {
                        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
                        // shutdown() and repair can race with detached init workers after timeout.
                        // Late successful forks only fill slots that are still empty or dead.
                        // `i` is never negative, so the cast only makes the comparison with the
                        // unsigned vector size explicit.
                        if (ok &&
                            (versioned_pool->state == PoolState::INITIALIZING ||
                             versioned_pool->state == PoolState::INITIALIZED) &&
                            static_cast<size_t>(i) < versioned_pool->processes.size() &&
                            (!versioned_pool->processes[i] ||
                             !versioned_pool->processes[i]->is_alive())) {
                            versioned_pool->processes[i] = std::move(process);
                            versioned_pool->has_available_process = true;
                        } else if (ok) {
                            // The slot is no longer ours to fill; dispose of the process outside
                            // the pool lock below.
                            process_to_shutdown = std::move(process);
                        } else [[unlikely]] {
                            LOG(WARNING) << "Failed to create Python process " << (i + 1) << "/"
                                         << max_pool_size << " for version " << version.to_string()
                                         << ": " << status.to_string();
                        }
                    }
                    init_finished_count->fetch_add(1, std::memory_order_acq_rel);
                    versioned_pool->cv.notify_all();
                    if (process_to_shutdown) {
                        process_to_shutdown->shutdown();
                    }
                }).detach();
            }
        }

        // Wait only for the first usable process. INITIALIZED is set later by the monitor thread
        // once every slot has attempted initialization or the init timeout has expired. Any state
        // other than INITIALIZING (including STOPPED) ends the wait.
        versioned_pool->cv.wait_for(lock, PROCESS_POOL_INIT_TIMEOUT, [&versioned_pool]() {
            return versioned_pool->has_available_process ||
                   versioned_pool->state != PoolState::INITIALIZING;
        });
        if (versioned_pool->has_available_process) {
            return versioned_pool;
        }
        versioned_pool->cv.notify_all();
    }

    // A stopped pool never waited for initialization, so report it the same way _get_process()
    // does instead of claiming that an initialization timeout elapsed.
    if (versioned_pool->state == PoolState::STOPPED) {
        return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
                "Python process pool has stopped for version {}", version.to_string()));
    }

    return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
            "Failed to initialize Python process pool for version {}: no process became available "
            "within {} ms",
            version.to_string(), PROCESS_POOL_INIT_TIMEOUT.count()));
}
248
249
// Selects an alive process from `versioned_pool` and stores it in `*process`.
//
// When no process is alive the behaviour depends on the pool state:
//  - STOPPED: fails immediately.
//  - INITIALIZING: waits up to PROCESS_REPAIR_WAIT_TIMEOUT for an alive process or a state change.
//  - INITIALIZED: starts a background repair unless one is already flagged, then waits up to
//    PROCESS_REPAIR_WAIT_TIMEOUT for an alive process or for the pool to stop.
//  - anything else: fails as "not initialized".
// has_available_process is refreshed with the outcome of every selection attempt. All failures are
// SERVICE_UNAVAILABLE.
Status PythonServerManager::_get_process(
        const PythonVersion& version, const std::shared_ptr<VersionedProcessPool>& versioned_pool,
        ProcessPtr* process) {
    // True when at least one pool entry is non-null and alive. Must be called with the pool lock
    // held.
    const auto any_process_alive = [&versioned_pool]() {
        const auto& entries = versioned_pool->processes;
        return std::any_of(entries.begin(), entries.end(),
                           [](const ProcessPtr& p) { return p && p->is_alive(); });
    };
    const auto stopped_error = [&version]() {
        return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
                "Python process pool has stopped for version {}", version.to_string());
    };
    // Attempts a selection and records the outcome in has_available_process.
    const auto try_select = [&]() {
        const bool found = _select_alive_process_from_pool(versioned_pool->processes, process);
        versioned_pool->has_available_process = found;
        return found;
    };

    std::unique_lock<std::mutex> pool_lock(versioned_pool->mutex);

    if (versioned_pool->state == PoolState::STOPPED) {
        versioned_pool->has_available_process = false;
        return stopped_error();
    }

    if (try_select()) [[likely]] {
        return Status::OK();
    }

    if (versioned_pool->state == PoolState::INITIALIZING) {
        versioned_pool->cv.wait_for(pool_lock, PROCESS_REPAIR_WAIT_TIMEOUT,
                                    [&versioned_pool, &any_process_alive]() {
                                        return any_process_alive() ||
                                               versioned_pool->state != PoolState::INITIALIZING;
                                    });
        if (versioned_pool->state == PoolState::STOPPED) {
            return stopped_error();
        }
        if (try_select()) {
            return Status::OK();
        }
        return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
                "Python process pool is initializing but has no available process for version "
                "{} after waiting {} ms",
                version.to_string(), PROCESS_REPAIR_WAIT_TIMEOUT.count());
    }

    if (versioned_pool->state != PoolState::INITIALIZED) {
        return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
                "Python process pool is not initialized for version {}", version.to_string());
    }

    if (!versioned_pool->repairing) {
        versioned_pool->repairing = true;
        // Repair runs in the background because fork can be slow. The current request still waits
        // briefly below so a transient all-dead pool can recover without failing.
        std::thread([version, versioned_pool]() {
            SCOPED_INIT_THREAD_CONTEXT();
            int recreated = PythonServerManager::_repair_process_pool(version, versioned_pool);
            {
                std::lock_guard<std::mutex> lock(versioned_pool->mutex);
                versioned_pool->repairing = false;
            }
            versioned_pool->cv.notify_all();
            if (recreated > 0) {
                LOG(INFO) << "Repaired Python process pool for version " << version.to_string()
                          << ": recreated=" << recreated;
            }
        }).detach();
    }

    // Keep the request recoverable in the common case where the Python runtime can fork normally
    // and only the existing pool entries died. The wait is short so a wedged fork path still
    // returns SERVICE_UNAVAILABLE promptly.
    versioned_pool->cv.wait_for(pool_lock, PROCESS_REPAIR_WAIT_TIMEOUT,
                                [&versioned_pool, &any_process_alive]() {
                                    return any_process_alive() ||
                                           versioned_pool->state == PoolState::STOPPED;
                                });
    if (versioned_pool->state == PoolState::STOPPED) {
        versioned_pool->has_available_process = false;
        return stopped_error();
    }

    if (try_select()) {
        return Status::OK();
    }

    // Release the pool lock before building the final error.
    pool_lock.unlock();
    return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
            "Python process pool has no available process for version {} after waiting repair for "
            "{} ms",
            version.to_string(), PROCESS_REPAIR_WAIT_TIMEOUT.count());
}
340
341
16
// Launches one Python server process for `version` and waits for its unix socket file to appear.
//
// On success `*process` owns the child together with its stdout stream. If the socket does not
// show up within PROCESS_START_TIMEOUT, or the child stops running first, the child is terminated
// (SIGTERM, then SIGKILL) and an InternalError is returned. Exceptions raised while launching are
// converted to InternalError as well.
Status PythonServerManager::fork(const PythonVersion& version, ProcessPtr* process) {
    std::string interpreter_path = version.get_executable_path();
    std::string server_script_path = get_fight_server_path();
    std::string socket_base_path = get_base_unix_socket_path();
    // "-u" is passed to the interpreter ahead of the script path and the socket base path.
    std::vector<std::string> launch_args = {"-u", server_script_path, socket_base_path};
    boost::process::environment child_env = boost::this_process::environment();
    boost::process::ipstream child_stdout;

    // True when a bounded wait for the child did not observe its exit.
    const auto not_reaped = [](PythonUDFProcess::ChildExitWaitResult result) {
        return result == PythonUDFProcess::ChildExitWaitResult::TIMEOUT ||
               result == PythonUDFProcess::ChildExitWaitResult::ERROR;
    };

    try {
        boost::process::child child(
                interpreter_path, launch_args, boost::process::std_out > child_stdout,
                boost::process::env = child_env,
                boost::process::on_exit([](int exit_code, const std::error_code& ec) {
                    if (ec) {
                        LOG(WARNING) << "Python UDF server exited with error: " << ec.message();
                    }
                }));

        // Bound socket readiness: a child process can start but never create the Flight socket.
        // Without this, pool initialization can block until FE reports send-fragments RPC timeout.
        pid_t child_pid = child.id();
        std::string socket_path = get_unix_socket_file_path(child_pid);
        bool socket_ready = false;
        const std::chrono::steady_clock::time_point poll_begin = std::chrono::steady_clock::now();

        while (std::chrono::steady_clock::now() - poll_begin < PROCESS_START_TIMEOUT) {
            struct stat socket_stat;
            if (stat(socket_path.c_str(), &socket_stat) == 0) {
                socket_ready = true;
                break;
            }

            // No point in polling further once the child is gone.
            if (!child.running()) {
                break;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }

        if (!socket_ready) {
            int exit_status = 0;
            if (child.running()) {
                // Don't use the `wait` of boost::process, but use the operating system signal and
                // waitpid with timeout instead. boost::process may block the initialization/repair
                // thread for a long time, exceeding the timeout limit expected by the process pool.
                ::kill(child_pid, SIGTERM);
                auto wait_result = PythonUDFProcess::wait_child_exit(
                        child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status);
                if (not_reaped(wait_result)) {
                    LOG(WARNING) << "Python server start timeout and terminate timeout exceeded,"
                                 << " sending SIGKILL to pid=" << child_pid;
                    ::kill(child_pid, SIGKILL);
                    wait_result = PythonUDFProcess::wait_child_exit(
                            child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status);
                    if (not_reaped(wait_result)) [[unlikely]] {
                        // The child was SIGKILLed but not reaped within the bounded wait. Do not
                        // drop waitpid ownership after detach; otherwise a later exit can leave a
                        // zombie Python process under BE.
                        PythonUDFProcess::enqueue_child_for_reap(child_pid);
                        child.detach();
                        return Status::InternalError(
                                "Python server start failed: process did not exit after SIGKILL, "
                                "pid={}",
                                child_pid);
                    }
                }
            } else {
                PythonUDFProcess::wait_child_exit(child_pid, std::chrono::milliseconds(0),
                                                  &exit_status);
            }
            child.detach();
            return Status::InternalError("Python server start failed: socket file not found at {}",
                                         socket_path);
        }

        *process = std::make_shared<PythonUDFProcess>(std::move(child), std::move(child_stdout));

    } catch (const std::exception& e) {
        return Status::InternalError("Failed to start Python UDF server: {}", e.what());
    }

    return Status::OK();
}
425
426
1
void PythonServerManager::_start_health_check_thread() {
427
1
    std::lock_guard<std::mutex> lock(_health_check_mutex);
428
1
    if (_health_check_thread || _shutdown_flag.load(std::memory_order_acquire)) return;
429
430
1
    LOG(INFO) << "Starting Python process health check thread (interval: 30 seconds)";
431
432
1
    _health_check_thread = std::make_unique<std::thread>([this]() {
433
        // Health check loop
434
59
        while (!_shutdown_flag.load(std::memory_order_acquire)) {
435
            // Wait for interval or shutdown signal
436
58
            {
437
58
                std::unique_lock<std::mutex> lock(_health_check_mutex);
438
115
                _health_check_cv.wait_for(lock, std::chrono::seconds(30), [this]() {
439
115
                    return _shutdown_flag.load(std::memory_order_acquire);
440
115
                });
441
58
            }
442
443
58
            if (_shutdown_flag.load(std::memory_order_acquire)) break;
444
445
58
            _check_and_recreate_processes();
446
58
            _refresh_memory_stats();
447
58
        }
448
449
1
        LOG(INFO) << "Python process health check thread exiting";
450
1
    });
451
1
}
452
453
57
void PythonServerManager::_check_and_recreate_processes() {
454
57
    int total_recreated = 0;
455
57
    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
456
57
        {
457
57
            std::lock_guard<std::mutex> lock(versioned_pool->mutex);
458
57
            if (versioned_pool->state != PoolState::INITIALIZED || versioned_pool->repairing) {
459
0
                continue;
460
0
            }
461
            // Share the same repair guard with foreground requests. Otherwise health check and
462
            // _get_process() can fork the same empty/dead slots at the same time under failures.
463
57
            versioned_pool->repairing = true;
464
57
        }
465
0
        int recreated = _repair_process_pool(version, versioned_pool);
466
57
        {
467
57
            std::lock_guard<std::mutex> lock(versioned_pool->mutex);
468
57
            versioned_pool->repairing = false;
469
57
        }
470
57
        versioned_pool->cv.notify_all();
471
57
        total_recreated += recreated;
472
57
    }
473
474
57
    if (total_recreated > 0) {
475
0
        LOG(INFO) << "Health check completed: recreated=" << total_recreated;
476
0
    }
477
57
}
478
479
int PythonServerManager::_repair_process_pool(
480
57
        const PythonVersion& version, const std::shared_ptr<VersionedProcessPool>& versioned_pool) {
481
57
    const int max_pool_size = config::max_python_process_num > 0 ? config::max_python_process_num
482
57
                                                                 : CpuInfo::num_cores();
483
57
    std::vector<size_t> died_process_indices;
484
57
    {
485
57
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
486
57
        if (versioned_pool->state != PoolState::INITIALIZED) {
487
0
            return 0;
488
0
        }
489
490
57
        auto& pool = versioned_pool->processes;
491
57
        died_process_indices.reserve(std::max<size_t>(pool.size(), max_pool_size));
492
        // Need to fix the following two cases
493
        // 1. Existing processes have died
494
        // 2. Process pool not filled due to fork timeouts and other issues
495
969
        for (size_t i = 0; i < pool.size(); ++i) {
496
912
            const auto& process = pool[i];
497
912
            if (!process || !process->is_alive()) {
498
0
                died_process_indices.push_back(i);
499
0
            }
500
912
        }
501
57
        for (size_t i = pool.size(); i < static_cast<size_t>(max_pool_size); ++i) {
502
0
            died_process_indices.push_back(i);
503
0
        }
504
57
    }
505
506
57
    if (died_process_indices.empty()) [[likely]] {
507
57
        return 0;
508
57
    }
509
510
0
    int recreated = 0;
511
0
    std::vector<ProcessPtr> processes_to_shutdown;
512
0
    for (size_t index : died_process_indices) {
513
0
        {
514
0
            std::lock_guard<std::mutex> lock(versioned_pool->mutex);
515
0
            if (versioned_pool->state != PoolState::INITIALIZED) {
516
0
                break;
517
0
            }
518
0
        }
519
520
0
        ProcessPtr new_process;
521
0
        Status status = fork(version, &new_process);
522
0
        if (status.ok() && new_process) {
523
0
            bool published = false;
524
0
            {
525
0
                std::lock_guard<std::mutex> lock(versioned_pool->mutex);
526
0
                auto& pool = versioned_pool->processes;
527
0
                if (versioned_pool->state != PoolState::INITIALIZED) {
528
0
                    processes_to_shutdown.emplace_back(std::move(new_process));
529
0
                } else if (index < pool.size()) {
530
0
                    if (!pool[index] || !pool[index]->is_alive()) [[likely]] {
531
0
                        pool[index] = std::move(new_process);
532
0
                        versioned_pool->has_available_process = true;
533
0
                        recreated++;
534
0
                        published = true;
535
0
                    } else {
536
0
                        processes_to_shutdown.emplace_back(std::move(new_process));
537
0
                    }
538
0
                } else if (pool.size() < static_cast<size_t>(max_pool_size)) {
539
0
                    pool.emplace_back(std::move(new_process));
540
0
                    versioned_pool->has_available_process = true;
541
0
                    recreated++;
542
0
                    published = true;
543
0
                } else {
544
0
                    processes_to_shutdown.emplace_back(std::move(new_process));
545
0
                }
546
0
            }
547
0
            if (published) {
548
0
                versioned_pool->cv.notify_all();
549
0
            }
550
0
        } else {
551
0
            LOG(ERROR) << "Failed to recreate Python process for version " << version.to_string()
552
0
                       << ": " << status.to_string();
553
0
        }
554
0
    }
555
556
0
    {
557
0
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
558
0
        auto& pool = versioned_pool->processes;
559
560
0
        if (versioned_pool->state != PoolState::INITIALIZED) {
561
0
            versioned_pool->has_available_process = false;
562
0
        } else {
563
            // Keep empty/dead slots instead of shrinking the vector. Init workers are detached and
564
            // publish by original slot index; shrinking here would make a late successful init look
565
            // out-of-range and discard a usable process.
566
0
            versioned_pool->has_available_process = std::any_of(
567
0
                    pool.begin(), pool.end(),
568
0
                    [](const ProcessPtr& process) { return process && process->is_alive(); });
569
0
        }
570
0
    }
571
0
    versioned_pool->cv.notify_all();
572
0
    for (auto& process : processes_to_shutdown) {
573
0
        process->shutdown();
574
0
    }
575
0
    return recreated;
576
57
}
577
578
11
void PythonServerManager::shutdown() {
579
11
    std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>> pools_to_shutdown;
580
11
    {
581
11
        std::lock_guard<std::mutex> lock(_pools_mutex);
582
11
        _shutdown_flag.store(true, std::memory_order_release);
583
11
        pools_to_shutdown.reserve(_process_pools.size());
584
11
        for (auto& [version, versioned_pool] : _process_pools) {
585
0
            pools_to_shutdown.emplace_back(version, std::move(versioned_pool));
586
0
        }
587
11
        _process_pools.clear();
588
11
    }
589
590
11
    for (auto& [version, versioned_pool] : pools_to_shutdown) {
591
0
        if (!versioned_pool) {
592
0
            continue;
593
0
        }
594
0
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
595
0
        versioned_pool->state = PoolState::STOPPED;
596
0
        versioned_pool->has_available_process = false;
597
0
        versioned_pool->repairing = false;
598
0
        versioned_pool->cv.notify_all();
599
0
    }
600
601
11
    std::unique_ptr<std::thread> health_check_thread;
602
11
    {
603
11
        std::lock_guard<std::mutex> lock(_health_check_mutex);
604
11
        health_check_thread = std::move(_health_check_thread);
605
11
    }
606
11
    _health_check_cv.notify_one();
607
11
    if (health_check_thread && health_check_thread->joinable()) {
608
0
        health_check_thread->join();
609
0
    }
610
611
    // Shutdown all processes
612
11
    std::vector<ProcessPtr> processes_to_shutdown;
613
11
    for (auto& [version, versioned_pool] : pools_to_shutdown) {
614
0
        if (!versioned_pool) {
615
0
            continue;
616
0
        }
617
0
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
618
0
        auto& pool = versioned_pool->processes;
619
0
        for (auto& process : pool) {
620
0
            if (process) {
621
0
                processes_to_shutdown.emplace_back(std::move(process));
622
0
            }
623
0
        }
624
0
        pool.clear();
625
0
        versioned_pool->cv.notify_all();
626
0
    }
627
11
    for (auto& process : processes_to_shutdown) {
628
0
        process->shutdown();
629
0
    }
630
11
}
631
632
912
// Reads the resident set size of process `pid` from /proc/<pid>/statm.
//
// statm layout: size resident shared text lib data dt (all values in pages).
//
// @param pid        Process whose memory usage is queried.
// @param rss_bytes  Out-parameter; receives resident pages * page size on success and is
//                   left untouched on failure. Must not be null.
// @return OK on success; InvalidArgument when `rss_bytes` is null; InternalError when the
//         statm file cannot be opened or parsed, or the page size cannot be determined.
Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes) {
    if (rss_bytes == nullptr) {
        return Status::InvalidArgument("rss_bytes must not be null");
    }

    std::string statm_path = fmt::format("/proc/{}/statm", pid);
    std::ifstream statm_file(statm_path);

    if (!statm_file.is_open()) {
        return Status::InternalError("Cannot open {}", statm_path);
    }

    size_t size_pages = 0;
    size_t rss_pages = 0;
    // Only RSS is needed; the leading total-size field is read and discarded.
    statm_file >> size_pages >> rss_pages;

    if (statm_file.fail()) {
        return Status::InternalError("Failed to read {}", statm_path);
    }

    // sysconf() reports failure as -1. Multiplying that into a size_t would wrap around to
    // an enormous bogus RSS value, so reject it explicitly.
    const long page_size = sysconf(_SC_PAGESIZE);
    if (page_size <= 0) {
        return Status::InternalError("Failed to query page size via sysconf(_SC_PAGESIZE)");
    }

    // Convert pages to bytes.
    *rss_bytes = rss_pages * static_cast<size_t>(page_size);

    return Status::OK();
}
656
657
57
void PythonServerManager::_refresh_memory_stats() {
658
57
    int64_t total_rss = 0;
659
660
57
    for (const auto& [version, versioned_pool] : _snapshot_process_pools()) {
661
57
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
662
57
        const auto& pool = versioned_pool->processes;
663
912
        for (const auto& process : pool) {
664
912
            if (!process || !process->is_alive()) continue;
665
666
912
            size_t rss_bytes = 0;
667
912
            Status s = _read_process_memory(process->get_child_pid(), &rss_bytes);
668
669
912
            if (s.ok()) {
670
912
                total_rss += rss_bytes;
671
912
            } else [[unlikely]] {
672
0
                LOG(WARNING) << "Failed to read memory info for Python process (pid="
673
0
                             << process->get_child_pid() << "): " << s.to_string();
674
0
            }
675
912
        }
676
57
    }
677
57
    _mem_tracker.set_consumption(total_rss);
678
57
    LOG(INFO) << _mem_tracker.log_usage();
679
680
57
    if (config::python_udf_processes_memory_limit_bytes > 0 &&
681
57
        total_rss > config::python_udf_processes_memory_limit_bytes) {
682
0
        LOG(WARNING) << "Python UDF process memory usage exceeds limit: rss_bytes=" << total_rss
683
0
                     << ", limit_bytes=" << config::python_udf_processes_memory_limit_bytes;
684
0
    }
685
57
}
686
687
0
Status PythonServerManager::clear_module_cache(const std::string& location) {
688
0
    if (location.empty()) {
689
0
        return Status::InvalidArgument("Empty location for clear_module_cache");
690
0
    }
691
692
0
    std::string body = fmt::format(R"({{"location": "{}"}})", location);
693
0
    return _broadcast_action_to_processes("clear_module_cache", body,
694
0
                                          fmt::format("location={}", location));
695
0
}
696
697
0
void PythonServerManager::clear_udaf_state_cache(int64_t function_id) {
698
0
    std::string body = fmt::format(R"({{"function_id": {}}})", function_id);
699
0
    WARN_IF_ERROR(_broadcast_action_to_processes("clear_udaf_state_cache", body,
700
0
                                                 fmt::format("function_id={}", function_id)),
701
0
                  "failed to clear Python UDAF state cache");
702
0
}
703
704
// Sends one Arrow Flight action to every live Python process of every pool.
//
// @param action_type  Value assigned to arrow::flight::Action::type.
// @param body         Payload assigned (as a buffer) to arrow::flight::Action::body.
// @param log_name     Target description used only in log and error messages.
// @return OK when there is no live process or every live process returned a non-empty
//         first result; InternalError carrying the success/failure counts otherwise.
Status PythonServerManager::_broadcast_action_to_processes(const std::string& action_type,
                                                           const std::string& body,
                                                           const std::string& log_name) {
    int success_count = 0;
    int fail_count = 0;
    bool has_active_process = false;

    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
        // NOTE(review): the pool mutex stays held across the Flight calls below, so a slow
        // process delays other users of this pool — confirm this is intended.
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
        auto& pool = versioned_pool->processes;
        for (auto& process : pool) {
            if (!process || !process->is_alive()) {
                continue;
            }
            has_active_process = true;

            // Counts one failed delivery and logs why it failed, so that a non-zero
            // failure count in the summary can actually be diagnosed.
            auto record_failure = [&](const std::string& reason) {
                fail_count++;
                LOG(WARNING) << action_type << " failed for " << log_name
                             << " on Python process (pid=" << process->get_child_pid()
                             << "): " << reason;
            };

            try {
                auto loc_result = arrow::flight::Location::Parse(process->get_uri());
                if (!loc_result.ok()) [[unlikely]] {
                    record_failure(loc_result.status().ToString());
                    continue;
                }

                auto client_result = arrow::flight::FlightClient::Connect(*loc_result);
                if (!client_result.ok()) [[unlikely]] {
                    record_failure(client_result.status().ToString());
                    continue;
                }
                auto client = std::move(*client_result);

                arrow::flight::Action action;
                action.type = action_type;
                action.body = arrow::Buffer::FromString(body);

                auto result_stream = client->DoAction(action);
                if (!result_stream.ok()) {
                    record_failure(result_stream.status().ToString());
                    continue;
                }

                // Only the first result is inspected; a non-null result counts as success.
                auto result = (*result_stream)->Next();
                if (result.ok() && *result) {
                    success_count++;
                } else if (!result.ok()) {
                    record_failure(result.status().ToString());
                } else {
                    record_failure("action returned no result");
                }

            } catch (const std::exception& e) {
                record_failure(e.what());
            } catch (...) {
                record_failure("unknown exception");
            }
        }
    }

    // Nothing to notify: treated as success without logging a summary.
    if (!has_active_process) {
        return Status::OK();
    }

    LOG(INFO) << action_type << " completed for " << log_name << ", success=" << success_count
              << ", failed=" << fail_count;

    if (fail_count > 0) {
        return Status::InternalError("{} failed for {}, success={}, failed={}", action_type,
                                     log_name, success_count, fail_count);
    }

    return Status::OK();
}
770
771
// Explicit instantiations of PythonServerManager::get_client for the three Python client
// types: PythonUDFClient (UDF), PythonUDAFClient (UDAF) and PythonUDTFClient (UDTF).
// All three share the same parameter list; only the client type differs.
// NOTE(review): presumably required because the template body lives in this .cpp rather
// than in the header — confirm against python_server.h.
772
template Status PythonServerManager::get_client<PythonUDFClient>(
773
        const PythonUDFMeta& func_meta, const PythonVersion& version,
774
        std::shared_ptr<PythonUDFClient>* client,
775
        const std::shared_ptr<arrow::Schema>& data_schema);
776
777
template Status PythonServerManager::get_client<PythonUDAFClient>(
778
        const PythonUDFMeta& func_meta, const PythonVersion& version,
779
        std::shared_ptr<PythonUDAFClient>* client,
780
        const std::shared_ptr<arrow::Schema>& data_schema);
781
782
template Status PythonServerManager::get_client<PythonUDTFClient>(
783
        const PythonUDFMeta& func_meta, const PythonVersion& version,
784
        std::shared_ptr<PythonUDTFClient>* client,
785
        const std::shared_ptr<arrow::Schema>& data_schema);
786
787
} // namespace doris