be/src/io/fs/s3_file_system.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <filesystem> |
21 | | #include <memory> |
22 | | #include <shared_mutex> |
23 | | #include <string> |
24 | | #include <utility> |
25 | | #include <vector> |
26 | | |
27 | | #include "common/status.h" |
28 | | #include "io/fs/file_reader_writer_fwd.h" |
29 | | #include "io/fs/path.h" |
30 | | #include "io/fs/remote_file_system.h" |
31 | | #include "util/s3_util.h" |
32 | | |
33 | | namespace Aws::S3 { |
34 | | class S3Client; |
35 | | } // namespace Aws::S3 |
36 | | namespace Aws::Utils::Threading { |
37 | | class PooledThreadExecutor; |
38 | | } // namespace Aws::Utils::Threading |
39 | | |
40 | | namespace doris::io { |
41 | | class ObjStorageClient; |
42 | | // At runtime, AK and SK may be modified, in which case the original `S3Client` instance is replaced. |
43 | | // The `S3FileReader` cached by the `Segment` must therefore hold a shared `ObjClientHolder` in order to |
44 | | // access S3 data with the latest AK/SK. NOTE(review): `get()` takes a shared lock on `_mtx`, but `s3_client_conf()` is non-const and returns a reference to `_conf` without locking - confirm it cannot race with `reset()`. |
45 | | class ObjClientHolder { |
46 | | public: |
47 | | explicit ObjClientHolder(S3ClientConf conf); |
48 | | ~ObjClientHolder(); |
49 | | |
50 | | Status init(); |
51 | | |
52 | | // Updates the S3 conf and resets the client if `conf` differs from the current one. This method is thread-safe. |
53 | | Status reset(const S3ClientConf& conf); |
54 | | |
55 | 299k | std::shared_ptr<ObjStorageClient> get() const { |
56 | 299k | std::shared_lock lock(_mtx); |
57 | 299k | return _client; |
58 | 299k | } |
59 | | |
60 | | Result<int64_t> object_file_size(const std::string& bucket, const std::string& key) const; |
61 | | |
62 | | // Returns the full S3 path for `bucket` and `key`; intended for use in error messages. |
63 | | std::string full_s3_path(std::string_view bucket, std::string_view key) const; |
64 | | |
65 | 2.12k | const S3ClientConf& s3_client_conf() { return _conf; } |
66 | | |
67 | | private: |
68 | | mutable std::shared_mutex _mtx; |
69 | | std::shared_ptr<ObjStorageClient> _client; |
70 | | S3ClientConf _conf; |
71 | | }; |
72 | | |
73 | | // File system for S3-compatible object storage. |
74 | | // When creating an S3FileSystem, all required info should be set in S3Conf, |
75 | | // such as ak, sk, region, endpoint and bucket. |
76 | | // The root_path of S3FileSystem is s3_conf.prefix. |
77 | | // S3FileSystem accepts 2 kinds of path (`absolute_path` tells them apart by looking for "://"): |
78 | | // 1. Full path: s3://bucket/path/to/file.txt |
79 | | // In this case, the root_path is not used and the path is taken as-is. |
80 | | // 2. Key only: path/to/file.txt |
81 | | // In this case, the final key is `_prefix / path`, i.e. the prefix joined with "path/to/file.txt". |
82 | | class S3FileSystem final : public RemoteFileSystem { |
83 | | public: |
84 | | static Result<std::shared_ptr<S3FileSystem>> create(S3Conf s3_conf, std::string id); |
85 | | |
86 | | ~S3FileSystem() override; |
87 | | |
88 | 125 | const std::shared_ptr<ObjClientHolder>& client_holder() const { return _client; } |
89 | | |
90 | | const std::string& bucket() const { return _bucket; } |
91 | | const std::string& prefix() const { return _prefix; } |
92 | | |
93 | | std::string generate_presigned_url(const Path& path, int64_t expiration_secs, |
94 | | bool is_public_endpoint) const; |
95 | | |
96 | | protected: |
97 | | Status create_file_impl(const Path& file, FileWriterPtr* writer, |
98 | | const FileWriterOptions* opts) override; |
99 | | Status open_file_internal(const Path& file, FileReaderSPtr* reader, |
100 | | const FileReaderOptions& opts) override; |
101 | | Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override; |
102 | | Status delete_file_impl(const Path& file) override; |
103 | | Status delete_directory_impl(const Path& dir) override; |
104 | | Status batch_delete_impl(const std::vector<Path>& files) override; |
105 | | Status exists_impl(const Path& path, bool* res) const override; |
106 | | Status file_size_impl(const Path& file, int64_t* file_size) const override; |
107 | | Status list_impl(const Path& dir, bool only_file, std::vector<FileInfo>* files, |
108 | | bool* exists) override; |
109 | | Status rename_impl(const Path& orig_name, const Path& new_name) override; |
110 | | |
111 | | Status upload_impl(const Path& local_file, const Path& remote_file) override; |
112 | | Status batch_upload_impl(const std::vector<Path>& local_files, |
113 | | const std::vector<Path>& remote_files) override; |
114 | | Status download_impl(const Path& remote_file, const Path& local_file) override; |
115 | | |
116 | 1.84M | Status absolute_path(const Path& path, Path& abs_path) const override { |
117 | 1.84M | if (path.string().find("://") != std::string::npos) { |
118 | | // The path contains "://", i.e. it carries a scheme, which means it is a full path like: |
119 | | // s3://bucket/path/to/file.txt |
120 | | // so it is used as-is and is not joined with the prefix. |
121 | 3.01k | abs_path = path; |
122 | 1.84M | } else { |
123 | | // Path without a scheme: resolve it against `_prefix`. |
124 | 1.84M | abs_path = _prefix / path; |
125 | 1.84M | } |
126 | 1.84M | return Status::OK(); |
127 | 1.84M | } |
128 | | |
129 | | private: |
130 | | S3FileSystem(S3Conf s3_conf, std::string id); |
131 | | |
132 | | Status init(); |
133 | | |
134 | | // Returns the full S3 path for `key`; intended for use in error messages. |
135 | | std::string full_s3_path(std::string_view key) const; |
136 | | |
137 | | std::string _bucket; |
138 | | std::string _prefix; |
139 | | std::shared_ptr<ObjClientHolder> _client; |
140 | | }; |
141 | | |
142 | | } // namespace doris::io |