Coverage Report

Created: 2025-10-16 17:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/util/s3_util.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/s3_util.h"
19
20
#include <aws/core/auth/AWSAuthSigner.h>
21
#include <aws/core/auth/AWSCredentials.h>
22
#include <aws/core/auth/AWSCredentialsProviderChain.h>
23
#include <aws/core/client/DefaultRetryStrategy.h>
24
#include <aws/core/utils/logging/LogLevel.h>
25
#include <aws/core/utils/logging/LogSystemInterface.h>
26
#include <aws/core/utils/memory/stl/AWSStringStream.h>
27
#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
28
#include <aws/s3/S3Client.h>
29
#include <aws/sts/STSClient.h>
30
#include <bvar/reducer.h>
31
#include <util/string_util.h>
32
33
#include <atomic>
34
#ifdef USE_AZURE
35
#include <azure/core/diagnostics/logger.hpp>
36
#include <azure/storage/blobs/blob_container_client.hpp>
37
#endif
38
#include <cstdlib>
39
#include <filesystem>
40
#include <functional>
41
#include <memory>
42
#include <ostream>
43
#include <utility>
44
45
#include "common/config.h"
46
#include "common/logging.h"
47
#include "common/status.h"
48
#include "cpp/aws_logger.h"
49
#include "cpp/custom_aws_credentials_provider_chain.h"
50
#include "cpp/obj_retry_strategy.h"
51
#include "cpp/sync_point.h"
52
#include "cpp/util.h"
53
#ifdef USE_AZURE
54
#include "io/fs/azure_obj_storage_client.h"
55
#endif
56
#include "io/fs/obj_storage_client.h"
57
#include "io/fs/s3_obj_storage_client.h"
58
#include "runtime/exec_env.h"
59
#include "s3_uri.h"
60
#include "vec/exec/scan/scanner_scheduler.h"
61
62
namespace doris {
63
namespace s3_bvar {
64
bvar::LatencyRecorder s3_get_latency("s3_get");
65
bvar::LatencyRecorder s3_put_latency("s3_put");
66
bvar::LatencyRecorder s3_delete_object_latency("s3_delete_object");
67
bvar::LatencyRecorder s3_delete_objects_latency("s3_delete_objects");
68
bvar::LatencyRecorder s3_head_latency("s3_head");
69
bvar::LatencyRecorder s3_multi_part_upload_latency("s3_multi_part_upload");
70
bvar::LatencyRecorder s3_list_latency("s3_list");
71
bvar::LatencyRecorder s3_list_object_versions_latency("s3_list_object_versions");
72
bvar::LatencyRecorder s3_get_bucket_version_latency("s3_get_bucket_version");
73
bvar::LatencyRecorder s3_copy_object_latency("s3_copy_object");
74
}; // namespace s3_bvar
75
76
namespace {
77
78
5
// Validates the minimum fields an S3 client needs.
// Endpoint and region are always required. When no role ARN is configured, ak and sk must be
// given together; leaving both empty is accepted (anonymous access).
// Returns Status::OK() or an InvalidArgument describing the first missing field.
doris::Status is_s3_conf_valid(const S3ClientConf& conf) {
    if (conf.endpoint.empty()) {
        return Status::InvalidArgument<false>("Invalid s3 conf, empty endpoint");
    }
    if (conf.region.empty()) {
        return Status::InvalidArgument<false>("Invalid s3 conf, empty region");
    }

    // With a role ARN configured, the ak/sk pairing is not validated here.
    if (!conf.role_arn.empty()) {
        return Status::OK();
    }

    const bool ak_missing = conf.ak.empty();
    const bool sk_missing = conf.sk.empty();
    if (!ak_missing && sk_missing) {
        return Status::InvalidArgument<false>("Invalid s3 conf, empty sk");
    }
    if (ak_missing && !sk_missing) {
        return Status::InvalidArgument<false>("Invalid s3 conf, empty ak");
    }
    return Status::OK();
}
101
102
// Parses `str` as a base-10 int.
// Returns true and stores the value into `res` only when the WHOLE string is a valid,
// in-range integer. Otherwise returns false and leaves `res` untouched.
bool to_int(std::string_view str, int& res) {
    int value = 0;
    const char* const first = str.data();
    const char* const last = first + str.size();
    auto [ptr, ec] = std::from_chars(first, last, value);
    // from_chars stops at the first character that is not part of a number. Require that
    // everything was consumed so inputs like "100abc" are rejected rather than read as 100.
    if (ec != std::errc {} || ptr != last) {
        return false;
    }
    res = value;
    return true;
}
107
108
// Property keys read by S3ClientFactory::convert_properties_to_s3_conf().

// Credentials and role assumption.
constexpr char S3_AK[] = "AWS_ACCESS_KEY";
constexpr char S3_SK[] = "AWS_SECRET_KEY";
constexpr char S3_TOKEN[] = "AWS_TOKEN";
constexpr char S3_ROLE_ARN[] = "AWS_ROLE_ARN";
constexpr char S3_EXTERNAL_ID[] = "AWS_EXTERNAL_ID";

// Endpoint, region and addressing style.
constexpr char S3_ENDPOINT[] = "AWS_ENDPOINT";
constexpr char S3_REGION[] = "AWS_REGION";
constexpr char S3_NEED_OVERRIDE_ENDPOINT[] = "AWS_NEED_OVERRIDE_ENDPOINT";
constexpr char USE_PATH_STYLE[] = "use_path_style";

// Connection tuning (integer values).
constexpr char S3_MAX_CONN_SIZE[] = "AWS_MAX_CONN_SIZE";
constexpr char S3_REQUEST_TIMEOUT_MS[] = "AWS_REQUEST_TIMEOUT_MS";
constexpr char S3_CONN_TIMEOUT_MS[] = "AWS_CONNECTION_TIMEOUT_MS";

// Storage provider selection: the "provider" key, and the value that selects Azure.
constexpr char S3_PROVIDER[] = "provider";
constexpr char AZURE_PROVIDER_STRING[] = "AZURE";
124
} // namespace
125
126
// Rate-limiter metrics. The GET pair and PUT pair are handed to the GET/PUT
// S3RateLimiterHolder instances via metric_func_factory() in the S3ClientFactory constructor.
// NOTE(review): by name, *_ns presumably accumulates throttled time in nanoseconds and
// *_exceed_req_num counts requests that exceeded the limit — confirm in metric_func_factory.
bvar::Adder<int64_t> get_rate_limit_ns("get_rate_limit_ns");
bvar::Adder<int64_t> get_rate_limit_exceed_req_num("get_rate_limit_exceed_req_num");
bvar::Adder<int64_t> put_rate_limit_ns("put_rate_limit_ns");
bvar::Adder<int64_t> put_rate_limit_exceed_req_num("put_rate_limit_exceed_req_num");
130
131
0
// Returns the limiter holder for `type`. Only GET and PUT holders are built by the
// constructor; any other type fails the CHECK.
S3RateLimiterHolder* S3ClientFactory::rate_limiter(S3RateLimitType type) {
    CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << to_string(type);
    const auto slot = static_cast<size_t>(type);
    auto& holder = _rate_limiters[slot];
    return holder.get();
}
135
136
0
// Reconfigures the process-wide limiter for `type` with the given speed, burst and limit.
// Returns -1 for S3RateLimitType::UNKNOWN; otherwise returns whatever the holder's reset()
// returns.
int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst, size_t limit) {
    if (type != S3RateLimitType::UNKNOWN) {
        auto* holder = S3ClientFactory::instance().rate_limiter(type);
        return holder->reset(max_speed, max_burst, limit);
    }
    return -1;
}
142
143
1
// Initializes the AWS SDK with logging routed to DorisAWSLogger at config::aws_log_level.
// Resolves the CA certificate path from config::ca_cert_file_paths and builds the GET/PUT
// rate limiters from config. When compiled with USE_AZURE, it also routes Azure SDK
// diagnostics into the BE log.
S3ClientFactory::S3ClientFactory() {
    _aws_options = Aws::SDKOptions {};
    auto logLevel = static_cast<Aws::Utils::Logging::LogLevel>(config::aws_log_level);
    _aws_options.loggingOptions.logLevel = logLevel;
    _aws_options.loggingOptions.logger_create_fn = [logLevel] {
        return std::make_shared<DorisAWSLogger>(logLevel);
    };
    Aws::InitAPI(_aws_options);
    _ca_cert_file_path = get_valid_ca_cert_path(doris::split(config::ca_cert_file_paths, ";"));
    // rate_limiter() indexes this array with static_cast<size_t>(type), so the element order
    // must stay in sync with the S3RateLimitType values for GET and PUT.
    _rate_limiters = {
            std::make_unique<S3RateLimiterHolder>(
                    config::s3_get_token_per_second, config::s3_get_bucket_tokens,
                    config::s3_get_token_limit,
                    metric_func_factory(get_rate_limit_ns, get_rate_limit_exceed_req_num)),
            std::make_unique<S3RateLimiterHolder>(
                    config::s3_put_token_per_second, config::s3_put_bucket_tokens,
                    config::s3_put_token_limit,
                    metric_func_factory(put_rate_limit_ns, put_rate_limit_exceed_req_num))};

#ifdef USE_AZURE
    auto azureLogLevel =
            static_cast<Azure::Core::Diagnostics::Logger::Level>(config::azure_log_level);
    Azure::Core::Diagnostics::Logger::SetLevel(azureLogLevel);
    // Logger::SetListener is a static setter: the callback is retained after this constructor
    // returns, so it must not capture constructor locals by reference. It needs no captures.
    Azure::Core::Diagnostics::Logger::SetListener(
            [](Azure::Core::Diagnostics::Logger::Level level, const std::string& message) {
                switch (level) {
                case Azure::Core::Diagnostics::Logger::Level::Verbose:
                case Azure::Core::Diagnostics::Logger::Level::Informational:
                    LOG(INFO) << message;
                    break;
                case Azure::Core::Diagnostics::Logger::Level::Warning:
                    LOG(WARNING) << message;
                    break;
                case Azure::Core::Diagnostics::Logger::Level::Error:
                    LOG(ERROR) << message;
                    break;
                default:
                    LOG(WARNING) << "Unknown level: " << static_cast<int>(level)
                                 << ", message: " << message;
                    break;
                }
            });
#endif
}
189
190
1
// Tears down the AWS SDK, passing the same options that the constructor gave to
// Aws::InitAPI.
S3ClientFactory::~S3ClientFactory() {
    const auto& options = this->_aws_options;
    Aws::ShutdownAPI(options);
}
193
194
7
// Process-wide factory. Built lazily on first call; a function-local static's
// initialization is thread-safe since C++11.
S3ClientFactory& S3ClientFactory::instance() {
    static S3ClientFactory factory;
    return factory;
}
198
199
5
std::shared_ptr<io::ObjStorageClient> S3ClientFactory::create(const S3ClientConf& s3_conf) {
200
5
    if (!is_s3_conf_valid(s3_conf).ok()) {
201
0
        return nullptr;
202
0
    }
203
204
5
    uint64_t hash = s3_conf.get_hash();
205
5
    {
206
5
        std::lock_guard l(_lock);
207
5
        auto it = _cache.find(hash);
208
5
        if (it != _cache.end()) {
209
2
            return it->second;
210
2
        }
211
5
    }
212
213
3
    auto obj_client = (s3_conf.provider == io::ObjStorageType::AZURE)
214
3
                              ? _create_azure_client(s3_conf)
215
3
                              : _create_s3_client(s3_conf);
216
217
3
    {
218
3
        uint64_t hash = s3_conf.get_hash();
219
3
        std::lock_guard l(_lock);
220
3
        _cache[hash] = obj_client;
221
3
    }
222
3
    return obj_client;
223
5
}
224
225
// Builds an Azure Blob Storage container client for `s3_conf`.
// `s3_conf.ak` / `s3_conf.sk` are passed to StorageSharedKeyCredential (account name and
// shared key), and `s3_conf.bucket` is used as the container name.
// Without USE_AZURE this only calls LOG_FATAL and then `return nullptr`.
std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_azure_client(
        const S3ClientConf& s3_conf) {
#ifdef USE_AZURE
    auto cred =
            std::make_shared<Azure::Storage::StorageSharedKeyCredential>(s3_conf.ak, s3_conf.sk);

    const std::string container_name = s3_conf.bucket;
    std::string uri;
    if (config::force_azure_blob_global_endpoint) {
        // Ignore the configured endpoint; address the public blob endpoint with `ak` as the
        // storage account name.
        uri = fmt::format("https://{}.blob.core.windows.net/{}", s3_conf.ak, container_name);
    } else {
        uri = fmt::format("{}/{}", s3_conf.endpoint, container_name);
        // Default to https when the configured endpoint has no scheme.
        if (s3_conf.endpoint.find("://") == std::string::npos) {
            uri = "https://" + uri;
        }
    }

    Azure::Storage::Blobs::BlobClientOptions options;
    // Also retry on HTTP 429 (TooManyRequests), up to config::max_s3_client_retry times.
    // AzureRetryRecordPolicy is added as a per-retry policy (presumably for retry metrics).
    options.Retry.StatusCodes.insert(Azure::Core::Http::HttpStatusCode::TooManyRequests);
    options.Retry.MaxRetries = config::max_s3_client_retry;
    options.PerRetryPolicies.emplace_back(std::make_unique<AzureRetryRecordPolicy>());

    // NOTE(review): `normalized_uri` is only logged; the container client below is built
    // from the un-normalized `uri`. Confirm whether that is intentional.
    std::string normalized_uri = normalize_http_uri(uri);
    VLOG_DEBUG << "uri:" << uri << ", normalized_uri:" << normalized_uri;

    auto containerClient = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(
            uri, cred, std::move(options));
    LOG_INFO("create one azure client with {}", s3_conf.to_string());
    return std::make_shared<io::AzureObjStorageClient>(std::move(containerClient));
#else
    LOG_FATAL("BE is not compiled with azure support, export BUILD_AZURE=ON before building");
    return nullptr;
#endif
}
259
260
std::shared_ptr<Aws::Auth::AWSCredentialsProvider>
261
4
S3ClientFactory::_get_aws_credentials_provider_v1(const S3ClientConf& s3_conf) {
262
4
    if (!s3_conf.ak.empty() && !s3_conf.sk.empty()) {
263
1
        Aws::Auth::AWSCredentials aws_cred(s3_conf.ak, s3_conf.sk);
264
1
        DCHECK(!aws_cred.IsExpiredOrEmpty());
265
1
        if (!s3_conf.token.empty()) {
266
0
            aws_cred.SetSessionToken(s3_conf.token);
267
0
        }
268
1
        return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(std::move(aws_cred));
269
1
    }
270
271
3
    if (s3_conf.cred_provider_type == CredProviderType::InstanceProfile) {
272
2
        if (s3_conf.role_arn.empty()) {
273
1
            return std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>();
274
1
        }
275
276
1
        Aws::Client::ClientConfiguration clientConfiguration =
277
1
                S3ClientFactory::getClientConfiguration();
278
279
1
        if (_ca_cert_file_path.empty()) {
280
0
            _ca_cert_file_path =
281
0
                    get_valid_ca_cert_path(doris::split(config::ca_cert_file_paths, ";"));
282
0
        }
283
1
        if (!_ca_cert_file_path.empty()) {
284
1
            clientConfiguration.caFile = _ca_cert_file_path;
285
1
        }
286
287
1
        auto stsClient = std::make_shared<Aws::STS::STSClient>(
288
1
                std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(),
289
1
                clientConfiguration);
290
291
1
        return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
292
1
                s3_conf.role_arn, Aws::String(), s3_conf.external_id,
293
1
                Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, stsClient);
294
2
    }
295
296
    // Support anonymous access for public datasets when no credentials are provided
297
1
    if (s3_conf.ak.empty() && s3_conf.sk.empty()) {
298
1
        return std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>();
299
1
    }
300
301
0
    return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
302
1
}
303
304
std::shared_ptr<Aws::Auth::AWSCredentialsProvider>
305
6
S3ClientFactory::_get_aws_credentials_provider_v2(const S3ClientConf& s3_conf) {
306
6
    if (!s3_conf.ak.empty() && !s3_conf.sk.empty()) {
307
3
        Aws::Auth::AWSCredentials aws_cred(s3_conf.ak, s3_conf.sk);
308
3
        DCHECK(!aws_cred.IsExpiredOrEmpty());
309
3
        if (!s3_conf.token.empty()) {
310
0
            aws_cred.SetSessionToken(s3_conf.token);
311
0
        }
312
3
        return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(std::move(aws_cred));
313
3
    }
314
315
3
    if (s3_conf.cred_provider_type == CredProviderType::InstanceProfile) {
316
2
        if (s3_conf.role_arn.empty()) {
317
1
            return std::make_shared<CustomAwsCredentialsProviderChain>();
318
1
        }
319
320
1
        Aws::Client::ClientConfiguration clientConfiguration =
321
1
                S3ClientFactory::getClientConfiguration();
322
323
1
        if (_ca_cert_file_path.empty()) {
324
0
            _ca_cert_file_path =
325
0
                    get_valid_ca_cert_path(doris::split(config::ca_cert_file_paths, ";"));
326
0
        }
327
1
        if (!_ca_cert_file_path.empty()) {
328
1
            clientConfiguration.caFile = _ca_cert_file_path;
329
1
        }
330
331
1
        auto stsClient = std::make_shared<Aws::STS::STSClient>(
332
1
                std::make_shared<CustomAwsCredentialsProviderChain>(), clientConfiguration);
333
334
1
        return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
335
1
                s3_conf.role_arn, Aws::String(), s3_conf.external_id,
336
1
                Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, stsClient);
337
2
    }
338
339
1
    return std::make_shared<CustomAwsCredentialsProviderChain>();
340
3
}
341
342
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> S3ClientFactory::get_aws_credentials_provider(
343
10
        const S3ClientConf& s3_conf) {
344
10
    if (config::aws_credentials_provider_version == "v2") {
345
6
        return _get_aws_credentials_provider_v2(s3_conf);
346
6
    }
347
4
    return _get_aws_credentials_provider_v1(s3_conf);
348
10
}
349
350
std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_s3_client(
351
3
        const S3ClientConf& s3_conf) {
352
3
    TEST_SYNC_POINT_RETURN_WITH_VALUE(
353
2
            "s3_client_factory::create",
354
2
            std::make_shared<io::S3ObjStorageClient>(std::make_shared<Aws::S3::S3Client>()));
355
2
    Aws::Client::ClientConfiguration aws_config = S3ClientFactory::getClientConfiguration();
356
2
    if (s3_conf.need_override_endpoint) {
357
2
        aws_config.endpointOverride = s3_conf.endpoint;
358
2
    }
359
2
    aws_config.region = s3_conf.region;
360
361
2
    if (_ca_cert_file_path.empty()) {
362
0
        _ca_cert_file_path = get_valid_ca_cert_path(doris::split(config::ca_cert_file_paths, ";"));
363
0
    }
364
365
2
    if (!_ca_cert_file_path.empty()) {
366
2
        aws_config.caFile = _ca_cert_file_path;
367
2
    }
368
369
2
    if (s3_conf.max_connections > 0) {
370
0
        aws_config.maxConnections = s3_conf.max_connections;
371
2
    } else {
372
        // AWS SDK max concurrent tcp connections for a single http client to use. Default 25.
373
2
        aws_config.maxConnections = std::max(config::doris_scanner_thread_pool_thread_num, 25);
374
2
    }
375
376
2
    aws_config.requestTimeoutMs = 30000;
377
2
    if (s3_conf.request_timeout_ms > 0) {
378
0
        aws_config.requestTimeoutMs = s3_conf.request_timeout_ms;
379
0
    }
380
381
2
    if (s3_conf.connect_timeout_ms > 0) {
382
0
        aws_config.connectTimeoutMs = s3_conf.connect_timeout_ms;
383
0
    }
384
385
2
    if (config::s3_client_http_scheme == "http") {
386
2
        aws_config.scheme = Aws::Http::Scheme::HTTP;
387
2
    }
388
389
2
    aws_config.retryStrategy = std::make_shared<S3CustomRetryStrategy>(
390
2
            config::max_s3_client_retry /*scaleFactor = 25*/);
391
392
2
    std::shared_ptr<Aws::S3::S3Client> new_client = std::make_shared<Aws::S3::S3Client>(
393
2
            get_aws_credentials_provider(s3_conf), std::move(aws_config),
394
2
            Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
395
2
            s3_conf.use_virtual_addressing);
396
397
2
    auto obj_client = std::make_shared<io::S3ObjStorageClient>(std::move(new_client));
398
2
    LOG_INFO("create one s3 client with {}", s3_conf.to_string());
399
2
    return obj_client;
400
3
}
401
402
// Fills `s3_conf` from user-supplied `prop` and the bucket in `s3_uri`.
// Keys are looked up through a StringCaseMap. Returns InvalidArgument in these cases:
//   - a numeric property does not parse as an int;
//   - the URI has no bucket;
//   - the resulting client conf fails is_s3_conf_valid().
// Fields assigned before an error are left in `s3_conf`.
Status S3ClientFactory::convert_properties_to_s3_conf(
        const std::map<std::string, std::string>& prop, const S3URI& s3_uri, S3Conf* s3_conf) {
    StringCaseMap<std::string> properties(prop.begin(), prop.end());
    auto& client_conf = s3_conf->client_conf;

    // Pointer to the value stored for `key`, or nullptr if the property is absent.
    auto value_of = [&properties](const char* key) -> const std::string* {
        auto it = properties.find(key);
        return it != properties.end() ? &it->second : nullptr;
    };

    if (const auto* v = value_of(S3_AK)) {
        client_conf.ak = *v;
    }
    if (const auto* v = value_of(S3_SK)) {
        client_conf.sk = *v;
    }
    if (const auto* v = value_of(S3_TOKEN)) {
        client_conf.token = *v;
    }
    if (const auto* v = value_of(S3_ENDPOINT)) {
        client_conf.endpoint = *v;
    }
    if (const auto* v = value_of(S3_NEED_OVERRIDE_ENDPOINT)) {
        client_conf.need_override_endpoint = (*v == "true");
    }
    if (const auto* v = value_of(S3_REGION)) {
        client_conf.region = *v;
    }

    // Integer tuning knobs: a present-but-unparsable value is an error.
    if (const auto* v = value_of(S3_MAX_CONN_SIZE);
        v != nullptr && !to_int(*v, client_conf.max_connections)) {
        return Status::InvalidArgument("invalid {} value \"{}\"", S3_MAX_CONN_SIZE, *v);
    }
    if (const auto* v = value_of(S3_REQUEST_TIMEOUT_MS);
        v != nullptr && !to_int(*v, client_conf.request_timeout_ms)) {
        return Status::InvalidArgument("invalid {} value \"{}\"", S3_REQUEST_TIMEOUT_MS, *v);
    }
    if (const auto* v = value_of(S3_CONN_TIMEOUT_MS);
        v != nullptr && !to_int(*v, client_conf.connect_timeout_ms)) {
        return Status::InvalidArgument("invalid {} value \"{}\"", S3_CONN_TIMEOUT_MS, *v);
    }

    // The provider value is compared case-insensitively; only AZURE changes the default.
    if (const auto* v = value_of(S3_PROVIDER);
        v != nullptr && strcasecmp(v->c_str(), AZURE_PROVIDER_STRING) == 0) {
        client_conf.provider = io::ObjStorageType::AZURE;
    }

    const auto& bucket = s3_uri.get_bucket();
    if (bucket.empty()) {
        return Status::InvalidArgument("Invalid S3 URI {}, bucket is not specified",
                                       s3_uri.to_string());
    }
    s3_conf->bucket = bucket;
    // The Azure client reads the container name from the client conf.
    client_conf.bucket = bucket;
    s3_conf->prefix = "";

    // Virtual-hosted addressing unless use_path_style is exactly "true".
    // See https://sdk.amazonaws.com/cpp/api/LATEST/class_aws_1_1_s3_1_1_s3_client.html
    const auto* path_style = value_of(USE_PATH_STYLE);
    client_conf.use_virtual_addressing = (path_style == nullptr) || (*path_style != "true");

    // A role ARN switches credential resolution to instance profile + AssumeRole.
    if (const auto* v = value_of(S3_ROLE_ARN)) {
        client_conf.cred_provider_type = CredProviderType::InstanceProfile;
        client_conf.role_arn = *v;
    }
    if (const auto* v = value_of(S3_EXTERNAL_ID)) {
        client_conf.external_id = *v;
    }

    return is_s3_conf_valid(client_conf);
}
476
477
0
// Maps the thrift credential-provider enum onto CredProviderType.
// An unrecognized value is logged and mapped to CredProviderType::Default. A thrift enum
// field can carry any integer received over the wire, so this must not be treated as
// unreachable (previously it was UB, and the fallback below was dead code).
static CredProviderType cred_provider_type_from_thrift(TCredProviderType::type cred_provider_type) {
    switch (cred_provider_type) {
    case TCredProviderType::DEFAULT:
        return CredProviderType::Default;
    case TCredProviderType::SIMPLE:
        return CredProviderType::Simple;
    case TCredProviderType::INSTANCE_PROFILE:
        return CredProviderType::InstanceProfile;
    default:
        LOG(WARNING) << "Invalid TCredProviderType value: " << cred_provider_type
                     << ", use default instead.";
        return CredProviderType::Default;
    }
}
492
493
0
// Builds an S3Conf from a cloud ObjectStoreInfoPB.
// - The PB carries no session token, so `token` stays empty.
// - Virtual addressing is used unless use_path_style is set to true.
// - The credential provider type is taken from the PB only when it is set.
// - An unknown provider is fatal.
S3Conf S3Conf::get_s3_conf(const cloud::ObjectStoreInfoPB& info) {
    S3Conf ret {
            .bucket = info.bucket(),
            .prefix = info.prefix(),
            .client_conf {
                    .endpoint = info.endpoint(),
                    .region = info.region(),
                    .ak = info.ak(),
                    .sk = info.sk(),
                    .token {},
                    .bucket = info.bucket(),
                    .provider = io::ObjStorageType::AWS,
                    .use_virtual_addressing =
                            info.has_use_path_style() ? !info.use_path_style() : true,

                    .role_arn = info.role_arn(),
                    .external_id = info.external_id(),
            },
            .sse_enabled = info.sse_enabled(),
    };

    if (info.has_cred_provider_type()) {
        ret.client_conf.cred_provider_type = cred_provider_type_from_pb(info.cred_provider_type());
    }

    io::ObjStorageType type = io::ObjStorageType::AWS;
    switch (info.provider()) {
    case cloud::ObjectStoreInfoPB_Provider_OSS:
        type = io::ObjStorageType::OSS;
        break;
    case cloud::ObjectStoreInfoPB_Provider_S3:
        type = io::ObjStorageType::AWS;
        break;
    case cloud::ObjectStoreInfoPB_Provider_COS:
        type = io::ObjStorageType::COS;
        break;
    case cloud::ObjectStoreInfoPB_Provider_OBS:
        type = io::ObjStorageType::OBS;
        break;
    case cloud::ObjectStoreInfoPB_Provider_BOS:
        type = io::ObjStorageType::BOS;
        break;
    case cloud::ObjectStoreInfoPB_Provider_GCP:
        type = io::ObjStorageType::GCP;
        break;
    case cloud::ObjectStoreInfoPB_Provider_AZURE:
        type = io::ObjStorageType::AZURE;
        break;
    case cloud::ObjectStoreInfoPB_Provider_TOS:
        type = io::ObjStorageType::TOS;
        break;
    default:
        // Log first: anything placed after __builtin_unreachable() is dead code, which
        // previously made this fatal diagnostic unreachable.
        LOG_FATAL("unknown provider type {}, info {}", info.provider(), ret.to_string());
        __builtin_unreachable();
    }
    ret.client_conf.provider = type;
    return ret;
}
551
552
0
// Builds an S3Conf from a thrift TS3StorageParam.
// - Virtual addressing is the inverse of `use_path_style`.
// - The credential provider type is taken from the param only when it is set.
// - A provider of UNKNOWN is logged and treated as AWS; any other unrecognized provider is
//   fatal.
S3Conf S3Conf::get_s3_conf(const TS3StorageParam& param) {
    S3Conf ret {
            .bucket = param.bucket,
            .prefix = param.root_path,
            .client_conf = {
                    .endpoint = param.endpoint,
                    .region = param.region,
                    .ak = param.ak,
                    .sk = param.sk,
                    .token = param.token,
                    .bucket = param.bucket,
                    .provider = io::ObjStorageType::AWS,
                    .max_connections = param.max_conn,
                    .request_timeout_ms = param.request_timeout_ms,
                    .connect_timeout_ms = param.conn_timeout_ms,
                    // When using cold heat separation in minio, user might use ip address directly,
                    // which needs enable use_virtual_addressing to true
                    .use_virtual_addressing = !param.use_path_style,
                    .role_arn = param.role_arn,
                    .external_id = param.external_id,
            }};

    if (param.__isset.cred_provider_type) {
        ret.client_conf.cred_provider_type =
                cred_provider_type_from_thrift(param.cred_provider_type);
    }

    // Translate the thrift provider enum into the BE storage type.
    ret.client_conf.provider = [&]() -> io::ObjStorageType {
        switch (param.provider) {
        case TObjStorageType::UNKNOWN:
            LOG_INFO("Receive one legal storage resource, set provider type to aws, param detail {}",
                     ret.to_string());
            return io::ObjStorageType::AWS;
        case TObjStorageType::AWS:
            return io::ObjStorageType::AWS;
        case TObjStorageType::AZURE:
            return io::ObjStorageType::AZURE;
        case TObjStorageType::BOS:
            return io::ObjStorageType::BOS;
        case TObjStorageType::COS:
            return io::ObjStorageType::COS;
        case TObjStorageType::OBS:
            return io::ObjStorageType::OBS;
        case TObjStorageType::OSS:
            return io::ObjStorageType::OSS;
        case TObjStorageType::GCP:
            return io::ObjStorageType::GCP;
        case TObjStorageType::TOS:
            return io::ObjStorageType::TOS;
        default:
            LOG_FATAL("unknown provider type {}, info {}", param.provider, ret.to_string());
            __builtin_unreachable();
        }
    }();
    return ret;
}
617
618
13
// Masks an access key for logging by replacing characters with 'x' at both ends.
// The middle stays visible: 6 characters for keys longer than 7, (length - 2) for keys of
// length 3..7, and none for shorter keys. When the masked count is odd, the extra 'x'
// goes to the front.
std::string hide_access_key(const std::string& ak) {
    const size_t total = ak.size();

    size_t visible = 0;
    if (total > 7) {
        visible = 6;
    } else if (total > 2) {
        visible = total - 2;
    }

    const size_t masked = total - visible;
    const size_t head = (masked + 1) / 2;
    const size_t tail = masked - head;

    std::string out;
    out.reserve(total);
    out.append(head, 'x');
    out.append(ak, head, visible);
    out.append(tail, 'x');
    return out;
}
643
644
} // end namespace doris