Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "util/s3_util.h" |
19 | | |
20 | | #include <aws/core/auth/AWSAuthSigner.h> |
21 | | #include <aws/core/auth/AWSCredentials.h> |
22 | | #include <aws/core/auth/AWSCredentialsProviderChain.h> |
23 | | #include <aws/core/auth/STSCredentialsProvider.h> |
24 | | #include <aws/core/client/DefaultRetryStrategy.h> |
25 | | #include <aws/core/platform/Environment.h> |
26 | | #include <aws/core/utils/logging/LogLevel.h> |
27 | | #include <aws/core/utils/logging/LogSystemInterface.h> |
28 | | #include <aws/core/utils/memory/stl/AWSStringStream.h> |
29 | | #include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h> |
30 | | #include <aws/s3/S3Client.h> |
31 | | #include <aws/sts/STSClient.h> |
32 | | #include <bvar/reducer.h> |
33 | | #include <cpp/s3_rate_limiter.h> |
34 | | |
35 | | #include <atomic> |
36 | | |
37 | | #include "util/string_util.h" |
38 | | |
39 | | #ifdef USE_AZURE |
40 | | #include <azure/core/diagnostics/logger.hpp> |
41 | | #include <azure/core/http/curl_transport.hpp> |
42 | | #include <azure/storage/blobs/blob_container_client.hpp> |
43 | | #endif |
44 | | #include <cstdlib> |
45 | | #include <filesystem> |
46 | | #include <fstream> |
47 | | #include <functional> |
48 | | #include <memory> |
49 | | #include <ostream> |
50 | | #include <utility> |
51 | | |
52 | | #include "common/config.h" |
53 | | #include "common/logging.h" |
54 | | #include "common/status.h" |
55 | | #include "cpp/aws_logger.h" |
56 | | #include "cpp/custom_aws_credentials_provider_chain.h" |
57 | | #include "cpp/obj_retry_strategy.h" |
58 | | #include "cpp/sync_point.h" |
59 | | #include "cpp/util.h" |
60 | | #ifdef USE_AZURE |
61 | | #include "io/fs/azure_obj_storage_client.h" |
62 | | #endif |
63 | | #include "exec/scan/scanner_scheduler.h" |
64 | | #include "io/fs/obj_storage_client.h" |
65 | | #include "io/fs/s3_obj_storage_client.h" |
66 | | #include "runtime/exec_env.h" |
67 | | #include "util/s3_uri.h" |
68 | | |
69 | | namespace doris { |
70 | | namespace s3_bvar { |
71 | | bvar::LatencyRecorder s3_get_latency("s3_get"); |
72 | | bvar::LatencyRecorder s3_put_latency("s3_put"); |
73 | | bvar::LatencyRecorder s3_delete_object_latency("s3_delete_object"); |
74 | | bvar::LatencyRecorder s3_delete_objects_latency("s3_delete_objects"); |
75 | | bvar::LatencyRecorder s3_head_latency("s3_head"); |
76 | | bvar::LatencyRecorder s3_multi_part_upload_latency("s3_multi_part_upload"); |
77 | | bvar::LatencyRecorder s3_list_latency("s3_list"); |
78 | | bvar::LatencyRecorder s3_list_object_versions_latency("s3_list_object_versions"); |
79 | | bvar::LatencyRecorder s3_get_bucket_version_latency("s3_get_bucket_version"); |
80 | | bvar::LatencyRecorder s3_copy_object_latency("s3_copy_object"); |
81 | | }; // namespace s3_bvar |
82 | | |
83 | | namespace { |
84 | | |
85 | 3.01k | doris::Status is_s3_conf_valid(const S3ClientConf& conf) { |
86 | 3.01k | if (conf.endpoint.empty()) { |
87 | 0 | return Status::InvalidArgument<false>("Invalid s3 conf, empty endpoint"); |
88 | 0 | } |
89 | 3.01k | if (conf.region.empty()) { |
90 | 0 | return Status::InvalidArgument<false>("Invalid s3 conf, empty region"); |
91 | 0 | } |
92 | | |
93 | 3.01k | if (conf.role_arn.empty()) { |
94 | | // Allow anonymous access when both ak and sk are empty |
95 | 3.00k | bool hasAk = !conf.ak.empty(); |
96 | 3.00k | bool hasSk = !conf.sk.empty(); |
97 | | |
98 | | // Either both credentials are provided or both are empty (anonymous access) |
99 | 3.00k | if (hasAk && conf.sk.empty()) { |
100 | 1 | return Status::InvalidArgument<false>("Invalid s3 conf, empty sk"); |
101 | 1 | } |
102 | 3.00k | if (hasSk && conf.ak.empty()) { |
103 | 8 | return Status::InvalidArgument<false>("Invalid s3 conf, empty ak"); |
104 | 8 | } |
105 | 3.00k | } |
106 | 3.00k | return Status::OK(); |
107 | 3.01k | } |
108 | | |
109 | | // Return true is convert `str` to int successfully |
110 | 3.63k | bool to_int(std::string_view str, int& res) { |
111 | 3.63k | auto [_, ec] = std::from_chars(str.data(), str.data() + str.size(), res); |
112 | 3.63k | return ec == std::errc {}; |
113 | 3.63k | } |
114 | | |
115 | | #ifdef USE_AZURE |
116 | 0 | std::string env_or_empty(const char* env_name) { |
117 | 0 | if (const char* value = std::getenv(env_name); value != nullptr) { |
118 | 0 | return value; |
119 | 0 | } |
120 | 0 | return ""; |
121 | 0 | } |
122 | | |
123 | 0 | std::string build_azure_tls_debug_context(const std::string& selected_ca_file) { |
124 | 0 | bool selected_ca_exists = false; |
125 | 0 | bool selected_ca_readable = false; |
126 | 0 | if (!selected_ca_file.empty()) { |
127 | 0 | std::error_code ec; |
128 | 0 | selected_ca_exists = std::filesystem::exists(selected_ca_file, ec) && !ec; |
129 | 0 | std::ifstream input(selected_ca_file); |
130 | 0 | selected_ca_readable = input.good(); |
131 | 0 | } |
132 | |
|
133 | 0 | return fmt::format( |
134 | 0 | "tls_debug(ca_cert_file_paths='{}', selected_ca_file='{}', selected_ca_exists={}, " |
135 | 0 | "selected_ca_readable={}, SSL_CERT_FILE='{}', CURL_CA_BUNDLE='{}', SSL_CERT_DIR='{}')", |
136 | 0 | config::ca_cert_file_paths, selected_ca_file, selected_ca_exists, selected_ca_readable, |
137 | 0 | env_or_empty("SSL_CERT_FILE"), env_or_empty("CURL_CA_BUNDLE"), |
138 | 0 | env_or_empty("SSL_CERT_DIR")); |
139 | 0 | } |
140 | | #endif |
141 | | |
142 | | constexpr char USE_PATH_STYLE[] = "use_path_style"; |
143 | | |
144 | | constexpr char AZURE_PROVIDER_STRING[] = "AZURE"; |
145 | | constexpr char S3_PROVIDER[] = "provider"; |
146 | | constexpr char S3_AK[] = "AWS_ACCESS_KEY"; |
147 | | constexpr char S3_SK[] = "AWS_SECRET_KEY"; |
148 | | constexpr char S3_ENDPOINT[] = "AWS_ENDPOINT"; |
149 | | constexpr char S3_REGION[] = "AWS_REGION"; |
150 | | constexpr char S3_TOKEN[] = "AWS_TOKEN"; |
151 | | constexpr char S3_MAX_CONN_SIZE[] = "AWS_MAX_CONNECTIONS"; |
152 | | constexpr char S3_REQUEST_TIMEOUT_MS[] = "AWS_REQUEST_TIMEOUT_MS"; |
153 | | constexpr char S3_CONN_TIMEOUT_MS[] = "AWS_CONNECTION_TIMEOUT_MS"; |
154 | | constexpr char S3_NEED_OVERRIDE_ENDPOINT[] = "AWS_NEED_OVERRIDE_ENDPOINT"; |
155 | | |
156 | | constexpr char S3_ROLE_ARN[] = "AWS_ROLE_ARN"; |
157 | | constexpr char S3_EXTERNAL_ID[] = "AWS_EXTERNAL_ID"; |
158 | | constexpr char S3_CREDENTIALS_PROVIDER_TYPE[] = "AWS_CREDENTIALS_PROVIDER_TYPE"; |
159 | | } // namespace |
160 | | |
161 | | bvar::Adder<int64_t> get_rate_limit_ns("get_rate_limit_ns"); |
162 | | bvar::Adder<int64_t> get_rate_limit_exceed_req_num("get_rate_limit_exceed_req_num"); |
163 | | bvar::Adder<int64_t> put_rate_limit_ns("put_rate_limit_ns"); |
164 | | bvar::Adder<int64_t> put_rate_limit_exceed_req_num("put_rate_limit_exceed_req_num"); |
165 | | |
166 | | static std::atomic<int64_t> last_s3_get_token_bucket_tokens {0}; |
167 | | static std::atomic<int64_t> last_s3_get_token_limit {0}; |
168 | | static std::atomic<int64_t> last_s3_get_token_per_second {0}; |
169 | | static std::atomic<int64_t> last_s3_put_token_per_second {0}; |
170 | | static std::atomic<int64_t> last_s3_put_token_bucket_tokens {0}; |
171 | | static std::atomic<int64_t> last_s3_put_token_limit {0}; |
172 | | |
173 | | static std::atomic<bool> updating_get_limiter {false}; |
174 | | static std::atomic<bool> updating_put_limiter {false}; |
175 | | |
176 | 12 | S3RateLimiterHolder* S3ClientFactory::rate_limiter(S3RateLimitType type) { |
177 | 12 | CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << to_string(type); |
178 | 12 | return _rate_limiters[static_cast<size_t>(type)].get(); |
179 | 12 | } |
180 | | |
181 | | template <S3RateLimitType LimiterType> |
182 | | void update_rate_limiter_if_changed(int64_t current_tps, int64_t current_bucket, |
183 | | int64_t current_limit, std::atomic<int64_t>& last_tps, |
184 | | std::atomic<int64_t>& last_bucket, |
185 | | std::atomic<int64_t>& last_limit, |
186 | 3.54k | std::atomic<bool>& updating_flag, const char* limiter_name) { |
187 | 3.54k | if (last_tps.load(std::memory_order_relaxed) != current_tps || |
188 | 3.54k | last_bucket.load(std::memory_order_relaxed) != current_bucket || |
189 | 3.54k | last_limit.load(std::memory_order_relaxed) != current_limit) { |
190 | 12 | bool expected = false; |
191 | 12 | if (!updating_flag.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { |
192 | 0 | return; |
193 | 0 | } |
194 | 12 | if (last_tps.load(std::memory_order_acquire) != current_tps || |
195 | 12 | last_bucket.load(std::memory_order_acquire) != current_bucket || |
196 | 12 | last_limit.load(std::memory_order_acquire) != current_limit) { |
197 | 12 | int ret = |
198 | 12 | reset_s3_rate_limiter(LimiterType, current_tps, current_bucket, current_limit); |
199 | | |
200 | 12 | if (ret == 0) { |
201 | 12 | last_tps.store(current_tps, std::memory_order_release); |
202 | 12 | last_bucket.store(current_bucket, std::memory_order_release); |
203 | 12 | last_limit.store(current_limit, std::memory_order_release); |
204 | 12 | } else { |
205 | 0 | LOG(WARNING) << "Failed to reset S3 " << limiter_name |
206 | 0 | << " rate limiter, error code: " << ret; |
207 | 0 | } |
208 | 12 | } |
209 | | |
210 | 12 | updating_flag.store(false, std::memory_order_release); |
211 | 12 | } |
212 | 3.54k | } _ZN5doris30update_rate_limiter_if_changedILNS_15S3RateLimitTypeE0EEEvlllRSt6atomicIlES4_S4_RS2_IbEPKc Line | Count | Source | 186 | 1.77k | std::atomic<bool>& updating_flag, const char* limiter_name) { | 187 | 1.77k | if (last_tps.load(std::memory_order_relaxed) != current_tps || | 188 | 1.77k | last_bucket.load(std::memory_order_relaxed) != current_bucket || | 189 | 1.77k | last_limit.load(std::memory_order_relaxed) != current_limit) { | 190 | 6 | bool expected = false; | 191 | 6 | if (!updating_flag.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { | 192 | 0 | return; | 193 | 0 | } | 194 | 6 | if (last_tps.load(std::memory_order_acquire) != current_tps || | 195 | 6 | last_bucket.load(std::memory_order_acquire) != current_bucket || | 196 | 6 | last_limit.load(std::memory_order_acquire) != current_limit) { | 197 | 6 | int ret = | 198 | 6 | reset_s3_rate_limiter(LimiterType, current_tps, current_bucket, current_limit); | 199 | | | 200 | 6 | if (ret == 0) { | 201 | 6 | last_tps.store(current_tps, std::memory_order_release); | 202 | 6 | last_bucket.store(current_bucket, std::memory_order_release); | 203 | 6 | last_limit.store(current_limit, std::memory_order_release); | 204 | 6 | } else { | 205 | 0 | LOG(WARNING) << "Failed to reset S3 " << limiter_name | 206 | 0 | << " rate limiter, error code: " << ret; | 207 | 0 | } | 208 | 6 | } | 209 | | | 210 | 6 | updating_flag.store(false, std::memory_order_release); | 211 | 6 | } | 212 | 1.77k | } |
_ZN5doris30update_rate_limiter_if_changedILNS_15S3RateLimitTypeE1EEEvlllRSt6atomicIlES4_S4_RS2_IbEPKc Line | Count | Source | 186 | 1.77k | std::atomic<bool>& updating_flag, const char* limiter_name) { | 187 | 1.77k | if (last_tps.load(std::memory_order_relaxed) != current_tps || | 188 | 1.77k | last_bucket.load(std::memory_order_relaxed) != current_bucket || | 189 | 1.77k | last_limit.load(std::memory_order_relaxed) != current_limit) { | 190 | 6 | bool expected = false; | 191 | 6 | if (!updating_flag.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { | 192 | 0 | return; | 193 | 0 | } | 194 | 6 | if (last_tps.load(std::memory_order_acquire) != current_tps || | 195 | 6 | last_bucket.load(std::memory_order_acquire) != current_bucket || | 196 | 6 | last_limit.load(std::memory_order_acquire) != current_limit) { | 197 | 6 | int ret = | 198 | 6 | reset_s3_rate_limiter(LimiterType, current_tps, current_bucket, current_limit); | 199 | | | 200 | 6 | if (ret == 0) { | 201 | 6 | last_tps.store(current_tps, std::memory_order_release); | 202 | 6 | last_bucket.store(current_bucket, std::memory_order_release); | 203 | 6 | last_limit.store(current_limit, std::memory_order_release); | 204 | 6 | } else { | 205 | 0 | LOG(WARNING) << "Failed to reset S3 " << limiter_name | 206 | 0 | << " rate limiter, error code: " << ret; | 207 | 0 | } | 208 | 6 | } | 209 | | | 210 | 6 | updating_flag.store(false, std::memory_order_release); | 211 | 6 | } | 212 | 1.77k | } |
|
213 | | |
214 | 1.77k | void check_s3_rate_limiter_config_changed() { |
215 | 1.77k | update_rate_limiter_if_changed<S3RateLimitType::GET>( |
216 | 1.77k | config::s3_get_token_per_second, config::s3_get_bucket_tokens, |
217 | 1.77k | config::s3_get_token_limit, last_s3_get_token_per_second, |
218 | 1.77k | last_s3_get_token_bucket_tokens, last_s3_get_token_limit, updating_get_limiter, "GET"); |
219 | | |
220 | 1.77k | update_rate_limiter_if_changed<S3RateLimitType::PUT>( |
221 | 1.77k | config::s3_put_token_per_second, config::s3_put_bucket_tokens, |
222 | 1.77k | config::s3_put_token_limit, last_s3_put_token_per_second, |
223 | 1.77k | last_s3_put_token_bucket_tokens, last_s3_put_token_limit, updating_put_limiter, "PUT"); |
224 | 1.77k | } |
225 | | |
226 | 12 | int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst, size_t limit) { |
227 | 12 | if (type == S3RateLimitType::UNKNOWN) { |
228 | 0 | return -1; |
229 | 0 | } |
230 | 12 | return S3ClientFactory::instance().rate_limiter(type)->reset(max_speed, max_burst, limit); |
231 | 12 | } |
232 | | |
233 | 8 | S3ClientFactory::S3ClientFactory() { |
234 | 8 | _aws_options = Aws::SDKOptions {}; |
235 | 8 | auto logLevel = static_cast<Aws::Utils::Logging::LogLevel>(config::aws_log_level); |
236 | 8 | _aws_options.loggingOptions.logLevel = logLevel; |
237 | 8 | _aws_options.loggingOptions.logger_create_fn = [logLevel] { |
238 | 8 | return std::make_shared<DorisAWSLogger>(logLevel); |
239 | 8 | }; |
240 | 8 | Aws::InitAPI(_aws_options); |
241 | 8 | _ca_cert_file_path = get_valid_ca_cert_path(doris::split(config::ca_cert_file_paths, ";")); |
242 | 8 | _rate_limiters = { |
243 | 8 | std::make_unique<S3RateLimiterHolder>( |
244 | 8 | config::s3_get_token_per_second, config::s3_get_bucket_tokens, |
245 | 8 | config::s3_get_token_limit, |
246 | 8 | metric_func_factory(get_rate_limit_ns, get_rate_limit_exceed_req_num)), |
247 | 8 | std::make_unique<S3RateLimiterHolder>( |
248 | 8 | config::s3_put_token_per_second, config::s3_put_bucket_tokens, |
249 | 8 | config::s3_put_token_limit, |
250 | 8 | metric_func_factory(put_rate_limit_ns, put_rate_limit_exceed_req_num))}; |
251 | | |
252 | 8 | #ifdef USE_AZURE |
253 | 8 | auto azureLogLevel = |
254 | 8 | static_cast<Azure::Core::Diagnostics::Logger::Level>(config::azure_log_level); |
255 | 8 | Azure::Core::Diagnostics::Logger::SetLevel(azureLogLevel); |
256 | 8 | Azure::Core::Diagnostics::Logger::SetListener( |
257 | 8 | [&](Azure::Core::Diagnostics::Logger::Level level, const std::string& message) { |
258 | 0 | switch (level) { |
259 | 0 | case Azure::Core::Diagnostics::Logger::Level::Verbose: |
260 | 0 | LOG(INFO) << message; |
261 | 0 | break; |
262 | 0 | case Azure::Core::Diagnostics::Logger::Level::Informational: |
263 | 0 | LOG(INFO) << message; |
264 | 0 | break; |
265 | 0 | case Azure::Core::Diagnostics::Logger::Level::Warning: |
266 | 0 | LOG(WARNING) << message; |
267 | 0 | break; |
268 | 0 | case Azure::Core::Diagnostics::Logger::Level::Error: |
269 | 0 | LOG(ERROR) << message; |
270 | 0 | break; |
271 | 0 | default: |
272 | 0 | LOG(WARNING) << "Unknown level: " << static_cast<int>(level) |
273 | 0 | << ", message: " << message; |
274 | 0 | break; |
275 | 0 | } |
276 | 0 | }); |
277 | 8 | #endif |
278 | 8 | } |
279 | | |
280 | 4 | S3ClientFactory::~S3ClientFactory() { |
281 | 4 | Aws::ShutdownAPI(_aws_options); |
282 | 4 | } |
283 | | |
284 | 1.81k | S3ClientFactory& S3ClientFactory::instance() { |
285 | 1.81k | static S3ClientFactory ret; |
286 | 1.81k | return ret; |
287 | 1.81k | } |
288 | | |
289 | 1.77k | std::shared_ptr<io::ObjStorageClient> S3ClientFactory::create(const S3ClientConf& s3_conf) { |
290 | 1.77k | if (!is_s3_conf_valid(s3_conf).ok()) { |
291 | 7 | return nullptr; |
292 | 7 | } |
293 | | |
294 | 1.76k | check_s3_rate_limiter_config_changed(); |
295 | | |
296 | | #ifdef BE_TEST |
297 | | { |
298 | | std::lock_guard l(_lock); |
299 | | if (_test_client_creator) { |
300 | | return _test_client_creator(s3_conf); |
301 | | } |
302 | | } |
303 | | #endif |
304 | | |
305 | 1.76k | { |
306 | 1.76k | uint64_t hash = s3_conf.get_hash(); |
307 | 1.76k | std::lock_guard l(_lock); |
308 | 1.76k | auto it = _cache.find(hash); |
309 | 1.76k | if (it != _cache.end()) { |
310 | 1.75k | return it->second; |
311 | 1.75k | } |
312 | 1.76k | } |
313 | | |
314 | 19 | auto obj_client = (s3_conf.provider == io::ObjStorageType::AZURE) |
315 | 19 | ? _create_azure_client(s3_conf) |
316 | 19 | : _create_s3_client(s3_conf); |
317 | | |
318 | 19 | { |
319 | 19 | uint64_t hash = s3_conf.get_hash(); |
320 | 19 | std::lock_guard l(_lock); |
321 | 19 | _cache[hash] = obj_client; |
322 | 19 | } |
323 | 19 | return obj_client; |
324 | 1.76k | } |
325 | | |
326 | | #ifdef BE_TEST |
327 | | void S3ClientFactory::set_client_creator_for_test( |
328 | | std::function<std::shared_ptr<io::ObjStorageClient>(const S3ClientConf&)> creator) { |
329 | | std::lock_guard l(_lock); |
330 | | _test_client_creator = std::move(creator); |
331 | | } |
332 | | |
333 | | void S3ClientFactory::clear_client_creator_for_test() { |
334 | | std::lock_guard l(_lock); |
335 | | _test_client_creator = nullptr; |
336 | | } |
337 | | #endif |
338 | | |
339 | | std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_azure_client( |
340 | 0 | const S3ClientConf& s3_conf) { |
341 | 0 | #ifdef USE_AZURE |
342 | 0 | auto cred = |
343 | 0 | std::make_shared<Azure::Storage::StorageSharedKeyCredential>(s3_conf.ak, s3_conf.sk); |
344 | |
|
345 | 0 | const std::string container_name = s3_conf.bucket; |
346 | 0 | std::string uri; |
347 | 0 | if (config::force_azure_blob_global_endpoint) { |
348 | 0 | uri = fmt::format("https://{}.blob.core.windows.net/{}", s3_conf.ak, container_name); |
349 | 0 | } else { |
350 | 0 | uri = fmt::format("{}/{}", s3_conf.endpoint, container_name); |
351 | 0 | if (s3_conf.endpoint.find("://") == std::string::npos) { |
352 | 0 | uri = "https://" + uri; |
353 | 0 | } |
354 | 0 | } |
355 | |
|
356 | 0 | Azure::Storage::Blobs::BlobClientOptions options; |
357 | 0 | options.Retry.StatusCodes.insert(Azure::Core::Http::HttpStatusCode::TooManyRequests); |
358 | 0 | options.Retry.MaxRetries = config::max_s3_client_retry; |
359 | 0 | options.PerRetryPolicies.emplace_back(std::make_unique<AzureRetryRecordPolicy>()); |
360 | 0 | if (_ca_cert_file_path.empty()) { |
361 | 0 | _ca_cert_file_path = get_valid_ca_cert_path(doris::split(config::ca_cert_file_paths, ";")); |
362 | 0 | } |
363 | 0 | if (!_ca_cert_file_path.empty()) { |
364 | 0 | Azure::Core::Http::CurlTransportOptions curl_options; |
365 | 0 | curl_options.CAInfo = _ca_cert_file_path; |
366 | 0 | options.Transport.Transport = |
367 | 0 | std::make_shared<Azure::Core::Http::CurlTransport>(std::move(curl_options)); |
368 | 0 | } |
369 | |
|
370 | 0 | std::string normalized_uri = normalize_http_uri(uri); |
371 | 0 | VLOG_DEBUG << "uri:" << uri << ", normalized_uri:" << normalized_uri; |
372 | 0 | std::string tls_debug_context = build_azure_tls_debug_context(_ca_cert_file_path); |
373 | |
|
374 | 0 | auto containerClient = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>( |
375 | 0 | uri, cred, std::move(options)); |
376 | 0 | LOG_INFO("create one azure client with {}", s3_conf.to_string()); |
377 | 0 | return std::make_shared<io::AzureObjStorageClient>(std::move(containerClient), |
378 | 0 | std::move(tls_debug_context)); |
379 | | #else |
380 | | LOG_FATAL("BE is not compiled with azure support, export BUILD_AZURE=ON before building"); |
381 | | return nullptr; |
382 | | #endif |
383 | 0 | } |
384 | | |
385 | | std::shared_ptr<Aws::Auth::AWSCredentialsProvider> |
386 | 6 | S3ClientFactory::_get_aws_credentials_provider_v1(const S3ClientConf& s3_conf) { |
387 | 6 | if (!s3_conf.ak.empty() && !s3_conf.sk.empty()) { |
388 | 2 | Aws::Auth::AWSCredentials aws_cred(s3_conf.ak, s3_conf.sk); |
389 | 2 | DCHECK(!aws_cred.IsExpiredOrEmpty()); |
390 | 2 | if (!s3_conf.token.empty()) { |
391 | 0 | aws_cred.SetSessionToken(s3_conf.token); |
392 | 0 | } |
393 | 2 | return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(std::move(aws_cred)); |
394 | 2 | } |
395 | | |
396 | 4 | if (s3_conf.cred_provider_type == CredProviderType::InstanceProfile) { |
397 | 2 | if (s3_conf.role_arn.empty()) { |
398 | 1 | return std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(); |
399 | 1 | } |
400 | | |
401 | 1 | Aws::Client::ClientConfiguration clientConfiguration = |
402 | 1 | S3ClientFactory::getClientConfiguration(); |
403 | | |
404 | 1 | if (_ca_cert_file_path.empty()) { |
405 | 0 | _ca_cert_file_path = |
406 | 0 | get_valid_ca_cert_path(doris::split(config::ca_cert_file_paths, ";")); |
407 | 0 | } |
408 | 1 | if (!_ca_cert_file_path.empty()) { |
409 | 1 | clientConfiguration.caFile = _ca_cert_file_path; |
410 | 1 | } |
411 | | |
412 | 1 | auto stsClient = std::make_shared<Aws::STS::STSClient>( |
413 | 1 | std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(), |
414 | 1 | clientConfiguration); |
415 | | |
416 | 1 | return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>( |
417 | 1 | s3_conf.role_arn, Aws::String(), s3_conf.external_id, |
418 | 1 | Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, stsClient); |
419 | 2 | } |
420 | | |
421 | | // Support anonymous access for public datasets when no credentials are provided |
422 | 2 | if (s3_conf.ak.empty() && s3_conf.sk.empty()) { |
423 | 2 | return std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(); |
424 | 2 | } |
425 | | |
426 | 0 | return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); |
427 | 2 | } |
428 | | |
429 | | std::shared_ptr<Aws::Auth::AWSCredentialsProvider> S3ClientFactory::_create_credentials_provider( |
430 | 20 | CredProviderType type) { |
431 | 20 | switch (type) { |
432 | 2 | case CredProviderType::Env: |
433 | 2 | return std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>(); |
434 | 2 | case CredProviderType::SystemProperties: |
435 | 2 | return std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>(); |
436 | 3 | case CredProviderType::WebIdentity: |
437 | 3 | return std::make_shared<Aws::Auth::STSAssumeRoleWebIdentityCredentialsProvider>(); |
438 | 2 | case CredProviderType::Container: |
439 | 2 | return std::make_shared<Aws::Auth::TaskRoleCredentialsProvider>( |
440 | 2 | Aws::Environment::GetEnv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI").c_str()); |
441 | 4 | case CredProviderType::InstanceProfile: |
442 | 4 | return std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(); |
443 | 4 | case CredProviderType::Anonymous: |
444 | 4 | return std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(); |
445 | 3 | case CredProviderType::Default: |
446 | 3 | default: |
447 | 3 | return std::make_shared<CustomAwsCredentialsProviderChain>(); |
448 | 20 | } |
449 | 20 | } |
450 | | |
451 | | std::shared_ptr<Aws::Auth::AWSCredentialsProvider> |
452 | 42 | S3ClientFactory::_get_aws_credentials_provider_v2(const S3ClientConf& s3_conf) { |
453 | 42 | if (!s3_conf.ak.empty() && !s3_conf.sk.empty()) { |
454 | 22 | Aws::Auth::AWSCredentials aws_cred(s3_conf.ak, s3_conf.sk); |
455 | 22 | DCHECK(!aws_cred.IsExpiredOrEmpty()); |
456 | 22 | if (!s3_conf.token.empty()) { |
457 | 3 | aws_cred.SetSessionToken(s3_conf.token); |
458 | 3 | } |
459 | 22 | return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(std::move(aws_cred)); |
460 | 22 | } |
461 | | |
462 | | // Handle role_arn for assume role scenario |
463 | 20 | if (!s3_conf.role_arn.empty()) { |
464 | 8 | Aws::Client::ClientConfiguration clientConfiguration = |
465 | 8 | S3ClientFactory::getClientConfiguration(); |
466 | | |
467 | 8 | if (_ca_cert_file_path.empty()) { |
468 | 0 | _ca_cert_file_path = |
469 | 0 | get_valid_ca_cert_path(doris::split(config::ca_cert_file_paths, ";")); |
470 | 0 | } |
471 | 8 | if (!_ca_cert_file_path.empty()) { |
472 | 8 | clientConfiguration.caFile = _ca_cert_file_path; |
473 | 8 | } |
474 | | |
475 | 8 | auto baseProvider = _create_credentials_provider(s3_conf.cred_provider_type); |
476 | 8 | auto stsClient = std::make_shared<Aws::STS::STSClient>(baseProvider, clientConfiguration); |
477 | | |
478 | 8 | return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>( |
479 | 8 | s3_conf.role_arn, Aws::String(), s3_conf.external_id, |
480 | 8 | Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, stsClient); |
481 | 8 | } |
482 | | |
483 | | // Return provider based on cred_provider_type |
484 | 12 | return _create_credentials_provider(s3_conf.cred_provider_type); |
485 | 20 | } |
486 | | |
487 | | std::shared_ptr<Aws::Auth::AWSCredentialsProvider> S3ClientFactory::get_aws_credentials_provider( |
488 | 48 | const S3ClientConf& s3_conf) { |
489 | 48 | if (config::aws_credentials_provider_version == "v2") { |
490 | 42 | return _get_aws_credentials_provider_v2(s3_conf); |
491 | 42 | } |
492 | 6 | return _get_aws_credentials_provider_v1(s3_conf); |
493 | 48 | } |
494 | | |
495 | | std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_s3_client( |
496 | 20 | const S3ClientConf& s3_conf) { |
497 | 20 | TEST_SYNC_POINT_RETURN_WITH_VALUE( |
498 | 20 | "s3_client_factory::create", |
499 | 20 | std::make_shared<io::S3ObjStorageClient>(std::make_shared<Aws::S3::S3Client>())); |
500 | 20 | Aws::Client::ClientConfiguration aws_config = S3ClientFactory::getClientConfiguration(); |
501 | 20 | if (s3_conf.need_override_endpoint) { |
502 | 20 | aws_config.endpointOverride = s3_conf.endpoint; |
503 | 20 | } |
504 | 20 | aws_config.region = s3_conf.region; |
505 | | |
506 | 20 | if (_ca_cert_file_path.empty()) { |
507 | 0 | _ca_cert_file_path = get_valid_ca_cert_path(doris::split(config::ca_cert_file_paths, ";")); |
508 | 0 | } |
509 | | |
510 | 20 | if (!_ca_cert_file_path.empty()) { |
511 | 20 | aws_config.caFile = _ca_cert_file_path; |
512 | 20 | } |
513 | | |
514 | 20 | if (s3_conf.max_connections > 0) { |
515 | 18 | aws_config.maxConnections = s3_conf.max_connections; |
516 | 18 | } else { |
517 | 2 | aws_config.maxConnections = 102400; |
518 | 2 | } |
519 | | |
520 | 20 | aws_config.requestTimeoutMs = 30000; |
521 | 20 | if (s3_conf.request_timeout_ms > 0) { |
522 | 18 | aws_config.requestTimeoutMs = s3_conf.request_timeout_ms; |
523 | 18 | } |
524 | | |
525 | 20 | if (s3_conf.connect_timeout_ms > 0) { |
526 | 18 | aws_config.connectTimeoutMs = s3_conf.connect_timeout_ms; |
527 | 18 | } |
528 | | |
529 | 20 | if (config::s3_client_http_scheme == "http") { |
530 | 20 | aws_config.scheme = Aws::Http::Scheme::HTTP; |
531 | 20 | } |
532 | | |
533 | 20 | aws_config.retryStrategy = std::make_shared<S3CustomRetryStrategy>( |
534 | 20 | config::max_s3_client_retry /*scaleFactor = 25*/); |
535 | | |
536 | 20 | std::shared_ptr<Aws::S3::S3Client> new_client = std::make_shared<Aws::S3::S3Client>( |
537 | 20 | get_aws_credentials_provider(s3_conf), std::move(aws_config), |
538 | 20 | Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, |
539 | 20 | s3_conf.use_virtual_addressing); |
540 | | |
541 | 20 | auto obj_client = std::make_shared<io::S3ObjStorageClient>(std::move(new_client)); |
542 | 20 | LOG_INFO("create one s3 client with {}", s3_conf.to_string()); |
543 | 20 | return obj_client; |
544 | 20 | } |
545 | | |
546 | | Status S3ClientFactory::convert_properties_to_s3_conf( |
547 | 1.22k | const std::map<std::string, std::string>& prop, const S3URI& s3_uri, S3Conf* s3_conf) { |
548 | 1.22k | StringCaseMap<std::string> properties(prop.begin(), prop.end()); |
549 | 1.22k | if (auto it = properties.find(S3_AK); it != properties.end()) { |
550 | 1.21k | s3_conf->client_conf.ak = it->second; |
551 | 1.21k | } |
552 | 1.22k | if (auto it = properties.find(S3_SK); it != properties.end()) { |
553 | 1.21k | s3_conf->client_conf.sk = it->second; |
554 | 1.21k | } |
555 | 1.22k | if (auto it = properties.find(S3_TOKEN); it != properties.end()) { |
556 | 2 | s3_conf->client_conf.token = it->second; |
557 | 2 | } |
558 | 1.22k | if (auto it = properties.find(S3_ENDPOINT); it != properties.end()) { |
559 | 1.22k | s3_conf->client_conf.endpoint = it->second; |
560 | 1.22k | } |
561 | 1.22k | if (auto it = properties.find(S3_NEED_OVERRIDE_ENDPOINT); it != properties.end()) { |
562 | 0 | s3_conf->client_conf.need_override_endpoint = (it->second == "true"); |
563 | 0 | } |
564 | 1.22k | if (auto it = properties.find(S3_REGION); it != properties.end()) { |
565 | 1.22k | s3_conf->client_conf.region = it->second; |
566 | 1.22k | } |
567 | 1.22k | if (auto it = properties.find(S3_MAX_CONN_SIZE); it != properties.end()) { |
568 | 1.21k | if (!to_int(it->second, s3_conf->client_conf.max_connections)) { |
569 | 0 | return Status::InvalidArgument("invalid {} value \"{}\"", S3_MAX_CONN_SIZE, it->second); |
570 | 0 | } |
571 | 1.21k | } |
572 | 1.22k | if (auto it = properties.find(S3_REQUEST_TIMEOUT_MS); it != properties.end()) { |
573 | 1.21k | if (!to_int(it->second, s3_conf->client_conf.request_timeout_ms)) { |
574 | 0 | return Status::InvalidArgument("invalid {} value \"{}\"", S3_REQUEST_TIMEOUT_MS, |
575 | 0 | it->second); |
576 | 0 | } |
577 | 1.21k | } |
578 | 1.22k | if (auto it = properties.find(S3_CONN_TIMEOUT_MS); it != properties.end()) { |
579 | 1.21k | if (!to_int(it->second, s3_conf->client_conf.connect_timeout_ms)) { |
580 | 0 | return Status::InvalidArgument("invalid {} value \"{}\"", S3_CONN_TIMEOUT_MS, |
581 | 0 | it->second); |
582 | 0 | } |
583 | 1.21k | } |
584 | 1.22k | if (auto it = properties.find(S3_PROVIDER); it != properties.end()) { |
585 | | // S3 Provider properties should be case insensitive. |
586 | 0 | if (0 == strcasecmp(it->second.c_str(), AZURE_PROVIDER_STRING)) { |
587 | 0 | s3_conf->client_conf.provider = io::ObjStorageType::AZURE; |
588 | 0 | } |
589 | 0 | } |
590 | | |
591 | 1.22k | if (s3_uri.get_bucket().empty()) { |
592 | 0 | return Status::InvalidArgument("Invalid S3 URI {}, bucket is not specified", |
593 | 0 | s3_uri.to_string()); |
594 | 0 | } |
595 | 1.22k | s3_conf->bucket = s3_uri.get_bucket(); |
596 | | // For azure's compatibility |
597 | 1.22k | s3_conf->client_conf.bucket = s3_uri.get_bucket(); |
598 | 1.22k | s3_conf->prefix = ""; |
599 | | |
600 | | // See https://sdk.amazonaws.com/cpp/api/LATEST/class_aws_1_1_s3_1_1_s3_client.html |
601 | 1.22k | s3_conf->client_conf.use_virtual_addressing = true; |
602 | 1.22k | if (auto it = properties.find(USE_PATH_STYLE); it != properties.end()) { |
603 | 1.21k | s3_conf->client_conf.use_virtual_addressing = it->second != "true"; |
604 | 1.21k | } |
605 | | |
606 | 1.22k | if (auto it = properties.find(S3_ROLE_ARN); it != properties.end()) { |
607 | | // Keep provider type as Default unless explicitly configured by |
608 | | // AWS_CREDENTIALS_PROVIDER_TYPE, consistent with FE behavior. |
609 | 5 | s3_conf->client_conf.role_arn = it->second; |
610 | 5 | } |
611 | | |
612 | 1.22k | if (auto it = properties.find(S3_EXTERNAL_ID); it != properties.end()) { |
613 | 0 | s3_conf->client_conf.external_id = it->second; |
614 | 0 | } |
615 | | |
616 | 1.22k | if (auto it = properties.find(S3_CREDENTIALS_PROVIDER_TYPE); it != properties.end()) { |
617 | 22 | s3_conf->client_conf.cred_provider_type = cred_provider_type_from_string(it->second); |
618 | 22 | } |
619 | | |
620 | 1.22k | if (auto st = is_s3_conf_valid(s3_conf->client_conf); !st.ok()) { |
621 | 2 | return st; |
622 | 2 | } |
623 | 1.22k | return Status::OK(); |
624 | 1.22k | } |
625 | | |
626 | 0 | static CredProviderType cred_provider_type_from_thrift(TCredProviderType::type cred_provider_type) { |
627 | 0 | switch (cred_provider_type) { |
628 | 0 | case TCredProviderType::DEFAULT: |
629 | 0 | return CredProviderType::Default; |
630 | 0 | case TCredProviderType::SIMPLE: |
631 | 0 | return CredProviderType::Simple; |
632 | 0 | case TCredProviderType::INSTANCE_PROFILE: |
633 | 0 | return CredProviderType::InstanceProfile; |
634 | 0 | default: |
635 | 0 | __builtin_unreachable(); |
636 | 0 | LOG(WARNING) << "Invalid TCredProviderType value: " << cred_provider_type |
637 | 0 | << ", use default instead."; |
638 | 0 | return CredProviderType::Default; |
639 | 0 | } |
640 | 0 | } |
641 | | |
642 | 542 | S3Conf S3Conf::get_s3_conf(const cloud::ObjectStoreInfoPB& info) { |
643 | 542 | S3Conf ret { |
644 | 542 | .bucket = info.bucket(), |
645 | 542 | .prefix = info.prefix(), |
646 | 542 | .client_conf { |
647 | 542 | .endpoint = info.endpoint(), |
648 | 542 | .region = info.region(), |
649 | 542 | .ak = info.ak(), |
650 | 542 | .sk = info.sk(), |
651 | 542 | .token {}, |
652 | 542 | .bucket = info.bucket(), |
653 | 542 | .provider = io::ObjStorageType::AWS, |
654 | 542 | .use_virtual_addressing = |
655 | 542 | info.has_use_path_style() ? !info.use_path_style() : true, |
656 | | |
657 | 542 | .role_arn = info.role_arn(), |
658 | 542 | .external_id = info.external_id(), |
659 | 542 | }, |
660 | 542 | .sse_enabled = info.sse_enabled(), |
661 | 542 | }; |
662 | | |
663 | 542 | if (info.has_cred_provider_type()) { |
664 | 0 | ret.client_conf.cred_provider_type = cred_provider_type_from_pb(info.cred_provider_type()); |
665 | 0 | } |
666 | | |
667 | 542 | io::ObjStorageType type = io::ObjStorageType::AWS; |
668 | 542 | switch (info.provider()) { |
669 | 542 | case cloud::ObjectStoreInfoPB_Provider_OSS: |
670 | 542 | type = io::ObjStorageType::OSS; |
671 | 542 | break; |
672 | 0 | case cloud::ObjectStoreInfoPB_Provider_S3: |
673 | 0 | type = io::ObjStorageType::AWS; |
674 | 0 | break; |
675 | 0 | case cloud::ObjectStoreInfoPB_Provider_COS: |
676 | 0 | type = io::ObjStorageType::COS; |
677 | 0 | break; |
678 | 0 | case cloud::ObjectStoreInfoPB_Provider_OBS: |
679 | 0 | type = io::ObjStorageType::OBS; |
680 | 0 | break; |
681 | 0 | case cloud::ObjectStoreInfoPB_Provider_BOS: |
682 | 0 | type = io::ObjStorageType::BOS; |
683 | 0 | break; |
684 | 0 | case cloud::ObjectStoreInfoPB_Provider_GCP: |
685 | 0 | type = io::ObjStorageType::GCP; |
686 | 0 | break; |
687 | 0 | case cloud::ObjectStoreInfoPB_Provider_AZURE: |
688 | 0 | type = io::ObjStorageType::AZURE; |
689 | 0 | break; |
690 | 0 | case cloud::ObjectStoreInfoPB_Provider_TOS: |
691 | 0 | type = io::ObjStorageType::TOS; |
692 | 0 | break; |
693 | 0 | default: |
694 | 0 | __builtin_unreachable(); |
695 | 0 | LOG_FATAL("unknown provider type {}, info {}", info.provider(), ret.to_string()); |
696 | 542 | } |
697 | 542 | ret.client_conf.provider = type; |
698 | 542 | return ret; |
699 | 542 | } |
700 | | |
701 | 33 | S3Conf S3Conf::get_s3_conf(const TS3StorageParam& param) { |
702 | 33 | S3Conf ret { |
703 | 33 | .bucket = param.bucket, |
704 | 33 | .prefix = param.root_path, |
705 | 33 | .client_conf = { |
706 | 33 | .endpoint = param.endpoint, |
707 | 33 | .region = param.region, |
708 | 33 | .ak = param.ak, |
709 | 33 | .sk = param.sk, |
710 | 33 | .token = param.token, |
711 | 33 | .bucket = param.bucket, |
712 | 33 | .provider = io::ObjStorageType::AWS, |
713 | 33 | .max_connections = param.max_conn, |
714 | 33 | .request_timeout_ms = param.request_timeout_ms, |
715 | 33 | .connect_timeout_ms = param.conn_timeout_ms, |
716 | | // When using cold heat separation in minio, user might use ip address directly, |
717 | | // which needs enable use_virtual_addressing to true |
718 | 33 | .use_virtual_addressing = !param.use_path_style, |
719 | 33 | .role_arn = param.role_arn, |
720 | 33 | .external_id = param.external_id, |
721 | 33 | }}; |
722 | | |
723 | 33 | if (param.__isset.cred_provider_type) { |
724 | 0 | ret.client_conf.cred_provider_type = |
725 | 0 | cred_provider_type_from_thrift(param.cred_provider_type); |
726 | 0 | } |
727 | | |
728 | 33 | io::ObjStorageType type = io::ObjStorageType::AWS; |
729 | 33 | switch (param.provider) { |
730 | 33 | case TObjStorageType::UNKNOWN: |
731 | 33 | LOG_INFO("Receive one legal storage resource, set provider type to aws, param detail {}", |
732 | 33 | ret.to_string()); |
733 | 33 | type = io::ObjStorageType::AWS; |
734 | 33 | break; |
735 | 0 | case TObjStorageType::AWS: |
736 | 0 | type = io::ObjStorageType::AWS; |
737 | 0 | break; |
738 | 0 | case TObjStorageType::AZURE: |
739 | 0 | type = io::ObjStorageType::AZURE; |
740 | 0 | break; |
741 | 0 | case TObjStorageType::BOS: |
742 | 0 | type = io::ObjStorageType::BOS; |
743 | 0 | break; |
744 | 0 | case TObjStorageType::COS: |
745 | 0 | type = io::ObjStorageType::COS; |
746 | 0 | break; |
747 | 0 | case TObjStorageType::OBS: |
748 | 0 | type = io::ObjStorageType::OBS; |
749 | 0 | break; |
750 | 0 | case TObjStorageType::OSS: |
751 | 0 | type = io::ObjStorageType::OSS; |
752 | 0 | break; |
753 | 0 | case TObjStorageType::GCP: |
754 | 0 | type = io::ObjStorageType::GCP; |
755 | 0 | break; |
756 | 0 | case TObjStorageType::TOS: |
757 | 0 | type = io::ObjStorageType::TOS; |
758 | 0 | break; |
759 | 0 | default: |
760 | 0 | LOG_FATAL("unknown provider type {}, info {}", param.provider, ret.to_string()); |
761 | 0 | __builtin_unreachable(); |
762 | 33 | } |
763 | 33 | ret.client_conf.provider = type; |
764 | 33 | return ret; |
765 | 33 | } |
766 | | |
767 | 616 | std::string hide_access_key(const std::string& ak) { |
768 | 616 | std::string key = ak; |
769 | 616 | size_t key_len = key.length(); |
770 | 616 | size_t reserved_count; |
771 | 616 | if (key_len > 7) { |
772 | 31 | reserved_count = 6; |
773 | 585 | } else if (key_len > 2) { |
774 | 564 | reserved_count = key_len - 2; |
775 | 564 | } else { |
776 | 21 | reserved_count = 0; |
777 | 21 | } |
778 | | |
779 | 616 | size_t x_count = key_len - reserved_count; |
780 | 616 | size_t left_x_count = (x_count + 1) / 2; |
781 | | |
782 | 616 | if (left_x_count > 0) { |
783 | 599 | key.replace(0, left_x_count, left_x_count, 'x'); |
784 | 599 | } |
785 | | |
786 | 616 | if (x_count - left_x_count > 0) { |
787 | 595 | key.replace(key_len - (x_count - left_x_count), x_count - left_x_count, |
788 | 595 | x_count - left_x_count, 'x'); |
789 | 595 | } |
790 | 616 | return key; |
791 | 616 | } |
792 | | |
793 | | } // end namespace doris |