Coverage Report

Created: 2026-05-09 04:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "udf/python/python_server.h"
19
20
#include <arrow/type_fwd.h>
21
#include <butil/fd_utility.h>
22
#include <dirent.h>
23
#include <fmt/core.h>
24
#include <sys/poll.h>
25
#include <sys/stat.h>
26
27
#include <boost/asio.hpp>
28
#include <boost/process.hpp>
29
#include <chrono>
30
#include <fstream>
31
#include <future>
32
#include <thread>
33
34
#include "arrow/flight/client.h"
35
#include "common/config.h"
36
#include "udf/python/python_udaf_client.h"
37
#include "udf/python/python_udf_client.h"
38
#include "udf/python/python_udtf_client.h"
39
#include "util/cpu_info.h"
40
41
namespace doris {
42
43
std::shared_ptr<PythonServerManager::VersionedProcessPool>
44
5.41k
PythonServerManager::_get_or_create_process_pool(const PythonVersion& version) {
45
5.41k
    std::lock_guard<std::mutex> lock(_pools_mutex);
46
5.41k
    auto& pool = _process_pools[version];
47
5.41k
    if (!pool) {
48
19
        pool = std::make_shared<VersionedProcessPool>();
49
19
    }
50
5.41k
    return pool;
51
5.41k
}
52
53
std::vector<std::pair<PythonVersion, std::shared_ptr<PythonServerManager::VersionedProcessPool>>>
54
161
PythonServerManager::_snapshot_process_pools() {
55
161
    std::lock_guard<std::mutex> lock(_pools_mutex);
56
161
    std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>> snapshot;
57
161
    snapshot.reserve(_process_pools.size());
58
161
    for (const auto& [version, pool] : _process_pools) {
59
128
        snapshot.emplace_back(version, pool);
60
128
    }
61
161
    return snapshot;
62
161
}
63
64
#ifdef BE_TEST
65
void PythonServerManager::set_process_pool_for_test(const PythonVersion& version,
66
                                                    std::vector<ProcessPtr> processes,
67
                                                    bool initialized) {
68
    auto versioned_pool = _get_or_create_process_pool(version);
69
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
70
    versioned_pool->processes = std::move(processes);
71
    versioned_pool->initialized = initialized;
72
}
73
74
std::vector<ProcessPtr>& PythonServerManager::process_pool_for_test(const PythonVersion& version) {
75
    auto versioned_pool = _get_or_create_process_pool(version);
76
    return versioned_pool->processes;
77
}
78
#endif
79
80
template <typename ClientType>
81
Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
82
                                       std::shared_ptr<ClientType>* client,
83
5.39k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
84
5.39k
    std::shared_ptr<VersionedProcessPool> versioned_pool =
85
5.39k
            DORIS_TRY(_ensure_pool_initialized(version));
86
87
5.39k
    ProcessPtr process;
88
5.39k
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
89
90
5.39k
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
91
2.73k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
92
2.73k
    } else {
93
2.65k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
94
2.65k
    }
95
96
5.39k
    return Status::OK();
97
5.39k
}
_ZN5doris19PythonServerManager10get_clientINS_15PythonUDFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Line
Count
Source
83
1.30k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
84
1.30k
    std::shared_ptr<VersionedProcessPool> versioned_pool =
85
1.30k
            DORIS_TRY(_ensure_pool_initialized(version));
86
87
1.30k
    ProcessPtr process;
88
1.30k
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
89
90
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
91
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
92
1.30k
    } else {
93
1.30k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
94
1.30k
    }
95
96
1.30k
    return Status::OK();
97
1.30k
}
_ZN5doris19PythonServerManager10get_clientINS_16PythonUDAFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Line
Count
Source
83
2.73k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
84
2.73k
    std::shared_ptr<VersionedProcessPool> versioned_pool =
85
2.73k
            DORIS_TRY(_ensure_pool_initialized(version));
86
87
2.73k
    ProcessPtr process;
88
2.73k
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
89
90
2.73k
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
91
2.73k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
92
    } else {
93
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
94
    }
95
96
2.73k
    return Status::OK();
97
2.73k
}
_ZN5doris19PythonServerManager10get_clientINS_16PythonUDTFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Line
Count
Source
83
1.35k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
84
1.35k
    std::shared_ptr<VersionedProcessPool> versioned_pool =
85
1.35k
            DORIS_TRY(_ensure_pool_initialized(version));
86
87
1.35k
    ProcessPtr process;
88
1.35k
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
89
90
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
91
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
92
1.35k
    } else {
93
1.35k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
94
1.35k
    }
95
96
1.35k
    return Status::OK();
97
1.35k
}
98
99
Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
100
5.41k
PythonServerManager::_ensure_pool_initialized(const PythonVersion& version) {
101
5.41k
    auto versioned_pool = _get_or_create_process_pool(version);
102
5.41k
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
103
104
    // Check if already initialized
105
5.41k
    if (versioned_pool->initialized) return versioned_pool;
106
107
    // 0 means use CPU core count as default, otherwise use the specified value
108
12
    int max_pool_size = config::max_python_process_num > 0 ? config::max_python_process_num
109
18.4E
                                                           : CpuInfo::num_cores();
110
111
0
    LOG(INFO) << "Initializing Python process pool for version " << version.to_string() << " with "
112
0
              << max_pool_size
113
0
              << " processes (config::max_python_process_num=" << config::max_python_process_num
114
0
              << ", CPU cores=" << CpuInfo::num_cores() << ")";
115
116
0
    std::vector<std::future<Status>> futures;
117
0
    std::vector<ProcessPtr> temp_processes(max_pool_size);
118
119
65
    for (int i = 0; i < max_pool_size; i++) {
120
65
        futures.push_back(std::async(std::launch::async, [this, &version, i, &temp_processes]() {
121
64
            ProcessPtr process;
122
64
            Status s = fork(version, &process);
123
64
            if (s.ok()) {
124
29
                temp_processes[i] = std::move(process);
125
29
            }
126
64
            return s;
127
64
        }));
128
65
    }
129
130
0
    int success_count = 0;
131
0
    int failure_count = 0;
132
0
    const auto init_start_time = std::chrono::steady_clock::now();
133
#ifdef BE_TEST
134
    constexpr auto progress_log_interval = std::chrono::milliseconds(50);
135
#else
136
0
    constexpr auto progress_log_interval = std::chrono::seconds(20);
137
0
#endif
138
65
    for (int i = 0; i < max_pool_size; i++) {
139
        // Print init log every 20s until the current slot is ready.
140
154
        while (futures[i].wait_for(progress_log_interval) != std::future_status::ready) {
141
89
            const auto now = std::chrono::steady_clock::now();
142
89
            const auto total_elapsed_ms =
143
89
                    std::chrono::duration_cast<std::chrono::milliseconds>(now - init_start_time)
144
89
                            .count();
145
89
            LOG(INFO) << "Python process pool initialization progress for version "
146
89
                      << version.to_string() << ": waiting_slot=" << (i + 1) << "/" << max_pool_size
147
89
                      << ", success=" << success_count << ", failed=" << failure_count
148
89
                      << ", elapsed_ms=" << total_elapsed_ms;
149
89
        }
150
151
65
        Status s = futures[i].get();
152
65
        if (s.ok() && temp_processes[i]) {
153
29
            versioned_pool->processes.emplace_back(std::move(temp_processes[i]));
154
29
            success_count++;
155
36
        } else {
156
36
            failure_count++;
157
36
            LOG(WARNING) << "Failed to create Python process " << (i + 1) << "/" << max_pool_size
158
36
                         << ": " << s.to_string();
159
36
        }
160
65
    }
161
162
3
    if (versioned_pool->processes.empty()) {
163
3
        return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
164
3
                "Failed to initialize Python process pool: all {} process creation attempts failed",
165
3
                max_pool_size));
166
3
    }
167
168
18.4E
    const auto total_elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
169
18.4E
                                          std::chrono::steady_clock::now() - init_start_time)
170
18.4E
                                          .count();
171
18.4E
    LOG(INFO) << "Python process pool initialized for version " << version.to_string()
172
18.4E
              << ": created " << success_count << " processes"
173
18.4E
              << (failure_count > 0 ? fmt::format(" ({} failed)", failure_count) : "")
174
18.4E
              << ", elapsed_ms=" << total_elapsed_ms;
175
176
18.4E
    versioned_pool->initialized = true;
177
18.4E
    _start_health_check_thread();
178
179
18.4E
    return versioned_pool;
180
0
}
181
182
Status PythonServerManager::_get_process(
183
        const PythonVersion& version, const std::shared_ptr<VersionedProcessPool>& versioned_pool,
184
5.41k
        ProcessPtr* process) {
185
5.41k
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
186
5.41k
    std::vector<ProcessPtr>& pool = versioned_pool->processes;
187
188
5.41k
    if (UNLIKELY(pool.empty())) {
189
1
        return Status::InternalError("Python process pool is empty for version {}",
190
1
                                     version.to_string());
191
1
    }
192
193
    // Prefer an already-alive process and only use load balancing inside that alive subset.
194
    // keep dead entries stay in the pool for the background health checker
195
    // unless there is no alive process left for the current request.
196
5.41k
    auto min_alive_iter = std::min_element(pool.begin(), pool.end(),
197
81.1k
                                           [](const ProcessPtr& a, const ProcessPtr& b) {
198
81.1k
                                               const bool a_alive = a && a->is_alive();
199
81.1k
                                               const bool b_alive = b && b->is_alive();
200
81.1k
                                               if (a_alive != b_alive) {
201
1
                                                   return a_alive > b_alive;
202
1
                                               }
203
81.1k
                                               return a.use_count() < b.use_count();
204
81.1k
                                           });
205
206
5.41k
    if (min_alive_iter != pool.end() && *min_alive_iter && (*min_alive_iter)->is_alive()) {
207
5.41k
        *process = *min_alive_iter;
208
5.41k
        return Status::OK();
209
5.41k
    }
210
211
    // Only reach here when the pool has no alive process at all. Try one foreground
212
    // recovery so the caller has a chance to proceed; leave batch repair to health check.
213
18.4E
    auto& candidate = pool.front();
214
18.4E
    ProcessPtr replacement;
215
18.4E
    Status status = fork(version, &replacement);
216
18.4E
    if (!status.ok()) {
217
0
        return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
218
0
                "Python process pool has no available process for version {}, reason: {}",
219
0
                version.to_string(), status.to_string());
220
0
    }
221
222
18.4E
    candidate = std::move(replacement);
223
18.4E
    *process = candidate;
224
18.4E
    return Status::OK();
225
18.4E
}
226
227
80
Status PythonServerManager::fork(const PythonVersion& version, ProcessPtr* process) {
228
80
    std::string python_executable_path = version.get_executable_path();
229
80
    std::string fight_server_path = get_fight_server_path();
230
80
    std::string base_unix_socket_path = get_base_unix_socket_path();
231
80
    std::vector<std::string> args = {"-u", fight_server_path, base_unix_socket_path};
232
80
    boost::process::environment env = boost::this_process::environment();
233
80
    boost::process::ipstream child_output;
234
235
80
    try {
236
80
        boost::process::child c(
237
80
                python_executable_path, args, boost::process::std_out > child_output,
238
80
                boost::process::env = env,
239
80
                boost::process::on_exit([](int exit_code, const std::error_code& ec) {
240
0
                    if (ec) {
241
0
                        LOG(WARNING) << "Python UDF server exited with error: " << ec.message();
242
0
                    }
243
0
                }));
244
245
        // Wait for socket file to be created (indicates server is ready)
246
80
        std::string expected_socket_path = get_unix_socket_file_path(c.id());
247
80
        bool started_successfully = false;
248
80
        std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
249
80
        const auto timeout = std::chrono::milliseconds(5000);
250
251
3.79k
        while (std::chrono::steady_clock::now() - start < timeout) {
252
3.75k
            struct stat buffer;
253
3.75k
            if (stat(expected_socket_path.c_str(), &buffer) == 0) {
254
38
                started_successfully = true;
255
38
                break;
256
38
            }
257
258
3.72k
            if (!c.running()) {
259
2
                break;
260
2
            }
261
3.71k
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
262
3.71k
        }
263
264
80
        if (!started_successfully) {
265
2
            if (c.running()) {
266
0
                c.terminate();
267
0
                c.wait();
268
0
            }
269
2
            return Status::InternalError("Python server start failed: socket file not found at {}",
270
2
                                         expected_socket_path);
271
2
        }
272
273
78
        *process = std::make_shared<PythonUDFProcess>(std::move(c), std::move(child_output));
274
275
78
    } catch (const std::exception& e) {
276
40
        return Status::InternalError("Failed to start Python UDF server: {}", e.what());
277
40
    }
278
279
38
    return Status::OK();
280
80
}
281
282
12
void PythonServerManager::_start_health_check_thread() {
283
12
    std::lock_guard<std::mutex> lock(_health_check_mutex);
284
12
    if (_health_check_thread) return;
285
286
12
    LOG(INFO) << "Starting Python process health check thread (interval: 30 seconds)";
287
288
10
    _health_check_thread = std::make_unique<std::thread>([this]() {
289
        // Health check loop
290
65
        while (!_shutdown_flag.load(std::memory_order_acquire)) {
291
            // Wait for interval or shutdown signal
292
59
            {
293
59
                std::unique_lock<std::mutex> lock(_health_check_mutex);
294
117
                _health_check_cv.wait_for(lock, std::chrono::seconds(30), [this]() {
295
117
                    return _shutdown_flag.load(std::memory_order_acquire);
296
117
                });
297
59
            }
298
299
59
            if (_shutdown_flag.load(std::memory_order_acquire)) break;
300
301
55
            _check_and_recreate_processes();
302
55
            _refresh_memory_stats();
303
55
        }
304
305
10
        LOG(INFO) << "Python process health check thread exiting";
306
10
    });
307
10
}
308
309
56
void PythonServerManager::_check_and_recreate_processes() {
310
56
    int total_checked = 0;
311
56
    int total_dead = 0;
312
56
    int total_recreated = 0;
313
314
56
    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
315
56
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
316
56
        auto& pool = versioned_pool->processes;
317
925
        for (size_t i = 0; i < pool.size(); ++i) {
318
869
            auto& process = pool[i];
319
869
            if (!process) continue;
320
321
868
            total_checked++;
322
868
            if (!process->is_alive()) {
323
3
                total_dead++;
324
3
                LOG(WARNING) << "Detected dead Python process (pid=" << process->get_child_pid()
325
3
                             << ", version=" << version.to_string() << "), recreating...";
326
327
3
                ProcessPtr new_process;
328
3
                Status s = fork(version, &new_process);
329
3
                if (s.ok()) {
330
1
                    pool[i] = std::move(new_process);
331
1
                    total_recreated++;
332
1
                    LOG(INFO) << "Successfully recreated Python process for version "
333
1
                              << version.to_string();
334
2
                } else {
335
2
                    LOG(ERROR) << "Failed to recreate Python process for version "
336
2
                               << version.to_string() << ": " << s.to_string();
337
2
                    pool.erase(pool.begin() + i);
338
2
                    --i;
339
2
                }
340
3
            }
341
868
        }
342
56
    }
343
344
56
    if (total_dead > 0) {
345
2
        LOG(INFO) << "Health check completed: checked=" << total_checked << ", dead=" << total_dead
346
2
                  << ", recreated=" << total_recreated;
347
2
    }
348
56
}
349
350
51
void PythonServerManager::shutdown() {
351
    // Signal health check thread to stop
352
51
    _shutdown_flag.store(true, std::memory_order_release);
353
51
    _health_check_cv.notify_one();
354
355
51
    if (_health_check_thread && _health_check_thread->joinable()) {
356
9
        _health_check_thread->join();
357
9
        _health_check_thread.reset();
358
9
    }
359
360
    // Shutdown all processes
361
51
    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
362
18
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
363
18
        auto& pool = versioned_pool->processes;
364
18
        for (auto& process : pool) {
365
18
            if (process) {
366
17
                process->shutdown();
367
17
            }
368
18
        }
369
18
        pool.clear();
370
18
        versioned_pool->initialized = false;
371
18
    }
372
373
51
    {
374
51
        std::lock_guard<std::mutex> lock(_pools_mutex);
375
51
        _process_pools.clear();
376
51
    }
377
51
}
378
379
864
Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes) {
380
    // Read from /proc/{pid}/statm
381
    // Format: size resident shared text lib data dt
382
864
    std::string statm_path = fmt::format("/proc/{}/statm", pid);
383
864
    std::ifstream statm_file(statm_path);
384
385
864
    if (!statm_file.is_open()) {
386
0
        return Status::InternalError("Cannot open {}", statm_path);
387
0
    }
388
389
864
    size_t size_pages = 0, rss_pages = 0;
390
    // we only care about RSS, read and ignore the total size field
391
864
    statm_file >> size_pages >> rss_pages;
392
393
864
    if (statm_file.fail()) {
394
0
        return Status::InternalError("Failed to read {}", statm_path);
395
0
    }
396
397
    // Convert pages to bytes
398
864
    long page_size = sysconf(_SC_PAGESIZE);
399
864
    *rss_bytes = rss_pages * page_size;
400
401
864
    return Status::OK();
402
864
}
403
404
54
void PythonServerManager::_refresh_memory_stats() {
405
54
    int64_t total_rss = 0;
406
407
54
    for (const auto& [version, versioned_pool] : _snapshot_process_pools()) {
408
54
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
409
54
        const auto& pool = versioned_pool->processes;
410
864
        for (const auto& process : pool) {
411
864
            if (!process || !process->is_alive()) continue;
412
413
864
            size_t rss_bytes = 0;
414
864
            Status s = _read_process_memory(process->get_child_pid(), &rss_bytes);
415
416
864
            if (s.ok()) {
417
864
                total_rss += rss_bytes;
418
864
            } else [[unlikely]] {
419
0
                LOG(WARNING) << "Failed to read memory info for Python process (pid="
420
0
                             << process->get_child_pid() << "): " << s.to_string();
421
0
            }
422
864
        }
423
54
    }
424
54
    _mem_tracker.set_consumption(total_rss);
425
54
    LOG(INFO) << _mem_tracker.log_usage();
426
427
54
    if (config::python_udf_processes_memory_limit_bytes > 0 &&
428
54
        total_rss > config::python_udf_processes_memory_limit_bytes) {
429
0
        LOG(WARNING) << "Python UDF process memory usage exceeds limit: rss_bytes=" << total_rss
430
0
                     << ", limit_bytes=" << config::python_udf_processes_memory_limit_bytes;
431
0
    }
432
54
}
433
434
0
Status PythonServerManager::clear_module_cache(const std::string& location) {
435
0
    if (location.empty()) {
436
0
        return Status::InvalidArgument("Empty location for clear_module_cache");
437
0
    }
438
439
0
    std::string body = fmt::format(R"({{"location": "{}"}})", location);
440
441
0
    int success_count = 0;
442
0
    int fail_count = 0;
443
0
    bool has_active_process = false;
444
445
0
    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
446
0
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
447
0
        auto& pool = versioned_pool->processes;
448
0
        for (auto& process : pool) {
449
0
            if (!process || !process->is_alive()) {
450
0
                continue;
451
0
            }
452
0
            has_active_process = true;
453
0
            try {
454
0
                auto loc_result = arrow::flight::Location::Parse(process->get_uri());
455
0
                if (!loc_result.ok()) [[unlikely]] {
456
0
                    fail_count++;
457
0
                    continue;
458
0
                }
459
460
0
                auto client_result = arrow::flight::FlightClient::Connect(*loc_result);
461
0
                if (!client_result.ok()) [[unlikely]] {
462
0
                    fail_count++;
463
0
                    continue;
464
0
                }
465
0
                auto client = std::move(*client_result);
466
467
0
                arrow::flight::Action action;
468
0
                action.type = "clear_module_cache";
469
0
                action.body = arrow::Buffer::FromString(body);
470
471
0
                auto result_stream = client->DoAction(action);
472
0
                if (!result_stream.ok()) {
473
0
                    fail_count++;
474
0
                    continue;
475
0
                }
476
477
0
                auto result = (*result_stream)->Next();
478
0
                if (result.ok() && *result) {
479
0
                    success_count++;
480
0
                } else {
481
0
                    fail_count++;
482
0
                }
483
484
0
            } catch (...) {
485
0
                fail_count++;
486
0
            }
487
0
        }
488
0
    }
489
490
0
    if (!has_active_process) {
491
0
        return Status::OK();
492
0
    }
493
494
0
    LOG(INFO) << "clear_module_cache completed for location=" << location
495
0
              << ", success=" << success_count << ", failed=" << fail_count;
496
497
0
    if (fail_count > 0) {
498
0
        return Status::InternalError(
499
0
                "clear_module_cache failed for location={}, success={}, failed={}", location,
500
0
                success_count, fail_count);
501
0
    }
502
503
0
    return Status::OK();
504
0
}
505
506
// Explicit template instantiation for UDF, UDAF and UDTF clients
507
template Status PythonServerManager::get_client<PythonUDFClient>(
508
        const PythonUDFMeta& func_meta, const PythonVersion& version,
509
        std::shared_ptr<PythonUDFClient>* client,
510
        const std::shared_ptr<arrow::Schema>& data_schema);
511
512
template Status PythonServerManager::get_client<PythonUDAFClient>(
513
        const PythonUDFMeta& func_meta, const PythonVersion& version,
514
        std::shared_ptr<PythonUDAFClient>* client,
515
        const std::shared_ptr<arrow::Schema>& data_schema);
516
517
template Status PythonServerManager::get_client<PythonUDTFClient>(
518
        const PythonUDFMeta& func_meta, const PythonVersion& version,
519
        std::shared_ptr<PythonUDTFClient>* client,
520
        const std::shared_ptr<arrow::Schema>& data_schema);
521
522
} // namespace doris