Coverage Report

Created: 2026-03-12 14:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/s3_file_system.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <filesystem>
21
#include <memory>
22
#include <shared_mutex>
23
#include <string>
24
#include <utility>
25
#include <vector>
26
27
#include "common/status.h"
28
#include "io/fs/file_reader_writer_fwd.h"
29
#include "io/fs/path.h"
30
#include "io/fs/remote_file_system.h"
31
#include "util/s3_util.h"
32
33
namespace Aws::S3 {
34
class S3Client;
35
} // namespace Aws::S3
36
namespace Aws::Utils::Threading {
37
class PooledThreadExecutor;
38
} // namespace Aws::Utils::Threading
39
40
namespace doris::io {
41
class ObjStorageClient;
42
// At runtime the AK and SK may be modified, in which case the original `S3Client` instance is replaced.
43
// The `S3FileReader` cached by the `Segment` must hold a shared `ObjClientHolder` in order to
44
// access S3 data with the latest AK and SK.
45
class ObjClientHolder {
46
public:
47
    explicit ObjClientHolder(S3ClientConf conf);
48
    ~ObjClientHolder();
49
50
    Status init();
51
52
    // Update s3 conf and reset client if `conf` is different. This method is threadsafe.
53
    Status reset(const S3ClientConf& conf);
54
    // Returns a copy of the current client pointer, taken under a shared lock on `_mtx`.
55
299k
    std::shared_ptr<ObjStorageClient> get() const {
56
299k
        std::shared_lock lock(_mtx);
57
299k
        return _client;
58
299k
    }
59
60
    Result<int64_t> object_file_size(const std::string& bucket, const std::string& key) const;
61
62
    // Helper for error messages: renders `bucket` and `key` as a single string.
63
    std::string full_s3_path(std::string_view bucket, std::string_view key) const;
64
    // Returns the stored conf by reference. Const-qualified so it can be called through a
    // const holder, like the other read-only members of this class.
    // NOTE(review): `_conf` is read here without taking `_mtx`; if reset() rewrites `_conf`
    // concurrently, the returned reference can race. Confirm against reset()'s implementation.
65
2.12k
    const S3ClientConf& s3_client_conf() const { return _conf; }
66
67
private:
68
    mutable std::shared_mutex _mtx; // `mutable` so that const members such as get() can lock it
69
    std::shared_ptr<ObjStorageClient> _client;
70
    S3ClientConf _conf;
71
};
72
73
// File system for S3 compatible object storage
74
// When creating S3FileSystem, all required info should be set in S3Conf,
75
// such as ak, sk, region, endpoint, bucket.
76
// And the root_path of S3FileSystem is s3_conf.prefix.
77
// When using S3FileSystem, it accepts 2 kinds of path:
78
//  1. Full path: s3://bucket/path/to/file.txt
79
//      In this case, the root_path is not used.
80
//  2. Key only: path/to/file.txt
81
//      In this case, the final key will be "<prefix>/path/to/file.txt" (the prefix joined with the key)
82
class S3FileSystem final : public RemoteFileSystem {
83
public:
84
    static Result<std::shared_ptr<S3FileSystem>> create(S3Conf s3_conf, std::string id);
85
86
    ~S3FileSystem() override;
87
    // Returns the shared `ObjClientHolder` member by const reference.
88
125
    const std::shared_ptr<ObjClientHolder>& client_holder() const { return _client; }
89
    // Read-only accessors for the bucket name and the key prefix stored in this object.
90
    const std::string& bucket() const { return _bucket; }
91
    const std::string& prefix() const { return _prefix; }
92
    // NOTE(review): presumably `expiration_secs` is the URL lifetime in seconds — confirm in the .cpp.
93
    std::string generate_presigned_url(const Path& path, int64_t expiration_secs,
94
                                       bool is_public_endpoint) const;
95
    // Overrides of the RemoteFileSystem hooks; only absolute_path() is defined in this header.
96
protected:
97
    Status create_file_impl(const Path& file, FileWriterPtr* writer,
98
                            const FileWriterOptions* opts) override;
99
    Status open_file_internal(const Path& file, FileReaderSPtr* reader,
100
                              const FileReaderOptions& opts) override;
101
    Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override;
102
    Status delete_file_impl(const Path& file) override;
103
    Status delete_directory_impl(const Path& dir) override;
104
    Status batch_delete_impl(const std::vector<Path>& files) override;
105
    Status exists_impl(const Path& path, bool* res) const override;
106
    Status file_size_impl(const Path& file, int64_t* file_size) const override;
107
    Status list_impl(const Path& dir, bool only_file, std::vector<FileInfo>* files,
108
                     bool* exists) override;
109
    Status rename_impl(const Path& orig_name, const Path& new_name) override;
110
111
    Status upload_impl(const Path& local_file, const Path& remote_file) override;
112
    Status batch_upload_impl(const std::vector<Path>& local_files,
113
                             const std::vector<Path>& remote_files) override;
114
    Status download_impl(const Path& remote_file, const Path& local_file) override;
115
    // Writes the resolved form of `path` into `abs_path`: a path containing "://" is copied
    // verbatim, anything else is joined onto `_prefix`. Always returns Status::OK().
    // NOTE(review): `path.string()` may materialize a temporary string on every call of this
    // frequently executed function — check whether a non-copying accessor could be used.
116
1.84M
    Status absolute_path(const Path& path, Path& abs_path) const override {
117
1.84M
        if (path.string().find("://") != std::string::npos) {
118
            // The path carries a scheme, i.e. it is already a full path such as:
119
            // s3://bucket/path/to/file.txt
120
            // so it is used as-is, without prepending the prefix.
121
3.01k
            abs_path = path;
122
1.84M
        } else {
123
            // No scheme: treat the path as a key relative to the prefix.
124
1.84M
            abs_path = _prefix / path;
125
1.84M
        }
126
1.84M
        return Status::OK();
127
1.84M
    }
128
    // The constructor is private; the public entry point for obtaining an instance is create().
129
private:
130
    S3FileSystem(S3Conf s3_conf, std::string id);
131
132
    Status init();
133
134
    // Helper for error messages: renders `key` as a string.
135
    std::string full_s3_path(std::string_view key) const;
136
    // `_prefix` is what scheme-less paths are joined onto in absolute_path().
137
    std::string _bucket;
138
    std::string _prefix;
139
    std::shared_ptr<ObjClientHolder> _client;
140
};
141
142
} // namespace doris::io