Coverage Report

Created: 2026-06-09 20:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_server.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <atomic>
21
#include <chrono>
22
#include <condition_variable>
23
#include <memory>
24
#include <mutex>
25
#include <thread>
26
#include <unordered_map>
27
#include <vector>
28
29
#include "common/status.h"
30
#include "runtime/memory/mem_tracker.h"
31
#include "udf/python/python_udf_meta.h"
32
#include "udf/python/python_udf_runtime.h"
33
34
namespace doris {
35
// Manages pools of Python server processes, one pool per PythonVersion (see
// `_process_pools`), together with a health-check thread handle and a MemTracker
// named "PythonUDFProcesses". Callers normally reach it through `instance()`.
// NOTE(review): this listing is a coverage-report rendering; the bare numbers between
// source lines are the report's line-number and hit-count columns, not C++ tokens.
class PythonServerManager {
36
public:
37
5
    // Public default constructor; `instance()` below is the shared access point.
    PythonServerManager() = default;
38
39
4
    // Destruction calls shutdown().
    ~PythonServerManager() { shutdown(); }
40
41
6.06k
    // Returns the shared manager, a function-local static constructed on first call.
    static PythonServerManager& instance() {
42
6.06k
        static PythonServerManager instance;
43
6.06k
        return instance;
44
6.06k
    }
45
46
    // Declared only; the definition is not part of this listing.
    // `data_schema` is optional and defaults to nullptr.
    // NOTE(review): presumably obtains a process for `version` and returns a client of
    // type T through the `client` out-parameter -- confirm against the definition.
    template <typename T>
47
    Status get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
48
                      std::shared_ptr<T>* client,
49
                      const std::shared_ptr<arrow::Schema>& data_schema = nullptr);
50
51
    // NOTE(review): presumably launches one Python server process for `version` and
    // returns it through `process` -- confirm against the definition.
    static Status fork(const PythonVersion& version, ProcessPtr* process);
52
53
    // Clear the Python module cache for a specific UDF location across all processes.
54
    Status clear_module_cache(const std::string& location);
55
56
    // Clear the Python UDAF runtime state of `function_id` after DROP FUNCTION.
57
    void clear_udaf_state_cache(int64_t function_id);
58
59
    // Also invoked by the destructor. Per the PoolState::STOPPED comment below, pools
    // must not be reused after this call.
    void shutdown();
60
61
#ifdef BE_TEST
62
    // The hooks below are compiled only when BE_TEST is defined; for unit testing only.
63
    // Exposes the private _check_and_recreate_processes().
    void check_and_recreate_processes_for_test() { _check_and_recreate_processes(); }
64
65
    // NOTE(review): presumably installs `processes` as the pool for `version`;
    // `initialized` defaults to true -- confirm against the definition.
    void set_process_pool_for_test(const PythonVersion& version, std::vector<ProcessPtr> processes,
66
                                   bool initialized = true);
67
68
    std::vector<ProcessPtr> process_pool_snapshot_for_test(const PythonVersion& version);
69
70
    bool process_pool_is_initializing_for_test(const PythonVersion& version);
71
72
    bool process_pool_is_initialized_for_test(const PythonVersion& version);
73
74
    // Forwards its arguments unchanged to the private _broadcast_action_to_processes().
    Status broadcast_action_to_processes_for_test(const std::string& action_type,
75
                                                  const std::string& body,
76
                                                  const std::string& log_name) {
77
        return _broadcast_action_to_processes(action_type, body, log_name);
78
    }
79
#endif
80
81
private:
82
    // Lifecycle state stored in VersionedProcessPool::state.
    enum class PoolState {
83
        UNINITIALIZED,
84
        INITIALIZING,
85
        // `INITIALIZED` means:
86
        // 1. All process slots have attempted initialization.
87
        // 2. At least one live process is available for requests.
88
        // 3. The health-check thread has been started.
89
        INITIALIZED,
90
        // Prevents the pool from being incorrectly reused after `shutdown()`.
91
        STOPPED,
92
    };
93
94
    // Per-version pool: the process list plus the state and flags that describe it.
    struct VersionedProcessPool {
95
        // Guards per-version process operations (see the note on `_pools_mutex`).
        std::mutex mutex;
96
        // Coordinates initialization and repair workers with foreground requests.
97
        std::condition_variable cv;
98
        std::vector<ProcessPtr> processes;
99
        PoolState state = PoolState::UNINITIALIZED;
100
        // True when at least one process in the pool can serve requests.
101
        bool has_available_process = false;
102
        // True while a background repair is recreating dead or missing processes.
103
        bool repairing = false;
104
    };
105
106
    /**
107
     * Lazily initialize and return the process pool for a specific Python version.
108
     */
109
    Result<std::shared_ptr<VersionedProcessPool>> _ensure_pool_initialized(
110
            const PythonVersion& version);
111
112
    /**
113
     * Pick an available process from the given pool, recreating one on demand if needed.
114
     */
115
    Status _get_process(const PythonVersion& version,
116
                        const std::shared_ptr<VersionedProcessPool>& versioned_pool,
117
                        ProcessPtr* process);
118
119
    /**
120
     * Start the health-check background thread (called once by _ensure_pool_initialized).
121
     * The thread periodically checks process health and refreshes memory stats.
122
     */
123
    void _start_health_check_thread();
124
125
    /**
126
     * Check process health and recreate dead processes.
127
     */
128
    void _check_and_recreate_processes();
129
130
    // Timeouts for process start, termination, pool initialization and repair waits.
    // The BE_TEST build uses much shorter values than the production build.
#ifdef BE_TEST
131
    static constexpr std::chrono::milliseconds PROCESS_START_TIMEOUT {500};
132
    static constexpr std::chrono::milliseconds PROCESS_TERMINATE_TIMEOUT {100};
133
    static constexpr std::chrono::milliseconds PROCESS_POOL_INIT_TIMEOUT {1000};
134
    static constexpr std::chrono::milliseconds PROCESS_REPAIR_WAIT_TIMEOUT {200};
135
#else
136
    static constexpr std::chrono::milliseconds PROCESS_START_TIMEOUT {5000};
137
    static constexpr std::chrono::milliseconds PROCESS_TERMINATE_TIMEOUT {1000};
138
    // FE's default send-fragments RPC timeout is 30s. Keep BE's Python pool wait below it so the
139
    // caller sees SERVICE_UNAVAILABLE with Python context instead of a generic RPC deadline error.
140
    static constexpr std::chrono::milliseconds PROCESS_POOL_INIT_TIMEOUT {20000};
141
    static constexpr std::chrono::milliseconds PROCESS_REPAIR_WAIT_TIMEOUT {1000};
142
#endif
143
    // NOTE(review): the meaning of the int return value is not visible in this header
    // (possibly the number of recreated processes) -- confirm against the definition.
    static int _repair_process_pool(const PythonVersion& version,
144
                                    const std::shared_ptr<VersionedProcessPool>& versioned_pool);
145
146
    /**
147
     * Read the resident set size (RSS) for a single process from /proc/{pid}/statm.
148
     */
149
    Status _read_process_memory(pid_t pid, size_t* rss_bytes);
150
151
    /**
152
     * Refresh memory statistics for all Python processes.
153
     */
154
    void _refresh_memory_stats();
155
156
    // NOTE(review): presumably looks up the pool handle for `version` in `_process_pools`,
    // creating it when absent -- confirm against the definition.
    Result<std::shared_ptr<VersionedProcessPool>> _get_or_create_process_pool(
157
            const PythonVersion& version);
158
    // Returns (version, pool handle) pairs.
    // NOTE(review): presumably a copy of `_process_pools` taken under `_pools_mutex` -- confirm.
    std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>>
159
    _snapshot_process_pools();
160
    // Shared implementation behind broadcast_action_to_processes_for_test().
    Status _broadcast_action_to_processes(const std::string& action_type, const std::string& body,
161
                                          const std::string& log_name);
162
    // NOTE(review): presumably returns true and sets `*process` when `pool` contains a
    // live process -- confirm against the definition.
    static bool _select_alive_process_from_pool(const std::vector<ProcessPtr>& pool,
163
                                                ProcessPtr* process);
164
165
    // Maps each Python version to its pool handle.
    std::unordered_map<PythonVersion, std::shared_ptr<VersionedProcessPool>> _process_pools;
166
    // Protects the version -> pool handle map only. Per-version process operations are guarded
167
    // by VersionedProcessPool::mutex.
168
    std::mutex _pools_mutex;
169
    // Health-check background thread (see _start_health_check_thread()).
170
    std::unique_ptr<std::thread> _health_check_thread;
171
    // Starts out false.
    // NOTE(review): presumably set by shutdown() to stop the health-check thread -- confirm.
    std::atomic<bool> _shutdown_flag {false};
172
    std::condition_variable _health_check_cv;
173
    std::mutex _health_check_mutex;
174
    MemTracker _mem_tracker {"PythonUDFProcesses"};
175
};
176
177
} // namespace doris