Coverage Report

Created: 2026-07-01 04:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/s3_file_system.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/s3_file_system.h"
19
20
#include <fmt/format.h>
21
22
#include <cstddef>
23
24
#include "common/compiler_util.h" // IWYU pragma: keep
25
// IWYU pragma: no_include <bits/chrono.h>
26
#include <aws/core/utils/threading/Executor.h>
27
#include <aws/s3/S3Client.h>
28
29
#include <chrono> // IWYU pragma: keep
30
#include <filesystem>
31
#include <fstream> // IWYU pragma: keep
32
#include <future>
33
#include <memory>
34
35
#include "common/config.h"
36
#include "common/logging.h"
37
#include "common/status.h"
38
#include "cpp/sync_point.h"
39
#include "io/fs/err_utils.h"
40
#include "io/fs/file_system.h"
41
#include "io/fs/file_writer.h"
42
#include "io/fs/local_file_system.h"
43
#include "io/fs/remote_file_system.h"
44
#include "io/fs/s3_common.h"
45
#include "io/fs/s3_file_reader.h"
46
#include "io/fs/s3_file_writer.h"
47
#include "io/fs/s3_obj_storage_client.h"
48
#include "runtime/exec_env.h"
49
#include "runtime/thread_context.h"
50
#include "util/s3_uri.h"
51
#include "util/s3_util.h"
52
53
namespace doris::io {
54
namespace {
55
constexpr std::string_view OSS_PRIVATE_ENDPOINT_SUFFIX = "-internal.aliyuncs.com";
56
constexpr int LEN_OF_OSS_PRIVATE_SUFFIX = 9; // length of "-internal"
57
58
#ifndef CHECK_S3_CLIENT
59
#define CHECK_S3_CLIENT(client)                                 \
60
75.6k
    if (!client) {                                              \
61
0
        return Status::InvalidArgument("init s3 client error"); \
62
0
    }
63
#endif
64
65
308k
Result<std::string> get_key(const Path& full_path) {
66
    // FIXME(plat1ko): Check bucket in full path and support relative path
67
308k
    S3URI uri(full_path.native());
68
308k
    RETURN_IF_ERROR_RESULT(uri.parse());
69
308k
    return uri.get_key();
70
308k
}
71
72
} // namespace
73
74
37.4k
ObjClientHolder::ObjClientHolder(S3ClientConf conf) : _conf(std::move(conf)) {}
75
76
37.3k
ObjClientHolder::~ObjClientHolder() = default;
77
78
37.2k
Status ObjClientHolder::init() {
79
37.2k
    _client = S3ClientFactory::instance().create(_conf);
80
37.2k
    if (!_client) {
81
8
        return Status::InvalidArgument("failed to init s3 client with conf {}", _conf.to_string());
82
8
    }
83
84
37.2k
    return Status::OK();
85
37.2k
}
86
87
306
Status ObjClientHolder::reset(const S3ClientConf& conf) {
88
306
    S3ClientConf reset_conf;
89
306
    {
90
306
        std::shared_lock lock(_mtx);
91
306
        if (conf.get_hash() == _conf.get_hash()) {
92
304
            return Status::OK(); // Same conf
93
304
        }
94
95
2
        reset_conf = _conf;
96
2
        reset_conf.ak = conf.ak;
97
2
        reset_conf.sk = conf.sk;
98
2
        reset_conf.token = conf.token;
99
2
        reset_conf.bucket = conf.bucket;
100
2
        reset_conf.connect_timeout_ms = conf.connect_timeout_ms;
101
2
        reset_conf.max_connections = conf.max_connections;
102
2
        reset_conf.request_timeout_ms = conf.request_timeout_ms;
103
2
        reset_conf.use_virtual_addressing = conf.use_virtual_addressing;
104
105
2
        reset_conf.role_arn = conf.role_arn;
106
2
        reset_conf.external_id = conf.external_id;
107
2
        reset_conf.cred_provider_type = conf.cred_provider_type;
108
        // Should check endpoint here?
109
2
    }
110
111
0
    auto client = S3ClientFactory::instance().create(reset_conf);
112
2
    if (!client) {
113
0
        return Status::InvalidArgument("failed to init s3 client with conf {}", conf.to_string());
114
0
    }
115
116
2
    LOG(WARNING) << "reset s3 client with new conf: " << conf.to_string();
117
118
2
    {
119
2
        std::lock_guard lock(_mtx);
120
2
        _client = std::move(client);
121
2
        _conf = std::move(reset_conf);
122
2
    }
123
124
2
    return Status::OK();
125
2
}
126
127
Result<int64_t> ObjClientHolder::object_file_size(const std::string& bucket,
128
32.3k
                                                  const std::string& key) const {
129
32.3k
    auto client = get();
130
32.3k
    if (!client) {
131
0
        return ResultError(Status::InvalidArgument("init s3 client error"));
132
0
    }
133
134
32.3k
    auto resp = client->head_object({
135
32.3k
            .bucket = bucket,
136
32.3k
            .key = key,
137
32.3k
    });
138
139
32.3k
    if (resp.resp.status.code != ErrorCode::OK) {
140
3
        return ResultError(std::move(Status(resp.resp.status.code, std::move(resp.resp.status.msg))
141
3
                                             .append(fmt::format("failed to head s3 file {}",
142
3
                                                                 full_s3_path(bucket, key)))));
143
3
    }
144
145
32.3k
    return resp.file_size;
146
32.3k
}
147
148
751
std::string ObjClientHolder::full_s3_path(std::string_view bucket, std::string_view key) const {
149
751
    return fmt::format("{}/{}/{}", _conf.endpoint, bucket, key);
150
751
}
151
152
748
std::string S3FileSystem::full_s3_path(std::string_view key) const {
153
748
    return _client->full_s3_path(_bucket, key);
154
748
}
155
156
2.12k
Result<std::shared_ptr<S3FileSystem>> S3FileSystem::create(S3Conf s3_conf, std::string id) {
157
2.12k
    std::shared_ptr<S3FileSystem> fs(new S3FileSystem(std::move(s3_conf), std::move(id)));
158
2.12k
    RETURN_IF_ERROR_RESULT(fs->init());
159
2.11k
    return fs;
160
2.12k
}
161
162
S3FileSystem::S3FileSystem(S3Conf s3_conf, std::string id)
163
2.12k
        : RemoteFileSystem(s3_conf.prefix, std::move(id), FileSystemType::S3),
164
2.12k
          _bucket(std::move(s3_conf.bucket)),
165
2.12k
          _prefix(std::move(s3_conf.prefix)),
166
2.12k
          _client(std::make_shared<ObjClientHolder>(std::move(s3_conf.client_conf))) {
167
    // FIXME(plat1ko): Normalize prefix
168
    // remove the first and last '/'
169
2.12k
    if (!_prefix.empty()) {
170
42
        size_t start = _prefix.find_first_not_of('/');
171
42
        if (start == std::string::npos) {
172
0
            _prefix = "";
173
42
        } else {
174
42
            size_t end = _prefix.find_last_not_of('/');
175
42
            if (start > 0 || end < _prefix.size() - 1) {
176
0
                _prefix = _prefix.substr(start, end - start + 1);
177
0
            }
178
42
        }
179
42
    }
180
2.12k
}
181
182
2.12k
Status S3FileSystem::init() {
183
2.12k
    return _client->init();
184
2.12k
}
185
186
2.10k
S3FileSystem::~S3FileSystem() = default;
187
188
Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
189
73.0k
                                      const FileWriterOptions* opts) {
190
73.0k
    auto client = _client->get();
191
73.0k
    CHECK_S3_CLIENT(client);
192
73.0k
    auto key = DORIS_TRY(get_key(file));
193
73.0k
    *writer = std::make_unique<S3FileWriter>(_client, _bucket, std::move(key), opts);
194
73.0k
    return Status::OK();
195
73.0k
}
196
197
Status S3FileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader,
198
233k
                                        const FileReaderOptions& opts) {
199
233k
    TEST_SYNC_POINT_CALLBACK("S3FileSystem::open_file_internal", &file, &opts);
200
233k
    auto key = DORIS_TRY(get_key(file));
201
233k
    *reader = DORIS_TRY(S3FileReader::create(_client, _bucket, key, opts.file_size, nullptr));
202
233k
    return Status::OK();
203
233k
}
204
205
596
Status S3FileSystem::create_directory_impl(const Path& dir, bool failed_if_exists) {
206
596
    return Status::OK();
207
596
}
208
209
2
Status S3FileSystem::delete_file_impl(const Path& file) {
210
2
    auto client = _client->get();
211
2
    CHECK_S3_CLIENT(client);
212
213
2
    auto key = DORIS_TRY(get_key(file));
214
215
2
    auto resp = client->delete_object({.bucket = _bucket, .key = key});
216
217
2
    if (resp.status.code == ErrorCode::OK || resp.status.code == ErrorCode::NOT_FOUND) {
218
2
        return Status::OK();
219
2
    }
220
0
    return std::move(Status(resp.status.code, std::move(resp.status.msg))
221
0
                             .append(fmt::format("failed to delete file {}", full_s3_path(key))));
222
2
}
223
224
0
Status S3FileSystem::delete_directory_impl(const Path& dir) {
225
0
    auto client = _client->get();
226
0
    CHECK_S3_CLIENT(client);
227
228
0
    auto prefix = DORIS_TRY(get_key(dir));
229
0
    if (!prefix.empty() && prefix.back() != '/') {
230
0
        prefix.push_back('/');
231
0
    }
232
233
0
    auto resp = client->delete_objects_recursively({
234
0
            .path = full_s3_path(prefix),
235
0
            .bucket = _bucket,
236
0
            .prefix = prefix,
237
0
    });
238
0
    return {resp.status.code, std::move(resp.status.msg)};
239
0
}
240
241
0
Status S3FileSystem::batch_delete_impl(const std::vector<Path>& remote_files) {
242
0
    auto client = _client->get();
243
0
    CHECK_S3_CLIENT(client);
244
245
    // `DeleteObjectsRequest` can only contain 1000 keys at most.
246
0
    constexpr size_t max_delete_batch = 1000;
247
0
    auto path_iter = remote_files.begin();
248
249
0
    do {
250
0
        std::vector<std::string> objects;
251
0
        auto path_begin = path_iter;
252
0
        for (; path_iter != remote_files.end() && (path_iter - path_begin < max_delete_batch);
253
0
             ++path_iter) {
254
0
            auto key = DORIS_TRY(get_key(*path_iter));
255
0
            objects.emplace_back(std::move(key));
256
0
        }
257
0
        if (objects.empty()) {
258
0
            return Status::OK();
259
0
        }
260
        // clang-format off
261
0
        if (auto resp = client->delete_objects( {.bucket = _bucket,}, std::move(objects)); resp.status.code != ErrorCode::OK) {
262
0
            return {resp.status.code, std::move(resp.status.msg)};
263
0
        }
264
        // clang-format on
265
0
    } while (path_iter != remote_files.end());
266
267
0
    return Status::OK();
268
0
}
269
270
1.19k
Status S3FileSystem::exists_impl(const Path& path, bool* res) const {
271
1.19k
    auto client = _client->get();
272
1.19k
    CHECK_S3_CLIENT(client);
273
1.19k
    auto key = DORIS_TRY(get_key(path));
274
275
18.4E
    VLOG_DEBUG << "key:" << key << " path:" << path;
276
277
1.19k
    auto resp = client->head_object({.bucket = _bucket, .key = key});
278
279
1.19k
    if (resp.resp.status.code == ErrorCode::OK) {
280
0
        *res = true;
281
1.19k
    } else if (resp.resp.status.code == ErrorCode::NOT_FOUND) {
282
1.19k
        *res = false;
283
18.4E
    } else {
284
18.4E
        return std::move(
285
18.4E
                Status(resp.resp.status.code, std::move(resp.resp.status.msg))
286
18.4E
                        .append(fmt::format(" failed to check exists {}", full_s3_path(key))));
287
18.4E
    }
288
1.19k
    return Status::OK();
289
1.19k
}
290
291
348
Status S3FileSystem::file_size_impl(const Path& file, int64_t* file_size) const {
292
348
    auto key = DORIS_TRY(get_key(file));
293
348
    *file_size = DORIS_TRY(_client->object_file_size(_bucket, key));
294
348
    return Status::OK();
295
348
}
296
297
Status S3FileSystem::list_impl(const Path& dir, bool only_file, std::vector<FileInfo>* files,
298
327
                               bool* exists) {
299
    // For object storage, this path is always not exist.
300
    // So we ignore this property and set exists to true.
301
327
    *exists = true;
302
327
    auto client = _client->get();
303
327
    CHECK_S3_CLIENT(client);
304
327
    auto prefix = DORIS_TRY(get_key(dir));
305
327
    if (!prefix.empty() && prefix.back() != '/') {
306
327
        prefix.push_back('/');
307
327
    }
308
309
    // clang-format off
310
327
    auto resp = client->list_objects( {.bucket = _bucket, .prefix = prefix,}, files);
311
    // clang-format on
312
327
    if (resp.status.code == ErrorCode::OK) {
313
327
        for (auto&& file : *files) {
314
280
            file.file_name.erase(0, prefix.size());
315
280
        }
316
327
    }
317
318
327
    return {resp.status.code, std::move(resp.status.msg)};
319
327
}
320
321
0
Status S3FileSystem::rename_impl(const Path& orig_name, const Path& new_name) {
322
0
    return Status::NotSupported("S3FileSystem::rename_impl");
323
0
}
324
325
748
Status S3FileSystem::upload_impl(const Path& local_file, const Path& remote_file) {
326
748
    auto client = _client->get();
327
748
    CHECK_S3_CLIENT(client);
328
329
748
    auto key = DORIS_TRY(get_key(remote_file));
330
748
    auto start = std::chrono::steady_clock::now();
331
748
    FileWriterPtr obj_writer;
332
748
    RETURN_IF_ERROR(create_file_impl(key, &obj_writer, nullptr));
333
748
    FileReaderSPtr local_reader;
334
748
    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(local_file, &local_reader));
335
748
    size_t local_buffer_size = config::s3_file_system_local_upload_buffer_size;
336
748
    std::unique_ptr<char[]> write_buffer =
337
748
            std::make_unique_for_overwrite<char[]>(local_buffer_size);
338
748
    size_t cur_read = 0;
339
1.49k
    while (cur_read < local_reader->size()) {
340
748
        size_t bytes_read = 0;
341
748
        RETURN_IF_ERROR(local_reader->read_at(
342
748
                cur_read, Slice {write_buffer.get(), local_buffer_size}, &bytes_read));
343
748
        RETURN_IF_ERROR(obj_writer->append({write_buffer.get(), bytes_read}));
344
748
        cur_read += bytes_read;
345
748
    }
346
748
    RETURN_IF_ERROR(obj_writer->close());
347
748
    auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start);
348
349
748
    auto size = local_reader->size();
350
748
    LOG(INFO) << "Upload " << local_file.native() << " to " << full_s3_path(key)
351
748
              << ", duration=" << duration.count() << ", bytes=" << size;
352
353
748
    return Status::OK();
354
748
}
355
356
Status S3FileSystem::batch_upload_impl(const std::vector<Path>& local_files,
357
0
                                       const std::vector<Path>& remote_files) {
358
0
    auto client = _client->get();
359
0
    CHECK_S3_CLIENT(client);
360
361
0
    if (local_files.size() != remote_files.size()) {
362
0
        return Status::InvalidArgument("local_files.size({}) != remote_files.size({})",
363
0
                                       local_files.size(), remote_files.size());
364
0
    }
365
366
0
    std::vector<FileWriterPtr> obj_writers(local_files.size());
367
368
0
    auto upload_task = [&, this](size_t idx) {
369
0
        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
370
0
        const auto& local_file = local_files[idx];
371
0
        const auto& remote_file = remote_files[idx];
372
0
        auto& obj_writer = obj_writers[idx];
373
0
        auto key = DORIS_TRY(get_key(remote_file));
374
0
        LOG(INFO) << "Start to upload " << local_file.native() << " to " << full_s3_path(key);
375
0
        RETURN_IF_ERROR(create_file_impl(key, &obj_writer, nullptr));
376
0
        FileReaderSPtr local_reader;
377
0
        RETURN_IF_ERROR(io::global_local_filesystem()->open_file(local_file, &local_reader));
378
0
        size_t local_buffer_size = config::s3_file_system_local_upload_buffer_size;
379
0
        std::unique_ptr<char[]> write_buffer =
380
0
                std::make_unique_for_overwrite<char[]>(local_buffer_size);
381
0
        size_t cur_read = 0;
382
0
        while (cur_read < local_reader->size()) {
383
0
            size_t bytes_read = 0;
384
0
            RETURN_IF_ERROR(local_reader->read_at(
385
0
                    cur_read, Slice {write_buffer.get(), local_buffer_size}, &bytes_read));
386
0
            RETURN_IF_ERROR((*obj_writer).append({write_buffer.get(), bytes_read}));
387
0
            cur_read += bytes_read;
388
0
        }
389
0
        RETURN_IF_ERROR((*obj_writer).close());
390
0
        return Status::OK();
391
0
    };
392
393
0
    Status s = Status::OK();
394
0
    std::vector<std::future<Status>> futures;
395
0
    for (int i = 0; i < local_files.size(); ++i) {
396
0
        auto task = std::make_shared<std::packaged_task<Status(size_t idx)>>(upload_task);
397
0
        futures.emplace_back(task->get_future());
398
0
        auto st = ExecEnv::GetInstance()->s3_file_system_thread_pool()->submit_func(
399
0
                [t = std::move(task), idx = i]() mutable { (*t)(idx); });
400
        // We shouldn't return immediately since the previous submitted tasks might still be running in the thread pool
401
0
        if (!st.ok()) {
402
0
            s = st;
403
0
            break;
404
0
        }
405
0
    }
406
0
    for (auto&& f : futures) {
407
0
        auto cur_s = f.get();
408
0
        if (!cur_s.ok()) {
409
0
            s = std::move(cur_s);
410
0
        }
411
0
    }
412
0
    return s;
413
0
}
414
415
266
Status S3FileSystem::download_impl(const Path& remote_file, const Path& local_file) {
416
266
    auto client = _client->get();
417
266
    CHECK_S3_CLIENT(client);
418
266
    auto key = DORIS_TRY(get_key(remote_file));
419
266
    int64_t size;
420
266
    RETURN_IF_ERROR(file_size(remote_file, &size));
421
266
    std::unique_ptr<char[]> buf = std::make_unique_for_overwrite<char[]>(size);
422
266
    size_t bytes_read = 0;
423
    // clang-format off
424
266
    auto resp = client->get_object( {.bucket = _bucket, .key = key,},
425
266
            buf.get(), 0, size, &bytes_read);
426
    // clang-format on
427
266
    if (resp.status.code != ErrorCode::OK) {
428
0
        return {resp.status.code, std::move(resp.status.msg)};
429
0
    }
430
266
    Aws::OFStream local_file_s;
431
266
    local_file_s.open(local_file, std::ios::out | std::ios::binary);
432
266
    if (local_file_s.good()) {
433
266
        local_file_s << StringViewStream(buf.get(), size).rdbuf();
434
266
    } else {
435
0
        return localfs_error(errno, fmt::format("failed to write file {}", local_file.native()));
436
0
    }
437
438
266
    return Status::OK();
439
266
}
440
441
// oss has public endpoint and private endpoint, is_public_endpoint determines
442
// whether to return a public endpoint.
443
std::string S3FileSystem::generate_presigned_url(const Path& path, int64_t expiration_secs,
444
566
                                                 bool is_public_endpoint) const {
445
566
    std::string key = fmt::format("{}/{}", _prefix, path.native());
446
566
    std::shared_ptr<ObjStorageClient> client;
447
566
    if (is_public_endpoint &&
448
566
        _client->s3_client_conf().endpoint.ends_with(OSS_PRIVATE_ENDPOINT_SUFFIX)) {
449
566
        auto new_s3_conf = _client->s3_client_conf();
450
566
        new_s3_conf.endpoint.erase(
451
566
                _client->s3_client_conf().endpoint.size() - OSS_PRIVATE_ENDPOINT_SUFFIX.size(),
452
566
                LEN_OF_OSS_PRIVATE_SUFFIX);
453
566
        client = S3ClientFactory::instance().create(new_s3_conf);
454
566
    } else {
455
0
        client = _client->get();
456
0
    }
457
566
    return client->generate_presigned_url({.bucket = _bucket, .key = key}, expiration_secs,
458
566
                                          _client->s3_client_conf());
459
566
}
460
461
} // namespace doris::io