Coverage Report

Created: 2026-03-13 03:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_server.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <sys/types.h>

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "common/status.h"
#include "runtime/memory/mem_tracker.h"
#include "udf/python/python_udf_meta.h"
#include "udf/python/python_udf_runtime.h"
29
30
namespace doris {
31
class PythonServerManager {
32
public:
33
24
    PythonServerManager() = default;
34
35
23
    ~PythonServerManager() { shutdown(); }
36
37
6.34k
    static PythonServerManager& instance() {
38
6.34k
        static PythonServerManager instance;
39
6.34k
        return instance;
40
6.34k
    }
41
42
    template <typename T>
43
    Status get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
44
                      std::shared_ptr<T>* client,
45
                      const std::shared_ptr<arrow::Schema>& data_schema = nullptr);
46
47
    Status fork(const PythonVersion& version, ProcessPtr* process);
48
49
    Status get_process(const PythonVersion& version, ProcessPtr* process);
50
51
    // Clear Python module cache for a specific UDF location across all processes
52
    Status clear_module_cache(const std::string& location);
53
54
    Status ensure_pool_initialized(const PythonVersion& version);
55
56
    void shutdown();
57
58
#ifdef BE_TEST
59
    // For unit testing only.
60
    void check_and_recreate_processes_for_test() { _check_and_recreate_processes(); }
61
62
    std::unordered_map<PythonVersion, std::vector<ProcessPtr>>& process_pools_for_test() {
63
        return _process_pools;
64
    }
65
#endif
66
67
private:
68
    /**
69
     * Start health check background thread (called once by ensure_pool_initialized)
70
     * Thread periodically checks process health and refreshes memory stats
71
     */
72
    void _start_health_check_thread();
73
74
    /**
75
     * Check process health and recreate dead processes
76
     */
77
    void _check_and_recreate_processes();
78
79
    /**
80
     * Read resident set size (RSS) for a single process from /proc/{pid}/statm
81
     */
82
    Status _read_process_memory(pid_t pid, size_t* rss_bytes);
83
84
    /**
85
     * Refresh memory statistics for all Python processes
86
     */
87
    void _refresh_memory_stats();
88
89
    std::unordered_map<PythonVersion, std::vector<ProcessPtr>> _process_pools;
90
    // Protects _process_pools access
91
    std::mutex _pools_mutex;
92
    // Track which versions have been initialized
93
    std::unordered_set<PythonVersion> _initialized_versions;
94
    // Health check background thread
95
    std::unique_ptr<std::thread> _health_check_thread;
96
    std::atomic<bool> _shutdown_flag {false};
97
    std::condition_variable _health_check_cv;
98
    std::mutex _health_check_mutex;
99
    MemTracker _mem_tracker {"PythonUDFProcesses"};
100
};
101
102
} // namespace doris