Coverage Report

Created: 2026-03-13 09:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_udf_runtime.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <boost/process.hpp>
21
22
#include "udf/python/python_env.h"
23
24
namespace doris {
25
26
// String constants used to build the Unix-socket URI/path and the Python server script path.
// They are declared `inline constexpr` so the pointers themselves are immutable and the header
// contributes a single definition instead of one mutable copy per translation unit.

// URI scheme prefix placed in front of the socket path.
inline constexpr const char* UNIX_SOCKET_PREFIX = "grpc+unix://";
// Use /tmp directory for Unix socket to avoid path length limit (max 107 chars)
inline constexpr const char* BASE_UNIX_SOCKET_PATH_TEMPLATE = "/tmp/doris_python_udf";
// fmt pattern: "<base>_<pid>.sock".
inline constexpr const char* UNIX_SOCKET_PATH_TEMPLATE = "{}_{}.sock";
// fmt pattern: "<home>/plugins/python_udf/<file name>".
inline constexpr const char* FLIGHT_SERVER_PATH_TEMPLATE = "{}/plugins/python_udf/{}";
// File name substituted into FLIGHT_SERVER_PATH_TEMPLATE.
inline constexpr const char* FLIGHT_SERVER_FILENAME = "python_server.py";
32
33
220
inline std::string get_base_unix_socket_path() {
34
220
    return fmt::format("{}{}", UNIX_SOCKET_PREFIX, BASE_UNIX_SOCKET_PATH_TEMPLATE);
35
220
}
36
37
98
inline std::string get_unix_socket_path(pid_t child_pid) {
38
98
    return fmt::format(UNIX_SOCKET_PATH_TEMPLATE, get_base_unix_socket_path(), child_pid);
39
98
}
40
41
184
inline std::string get_unix_socket_file_path(pid_t child_pid) {
42
184
    return fmt::format(UNIX_SOCKET_PATH_TEMPLATE, BASE_UNIX_SOCKET_PATH_TEMPLATE, child_pid);
43
184
}
44
45
122
inline std::string get_fight_server_path() {
46
122
    return fmt::format(FLIGHT_SERVER_PATH_TEMPLATE, std::getenv("DORIS_HOME"),
47
122
                       FLIGHT_SERVER_FILENAME);
48
122
}
49
50
// Forward declaration so that ProcessPtr can be declared ahead of the class definition.
class PythonUDFProcess;
51
52
// Shared-ownership handle to a PythonUDFProcess.
using ProcessPtr = std::shared_ptr<PythonUDFProcess>;
53
54
class PythonUDFProcess {
55
public:
56
    PythonUDFProcess(boost::process::child child, boost::process::ipstream output_stream)
57
92
            : _is_shutdown(false),
58
92
              _child_pid(child.id()),
59
92
              _uri(get_unix_socket_path(_child_pid)),
60
92
              _unix_socket_file_path(get_unix_socket_file_path(_child_pid)),
61
92
              _child(std::move(child)),
62
92
              _output_stream(std::move(output_stream)) {}
63
64
28
    ~PythonUDFProcess() { shutdown(); }
65
66
5.71k
    std::string get_uri() const { return _uri; }
67
68
    const std::string& get_socket_file_path() const { return _unix_socket_file_path; }
69
70
    bool is_shutdown() const { return _is_shutdown; }
71
72
10.7k
    bool is_alive() const { return !_is_shutdown && _child.running(); }
73
74
    void remove_unix_socket();
75
76
    void shutdown();
77
78
    std::string to_string() const;
79
80
5.38k
    pid_t get_child_pid() const { return _child_pid; }
81
82
1.20k
    bool operator==(const PythonUDFProcess& other) const { return _child_pid == other._child_pid; }
83
84
1.20k
    bool operator!=(const PythonUDFProcess& other) const { return !(*this == other); }
85
86
private:
87
    constexpr static int TERMINATE_RETRY_TIMES = 10;
88
    constexpr static size_t MAX_ACCUMULATED_LOG_SIZE = 65536;
89
90
    bool _is_shutdown {false};
91
    pid_t _child_pid;
92
    std::string _uri;
93
    std::string _unix_socket_file_path;
94
    mutable boost::process::child _child;
95
    boost::process::ipstream _output_stream;
96
    std::string _accumulated_log;
97
};
98
99
} // namespace doris