Coverage Report

Created: 2026-06-10 06:38

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_udf_runtime.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <boost/process.hpp>
#include <chrono>
#include <cstdlib>
#include <stdexcept>
#include <string>

#include "udf/python/python_env.h"
24
25
namespace doris {
26
27
// Compile-time string constants used to build the socket URI, the socket file path and the
// Python server script path. They are `inline constexpr` so the header contributes a single,
// immutable definition of each instead of one mutable pointer per translation unit.

// URI scheme prepended to the socket file path to form the server URI.
inline constexpr const char* UNIX_SOCKET_PREFIX = "grpc+unix://";
// Use /tmp directory for Unix socket to avoid path length limit (max 107 chars)
inline constexpr const char* BASE_UNIX_SOCKET_PATH_TEMPLATE = "/tmp/doris_python_udf";
// Format string: "<base>_<child pid>.sock".
inline constexpr const char* UNIX_SOCKET_PATH_TEMPLATE = "{}_{}.sock";
// Format string: "<DORIS_HOME>/plugins/python_udf/<script file name>".
inline constexpr const char* FLIGHT_SERVER_PATH_TEMPLATE = "{}/plugins/python_udf/{}";
// File name of the Python server script.
inline constexpr const char* FLIGHT_SERVER_FILENAME = "python_server.py";
33
34
32
inline std::string get_base_unix_socket_path() {
35
32
    return fmt::format("{}{}", UNIX_SOCKET_PREFIX, BASE_UNIX_SOCKET_PATH_TEMPLATE);
36
32
}
37
38
16
inline std::string get_unix_socket_path(pid_t child_pid) {
39
16
    return fmt::format(UNIX_SOCKET_PATH_TEMPLATE, get_base_unix_socket_path(), child_pid);
40
16
}
41
42
32
inline std::string get_unix_socket_file_path(pid_t child_pid) {
43
32
    return fmt::format(UNIX_SOCKET_PATH_TEMPLATE, BASE_UNIX_SOCKET_PATH_TEMPLATE, child_pid);
44
32
}
45
46
16
inline std::string get_fight_server_path() {
47
16
    return fmt::format(FLIGHT_SERVER_PATH_TEMPLATE, std::getenv("DORIS_HOME"),
48
16
                       FLIGHT_SERVER_FILENAME);
49
16
}
50
51
class PythonUDFProcess;
52
53
using ProcessPtr = std::shared_ptr<PythonUDFProcess>;
54
55
class PythonUDFProcess {
56
public:
57
    PythonUDFProcess(boost::process::child child, boost::process::ipstream output_stream)
58
16
            : _is_shutdown(false),
59
16
              _child_pid(child.id()),
60
16
              _uri(get_unix_socket_path(_child_pid)),
61
16
              _unix_socket_file_path(get_unix_socket_file_path(_child_pid)),
62
16
              _child(std::move(child)),
63
16
              _output_stream(std::move(output_stream)) {}
64
65
0
    ~PythonUDFProcess() { shutdown(); }
66
67
6.14k
    std::string get_uri() const { return _uri; }
68
69
0
    const std::string& get_socket_file_path() const { return _unix_socket_file_path; }
70
71
0
    bool is_shutdown() const { return _is_shutdown; }
72
73
191k
    bool is_alive() const { return !_is_shutdown && _child.running(); }
74
75
    void remove_unix_socket();
76
77
    void shutdown();
78
79
    enum class ChildExitWaitResult { EXITED, ALREADY_REAPED, TIMEOUT, ERROR };
80
81
    static ChildExitWaitResult wait_child_exit(pid_t pid, std::chrono::milliseconds timeout,
82
                                               int* exit_status);
83
84
    // Hand off a killed child that could not be reaped synchronously. The background reaper keeps
85
    // waitpid ownership so a later child exit will not become a zombie under BE.
86
    static void enqueue_child_for_reap(pid_t pid);
87
88
    std::string to_string() const;
89
90
832
    pid_t get_child_pid() const { return _child_pid; }
91
92
1.11k
    bool operator==(const PythonUDFProcess& other) const { return _child_pid == other._child_pid; }
93
94
1.11k
    bool operator!=(const PythonUDFProcess& other) const { return !(*this == other); }
95
96
#ifdef BE_TEST
97
    void set_uri_for_test(std::string uri) { _uri = std::move(uri); }
98
99
    static bool wait_background_reaped_for_test(pid_t pid, std::chrono::milliseconds timeout);
100
101
    static void force_child_exit_timeouts_for_test(int count);
102
#endif
103
104
private:
105
    constexpr static size_t MAX_ACCUMULATED_LOG_SIZE = 65536;
106
107
    bool _is_shutdown {false};
108
    pid_t _child_pid;
109
    std::string _uri;
110
    std::string _unix_socket_file_path;
111
    mutable boost::process::child _child;
112
    boost::process::ipstream _output_stream;
113
    std::string _accumulated_log;
114
};
115
116
} // namespace doris