Coverage Report

Created: 2026-05-12 23:26

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_server.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <atomic>
21
#include <condition_variable>
22
#include <memory>
23
#include <mutex>
24
#include <thread>
25
#include <unordered_map>
26
#include <vector>
27
28
#include "common/status.h"
29
#include "runtime/memory/mem_tracker.h"
30
#include "udf/python/python_udf_meta.h"
31
#include "udf/python/python_udf_runtime.h"
32
33
namespace doris {
34
// Owns the per-Python-version process pools (_process_pools), the health check
// background thread and the memory tracker used for the Python UDF processes.
// Only the declarations are visible here; member functions without an inline
// body are defined elsewhere.
class PythonServerManager {
35
public:
36
27
    // Public default constructor: instances can be created directly, in addition
    // to the shared one returned by instance().
    PythonServerManager() = default;
37
38
27
    // Calls shutdown() on destruction.
    ~PythonServerManager() { shutdown(); }
39
40
6
    // Returns the shared instance, a function-local static that is constructed
    // on the first call.
    static PythonServerManager& instance() {
41
6
        static PythonServerManager instance;
42
6
        return instance;
43
6
    }
44
45
    // Produces a client of type T for `func_meta` under Python `version` and
    // writes it to *client. `data_schema` is optional and defaults to nullptr.
    // NOTE(review): presumably picks a process from the pool of `version` --
    // confirm against the definition.
    template <typename T>
46
    Status get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
47
                      std::shared_ptr<T>* client,
48
                      const std::shared_ptr<arrow::Schema>& data_schema = nullptr);
49
50
    // NOTE(review): presumably starts a new Python server process for `version`
    // and returns it through *process -- confirm against the definition.
    Status fork(const PythonVersion& version, ProcessPtr* process);
51
52
    // Clear Python module cache for a specific UDF location across all processes
53
    Status clear_module_cache(const std::string& location);
54
55
    // Clear Python UDAF runtime state after DROP FUNCTION
56
    void clear_udaf_state_cache(int64_t function_id);
57
58
    // Also invoked by the destructor.
    // NOTE(review): presumably stops the health check thread and the Python
    // processes -- confirm against the definition.
    void shutdown();
59
60
#ifdef BE_TEST
61
    // For unit testing only.
    // Exposes the private _check_and_recreate_processes() to tests.
62
2
    void check_and_recreate_processes_for_test() { _check_and_recreate_processes(); }
63
64
    // For unit testing only.
    // NOTE(review): presumably installs `processes` as the pool for `version` and
    // sets the pool's `initialized` flag to the given value -- confirm.
    void set_process_pool_for_test(const PythonVersion& version, std::vector<ProcessPtr> processes,
65
                                   bool initialized = true);
66
67
    // For unit testing only. Returns a mutable reference to a process vector for
    // `version`.
    std::vector<ProcessPtr>& process_pool_for_test(const PythonVersion& version);
68
69
    // For unit testing only. Forwards all three arguments unchanged to the
    // private _broadcast_action_to_processes() and returns its Status.
    Status broadcast_action_to_processes_for_test(const std::string& action_type,
70
                                                  const std::string& body,
71
1
                                                  const std::string& log_name) {
72
1
        return _broadcast_action_to_processes(action_type, body, log_name);
73
1
    }
74
#endif
75
76
private:
77
    // The processes of one Python version together with the mutex that guards
    // operations on them.
    struct VersionedProcessPool {
78
        // Guards per-version process operations.
        std::mutex mutex;
79
        std::vector<ProcessPtr> processes;
80
        // Starts out false.
        // NOTE(review): presumably set once _ensure_pool_initialized() has
        // populated `processes` -- confirm.
        bool initialized = false;
81
    };
82
83
    /**
84
     * Lazily initialize and return the process pool for the given Python version.
85
     */
86
    Result<std::shared_ptr<VersionedProcessPool>> _ensure_pool_initialized(
87
            const PythonVersion& version);
88
89
    /**
90
     * Pick an available process from the given pool, recreating one on demand if needed.
91
     */
92
    Status _get_process(const PythonVersion& version,
93
                        const std::shared_ptr<VersionedProcessPool>& versioned_pool,
94
                        ProcessPtr* process);
95
96
    /**
97
     * Start health check background thread (called once by _ensure_pool_initialized)
98
     * Thread periodically checks process health and refreshes memory stats
99
     */
100
    void _start_health_check_thread();
101
102
    /**
103
     * Check process health and recreate dead processes
104
     */
105
    void _check_and_recreate_processes();
106
107
    /**
108
     * Read resident set size (RSS) for a single process from /proc/{pid}/statm
109
     */
110
    Status _read_process_memory(pid_t pid, size_t* rss_bytes);
111
112
    /**
113
     * Refresh memory statistics for all Python processes
114
     */
115
    void _refresh_memory_stats();
116
117
    // Returns the pool handle for `version`.
    // NOTE(review): per its name, presumably inserts a new entry into
    // _process_pools when none exists -- confirm.
    std::shared_ptr<VersionedProcessPool> _get_or_create_process_pool(const PythonVersion& version);
118
    // Returns (version, pool handle) pairs by value.
    // NOTE(review): presumably a copy of _process_pools taken under _pools_mutex
    // so callers can iterate without holding that lock -- confirm.
    std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>>
119
    _snapshot_process_pools();
    // NOTE(review): presumably sends the action `action_type` with payload `body`
    // to every process of every pool, with `log_name` used to label log
    // messages -- confirm against the definition.
120
    Status _broadcast_action_to_processes(const std::string& action_type, const std::string& body,
121
                                          const std::string& log_name);
122
123
    // Pool handle per Python version.
    std::unordered_map<PythonVersion, std::shared_ptr<VersionedProcessPool>> _process_pools;
124
    // Protects the version -> pool handle map only. Per-version process operations are guarded
125
    // by VersionedProcessPool::mutex.
126
    std::mutex _pools_mutex;
127
    // Health check background thread
128
    std::unique_ptr<std::thread> _health_check_thread;
129
    // Starts out false.
    // NOTE(review): presumably set by shutdown() to stop the health check
    // thread -- confirm.
    std::atomic<bool> _shutdown_flag {false};
130
    // NOTE(review): _health_check_cv and _health_check_mutex presumably let the
    // health check thread wait between rounds and be woken early -- confirm.
    std::condition_variable _health_check_cv;
131
    std::mutex _health_check_mutex;
132
    // Memory tracker labelled "PythonUDFProcesses".
    MemTracker _mem_tracker {"PythonUDFProcesses"};
133
};
134
135
} // namespace doris