Coverage Report

Created: 2026-06-12 17:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "udf/python/python_server.h"
19
20
#include <arrow/type_fwd.h>
21
#include <butil/fd_utility.h>
22
#include <dirent.h>
23
#include <fmt/core.h>
24
#include <signal.h>
25
#include <sys/poll.h>
26
#include <sys/stat.h>
27
#include <unistd.h>
28
29
#include <algorithm>
30
#include <boost/asio.hpp>
31
#include <boost/process.hpp>
32
#include <chrono>
33
#include <fstream>
34
#include <thread>
35
36
#include "arrow/flight/client.h"
37
#include "common/config.h"
38
#include "common/status.h"
39
#include "runtime/thread_context.h"
40
#include "udf/python/python_udaf_client.h"
41
#include "udf/python/python_udf_client.h"
42
#include "udf/python/python_udtf_client.h"
43
#include "util/cpu_info.h"
44
45
namespace doris {
46
47
Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
48
44
PythonServerManager::_get_or_create_process_pool(const PythonVersion& version) {
49
44
    std::lock_guard<std::mutex> lock(_pools_mutex);
50
    // shutdown() owns the manager lifecycle. Once it starts, creating a new pool would let detached
51
    // init workers publish Python processes that the manager no longer tracks.
52
44
    if (_shutdown_flag.load(std::memory_order_acquire)) {
53
1
        return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
54
1
                "Python server manager is shutting down"));
55
1
    }
56
43
    auto& pool = _process_pools[version];
57
43
    if (!pool) {
58
25
        pool = std::make_shared<VersionedProcessPool>();
59
25
    }
60
43
    return pool;
61
44
}
62
63
std::vector<std::pair<PythonVersion, std::shared_ptr<PythonServerManager::VersionedProcessPool>>>
64
7
PythonServerManager::_snapshot_process_pools() {
65
7
    std::lock_guard<std::mutex> lock(_pools_mutex);
66
7
    std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>> snapshot;
67
7
    snapshot.reserve(_process_pools.size());
68
7
    for (const auto& [version, pool] : _process_pools) {
69
5
        snapshot.emplace_back(version, pool);
70
5
    }
71
7
    return snapshot;
72
7
}
73
74
#ifdef BE_TEST
75
void PythonServerManager::set_process_pool_for_test(const PythonVersion& version,
76
                                                    std::vector<ProcessPtr> processes,
77
7
                                                    bool initialized) {
78
7
    auto versioned_pool = _get_or_create_process_pool(version).value();
79
7
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
80
7
    versioned_pool->processes = std::move(processes);
81
7
    versioned_pool->state = initialized ? PoolState::INITIALIZED : PoolState::UNINITIALIZED;
82
7
    versioned_pool->has_available_process =
83
7
            std::any_of(versioned_pool->processes.begin(), versioned_pool->processes.end(),
84
7
                        [](const ProcessPtr& process) { return process && process->is_alive(); });
85
7
}
86
87
std::vector<ProcessPtr> PythonServerManager::process_pool_snapshot_for_test(
88
5
        const PythonVersion& version) {
89
5
    auto versioned_pool = _get_or_create_process_pool(version).value();
90
5
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
91
5
    return versioned_pool->processes;
92
5
}
93
94
1
bool PythonServerManager::process_pool_is_initializing_for_test(const PythonVersion& version) {
95
1
    auto versioned_pool = _get_or_create_process_pool(version).value();
96
1
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
97
1
    return versioned_pool->state == PoolState::INITIALIZING;
98
1
}
99
100
7
bool PythonServerManager::process_pool_is_initialized_for_test(const PythonVersion& version) {
101
7
    auto versioned_pool = _get_or_create_process_pool(version).value();
102
7
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
103
7
    return versioned_pool->state == PoolState::INITIALIZED;
104
7
}
105
#endif
106
107
bool PythonServerManager::_select_alive_process_from_pool(const std::vector<ProcessPtr>& pool,
108
16
                                                          ProcessPtr* process) {
109
16
    auto alive_iter = std::min_element(pool.begin(), pool.end(),
110
16
                                       [](const ProcessPtr& a, const ProcessPtr& b) {
111
7
                                           const bool a_alive = a && a->is_alive();
112
7
                                           const bool b_alive = b && b->is_alive();
113
7
                                           if (a_alive != b_alive) {
114
7
                                               return a_alive > b_alive;
115
7
                                           }
116
0
                                           return a.use_count() < b.use_count();
117
7
                                       });
118
16
    if (alive_iter == pool.end() || !*alive_iter || !(*alive_iter)->is_alive()) {
119
3
        return false;
120
3
    }
121
13
    *process = *alive_iter;
122
13
    return true;
123
16
}
124
125
template <typename ClientType>
126
Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
127
                                       std::shared_ptr<ClientType>* client,
128
1
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
129
1
    std::shared_ptr<VersionedProcessPool> versioned_pool =
130
1
            DORIS_TRY(_ensure_pool_initialized(version));
131
132
0
    ProcessPtr process;
133
0
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
134
135
0
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
136
0
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
137
0
    } else {
138
0
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
139
0
    }
140
141
0
    return Status::OK();
142
0
}
_ZN5doris19PythonServerManager10get_clientINS_15PythonUDFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Line
Count
Source
128
1
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
129
1
    std::shared_ptr<VersionedProcessPool> versioned_pool =
130
1
            DORIS_TRY(_ensure_pool_initialized(version));
131
132
0
    ProcessPtr process;
133
0
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
134
135
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
136
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
137
0
    } else {
138
0
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
139
0
    }
140
141
0
    return Status::OK();
142
0
}
Unexecuted instantiation: _ZN5doris19PythonServerManager10get_clientINS_16PythonUDAFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Unexecuted instantiation: _ZN5doris19PythonServerManager10get_clientINS_16PythonUDTFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
143
144
Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
145
24
PythonServerManager::_ensure_pool_initialized(const PythonVersion& version) {
146
24
    auto versioned_pool_result = _get_or_create_process_pool(version);
147
24
    if (!versioned_pool_result.has_value()) {
148
1
        return ResultError(versioned_pool_result.error());
149
1
    }
150
23
    auto versioned_pool = versioned_pool_result.value();
151
23
    const int max_pool_size = config::max_python_process_num > 0 ? config::max_python_process_num
152
23
                                                                 : CpuInfo::num_cores();
153
154
23
    std::unique_lock<std::mutex> lock(versioned_pool->mutex);
155
23
    if (versioned_pool->state == PoolState::INITIALIZED) {
156
3
        return versioned_pool;
157
3
    }
158
159
20
    if (versioned_pool->state != PoolState::STOPPED) {
160
20
        if (versioned_pool->state != PoolState::INITIALIZING) {
161
20
            versioned_pool->state = PoolState::INITIALIZING;
162
20
            versioned_pool->has_available_process = false;
163
20
            versioned_pool->processes.resize(max_pool_size);
164
20
            auto init_finished_count = std::make_shared<std::atomic<int>>(0);
165
166
20
            LOG(INFO) << "Initializing Python process pool for version " << version.to_string()
167
20
                      << " with " << max_pool_size << " processes (config::max_python_process_num="
168
20
                      << config::max_python_process_num << ", CPU cores=" << CpuInfo::num_cores()
169
20
                      << ")";
170
171
20
            std::thread([this, versioned_pool, init_finished_count, max_pool_size]() {
172
20
                SCOPED_INIT_THREAD_CONTEXT();
173
20
                std::unique_lock<std::mutex> lock(versioned_pool->mutex);
174
20
                versioned_pool->cv.wait_for(
175
20
                        lock, PROCESS_POOL_INIT_TIMEOUT,
176
44
                        [&versioned_pool, init_finished_count, max_pool_size]() {
177
44
                            return versioned_pool->state != PoolState::INITIALIZING ||
178
44
                                   init_finished_count->load(std::memory_order_acquire) >=
179
42
                                           max_pool_size;
180
44
                        });
181
20
                if (versioned_pool->state == PoolState::INITIALIZING) {
182
18
                    if (versioned_pool->has_available_process) {
183
                        // Keep this under the pool lock. shutdown() must acquire the same lock
184
                        // before it can destroy manager-owned health-check state.
185
12
                        _start_health_check_thread();
186
12
                        versioned_pool->state = PoolState::INITIALIZED;
187
12
                    } else {
188
6
                        versioned_pool->state = PoolState::UNINITIALIZED;
189
6
                    }
190
18
                }
191
20
                versioned_pool->cv.notify_all();
192
20
            }).detach();
193
194
44
            for (int i = 0; i < max_pool_size; ++i) {
195
24
                std::thread([version, versioned_pool, i, max_pool_size, init_finished_count]() {
196
24
                    SCOPED_INIT_THREAD_CONTEXT();
197
24
                    ProcessPtr process;
198
24
                    Status status = PythonServerManager::fork(version, &process);
199
24
                    const bool ok = status.ok() && process;
200
24
                    ProcessPtr process_to_shutdown;
201
24
                    {
202
24
                        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
203
                        // shutdown() and repair can race with detached init workers after timeout.
204
                        // Late successful forks only fill slots that are still empty or dead.
205
24
                        if (ok &&
206
24
                            (versioned_pool->state == PoolState::INITIALIZING ||
207
16
                             versioned_pool->state == PoolState::INITIALIZED) &&
208
24
                            i < versioned_pool->processes.size() &&
209
24
                            (!versioned_pool->processes[i] ||
210
14
                             !versioned_pool->processes[i]->is_alive())) {
211
14
                            versioned_pool->processes[i] = std::move(process);
212
14
                            versioned_pool->has_available_process = true;
213
14
                        } else if (ok) {
214
2
                            process_to_shutdown = std::move(process);
215
8
                        } else [[unlikely]] {
216
8
                            LOG(WARNING) << "Failed to create Python process " << (i + 1) << "/"
217
8
                                         << max_pool_size << " for version " << version.to_string()
218
8
                                         << ": " << status.to_string();
219
8
                        }
220
24
                    }
221
24
                    init_finished_count->fetch_add(1, std::memory_order_acq_rel);
222
24
                    versioned_pool->cv.notify_all();
223
24
                    if (process_to_shutdown) {
224
2
                        process_to_shutdown->shutdown();
225
2
                    }
226
24
                }).detach();
227
24
            }
228
20
        }
229
230
        // Wait only for the first usable process. INITIALIZED is set later by the last init worker
231
        // after every slot has attempted initialization.
232
45
        versioned_pool->cv.wait_for(lock, PROCESS_POOL_INIT_TIMEOUT, [&versioned_pool]() {
233
45
            return versioned_pool->has_available_process ||
234
45
                   versioned_pool->state == PoolState::STOPPED ||
235
45
                   versioned_pool->state != PoolState::INITIALIZING;
236
45
        });
237
20
        if (versioned_pool->has_available_process) {
238
14
            return versioned_pool;
239
14
        }
240
6
        versioned_pool->cv.notify_all();
241
6
    }
242
243
6
    return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
244
6
            "Failed to initialize Python process pool for version {}: no process became available "
245
6
            "within {} ms",
246
6
            version.to_string(), PROCESS_POOL_INIT_TIMEOUT.count()));
247
20
}
248
249
Status PythonServerManager::_get_process(
250
        const PythonVersion& version, const std::shared_ptr<VersionedProcessPool>& versioned_pool,
251
15
        ProcessPtr* process) {
252
15
    {
253
15
        std::unique_lock<std::mutex> lock(versioned_pool->mutex);
254
15
        std::vector<ProcessPtr>& pool = versioned_pool->processes;
255
256
15
        if (versioned_pool->state == PoolState::STOPPED) {
257
1
            versioned_pool->has_available_process = false;
258
1
            return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
259
1
                    "Python process pool has stopped for version {}", version.to_string());
260
1
        }
261
262
14
        if (_select_alive_process_from_pool(pool, process)) [[likely]] {
263
12
            versioned_pool->has_available_process = true;
264
12
            return Status::OK();
265
12
        }
266
2
        versioned_pool->has_available_process = false;
267
268
2
        if (versioned_pool->state == PoolState::INITIALIZING) {
269
0
            versioned_pool->cv.wait_for(lock, PROCESS_REPAIR_WAIT_TIMEOUT, [&versioned_pool]() {
270
0
                return std::any_of(versioned_pool->processes.begin(),
271
0
                                   versioned_pool->processes.end(),
272
0
                                   [](const ProcessPtr& p) { return p && p->is_alive(); }) ||
273
0
                       versioned_pool->state != PoolState::INITIALIZING;
274
0
            });
275
0
            if (versioned_pool->state == PoolState::STOPPED) {
276
0
                return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
277
0
                        "Python process pool has stopped for version {}", version.to_string());
278
0
            }
279
0
            if (_select_alive_process_from_pool(pool, process)) {
280
0
                versioned_pool->has_available_process = true;
281
0
                return Status::OK();
282
0
            }
283
0
            versioned_pool->has_available_process = false;
284
0
            return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
285
0
                    "Python process pool is initializing but has no available process for version "
286
0
                    "{} after waiting {} ms",
287
0
                    version.to_string(), PROCESS_REPAIR_WAIT_TIMEOUT.count());
288
0
        }
289
290
2
        if (versioned_pool->state != PoolState::INITIALIZED) {
291
0
            return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
292
0
                    "Python process pool is not initialized for version {}", version.to_string());
293
0
        }
294
295
2
        if (!versioned_pool->repairing) {
296
1
            versioned_pool->repairing = true;
297
            // Repair is done in the background because fork can be slow. The current request still
298
            // waits briefly below so a transient all-dead pool can recover without failing.
299
1
            std::thread([version, versioned_pool]() {
300
1
                SCOPED_INIT_THREAD_CONTEXT();
301
1
                int recreated = PythonServerManager::_repair_process_pool(version, versioned_pool);
302
1
                {
303
1
                    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
304
1
                    versioned_pool->repairing = false;
305
1
                }
306
1
                versioned_pool->cv.notify_all();
307
1
                if (recreated > 0) {
308
1
                    LOG(INFO) << "Repaired Python process pool for version " << version.to_string()
309
1
                              << ": recreated=" << recreated;
310
1
                }
311
1
            }).detach();
312
1
        }
313
314
        // Keep the request recoverable in the common case where the Python runtime can fork
315
        // normally and only the existing pool entries died. The wait is short so a wedged fork path
316
        // still returns SERVICE_UNAVAILABLE promptly.
317
4
        versioned_pool->cv.wait_for(lock, PROCESS_REPAIR_WAIT_TIMEOUT, [&versioned_pool]() {
318
4
            return std::any_of(versioned_pool->processes.begin(), versioned_pool->processes.end(),
319
4
                               [](const ProcessPtr& p) { return p && p->is_alive(); }) ||
320
4
                   versioned_pool->state == PoolState::STOPPED;
321
4
        });
322
2
        if (versioned_pool->state == PoolState::STOPPED) {
323
0
            versioned_pool->has_available_process = false;
324
0
            return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
325
0
                    "Python process pool has stopped for version {}", version.to_string());
326
0
        }
327
328
2
        if (_select_alive_process_from_pool(pool, process)) {
329
1
            versioned_pool->has_available_process = true;
330
1
            return Status::OK();
331
1
        }
332
1
        versioned_pool->has_available_process = false;
333
1
    }
334
335
0
    return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
336
1
            "Python process pool has no available process for version {} after waiting repair for "
337
1
            "{} ms",
338
1
            version.to_string(), PROCESS_REPAIR_WAIT_TIMEOUT.count());
339
2
}
340
341
44
Status PythonServerManager::fork(const PythonVersion& version, ProcessPtr* process) {
342
44
    std::string python_executable_path = version.get_executable_path();
343
44
    std::string fight_server_path = get_fight_server_path();
344
44
    std::string base_unix_socket_path = get_base_unix_socket_path();
345
44
    std::vector<std::string> args = {"-u", fight_server_path, base_unix_socket_path};
346
44
    boost::process::environment env = boost::this_process::environment();
347
44
    boost::process::ipstream child_output;
348
349
44
    try {
350
44
        boost::process::child c(
351
44
                python_executable_path, args, boost::process::std_out > child_output,
352
44
                boost::process::env = env,
353
44
                boost::process::on_exit([](int exit_code, const std::error_code& ec) {
354
0
                    if (ec) {
355
0
                        LOG(WARNING) << "Python UDF server exited with error: " << ec.message();
356
0
                    }
357
0
                }));
358
359
        // Bound socket readiness: a child process can start but never create the Flight socket.
360
        // Without this, pool initialization can block until FE reports send-fragments RPC timeout.
361
44
        pid_t child_pid = c.id();
362
44
        std::string expected_socket_path = get_unix_socket_file_path(child_pid);
363
44
        bool started_successfully = false;
364
44
        std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
365
366
99
        while (std::chrono::steady_clock::now() - start < PROCESS_START_TIMEOUT) {
367
85
            struct stat buffer;
368
85
            if (stat(expected_socket_path.c_str(), &buffer) == 0) {
369
28
                started_successfully = true;
370
28
                break;
371
28
            }
372
373
57
            if (!c.running()) {
374
2
                break;
375
2
            }
376
55
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
377
55
        }
378
379
44
        if (!started_successfully) {
380
7
            int exit_status = 0;
381
7
            if (c.running()) {
382
                // Don't use the `wait` of boost::process, but use the operating system signal and waitpid with timeout instead.
383
                // Because boost::process may block the initialization/repair thread for a long time,
384
                // exceeding the timeout limit expected by the process pool.
385
5
                ::kill(child_pid, SIGTERM);
386
5
                auto wait_result = PythonUDFProcess::wait_child_exit(
387
5
                        child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status);
388
5
                if (wait_result == PythonUDFProcess::ChildExitWaitResult::TIMEOUT ||
389
5
                    wait_result == PythonUDFProcess::ChildExitWaitResult::ERROR) {
390
5
                    LOG(WARNING) << "Python server start timeout and terminate timeout exceeded,"
391
5
                                 << " sending SIGKILL to pid=" << child_pid;
392
5
                    ::kill(child_pid, SIGKILL);
393
5
                    wait_result = PythonUDFProcess::wait_child_exit(
394
5
                            child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status);
395
5
                    if (wait_result == PythonUDFProcess::ChildExitWaitResult::TIMEOUT ||
396
5
                        wait_result == PythonUDFProcess::ChildExitWaitResult::ERROR) [[unlikely]] {
397
                        // The child was SIGKILLed but not reaped within the bounded wait. Do not
398
                        // drop waitpid ownership after detach; otherwise a later exit can leave a
399
                        // zombie Python process under BE.
400
1
                        PythonUDFProcess::enqueue_child_for_reap(child_pid);
401
1
                        c.detach();
402
1
                        return Status::InternalError(
403
1
                                "Python server start failed: process did not exit after SIGKILL, "
404
1
                                "pid={}",
405
1
                                child_pid);
406
1
                    }
407
5
                }
408
5
            } else {
409
2
                PythonUDFProcess::wait_child_exit(child_pid, std::chrono::milliseconds(0),
410
2
                                                  &exit_status);
411
2
            }
412
6
            c.detach();
413
6
            return Status::InternalError("Python server start failed: socket file not found at {}",
414
6
                                         expected_socket_path);
415
7
        }
416
417
37
        *process = std::make_shared<PythonUDFProcess>(std::move(c), std::move(child_output));
418
419
37
    } catch (const std::exception& e) {
420
9
        return Status::InternalError("Failed to start Python UDF server: {}", e.what());
421
9
    }
422
423
28
    return Status::OK();
424
44
}
425
426
12
void PythonServerManager::_start_health_check_thread() {
427
12
    std::lock_guard<std::mutex> lock(_health_check_mutex);
428
12
    if (_health_check_thread || _shutdown_flag.load(std::memory_order_acquire)) return;
429
430
12
    LOG(INFO) << "Starting Python process health check thread (interval: 30 seconds)";
431
432
8
    _health_check_thread = std::make_unique<std::thread>([this]() {
433
        // Health check loop
434
8
        while (!_shutdown_flag.load(std::memory_order_acquire)) {
435
            // Wait for interval or shutdown signal
436
4
            {
437
4
                std::unique_lock<std::mutex> lock(_health_check_mutex);
438
8
                _health_check_cv.wait_for(lock, std::chrono::seconds(30), [this]() {
439
8
                    return _shutdown_flag.load(std::memory_order_acquire);
440
8
                });
441
4
            }
442
443
4
            if (_shutdown_flag.load(std::memory_order_acquire)) break;
444
445
0
            _check_and_recreate_processes();
446
0
            _refresh_memory_stats();
447
0
        }
448
449
8
        LOG(INFO) << "Python process health check thread exiting";
450
8
    });
451
8
}
452
453
4
void PythonServerManager::_check_and_recreate_processes() {
454
4
    int total_recreated = 0;
455
4
    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
456
4
        {
457
4
            std::lock_guard<std::mutex> lock(versioned_pool->mutex);
458
4
            if (versioned_pool->state != PoolState::INITIALIZED || versioned_pool->repairing) {
459
2
                continue;
460
2
            }
461
            // Share the same repair guard with foreground requests. Otherwise health check and
462
            // _get_process() can fork the same empty/dead slots at the same time under failures.
463
2
            versioned_pool->repairing = true;
464
2
        }
465
0
        int recreated = _repair_process_pool(version, versioned_pool);
466
2
        {
467
2
            std::lock_guard<std::mutex> lock(versioned_pool->mutex);
468
2
            versioned_pool->repairing = false;
469
2
        }
470
2
        versioned_pool->cv.notify_all();
471
2
        total_recreated += recreated;
472
2
    }
473
474
4
    if (total_recreated > 0) {
475
1
        LOG(INFO) << "Health check completed: recreated=" << total_recreated;
476
1
    }
477
4
}
478
479
int PythonServerManager::_repair_process_pool(
480
3
        const PythonVersion& version, const std::shared_ptr<VersionedProcessPool>& versioned_pool) {
481
3
    const int max_pool_size = config::max_python_process_num > 0 ? config::max_python_process_num
482
3
                                                                 : CpuInfo::num_cores();
483
3
    std::vector<size_t> died_process_indices;
484
3
    {
485
3
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
486
3
        if (versioned_pool->state != PoolState::INITIALIZED) {
487
0
            return 0;
488
0
        }
489
490
3
        auto& pool = versioned_pool->processes;
491
3
        died_process_indices.reserve(std::max<size_t>(pool.size(), max_pool_size));
492
        // Need to fix the following two cases
493
        // 1. Existing processes have died
494
        // 2. Process pool not filled due to fork timeouts and other issues
495
9
        for (size_t i = 0; i < pool.size(); ++i) {
496
6
            const auto& process = pool[i];
497
6
            if (!process || !process->is_alive()) {
498
5
                died_process_indices.push_back(i);
499
5
            }
500
6
        }
501
3
        for (size_t i = pool.size(); i < static_cast<size_t>(max_pool_size); ++i) {
502
0
            died_process_indices.push_back(i);
503
0
        }
504
3
    }
505
506
3
    if (died_process_indices.empty()) [[likely]] {
507
0
        return 0;
508
0
    }
509
510
3
    int recreated = 0;
511
3
    std::vector<ProcessPtr> processes_to_shutdown;
512
5
    for (size_t index : died_process_indices) {
513
5
        {
514
5
            std::lock_guard<std::mutex> lock(versioned_pool->mutex);
515
5
            if (versioned_pool->state != PoolState::INITIALIZED) {
516
0
                break;
517
0
            }
518
5
        }
519
520
5
        ProcessPtr new_process;
521
5
        Status status = fork(version, &new_process);
522
5
        if (status.ok() && new_process) {
523
3
            bool published = false;
524
3
            {
525
3
                std::lock_guard<std::mutex> lock(versioned_pool->mutex);
526
3
                auto& pool = versioned_pool->processes;
527
3
                if (versioned_pool->state != PoolState::INITIALIZED) {
528
0
                    processes_to_shutdown.emplace_back(std::move(new_process));
529
3
                } else if (index < pool.size()) {
530
3
                    if (!pool[index] || !pool[index]->is_alive()) [[likely]] {
531
3
                        pool[index] = std::move(new_process);
532
3
                        versioned_pool->has_available_process = true;
533
3
                        recreated++;
534
3
                        published = true;
535
3
                    } else {
536
0
                        processes_to_shutdown.emplace_back(std::move(new_process));
537
0
                    }
538
3
                } else if (pool.size() < static_cast<size_t>(max_pool_size)) {
539
0
                    pool.emplace_back(std::move(new_process));
540
0
                    versioned_pool->has_available_process = true;
541
0
                    recreated++;
542
0
                    published = true;
543
0
                } else {
544
0
                    processes_to_shutdown.emplace_back(std::move(new_process));
545
0
                }
546
3
            }
547
3
            if (published) {
548
3
                versioned_pool->cv.notify_all();
549
3
            }
550
3
        } else {
551
2
            LOG(ERROR) << "Failed to recreate Python process for version " << version.to_string()
552
2
                       << ": " << status.to_string();
553
2
        }
554
5
    }
555
556
3
    {
557
3
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
558
3
        auto& pool = versioned_pool->processes;
559
560
3
        if (versioned_pool->state != PoolState::INITIALIZED) {
561
0
            versioned_pool->has_available_process = false;
562
3
        } else {
563
            // Keep empty/dead slots instead of shrinking the vector. Init workers are detached and
564
            // publish by original slot index; shrinking here would make a late successful init look
565
            // out-of-range and discard a usable process.
566
3
            versioned_pool->has_available_process = std::any_of(
567
3
                    pool.begin(), pool.end(),
568
4
                    [](const ProcessPtr& process) { return process && process->is_alive(); });
569
3
        }
570
3
    }
571
3
    versioned_pool->cv.notify_all();
572
3
    for (auto& process : processes_to_shutdown) {
573
0
        process->shutdown();
574
0
    }
575
3
    return recreated;
576
3
}
577
578
67
void PythonServerManager::shutdown() {
579
67
    std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>> pools_to_shutdown;
580
67
    {
581
67
        std::lock_guard<std::mutex> lock(_pools_mutex);
582
67
        _shutdown_flag.store(true, std::memory_order_release);
583
67
        pools_to_shutdown.reserve(_process_pools.size());
584
67
        for (auto& [version, versioned_pool] : _process_pools) {
585
25
            pools_to_shutdown.emplace_back(version, std::move(versioned_pool));
586
25
        }
587
67
        _process_pools.clear();
588
67
    }
589
590
67
    for (auto& [version, versioned_pool] : pools_to_shutdown) {
591
25
        if (!versioned_pool) {
592
0
            continue;
593
0
        }
594
25
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
595
25
        versioned_pool->state = PoolState::STOPPED;
596
25
        versioned_pool->has_available_process = false;
597
25
        versioned_pool->repairing = false;
598
25
        versioned_pool->cv.notify_all();
599
25
    }
600
601
67
    std::unique_ptr<std::thread> health_check_thread;
602
67
    {
603
67
        std::lock_guard<std::mutex> lock(_health_check_mutex);
604
67
        health_check_thread = std::move(_health_check_thread);
605
67
    }
606
67
    _health_check_cv.notify_one();
607
67
    if (health_check_thread && health_check_thread->joinable()) {
608
8
        health_check_thread->join();
609
8
    }
610
611
    // Shutdown all processes
612
67
    std::vector<ProcessPtr> processes_to_shutdown;
613
67
    for (auto& [version, versioned_pool] : pools_to_shutdown) {
614
25
        if (!versioned_pool) {
615
0
            continue;
616
0
        }
617
25
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
618
25
        auto& pool = versioned_pool->processes;
619
33
        for (auto& process : pool) {
620
33
            if (process) {
621
24
                processes_to_shutdown.emplace_back(std::move(process));
622
24
            }
623
33
        }
624
25
        pool.clear();
625
25
        versioned_pool->cv.notify_all();
626
25
    }
627
67
    for (auto& process : processes_to_shutdown) {
628
24
        process->shutdown();
629
24
    }
630
67
}
631
632
2
Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes) {
    // /proc/{pid}/statm reports, in pages: size resident shared text lib data dt.
    // Only the resident count (2nd field) is needed; the 1st field is read and discarded.
    std::string statm_path = fmt::format("/proc/{}/statm", pid);
    std::ifstream statm_file(statm_path);
    if (!statm_file.is_open()) {
        return Status::InternalError("Cannot open {}", statm_path);
    }

    size_t size_pages = 0;
    size_t rss_pages = 0;
    statm_file >> size_pages >> rss_pages;
    if (statm_file.fail()) {
        return Status::InternalError("Failed to read {}", statm_path);
    }

    // sysconf() returns -1 on failure; converting that to size_t would wrap around and
    // report an absurdly large RSS, so reject a non-positive page size explicitly.
    const long page_size = sysconf(_SC_PAGESIZE);
    if (page_size <= 0) {
        return Status::InternalError("Failed to query page size while reading {}", statm_path);
    }

    // Convert pages to bytes.
    *rss_bytes = rss_pages * static_cast<size_t>(page_size);
    return Status::OK();
}
656
657
0
void PythonServerManager::_refresh_memory_stats() {
658
0
    int64_t total_rss = 0;
659
660
0
    for (const auto& [version, versioned_pool] : _snapshot_process_pools()) {
661
0
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
662
0
        const auto& pool = versioned_pool->processes;
663
0
        for (const auto& process : pool) {
664
0
            if (!process || !process->is_alive()) continue;
665
666
0
            size_t rss_bytes = 0;
667
0
            Status s = _read_process_memory(process->get_child_pid(), &rss_bytes);
668
669
0
            if (s.ok()) {
670
0
                total_rss += rss_bytes;
671
0
            } else [[unlikely]] {
672
0
                LOG(WARNING) << "Failed to read memory info for Python process (pid="
673
0
                             << process->get_child_pid() << "): " << s.to_string();
674
0
            }
675
0
        }
676
0
    }
677
0
    _mem_tracker.set_consumption(total_rss);
678
0
    LOG(INFO) << _mem_tracker.log_usage();
679
680
0
    if (config::python_udf_processes_memory_limit_bytes > 0 &&
681
0
        total_rss > config::python_udf_processes_memory_limit_bytes) {
682
0
        LOG(WARNING) << "Python UDF process memory usage exceeds limit: rss_bytes=" << total_rss
683
0
                     << ", limit_bytes=" << config::python_udf_processes_memory_limit_bytes;
684
0
    }
685
0
}
686
687
1
Status PythonServerManager::clear_module_cache(const std::string& location) {
688
1
    if (location.empty()) {
689
0
        return Status::InvalidArgument("Empty location for clear_module_cache");
690
0
    }
691
692
1
    std::string body = fmt::format(R"({{"location": "{}"}})", location);
693
1
    return _broadcast_action_to_processes("clear_module_cache", body,
694
1
                                          fmt::format("location={}", location));
695
1
}
696
697
1
void PythonServerManager::clear_udaf_state_cache(int64_t function_id) {
698
1
    std::string body = fmt::format(R"({{"function_id": {}}})", function_id);
699
1
    WARN_IF_ERROR(_broadcast_action_to_processes("clear_udaf_state_cache", body,
700
1
                                                 fmt::format("function_id={}", function_id)),
701
1
                  "failed to clear Python UDAF state cache");
702
1
}
703
704
// Sends the Flight action `action_type` with payload `body` to every live Python process.
// `log_name` only identifies the request in log lines and in the returned error.
// Returns OK when no process is alive or every process acknowledged the action, and an
// InternalError summarizing success/failure counts when at least one process failed.
Status PythonServerManager::_broadcast_action_to_processes(const std::string& action_type,
                                                           const std::string& body,
                                                           const std::string& log_name) {
    // Snapshot the URIs of live processes under each pool's mutex, then release the lock
    // before any network I/O. Holding the pool mutex across Connect/DoAction would let one
    // slow or hung Python process block every other thread that needs the same pool.
    std::vector<std::string> uris;
    for (const auto& [version, versioned_pool] : _snapshot_process_pools()) {
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
        for (const auto& process : versioned_pool->processes) {
            if (process && process->is_alive()) {
                uris.emplace_back(process->get_uri());
            }
        }
    }

    if (uris.empty()) {
        return Status::OK();
    }

    // Delivers the action to a single process. Returns an empty string on success (the first
    // result frame is present), otherwise a human-readable reason for the failure.
    auto send_action = [&action_type, &body](const std::string& uri) -> std::string {
        auto loc_result = arrow::flight::Location::Parse(uri);
        if (!loc_result.ok()) [[unlikely]] {
            return "invalid location: " + loc_result.status().ToString();
        }

        auto client_result = arrow::flight::FlightClient::Connect(*loc_result);
        if (!client_result.ok()) [[unlikely]] {
            return "connect failed: " + client_result.status().ToString();
        }
        auto client = std::move(*client_result);

        arrow::flight::Action action;
        action.type = action_type;
        action.body = arrow::Buffer::FromString(body);

        auto result_stream = client->DoAction(action);
        if (!result_stream.ok()) {
            return "DoAction failed: " + result_stream.status().ToString();
        }

        auto result = (*result_stream)->Next();
        if (!result.ok()) {
            return "reading action result failed: " + result.status().ToString();
        }
        if (!*result) {
            return "action returned no result";
        }
        return {};
    };

    int success_count = 0;
    int fail_count = 0;
    for (const auto& uri : uris) {
        std::string error;
        try {
            error = send_action(uri);
        } catch (const std::exception& e) {
            error = fmt::format("exception: {}", e.what());
        } catch (...) {
            error = "unknown exception";
        }

        if (error.empty()) {
            ++success_count;
        } else {
            ++fail_count;
            LOG(WARNING) << action_type << " failed for " << log_name << " on " << uri << ": "
                         << error;
        }
    }

    LOG(INFO) << action_type << " completed for " << log_name << ", success=" << success_count
              << ", failed=" << fail_count;

    if (fail_count > 0) {
        return Status::InternalError("{} failed for {}, success={}, failed={}", action_type,
                                     log_name, success_count, fail_count);
    }
    return Status::OK();
}
770
771
// Explicit template instantiation for UDF, UDAF and UDTF clients
772
template Status PythonServerManager::get_client<PythonUDFClient>(
773
        const PythonUDFMeta& func_meta, const PythonVersion& version,
774
        std::shared_ptr<PythonUDFClient>* client,
775
        const std::shared_ptr<arrow::Schema>& data_schema);
776
777
template Status PythonServerManager::get_client<PythonUDAFClient>(
778
        const PythonUDFMeta& func_meta, const PythonVersion& version,
779
        std::shared_ptr<PythonUDAFClient>* client,
780
        const std::shared_ptr<arrow::Schema>& data_schema);
781
782
template Status PythonServerManager::get_client<PythonUDTFClient>(
783
        const PythonUDFMeta& func_meta, const PythonVersion& version,
784
        std::shared_ptr<PythonUDTFClient>* client,
785
        const std::shared_ptr<arrow::Schema>& data_schema);
786
787
} // namespace doris