/root/doris/be/src/util/s3_util.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <aws/core/Aws.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/s3/S3Errors.h>
#include <bvar/bvar.h>
#include <fmt/format.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/cloud.pb.h>

#include <array>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

#include "common/status.h"
#include "cpp/s3_rate_limiter.h"
#include "io/fs/obj_storage_client.h"
#include "vec/common/string_ref.h"
38 | | |
// Forward declarations of third-party types; no full definitions are required here.
namespace Aws {
namespace S3 {
class S3Client;
} // namespace S3
} // namespace Aws

// NOTE(review): <bvar/bvar.h> is already included above, so this forward
// declaration looks redundant; kept for compatibility.
namespace bvar {
template <class ValueT>
class Adder;
} // namespace bvar
47 | | |
48 | | namespace doris { |
49 | | |
50 | | namespace s3_bvar { |
51 | | extern bvar::LatencyRecorder s3_get_latency; |
52 | | extern bvar::LatencyRecorder s3_put_latency; |
53 | | extern bvar::LatencyRecorder s3_delete_object_latency; |
54 | | extern bvar::LatencyRecorder s3_delete_objects_latency; |
55 | | extern bvar::LatencyRecorder s3_head_latency; |
56 | | extern bvar::LatencyRecorder s3_multi_part_upload_latency; |
57 | | extern bvar::LatencyRecorder s3_list_latency; |
58 | | extern bvar::LatencyRecorder s3_list_object_versions_latency; |
59 | | extern bvar::LatencyRecorder s3_get_bucket_version_latency; |
60 | | extern bvar::LatencyRecorder s3_copy_object_latency; |
61 | | }; // namespace s3_bvar |
62 | | |
63 | | class S3URI; |
64 | | |
65 | | struct S3ClientConf { |
66 | | std::string endpoint; |
67 | | std::string region; |
68 | | std::string ak; |
69 | | std::string sk; |
70 | | std::string token; |
71 | | // For azure we'd better support the bucket at the first time init azure blob container client |
72 | | std::string bucket; |
73 | | io::ObjStorageType provider = io::ObjStorageType::AWS; |
74 | | int max_connections = -1; |
75 | | int request_timeout_ms = -1; |
76 | | int connect_timeout_ms = -1; |
77 | | bool use_virtual_addressing = true; |
78 | | |
79 | 4 | uint64_t get_hash() const { |
80 | 4 | uint64_t hash_code = 0; |
81 | 4 | hash_code ^= crc32_hash(ak); |
82 | 4 | hash_code ^= crc32_hash(sk); |
83 | 4 | hash_code ^= crc32_hash(token); |
84 | 4 | hash_code ^= crc32_hash(endpoint); |
85 | 4 | hash_code ^= crc32_hash(region); |
86 | 4 | hash_code ^= crc32_hash(bucket); |
87 | 4 | hash_code ^= max_connections; |
88 | 4 | hash_code ^= request_timeout_ms; |
89 | 4 | hash_code ^= connect_timeout_ms; |
90 | 4 | hash_code ^= use_virtual_addressing; |
91 | 4 | hash_code ^= static_cast<int>(provider); |
92 | 4 | return hash_code; |
93 | 4 | } |
94 | | |
95 | 2 | std::string to_string() const { |
96 | 2 | return fmt::format( |
97 | 2 | "(ak={}, token={}, endpoint={}, region={}, bucket={}, max_connections={}, " |
98 | 2 | "request_timeout_ms={}, connect_timeout_ms={}, use_virtual_addressing={}", |
99 | 2 | ak, token, endpoint, region, bucket, max_connections, request_timeout_ms, |
100 | 2 | connect_timeout_ms, use_virtual_addressing); |
101 | 2 | } |
102 | | }; |
103 | | |
104 | | struct S3Conf { |
105 | | std::string bucket; |
106 | | std::string prefix; |
107 | | S3ClientConf client_conf; |
108 | | |
109 | | bool sse_enabled = false; |
110 | | static S3Conf get_s3_conf(const cloud::ObjectStoreInfoPB&); |
111 | | static S3Conf get_s3_conf(const TS3StorageParam&); |
112 | | |
113 | 1 | std::string to_string() const { |
114 | 1 | return fmt::format("(bucket={}, prefix={}, client_conf={}, sse_enabled={})", bucket, prefix, |
115 | 1 | client_conf.to_string(), sse_enabled); |
116 | 1 | } |
117 | | }; |
118 | | |
119 | | class S3ClientFactory { |
120 | | public: |
121 | | ~S3ClientFactory(); |
122 | | |
123 | | static S3ClientFactory& instance(); |
124 | | |
125 | | std::shared_ptr<io::ObjStorageClient> create(const S3ClientConf& s3_conf); |
126 | | |
127 | | static Status convert_properties_to_s3_conf(const std::map<std::string, std::string>& prop, |
128 | | const S3URI& s3_uri, S3Conf* s3_conf); |
129 | | |
130 | 1 | static Aws::Client::ClientConfiguration& getClientConfiguration() { |
131 | | // The default constructor of ClientConfiguration will do some http call |
132 | | // such as Aws::Internal::GetEC2MetadataClient and other init operation, |
133 | | // which is unnecessary. |
134 | | // So here we use a static instance, and deep copy every time |
135 | | // to avoid unnecessary operations. |
136 | 1 | static Aws::Client::ClientConfiguration instance; |
137 | 1 | return instance; |
138 | 1 | } |
139 | | |
140 | | S3RateLimiterHolder* rate_limiter(S3RateLimitType type); |
141 | | |
142 | | private: |
143 | | std::shared_ptr<io::ObjStorageClient> _create_s3_client(const S3ClientConf& s3_conf); |
144 | | std::shared_ptr<io::ObjStorageClient> _create_azure_client(const S3ClientConf& s3_conf); |
145 | | S3ClientFactory(); |
146 | | static std::string get_valid_ca_cert_path(); |
147 | | |
148 | | Aws::SDKOptions _aws_options; |
149 | | std::mutex _lock; |
150 | | std::unordered_map<uint64_t, std::shared_ptr<io::ObjStorageClient>> _cache; |
151 | | std::string _ca_cert_file_path; |
152 | | std::array<std::unique_ptr<S3RateLimiterHolder>, 2> _rate_limiters; |
153 | | }; |
154 | | |
155 | | } // end namespace doris |