Coverage Report

Created: 2026-03-12 02:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/user_function_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/user_function_cache.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <errno.h> // IWYU pragma: keep
22
#include <glog/logging.h>
23
#include <minizip/unzip.h>
24
#include <stdio.h>
25
#include <string.h>
26
#include <unistd.h>
27
28
#include <atomic>
29
#include <cstdint>
30
#include <memory>
31
#include <ostream>
32
#include <regex>
33
#include <utility>
34
#include <vector>
35
36
#include "cloud/config.h"
37
#include "common/config.h"
38
#include "common/factory_creator.h"
39
#include "common/status.h"
40
#include "io/fs/file_system.h"
41
#include "io/fs/local_file_system.h"
42
#include "runtime/exec_env.h"
43
#include "runtime/plugin/cloud_plugin_downloader.h"
44
#include "service/http/http_client.h"
45
#include "udf/python/python_server.h"
46
#include "util/defer_op.h"
47
#include "util/dynamic_util.h"
48
#include "util/md5.h"
49
#include "util/string_util.h"
50
51
namespace doris {
52
53
static const int kLibShardNum = 128;
54
55
// function cache entry, store information for
56
struct UserFunctionCacheEntry {
57
    ENABLE_FACTORY_CREATOR(UserFunctionCacheEntry);
58
    UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_, const std::string& lib_file_,
59
                           LibType type)
60
1
            : function_id(fid_), checksum(checksum_), lib_file(lib_file_), type(type) {}
61
    ~UserFunctionCacheEntry();
62
63
0
    std::string debug_string() {
64
0
        fmt::memory_buffer error_msg;
65
0
        fmt::format_to(error_msg,
66
0
                       " the info of UserFunctionCacheEntry save in BE, function_id:{}, "
67
0
                       "checksum:{}, lib_file:{}, is_downloaded:{}. ",
68
0
                       function_id, checksum, lib_file, is_downloaded);
69
0
        return fmt::to_string(error_msg);
70
0
    }
71
72
    int64_t function_id = 0;
73
    // used to check if this library is valid.
74
    std::string checksum;
75
76
    // library file
77
    std::string lib_file;
78
79
    // make it atomic variable instead of holding a lock
80
    std::atomic<bool> is_loaded {false};
81
82
    // Set to true when this library is not needed.
83
    // e.g. deleting some unused library to re
84
    std::atomic<bool> should_delete_library {false};
85
86
    // lock to make sure only one can load this cache
87
    std::mutex load_lock;
88
89
    // To reduce cache lock held time, cache entry is
90
    // added to cache map before library is downloaded.
91
    // And this is used to indicate whether library is downloaded.
92
    bool is_downloaded = false;
93
94
    // Indicate if the zip file is unziped.
95
    bool is_unziped = false;
96
97
    // used to lookup a symbol
98
    void* lib_handle = nullptr;
99
100
    // from symbol_name to function pointer
101
    std::unordered_map<std::string, void*> fptr_map;
102
103
    LibType type;
104
};
105
106
1
UserFunctionCacheEntry::~UserFunctionCacheEntry() {
107
    // close lib_handle if it was opened
108
1
    if (lib_handle != nullptr) {
109
0
        dynamic_close(lib_handle);
110
0
        lib_handle = nullptr;
111
0
    }
112
113
    // delete library file if should_delete_library is set
114
1
    if (should_delete_library.load()) {
115
0
        WARN_IF_ERROR(
116
0
                io::global_local_filesystem()->delete_directory_or_file(lib_file),
117
0
                "failed to delete unzipped directory of python udf library, lib_file=" + lib_file);
118
119
0
        if (type == LibType::PY_ZIP) {
120
            // For Python UDF, we need to delete both the unzipped directory and the original zip file.
121
0
            std::string zip_file = lib_file + ".zip";
122
0
            WARN_IF_ERROR(io::global_local_filesystem()->delete_directory_or_file(zip_file),
123
0
                          "failed to delete zip file of python udf library, lib_file=" + zip_file);
124
0
        }
125
0
    }
126
1
}
127
128
27
UserFunctionCache::UserFunctionCache() = default;
129
130
27
UserFunctionCache::~UserFunctionCache() {
131
27
    std::lock_guard<std::mutex> l(_cache_lock);
132
27
    auto it = _entry_map.begin();
133
28
    while (it != _entry_map.end()) {
134
1
        auto entry = it->second;
135
1
        it = _entry_map.erase(it);
136
1
    }
137
27
}
138
139
0
UserFunctionCache* UserFunctionCache::instance() {
140
0
    return ExecEnv::GetInstance()->user_function_cache();
141
0
}
142
143
0
Status UserFunctionCache::init(const std::string& lib_dir) {
144
#ifndef BE_TEST
145
    // _lib_dir may be reused between unit tests
146
    DCHECK(_lib_dir.empty()) << _lib_dir;
147
#endif
148
0
    _lib_dir = lib_dir;
149
    // 1. dynamic open current process
150
0
    RETURN_IF_ERROR(dynamic_open(nullptr, &_current_process_handle));
151
    // 2. load all cached
152
0
    RETURN_IF_ERROR(_load_cached_lib());
153
0
    return Status::OK();
154
0
}
155
156
3
Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std::string& file) {
157
3
    LibType lib_type;
158
3
    if (ends_with(file, ".so")) {
159
0
        lib_type = LibType::SO;
160
3
    } else if (ends_with(file, ".jar")) {
161
0
        lib_type = LibType::JAR;
162
3
    } else if (ends_with(file, ".zip") && _check_cache_is_python_udf(dir, file)) {
163
1
        lib_type = LibType::PY_ZIP;
164
2
    } else {
165
2
        return Status::InternalError(
166
2
                "unknown library file format. the file type is not end with xxx.jar or xxx.so"
167
2
                " or xxx.zip : " +
168
2
                file);
169
2
    }
170
171
1
    std::vector<std::string> split_parts = _split_string_by_checksum(file);
172
1
    if (split_parts.size() != 3 && split_parts.size() != 4) {
173
0
        return Status::InternalError(
174
0
                "user function's name should be function_id.checksum[.file_name].file_type, now "
175
0
                "the all split parts are by delimiter(.): " +
176
0
                file);
177
0
    }
178
1
    int64_t function_id = std::stol(split_parts[0]);
179
1
    std::string checksum = split_parts[1];
180
1
    auto it = _entry_map.find(function_id);
181
1
    if (it != _entry_map.end()) {
182
0
        LOG(WARNING) << "meet a same function id user function library, function_id=" << function_id
183
0
                     << ", one_checksum=" << checksum
184
0
                     << ", other_checksum info: = " << it->second->debug_string();
185
0
        return Status::InternalError("duplicate function id");
186
0
    }
187
188
1
    std::string full_path = dir + "/" + file;
189
    // create a cache entry and put it into entry map
190
1
    std::shared_ptr<UserFunctionCacheEntry> entry =
191
1
            UserFunctionCacheEntry::create_shared(function_id, checksum, full_path, lib_type);
192
1
    entry->is_downloaded = true;
193
194
    // For Python UDF, _check_cache_is_python_udf has already unzipped the file.
195
    // Set lib_file to the unzipped directory.
196
1
    if (lib_type == LibType::PY_ZIP) {
197
1
        entry->lib_file = full_path.substr(0, full_path.size() - 4);
198
1
        entry->is_unziped = true;
199
1
    }
200
201
1
    _entry_map[function_id] = entry;
202
203
1
    return Status::OK();
204
1
}
205
206
0
Status UserFunctionCache::_load_cached_lib() {
207
    // create library directory if not exist
208
0
    RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(_lib_dir));
209
210
0
    for (int i = 0; i < kLibShardNum; ++i) {
211
0
        std::string sub_dir = _lib_dir + "/" + std::to_string(i);
212
0
        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(sub_dir));
213
214
0
        auto scan_cb = [this, &sub_dir](const io::FileInfo& file) {
215
0
            if (!file.is_file) {
216
0
                return true;
217
0
            }
218
0
            auto st = _load_entry_from_lib(sub_dir, file.file_name);
219
0
            if (!st.ok()) {
220
0
                LOG(WARNING) << "load a library failed, dir=" << sub_dir
221
0
                             << ", file=" << file.file_name << ": " << st.to_string();
222
0
            }
223
0
            return true;
224
0
        };
225
0
        RETURN_IF_ERROR(io::global_local_filesystem()->iterate_directory(sub_dir, scan_cb));
226
0
    }
227
0
    return Status::OK();
228
0
}
229
230
Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
231
                                           const std::string& checksum,
232
                                           std::shared_ptr<UserFunctionCacheEntry>& output_entry,
233
0
                                           LibType type) {
234
0
    std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
235
0
    std::string file_name = _get_file_name_from_url(url);
236
0
    {
237
0
        std::lock_guard<std::mutex> l(_cache_lock);
238
0
        auto it = _entry_map.find(fid);
239
0
        if (it != _entry_map.end()) {
240
0
            entry = it->second;
241
0
        } else {
242
0
            entry = UserFunctionCacheEntry::create_shared(
243
0
                    fid, checksum, _make_lib_file(fid, checksum, type, file_name), type);
244
0
            _entry_map.emplace(fid, entry);
245
0
        }
246
0
    }
247
0
    auto st = _load_cache_entry(url, entry);
248
0
    if (!st.ok()) {
249
0
        LOG(WARNING) << "fail to load cache entry, fid=" << fid << " " << file_name << " " << url;
250
        // if we load a cache entry failed, I think we should delete this entry cache
251
        // even if this cache was valid before.
252
0
        _destroy_cache_entry(entry);
253
0
        return st;
254
0
    }
255
256
0
    output_entry = entry;
257
0
    return Status::OK();
258
0
}
259
260
0
void UserFunctionCache::_destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry) {
261
    // 1. we remove cache entry from entry map
262
0
    std::lock_guard<std::mutex> l(_cache_lock);
263
    // set should delete flag to true, so that the jar file will be removed when
264
    // the entry is removed from map, and deconstruct method is called.
265
0
    entry->should_delete_library.store(true);
266
0
    _entry_map.erase(entry->function_id);
267
0
}
268
269
Status UserFunctionCache::_load_cache_entry(const std::string& url,
270
0
                                            std::shared_ptr<UserFunctionCacheEntry> entry) {
271
0
    if (entry->is_loaded.load()) {
272
0
        return Status::OK();
273
0
    }
274
275
0
    std::unique_lock<std::mutex> l(entry->load_lock);
276
0
    if (!entry->is_downloaded) {
277
0
        RETURN_IF_ERROR(_download_lib(url, entry));
278
0
    }
279
280
0
    if (!entry->is_unziped && entry->type == LibType::PY_ZIP) {
281
0
        RETURN_IF_ERROR(_unzip_lib(entry->lib_file));
282
0
        entry->lib_file = entry->lib_file.substr(0, entry->lib_file.size() - 4);
283
0
        entry->is_unziped = true;
284
0
    }
285
286
0
    if (entry->type == LibType::SO) {
287
0
        RETURN_IF_ERROR(_load_cache_entry_internal(entry));
288
0
    } else if (entry->type != LibType::JAR && entry->type != LibType::PY_ZIP) {
289
0
        return Status::InvalidArgument(
290
0
                "Unsupported lib type! Make sure your lib type is one of 'so' and 'jar' and "
291
0
                "python 'zip'!");
292
0
    }
293
0
    return Status::OK();
294
0
}
295
296
Status UserFunctionCache::_check_cache_is_python_udf(const std::string& dir,
297
7
                                                     const std::string& file) {
298
7
    const std::string& full_path = dir + "/" + file;
299
7
    RETURN_IF_ERROR(_unzip_lib(full_path));
300
5
    std::string unzip_dir = full_path.substr(0, full_path.size() - 4);
301
302
5
    bool has_python_file = false;
303
304
7
    auto scan_cb = [&has_python_file](const io::FileInfo& file) {
305
7
        if (file.is_file && ends_with(file.file_name, ".py")) {
306
2
            has_python_file = true;
307
2
            return false; // Stop iteration once we find a Python file
308
2
        }
309
5
        return true;
310
7
    };
311
5
    RETURN_IF_ERROR(io::global_local_filesystem()->iterate_directory(unzip_dir, scan_cb));
312
5
    if (!has_python_file) {
313
3
        return Status::InternalError("No Python file found in the unzipped directory.");
314
3
    }
315
2
    return Status::OK();
316
5
}
317
318
16
Status UserFunctionCache::_unzip_lib(const std::string& zip_file) {
319
16
    std::string unzip_dir = zip_file.substr(0, zip_file.size() - 4);
320
16
    RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(unzip_dir));
321
322
16
    unzFile zip_file_handle = unzOpen(zip_file.c_str());
323
16
    if (zip_file_handle == nullptr) {
324
5
        return Status::InternalError("Failed to open zip file: " + zip_file);
325
5
    }
326
327
11
    Defer defer([&] { unzClose(zip_file_handle); });
328
329
11
    unz_global_info global_info;
330
11
    if (unzGetGlobalInfo(zip_file_handle, &global_info) != UNZ_OK) {
331
0
        return Status::InternalError("Failed to get global info from zip file: " + zip_file);
332
0
    }
333
334
38
    for (uLong i = 0; i < global_info.number_entry; ++i) {
335
27
        unz_file_info file_info;
336
27
        char filename[256];
337
27
        if (unzGetCurrentFileInfo(zip_file_handle, &file_info, filename, sizeof(filename), nullptr,
338
27
                                  0, nullptr, 0) != UNZ_OK) {
339
0
            return Status::InternalError("Failed to get file info from zip file: " + zip_file);
340
0
        }
341
342
27
        if (std::string(filename).find("__MACOSX") != std::string::npos) {
343
3
            if ((i + 1) < global_info.number_entry) {
344
3
                if (unzGoToNextFile(zip_file_handle) != UNZ_OK) {
345
0
                    return Status::InternalError("Failed to go to next file in zip: " + zip_file);
346
0
                }
347
3
            }
348
3
            continue;
349
3
        }
350
351
24
        std::string full_filename = unzip_dir + "/" + filename;
352
24
        if (full_filename.length() > PATH_MAX) {
353
0
            return Status::InternalError(
354
0
                    fmt::format("File path {}... is too long, maximum path length is {}",
355
0
                                full_filename.substr(0, 50), PATH_MAX));
356
0
        }
357
358
24
        if (filename[strlen(filename) - 1] == '/') {
359
3
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(full_filename));
360
21
        } else {
361
21
            if (unzOpenCurrentFile(zip_file_handle) != UNZ_OK) {
362
0
                return Status::InternalError("Failed to open file in zip: " +
363
0
                                             std::string(filename));
364
0
            }
365
366
21
            FILE* out = fopen(full_filename.c_str(), "wb");
367
21
            if (out == nullptr) {
368
0
                unzCloseCurrentFile(zip_file_handle);
369
0
                return Status::InternalError("Failed to create file: " + full_filename);
370
0
            }
371
21
            char buffer[8192];
372
21
            int bytes_read;
373
54
            while ((bytes_read = unzReadCurrentFile(zip_file_handle, buffer, sizeof(buffer))) > 0) {
374
33
                fwrite(buffer, bytes_read, 1, out);
375
33
            }
376
21
            fclose(out);
377
21
            unzCloseCurrentFile(zip_file_handle);
378
21
            if (bytes_read < 0) {
379
0
                return Status::InternalError("Failed to read file in zip: " +
380
0
                                             std::string(filename));
381
0
            }
382
21
        }
383
384
24
        if ((i + 1) < global_info.number_entry) {
385
14
            if (unzGoToNextFile(zip_file_handle) != UNZ_OK) {
386
0
                return Status::InternalError("Failed to go to next file in zip: " + zip_file);
387
0
            }
388
14
        }
389
24
    }
390
391
11
    return Status::OK();
392
11
}
393
394
// entry's lock must be held
395
Status UserFunctionCache::_download_lib(const std::string& url,
396
0
                                        std::shared_ptr<UserFunctionCacheEntry> entry) {
397
0
    DCHECK(!entry->is_downloaded);
398
399
    // get local path to save library
400
0
    std::string tmp_file = entry->lib_file + ".tmp";
401
0
    auto fp_closer = [](FILE* fp) { fclose(fp); };
402
0
    std::unique_ptr<FILE, decltype(fp_closer)> fp(fopen(tmp_file.c_str(), "w"), fp_closer);
403
0
    if (fp == nullptr) {
404
0
        LOG(WARNING) << "fail to open file, file=" << tmp_file;
405
0
        return Status::InternalError("fail to open file");
406
0
    }
407
408
0
    std::string real_url;
409
0
    RETURN_IF_ERROR(_get_real_url(url, &real_url));
410
0
    Md5Digest digest;
411
0
    HttpClient client;
412
0
    int64_t file_size = 0;
413
0
    RETURN_IF_ERROR(client.init(real_url));
414
0
    Status status;
415
0
    auto download_cb = [&status, &tmp_file, &fp, &digest, &file_size](const void* data,
416
0
                                                                      size_t length) {
417
0
        digest.update(data, length);
418
0
        file_size = file_size + length;
419
0
        auto res = fwrite(data, length, 1, fp.get());
420
0
        if (res != 1) {
421
0
            LOG(WARNING) << "fail to write data to file, file=" << tmp_file
422
0
                         << ", error=" << ferror(fp.get());
423
0
            status = Status::InternalError("fail to write data when download");
424
0
            return false;
425
0
        }
426
0
        return true;
427
0
    };
428
0
    RETURN_IF_ERROR(client.execute(download_cb));
429
0
    RETURN_IF_ERROR(status);
430
0
    digest.digest();
431
0
    if (!iequal(digest.hex(), entry->checksum)) {
432
0
        fmt::memory_buffer error_msg;
433
0
        fmt::format_to(error_msg,
434
0
                       " The checksum is not equal of {}. The init info of first create entry is:"
435
0
                       "{} But download file check_sum is: {}, file_size is: {}.",
436
0
                       url, entry->debug_string(), digest.hex(), file_size);
437
0
        std::string error(fmt::to_string(error_msg));
438
0
        LOG(WARNING) << error;
439
0
        return Status::InternalError(error);
440
0
    }
441
    // close this file
442
0
    fp.reset();
443
444
    // rename temporary file to library file
445
0
    auto ret = rename(tmp_file.c_str(), entry->lib_file.c_str());
446
0
    if (ret != 0) {
447
0
        char buf[64];
448
0
        LOG(WARNING) << "fail to rename file from=" << tmp_file << ", to=" << entry->lib_file
449
0
                     << ", errno=" << errno << ", errmsg=" << strerror_r(errno, buf, 64);
450
0
        return Status::InternalError("fail to rename file");
451
0
    }
452
453
    // check download
454
0
    entry->is_downloaded = true;
455
0
    return Status::OK();
456
0
}
457
458
0
std::string UserFunctionCache::_get_file_name_from_url(const std::string& url) const {
459
0
    std::string file_name;
460
0
    size_t last_slash_pos = url.find_last_of('/');
461
0
    if (last_slash_pos != std::string::npos) {
462
0
        file_name = url.substr(last_slash_pos + 1, url.size());
463
0
    } else {
464
0
        file_name = url;
465
0
    }
466
0
    return file_name;
467
0
}
468
469
// entry's lock must be held
470
Status UserFunctionCache::_load_cache_entry_internal(
471
0
        std::shared_ptr<UserFunctionCacheEntry> entry) {
472
0
    RETURN_IF_ERROR(dynamic_open(entry->lib_file.c_str(), &entry->lib_handle));
473
0
    entry->is_loaded.store(true);
474
0
    return Status::OK();
475
0
}
476
477
std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::string& checksum,
478
3
                                              LibType type, const std::string& file_name) {
479
3
    int shard = function_id % kLibShardNum;
480
3
    std::stringstream ss;
481
3
    ss << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum;
482
3
    if (type == LibType::JAR) {
483
1
        ss << '.' << file_name;
484
2
    } else if (type == LibType::PY_ZIP) {
485
1
        ss << '.' << file_name;
486
1
    } else {
487
1
        ss << ".so";
488
1
    }
489
3
    return ss.str();
490
3
}
491
492
Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url,
493
0
                                      const std::string& checksum, std::string* libpath) {
494
0
    std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
495
0
    RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, entry, LibType::JAR));
496
0
    *libpath = entry->lib_file;
497
0
    return Status::OK();
498
0
}
499
500
Status UserFunctionCache::get_pypath(int64_t fid, const std::string& url,
501
0
                                     const std::string& checksum, std::string* libpath) {
502
0
    std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
503
0
    RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, entry, LibType::PY_ZIP));
504
0
    *libpath = entry->lib_file;
505
0
    return Status::OK();
506
0
}
507
508
2
std::vector<std::string> UserFunctionCache::_split_string_by_checksum(const std::string& file) {
509
2
    std::vector<std::string> result;
510
511
    // Find the first dot from the start
512
2
    size_t firstDot = file.find('.');
513
2
    if (firstDot == std::string::npos) return {};
514
515
    // Find the second dot starting from the first dot's position
516
2
    size_t secondDot = file.find('.', firstDot + 1);
517
2
    if (secondDot == std::string::npos) return {};
518
519
    // Find the last dot from the end
520
2
    size_t lastDot = file.rfind('.');
521
2
    if (lastDot == std::string::npos || lastDot <= secondDot) return {};
522
523
    // Split based on these dots
524
2
    result.push_back(file.substr(0, firstDot));
525
2
    result.push_back(file.substr(firstDot + 1, secondDot - firstDot - 1));
526
2
    result.push_back(file.substr(secondDot + 1, lastDot - secondDot - 1));
527
2
    result.push_back(file.substr(lastDot + 1));
528
529
2
    return result;
530
2
}
531
532
17
Status UserFunctionCache::_get_real_url(const std::string& url, std::string* result_url) {
533
17
    if (url.find(":/") == std::string::npos) {
534
8
        return _check_and_return_default_java_udf_url(url, result_url);
535
8
    }
536
9
    *result_url = url;
537
9
    return Status::OK();
538
17
}
539
540
Status UserFunctionCache::_check_and_return_default_java_udf_url(const std::string& url,
541
13
                                                                 std::string* result_url) {
542
13
    const char* doris_home = std::getenv("DORIS_HOME");
543
13
    std::string default_url = std::string(doris_home) + "/plugins/java_udf";
544
545
13
    std::filesystem::path file = default_url + "/" + url;
546
547
    // In cloud mode, always try cloud download first (prioritize cloud mode)
548
13
    if (config::is_cloud_mode()) {
549
0
        std::string target_path = default_url + "/" + url;
550
0
        std::string downloaded_path;
551
0
        Status status = CloudPluginDownloader::download_from_cloud(
552
0
                CloudPluginDownloader::PluginType::JAVA_UDF, url, target_path, &downloaded_path);
553
0
        if (status.ok() && !downloaded_path.empty()) {
554
0
            *result_url = "file://" + downloaded_path;
555
0
            return Status::OK();
556
0
        } else {
557
0
            LOG(WARNING) << "Failed to download Java UDF from cloud: " << status.to_string();
558
0
            return Status::RuntimeError(
559
0
                    "Cannot download Java UDF from cloud: {}. "
560
0
                    "Please retry later or check your UDF has been uploaded to cloud.",
561
0
                    url);
562
0
        }
563
0
    }
564
565
    // Return the file path regardless of whether it exists (original UDF behavior)
566
13
    *result_url = "file://" + default_url + "/" + url;
567
13
    return Status::OK();
568
13
}
569
570
0
void UserFunctionCache::drop_function_cache(int64_t fid) {
571
0
    std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
572
0
    {
573
0
        std::lock_guard<std::mutex> l(_cache_lock);
574
0
        auto it = _entry_map.find(fid);
575
0
        if (it == _entry_map.end()) {
576
0
            return;
577
0
        }
578
0
        entry = it->second;
579
0
        _entry_map.erase(it);
580
0
    }
581
582
    // For Python UDF, clear module cache in Python server before deleting files
583
0
    if (entry->type == LibType::PY_ZIP && !entry->lib_file.empty()) {
584
0
        auto status = PythonServerManager::instance().clear_module_cache(entry->lib_file);
585
0
        if (!status.ok()) [[unlikely]] {
586
0
            LOG(WARNING) << "drop_function_cache: failed to clear Python module cache for "
587
0
                         << entry->lib_file << ": " << status.to_string();
588
0
        }
589
0
    }
590
591
    // Mark for deletion, destructor will delete the files
592
0
    entry->should_delete_library.store(true);
593
0
}
594
595
} // namespace doris