Coverage Report

Created: 2026-04-01 15:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/aws_msk_iam_auth.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <librdkafka/rdkafkacpp.h>

#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

#include "common/status.h"
29
30
namespace doris {
31
/**
32
 * AWS MSK IAM authentication token generator.
33
 * 
34
 * This class generates SASL/OAUTHBEARER tokens for AWS MSK IAM authentication.
35
 * It uses AWS SDK for C++ to obtain IAM credentials and generates signed tokens
36
 * that can be used with librdkafka's OAUTHBEARER mechanism.
37
 */
38
class AwsMskIamAuth {
39
public:
40
    struct Config {
41
        std::string region;       // AWS region (e.g., "us-east-1"), required
42
        std::string access_key;   // AWS Access Key ID (optional, for explicit credentials)
43
        std::string secret_key;   // AWS Secret Access Key (optional, for explicit credentials)
44
        std::string role_arn;     // IAM role ARN (optional, for assume role)
45
        std::string external_id;  // Optional external ID for STS AssumeRole
46
        std::string profile_name; // AWS profile name (optional, reads from ~/.aws/credentials)
47
        std::string
48
                credentials_provider; // Credentials provider type (optional, e.g., "ENV", "INSTANCE_PROFILE")
49
        int token_refresh_margin_ms = 60000; // Refresh token 60s before expiry
50
    };
51
52
    explicit AwsMskIamAuth(Config config);
53
16
    ~AwsMskIamAuth() = default;
54
55
    /**
56
     * Generate AWS MSK IAM authentication token.
57
     * 
58
     * The token is a base64url-encoded presigned URL following AWS SigV4 format:
59
     * https://kafka.<region>.amazonaws.com/?Action=kafka-cluster:Connect
60
     *   &X-Amz-Algorithm=AWS4-HMAC-SHA256
61
     *   &X-Amz-Credential=<access-key>/<date>/<region>/kafka-cluster/aws4_request
62
     *   &X-Amz-Date=<timestamp>
63
     *   &X-Amz-Expires=900
64
     *   &X-Amz-SignedHeaders=host
65
     *   &X-Amz-Signature=<signature>
66
     *   &X-Amz-Security-Token=<session-token>  // if using temporary credentials
67
     *   &User-Agent=doris-msk-iam-auth/1.0
68
     * 
69
     * Reference: https://github.com/aws/aws-msk-iam-sasl-signer-python
70
     * 
71
     * @param broker_hostname The MSK broker hostname (used for logging, not in token)
72
     * @param token Output: base64url-encoded signed URL token
73
     * @param token_lifetime_ms Output: token lifetime in milliseconds (3600000ms = 1 hour)
74
     * @return Status indicating success or failure
75
     */
76
    Status generate_token(const std::string& broker_hostname, std::string* token,
77
                          int64_t* token_lifetime_ms);
78
79
    /**
80
     * Get current AWS credentials.
81
     * This will refresh credentials if they are expired or about to expire.
82
     */
83
    Status get_credentials(Aws::Auth::AWSCredentials* credentials);
84
85
private:
86
    // Create AWS credentials provider based on configuration
87
    std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _create_credentials_provider();
88
    std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _create_provider_from_type(
89
            const std::string& provider_type);
90
    std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _create_assume_role_base_provider();
91
92
    // HMAC-SHA256 returning hex string
93
    std::string _hmac_sha256_hex(const std::string& key, const std::string& data);
94
95
    std::string _url_encode(const std::string& value);
96
97
    std::string _base64url_encode(const std::string& input);
98
99
    // Calculate AWS SigV4 signing key
100
    std::string _calculate_signing_key(const std::string& secret_key, const std::string& date_stamp,
101
                                       const std::string& region, const std::string& service);
102
103
    std::string _hmac_sha256(const std::string& key, const std::string& data);
104
105
    std::string _sha256(const std::string& data);
106
107
    std::string _get_timestamp();
108
109
    std::string _get_date_stamp(const std::string& timestamp);
110
111
    bool _should_refresh_credentials();
112
113
    Config _config;
114
    std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _credentials_provider;
115
    std::mutex _mutex;
116
    Aws::Auth::AWSCredentials _cached_credentials;
117
    std::chrono::time_point<std::chrono::system_clock> _credentials_expiry;
118
};
119
120
/**
121
 * librdkafka OAUTHBEARER callback for AWS MSK IAM authentication.
122
 * 
123
 * This callback is invoked by librdkafka when it needs to refresh the
124
 * OAUTHBEARER token. It uses AwsMskIamAuth to generate the token.
125
 */
126
class AwsMskIamOAuthCallback : public RdKafka::OAuthBearerTokenRefreshCb {
127
public:
128
    /**
129
     * Create an OAuth callback from Kafka custom properties.
130
     * 
131
     * This factory method checks if AWS MSK IAM authentication is configured
132
     * (security.protocol=SASL_SSL and sasl.mechanism=OAUTHBEARER) and creates
133
     * the callback with proper configuration.
134
     * 
135
     * @param custom_properties Kafka custom properties map
136
     * @param brokers Kafka broker list (comma-separated)
137
     * @return unique_ptr to callback if IAM auth is configured, nullptr otherwise
138
     */
139
    static std::unique_ptr<AwsMskIamOAuthCallback> create_from_properties(
140
            const std::unordered_map<std::string, std::string>& custom_properties,
141
            const std::string& brokers);
142
143
    explicit AwsMskIamOAuthCallback(std::shared_ptr<AwsMskIamAuth> auth,
144
                                    std::string broker_hostname);
145
146
    /**
147
     * Synchronously refresh and set OAuth token.
148
     * Can be called directly during initialization or by the callback.
149
     *
150
     * @param handle The Kafka handle (consumer or producer)
151
     * @return Status indicating success or failure
152
     */
153
    Status refresh_now(RdKafka::Handle* handle);
154
155
    /**
156
     * Callback invoked by librdkafka to refresh OAuth token.
157
     *
158
     * @param handle The Kafka handle (consumer or producer)
159
     * @param oauthbearer_config Configuration string from 'sasl.oauthbearer.config'
160
     */
161
    void oauthbearer_token_refresh_cb(RdKafka::Handle* handle,
162
                                      const std::string& oauthbearer_config) override;
163
164
private:
165
    std::shared_ptr<AwsMskIamAuth> _auth;
166
    std::string _broker_hostname;
167
};
168
169
} // namespace doris