Coverage Report

Created: 2026-05-09 13:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_server.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <atomic>
21
#include <condition_variable>
22
#include <memory>
23
#include <mutex>
24
#include <thread>
25
#include <unordered_map>
26
#include <vector>
27
28
#include "common/status.h"
29
#include "runtime/memory/mem_tracker.h"
30
#include "udf/python/python_udf_meta.h"
31
#include "udf/python/python_udf_runtime.h"
32
33
namespace doris {
34
class PythonServerManager {
35
public:
36
24
    PythonServerManager() = default;
37
38
24
    ~PythonServerManager() { shutdown(); }
39
40
6
    static PythonServerManager& instance() {
41
6
        static PythonServerManager instance;
42
6
        return instance;
43
6
    }
44
45
    template <typename T>
46
    Status get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
47
                      std::shared_ptr<T>* client,
48
                      const std::shared_ptr<arrow::Schema>& data_schema = nullptr);
49
50
    Status fork(const PythonVersion& version, ProcessPtr* process);
51
52
    // Clear Python module cache for a specific UDF location across all processes
53
    Status clear_module_cache(const std::string& location);
54
55
    void shutdown();
56
57
#ifdef BE_TEST
58
    // For unit testing only.
59
2
    void check_and_recreate_processes_for_test() { _check_and_recreate_processes(); }
60
61
    void set_process_pool_for_test(const PythonVersion& version, std::vector<ProcessPtr> processes,
62
                                   bool initialized = true);
63
64
    std::vector<ProcessPtr>& process_pool_for_test(const PythonVersion& version);
65
#endif
66
67
private:
68
    struct VersionedProcessPool {
69
        std::mutex mutex;
70
        std::vector<ProcessPtr> processes;
71
        bool initialized = false;
72
    };
73
74
    /** 
75
     * Lazily initialize and return the process pool for specific Python version. 
76
     */
77
    Result<std::shared_ptr<VersionedProcessPool>> _ensure_pool_initialized(
78
            const PythonVersion& version);
79
80
    /**
81
     * Pick an available process from specific pool, recreating one on demand if needed.
82
     */
83
    Status _get_process(const PythonVersion& version,
84
                        const std::shared_ptr<VersionedProcessPool>& versioned_pool,
85
                        ProcessPtr* process);
86
87
    /**
88
     * Start health check background thread (called once by ensure_pool_initialized)
89
     * Thread periodically checks process health and refreshes memory stats
90
     */
91
    void _start_health_check_thread();
92
93
    /**
94
     * Check process health and recreate dead processes
95
     */
96
    void _check_and_recreate_processes();
97
98
    /**
99
     * Read resident set size (RSS) for a single process from /proc/{pid}/statm
100
     */
101
    Status _read_process_memory(pid_t pid, size_t* rss_bytes);
102
103
    /**
104
     * Refresh memory statistics for all Python processes
105
     */
106
    void _refresh_memory_stats();
107
108
    std::shared_ptr<VersionedProcessPool> _get_or_create_process_pool(const PythonVersion& version);
109
    std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>>
110
    _snapshot_process_pools();
111
112
    std::unordered_map<PythonVersion, std::shared_ptr<VersionedProcessPool>> _process_pools;
113
    // Protects the version -> pool handle map only. Per-version process operations are guarded
114
    // by VersionedProcessPool::mutex.
115
    std::mutex _pools_mutex;
116
    // Health check background thread
117
    std::unique_ptr<std::thread> _health_check_thread;
118
    std::atomic<bool> _shutdown_flag {false};
119
    std::condition_variable _health_check_cv;
120
    std::mutex _health_check_mutex;
121
    MemTracker _mem_tracker {"PythonUDFProcesses"};
122
};
123
124
} // namespace doris