be/src/runtime/user_function_cache.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/user_function_cache.h" |
19 | | |
20 | | // IWYU pragma: no_include <bthread/errno.h> |
21 | | #include <errno.h> // IWYU pragma: keep |
22 | | #include <glog/logging.h> |
23 | | #include <minizip/unzip.h> |
24 | | #include <stdio.h> |
25 | | #include <string.h> |
26 | | #include <unistd.h> |
27 | | |
28 | | #include <atomic> |
29 | | #include <cstdint> |
30 | | #include <memory> |
31 | | #include <ostream> |
32 | | #include <regex> |
33 | | #include <utility> |
34 | | #include <vector> |
35 | | |
36 | | #include "cloud/config.h" |
37 | | #include "common/config.h" |
38 | | #include "common/factory_creator.h" |
39 | | #include "common/status.h" |
40 | | #include "io/fs/file_system.h" |
41 | | #include "io/fs/local_file_system.h" |
42 | | #include "runtime/exec_env.h" |
43 | | #include "runtime/plugin/cloud_plugin_downloader.h" |
44 | | #include "service/http/http_client.h" |
45 | | #include "udf/python/python_server.h" |
46 | | #include "util/defer_op.h" |
47 | | #include "util/dynamic_util.h" |
48 | | #include "util/md5.h" |
49 | | #include "util/string_util.h" |
50 | | |
51 | | namespace doris { |
52 | | |
53 | | static const int kLibShardNum = 128; |
54 | | |
55 | | // function cache entry, store information for |
56 | | struct UserFunctionCacheEntry { |
57 | | ENABLE_FACTORY_CREATOR(UserFunctionCacheEntry); |
58 | | UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_, const std::string& lib_file_, |
59 | | LibType type) |
60 | 515 | : function_id(fid_), checksum(checksum_), lib_file(lib_file_), type(type) {} |
61 | | ~UserFunctionCacheEntry(); |
62 | | |
63 | 0 | std::string debug_string() { |
64 | 0 | fmt::memory_buffer error_msg; |
65 | 0 | fmt::format_to(error_msg, |
66 | 0 | " the info of UserFunctionCacheEntry save in BE, function_id:{}, " |
67 | 0 | "checksum:{}, lib_file:{}, is_downloaded:{}. ", |
68 | 0 | function_id, checksum, lib_file, is_downloaded); |
69 | 0 | return fmt::to_string(error_msg); |
70 | 0 | } |
71 | | |
72 | | int64_t function_id = 0; |
73 | | // used to check if this library is valid. |
74 | | std::string checksum; |
75 | | |
76 | | // library file |
77 | | std::string lib_file; |
78 | | |
79 | | // make it atomic variable instead of holding a lock |
80 | | std::atomic<bool> is_loaded {false}; |
81 | | |
82 | | // Set to true when this library is not needed. |
83 | | // e.g. deleting some unused library to re |
84 | | std::atomic<bool> should_delete_library {false}; |
85 | | |
86 | | // lock to make sure only one can load this cache |
87 | | std::mutex load_lock; |
88 | | |
89 | | // To reduce cache lock held time, cache entry is |
90 | | // added to cache map before library is downloaded. |
91 | | // And this is used to indicate whether library is downloaded. |
92 | | bool is_downloaded = false; |
93 | | |
94 | | // Indicate if the zip file is unziped. |
95 | | bool is_unziped = false; |
96 | | |
97 | | // used to lookup a symbol |
98 | | void* lib_handle = nullptr; |
99 | | |
100 | | // from symbol_name to function pointer |
101 | | std::unordered_map<std::string, void*> fptr_map; |
102 | | |
103 | | LibType type; |
104 | | }; |
105 | | |
106 | 32 | UserFunctionCacheEntry::~UserFunctionCacheEntry() { |
107 | | // close lib_handle if it was opened |
108 | 32 | if (lib_handle != nullptr) { |
109 | 0 | dynamic_close(lib_handle); |
110 | 0 | lib_handle = nullptr; |
111 | 0 | } |
112 | | |
113 | | // delete library file if should_delete_library is set |
114 | 32 | if (should_delete_library.load()) { |
115 | 0 | WARN_IF_ERROR( |
116 | 0 | io::global_local_filesystem()->delete_directory_or_file(lib_file), |
117 | 0 | "failed to delete unzipped directory of python udf library, lib_file=" + lib_file); |
118 | |
|
119 | 0 | if (type == LibType::PY_ZIP) { |
120 | | // For Python UDF, we need to delete both the unzipped directory and the original zip file. |
121 | 0 | std::string zip_file = lib_file + ".zip"; |
122 | 0 | WARN_IF_ERROR(io::global_local_filesystem()->delete_directory_or_file(zip_file), |
123 | 0 | "failed to delete zip file of python udf library, lib_file=" + zip_file); |
124 | 0 | } |
125 | 0 | } |
126 | 32 | } |
127 | | |
128 | 34 | UserFunctionCache::UserFunctionCache() = default; |
129 | | |
130 | 30 | UserFunctionCache::~UserFunctionCache() { |
131 | 30 | std::lock_guard<std::mutex> l(_cache_lock); |
132 | 30 | auto it = _entry_map.begin(); |
133 | 62 | while (it != _entry_map.end()) { |
134 | 32 | auto entry = it->second; |
135 | 32 | it = _entry_map.erase(it); |
136 | 32 | } |
137 | 30 | } |
138 | | |
139 | 7.41k | UserFunctionCache* UserFunctionCache::instance() { |
140 | 7.41k | return ExecEnv::GetInstance()->user_function_cache(); |
141 | 7.41k | } |
142 | | |
143 | 7 | Status UserFunctionCache::init(const std::string& lib_dir) { |
144 | 7 | #ifndef BE_TEST |
145 | | // _lib_dir may be reused between unit tests |
146 | 7 | DCHECK(_lib_dir.empty()) << _lib_dir; |
147 | 7 | #endif |
148 | 7 | _lib_dir = lib_dir; |
149 | | // 1. dynamic open current process |
150 | 7 | RETURN_IF_ERROR(dynamic_open(nullptr, &_current_process_handle)); |
151 | | // 2. load all cached |
152 | 7 | RETURN_IF_ERROR(_load_cached_lib()); |
153 | 7 | return Status::OK(); |
154 | 7 | } |
155 | | |
156 | 65 | Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std::string& file) { |
157 | 65 | LibType lib_type; |
158 | 65 | if (ends_with(file, ".so")) { |
159 | 0 | lib_type = LibType::SO; |
160 | 65 | } else if (ends_with(file, ".jar")) { |
161 | 62 | lib_type = LibType::JAR; |
162 | 62 | } else if (ends_with(file, ".zip") && _check_cache_is_python_udf(dir, file)) { |
163 | 1 | lib_type = LibType::PY_ZIP; |
164 | 2 | } else { |
165 | 2 | return Status::InternalError( |
166 | 2 | "unknown library file format. the file type is not end with xxx.jar or xxx.so" |
167 | 2 | " or xxx.zip : " + |
168 | 2 | file); |
169 | 2 | } |
170 | | |
171 | 63 | std::vector<std::string> split_parts = _split_string_by_checksum(file); |
172 | 63 | if (split_parts.size() != 3 && split_parts.size() != 4) { |
173 | 0 | return Status::InternalError( |
174 | 0 | "user function's name should be function_id.checksum[.file_name].file_type, now " |
175 | 0 | "the all split parts are by delimiter(.): " + |
176 | 0 | file); |
177 | 0 | } |
178 | 63 | int64_t function_id = std::stol(split_parts[0]); |
179 | 63 | std::string checksum = split_parts[1]; |
180 | 63 | auto it = _entry_map.find(function_id); |
181 | 63 | if (it != _entry_map.end()) { |
182 | 0 | LOG(WARNING) << "meet a same function id user function library, function_id=" << function_id |
183 | 0 | << ", one_checksum=" << checksum |
184 | 0 | << ", other_checksum info: = " << it->second->debug_string(); |
185 | 0 | return Status::InternalError("duplicate function id"); |
186 | 0 | } |
187 | | |
188 | 63 | std::string full_path = dir + "/" + file; |
189 | | // create a cache entry and put it into entry map |
190 | 63 | std::shared_ptr<UserFunctionCacheEntry> entry = |
191 | 63 | UserFunctionCacheEntry::create_shared(function_id, checksum, full_path, lib_type); |
192 | 63 | entry->is_downloaded = true; |
193 | | |
194 | | // For Python UDF, _check_cache_is_python_udf has already unzipped the file. |
195 | | // Set lib_file to the unzipped directory. |
196 | 63 | if (lib_type == LibType::PY_ZIP) { |
197 | 1 | entry->lib_file = full_path.substr(0, full_path.size() - 4); |
198 | 1 | entry->is_unziped = true; |
199 | 1 | } |
200 | | |
201 | 63 | _entry_map[function_id] = entry; |
202 | | |
203 | 63 | return Status::OK(); |
204 | 63 | } |
205 | | |
206 | 7 | Status UserFunctionCache::_load_cached_lib() { |
207 | | // create library directory if not exist |
208 | 7 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(_lib_dir)); |
209 | | |
210 | 903 | for (int i = 0; i < kLibShardNum; ++i) { |
211 | 896 | std::string sub_dir = _lib_dir + "/" + std::to_string(i); |
212 | 896 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(sub_dir)); |
213 | | |
214 | 896 | auto scan_cb = [this, &sub_dir](const io::FileInfo& file) { |
215 | 62 | if (!file.is_file) { |
216 | 0 | return true; |
217 | 0 | } |
218 | 62 | auto st = _load_entry_from_lib(sub_dir, file.file_name); |
219 | 62 | if (!st.ok()) { |
220 | 0 | LOG(WARNING) << "load a library failed, dir=" << sub_dir |
221 | 0 | << ", file=" << file.file_name << ": " << st.to_string(); |
222 | 0 | } |
223 | 62 | return true; |
224 | 62 | }; |
225 | 896 | RETURN_IF_ERROR(io::global_local_filesystem()->iterate_directory(sub_dir, scan_cb)); |
226 | 896 | } |
227 | 7 | return Status::OK(); |
228 | 7 | } |
229 | | |
230 | | Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url, |
231 | | const std::string& checksum, |
232 | | std::shared_ptr<UserFunctionCacheEntry>& output_entry, |
233 | 7.25k | LibType type) { |
234 | 7.25k | std::shared_ptr<UserFunctionCacheEntry> entry = nullptr; |
235 | 7.25k | std::string file_name = _get_file_name_from_url(url); |
236 | 7.25k | { |
237 | 7.25k | std::lock_guard<std::mutex> l(_cache_lock); |
238 | 7.25k | auto it = _entry_map.find(fid); |
239 | 7.25k | if (it != _entry_map.end()) { |
240 | 6.87k | entry = it->second; |
241 | 6.87k | } else { |
242 | 385 | entry = UserFunctionCacheEntry::create_shared( |
243 | 385 | fid, checksum, _make_lib_file(fid, checksum, type, file_name), type); |
244 | 385 | _entry_map.emplace(fid, entry); |
245 | 385 | } |
246 | 7.25k | } |
247 | 7.25k | auto st = _load_cache_entry(url, entry); |
248 | 7.25k | if (!st.ok()) { |
249 | 0 | LOG(WARNING) << "fail to load cache entry, fid=" << fid << " " << file_name << " " << url; |
250 | | // if we load a cache entry failed, I think we should delete this entry cache |
251 | | // even if this cache was valid before. |
252 | 0 | _destroy_cache_entry(entry); |
253 | 0 | return st; |
254 | 0 | } |
255 | | |
256 | 7.25k | output_entry = entry; |
257 | 7.25k | return Status::OK(); |
258 | 7.25k | } |
259 | | |
260 | 0 | void UserFunctionCache::_destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry) { |
261 | | // 1. we remove cache entry from entry map |
262 | 0 | std::lock_guard<std::mutex> l(_cache_lock); |
263 | | // set should delete flag to true, so that the jar file will be removed when |
264 | | // the entry is removed from map, and deconstruct method is called. |
265 | 0 | entry->should_delete_library.store(true); |
266 | 0 | _entry_map.erase(entry->function_id); |
267 | 0 | } |
268 | | |
269 | | Status UserFunctionCache::_load_cache_entry(const std::string& url, |
270 | 7.32k | std::shared_ptr<UserFunctionCacheEntry> entry) { |
271 | 7.32k | if (entry->is_loaded.load()) { |
272 | 0 | return Status::OK(); |
273 | 0 | } |
274 | | |
275 | 7.32k | std::unique_lock<std::mutex> l(entry->load_lock); |
276 | 7.32k | if (!entry->is_downloaded) { |
277 | 452 | RETURN_IF_ERROR(_download_lib(url, entry)); |
278 | 452 | } |
279 | | |
280 | 7.32k | if (!entry->is_unziped && entry->type == LibType::PY_ZIP) { |
281 | 233 | RETURN_IF_ERROR(_unzip_lib(entry->lib_file)); |
282 | 233 | entry->lib_file = entry->lib_file.substr(0, entry->lib_file.size() - 4); |
283 | 233 | entry->is_unziped = true; |
284 | 233 | } |
285 | | |
286 | 7.32k | if (entry->type == LibType::SO) { |
287 | 0 | RETURN_IF_ERROR(_load_cache_entry_internal(entry)); |
288 | 7.32k | } else if (entry->type != LibType::JAR && entry->type != LibType::PY_ZIP) { |
289 | 0 | return Status::InvalidArgument( |
290 | 0 | "Unsupported lib type! Make sure your lib type is one of 'so' and 'jar' and " |
291 | 0 | "python 'zip'!"); |
292 | 0 | } |
293 | 7.32k | return Status::OK(); |
294 | 7.32k | } |
295 | | |
296 | | Status UserFunctionCache::_check_cache_is_python_udf(const std::string& dir, |
297 | 7 | const std::string& file) { |
298 | 7 | const std::string& full_path = dir + "/" + file; |
299 | 7 | RETURN_IF_ERROR(_unzip_lib(full_path)); |
300 | 5 | std::string unzip_dir = full_path.substr(0, full_path.size() - 4); |
301 | | |
302 | 5 | bool has_python_file = false; |
303 | | |
304 | 7 | auto scan_cb = [&has_python_file](const io::FileInfo& file) { |
305 | 7 | if (file.is_file && ends_with(file.file_name, ".py")) { |
306 | 2 | has_python_file = true; |
307 | 2 | return false; // Stop iteration once we find a Python file |
308 | 2 | } |
309 | 5 | return true; |
310 | 7 | }; |
311 | 5 | RETURN_IF_ERROR(io::global_local_filesystem()->iterate_directory(unzip_dir, scan_cb)); |
312 | 5 | if (!has_python_file) { |
313 | 3 | return Status::InternalError("No Python file found in the unzipped directory."); |
314 | 3 | } |
315 | 2 | return Status::OK(); |
316 | 5 | } |
317 | | |
318 | 249 | Status UserFunctionCache::_unzip_lib(const std::string& zip_file) { |
319 | 249 | std::string unzip_dir = zip_file.substr(0, zip_file.size() - 4); |
320 | 249 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(unzip_dir)); |
321 | | |
322 | 249 | unzFile zip_file_handle = unzOpen(zip_file.c_str()); |
323 | 249 | if (zip_file_handle == nullptr) { |
324 | 5 | return Status::InternalError("Failed to open zip file: " + zip_file); |
325 | 5 | } |
326 | | |
327 | 244 | Defer defer([&] { unzClose(zip_file_handle); }); |
328 | | |
329 | 244 | unz_global_info global_info; |
330 | 244 | if (unzGetGlobalInfo(zip_file_handle, &global_info) != UNZ_OK) { |
331 | 0 | return Status::InternalError("Failed to get global info from zip file: " + zip_file); |
332 | 0 | } |
333 | | |
334 | 1.87k | for (uLong i = 0; i < global_info.number_entry; ++i) { |
335 | 1.63k | unz_file_info file_info; |
336 | 1.63k | char filename[256]; |
337 | 1.63k | if (unzGetCurrentFileInfo(zip_file_handle, &file_info, filename, sizeof(filename), nullptr, |
338 | 1.63k | 0, nullptr, 0) != UNZ_OK) { |
339 | 0 | return Status::InternalError("Failed to get file info from zip file: " + zip_file); |
340 | 0 | } |
341 | | |
342 | 1.63k | if (std::string(filename).find("__MACOSX") != std::string::npos) { |
343 | 3 | if ((i + 1) < global_info.number_entry) { |
344 | 3 | if (unzGoToNextFile(zip_file_handle) != UNZ_OK) { |
345 | 0 | return Status::InternalError("Failed to go to next file in zip: " + zip_file); |
346 | 0 | } |
347 | 3 | } |
348 | 3 | continue; |
349 | 3 | } |
350 | | |
351 | 1.62k | std::string full_filename = unzip_dir + "/" + filename; |
352 | 1.62k | if (full_filename.length() > PATH_MAX) { |
353 | 0 | return Status::InternalError( |
354 | 0 | fmt::format("File path {}... is too long, maximum path length is {}", |
355 | 0 | full_filename.substr(0, 50), PATH_MAX)); |
356 | 0 | } |
357 | | |
358 | 1.62k | if (filename[strlen(filename) - 1] == '/') { |
359 | 88 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(full_filename)); |
360 | 1.54k | } else { |
361 | 1.54k | if (unzOpenCurrentFile(zip_file_handle) != UNZ_OK) { |
362 | 0 | return Status::InternalError("Failed to open file in zip: " + |
363 | 0 | std::string(filename)); |
364 | 0 | } |
365 | | |
366 | 1.54k | FILE* out = fopen(full_filename.c_str(), "wb"); |
367 | 1.54k | if (out == nullptr) { |
368 | 0 | unzCloseCurrentFile(zip_file_handle); |
369 | 0 | return Status::InternalError("Failed to create file: " + full_filename); |
370 | 0 | } |
371 | 1.54k | char buffer[8192]; |
372 | 1.54k | int bytes_read; |
373 | 3.52k | while ((bytes_read = unzReadCurrentFile(zip_file_handle, buffer, sizeof(buffer))) > 0) { |
374 | 1.98k | fwrite(buffer, bytes_read, 1, out); |
375 | 1.98k | } |
376 | 1.54k | fclose(out); |
377 | 1.54k | unzCloseCurrentFile(zip_file_handle); |
378 | 1.54k | if (bytes_read < 0) { |
379 | 0 | return Status::InternalError("Failed to read file in zip: " + |
380 | 0 | std::string(filename)); |
381 | 0 | } |
382 | 1.54k | } |
383 | | |
384 | 1.62k | if ((i + 1) < global_info.number_entry) { |
385 | 1.38k | if (unzGoToNextFile(zip_file_handle) != UNZ_OK) { |
386 | 0 | return Status::InternalError("Failed to go to next file in zip: " + zip_file); |
387 | 0 | } |
388 | 1.38k | } |
389 | 1.62k | } |
390 | | |
391 | 244 | return Status::OK(); |
392 | 244 | } |
393 | | |
394 | | // entry's lock must be held |
395 | | Status UserFunctionCache::_download_lib(const std::string& url, |
396 | 452 | std::shared_ptr<UserFunctionCacheEntry> entry) { |
397 | 452 | DCHECK(!entry->is_downloaded); |
398 | | |
399 | | // get local path to save library |
400 | 452 | std::string tmp_file = entry->lib_file + ".tmp"; |
401 | 452 | auto fp_closer = [](FILE* fp) { fclose(fp); }; |
402 | 452 | std::unique_ptr<FILE, decltype(fp_closer)> fp(fopen(tmp_file.c_str(), "w"), fp_closer); |
403 | 452 | if (fp == nullptr) { |
404 | 0 | LOG(WARNING) << "fail to open file, file=" << tmp_file; |
405 | 0 | return Status::InternalError("fail to open file"); |
406 | 0 | } |
407 | | |
408 | 452 | std::string real_url; |
409 | 452 | RETURN_IF_ERROR(_get_real_url(url, &real_url)); |
410 | 452 | Md5Digest digest; |
411 | 452 | HttpClient client; |
412 | 452 | int64_t file_size = 0; |
413 | 452 | RETURN_IF_ERROR(client.init(real_url)); |
414 | 452 | Status status; |
415 | 452 | auto download_cb = [&status, &tmp_file, &fp, &digest, &file_size](const void* data, |
416 | 2.04k | size_t length) { |
417 | 2.04k | digest.update(data, length); |
418 | 2.04k | file_size = file_size + length; |
419 | 2.04k | auto res = fwrite(data, length, 1, fp.get()); |
420 | 2.04k | if (res != 1) { |
421 | 0 | LOG(WARNING) << "fail to write data to file, file=" << tmp_file |
422 | 0 | << ", error=" << ferror(fp.get()); |
423 | 0 | status = Status::InternalError("fail to write data when download"); |
424 | 0 | return false; |
425 | 0 | } |
426 | 2.04k | return true; |
427 | 2.04k | }; |
428 | 452 | RETURN_IF_ERROR(client.execute(download_cb)); |
429 | 452 | RETURN_IF_ERROR(status); |
430 | 452 | digest.digest(); |
431 | 452 | if (!iequal(digest.hex(), entry->checksum)) { |
432 | 0 | fmt::memory_buffer error_msg; |
433 | 0 | fmt::format_to(error_msg, |
434 | 0 | " The checksum is not equal of {}. The init info of first create entry is:" |
435 | 0 | "{} But download file check_sum is: {}, file_size is: {}.", |
436 | 0 | url, entry->debug_string(), digest.hex(), file_size); |
437 | 0 | std::string error(fmt::to_string(error_msg)); |
438 | 0 | LOG(WARNING) << error; |
439 | 0 | return Status::InternalError(error); |
440 | 0 | } |
441 | | // close this file |
442 | 452 | fp.reset(); |
443 | | |
444 | | // rename temporary file to library file |
445 | 452 | auto ret = rename(tmp_file.c_str(), entry->lib_file.c_str()); |
446 | 452 | if (ret != 0) { |
447 | 0 | char buf[64]; |
448 | 0 | LOG(WARNING) << "fail to rename file from=" << tmp_file << ", to=" << entry->lib_file |
449 | 0 | << ", errno=" << errno << ", errmsg=" << strerror_r(errno, buf, 64); |
450 | 0 | return Status::InternalError("fail to rename file"); |
451 | 0 | } |
452 | | |
453 | | // check download |
454 | 452 | entry->is_downloaded = true; |
455 | 452 | return Status::OK(); |
456 | 452 | } |
457 | | |
458 | 7.26k | std::string UserFunctionCache::_get_file_name_from_url(const std::string& url) const { |
459 | 7.26k | std::string file_name; |
460 | 7.26k | size_t last_slash_pos = url.find_last_of('/'); |
461 | 7.29k | if (last_slash_pos != std::string::npos) { |
462 | 7.29k | file_name = url.substr(last_slash_pos + 1, url.size()); |
463 | 18.4E | } else { |
464 | 18.4E | file_name = url; |
465 | 18.4E | } |
466 | 7.26k | return file_name; |
467 | 7.26k | } |
468 | | |
469 | | // entry's lock must be held |
470 | | Status UserFunctionCache::_load_cache_entry_internal( |
471 | 0 | std::shared_ptr<UserFunctionCacheEntry> entry) { |
472 | 0 | RETURN_IF_ERROR(dynamic_open(entry->lib_file.c_str(), &entry->lib_handle)); |
473 | 0 | entry->is_loaded.store(true); |
474 | 0 | return Status::OK(); |
475 | 0 | } |
476 | | |
477 | | std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::string& checksum, |
478 | 455 | LibType type, const std::string& file_name) { |
479 | 455 | int shard = function_id % kLibShardNum; |
480 | 455 | std::stringstream ss; |
481 | 455 | ss << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum; |
482 | 455 | if (type == LibType::JAR) { |
483 | 220 | ss << '.' << file_name; |
484 | 235 | } else if (type == LibType::PY_ZIP) { |
485 | 234 | ss << '.' << file_name; |
486 | 234 | } else { |
487 | 1 | ss << ".so"; |
488 | 1 | } |
489 | 455 | return ss.str(); |
490 | 455 | } |
491 | | |
492 | | Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url, |
493 | 5.71k | const std::string& checksum, std::string* libpath) { |
494 | 5.71k | std::shared_ptr<UserFunctionCacheEntry> entry = nullptr; |
495 | 5.71k | RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, entry, LibType::JAR)); |
496 | 5.71k | *libpath = entry->lib_file; |
497 | 5.71k | return Status::OK(); |
498 | 5.71k | } |
499 | | |
500 | | Status UserFunctionCache::get_pypath(int64_t fid, const std::string& url, |
501 | 1.54k | const std::string& checksum, std::string* libpath) { |
502 | 1.54k | std::shared_ptr<UserFunctionCacheEntry> entry = nullptr; |
503 | 1.54k | RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, entry, LibType::PY_ZIP)); |
504 | 1.54k | *libpath = entry->lib_file; |
505 | 1.54k | return Status::OK(); |
506 | 1.54k | } |
507 | | |
508 | 64 | std::vector<std::string> UserFunctionCache::_split_string_by_checksum(const std::string& file) { |
509 | 64 | std::vector<std::string> result; |
510 | | |
511 | | // Find the first dot from the start |
512 | 64 | size_t firstDot = file.find('.'); |
513 | 64 | if (firstDot == std::string::npos) return {}; |
514 | | |
515 | | // Find the second dot starting from the first dot's position |
516 | 64 | size_t secondDot = file.find('.', firstDot + 1); |
517 | 64 | if (secondDot == std::string::npos) return {}; |
518 | | |
519 | | // Find the last dot from the end |
520 | 64 | size_t lastDot = file.rfind('.'); |
521 | 64 | if (lastDot == std::string::npos || lastDot <= secondDot) return {}; |
522 | | |
523 | | // Split based on these dots |
524 | 64 | result.push_back(file.substr(0, firstDot)); |
525 | 64 | result.push_back(file.substr(firstDot + 1, secondDot - firstDot - 1)); |
526 | 64 | result.push_back(file.substr(secondDot + 1, lastDot - secondDot - 1)); |
527 | 64 | result.push_back(file.substr(lastDot + 1)); |
528 | | |
529 | 64 | return result; |
530 | 64 | } |
531 | | |
532 | 469 | Status UserFunctionCache::_get_real_url(const std::string& url, std::string* result_url) { |
533 | 469 | if (url.find(":/") == std::string::npos) { |
534 | 8 | return _check_and_return_default_java_udf_url(url, result_url); |
535 | 8 | } |
536 | 461 | *result_url = url; |
537 | 461 | return Status::OK(); |
538 | 469 | } |
539 | | |
540 | | Status UserFunctionCache::_check_and_return_default_java_udf_url(const std::string& url, |
541 | 13 | std::string* result_url) { |
542 | 13 | const char* doris_home = std::getenv("DORIS_HOME"); |
543 | 13 | std::string default_url = std::string(doris_home) + "/plugins/java_udf"; |
544 | | |
545 | 13 | std::filesystem::path file = default_url + "/" + url; |
546 | | |
547 | | // In cloud mode, always try cloud download first (prioritize cloud mode) |
548 | 13 | if (config::is_cloud_mode()) { |
549 | 0 | std::string target_path = default_url + "/" + url; |
550 | 0 | std::string downloaded_path; |
551 | 0 | Status status = CloudPluginDownloader::download_from_cloud( |
552 | 0 | CloudPluginDownloader::PluginType::JAVA_UDF, url, target_path, &downloaded_path); |
553 | 0 | if (status.ok() && !downloaded_path.empty()) { |
554 | 0 | *result_url = "file://" + downloaded_path; |
555 | 0 | return Status::OK(); |
556 | 0 | } else { |
557 | 0 | LOG(WARNING) << "Failed to download Java UDF from cloud: " << status.to_string(); |
558 | 0 | return Status::RuntimeError( |
559 | 0 | "Cannot download Java UDF from cloud: {}. " |
560 | 0 | "Please retry later or check your UDF has been uploaded to cloud.", |
561 | 0 | url); |
562 | 0 | } |
563 | 0 | } |
564 | | |
565 | | // Return the file path regardless of whether it exists (original UDF behavior) |
566 | 13 | *result_url = "file://" + default_url + "/" + url; |
567 | 13 | return Status::OK(); |
568 | 13 | } |
569 | | |
570 | 0 | void UserFunctionCache::drop_function_cache(int64_t fid) { |
571 | 0 | std::shared_ptr<UserFunctionCacheEntry> entry = nullptr; |
572 | 0 | { |
573 | 0 | std::lock_guard<std::mutex> l(_cache_lock); |
574 | 0 | auto it = _entry_map.find(fid); |
575 | 0 | if (it == _entry_map.end()) { |
576 | 0 | return; |
577 | 0 | } |
578 | 0 | entry = it->second; |
579 | 0 | _entry_map.erase(it); |
580 | 0 | } |
581 | | |
582 | | // For Python UDF, clear module cache in Python server before deleting files |
583 | 0 | if (entry->type == LibType::PY_ZIP && !entry->lib_file.empty()) { |
584 | 0 | auto status = PythonServerManager::instance().clear_module_cache(entry->lib_file); |
585 | 0 | if (!status.ok()) [[unlikely]] { |
586 | 0 | LOG(WARNING) << "drop_function_cache: failed to clear Python module cache for " |
587 | 0 | << entry->lib_file << ": " << status.to_string(); |
588 | 0 | } |
589 | 0 | } |
590 | | |
591 | | // Mark for deletion, destructor will delete the files |
592 | 0 | entry->should_delete_library.store(true); |
593 | 0 | } |
594 | | |
595 | | } // namespace doris |