Coverage Report

Created: 2026-05-12 23:26

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "udf/python/python_server.h"
19
20
#include <arrow/type_fwd.h>
21
#include <butil/fd_utility.h>
22
#include <dirent.h>
23
#include <fmt/core.h>
24
#include <sys/poll.h>
25
#include <sys/stat.h>
26
27
#include <boost/asio.hpp>
28
#include <boost/process.hpp>
29
#include <chrono>
30
#include <fstream>
31
#include <future>
32
#include <thread>
33
34
#include "arrow/flight/client.h"
35
#include "common/config.h"
36
#include "common/status.h"
37
#include "udf/python/python_udaf_client.h"
38
#include "udf/python/python_udf_client.h"
39
#include "udf/python/python_udtf_client.h"
40
#include "util/cpu_info.h"
41
42
namespace doris {
43
44
std::shared_ptr<PythonServerManager::VersionedProcessPool>
45
29
PythonServerManager::_get_or_create_process_pool(const PythonVersion& version) {
46
29
    std::lock_guard<std::mutex> lock(_pools_mutex);
47
29
    auto& pool = _process_pools[version];
48
29
    if (!pool) {
49
19
        pool = std::make_shared<VersionedProcessPool>();
50
19
    }
51
29
    return pool;
52
29
}
53
54
std::vector<std::pair<PythonVersion, std::shared_ptr<PythonServerManager::VersionedProcessPool>>>
55
54
PythonServerManager::_snapshot_process_pools() {
56
54
    std::lock_guard<std::mutex> lock(_pools_mutex);
57
54
    std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>> snapshot;
58
54
    snapshot.reserve(_process_pools.size());
59
54
    for (const auto& [version, pool] : _process_pools) {
60
22
        snapshot.emplace_back(version, pool);
61
22
    }
62
54
    return snapshot;
63
54
}
64
65
#ifdef BE_TEST
66
void PythonServerManager::set_process_pool_for_test(const PythonVersion& version,
67
                                                    std::vector<ProcessPtr> processes,
68
5
                                                    bool initialized) {
69
5
    auto versioned_pool = _get_or_create_process_pool(version);
70
5
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
71
5
    versioned_pool->processes = std::move(processes);
72
5
    versioned_pool->initialized = initialized;
73
5
}
74
75
7
std::vector<ProcessPtr>& PythonServerManager::process_pool_for_test(const PythonVersion& version) {
76
7
    auto versioned_pool = _get_or_create_process_pool(version);
77
7
    return versioned_pool->processes;
78
7
}
79
#endif
80
81
template <typename ClientType>
82
Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
83
                                       std::shared_ptr<ClientType>* client,
84
1
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
85
1
    std::shared_ptr<VersionedProcessPool> versioned_pool =
86
1
            DORIS_TRY(_ensure_pool_initialized(version));
87
88
0
    ProcessPtr process;
89
0
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
90
91
0
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
92
0
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
93
0
    } else {
94
0
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
95
0
    }
96
97
0
    return Status::OK();
98
0
}
_ZN5doris19PythonServerManager10get_clientINS_15PythonUDFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Line
Count
Source
84
1
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
85
1
    std::shared_ptr<VersionedProcessPool> versioned_pool =
86
1
            DORIS_TRY(_ensure_pool_initialized(version));
87
88
0
    ProcessPtr process;
89
0
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
90
91
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
92
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
93
0
    } else {
94
0
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
95
0
    }
96
97
0
    return Status::OK();
98
0
}
Unexecuted instantiation: _ZN5doris19PythonServerManager10get_clientINS_16PythonUDAFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Unexecuted instantiation: _ZN5doris19PythonServerManager10get_clientINS_16PythonUDTFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
99
100
Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
101
17
PythonServerManager::_ensure_pool_initialized(const PythonVersion& version) {
102
17
    auto versioned_pool = _get_or_create_process_pool(version);
103
17
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
104
105
    // Check if already initialized
106
17
    if (versioned_pool->initialized) return versioned_pool;
107
108
    // 0 means use CPU core count as default, otherwise use the specified value
109
14
    int max_pool_size = config::max_python_process_num > 0 ? config::max_python_process_num
110
14
                                                           : CpuInfo::num_cores();
111
112
14
    LOG(INFO) << "Initializing Python process pool for version " << version.to_string() << " with "
113
14
              << max_pool_size
114
14
              << " processes (config::max_python_process_num=" << config::max_python_process_num
115
14
              << ", CPU cores=" << CpuInfo::num_cores() << ")";
116
117
14
    std::vector<std::future<Status>> futures;
118
14
    std::vector<ProcessPtr> temp_processes(max_pool_size);
119
120
63
    for (int i = 0; i < max_pool_size; i++) {
121
49
        futures.push_back(std::async(std::launch::async, [this, &version, i, &temp_processes]() {
122
49
            ProcessPtr process;
123
49
            Status s = fork(version, &process);
124
49
            if (s.ok()) {
125
13
                temp_processes[i] = std::move(process);
126
13
            }
127
49
            return s;
128
49
        }));
129
49
    }
130
131
14
    int success_count = 0;
132
14
    int failure_count = 0;
133
14
    const auto init_start_time = std::chrono::steady_clock::now();
134
14
#ifdef BE_TEST
135
14
    constexpr auto progress_log_interval = std::chrono::milliseconds(50);
136
#else
137
    constexpr auto progress_log_interval = std::chrono::seconds(20);
138
#endif
139
63
    for (int i = 0; i < max_pool_size; i++) {
140
        // Print init log every 20s until the current slot is ready.
141
137
        while (futures[i].wait_for(progress_log_interval) != std::future_status::ready) {
142
88
            const auto now = std::chrono::steady_clock::now();
143
88
            const auto total_elapsed_ms =
144
88
                    std::chrono::duration_cast<std::chrono::milliseconds>(now - init_start_time)
145
88
                            .count();
146
88
            LOG(INFO) << "Python process pool initialization progress for version "
147
88
                      << version.to_string() << ": waiting_slot=" << (i + 1) << "/" << max_pool_size
148
88
                      << ", success=" << success_count << ", failed=" << failure_count
149
88
                      << ", elapsed_ms=" << total_elapsed_ms;
150
88
        }
151
152
49
        Status s = futures[i].get();
153
49
        if (s.ok() && temp_processes[i]) {
154
13
            versioned_pool->processes.emplace_back(std::move(temp_processes[i]));
155
13
            success_count++;
156
36
        } else {
157
36
            failure_count++;
158
36
            LOG(WARNING) << "Failed to create Python process " << (i + 1) << "/" << max_pool_size
159
36
                         << ": " << s.to_string();
160
36
        }
161
49
    }
162
163
14
    if (versioned_pool->processes.empty()) {
164
3
        return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
165
3
                "Failed to initialize Python process pool: all {} process creation attempts failed",
166
3
                max_pool_size));
167
3
    }
168
169
11
    const auto total_elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
170
11
                                          std::chrono::steady_clock::now() - init_start_time)
171
11
                                          .count();
172
11
    LOG(INFO) << "Python process pool initialized for version " << version.to_string()
173
11
              << ": created " << success_count << " processes"
174
11
              << (failure_count > 0 ? fmt::format(" ({} failed)", failure_count) : "")
175
11
              << ", elapsed_ms=" << total_elapsed_ms;
176
177
11
    versioned_pool->initialized = true;
178
11
    _start_health_check_thread();
179
180
11
    return versioned_pool;
181
14
}
182
183
Status PythonServerManager::_get_process(
184
        const PythonVersion& version, const std::shared_ptr<VersionedProcessPool>& versioned_pool,
185
12
        ProcessPtr* process) {
186
12
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
187
12
    std::vector<ProcessPtr>& pool = versioned_pool->processes;
188
189
12
    if (UNLIKELY(pool.empty())) {
190
1
        return Status::InternalError("Python process pool is empty for version {}",
191
1
                                     version.to_string());
192
1
    }
193
194
    // Prefer an already-alive process and only use load balancing inside that alive subset.
195
    // keep dead entries stay in the pool for the background health checker
196
    // unless there is no alive process left for the current request.
197
11
    auto min_alive_iter = std::min_element(pool.begin(), pool.end(),
198
11
                                           [](const ProcessPtr& a, const ProcessPtr& b) {
199
6
                                               const bool a_alive = a && a->is_alive();
200
6
                                               const bool b_alive = b && b->is_alive();
201
6
                                               if (a_alive != b_alive) {
202
1
                                                   return a_alive > b_alive;
203
1
                                               }
204
5
                                               return a.use_count() < b.use_count();
205
6
                                           });
206
207
11
    if (min_alive_iter != pool.end() && *min_alive_iter && (*min_alive_iter)->is_alive()) {
208
10
        *process = *min_alive_iter;
209
10
        return Status::OK();
210
10
    }
211
212
    // Only reach here when the pool has no alive process at all. Try one foreground
213
    // recovery so the caller has a chance to proceed; leave batch repair to health check.
214
1
    auto& candidate = pool.front();
215
1
    ProcessPtr replacement;
216
1
    Status status = fork(version, &replacement);
217
1
    if (!status.ok()) {
218
0
        return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
219
0
                "Python process pool has no available process for version {}, reason: {}",
220
0
                version.to_string(), status.to_string());
221
0
    }
222
223
1
    candidate = std::move(replacement);
224
1
    *process = candidate;
225
1
    return Status::OK();
226
1
}
227
228
64
Status PythonServerManager::fork(const PythonVersion& version, ProcessPtr* process) {
229
64
    std::string python_executable_path = version.get_executable_path();
230
64
    std::string fight_server_path = get_fight_server_path();
231
64
    std::string base_unix_socket_path = get_base_unix_socket_path();
232
64
    std::vector<std::string> args = {"-u", fight_server_path, base_unix_socket_path};
233
64
    boost::process::environment env = boost::this_process::environment();
234
64
    boost::process::ipstream child_output;
235
236
64
    try {
237
64
        boost::process::child c(
238
64
                python_executable_path, args, boost::process::std_out > child_output,
239
64
                boost::process::env = env,
240
64
                boost::process::on_exit([](int exit_code, const std::error_code& ec) {
241
0
                    if (ec) {
242
0
                        LOG(WARNING) << "Python UDF server exited with error: " << ec.message();
243
0
                    }
244
0
                }));
245
246
        // Wait for socket file to be created (indicates server is ready)
247
64
        std::string expected_socket_path = get_unix_socket_file_path(c.id());
248
64
        bool started_successfully = false;
249
64
        std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
250
64
        const auto timeout = std::chrono::milliseconds(5000);
251
252
2.55k
        while (std::chrono::steady_clock::now() - start < timeout) {
253
2.51k
            struct stat buffer;
254
2.51k
            if (stat(expected_socket_path.c_str(), &buffer) == 0) {
255
22
                started_successfully = true;
256
22
                break;
257
22
            }
258
259
2.49k
            if (!c.running()) {
260
2
                break;
261
2
            }
262
2.49k
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
263
2.49k
        }
264
265
64
        if (!started_successfully) {
266
2
            if (c.running()) {
267
0
                c.terminate();
268
0
                c.wait();
269
0
            }
270
2
            return Status::InternalError("Python server start failed: socket file not found at {}",
271
2
                                         expected_socket_path);
272
2
        }
273
274
62
        *process = std::make_shared<PythonUDFProcess>(std::move(c), std::move(child_output));
275
276
62
    } catch (const std::exception& e) {
277
40
        return Status::InternalError("Failed to start Python UDF server: {}", e.what());
278
40
    }
279
280
22
    return Status::OK();
281
64
}
282
283
11
void PythonServerManager::_start_health_check_thread() {
284
11
    std::lock_guard<std::mutex> lock(_health_check_mutex);
285
11
    if (_health_check_thread) return;
286
287
11
    LOG(INFO) << "Starting Python process health check thread (interval: 30 seconds)";
288
289
9
    _health_check_thread = std::make_unique<std::thread>([this]() {
290
        // Health check loop
291
9
        while (!_shutdown_flag.load(std::memory_order_acquire)) {
292
            // Wait for interval or shutdown signal
293
4
            {
294
4
                std::unique_lock<std::mutex> lock(_health_check_mutex);
295
8
                _health_check_cv.wait_for(lock, std::chrono::seconds(30), [this]() {
296
8
                    return _shutdown_flag.load(std::memory_order_acquire);
297
8
                });
298
4
            }
299
300
4
            if (_shutdown_flag.load(std::memory_order_acquire)) break;
301
302
0
            _check_and_recreate_processes();
303
0
            _refresh_memory_stats();
304
0
        }
305
306
9
        LOG(INFO) << "Python process health check thread exiting";
307
9
    });
308
9
}
309
310
2
void PythonServerManager::_check_and_recreate_processes() {
311
2
    int total_checked = 0;
312
2
    int total_dead = 0;
313
2
    int total_recreated = 0;
314
315
2
    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
316
2
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
317
2
        auto& pool = versioned_pool->processes;
318
7
        for (size_t i = 0; i < pool.size(); ++i) {
319
5
            auto& process = pool[i];
320
5
            if (!process) continue;
321
322
4
            total_checked++;
323
4
            if (!process->is_alive()) {
324
3
                total_dead++;
325
3
                LOG(WARNING) << "Detected dead Python process (pid=" << process->get_child_pid()
326
3
                             << ", version=" << version.to_string() << "), recreating...";
327
328
3
                ProcessPtr new_process;
329
3
                Status s = fork(version, &new_process);
330
3
                if (s.ok()) {
331
1
                    pool[i] = std::move(new_process);
332
1
                    total_recreated++;
333
1
                    LOG(INFO) << "Successfully recreated Python process for version "
334
1
                              << version.to_string();
335
2
                } else {
336
2
                    LOG(ERROR) << "Failed to recreate Python process for version "
337
2
                               << version.to_string() << ": " << s.to_string();
338
2
                    pool.erase(pool.begin() + i);
339
2
                    --i;
340
2
                }
341
3
            }
342
4
        }
343
2
    }
344
345
2
    if (total_dead > 0) {
346
2
        LOG(INFO) << "Health check completed: checked=" << total_checked << ", dead=" << total_dead
347
2
                  << ", recreated=" << total_recreated;
348
2
    }
349
2
}
350
351
49
void PythonServerManager::shutdown() {
352
    // Signal health check thread to stop
353
49
    _shutdown_flag.store(true, std::memory_order_release);
354
49
    _health_check_cv.notify_one();
355
356
49
    if (_health_check_thread && _health_check_thread->joinable()) {
357
9
        _health_check_thread->join();
358
9
        _health_check_thread.reset();
359
9
    }
360
361
    // Shutdown all processes
362
49
    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
363
19
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
364
19
        auto& pool = versioned_pool->processes;
365
19
        for (auto& process : pool) {
366
19
            if (process) {
367
18
                process->shutdown();
368
18
            }
369
19
        }
370
19
        pool.clear();
371
19
        versioned_pool->initialized = false;
372
19
    }
373
374
49
    {
375
49
        std::lock_guard<std::mutex> lock(_pools_mutex);
376
49
        _process_pools.clear();
377
49
    }
378
49
}
379
380
0
Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes) {
381
    // Read from /proc/{pid}/statm
382
    // Format: size resident shared text lib data dt
383
0
    std::string statm_path = fmt::format("/proc/{}/statm", pid);
384
0
    std::ifstream statm_file(statm_path);
385
386
0
    if (!statm_file.is_open()) {
387
0
        return Status::InternalError("Cannot open {}", statm_path);
388
0
    }
389
390
0
    size_t size_pages = 0, rss_pages = 0;
391
    // we only care about RSS, read and ignore the total size field
392
0
    statm_file >> size_pages >> rss_pages;
393
394
0
    if (statm_file.fail()) {
395
0
        return Status::InternalError("Failed to read {}", statm_path);
396
0
    }
397
398
    // Convert pages to bytes
399
0
    long page_size = sysconf(_SC_PAGESIZE);
400
0
    *rss_bytes = rss_pages * page_size;
401
402
0
    return Status::OK();
403
0
}
404
405
0
void PythonServerManager::_refresh_memory_stats() {
406
0
    int64_t total_rss = 0;
407
408
0
    for (const auto& [version, versioned_pool] : _snapshot_process_pools()) {
409
0
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
410
0
        const auto& pool = versioned_pool->processes;
411
0
        for (const auto& process : pool) {
412
0
            if (!process || !process->is_alive()) continue;
413
414
0
            size_t rss_bytes = 0;
415
0
            Status s = _read_process_memory(process->get_child_pid(), &rss_bytes);
416
417
0
            if (s.ok()) {
418
0
                total_rss += rss_bytes;
419
0
            } else [[unlikely]] {
420
0
                LOG(WARNING) << "Failed to read memory info for Python process (pid="
421
0
                             << process->get_child_pid() << "): " << s.to_string();
422
0
            }
423
0
        }
424
0
    }
425
0
    _mem_tracker.set_consumption(total_rss);
426
0
    LOG(INFO) << _mem_tracker.log_usage();
427
428
0
    if (config::python_udf_processes_memory_limit_bytes > 0 &&
429
0
        total_rss > config::python_udf_processes_memory_limit_bytes) {
430
0
        LOG(WARNING) << "Python UDF process memory usage exceeds limit: rss_bytes=" << total_rss
431
0
                     << ", limit_bytes=" << config::python_udf_processes_memory_limit_bytes;
432
0
    }
433
0
}
434
435
1
Status PythonServerManager::clear_module_cache(const std::string& location) {
436
1
    if (location.empty()) {
437
0
        return Status::InvalidArgument("Empty location for clear_module_cache");
438
0
    }
439
440
1
    std::string body = fmt::format(R"({{"location": "{}"}})", location);
441
1
    return _broadcast_action_to_processes("clear_module_cache", body,
442
1
                                          fmt::format("location={}", location));
443
1
}
444
445
1
void PythonServerManager::clear_udaf_state_cache(int64_t function_id) {
446
1
    std::string body = fmt::format(R"({{"function_id": {}}})", function_id);
447
1
    WARN_IF_ERROR(_broadcast_action_to_processes("clear_udaf_state_cache", body,
448
1
                                                 fmt::format("function_id={}", function_id)),
449
1
                  "failed to clear Python UDAF state cache");
450
1
}
451
452
Status PythonServerManager::_broadcast_action_to_processes(const std::string& action_type,
453
                                                           const std::string& body,
454
3
                                                           const std::string& log_name) {
455
3
    int success_count = 0;
456
3
    int fail_count = 0;
457
3
    bool has_active_process = false;
458
459
3
    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
460
1
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
461
1
        auto& pool = versioned_pool->processes;
462
1
        for (auto& process : pool) {
463
1
            if (!process || !process->is_alive()) {
464
0
                continue;
465
0
            }
466
1
            has_active_process = true;
467
1
            try {
468
1
                auto loc_result = arrow::flight::Location::Parse(process->get_uri());
469
1
                if (!loc_result.ok()) [[unlikely]] {
470
1
                    fail_count++;
471
1
                    continue;
472
1
                }
473
474
0
                auto client_result = arrow::flight::FlightClient::Connect(*loc_result);
475
0
                if (!client_result.ok()) [[unlikely]] {
476
0
                    fail_count++;
477
0
                    continue;
478
0
                }
479
0
                auto client = std::move(*client_result);
480
481
0
                arrow::flight::Action action;
482
0
                action.type = action_type;
483
0
                action.body = arrow::Buffer::FromString(body);
484
485
0
                auto result_stream = client->DoAction(action);
486
0
                if (!result_stream.ok()) {
487
0
                    fail_count++;
488
0
                    continue;
489
0
                }
490
491
0
                auto result = (*result_stream)->Next();
492
0
                if (result.ok() && *result) {
493
0
                    success_count++;
494
0
                } else {
495
0
                    fail_count++;
496
0
                }
497
498
0
            } catch (...) {
499
0
                fail_count++;
500
0
            }
501
1
        }
502
1
    }
503
504
3
    if (!has_active_process) {
505
2
        return Status::OK();
506
2
    }
507
508
3
    LOG(INFO) << action_type << " completed for " << log_name << ", success=" << success_count
509
1
              << ", failed=" << fail_count;
510
511
1
    if (fail_count > 0) {
512
1
        return Status::InternalError("{} failed for {}, success={}, failed={}", action_type,
513
1
                                     log_name, success_count, fail_count);
514
1
    }
515
516
0
    return Status::OK();
517
1
}
518
519
// Explicit template instantiation for UDF, UDAF and UDTF clients
520
template Status PythonServerManager::get_client<PythonUDFClient>(
521
        const PythonUDFMeta& func_meta, const PythonVersion& version,
522
        std::shared_ptr<PythonUDFClient>* client,
523
        const std::shared_ptr<arrow::Schema>& data_schema);
524
525
template Status PythonServerManager::get_client<PythonUDAFClient>(
526
        const PythonUDFMeta& func_meta, const PythonVersion& version,
527
        std::shared_ptr<PythonUDAFClient>* client,
528
        const std::shared_ptr<arrow::Schema>& data_schema);
529
530
template Status PythonServerManager::get_client<PythonUDTFClient>(
531
        const PythonUDFMeta& func_meta, const PythonVersion& version,
532
        std::shared_ptr<PythonUDTFClient>* client,
533
        const std::shared_ptr<arrow::Schema>& data_schema);
534
535
} // namespace doris