Coverage Report

Created: 2026-03-12 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/plugin/cloud_plugin_downloader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/plugin/cloud_plugin_downloader.h"
19
20
#include <fmt/format.h>
21
22
#include "cloud/cloud_storage_engine.h"
23
#include "io/fs/local_file_system.h"
24
#include "io/fs/remote_file_system.h"
25
#include "runtime/exec_env.h"
26
27
namespace doris {
28
29
// Use 10MB buffer for all downloads - same as cloud_warm_up_manager
30
static constexpr size_t DOWNLOAD_BUFFER_SIZE = size_t {10} * 1024 * 1024; // 10MB per read/append chunk
31
32
// Static mutex definition for synchronizing downloads
33
// Held by download_from_cloud() for its entire body, so at most one plugin
// download runs through that entry point at a time.
std::mutex CloudPluginDownloader::_download_mutex;
34
35
// Downloads the plugin `name` of the given `type` from cloud storage to
// `local_path` and, on success, writes `local_path` into `*result_path`.
//
// The whole call runs under _download_mutex, so concurrent callers are
// serialized. Returns InvalidArgument when `name` is empty; otherwise the
// first failing step's Status is returned unchanged and `*result_path` is
// left untouched.
Status CloudPluginDownloader::download_from_cloud(PluginType type, const std::string& name,
                                                  const std::string& local_path,
                                                  std::string* result_path) {
    // Taken first and held until return: one download at a time.
    std::lock_guard<std::mutex> guard(_download_mutex);

    if (name.empty()) {
        return Status::InvalidArgument("Plugin name cannot be empty");
    }

    CloudPluginDownloader worker;

    // Step 1: resolve the remote filesystem to read from.
    io::RemoteFileSystemSPtr remote_fs;
    RETURN_IF_ERROR(worker._get_cloud_filesystem(&remote_fs));

    // Step 2: map (type, name) to the plugin's remote path.
    std::string plugin_remote_path;
    RETURN_IF_ERROR(worker._build_plugin_path(type, name, &plugin_remote_path));
    LOG(INFO) << "Downloading plugin: " << name << " -> " << local_path;

    // Step 3: clear any stale local file and make sure the target directory exists.
    RETURN_IF_ERROR(worker._prepare_local_path(local_path));

    // Step 4: copy the remote file to the local path.
    RETURN_IF_ERROR(worker._download_remote_file(remote_fs, plugin_remote_path, local_path));

    // Publish the result only after every step succeeded.
    *result_path = local_path;
    LOG(INFO) << "Successfully downloaded plugin: " << name << " to " << local_path;

    return Status::OK();
}
67
68
// Builds the remote path of a plugin as "plugins/<type dir>/<name>" and stores
// it in `*path`. The type directory is "jdbc_drivers" for JDBC_DRIVERS and
// "java_udf" for JAVA_UDF; any other type yields InvalidArgument and leaves
// `*path` unmodified.
Status CloudPluginDownloader::_build_plugin_path(PluginType type, const std::string& name,
                                                 std::string* path) {
    const char* type_dir = nullptr;
    if (type == PluginType::JDBC_DRIVERS) {
        type_dir = "jdbc_drivers";
    } else if (type == PluginType::JAVA_UDF) {
        type_dir = "java_udf";
    } else {
        return Status::InvalidArgument("Unsupported plugin type: {}", static_cast<int>(type));
    }

    *path = fmt::format("plugins/{}/{}", type_dir, name);
    return Status::OK();
}
84
85
5
// Stores the cloud storage engine's latest remote filesystem in `*filesystem`.
// Returns NotFound when the process-wide storage engine is not a
// CloudStorageEngine, or when that engine has no latest filesystem.
Status CloudPluginDownloader::_get_cloud_filesystem(io::RemoteFileSystemSPtr* filesystem) {
    // The downcast only succeeds when the engine is a CloudStorageEngine.
    auto* cloud_engine =
            dynamic_cast<CloudStorageEngine*>(&ExecEnv::GetInstance()->storage_engine());
    if (cloud_engine == nullptr) {
        return Status::NotFound("CloudStorageEngine not found, not in cloud mode");
    }

    *filesystem = cloud_engine->latest_fs();
    if (*filesystem) {
        return Status::OK();
    }
    return Status::NotFound("No latest filesystem available in cloud mode");
}
99
100
4
// Prepares the local filesystem for a fresh download to `local_path`:
//   1. deletes an existing file at `local_path`, if any;
//   2. creates the parent directory (everything before the last '/').
// No directory is created when `local_path` has no '/' (a bare file name) or
// when its only '/' is the leading one (e.g. "/plugin.jar").
// Returns the first error reported by the local filesystem, otherwise OK.
Status CloudPluginDownloader::_prepare_local_path(const std::string& local_path) {
    // Remove existing file if present
    bool exists = false;
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(local_path, &exists));
    if (exists) {
        RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_path));
        LOG(INFO) << "Removed existing file: " << local_path;
    }

    // Ensure local directory exists.
    // find_last_of() returns npos when there is no '/'. Passing npos to
    // substr() would yield the whole path, and we would then create a
    // directory named after the plugin file itself. Check for it explicitly.
    const std::string::size_type last_slash = local_path.find_last_of('/');
    if (last_slash != std::string::npos && last_slash > 0) {
        const std::string dir_path = local_path.substr(0, last_slash);
        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(dir_path));
    }

    return Status::OK();
}
117
118
// Copies `remote_path` on `filesystem` to `local_path` on the local
// filesystem, streaming through a DOWNLOAD_BUFFER_SIZE buffer.
//
// Returns the first error from opening, sizing, reading, writing or closing.
// Also returns IOError when the remote filesystem reports a negative size, or
// when a read returns zero bytes before the reported size has been copied.
// A partially written local file is not removed on failure.
Status CloudPluginDownloader::_download_remote_file(io::RemoteFileSystemSPtr filesystem,
                                                    const std::string& remote_path,
                                                    const std::string& local_path) {
    // Open remote file for reading
    io::FileReaderSPtr remote_reader;
    io::FileReaderOptions opts;
    RETURN_IF_ERROR(filesystem->open_file(remote_path, &remote_reader, &opts));

    // Get file size
    int64_t file_size = 0;
    RETURN_IF_ERROR(filesystem->file_size(remote_path, &file_size));
    if (file_size < 0) {
        // A negative size would turn into a huge unsigned loop bound below.
        return Status::IOError("Invalid size {} reported for remote file {}", file_size,
                               remote_path);
    }
    const auto total_size = static_cast<size_t>(file_size);

    // Create local file writer
    io::FileWriterPtr local_writer;
    RETURN_IF_ERROR(io::global_local_filesystem()->create_file(local_path, &local_writer));

    auto buffer = std::make_unique<char[]>(DOWNLOAD_BUFFER_SIZE);
    size_t total_read = 0;
    while (total_read < total_size) {
        const size_t to_read = std::min(DOWNLOAD_BUFFER_SIZE, total_size - total_read);
        size_t bytes_read = 0;
        RETURN_IF_ERROR(remote_reader->read_at(total_read, {buffer.get(), to_read}, &bytes_read));
        if (bytes_read == 0) {
            // No progress: the remote file is shorter than reported. Without
            // this check the loop would never terminate.
            return Status::IOError("Unexpected end of remote file {}: read {} of {} bytes",
                                   remote_path, total_read, total_size);
        }
        RETURN_IF_ERROR(local_writer->append({buffer.get(), bytes_read}));
        total_read += bytes_read;
    }

    RETURN_IF_ERROR(local_writer->close());
    return Status::OK();
}
148
149
} // namespace doris