be/src/runtime/plugin/cloud_plugin_downloader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/plugin/cloud_plugin_downloader.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | |
22 | | #include "cloud/cloud_storage_engine.h" |
23 | | #include "io/fs/local_file_system.h" |
24 | | #include "io/fs/remote_file_system.h" |
25 | | #include "runtime/exec_env.h" |
26 | | |
27 | | namespace doris { |
28 | | |
// Size of the scratch buffer every plugin download is streamed through: 10 MiB.
// (The original author notes this is the same size cloud_warm_up_manager uses.)
static constexpr size_t DOWNLOAD_BUFFER_SIZE = size_t {10} * 1024 * 1024;
31 | | |
32 | | // Static mutex definition for synchronizing downloads |
33 | | std::mutex CloudPluginDownloader::_download_mutex; |
34 | | |
35 | | Status CloudPluginDownloader::download_from_cloud(PluginType type, const std::string& name, |
36 | | const std::string& local_path, |
37 | 3 | std::string* result_path) { |
38 | | // Use lock_guard to synchronize concurrent downloads |
39 | 3 | std::lock_guard<std::mutex> lock(_download_mutex); |
40 | | |
41 | 3 | if (name.empty()) { |
42 | 1 | return Status::InvalidArgument("Plugin name cannot be empty"); |
43 | 1 | } |
44 | | |
45 | 2 | CloudPluginDownloader downloader; |
46 | | |
47 | | // 1. Get FileSystem |
48 | 2 | io::RemoteFileSystemSPtr filesystem; |
49 | 2 | RETURN_IF_ERROR(downloader._get_cloud_filesystem(&filesystem)); |
50 | | |
51 | | // 2. Build remote plugin path |
52 | 0 | std::string remote_path; |
53 | 0 | RETURN_IF_ERROR(downloader._build_plugin_path(type, name, &remote_path)); |
54 | 0 | LOG(INFO) << "Downloading plugin: " << name << " -> " << local_path; |
55 | | |
56 | | // 3. Prepare local environment |
57 | 0 | RETURN_IF_ERROR(downloader._prepare_local_path(local_path)); |
58 | | |
59 | | // 4. Download remote file to local |
60 | 0 | RETURN_IF_ERROR(downloader._download_remote_file(filesystem, remote_path, local_path)); |
61 | | |
62 | 0 | *result_path = local_path; |
63 | 0 | LOG(INFO) << "Successfully downloaded plugin: " << name << " to " << local_path; |
64 | |
|
65 | 0 | return Status::OK(); |
66 | 0 | } |
67 | | |
68 | | Status CloudPluginDownloader::_build_plugin_path(PluginType type, const std::string& name, |
69 | 10 | std::string* path) { |
70 | 10 | std::string type_name; |
71 | 10 | switch (type) { |
72 | 4 | case PluginType::JDBC_DRIVERS: |
73 | 4 | type_name = "jdbc_drivers"; |
74 | 4 | break; |
75 | 3 | case PluginType::JAVA_UDF: |
76 | 3 | type_name = "java_udf"; |
77 | 3 | break; |
78 | 3 | default: |
79 | 3 | return Status::InvalidArgument("Unsupported plugin type: {}", static_cast<int>(type)); |
80 | 10 | } |
81 | 7 | *path = fmt::format("plugins/{}/{}", type_name, name); |
82 | 7 | return Status::OK(); |
83 | 10 | } |
84 | | |
85 | 5 | Status CloudPluginDownloader::_get_cloud_filesystem(io::RemoteFileSystemSPtr* filesystem) { |
86 | 5 | BaseStorageEngine& base_engine = ExecEnv::GetInstance()->storage_engine(); |
87 | 5 | CloudStorageEngine* cloud_engine = dynamic_cast<CloudStorageEngine*>(&base_engine); |
88 | 5 | if (!cloud_engine) { |
89 | 3 | return Status::NotFound("CloudStorageEngine not found, not in cloud mode"); |
90 | 3 | } |
91 | | |
92 | 2 | *filesystem = cloud_engine->latest_fs(); |
93 | 2 | if (!*filesystem) { |
94 | 2 | return Status::NotFound("No latest filesystem available in cloud mode"); |
95 | 2 | } |
96 | | |
97 | 0 | return Status::OK(); |
98 | 2 | } |
99 | | |
100 | 4 | Status CloudPluginDownloader::_prepare_local_path(const std::string& local_path) { |
101 | | // Remove existing file if present |
102 | 4 | bool exists = false; |
103 | 4 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(local_path, &exists)); |
104 | 4 | if (exists) { |
105 | 1 | RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_path)); |
106 | 1 | LOG(INFO) << "Removed existing file: " << local_path; |
107 | 1 | } |
108 | | |
109 | | // Ensure local directory exists |
110 | 4 | std::string dir_path = local_path.substr(0, local_path.find_last_of('/')); |
111 | 4 | if (!dir_path.empty()) { |
112 | 3 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(dir_path)); |
113 | 3 | } |
114 | | |
115 | 4 | return Status::OK(); |
116 | 4 | } |
117 | | |
118 | | Status CloudPluginDownloader::_download_remote_file(io::RemoteFileSystemSPtr filesystem, |
119 | | const std::string& remote_path, |
120 | 0 | const std::string& local_path) { |
121 | | // Open remote file for reading |
122 | 0 | io::FileReaderSPtr remote_reader; |
123 | 0 | io::FileReaderOptions opts; |
124 | 0 | RETURN_IF_ERROR(filesystem->open_file(remote_path, &remote_reader, &opts)); |
125 | | |
126 | | // Get file size |
127 | 0 | int64_t file_size; |
128 | 0 | RETURN_IF_ERROR(filesystem->file_size(remote_path, &file_size)); |
129 | | |
130 | | // Create local file writer |
131 | 0 | io::FileWriterPtr local_writer; |
132 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->create_file(local_path, &local_writer)); |
133 | | |
134 | 0 | auto buffer = std::make_unique<char[]>(DOWNLOAD_BUFFER_SIZE); |
135 | 0 | size_t total_read = 0; |
136 | 0 | while (total_read < file_size) { |
137 | 0 | size_t to_read = |
138 | 0 | std::min(DOWNLOAD_BUFFER_SIZE, static_cast<size_t>(file_size - total_read)); |
139 | 0 | size_t bytes_read; |
140 | 0 | RETURN_IF_ERROR(remote_reader->read_at(total_read, {buffer.get(), to_read}, &bytes_read)); |
141 | 0 | RETURN_IF_ERROR(local_writer->append({buffer.get(), bytes_read})); |
142 | 0 | total_read += bytes_read; |
143 | 0 | } |
144 | | |
145 | 0 | RETURN_IF_ERROR(local_writer->close()); |
146 | 0 | return Status::OK(); |
147 | 0 | } |
148 | | |
149 | | } // namespace doris |