Coverage Report

Created: 2026-04-01 00:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/aws_msk_iam_auth.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <aws/core/auth/AWSCredentialsProvider.h>
21
#include <librdkafka/rdkafkacpp.h>
22
23
#include <memory>
24
#include <mutex>
25
#include <string>
26
#include <unordered_map>
27
28
#include "common/status.h"
29
30
namespace doris {
31
/**
32
 * AWS MSK IAM authentication token generator.
33
 * 
34
 * This class generates SASL/OAUTHBEARER tokens for AWS MSK IAM authentication.
35
 * It uses AWS SDK for C++ to obtain IAM credentials and generates signed tokens
36
 * that can be used with librdkafka's OAUTHBEARER mechanism.
37
 */
38
class AwsMskIamAuth {
39
public:
40
    struct Config {
41
        std::string region;       // AWS region (e.g., "us-east-1"), required
42
        std::string access_key;   // AWS Access Key ID (optional, for explicit credentials)
43
        std::string secret_key;   // AWS Secret Access Key (optional, for explicit credentials)
44
        std::string role_arn;     // IAM role ARN (optional, for assume role)
45
        std::string profile_name; // AWS profile name (optional, reads from ~/.aws/credentials)
46
        std::string
47
                credentials_provider; // Credentials provider type (optional, e.g., "ENV", "INSTANCE_PROFILE")
48
        int token_refresh_margin_ms = 60000; // Refresh margin in milliseconds; defaults to 60000 (60s before expiry)
49
    };
50
51
    explicit AwsMskIamAuth(Config config); // Takes the configuration by value
52
12
    ~AwsMskIamAuth() = default;
53
54
    /**
55
     * Generate AWS MSK IAM authentication token.
56
     *
57
     * The token is a base64url-encoded presigned URL following AWS SigV4 format:
58
     * https://kafka.<region>.amazonaws.com/?Action=kafka-cluster:Connect
59
     *   &X-Amz-Algorithm=AWS4-HMAC-SHA256
60
     *   &X-Amz-Credential=<access-key>/<date>/<region>/kafka-cluster/aws4_request
61
     *   &X-Amz-Date=<timestamp>
62
     *   &X-Amz-Expires=900
63
     *   &X-Amz-SignedHeaders=host
64
     *   &X-Amz-Signature=<signature>
65
     *   &X-Amz-Security-Token=<session-token>  // if using temporary credentials
66
     *   &User-Agent=doris-msk-iam-auth/1.0
67
     *
68
     * Reference: https://github.com/aws/aws-msk-iam-sasl-signer-python
69
     *
70
     * @param broker_hostname The MSK broker hostname (used for logging, not in token)
71
     * @param token Output: base64url-encoded signed URL token
72
     * @param token_lifetime_ms Output: token lifetime in milliseconds (documented as 3600000ms = 1 hour; NOTE(review): the URL sketch above uses X-Amz-Expires=900 -- confirm which lifetime the implementation reports)
73
     * @return Status indicating success or failure
74
     */
75
    Status generate_token(const std::string& broker_hostname, std::string* token,
76
                          int64_t* token_lifetime_ms);
77
78
    /**
79
     * Get current AWS credentials; the result is written through the `credentials` output pointer.
80
     * This will refresh credentials if they are expired or about to expire.
81
     */
82
    Status get_credentials(Aws::Auth::AWSCredentials* credentials);
83
84
private:
85
    // Create the AWS credentials provider based on the fields of _config
86
    std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _create_credentials_provider();
87
88
    // HMAC-SHA256 returning hex string (compare _hmac_sha256 below)
89
    std::string _hmac_sha256_hex(const std::string& key, const std::string& data);
90
91
    std::string _url_encode(const std::string& value); // presumably percent-encodes `value` for the query string -- confirm in .cpp
92
93
    std::string _base64url_encode(const std::string& input); // presumably produces the base64url form used for the final token -- confirm in .cpp
94
95
    // Calculate AWS SigV4 signing key from the secret key, date stamp, region and service name
96
    std::string _calculate_signing_key(const std::string& secret_key, const std::string& date_stamp,
97
                                       const std::string& region, const std::string& service);
98
99
    std::string _hmac_sha256(const std::string& key, const std::string& data); // presumably the raw (non-hex) HMAC-SHA256 digest -- confirm in .cpp
100
101
    std::string _sha256(const std::string& data); // NOTE(review): output encoding (raw vs hex) is not visible from this header
102
103
    std::string _get_timestamp(); // presumably the current time formatted for X-Amz-Date -- confirm in .cpp
104
105
    std::string _get_date_stamp(const std::string& timestamp); // presumably derives the <date> credential-scope component from `timestamp` -- confirm
106
107
    bool _should_refresh_credentials(); // presumably compares _credentials_expiry against the configured refresh margin -- confirm
108
109
    Config _config; // Configuration supplied to the constructor
110
    std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _credentials_provider;
111
    std::mutex _mutex; // std::mutex member makes this class non-copyable and non-movable; presumably guards the cached state below -- confirm
112
    Aws::Auth::AWSCredentials _cached_credentials;
113
    std::chrono::time_point<std::chrono::system_clock> _credentials_expiry; // NOTE(review): <chrono> is not among this header's direct includes
114
};
115
116
/**
117
 * librdkafka OAUTHBEARER callback for AWS MSK IAM authentication.
118
 * 
119
 * This callback is invoked by librdkafka when it needs to refresh the
120
 * OAUTHBEARER token. It uses AwsMskIamAuth to generate the token.
121
 */
122
class AwsMskIamOAuthCallback : public RdKafka::OAuthBearerTokenRefreshCb {
123
public:
124
    /**
125
     * Create an OAuth callback from Kafka custom properties.
126
     *
127
     * This factory method checks if AWS MSK IAM authentication is configured
128
     * (security.protocol=SASL_SSL and sasl.mechanism=OAUTHBEARER) and creates
129
     * the callback with proper configuration.
130
     *
131
     * @param custom_properties Kafka custom properties map (property name -> value)
132
     * @param brokers Kafka broker list (comma-separated)
133
     * @return unique_ptr to callback if IAM auth is configured, nullptr otherwise (callers must check for nullptr)
134
     */
135
    static std::unique_ptr<AwsMskIamOAuthCallback> create_from_properties(
136
            const std::unordered_map<std::string, std::string>& custom_properties,
137
            const std::string& brokers);
138
139
    explicit AwsMskIamOAuthCallback(std::shared_ptr<AwsMskIamAuth> auth, // token generator, held via shared ownership
140
                                    std::string broker_hostname); // hostname, taken by value
141
142
    /**
143
     * Synchronously refresh and set the OAuth token on the given handle.
144
     * Can be called directly during initialization or by the callback.
145
     *
146
     * @param handle The Kafka handle (consumer or producer)
147
     * @return Status indicating success or failure
148
     */
149
    Status refresh_now(RdKafka::Handle* handle);
150
151
    /**
152
     * Callback invoked by librdkafka to refresh OAuth token (overrides RdKafka::OAuthBearerTokenRefreshCb).
153
     *
154
     * @param handle The Kafka handle (consumer or producer)
155
     * @param oauthbearer_config Configuration string from 'sasl.oauthbearer.config'
156
     */
157
    void oauthbearer_token_refresh_cb(RdKafka::Handle* handle,
158
                                      const std::string& oauthbearer_config) override;
159
160
private:
161
    std::shared_ptr<AwsMskIamAuth> _auth; // Token generator supplied to the constructor
162
    std::string _broker_hostname; // presumably forwarded to AwsMskIamAuth::generate_token -- confirm in .cpp
163
};
164
165
} // namespace doris