be/src/load/routine_load/data_consumer.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <aws/kinesis/KinesisClient.h>
#include <aws/kinesis/model/GetRecordsRequest.h>
#include <aws/kinesis/model/GetRecordsResult.h>
#include <aws/kinesis/model/GetShardIteratorRequest.h>
#include <aws/kinesis/model/ListShardsRequest.h>
#include <aws/kinesis/model/Record.h>
#include <stdint.h>

#include <ctime>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "common/logging.h"
#include "common/status.h"
#include "librdkafka/rdkafkacpp.h"
#include "load/routine_load/kinesis_conf.h"
#include "load/stream_load/stream_load_context.h"
#include "runtime/aws_msk_iam_auth.h"
#include "util/uid_util.h"
44 | | |
45 | | namespace doris { |
46 | | |
47 | | template <typename T> |
48 | | class BlockingQueue; |
49 | | |
50 | | class DataConsumer { |
51 | | public: |
52 | | DataConsumer() |
53 | 2 | : _id(UniqueId::gen_uid()), |
54 | 2 | _grp_id(UniqueId::gen_uid()), |
55 | 2 | _has_grp(false), |
56 | 2 | _init(false), |
57 | 2 | _cancelled(false), |
58 | 2 | _last_visit_time(0) {} |
59 | | |
60 | 2 | virtual ~DataConsumer() {} |
61 | | |
62 | | // init the consumer with the given parameters |
63 | | virtual Status init(std::shared_ptr<StreamLoadContext> ctx) = 0; |
64 | | // start consuming |
65 | | virtual Status consume(std::shared_ptr<StreamLoadContext> ctx) = 0; |
66 | | // cancel the consuming process. |
67 | | // if the consumer is not initialized, or the consuming |
68 | | // process is already finished, call cancel() will |
69 | | // return ERROR |
70 | | virtual Status cancel(std::shared_ptr<StreamLoadContext> ctx) = 0; |
71 | | // reset the data consumer before being reused |
72 | | virtual Status reset() = 0; |
73 | | // return true the if the consumer match the need |
74 | | virtual bool match(std::shared_ptr<StreamLoadContext> ctx) = 0; |
75 | | |
76 | 0 | const UniqueId& id() { return _id; } |
77 | 0 | time_t last_visit_time() { return _last_visit_time; } |
78 | 1 | void set_grp(const UniqueId& grp_id) { |
79 | 1 | _grp_id = grp_id; |
80 | 1 | _has_grp = true; |
81 | 1 | } |
82 | | |
83 | | protected: |
84 | | UniqueId _id; |
85 | | UniqueId _grp_id; |
86 | | bool _has_grp; |
87 | | |
88 | | // lock to protect the following bools |
89 | | std::mutex _lock; |
90 | | bool _init; |
91 | | bool _cancelled; |
92 | | time_t _last_visit_time; |
93 | | }; |
94 | | |
95 | | class PIntegerPair; |
96 | | |
97 | | class KafkaEventCb : public RdKafka::EventCb { |
98 | | public: |
99 | 19 | void event_cb(RdKafka::Event& event) { |
100 | 19 | switch (event.type()) { |
101 | 7 | case RdKafka::Event::EVENT_ERROR: |
102 | 7 | LOG(INFO) << "kafka error: " << RdKafka::err2str(event.err()) |
103 | 7 | << ", event: " << event.str(); |
104 | 7 | break; |
105 | 0 | case RdKafka::Event::EVENT_STATS: |
106 | 0 | LOG(INFO) << "kafka stats: " << event.str(); |
107 | 0 | break; |
108 | | |
109 | 12 | case RdKafka::Event::EVENT_LOG: |
110 | 12 | LOG(INFO) << "kafka log-" << event.severity() << "-" << event.fac().c_str() |
111 | 12 | << ", event: " << event.str(); |
112 | 12 | break; |
113 | | |
114 | 0 | case RdKafka::Event::EVENT_THROTTLE: |
115 | 0 | LOG(INFO) << "kafka throttled: " << event.throttle_time() << "ms by " |
116 | 0 | << event.broker_name() << " id " << (int)event.broker_id(); |
117 | 0 | break; |
118 | | |
119 | 0 | default: |
120 | 0 | LOG(INFO) << "kafka event: " << event.type() |
121 | 0 | << ", err: " << RdKafka::err2str(event.err()) << ", event: " << event.str(); |
122 | 0 | break; |
123 | 19 | } |
124 | 19 | } |
125 | | }; |
126 | | |
127 | | class KafkaDataConsumer : public DataConsumer { |
128 | | public: |
129 | | KafkaDataConsumer(std::shared_ptr<StreamLoadContext> ctx) |
130 | 2 | : _brokers(ctx->kafka_info->brokers), _topic(ctx->kafka_info->topic) {} |
131 | | |
132 | 2 | virtual ~KafkaDataConsumer() { |
133 | 2 | VLOG_NOTICE << "deconstruct consumer"; |
134 | 2 | if (_k_consumer) { |
135 | 2 | _k_consumer->close(); |
136 | 2 | delete _k_consumer; |
137 | 2 | _k_consumer = nullptr; |
138 | 2 | } |
139 | 2 | } |
140 | | |
141 | | Status init(std::shared_ptr<StreamLoadContext> ctx) override; |
142 | | // TODO(cmy): currently do not implement single consumer start method, using group_consume |
143 | 0 | Status consume(std::shared_ptr<StreamLoadContext> ctx) override { return Status::OK(); } |
144 | | Status cancel(std::shared_ptr<StreamLoadContext> ctx) override; |
145 | | // reassign partition topics |
146 | | virtual Status reset() override; |
147 | | bool match(std::shared_ptr<StreamLoadContext> ctx) override; |
148 | | // commit kafka offset |
149 | | Status commit(std::vector<RdKafka::TopicPartition*>& offset); |
150 | | |
151 | | Status assign_topic_partitions(const std::map<int32_t, int64_t>& begin_partition_offset, |
152 | | const std::string& topic, |
153 | | std::shared_ptr<StreamLoadContext> ctx); |
154 | | |
155 | | // start the consumer and put msgs to queue |
156 | | Status group_consume(BlockingQueue<RdKafka::Message*>* queue, int64_t max_running_time_ms); |
157 | | |
158 | | // get the partitions ids of the topic |
159 | | Status get_partition_meta(std::vector<int32_t>* partition_ids); |
160 | | // get offsets for times |
161 | | Status get_offsets_for_times(const std::vector<PIntegerPair>& times, |
162 | | std::vector<PIntegerPair>* offsets, int timeout); |
163 | | // get latest offsets for partitions |
164 | | Status get_latest_offsets_for_partitions(const std::vector<int32_t>& partition_ids, |
165 | | std::vector<PIntegerPair>* offsets, int timeout); |
166 | | // get offsets for times |
167 | | Status get_real_offsets_for_partitions(const std::vector<PIntegerPair>& offset_flags, |
168 | | std::vector<PIntegerPair>* offsets, int timeout); |
169 | | |
170 | | private: |
171 | | std::string _brokers; |
172 | | std::string _topic; |
173 | | std::unordered_map<std::string, std::string> _custom_properties; |
174 | | std::set<int32_t> _consuming_partition_ids; |
175 | | |
176 | | KafkaEventCb _k_event_cb; |
177 | | RdKafka::KafkaConsumer* _k_consumer = nullptr; |
178 | | |
179 | | // AWS MSK IAM authentication callback (must outlive _k_consumer) |
180 | | std::unique_ptr<AwsMskIamOAuthCallback> _aws_msk_oauth_callback; |
181 | | }; |
182 | | |
183 | | // AWS Kinesis Data Consumer |
184 | | // Consumes data from AWS Kinesis Data Streams for routine load jobs. |
185 | | // Kinesis is similar to Kafka but uses shards instead of partitions |
186 | | // and sequence numbers (strings) instead of offsets (integers). |
187 | | class KinesisDataConsumer : public DataConsumer { |
188 | | public: |
189 | | KinesisDataConsumer(std::shared_ptr<StreamLoadContext> ctx); |
190 | | virtual ~KinesisDataConsumer(); |
191 | | |
192 | | // DataConsumer interface implementation |
193 | | Status init(std::shared_ptr<StreamLoadContext> ctx) override; |
194 | 0 | Status consume(std::shared_ptr<StreamLoadContext> ctx) override { return Status::OK(); } |
195 | | Status cancel(std::shared_ptr<StreamLoadContext> ctx) override; |
196 | | Status reset() override; |
197 | | bool match(std::shared_ptr<StreamLoadContext> ctx) override; |
198 | | |
199 | | // Kinesis-specific methods |
200 | | // Assign shards with their starting sequence numbers |
201 | | Status assign_shards(const std::map<std::string, std::string>& shard_sequence_numbers, |
202 | | const std::string& stream_name, std::shared_ptr<StreamLoadContext> ctx); |
203 | | |
204 | | // Main consumption loop - pulls records from all assigned shards |
205 | | Status group_consume(BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue, |
206 | | int64_t max_running_time_ms); |
207 | | |
208 | | // Get list of shard IDs |
209 | | Status get_shard_list(std::vector<std::string>* shard_ids); |
210 | | |
211 | | private: |
212 | | // Configuration - Basic AWS settings |
213 | | std::string _region; |
214 | | std::string _stream; |
215 | | std::string _endpoint; // Optional custom endpoint (e.g., LocalStack) |
216 | | |
217 | | // Type 1: Doris-internal parameters (not passed to AWS SDK) |
218 | | std::unordered_map<std::string, std::string> _doris_internal_properties; |
219 | | |
220 | | // Type 2: Frequently-used AWS parameters (explicit members for performance) |
221 | | // These are parsed from aws.kinesis.* properties during init() |
222 | | std::vector<std::string> _explicit_shards; // aws.kinesis.shards (comma-separated) |
223 | | std::string _default_position; // aws.kinesis.default.pos (LATEST/TRIM_HORIZON) |
224 | | std::map<std::string, std::string> |
225 | | _shard_positions; // aws.kinesis.shards.pos (shard_id:position) |
226 | | |
227 | | // Type 3: Less-frequently-used AWS API parameters (wrapped in KinesisConf) |
228 | | std::unique_ptr<KinesisConf> _kinesis_conf; |
229 | | |
230 | | // AWS credentials and other properties |
231 | | std::unordered_map<std::string, std::string> _custom_properties; |
232 | | |
233 | | // Active shards being consumed |
234 | | std::set<std::string> _consuming_shard_ids; |
235 | | |
236 | | // AWS Kinesis client |
237 | | std::shared_ptr<Aws::Kinesis::KinesisClient> _kinesis_client; |
238 | | |
239 | | // Shard iterator management |
240 | | // Kinesis requires shard iterators to consume records |
241 | | // shard_id -> current shard iterator |
242 | | std::map<std::string, std::string> _shard_iterators; |
243 | | |
244 | | // Tracks the MillisBehindLatest value per shard from the last GetRecords call. |
245 | | // Updated during group_consume; read by the task executor to populate ctx after consumption. |
246 | | std::map<std::string, int64_t> _millis_behind_latest; |
247 | | |
248 | | // Tracks the last consumed sequence number per shard. |
249 | | // Updated during group_consume via _process_records; read by the consumer group |
250 | | // to populate ctx->kinesis_info->cmt_sequence_number after consumption. |
251 | | std::map<std::string, std::string> _committed_sequence_numbers; |
252 | | |
253 | | // Tracks shards that have been closed (split/merge) during consumption. |
254 | | // FE should remove these shards from its tracking to avoid reassigning them. |
255 | | std::set<std::string> _closed_shard_ids; |
256 | | |
257 | | public: |
258 | | // Returns the MillisBehindLatest snapshot collected during group_consume. |
259 | 0 | const std::map<std::string, int64_t>& get_millis_behind_latest() const { |
260 | 0 | return _millis_behind_latest; |
261 | 0 | } |
262 | | |
263 | | // Returns the committed sequence numbers per shard collected during group_consume. |
264 | 0 | const std::map<std::string, std::string>& get_committed_sequence_numbers() const { |
265 | 0 | return _committed_sequence_numbers; |
266 | 0 | } |
267 | | |
268 | | // Returns the set of closed shard IDs detected during group_consume. |
269 | 0 | const std::set<std::string>& get_closed_shard_ids() const { return _closed_shard_ids; } |
270 | | |
271 | | private: |
272 | | // Helper methods |
273 | | // Create and configure AWS Kinesis client with credentials |
274 | | Status _create_kinesis_client(std::shared_ptr<StreamLoadContext> ctx); |
275 | | |
276 | | // Get shard iterator for a shard at a specific sequence number position |
277 | | Status _get_shard_iterator(const std::string& shard_id, const std::string& sequence_number, |
278 | | std::string* iterator); |
279 | | |
280 | | // Process records from GetRecords result and add to queue |
281 | | Status _process_records(const std::string& shard_id, |
282 | | Aws::Kinesis::Model::GetRecordsResult result, |
283 | | BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue, |
284 | | int64_t* received_rows, int64_t* put_rows); |
285 | | |
286 | | // Check if an AWS error is retriable (throttling, network, etc.) |
287 | | bool _is_retriable_error(const Aws::Client::AWSError<Aws::Kinesis::KinesisErrors>& error); |
288 | | }; |
289 | | |
290 | | } // end namespace doris |