Coverage Report

Created: 2026-03-14 13:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "udf/python/python_server.h"
19
20
#include <arrow/type_fwd.h>
21
#include <butil/fd_utility.h>
22
#include <dirent.h>
23
#include <fmt/core.h>
24
#include <sys/poll.h>
25
#include <sys/stat.h>
26
27
#include <boost/asio.hpp>
28
#include <boost/process.hpp>
29
#include <fstream>
30
31
#include "arrow/flight/client.h"
32
#include "common/config.h"
33
#include "udf/python/python_udaf_client.h"
34
#include "udf/python/python_udf_client.h"
35
#include "udf/python/python_udtf_client.h"
36
#include "util/cpu_info.h"
37
38
namespace doris {
39
40
template <typename T>
41
Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
42
                                       std::shared_ptr<T>* client,
43
6.05k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
44
    // Ensure process pool is initialized for this version
45
6.05k
    RETURN_IF_ERROR(ensure_pool_initialized(version));
46
47
6.05k
    ProcessPtr process;
48
6.05k
    RETURN_IF_ERROR(get_process(version, &process));
49
50
6.05k
    if constexpr (std::is_same_v<T, PythonUDAFClient>) {
51
3.22k
        RETURN_IF_ERROR(T::create(func_meta, std::move(process), data_schema, client));
52
3.22k
    } else {
53
2.82k
        RETURN_IF_ERROR(T::create(func_meta, std::move(process), client));
54
2.82k
    }
55
56
6.05k
    return Status::OK();
57
6.05k
}
_ZN5doris19PythonServerManager10get_clientINS_15PythonUDFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Line
Count
Source
43
1.19k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
44
    // Ensure process pool is initialized for this version
45
1.19k
    RETURN_IF_ERROR(ensure_pool_initialized(version));
46
47
1.19k
    ProcessPtr process;
48
1.19k
    RETURN_IF_ERROR(get_process(version, &process));
49
50
    if constexpr (std::is_same_v<T, PythonUDAFClient>) {
51
        RETURN_IF_ERROR(T::create(func_meta, std::move(process), data_schema, client));
52
1.19k
    } else {
53
1.19k
        RETURN_IF_ERROR(T::create(func_meta, std::move(process), client));
54
1.19k
    }
55
56
1.19k
    return Status::OK();
57
1.19k
}
_ZN5doris19PythonServerManager10get_clientINS_16PythonUDAFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Line
Count
Source
43
3.22k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
44
    // Ensure process pool is initialized for this version
45
3.22k
    RETURN_IF_ERROR(ensure_pool_initialized(version));
46
47
3.22k
    ProcessPtr process;
48
3.22k
    RETURN_IF_ERROR(get_process(version, &process));
49
50
3.22k
    if constexpr (std::is_same_v<T, PythonUDAFClient>) {
51
3.22k
        RETURN_IF_ERROR(T::create(func_meta, std::move(process), data_schema, client));
52
    } else {
53
        RETURN_IF_ERROR(T::create(func_meta, std::move(process), client));
54
    }
55
56
3.22k
    return Status::OK();
57
3.22k
}
_ZN5doris19PythonServerManager10get_clientINS_16PythonUDTFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Line
Count
Source
43
1.63k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
44
    // Ensure process pool is initialized for this version
45
1.63k
    RETURN_IF_ERROR(ensure_pool_initialized(version));
46
47
1.63k
    ProcessPtr process;
48
1.63k
    RETURN_IF_ERROR(get_process(version, &process));
49
50
    if constexpr (std::is_same_v<T, PythonUDAFClient>) {
51
        RETURN_IF_ERROR(T::create(func_meta, std::move(process), data_schema, client));
52
1.63k
    } else {
53
1.63k
        RETURN_IF_ERROR(T::create(func_meta, std::move(process), client));
54
1.63k
    }
55
56
1.63k
    return Status::OK();
57
1.63k
}
58
59
6.06k
Status PythonServerManager::ensure_pool_initialized(const PythonVersion& version) {
60
6.06k
    std::lock_guard<std::mutex> lock(_pools_mutex);
61
62
    // Check if already initialized
63
6.07k
    if (_initialized_versions.count(version)) return Status::OK();
64
65
18.4E
    std::vector<ProcessPtr>& pool = _process_pools[version];
66
    // 0 means use CPU core count as default, otherwise use the specified value
67
18.4E
    int max_pool_size = config::max_python_process_num > 0 ? config::max_python_process_num
68
18.4E
                                                           : CpuInfo::num_cores();
69
70
18.4E
    LOG(INFO) << "Initializing Python process pool for version " << version.to_string() << " with "
71
18.4E
              << max_pool_size
72
18.4E
              << " processes (config::max_python_process_num=" << config::max_python_process_num
73
18.4E
              << ", CPU cores=" << CpuInfo::num_cores() << ")";
74
75
18.4E
    std::vector<std::future<Status>> futures;
76
18.4E
    std::vector<ProcessPtr> temp_processes(max_pool_size);
77
78
18.4E
    for (int i = 0; i < max_pool_size; i++) {
79
109
        futures.push_back(std::async(std::launch::async, [this, &version, i, &temp_processes]() {
80
109
            ProcessPtr process;
81
109
            Status s = fork(version, &process);
82
109
            if (s.ok()) {
83
73
                temp_processes[i] = std::move(process);
84
73
            }
85
109
            return s;
86
109
        }));
87
109
    }
88
89
18.4E
    int success_count = 0;
90
18.4E
    int failure_count = 0;
91
18.4E
    for (int i = 0; i < max_pool_size; i++) {
92
109
        Status s = futures[i].get();
93
109
        if (s.ok() && temp_processes[i]) {
94
73
            pool.push_back(std::move(temp_processes[i]));
95
73
            success_count++;
96
73
        } else {
97
36
            failure_count++;
98
36
            LOG(WARNING) << "Failed to create Python process " << (i + 1) << "/" << max_pool_size
99
36
                         << ": " << s.to_string();
100
36
        }
101
109
    }
102
103
18.4E
    if (pool.empty()) {
104
3
        return Status::InternalError(
105
3
                "Failed to initialize Python process pool: all {} process creation attempts failed",
106
3
                max_pool_size);
107
3
    }
108
109
18.4E
    LOG(INFO) << "Python process pool initialized for version " << version.to_string()
110
18.4E
              << ": created " << success_count << " processes"
111
18.4E
              << (failure_count > 0 ? fmt::format(" ({} failed)", failure_count) : "");
112
113
18.4E
    _initialized_versions.insert(version);
114
18.4E
    _start_health_check_thread();
115
116
18.4E
    return Status::OK();
117
18.4E
}
118
119
6.08k
Status PythonServerManager::get_process(const PythonVersion& version, ProcessPtr* process) {
120
6.08k
    std::lock_guard<std::mutex> lock(_pools_mutex);
121
6.08k
    std::vector<ProcessPtr>& pool = _process_pools[version];
122
123
6.08k
    if (UNLIKELY(pool.empty())) {
124
1
        return Status::InternalError("Python process pool is empty for version {}",
125
1
                                     version.to_string());
126
1
    }
127
128
    // Find process with minimum load (use_count - 1 gives active client count)
129
6.08k
    auto min_iter = std::min_element(
130
6.08k
            pool.begin(), pool.end(),
131
382k
            [](const ProcessPtr& a, const ProcessPtr& b) { return a.use_count() < b.use_count(); });
132
133
    // Return process with minimum load
134
6.08k
    *process = *min_iter;
135
6.08k
    return Status::OK();
136
6.08k
}
137
138
120
Status PythonServerManager::fork(const PythonVersion& version, ProcessPtr* process) {
139
120
    std::string python_executable_path = version.get_executable_path();
140
120
    std::string fight_server_path = get_fight_server_path();
141
120
    std::string base_unix_socket_path = get_base_unix_socket_path();
142
120
    std::vector<std::string> args = {"-u", fight_server_path, base_unix_socket_path};
143
120
    boost::process::environment env = boost::this_process::environment();
144
120
    boost::process::ipstream child_output;
145
146
120
    try {
147
120
        boost::process::child c(
148
120
                python_executable_path, args, boost::process::std_out > child_output,
149
120
                boost::process::env = env,
150
120
                boost::process::on_exit([](int exit_code, const std::error_code& ec) {
151
0
                    if (ec) {
152
0
                        LOG(WARNING) << "Python UDF server exited with error: " << ec.message();
153
0
                    }
154
0
                }));
155
156
        // Wait for socket file to be created (indicates server is ready)
157
120
        std::string expected_socket_path = get_unix_socket_file_path(c.id());
158
120
        bool started_successfully = false;
159
120
        std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
160
120
        const auto timeout = std::chrono::milliseconds(5000);
161
162
3.12k
        while (std::chrono::steady_clock::now() - start < timeout) {
163
3.09k
            struct stat buffer;
164
3.09k
            if (stat(expected_socket_path.c_str(), &buffer) == 0) {
165
79
                started_successfully = true;
166
79
                break;
167
79
            }
168
169
3.01k
            if (!c.running()) {
170
2
                break;
171
2
            }
172
3.00k
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
173
3.00k
        }
174
175
120
        if (!started_successfully) {
176
2
            if (c.running()) {
177
0
                c.terminate();
178
0
                c.wait();
179
0
            }
180
2
            return Status::InternalError("Python server start failed: socket file not found at {}",
181
2
                                         expected_socket_path);
182
2
        }
183
184
118
        *process = std::make_shared<PythonUDFProcess>(std::move(c), std::move(child_output));
185
186
118
    } catch (const std::exception& e) {
187
40
        return Status::InternalError("Failed to start Python UDF server: {}", e.what());
188
40
    }
189
190
79
    return Status::OK();
191
120
}
192
193
8
void PythonServerManager::_start_health_check_thread() {
194
8
    if (_health_check_thread) return;
195
196
8
    LOG(INFO) << "Starting Python process health check thread (interval: 30 seconds)";
197
198
7
    _health_check_thread = std::make_unique<std::thread>([this]() {
199
        // Health check loop
200
87
        while (!_shutdown_flag.load(std::memory_order_acquire)) {
201
            // Wait for interval or shutdown signal
202
81
            {
203
81
                std::unique_lock<std::mutex> lock(_health_check_mutex);
204
161
                _health_check_cv.wait_for(lock, std::chrono::seconds(30), [this]() {
205
161
                    return _shutdown_flag.load(std::memory_order_acquire);
206
161
                });
207
81
            }
208
209
81
            if (_shutdown_flag.load(std::memory_order_acquire)) break;
210
211
80
            _check_and_recreate_processes();
212
80
            _refresh_memory_stats();
213
80
        }
214
215
7
        LOG(INFO) << "Python process health check thread exiting";
216
7
    });
217
7
}
218
219
81
void PythonServerManager::_check_and_recreate_processes() {
220
81
    std::lock_guard<std::mutex> lock(_pools_mutex);
221
222
81
    int total_checked = 0;
223
81
    int total_dead = 0;
224
81
    int total_recreated = 0;
225
226
81
    for (auto& [version, pool] : _process_pools) {
227
5.14k
        for (size_t i = 0; i < pool.size(); ++i) {
228
5.06k
            auto& process = pool[i];
229
5.06k
            if (!process) continue;
230
231
5.06k
            total_checked++;
232
5.06k
            if (!process->is_alive()) {
233
3
                total_dead++;
234
3
                LOG(WARNING) << "Detected dead Python process (pid=" << process->get_child_pid()
235
3
                             << ", version=" << version.to_string() << "), recreating...";
236
237
3
                ProcessPtr new_process;
238
3
                Status s = fork(version, &new_process);
239
3
                if (s.ok()) {
240
1
                    pool[i] = std::move(new_process);
241
1
                    total_recreated++;
242
1
                    LOG(INFO) << "Successfully recreated Python process for version "
243
1
                              << version.to_string();
244
2
                } else {
245
2
                    LOG(ERROR) << "Failed to recreate Python process for version "
246
2
                               << version.to_string() << ": " << s.to_string();
247
2
                    pool.erase(pool.begin() + i);
248
2
                    --i;
249
2
                }
250
3
            }
251
5.06k
        }
252
81
    }
253
254
81
    if (total_dead > 0) {
255
2
        LOG(INFO) << "Health check completed: checked=" << total_checked << ", dead=" << total_dead
256
2
                  << ", recreated=" << total_recreated;
257
2
    }
258
81
}
259
260
43
void PythonServerManager::shutdown() {
261
    // Signal health check thread to stop
262
43
    _shutdown_flag.store(true, std::memory_order_release);
263
43
    _health_check_cv.notify_one();
264
265
43
    if (_health_check_thread && _health_check_thread->joinable()) {
266
6
        _health_check_thread->join();
267
6
        _health_check_thread.reset();
268
6
    }
269
270
    // Shutdown all processes
271
43
    std::lock_guard<std::mutex> lock(_pools_mutex);
272
43
    for (auto& [version, pool] : _process_pools) {
273
13
        for (auto& process : pool) {
274
12
            if (process) {
275
11
                process->shutdown();
276
11
            }
277
12
        }
278
13
    }
279
43
    _process_pools.clear();
280
43
}
281
282
5.05k
Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes) {
283
    // Read from /proc/{pid}/statm
284
    // Format: size resident shared text lib data dt
285
5.05k
    std::string statm_path = fmt::format("/proc/{}/statm", pid);
286
5.05k
    std::ifstream statm_file(statm_path);
287
288
5.05k
    if (!statm_file.is_open()) {
289
0
        return Status::InternalError("Cannot open {}", statm_path);
290
0
    }
291
292
5.05k
    size_t size_pages = 0, rss_pages = 0;
293
    // we only care about RSS, read and ignore the total size field
294
5.05k
    statm_file >> size_pages >> rss_pages;
295
296
5.05k
    if (statm_file.fail()) {
297
0
        return Status::InternalError("Failed to read {}", statm_path);
298
0
    }
299
300
    // Convert pages to bytes
301
5.05k
    long page_size = sysconf(_SC_PAGESIZE);
302
5.05k
    *rss_bytes = rss_pages * page_size;
303
304
5.05k
    return Status::OK();
305
5.05k
}
306
307
79
void PythonServerManager::_refresh_memory_stats() {
308
79
    std::lock_guard<std::mutex> lock(_pools_mutex);
309
310
79
    int64_t total_rss = 0;
311
312
79
    for (const auto& [version, pool] : _process_pools) {
313
5.05k
        for (const auto& process : pool) {
314
5.05k
            if (!process || !process->is_alive()) continue;
315
316
5.05k
            size_t rss_bytes = 0;
317
5.05k
            Status s = _read_process_memory(process->get_child_pid(), &rss_bytes);
318
319
5.05k
            if (s.ok()) {
320
5.05k
                total_rss += rss_bytes;
321
5.05k
            } else [[unlikely]] {
322
0
                LOG(WARNING) << "Failed to read memory info for Python process (pid="
323
0
                             << process->get_child_pid() << "): " << s.to_string();
324
0
            }
325
5.05k
        }
326
79
    }
327
79
    _mem_tracker.set_consumption(total_rss);
328
79
    LOG(INFO) << _mem_tracker.log_usage();
329
330
79
    if (config::python_udf_processes_memory_limit_bytes > 0 &&
331
79
        total_rss > config::python_udf_processes_memory_limit_bytes) {
332
0
        LOG(WARNING) << "Python UDF process memory usage exceeds limit: rss_bytes=" << total_rss
333
0
                     << ", limit_bytes=" << config::python_udf_processes_memory_limit_bytes;
334
0
    }
335
79
}
336
337
0
Status PythonServerManager::clear_module_cache(const std::string& location) {
338
0
    if (location.empty()) {
339
0
        return Status::InvalidArgument("Empty location for clear_module_cache");
340
0
    }
341
342
0
    std::lock_guard<std::mutex> lock(_pools_mutex);
343
344
0
    std::string body = fmt::format(R"({{"location": "{}"}})", location);
345
346
0
    int success_count = 0;
347
0
    int fail_count = 0;
348
0
    bool has_active_process = false;
349
350
0
    for (auto& [version, pool] : _process_pools) {
351
0
        for (auto& process : pool) {
352
0
            if (!process || !process->is_alive()) {
353
0
                continue;
354
0
            }
355
0
            has_active_process = true;
356
0
            try {
357
0
                auto loc_result = arrow::flight::Location::Parse(process->get_uri());
358
0
                if (!loc_result.ok()) [[unlikely]] {
359
0
                    fail_count++;
360
0
                    continue;
361
0
                }
362
363
0
                auto client_result = arrow::flight::FlightClient::Connect(*loc_result);
364
0
                if (!client_result.ok()) [[unlikely]] {
365
0
                    fail_count++;
366
0
                    continue;
367
0
                }
368
0
                auto client = std::move(*client_result);
369
370
0
                arrow::flight::Action action;
371
0
                action.type = "clear_module_cache";
372
0
                action.body = arrow::Buffer::FromString(body);
373
374
0
                auto result_stream = client->DoAction(action);
375
0
                if (!result_stream.ok()) {
376
0
                    fail_count++;
377
0
                    continue;
378
0
                }
379
380
0
                auto result = (*result_stream)->Next();
381
0
                if (result.ok() && *result) {
382
0
                    success_count++;
383
0
                } else {
384
0
                    fail_count++;
385
0
                }
386
387
0
            } catch (...) {
388
0
                fail_count++;
389
0
            }
390
0
        }
391
0
    }
392
393
0
    if (!has_active_process) {
394
0
        return Status::OK();
395
0
    }
396
397
0
    LOG(INFO) << "clear_module_cache completed for location=" << location
398
0
              << ", success=" << success_count << ", failed=" << fail_count;
399
400
0
    if (fail_count > 0) {
401
0
        return Status::InternalError(
402
0
                "clear_module_cache failed for location={}, success={}, failed={}", location,
403
0
                success_count, fail_count);
404
0
    }
405
406
0
    return Status::OK();
407
0
}
408
409
// Explicit template instantiation for UDF, UDAF and UDTF clients
410
template Status PythonServerManager::get_client<PythonUDFClient>(
411
        const PythonUDFMeta& func_meta, const PythonVersion& version,
412
        std::shared_ptr<PythonUDFClient>* client,
413
        const std::shared_ptr<arrow::Schema>& data_schema);
414
415
template Status PythonServerManager::get_client<PythonUDAFClient>(
416
        const PythonUDFMeta& func_meta, const PythonVersion& version,
417
        std::shared_ptr<PythonUDAFClient>* client,
418
        const std::shared_ptr<arrow::Schema>& data_schema);
419
420
template Status PythonServerManager::get_client<PythonUDTFClient>(
421
        const PythonUDFMeta& func_meta, const PythonVersion& version,
422
        std::shared_ptr<PythonUDTFClient>* client,
423
        const std::shared_ptr<arrow::Schema>& data_schema);
424
425
} // namespace doris