be/src/runtime/aws_msk_iam_auth.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <aws/core/auth/AWSCredentialsProvider.h> |
21 | | #include <librdkafka/rdkafkacpp.h> |
22 | | |
23 | | #include <memory> |
24 | | #include <mutex> |
25 | | #include <string> |
26 | | #include <unordered_map> |
27 | | |
28 | | #include "common/status.h" |
29 | | |
30 | | namespace doris { |
31 | | /** |
32 | | * AWS MSK IAM authentication token generator. |
33 | | * |
34 | | * Generates SASL/OAUTHBEARER tokens for AWS MSK IAM authentication. |
35 | | * It obtains IAM credentials through the AWS SDK for C++ and produces signed tokens |
36 | | * that can be used with librdkafka's OAUTHBEARER mechanism. |
37 | | */ |
38 | | class AwsMskIamAuth { |
39 | | public: |
40 | | struct Config { |
41 | | std::string region; // AWS region (e.g., "us-east-1"), required |
42 | | std::string access_key; // AWS Access Key ID (optional, for explicit credentials) |
43 | | std::string secret_key; // AWS Secret Access Key (optional, for explicit credentials) |
44 | | std::string role_arn; // IAM role ARN (optional, for assume role) |
45 | | std::string profile_name; // AWS profile name (optional, reads from ~/.aws/credentials) |
46 | | std::string |
47 | | credentials_provider; // Credentials provider type (optional, e.g., "ENV", "INSTANCE_PROFILE") |
48 | | int token_refresh_margin_ms = 60000; // Refresh margin in milliseconds; the default (60000) refreshes the token 60s before expiry |
49 | | }; |
50 | | |
51 | | explicit AwsMskIamAuth(Config config); |
52 | 12 | ~AwsMskIamAuth() = default; |
53 | | |
54 | | /** |
55 | | * Generate an AWS MSK IAM authentication token. |
56 | | * |
57 | | * The token is a base64url-encoded presigned URL following the AWS SigV4 format: |
58 | | * https://kafka.<region>.amazonaws.com/?Action=kafka-cluster:Connect |
59 | | * &X-Amz-Algorithm=AWS4-HMAC-SHA256 |
60 | | * &X-Amz-Credential=<access-key>/<date>/<region>/kafka-cluster/aws4_request |
61 | | * &X-Amz-Date=<timestamp> |
62 | | * &X-Amz-Expires=900 |
63 | | * &X-Amz-SignedHeaders=host |
64 | | * &X-Amz-Signature=<signature> |
65 | | * &X-Amz-Security-Token=<session-token> // only when temporary credentials are used |
66 | | * &User-Agent=doris-msk-iam-auth/1.0 |
67 | | * |
68 | | * Reference: https://github.com/aws/aws-msk-iam-sasl-signer-python |
69 | | * |
70 | | * @param broker_hostname The MSK broker hostname (used for logging, not in token) |
71 | | * @param token Output: base64url-encoded signed URL token |
72 | | * @param token_lifetime_ms Output: token lifetime in milliseconds (NOTE(review): previously documented as 3600000ms = 1 hour, but the URL above sets X-Amz-Expires=900, i.e. 900 seconds; confirm against the implementation) |
73 | | * @return Status indicating success or failure |
74 | | */ |
75 | | Status generate_token(const std::string& broker_hostname, std::string* token, |
76 | | int64_t* token_lifetime_ms); |
77 | | |
78 | | /** |
79 | | * Get the current AWS credentials through the `credentials` pointer. |
80 | | * Credentials are refreshed first if they are expired or about to expire. |
81 | | */ |
82 | | Status get_credentials(Aws::Auth::AWSCredentials* credentials); |
83 | | |
84 | | private: |
85 | | // Create the AWS credentials provider based on the configuration |
86 | | std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _create_credentials_provider(); |
87 | | |
88 | | // HMAC-SHA256 of `data` keyed with `key`, returned as a hex string |
89 | | std::string _hmac_sha256_hex(const std::string& key, const std::string& data); |
90 | | |
91 | | std::string _url_encode(const std::string& value); |
92 | | |
93 | | std::string _base64url_encode(const std::string& input); |
94 | | |
95 | | // Calculate the AWS SigV4 signing key from the secret key, date stamp, region and service |
96 | | std::string _calculate_signing_key(const std::string& secret_key, const std::string& date_stamp, |
97 | | const std::string& region, const std::string& service); |
98 | | |
99 | | std::string _hmac_sha256(const std::string& key, const std::string& data); |
100 | | |
101 | | std::string _sha256(const std::string& data); |
102 | | |
103 | | std::string _get_timestamp(); |
104 | | |
105 | | std::string _get_date_stamp(const std::string& timestamp); |
106 | | |
107 | | bool _should_refresh_credentials(); |
108 | | |
109 | | Config _config; |
110 | | std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _credentials_provider; |
111 | | std::mutex _mutex; |
112 | | Aws::Auth::AWSCredentials _cached_credentials; |
113 | | std::chrono::time_point<std::chrono::system_clock> _credentials_expiry; |
114 | | }; |
115 | | |
116 | | /** |
117 | | * librdkafka OAUTHBEARER token refresh callback for AWS MSK IAM authentication. |
118 | | * |
119 | | * librdkafka invokes this callback whenever it needs to refresh the |
120 | | * OAUTHBEARER token; the token itself is generated through AwsMskIamAuth. |
121 | | */ |
122 | | class AwsMskIamOAuthCallback : public RdKafka::OAuthBearerTokenRefreshCb { |
123 | | public: |
124 | | /** |
125 | | * Create an OAuth callback from Kafka custom properties. |
126 | | * |
127 | | * This factory method checks whether AWS MSK IAM authentication is configured |
128 | | * (security.protocol=SASL_SSL and sasl.mechanism=OAUTHBEARER) and, if so, creates |
129 | | * the callback with the proper configuration. |
130 | | * |
131 | | * @param custom_properties Kafka custom properties map |
132 | | * @param brokers Kafka broker list (comma-separated) |
133 | | * @return unique_ptr to the callback if IAM auth is configured, nullptr otherwise |
134 | | */ |
135 | | static std::unique_ptr<AwsMskIamOAuthCallback> create_from_properties( |
136 | | const std::unordered_map<std::string, std::string>& custom_properties, |
137 | | const std::string& brokers); |
138 | | |
139 | | explicit AwsMskIamOAuthCallback(std::shared_ptr<AwsMskIamAuth> auth, |
140 | | std::string broker_hostname); |
141 | | |
142 | | /** |
143 | | * Synchronously refresh the OAuth token and set it on the given handle. |
144 | | * Can be called directly during initialization or from the refresh callback. |
145 | | * |
146 | | * @param handle The Kafka handle (consumer or producer) |
147 | | * @return Status indicating success or failure |
148 | | */ |
149 | | Status refresh_now(RdKafka::Handle* handle); |
150 | | |
151 | | /** |
152 | | * Callback invoked by librdkafka when the OAuth token must be refreshed. |
153 | | * |
154 | | * @param handle The Kafka handle (consumer or producer) |
155 | | * @param oauthbearer_config Configuration string from 'sasl.oauthbearer.config' |
156 | | */ |
157 | | void oauthbearer_token_refresh_cb(RdKafka::Handle* handle, |
158 | | const std::string& oauthbearer_config) override; |
159 | | |
160 | | private: |
161 | | std::shared_ptr<AwsMskIamAuth> _auth; |
162 | | std::string _broker_hostname; |
163 | | }; |
164 | | |
165 | | } // namespace doris |