be/src/io/fs/s3_file_system.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/s3_file_system.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | |
22 | | #include <cstddef> |
23 | | |
24 | | #include "common/compiler_util.h" // IWYU pragma: keep |
25 | | // IWYU pragma: no_include <bits/chrono.h> |
26 | | #include <aws/core/utils/threading/Executor.h> |
27 | | #include <aws/s3/S3Client.h> |
28 | | |
29 | | #include <chrono> // IWYU pragma: keep |
30 | | #include <filesystem> |
31 | | #include <fstream> // IWYU pragma: keep |
32 | | #include <future> |
33 | | #include <memory> |
34 | | |
35 | | #include "common/config.h" |
36 | | #include "common/logging.h" |
37 | | #include "common/status.h" |
38 | | #include "cpp/sync_point.h" |
39 | | #include "io/fs/err_utils.h" |
40 | | #include "io/fs/file_system.h" |
41 | | #include "io/fs/file_writer.h" |
42 | | #include "io/fs/local_file_system.h" |
43 | | #include "io/fs/remote_file_system.h" |
44 | | #include "io/fs/s3_common.h" |
45 | | #include "io/fs/s3_file_reader.h" |
46 | | #include "io/fs/s3_file_writer.h" |
47 | | #include "io/fs/s3_obj_storage_client.h" |
48 | | #include "runtime/exec_env.h" |
49 | | #include "runtime/thread_context.h" |
50 | | #include "util/s3_uri.h" |
51 | | #include "util/s3_util.h" |
52 | | |
53 | | namespace doris::io { |
54 | | namespace { |
// Endpoint suffix used to recognise an OSS private ("-internal") endpoint.
constexpr std::string_view OSS_PRIVATE_ENDPOINT_SUFFIX {"-internal.aliyuncs.com"};
// Number of characters in "-internal", the part removed to turn a private
// endpoint into its public counterpart.
constexpr int LEN_OF_OSS_PRIVATE_SUFFIX = static_cast<int>(std::string_view {"-internal"}.size());
57 | | |
// Makes the enclosing function return Status::InvalidArgument when `client`
// evaluates to false (e.g. a null client pointer).
//
// The body is wrapped in do { } while (0) so the macro expands to exactly one
// statement and can be used safely in an unbraced if/else; the argument is
// parenthesized so that any expression may be passed.
#ifndef CHECK_S3_CLIENT
#define CHECK_S3_CLIENT(client)                                     \
    do {                                                            \
        if (!(client)) {                                            \
            return Status::InvalidArgument("init s3 client error"); \
        }                                                           \
    } while (0)
#endif
64 | | |
65 | 306k | Result<std::string> get_key(const Path& full_path) { |
66 | | // FIXME(plat1ko): Check bucket in full path and support relative path |
67 | 306k | S3URI uri(full_path.native()); |
68 | 306k | RETURN_IF_ERROR_RESULT(uri.parse()); |
69 | 306k | return uri.get_key(); |
70 | 306k | } |
71 | | |
72 | | } // namespace |
73 | | |
74 | 19.6k | ObjClientHolder::ObjClientHolder(S3ClientConf conf) : _conf(std::move(conf)) {} |
75 | | |
76 | 19.6k | ObjClientHolder::~ObjClientHolder() = default; |
77 | | |
78 | 19.5k | Status ObjClientHolder::init() { |
79 | 19.5k | _client = S3ClientFactory::instance().create(_conf); |
80 | 19.5k | if (!_client) { |
81 | 8 | return Status::InvalidArgument("failed to init s3 client with conf {}", _conf.to_string()); |
82 | 8 | } |
83 | | |
84 | 19.5k | return Status::OK(); |
85 | 19.5k | } |
86 | | |
87 | 119 | Status ObjClientHolder::reset(const S3ClientConf& conf) { |
88 | 119 | S3ClientConf reset_conf; |
89 | 119 | { |
90 | 119 | std::shared_lock lock(_mtx); |
91 | 119 | if (conf.get_hash() == _conf.get_hash()) { |
92 | 119 | return Status::OK(); // Same conf |
93 | 119 | } |
94 | | |
95 | 0 | reset_conf = _conf; |
96 | 0 | reset_conf.ak = conf.ak; |
97 | 0 | reset_conf.sk = conf.sk; |
98 | 0 | reset_conf.token = conf.token; |
99 | 0 | reset_conf.bucket = conf.bucket; |
100 | 0 | reset_conf.connect_timeout_ms = conf.connect_timeout_ms; |
101 | 0 | reset_conf.max_connections = conf.max_connections; |
102 | 0 | reset_conf.request_timeout_ms = conf.request_timeout_ms; |
103 | 0 | reset_conf.use_virtual_addressing = conf.use_virtual_addressing; |
104 | |
|
105 | 0 | reset_conf.role_arn = conf.role_arn; |
106 | 0 | reset_conf.external_id = conf.external_id; |
107 | 0 | reset_conf.cred_provider_type = conf.cred_provider_type; |
108 | | // Should check endpoint here? |
109 | 0 | } |
110 | | |
111 | 0 | auto client = S3ClientFactory::instance().create(reset_conf); |
112 | 0 | if (!client) { |
113 | 0 | return Status::InvalidArgument("failed to init s3 client with conf {}", conf.to_string()); |
114 | 0 | } |
115 | | |
116 | 0 | LOG(WARNING) << "reset s3 client with new conf: " << conf.to_string(); |
117 | |
|
118 | 0 | { |
119 | 0 | std::lock_guard lock(_mtx); |
120 | 0 | _client = std::move(client); |
121 | 0 | _conf = std::move(reset_conf); |
122 | 0 | } |
123 | |
|
124 | 0 | return Status::OK(); |
125 | 0 | } |
126 | | |
127 | | Result<int64_t> ObjClientHolder::object_file_size(const std::string& bucket, |
128 | 29.0k | const std::string& key) const { |
129 | 29.0k | auto client = get(); |
130 | 29.0k | if (!client) { |
131 | 0 | return ResultError(Status::InvalidArgument("init s3 client error")); |
132 | 0 | } |
133 | | |
134 | 29.0k | auto resp = client->head_object({ |
135 | 29.0k | .bucket = bucket, |
136 | 29.0k | .key = key, |
137 | 29.0k | }); |
138 | | |
139 | 29.0k | if (resp.resp.status.code != ErrorCode::OK) { |
140 | 3 | return ResultError(std::move(Status(resp.resp.status.code, std::move(resp.resp.status.msg)) |
141 | 3 | .append(fmt::format("failed to head s3 file {}", |
142 | 3 | full_s3_path(bucket, key))))); |
143 | 3 | } |
144 | | |
145 | 29.0k | return resp.file_size; |
146 | 29.0k | } |
147 | | |
148 | 652 | std::string ObjClientHolder::full_s3_path(std::string_view bucket, std::string_view key) const { |
149 | 652 | return fmt::format("{}/{}/{}", _conf.endpoint, bucket, key); |
150 | 652 | } |
151 | | |
152 | 649 | std::string S3FileSystem::full_s3_path(std::string_view key) const { |
153 | 649 | return _client->full_s3_path(_bucket, key); |
154 | 649 | } |
155 | | |
156 | 1.09k | Result<std::shared_ptr<S3FileSystem>> S3FileSystem::create(S3Conf s3_conf, std::string id) { |
157 | 1.09k | std::shared_ptr<S3FileSystem> fs(new S3FileSystem(std::move(s3_conf), std::move(id))); |
158 | 1.09k | RETURN_IF_ERROR_RESULT(fs->init()); |
159 | 1.08k | return fs; |
160 | 1.09k | } |
161 | | |
162 | | S3FileSystem::S3FileSystem(S3Conf s3_conf, std::string id) |
163 | 1.09k | : RemoteFileSystem(s3_conf.prefix, std::move(id), FileSystemType::S3), |
164 | 1.09k | _bucket(std::move(s3_conf.bucket)), |
165 | 1.09k | _prefix(std::move(s3_conf.prefix)), |
166 | 1.09k | _client(std::make_shared<ObjClientHolder>(std::move(s3_conf.client_conf))) { |
167 | | // FIXME(plat1ko): Normalize prefix |
168 | | // remove the first and last '/' |
169 | 1.09k | if (!_prefix.empty()) { |
170 | 42 | size_t start = _prefix.find_first_not_of('/'); |
171 | 42 | if (start == std::string::npos) { |
172 | 0 | _prefix = ""; |
173 | 42 | } else { |
174 | 42 | size_t end = _prefix.find_last_not_of('/'); |
175 | 42 | if (start > 0 || end < _prefix.size() - 1) { |
176 | 0 | _prefix = _prefix.substr(start, end - start + 1); |
177 | 0 | } |
178 | 42 | } |
179 | 42 | } |
180 | 1.09k | } |
181 | | |
182 | 1.09k | Status S3FileSystem::init() { |
183 | 1.09k | return _client->init(); |
184 | 1.09k | } |
185 | | |
186 | 1.07k | S3FileSystem::~S3FileSystem() = default; |
187 | | |
188 | | Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, |
189 | 71.7k | const FileWriterOptions* opts) { |
190 | 71.7k | auto client = _client->get(); |
191 | 71.7k | CHECK_S3_CLIENT(client); |
192 | 71.7k | auto key = DORIS_TRY(get_key(file)); |
193 | 71.7k | *writer = std::make_unique<S3FileWriter>(_client, _bucket, std::move(key), opts); |
194 | 71.7k | return Status::OK(); |
195 | 71.7k | } |
196 | | |
197 | | Status S3FileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader, |
198 | 232k | const FileReaderOptions& opts) { |
199 | 232k | TEST_SYNC_POINT_CALLBACK("S3FileSystem::open_file_internal", &file, &opts); |
200 | 232k | auto key = DORIS_TRY(get_key(file)); |
201 | 232k | *reader = DORIS_TRY(S3FileReader::create(_client, _bucket, key, opts.file_size, nullptr)); |
202 | 232k | return Status::OK(); |
203 | 232k | } |
204 | | |
205 | 598 | Status S3FileSystem::create_directory_impl(const Path& dir, bool failed_if_exists) { |
206 | 598 | return Status::OK(); |
207 | 598 | } |
208 | | |
209 | 1 | Status S3FileSystem::delete_file_impl(const Path& file) { |
210 | 1 | auto client = _client->get(); |
211 | 1 | CHECK_S3_CLIENT(client); |
212 | | |
213 | 1 | auto key = DORIS_TRY(get_key(file)); |
214 | | |
215 | 1 | auto resp = client->delete_object({.bucket = _bucket, .key = key}); |
216 | | |
217 | 1 | if (resp.status.code == ErrorCode::OK || resp.status.code == ErrorCode::NOT_FOUND) { |
218 | 1 | return Status::OK(); |
219 | 1 | } |
220 | 0 | return std::move(Status(resp.status.code, std::move(resp.status.msg)) |
221 | 0 | .append(fmt::format("failed to delete file {}", full_s3_path(key)))); |
222 | 1 | } |
223 | | |
224 | 0 | Status S3FileSystem::delete_directory_impl(const Path& dir) { |
225 | 0 | auto client = _client->get(); |
226 | 0 | CHECK_S3_CLIENT(client); |
227 | |
|
228 | 0 | auto prefix = DORIS_TRY(get_key(dir)); |
229 | 0 | if (!prefix.empty() && prefix.back() != '/') { |
230 | 0 | prefix.push_back('/'); |
231 | 0 | } |
232 | |
|
233 | 0 | auto resp = client->delete_objects_recursively({ |
234 | 0 | .path = full_s3_path(prefix), |
235 | 0 | .bucket = _bucket, |
236 | 0 | .prefix = prefix, |
237 | 0 | }); |
238 | 0 | return {resp.status.code, std::move(resp.status.msg)}; |
239 | 0 | } |
240 | | |
241 | 0 | Status S3FileSystem::batch_delete_impl(const std::vector<Path>& remote_files) { |
242 | 0 | auto client = _client->get(); |
243 | 0 | CHECK_S3_CLIENT(client); |
244 | | |
245 | | // `DeleteObjectsRequest` can only contain 1000 keys at most. |
246 | 0 | constexpr size_t max_delete_batch = 1000; |
247 | 0 | auto path_iter = remote_files.begin(); |
248 | |
|
249 | 0 | do { |
250 | 0 | std::vector<std::string> objects; |
251 | 0 | auto path_begin = path_iter; |
252 | 0 | for (; path_iter != remote_files.end() && (path_iter - path_begin < max_delete_batch); |
253 | 0 | ++path_iter) { |
254 | 0 | auto key = DORIS_TRY(get_key(*path_iter)); |
255 | 0 | objects.emplace_back(std::move(key)); |
256 | 0 | } |
257 | 0 | if (objects.empty()) { |
258 | 0 | return Status::OK(); |
259 | 0 | } |
260 | | // clang-format off |
261 | 0 | if (auto resp = client->delete_objects( {.bucket = _bucket,}, std::move(objects)); resp.status.code != ErrorCode::OK) { |
262 | 0 | return {resp.status.code, std::move(resp.status.msg)}; |
263 | 0 | } |
264 | | // clang-format on |
265 | 0 | } while (path_iter != remote_files.end()); |
266 | | |
267 | 0 | return Status::OK(); |
268 | 0 | } |
269 | | |
270 | 1.20k | Status S3FileSystem::exists_impl(const Path& path, bool* res) const { |
271 | 1.20k | auto client = _client->get(); |
272 | 1.20k | CHECK_S3_CLIENT(client); |
273 | 1.20k | auto key = DORIS_TRY(get_key(path)); |
274 | | |
275 | 1.20k | VLOG_DEBUG << "key:" << key << " path:" << path; |
276 | | |
277 | 1.20k | auto resp = client->head_object({.bucket = _bucket, .key = key}); |
278 | | |
279 | 1.20k | if (resp.resp.status.code == ErrorCode::OK) { |
280 | 0 | *res = true; |
281 | 1.20k | } else if (resp.resp.status.code == ErrorCode::NOT_FOUND) { |
282 | 1.20k | *res = false; |
283 | 1.20k | } else { |
284 | 0 | return std::move( |
285 | 0 | Status(resp.resp.status.code, std::move(resp.resp.status.msg)) |
286 | 0 | .append(fmt::format(" failed to check exists {}", full_s3_path(key)))); |
287 | 0 | } |
288 | 1.20k | return Status::OK(); |
289 | 1.20k | } |
290 | | |
291 | 191 | Status S3FileSystem::file_size_impl(const Path& file, int64_t* file_size) const { |
292 | 191 | auto key = DORIS_TRY(get_key(file)); |
293 | 191 | *file_size = DORIS_TRY(_client->object_file_size(_bucket, key)); |
294 | 191 | return Status::OK(); |
295 | 191 | } |
296 | | |
297 | | Status S3FileSystem::list_impl(const Path& dir, bool only_file, std::vector<FileInfo>* files, |
298 | 167 | bool* exists) { |
299 | | // For object storage, this path is always not exist. |
300 | | // So we ignore this property and set exists to true. |
301 | 167 | *exists = true; |
302 | 167 | auto client = _client->get(); |
303 | 167 | CHECK_S3_CLIENT(client); |
304 | 167 | auto prefix = DORIS_TRY(get_key(dir)); |
305 | 167 | if (!prefix.empty() && prefix.back() != '/') { |
306 | 167 | prefix.push_back('/'); |
307 | 167 | } |
308 | | |
309 | | // clang-format off |
310 | 167 | auto resp = client->list_objects( {.bucket = _bucket, .prefix = prefix,}, files); |
311 | | // clang-format on |
312 | 167 | if (resp.status.code == ErrorCode::OK) { |
313 | 167 | for (auto&& file : *files) { |
314 | 123 | file.file_name.erase(0, prefix.size()); |
315 | 123 | } |
316 | 167 | } |
317 | | |
318 | 167 | return {resp.status.code, std::move(resp.status.msg)}; |
319 | 167 | } |
320 | | |
321 | 0 | Status S3FileSystem::rename_impl(const Path& orig_name, const Path& new_name) { |
322 | 0 | return Status::NotSupported("S3FileSystem::rename_impl"); |
323 | 0 | } |
324 | | |
325 | 649 | Status S3FileSystem::upload_impl(const Path& local_file, const Path& remote_file) { |
326 | 649 | auto client = _client->get(); |
327 | 649 | CHECK_S3_CLIENT(client); |
328 | | |
329 | 649 | auto key = DORIS_TRY(get_key(remote_file)); |
330 | 649 | auto start = std::chrono::steady_clock::now(); |
331 | 649 | FileWriterPtr obj_writer; |
332 | 649 | RETURN_IF_ERROR(create_file_impl(key, &obj_writer, nullptr)); |
333 | 649 | FileReaderSPtr local_reader; |
334 | 649 | RETURN_IF_ERROR(io::global_local_filesystem()->open_file(local_file, &local_reader)); |
335 | 649 | size_t local_buffer_size = config::s3_file_system_local_upload_buffer_size; |
336 | 649 | std::unique_ptr<char[]> write_buffer = |
337 | 649 | std::make_unique_for_overwrite<char[]>(local_buffer_size); |
338 | 649 | size_t cur_read = 0; |
339 | 1.29k | while (cur_read < local_reader->size()) { |
340 | 649 | size_t bytes_read = 0; |
341 | 649 | RETURN_IF_ERROR(local_reader->read_at( |
342 | 649 | cur_read, Slice {write_buffer.get(), local_buffer_size}, &bytes_read)); |
343 | 649 | RETURN_IF_ERROR(obj_writer->append({write_buffer.get(), bytes_read})); |
344 | 649 | cur_read += bytes_read; |
345 | 649 | } |
346 | 649 | RETURN_IF_ERROR(obj_writer->close()); |
347 | 649 | auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start); |
348 | | |
349 | 649 | auto size = local_reader->size(); |
350 | 649 | LOG(INFO) << "Upload " << local_file.native() << " to " << full_s3_path(key) |
351 | 649 | << ", duration=" << duration.count() << ", bytes=" << size; |
352 | | |
353 | 649 | return Status::OK(); |
354 | 649 | } |
355 | | |
356 | | Status S3FileSystem::batch_upload_impl(const std::vector<Path>& local_files, |
357 | 0 | const std::vector<Path>& remote_files) { |
358 | 0 | auto client = _client->get(); |
359 | 0 | CHECK_S3_CLIENT(client); |
360 | |
|
361 | 0 | if (local_files.size() != remote_files.size()) { |
362 | 0 | return Status::InvalidArgument("local_files.size({}) != remote_files.size({})", |
363 | 0 | local_files.size(), remote_files.size()); |
364 | 0 | } |
365 | | |
366 | 0 | std::vector<FileWriterPtr> obj_writers(local_files.size()); |
367 | |
|
368 | 0 | auto upload_task = [&, this](size_t idx) { |
369 | 0 | SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker()); |
370 | 0 | const auto& local_file = local_files[idx]; |
371 | 0 | const auto& remote_file = remote_files[idx]; |
372 | 0 | auto& obj_writer = obj_writers[idx]; |
373 | 0 | auto key = DORIS_TRY(get_key(remote_file)); |
374 | 0 | LOG(INFO) << "Start to upload " << local_file.native() << " to " << full_s3_path(key); |
375 | 0 | RETURN_IF_ERROR(create_file_impl(key, &obj_writer, nullptr)); |
376 | 0 | FileReaderSPtr local_reader; |
377 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->open_file(local_file, &local_reader)); |
378 | 0 | size_t local_buffer_size = config::s3_file_system_local_upload_buffer_size; |
379 | 0 | std::unique_ptr<char[]> write_buffer = |
380 | 0 | std::make_unique_for_overwrite<char[]>(local_buffer_size); |
381 | 0 | size_t cur_read = 0; |
382 | 0 | while (cur_read < local_reader->size()) { |
383 | 0 | size_t bytes_read = 0; |
384 | 0 | RETURN_IF_ERROR(local_reader->read_at( |
385 | 0 | cur_read, Slice {write_buffer.get(), local_buffer_size}, &bytes_read)); |
386 | 0 | RETURN_IF_ERROR((*obj_writer).append({write_buffer.get(), bytes_read})); |
387 | 0 | cur_read += bytes_read; |
388 | 0 | } |
389 | 0 | RETURN_IF_ERROR((*obj_writer).close()); |
390 | 0 | return Status::OK(); |
391 | 0 | }; |
392 | |
|
393 | 0 | Status s = Status::OK(); |
394 | 0 | std::vector<std::future<Status>> futures; |
395 | 0 | for (int i = 0; i < local_files.size(); ++i) { |
396 | 0 | auto task = std::make_shared<std::packaged_task<Status(size_t idx)>>(upload_task); |
397 | 0 | futures.emplace_back(task->get_future()); |
398 | 0 | auto st = ExecEnv::GetInstance()->s3_file_system_thread_pool()->submit_func( |
399 | 0 | [t = std::move(task), idx = i]() mutable { (*t)(idx); }); |
400 | | // We shouldn't return immediately since the previous submitted tasks might still be running in the thread pool |
401 | 0 | if (!st.ok()) { |
402 | 0 | s = st; |
403 | 0 | break; |
404 | 0 | } |
405 | 0 | } |
406 | 0 | for (auto&& f : futures) { |
407 | 0 | auto cur_s = f.get(); |
408 | 0 | if (!cur_s.ok()) { |
409 | 0 | s = std::move(cur_s); |
410 | 0 | } |
411 | 0 | } |
412 | 0 | return s; |
413 | 0 | } |
414 | | |
415 | 109 | Status S3FileSystem::download_impl(const Path& remote_file, const Path& local_file) { |
416 | 109 | auto client = _client->get(); |
417 | 109 | CHECK_S3_CLIENT(client); |
418 | 109 | auto key = DORIS_TRY(get_key(remote_file)); |
419 | 109 | int64_t size; |
420 | 109 | RETURN_IF_ERROR(file_size(remote_file, &size)); |
421 | 109 | std::unique_ptr<char[]> buf = std::make_unique_for_overwrite<char[]>(size); |
422 | 109 | size_t bytes_read = 0; |
423 | | // clang-format off |
424 | 109 | auto resp = client->get_object( {.bucket = _bucket, .key = key,}, |
425 | 109 | buf.get(), 0, size, &bytes_read); |
426 | | // clang-format on |
427 | 109 | if (resp.status.code != ErrorCode::OK) { |
428 | 0 | return {resp.status.code, std::move(resp.status.msg)}; |
429 | 0 | } |
430 | 109 | Aws::OFStream local_file_s; |
431 | 109 | local_file_s.open(local_file, std::ios::out | std::ios::binary); |
432 | 109 | if (local_file_s.good()) { |
433 | 109 | local_file_s << StringViewStream(buf.get(), size).rdbuf(); |
434 | 109 | } else { |
435 | 0 | return localfs_error(errno, fmt::format("failed to write file {}", local_file.native())); |
436 | 0 | } |
437 | | |
438 | 109 | return Status::OK(); |
439 | 109 | } |
440 | | |
441 | | // oss has public endpoint and private endpoint, is_public_endpoint determines |
442 | | // whether to return a public endpoint. |
443 | | std::string S3FileSystem::generate_presigned_url(const Path& path, int64_t expiration_secs, |
444 | 564 | bool is_public_endpoint) const { |
445 | 564 | std::string key = fmt::format("{}/{}", _prefix, path.native()); |
446 | 564 | std::shared_ptr<ObjStorageClient> client; |
447 | 564 | if (is_public_endpoint && |
448 | 564 | _client->s3_client_conf().endpoint.ends_with(OSS_PRIVATE_ENDPOINT_SUFFIX)) { |
449 | 564 | auto new_s3_conf = _client->s3_client_conf(); |
450 | 564 | new_s3_conf.endpoint.erase( |
451 | 564 | _client->s3_client_conf().endpoint.size() - OSS_PRIVATE_ENDPOINT_SUFFIX.size(), |
452 | 564 | LEN_OF_OSS_PRIVATE_SUFFIX); |
453 | 564 | client = S3ClientFactory::instance().create(new_s3_conf); |
454 | 564 | } else { |
455 | 0 | client = _client->get(); |
456 | 0 | } |
457 | 564 | return client->generate_presigned_url({.bucket = _bucket, .key = key}, expiration_secs, |
458 | 564 | _client->s3_client_conf()); |
459 | 564 | } |
460 | | |
461 | | } // namespace doris::io |