Coverage Report

Created: 2026-03-13 03:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/user_function_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/user_function_cache.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <errno.h> // IWYU pragma: keep
22
#include <glog/logging.h>
23
#include <minizip/unzip.h>
24
#include <stdio.h>
25
#include <string.h>
26
#include <unistd.h>
27
28
#include <atomic>
29
#include <cstdint>
30
#include <memory>
31
#include <ostream>
32
#include <regex>
33
#include <utility>
34
#include <vector>
35
36
#include "cloud/config.h"
37
#include "common/config.h"
38
#include "common/factory_creator.h"
39
#include "common/status.h"
40
#include "io/fs/file_system.h"
41
#include "io/fs/local_file_system.h"
42
#include "runtime/exec_env.h"
43
#include "runtime/plugin/cloud_plugin_downloader.h"
44
#include "service/http/http_client.h"
45
#include "udf/python/python_server.h"
46
#include "util/defer_op.h"
47
#include "util/dynamic_util.h"
48
#include "util/md5.h"
49
#include "util/string_util.h"
50
51
namespace doris {
52
53
static const int kLibShardNum = 128;
54
55
// function cache entry, store information for
56
struct UserFunctionCacheEntry {
57
    ENABLE_FACTORY_CREATOR(UserFunctionCacheEntry);
58
    UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_, const std::string& lib_file_,
59
                           LibType type)
60
515
            : function_id(fid_), checksum(checksum_), lib_file(lib_file_), type(type) {}
61
    ~UserFunctionCacheEntry();
62
63
0
    std::string debug_string() {
64
0
        fmt::memory_buffer error_msg;
65
0
        fmt::format_to(error_msg,
66
0
                       " the info of UserFunctionCacheEntry save in BE, function_id:{}, "
67
0
                       "checksum:{}, lib_file:{}, is_downloaded:{}. ",
68
0
                       function_id, checksum, lib_file, is_downloaded);
69
0
        return fmt::to_string(error_msg);
70
0
    }
71
72
    int64_t function_id = 0;
73
    // used to check if this library is valid.
74
    std::string checksum;
75
76
    // library file
77
    std::string lib_file;
78
79
    // make it atomic variable instead of holding a lock
80
    std::atomic<bool> is_loaded {false};
81
82
    // Set to true when this library is not needed.
83
    // e.g. deleting some unused library to re
84
    std::atomic<bool> should_delete_library {false};
85
86
    // lock to make sure only one can load this cache
87
    std::mutex load_lock;
88
89
    // To reduce cache lock held time, cache entry is
90
    // added to cache map before library is downloaded.
91
    // And this is used to indicate whether library is downloaded.
92
    bool is_downloaded = false;
93
94
    // Indicate if the zip file is unziped.
95
    bool is_unziped = false;
96
97
    // used to lookup a symbol
98
    void* lib_handle = nullptr;
99
100
    // from symbol_name to function pointer
101
    std::unordered_map<std::string, void*> fptr_map;
102
103
    LibType type;
104
};
105
106
32
UserFunctionCacheEntry::~UserFunctionCacheEntry() {
107
    // close lib_handle if it was opened
108
32
    if (lib_handle != nullptr) {
109
0
        dynamic_close(lib_handle);
110
0
        lib_handle = nullptr;
111
0
    }
112
113
    // delete library file if should_delete_library is set
114
32
    if (should_delete_library.load()) {
115
0
        WARN_IF_ERROR(
116
0
                io::global_local_filesystem()->delete_directory_or_file(lib_file),
117
0
                "failed to delete unzipped directory of python udf library, lib_file=" + lib_file);
118
119
0
        if (type == LibType::PY_ZIP) {
120
            // For Python UDF, we need to delete both the unzipped directory and the original zip file.
121
0
            std::string zip_file = lib_file + ".zip";
122
0
            WARN_IF_ERROR(io::global_local_filesystem()->delete_directory_or_file(zip_file),
123
0
                          "failed to delete zip file of python udf library, lib_file=" + zip_file);
124
0
        }
125
0
    }
126
32
}
127
128
34
UserFunctionCache::UserFunctionCache() = default;
129
130
30
UserFunctionCache::~UserFunctionCache() {
131
30
    std::lock_guard<std::mutex> l(_cache_lock);
132
30
    auto it = _entry_map.begin();
133
62
    while (it != _entry_map.end()) {
134
32
        auto entry = it->second;
135
32
        it = _entry_map.erase(it);
136
32
    }
137
30
}
138
139
7.41k
UserFunctionCache* UserFunctionCache::instance() {
140
7.41k
    return ExecEnv::GetInstance()->user_function_cache();
141
7.41k
}
142
143
7
Status UserFunctionCache::init(const std::string& lib_dir) {
144
7
#ifndef BE_TEST
145
    // _lib_dir may be reused between unit tests
146
7
    DCHECK(_lib_dir.empty()) << _lib_dir;
147
7
#endif
148
7
    _lib_dir = lib_dir;
149
    // 1. dynamic open current process
150
7
    RETURN_IF_ERROR(dynamic_open(nullptr, &_current_process_handle));
151
    // 2. load all cached
152
7
    RETURN_IF_ERROR(_load_cached_lib());
153
7
    return Status::OK();
154
7
}
155
156
65
Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std::string& file) {
157
65
    LibType lib_type;
158
65
    if (ends_with(file, ".so")) {
159
0
        lib_type = LibType::SO;
160
65
    } else if (ends_with(file, ".jar")) {
161
62
        lib_type = LibType::JAR;
162
62
    } else if (ends_with(file, ".zip") && _check_cache_is_python_udf(dir, file)) {
163
1
        lib_type = LibType::PY_ZIP;
164
2
    } else {
165
2
        return Status::InternalError(
166
2
                "unknown library file format. the file type is not end with xxx.jar or xxx.so"
167
2
                " or xxx.zip : " +
168
2
                file);
169
2
    }
170
171
63
    std::vector<std::string> split_parts = _split_string_by_checksum(file);
172
63
    if (split_parts.size() != 3 && split_parts.size() != 4) {
173
0
        return Status::InternalError(
174
0
                "user function's name should be function_id.checksum[.file_name].file_type, now "
175
0
                "the all split parts are by delimiter(.): " +
176
0
                file);
177
0
    }
178
63
    int64_t function_id = std::stol(split_parts[0]);
179
63
    std::string checksum = split_parts[1];
180
63
    auto it = _entry_map.find(function_id);
181
63
    if (it != _entry_map.end()) {
182
0
        LOG(WARNING) << "meet a same function id user function library, function_id=" << function_id
183
0
                     << ", one_checksum=" << checksum
184
0
                     << ", other_checksum info: = " << it->second->debug_string();
185
0
        return Status::InternalError("duplicate function id");
186
0
    }
187
188
63
    std::string full_path = dir + "/" + file;
189
    // create a cache entry and put it into entry map
190
63
    std::shared_ptr<UserFunctionCacheEntry> entry =
191
63
            UserFunctionCacheEntry::create_shared(function_id, checksum, full_path, lib_type);
192
63
    entry->is_downloaded = true;
193
194
    // For Python UDF, _check_cache_is_python_udf has already unzipped the file.
195
    // Set lib_file to the unzipped directory.
196
63
    if (lib_type == LibType::PY_ZIP) {
197
1
        entry->lib_file = full_path.substr(0, full_path.size() - 4);
198
1
        entry->is_unziped = true;
199
1
    }
200
201
63
    _entry_map[function_id] = entry;
202
203
63
    return Status::OK();
204
63
}
205
206
7
Status UserFunctionCache::_load_cached_lib() {
207
    // create library directory if not exist
208
7
    RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(_lib_dir));
209
210
903
    for (int i = 0; i < kLibShardNum; ++i) {
211
896
        std::string sub_dir = _lib_dir + "/" + std::to_string(i);
212
896
        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(sub_dir));
213
214
896
        auto scan_cb = [this, &sub_dir](const io::FileInfo& file) {
215
62
            if (!file.is_file) {
216
0
                return true;
217
0
            }
218
62
            auto st = _load_entry_from_lib(sub_dir, file.file_name);
219
62
            if (!st.ok()) {
220
0
                LOG(WARNING) << "load a library failed, dir=" << sub_dir
221
0
                             << ", file=" << file.file_name << ": " << st.to_string();
222
0
            }
223
62
            return true;
224
62
        };
225
896
        RETURN_IF_ERROR(io::global_local_filesystem()->iterate_directory(sub_dir, scan_cb));
226
896
    }
227
7
    return Status::OK();
228
7
}
229
230
Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
231
                                           const std::string& checksum,
232
                                           std::shared_ptr<UserFunctionCacheEntry>& output_entry,
233
7.25k
                                           LibType type) {
234
7.25k
    std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
235
7.25k
    std::string file_name = _get_file_name_from_url(url);
236
7.25k
    {
237
7.25k
        std::lock_guard<std::mutex> l(_cache_lock);
238
7.25k
        auto it = _entry_map.find(fid);
239
7.25k
        if (it != _entry_map.end()) {
240
6.87k
            entry = it->second;
241
6.87k
        } else {
242
385
            entry = UserFunctionCacheEntry::create_shared(
243
385
                    fid, checksum, _make_lib_file(fid, checksum, type, file_name), type);
244
385
            _entry_map.emplace(fid, entry);
245
385
        }
246
7.25k
    }
247
7.25k
    auto st = _load_cache_entry(url, entry);
248
7.25k
    if (!st.ok()) {
249
0
        LOG(WARNING) << "fail to load cache entry, fid=" << fid << " " << file_name << " " << url;
250
        // if we load a cache entry failed, I think we should delete this entry cache
251
        // even if this cache was valid before.
252
0
        _destroy_cache_entry(entry);
253
0
        return st;
254
0
    }
255
256
7.25k
    output_entry = entry;
257
7.25k
    return Status::OK();
258
7.25k
}
259
260
0
void UserFunctionCache::_destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry) {
261
    // 1. we remove cache entry from entry map
262
0
    std::lock_guard<std::mutex> l(_cache_lock);
263
    // set should delete flag to true, so that the jar file will be removed when
264
    // the entry is removed from map, and deconstruct method is called.
265
0
    entry->should_delete_library.store(true);
266
0
    _entry_map.erase(entry->function_id);
267
0
}
268
269
Status UserFunctionCache::_load_cache_entry(const std::string& url,
270
7.32k
                                            std::shared_ptr<UserFunctionCacheEntry> entry) {
271
7.32k
    if (entry->is_loaded.load()) {
272
0
        return Status::OK();
273
0
    }
274
275
7.32k
    std::unique_lock<std::mutex> l(entry->load_lock);
276
7.32k
    if (!entry->is_downloaded) {
277
452
        RETURN_IF_ERROR(_download_lib(url, entry));
278
452
    }
279
280
7.32k
    if (!entry->is_unziped && entry->type == LibType::PY_ZIP) {
281
233
        RETURN_IF_ERROR(_unzip_lib(entry->lib_file));
282
233
        entry->lib_file = entry->lib_file.substr(0, entry->lib_file.size() - 4);
283
233
        entry->is_unziped = true;
284
233
    }
285
286
7.32k
    if (entry->type == LibType::SO) {
287
0
        RETURN_IF_ERROR(_load_cache_entry_internal(entry));
288
7.32k
    } else if (entry->type != LibType::JAR && entry->type != LibType::PY_ZIP) {
289
0
        return Status::InvalidArgument(
290
0
                "Unsupported lib type! Make sure your lib type is one of 'so' and 'jar' and "
291
0
                "python 'zip'!");
292
0
    }
293
7.32k
    return Status::OK();
294
7.32k
}
295
296
Status UserFunctionCache::_check_cache_is_python_udf(const std::string& dir,
297
7
                                                     const std::string& file) {
298
7
    const std::string& full_path = dir + "/" + file;
299
7
    RETURN_IF_ERROR(_unzip_lib(full_path));
300
5
    std::string unzip_dir = full_path.substr(0, full_path.size() - 4);
301
302
5
    bool has_python_file = false;
303
304
7
    auto scan_cb = [&has_python_file](const io::FileInfo& file) {
305
7
        if (file.is_file && ends_with(file.file_name, ".py")) {
306
2
            has_python_file = true;
307
2
            return false; // Stop iteration once we find a Python file
308
2
        }
309
5
        return true;
310
7
    };
311
5
    RETURN_IF_ERROR(io::global_local_filesystem()->iterate_directory(unzip_dir, scan_cb));
312
5
    if (!has_python_file) {
313
3
        return Status::InternalError("No Python file found in the unzipped directory.");
314
3
    }
315
2
    return Status::OK();
316
5
}
317
318
249
Status UserFunctionCache::_unzip_lib(const std::string& zip_file) {
319
249
    std::string unzip_dir = zip_file.substr(0, zip_file.size() - 4);
320
249
    RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(unzip_dir));
321
322
249
    unzFile zip_file_handle = unzOpen(zip_file.c_str());
323
249
    if (zip_file_handle == nullptr) {
324
5
        return Status::InternalError("Failed to open zip file: " + zip_file);
325
5
    }
326
327
244
    Defer defer([&] { unzClose(zip_file_handle); });
328
329
244
    unz_global_info global_info;
330
244
    if (unzGetGlobalInfo(zip_file_handle, &global_info) != UNZ_OK) {
331
0
        return Status::InternalError("Failed to get global info from zip file: " + zip_file);
332
0
    }
333
334
1.87k
    for (uLong i = 0; i < global_info.number_entry; ++i) {
335
1.63k
        unz_file_info file_info;
336
1.63k
        char filename[256];
337
1.63k
        if (unzGetCurrentFileInfo(zip_file_handle, &file_info, filename, sizeof(filename), nullptr,
338
1.63k
                                  0, nullptr, 0) != UNZ_OK) {
339
0
            return Status::InternalError("Failed to get file info from zip file: " + zip_file);
340
0
        }
341
342
1.63k
        if (std::string(filename).find("__MACOSX") != std::string::npos) {
343
3
            if ((i + 1) < global_info.number_entry) {
344
3
                if (unzGoToNextFile(zip_file_handle) != UNZ_OK) {
345
0
                    return Status::InternalError("Failed to go to next file in zip: " + zip_file);
346
0
                }
347
3
            }
348
3
            continue;
349
3
        }
350
351
1.62k
        std::string full_filename = unzip_dir + "/" + filename;
352
1.62k
        if (full_filename.length() > PATH_MAX) {
353
0
            return Status::InternalError(
354
0
                    fmt::format("File path {}... is too long, maximum path length is {}",
355
0
                                full_filename.substr(0, 50), PATH_MAX));
356
0
        }
357
358
1.62k
        if (filename[strlen(filename) - 1] == '/') {
359
88
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(full_filename));
360
1.54k
        } else {
361
1.54k
            if (unzOpenCurrentFile(zip_file_handle) != UNZ_OK) {
362
0
                return Status::InternalError("Failed to open file in zip: " +
363
0
                                             std::string(filename));
364
0
            }
365
366
1.54k
            FILE* out = fopen(full_filename.c_str(), "wb");
367
1.54k
            if (out == nullptr) {
368
0
                unzCloseCurrentFile(zip_file_handle);
369
0
                return Status::InternalError("Failed to create file: " + full_filename);
370
0
            }
371
1.54k
            char buffer[8192];
372
1.54k
            int bytes_read;
373
3.52k
            while ((bytes_read = unzReadCurrentFile(zip_file_handle, buffer, sizeof(buffer))) > 0) {
374
1.98k
                fwrite(buffer, bytes_read, 1, out);
375
1.98k
            }
376
1.54k
            fclose(out);
377
1.54k
            unzCloseCurrentFile(zip_file_handle);
378
1.54k
            if (bytes_read < 0) {
379
0
                return Status::InternalError("Failed to read file in zip: " +
380
0
                                             std::string(filename));
381
0
            }
382
1.54k
        }
383
384
1.62k
        if ((i + 1) < global_info.number_entry) {
385
1.38k
            if (unzGoToNextFile(zip_file_handle) != UNZ_OK) {
386
0
                return Status::InternalError("Failed to go to next file in zip: " + zip_file);
387
0
            }
388
1.38k
        }
389
1.62k
    }
390
391
244
    return Status::OK();
392
244
}
393
394
// entry's lock must be held
395
Status UserFunctionCache::_download_lib(const std::string& url,
396
452
                                        std::shared_ptr<UserFunctionCacheEntry> entry) {
397
452
    DCHECK(!entry->is_downloaded);
398
399
    // get local path to save library
400
452
    std::string tmp_file = entry->lib_file + ".tmp";
401
452
    auto fp_closer = [](FILE* fp) { fclose(fp); };
402
452
    std::unique_ptr<FILE, decltype(fp_closer)> fp(fopen(tmp_file.c_str(), "w"), fp_closer);
403
452
    if (fp == nullptr) {
404
0
        LOG(WARNING) << "fail to open file, file=" << tmp_file;
405
0
        return Status::InternalError("fail to open file");
406
0
    }
407
408
452
    std::string real_url;
409
452
    RETURN_IF_ERROR(_get_real_url(url, &real_url));
410
452
    Md5Digest digest;
411
452
    HttpClient client;
412
452
    int64_t file_size = 0;
413
452
    RETURN_IF_ERROR(client.init(real_url));
414
452
    Status status;
415
452
    auto download_cb = [&status, &tmp_file, &fp, &digest, &file_size](const void* data,
416
2.04k
                                                                      size_t length) {
417
2.04k
        digest.update(data, length);
418
2.04k
        file_size = file_size + length;
419
2.04k
        auto res = fwrite(data, length, 1, fp.get());
420
2.04k
        if (res != 1) {
421
0
            LOG(WARNING) << "fail to write data to file, file=" << tmp_file
422
0
                         << ", error=" << ferror(fp.get());
423
0
            status = Status::InternalError("fail to write data when download");
424
0
            return false;
425
0
        }
426
2.04k
        return true;
427
2.04k
    };
428
452
    RETURN_IF_ERROR(client.execute(download_cb));
429
452
    RETURN_IF_ERROR(status);
430
452
    digest.digest();
431
452
    if (!iequal(digest.hex(), entry->checksum)) {
432
0
        fmt::memory_buffer error_msg;
433
0
        fmt::format_to(error_msg,
434
0
                       " The checksum is not equal of {}. The init info of first create entry is:"
435
0
                       "{} But download file check_sum is: {}, file_size is: {}.",
436
0
                       url, entry->debug_string(), digest.hex(), file_size);
437
0
        std::string error(fmt::to_string(error_msg));
438
0
        LOG(WARNING) << error;
439
0
        return Status::InternalError(error);
440
0
    }
441
    // close this file
442
452
    fp.reset();
443
444
    // rename temporary file to library file
445
452
    auto ret = rename(tmp_file.c_str(), entry->lib_file.c_str());
446
452
    if (ret != 0) {
447
0
        char buf[64];
448
0
        LOG(WARNING) << "fail to rename file from=" << tmp_file << ", to=" << entry->lib_file
449
0
                     << ", errno=" << errno << ", errmsg=" << strerror_r(errno, buf, 64);
450
0
        return Status::InternalError("fail to rename file");
451
0
    }
452
453
    // check download
454
452
    entry->is_downloaded = true;
455
452
    return Status::OK();
456
452
}
457
458
7.26k
std::string UserFunctionCache::_get_file_name_from_url(const std::string& url) const {
459
7.26k
    std::string file_name;
460
7.26k
    size_t last_slash_pos = url.find_last_of('/');
461
7.29k
    if (last_slash_pos != std::string::npos) {
462
7.29k
        file_name = url.substr(last_slash_pos + 1, url.size());
463
18.4E
    } else {
464
18.4E
        file_name = url;
465
18.4E
    }
466
7.26k
    return file_name;
467
7.26k
}
468
469
// entry's lock must be held
470
Status UserFunctionCache::_load_cache_entry_internal(
471
0
        std::shared_ptr<UserFunctionCacheEntry> entry) {
472
0
    RETURN_IF_ERROR(dynamic_open(entry->lib_file.c_str(), &entry->lib_handle));
473
0
    entry->is_loaded.store(true);
474
0
    return Status::OK();
475
0
}
476
477
std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::string& checksum,
478
455
                                              LibType type, const std::string& file_name) {
479
455
    int shard = function_id % kLibShardNum;
480
455
    std::stringstream ss;
481
455
    ss << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum;
482
455
    if (type == LibType::JAR) {
483
220
        ss << '.' << file_name;
484
235
    } else if (type == LibType::PY_ZIP) {
485
234
        ss << '.' << file_name;
486
234
    } else {
487
1
        ss << ".so";
488
1
    }
489
455
    return ss.str();
490
455
}
491
492
Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url,
493
5.71k
                                      const std::string& checksum, std::string* libpath) {
494
5.71k
    std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
495
5.71k
    RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, entry, LibType::JAR));
496
5.71k
    *libpath = entry->lib_file;
497
5.71k
    return Status::OK();
498
5.71k
}
499
500
Status UserFunctionCache::get_pypath(int64_t fid, const std::string& url,
501
1.54k
                                     const std::string& checksum, std::string* libpath) {
502
1.54k
    std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
503
1.54k
    RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, entry, LibType::PY_ZIP));
504
1.54k
    *libpath = entry->lib_file;
505
1.54k
    return Status::OK();
506
1.54k
}
507
508
64
std::vector<std::string> UserFunctionCache::_split_string_by_checksum(const std::string& file) {
509
64
    std::vector<std::string> result;
510
511
    // Find the first dot from the start
512
64
    size_t firstDot = file.find('.');
513
64
    if (firstDot == std::string::npos) return {};
514
515
    // Find the second dot starting from the first dot's position
516
64
    size_t secondDot = file.find('.', firstDot + 1);
517
64
    if (secondDot == std::string::npos) return {};
518
519
    // Find the last dot from the end
520
64
    size_t lastDot = file.rfind('.');
521
64
    if (lastDot == std::string::npos || lastDot <= secondDot) return {};
522
523
    // Split based on these dots
524
64
    result.push_back(file.substr(0, firstDot));
525
64
    result.push_back(file.substr(firstDot + 1, secondDot - firstDot - 1));
526
64
    result.push_back(file.substr(secondDot + 1, lastDot - secondDot - 1));
527
64
    result.push_back(file.substr(lastDot + 1));
528
529
64
    return result;
530
64
}
531
532
469
Status UserFunctionCache::_get_real_url(const std::string& url, std::string* result_url) {
533
469
    if (url.find(":/") == std::string::npos) {
534
8
        return _check_and_return_default_java_udf_url(url, result_url);
535
8
    }
536
461
    *result_url = url;
537
461
    return Status::OK();
538
469
}
539
540
Status UserFunctionCache::_check_and_return_default_java_udf_url(const std::string& url,
541
13
                                                                 std::string* result_url) {
542
13
    const char* doris_home = std::getenv("DORIS_HOME");
543
13
    std::string default_url = std::string(doris_home) + "/plugins/java_udf";
544
545
13
    std::filesystem::path file = default_url + "/" + url;
546
547
    // In cloud mode, always try cloud download first (prioritize cloud mode)
548
13
    if (config::is_cloud_mode()) {
549
0
        std::string target_path = default_url + "/" + url;
550
0
        std::string downloaded_path;
551
0
        Status status = CloudPluginDownloader::download_from_cloud(
552
0
                CloudPluginDownloader::PluginType::JAVA_UDF, url, target_path, &downloaded_path);
553
0
        if (status.ok() && !downloaded_path.empty()) {
554
0
            *result_url = "file://" + downloaded_path;
555
0
            return Status::OK();
556
0
        } else {
557
0
            LOG(WARNING) << "Failed to download Java UDF from cloud: " << status.to_string();
558
0
            return Status::RuntimeError(
559
0
                    "Cannot download Java UDF from cloud: {}. "
560
0
                    "Please retry later or check your UDF has been uploaded to cloud.",
561
0
                    url);
562
0
        }
563
0
    }
564
565
    // Return the file path regardless of whether it exists (original UDF behavior)
566
13
    *result_url = "file://" + default_url + "/" + url;
567
13
    return Status::OK();
568
13
}
569
570
0
void UserFunctionCache::drop_function_cache(int64_t fid) {
571
0
    std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
572
0
    {
573
0
        std::lock_guard<std::mutex> l(_cache_lock);
574
0
        auto it = _entry_map.find(fid);
575
0
        if (it == _entry_map.end()) {
576
0
            return;
577
0
        }
578
0
        entry = it->second;
579
0
        _entry_map.erase(it);
580
0
    }
581
582
    // For Python UDF, clear module cache in Python server before deleting files
583
0
    if (entry->type == LibType::PY_ZIP && !entry->lib_file.empty()) {
584
0
        auto status = PythonServerManager::instance().clear_module_cache(entry->lib_file);
585
0
        if (!status.ok()) [[unlikely]] {
586
0
            LOG(WARNING) << "drop_function_cache: failed to clear Python module cache for "
587
0
                         << entry->lib_file << ": " << status.to_string();
588
0
        }
589
0
    }
590
591
    // Mark for deletion, destructor will delete the files
592
0
    entry->should_delete_library.store(true);
593
0
}
594
595
} // namespace doris