be/src/load/routine_load/data_consumer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/routine_load/data_consumer.h" |
19 | | |
20 | | #include <absl/strings/str_split.h> |
21 | | #include <gen_cpp/Types_types.h> |
22 | | #include <gen_cpp/internal_service.pb.h> |
23 | | #include <librdkafka/rdkafkacpp.h> |
24 | | |
25 | | // AWS Kinesis SDK includes |
26 | | #include <aws/core/client/ClientConfiguration.h> |
27 | | #include <aws/core/utils/Outcome.h> |
28 | | #include <aws/kinesis/KinesisClient.h> |
29 | | #include <aws/kinesis/model/GetRecordsRequest.h> |
30 | | #include <aws/kinesis/model/GetRecordsResult.h> |
31 | | #include <aws/kinesis/model/GetShardIteratorRequest.h> |
32 | | #include <aws/kinesis/model/GetShardIteratorResult.h> |
33 | | #include <aws/kinesis/model/ListShardsRequest.h> |
34 | | #include <aws/kinesis/model/ListShardsResult.h> |
35 | | #include <aws/kinesis/model/Record.h> |
36 | | #include <aws/kinesis/model/ShardIteratorType.h> |
37 | | |
38 | | #include <algorithm> |
39 | | // IWYU pragma: no_include <bits/chrono.h> |
40 | | #include <chrono> // IWYU pragma: keep |
41 | | #include <string> |
42 | | #include <thread> |
43 | | #include <utility> |
44 | | #include <vector> |
45 | | |
46 | | #include "common/config.h" |
47 | | #include "common/metrics/doris_metrics.h" |
48 | | #include "common/status.h" |
49 | | #include "load/routine_load/consumer_helpers.h" |
50 | | #include "load/routine_load/kinesis_conf.h" |
51 | | #include "runtime/aws_msk_iam_auth.h" |
52 | | #include "runtime/exec_env.h" |
53 | | #include "runtime/small_file_mgr.h" |
54 | | #include "service/backend_options.h" |
55 | | #include "util/blocking_queue.hpp" |
56 | | #include "util/debug_points.h" |
57 | | #include "util/defer_op.h" |
58 | | #include "util/s3_util.h" |
59 | | #include "util/stopwatch.hpp" |
60 | | #include "util/string_util.h" |
61 | | #include "util/uid_util.h" |
62 | | |
63 | | namespace doris { |
64 | | |
65 | | static const std::string PROP_GROUP_ID = "group.id"; |
66 | | // init kafka consumer will only set common configs such as |
67 | | // brokers and group id |
68 | 2 | Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) { |
69 | 2 | std::unique_lock<std::mutex> l(_lock); |
70 | 2 | if (_init) { |
71 | | // this consumer has already been initialized. |
72 | 0 | return Status::OK(); |
73 | 0 | } |
74 | | |
75 | 2 | RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); |
76 | | |
77 | | // conf has to be deleted at the end |
78 | 2 | Defer delete_conf {[conf]() { delete conf; }}; |
79 | | |
80 | 2 | std::string errstr; |
81 | 26 | auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) { |
82 | 26 | RdKafka::Conf::ConfResult res = conf->set(conf_key, conf_val, errstr); |
83 | 26 | if (res == RdKafka::Conf::CONF_UNKNOWN) { |
84 | | // ignore unknown config |
85 | 0 | return Status::OK(); |
86 | 26 | } else if (errstr.find("not supported") != std::string::npos) { |
87 | | // some java-only properties may be passed here, and librdkafka will report 'xxx' not supported |
88 | | // ignore them |
89 | 0 | return Status::OK(); |
90 | 26 | } else if (res != RdKafka::Conf::CONF_OK) { |
91 | 0 | std::stringstream ss; |
92 | 0 | ss << "PAUSE: failed to set '" << conf_key << "', value: '" << conf_val |
93 | 0 | << "', err: " << errstr; |
94 | 0 | LOG(WARNING) << ss.str(); |
95 | 0 | return Status::InternalError(ss.str()); |
96 | 0 | } |
97 | 26 | VLOG_NOTICE << "set " << conf_key << ": " << conf_val; |
98 | 26 | return Status::OK(); |
99 | 26 | }; |
100 | | |
101 | 2 | RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers)); |
102 | 2 | RETURN_IF_ERROR(set_conf("enable.partition.eof", "true")); |
103 | 2 | RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false")); |
104 | | // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb() |
105 | 2 | RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0")); |
106 | 2 | RETURN_IF_ERROR(set_conf("auto.offset.reset", "error")); |
107 | 2 | RETURN_IF_ERROR(set_conf("socket.keepalive.enable", "true")); |
108 | 2 | RETURN_IF_ERROR(set_conf("reconnect.backoff.ms", "100")); |
109 | 2 | RETURN_IF_ERROR(set_conf("reconnect.backoff.max.ms", "10000")); |
110 | 2 | RETURN_IF_ERROR(set_conf("api.version.request", config::kafka_api_version_request)); |
111 | 2 | RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0")); |
112 | 2 | RETURN_IF_ERROR(set_conf("broker.version.fallback", config::kafka_broker_version_fallback)); |
113 | 2 | RETURN_IF_ERROR(set_conf("broker.address.ttl", "0")); |
114 | 2 | if (config::kafka_debug != "disable") { |
115 | 0 | RETURN_IF_ERROR(set_conf("debug", config::kafka_debug)); |
116 | 0 | } |
117 | | |
118 | 2 | for (auto& item : ctx->kafka_info->properties) { |
119 | 0 | _custom_properties.emplace(item.first, item.second); |
120 | | |
121 | | // AWS properties (aws.*) are Doris-specific for MSK IAM authentication |
122 | | // and should not be passed to librdkafka |
123 | 0 | if (starts_with(item.first, "aws.")) { |
124 | 0 | LOG(INFO) << "Skipping AWS property for librdkafka: " << item.first; |
125 | 0 | continue; |
126 | 0 | } |
127 | | |
128 | 0 | if (starts_with(item.second, "FILE:")) { |
129 | | // file property should have the format: FILE:file_id:md5 |
130 | 0 | std::vector<std::string> parts = |
131 | 0 | absl::StrSplit(item.second, ":", absl::SkipWhitespace()); |
132 | 0 | if (parts.size() != 3) { |
133 | 0 | return Status::InternalError("PAUSE: Invalid file property of kafka: " + |
134 | 0 | item.second); |
135 | 0 | } |
136 | 0 | int64_t file_id = std::stol(parts[1]); |
137 | 0 | std::string file_path; |
138 | 0 | Status st = ctx->exec_env()->small_file_mgr()->get_file(file_id, parts[2], &file_path); |
139 | 0 | if (!st.ok()) { |
140 | 0 | return Status::InternalError("PAUSE: failed to get file for config: {}, error: {}", |
141 | 0 | item.first, st.to_string()); |
142 | 0 | } |
143 | 0 | RETURN_IF_ERROR(set_conf(item.first, file_path)); |
144 | 0 | } else { |
145 | 0 | RETURN_IF_ERROR(set_conf(item.first, item.second)); |
146 | 0 | } |
147 | 0 | } |
148 | | |
149 | | // if no group id is specified, generate a random one. |
150 | | // ATTN: In the new version, the FE side sets a group.id for jobs that have not set one, |
151 | | // but to ensure compatibility, we still do a check here. |
152 | 2 | if (!_custom_properties.contains(PROP_GROUP_ID)) { |
153 | 2 | std::stringstream ss; |
154 | 2 | ss << BackendOptions::get_localhost() << "_"; |
155 | 2 | std::string group_id = ss.str() + UniqueId::gen_uid().to_string(); |
156 | 2 | RETURN_IF_ERROR(set_conf(PROP_GROUP_ID, group_id)); |
157 | 2 | _custom_properties.emplace(PROP_GROUP_ID, group_id); |
158 | 2 | } |
159 | 2 | LOG(INFO) << "init kafka consumer with group id: " << _custom_properties[PROP_GROUP_ID]; |
160 | | |
161 | 2 | if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) { |
162 | 0 | std::stringstream ss; |
163 | 0 | ss << "PAUSE: failed to set 'event_cb'"; |
164 | 0 | LOG(WARNING) << ss.str(); |
165 | 0 | return Status::InternalError(ss.str()); |
166 | 0 | } |
167 | | |
168 | | // Set up AWS MSK IAM authentication if configured |
169 | 2 | _aws_msk_oauth_callback = AwsMskIamOAuthCallback::create_from_properties( |
170 | 2 | _custom_properties, ctx->kafka_info->brokers); |
171 | 2 | if (_aws_msk_oauth_callback) { |
172 | | // Enable SASL queue to support background callbacks |
173 | 0 | if (conf->enable_sasl_queue(true, errstr) != RdKafka::Conf::CONF_OK) { |
174 | 0 | LOG(WARNING) << "PAUSE: failed to enable SASL queue: " << errstr; |
175 | 0 | return Status::InternalError("PAUSE: failed to enable SASL queue: " + errstr); |
176 | 0 | } |
177 | | |
178 | 0 | if (conf->set("oauthbearer_token_refresh_cb", _aws_msk_oauth_callback.get(), errstr) != |
179 | 0 | RdKafka::Conf::CONF_OK) { |
180 | 0 | LOG(WARNING) << "PAUSE: failed to set OAuth callback: " << errstr; |
181 | 0 | return Status::InternalError("PAUSE: failed to set OAuth callback: " + errstr); |
182 | 0 | } |
183 | 0 | LOG(INFO) << "AWS MSK IAM authentication enabled successfully"; |
184 | 0 | } |
185 | | |
186 | | // create consumer |
187 | 2 | _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr); |
188 | 2 | if (!_k_consumer) { |
189 | 0 | LOG(WARNING) << "PAUSE: failed to create kafka consumer: " << errstr; |
190 | 0 | return Status::InternalError("PAUSE: failed to create kafka consumer: " + errstr); |
191 | 0 | } |
192 | | |
193 | | // If AWS MSK IAM auth is enabled, inject initial token and enable background refresh |
194 | 2 | if (_aws_msk_oauth_callback) { |
195 | 0 | RETURN_IF_ERROR(_aws_msk_oauth_callback->refresh_now(_k_consumer)); |
196 | | |
197 | 0 | std::unique_ptr<RdKafka::Error> bg_err(_k_consumer->sasl_background_callbacks_enable()); |
198 | 0 | if (bg_err) { |
199 | 0 | return Status::InternalError("Failed to enable SASL background callbacks: " + |
200 | 0 | bg_err->str()); |
201 | 0 | } |
202 | 0 | LOG(INFO) << "AWS MSK IAM: initial token set, background refresh enabled"; |
203 | 0 | } |
204 | | |
205 | 2 | VLOG_NOTICE << "finished to init kafka consumer. " << ctx->brief(); |
206 | | |
207 | 2 | _init = true; |
208 | 2 | return Status::OK(); |
209 | 2 | } |
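
The construction flow above (global Conf, property overrides, event callback, consumer creation) is plain librdkafka usage under the hood. A minimal standalone sketch of the same flow, with the Doris plumbing stripped out and error handling reduced to stderr:

    #include <librdkafka/rdkafkacpp.h>
    #include <iostream>
    #include <memory>
    #include <string>

    // Build a KafkaConsumer with the same unconditional knobs init() sets.
    std::unique_ptr<RdKafka::KafkaConsumer> make_consumer(const std::string& brokers,
                                                          const std::string& group_id) {
        std::string errstr;
        std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
        if (conf->set("metadata.broker.list", brokers, errstr) != RdKafka::Conf::CONF_OK ||
            conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK ||
            conf->set("enable.partition.eof", "true", errstr) != RdKafka::Conf::CONF_OK ||
            conf->set("enable.auto.offset.store", "false", errstr) != RdKafka::Conf::CONF_OK) {
            std::cerr << "conf error: " << errstr << std::endl;
            return nullptr;
        }
        // create() copies the settings; the Conf object stays owned by us,
        // which is why init() above can delete it via Defer.
        std::unique_ptr<RdKafka::KafkaConsumer> consumer(
                RdKafka::KafkaConsumer::create(conf.get(), errstr));
        if (!consumer) {
            std::cerr << "create error: " << errstr << std::endl;
        }
        return consumer;
    }
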
210 | | |
211 | | Status KafkaDataConsumer::assign_topic_partitions( |
212 | | const std::map<int32_t, int64_t>& begin_partition_offset, const std::string& topic, |
213 | 1 | std::shared_ptr<StreamLoadContext> ctx) { |
214 | 1 | DCHECK(_k_consumer); |
215 | | // create TopicPartitions |
216 | 1 | std::stringstream ss; |
217 | 1 | std::vector<RdKafka::TopicPartition*> topic_partitions; |
218 | 1 | for (auto& entry : begin_partition_offset) { |
219 | 1 | RdKafka::TopicPartition* tp1 = |
220 | 1 | RdKafka::TopicPartition::create(topic, entry.first, entry.second); |
221 | 1 | topic_partitions.push_back(tp1); |
222 | 1 | _consuming_partition_ids.insert(entry.first); |
223 | 1 | ss << "[" << entry.first << ": " << entry.second << "] "; |
224 | 1 | } |
225 | | |
226 | 1 | LOG(INFO) << "consumer: " << _id << ", grp: " << _grp_id |
227 | 1 | << " assign topic partitions: " << topic << ", " << ss.str(); |
228 | | |
229 | | // delete the TopicPartitions at the end |
230 | 1 | Defer delete_tp {[&topic_partitions]() { |
231 | 1 | std::for_each(topic_partitions.begin(), topic_partitions.end(), |
232 | 1 | [](RdKafka::TopicPartition* tp1) { delete tp1; }); |
233 | 1 | }}; |
234 | | |
235 | | // assign partition |
236 | 1 | RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions); |
237 | 1 | if (err) { |
238 | 0 | LOG(WARNING) << "failed to assign topic partitions: " << ctx->brief(true) |
239 | 0 | << ", err: " << RdKafka::err2str(err); |
240 | 0 | _k_consumer->unassign(); |
241 | 0 | return Status::InternalError("failed to assign topic partitions"); |
242 | 0 | } |
243 | | |
244 | 1 | return Status::OK(); |
245 | 1 | } |
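
A hypothetical call shape for the method above (values are illustrative): the map is keyed by partition id and holds the next offset to consume, and librdkafka's sentinel constants can be used as values.

    std::map<int32_t, int64_t> begin_offsets = {
            {0, 1000},                             // partition 0: resume from offset 1000
            {1, RdKafka::Topic::OFFSET_BEGINNING}, // partition 1: start from the earliest offset
    };
    RETURN_IF_ERROR(consumer->assign_topic_partitions(begin_offsets, "my_topic", ctx));
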
246 | | |
247 | | Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue, |
248 | 1 | int64_t max_running_time_ms) { |
249 | 1 | int64_t left_time = max_running_time_ms; |
250 | 1 | LOG(INFO) << "start kafka consumer: " << _id << ", grp: " << _grp_id |
251 | 1 | << ", max running time(ms): " << left_time; |
252 | | |
253 | 1 | int64_t received_rows = 0; |
254 | 1 | int64_t put_rows = 0; |
255 | 1 | RetryPolicy retry_policy(3, 200); |
256 | 1 | Status st = Status::OK(); |
257 | 1 | MonotonicStopWatch consumer_watch; |
258 | 1 | MonotonicStopWatch watch; |
259 | 1 | watch.start(); |
260 | | |
261 | 6 | while (true) { |
262 | 6 | { |
263 | 6 | std::unique_lock<std::mutex> l(_lock); |
264 | 6 | if (_cancelled) { |
265 | 0 | break; |
266 | 0 | } |
267 | 6 | } |
268 | | |
269 | 6 | if (left_time <= 0) { |
270 | 1 | break; |
271 | 1 | } |
272 | | |
273 | 5 | bool done = false; |
274 | | // consume 1 message at a time |
275 | 5 | consumer_watch.start(); |
276 | 5 | std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */)); |
277 | 5 | consumer_watch.stop(); |
278 | | |
279 | 5 | DorisMetrics::instance()->routine_load_get_msg_count->increment(1); |
280 | 5 | DorisMetrics::instance()->routine_load_get_msg_latency->increment( |
281 | 5 | consumer_watch.elapsed_time() / 1000 / 1000); |
282 | | |
283 | 5 | DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", { |
284 | 5 | done = true; |
285 | 5 | std::stringstream ss; |
286 | 5 | ss << "Offset out of range" |
287 | 5 | << ", consume partition " << msg->partition() << ", consume offset " |
288 | 5 | << msg->offset(); |
289 | 5 | LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str(); |
290 | 5 | st = Status::InternalError<false>(ss.str()); |
291 | 5 | break; |
292 | 5 | }); |
293 | | |
294 | 5 | switch (msg->err()) { |
295 | 0 | case RdKafka::ERR_NO_ERROR: |
296 | 0 | retry_policy.reset(); |
297 | 0 | if (_consuming_partition_ids.count(msg->partition()) <= 0) { |
298 | 0 | _consuming_partition_ids.insert(msg->partition()); |
299 | 0 | } |
300 | 0 | DorisMetrics::instance()->routine_load_consume_bytes->increment(msg->len()); |
301 | 0 | if (msg->len() == 0) { |
302 | | // ignore msg with length 0. |
303 | | // putting an empty msg into the queue would cause the load process to shut down. |
304 | 0 | break; |
305 | 0 | } else if (!queue->controlled_blocking_put(msg.get(), |
306 | 0 | config::blocking_queue_cv_wait_timeout_ms)) { |
307 | | // queue is shutdown |
308 | 0 | done = true; |
309 | 0 | } else { |
310 | 0 | ++put_rows; |
311 | 0 | msg.release(); // release the ownership, msg will be deleted after being processed |
312 | 0 | } |
313 | 0 | ++received_rows; |
314 | 0 | DorisMetrics::instance()->routine_load_consume_rows->increment(1); |
315 | 0 | break; |
316 | 5 | case RdKafka::ERR__TIMED_OUT: |
317 | | // leave the status as OK, because this may happen |
318 | | // if there is no data in kafka. |
319 | 5 | LOG(INFO) << "kafka consume timeout: " << _id; |
320 | 5 | break; |
321 | 0 | case RdKafka::ERR__TRANSPORT: |
322 | 0 | LOG(INFO) << "kafka consume Disconnected: " << _id |
323 | 0 | << ", retry times: " << retry_policy.retry_count(); |
324 | 0 | if (retry_policy.should_retry()) { |
325 | 0 | retry_policy.retry_with_backoff(); |
326 | 0 | break; |
327 | 0 | } |
328 | 0 | [[fallthrough]]; |
329 | 0 | case RdKafka::ERR__PARTITION_EOF: { |
330 | 0 | VLOG_NOTICE << "consumer meet partition eof: " << _id |
331 | 0 | << " partition offset: " << msg->offset(); |
332 | 0 | _consuming_partition_ids.erase(msg->partition()); |
333 | 0 | if (!queue->controlled_blocking_put(msg.get(), |
334 | 0 | config::blocking_queue_cv_wait_timeout_ms)) { |
335 | 0 | done = true; |
336 | 0 | } else if (_consuming_partition_ids.size() <= 0) { |
337 | 0 | LOG(INFO) << "all partitions meet eof: " << _id; |
338 | 0 | msg.release(); |
339 | 0 | done = true; |
340 | 0 | } else { |
341 | 0 | msg.release(); |
342 | 0 | } |
343 | 0 | break; |
344 | 0 | } |
345 | 0 | case RdKafka::ERR_OFFSET_OUT_OF_RANGE: { |
346 | 0 | done = true; |
347 | 0 | std::stringstream ss; |
348 | 0 | ss << msg->errstr() << ", consume partition " << msg->partition() << ", consume offset " |
349 | 0 | << msg->offset(); |
350 | 0 | LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str(); |
351 | 0 | st = Status::InternalError<false>(ss.str()); |
352 | 0 | break; |
353 | 0 | } |
354 | 0 | default: |
355 | 0 | LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr(); |
356 | 0 | done = true; |
357 | 0 | st = Status::InternalError<false>(msg->errstr()); |
358 | 0 | break; |
359 | 5 | } |
360 | | |
361 | 5 | left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000; |
362 | 5 | if (done) { |
363 | 0 | break; |
364 | 0 | } |
365 | 5 | } |
366 | | |
367 | 1 | LOG(INFO) << "kafka consumer done: " << _id << ", grp: " << _grp_id |
368 | 1 | << ". cancelled: " << _cancelled << ", left time(ms): " << left_time |
369 | 1 | << ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000 |
370 | 1 | << ", consume cost(ms): " << consumer_watch.elapsed_time() / 1000 / 1000 |
371 | 1 | << ", received rows: " << received_rows << ", put rows: " << put_rows; |
372 | | |
373 | 1 | return st; |
374 | 1 | } |
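
On the success paths above, msg.release() hands the raw RdKafka::Message* to the queue, so whoever drains the queue owns the pointers. A hedged sketch of that receiving side, assuming BlockingQueue's blocking_get(T*) returns false once the queue is shut down, with process() as a hypothetical payload handler:

    void drain(BlockingQueue<RdKafka::Message*>* queue) {
        RdKafka::Message* raw = nullptr;
        while (queue->blocking_get(&raw)) {             // assumed API: false on shutdown
            std::unique_ptr<RdKafka::Message> msg(raw); // take back ownership
            // payload()/len() expose the raw bytes of one Kafka record
            process(static_cast<const char*>(msg->payload()), msg->len());
        }
    }
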
375 | | |
376 | 0 | Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids) { |
377 | | // create topic conf |
378 | 0 | RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); |
379 | 0 | Defer delete_conf {[tconf]() { delete tconf; }}; |
380 | | |
381 | | // create topic |
382 | 0 | std::string errstr; |
383 | 0 | RdKafka::Topic* topic = RdKafka::Topic::create(_k_consumer, _topic, tconf, errstr); |
384 | 0 | if (topic == nullptr) { |
385 | 0 | std::stringstream ss; |
386 | 0 | ss << "failed to create topic: " << errstr; |
387 | 0 | LOG(WARNING) << ss.str(); |
388 | 0 | return Status::InternalError(ss.str()); |
389 | 0 | } |
390 | | |
391 | 0 | Defer delete_topic {[topic]() { delete topic; }}; |
392 | | |
393 | | // get topic metadata |
394 | 0 | RdKafka::Metadata* metadata = nullptr; |
395 | 0 | RdKafka::ErrorCode err = |
396 | 0 | _k_consumer->metadata(false /* for this topic */, topic, &metadata, 5000); |
397 | 0 | if (err != RdKafka::ERR_NO_ERROR) { |
398 | 0 | std::stringstream ss; |
399 | 0 | ss << "failed to get partition meta: " << RdKafka::err2str(err); |
400 | 0 | LOG(WARNING) << ss.str(); |
401 | 0 | return Status::InternalError(ss.str()); |
402 | 0 | } |
403 | | |
404 | 0 | Defer delete_meta {[metadata]() { delete metadata; }}; |
405 | | |
406 | | // get partition ids |
407 | 0 | RdKafka::Metadata::TopicMetadataIterator it; |
408 | 0 | for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) { |
409 | 0 | if ((*it)->topic() != _topic) { |
410 | 0 | continue; |
411 | 0 | } |
412 | | |
413 | 0 | if ((*it)->err() != RdKafka::ERR_NO_ERROR) { |
414 | 0 | std::stringstream ss; |
415 | 0 | ss << "error: " << err2str((*it)->err()); |
416 | 0 | if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) { |
417 | 0 | ss << ", try again"; |
418 | 0 | } |
419 | 0 | LOG(WARNING) << ss.str(); |
420 | 0 | return Status::InternalError(ss.str()); |
421 | 0 | } |
422 | | |
423 | 0 | RdKafka::TopicMetadata::PartitionMetadataIterator ip; |
424 | 0 | for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) { |
425 | 0 | partition_ids->push_back((*ip)->id()); |
426 | 0 | } |
427 | 0 | } |
428 | | |
429 | 0 | if (partition_ids->empty()) { |
430 | 0 | return Status::InternalError("no partition in this topic"); |
431 | 0 | } |
432 | | |
433 | 0 | return Status::OK(); |
434 | 0 | } |
435 | | |
436 | | // get the offset of each partition for the given times. |
437 | | // The input parameter "times" holds <partition, timestamp> pairs. |
438 | | // The output parameter "offsets" returns <partition, offset> pairs. |
439 | | // |
440 | | // The returned offset for each partition is the earliest offset whose |
441 | | // timestamp is greater than or equal to the given timestamp in the |
442 | | // corresponding partition. |
443 | | // See librdkafka/rdkafkacpp.h##offsetsForTimes() |
444 | | Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& times, |
445 | 0 | std::vector<PIntegerPair>* offsets, int timeout) { |
446 | | // create topic partition |
447 | 0 | std::vector<RdKafka::TopicPartition*> topic_partitions; |
448 | 0 | for (const auto& entry : times) { |
449 | 0 | RdKafka::TopicPartition* tp1 = |
450 | 0 | RdKafka::TopicPartition::create(_topic, entry.key(), entry.val()); |
451 | 0 | topic_partitions.push_back(tp1); |
452 | 0 | } |
453 | | // delete the TopicPartitions at the end |
454 | 0 | Defer delete_tp {[&topic_partitions]() { |
455 | 0 | std::for_each(topic_partitions.begin(), topic_partitions.end(), |
456 | 0 | [](RdKafka::TopicPartition* tp1) { delete tp1; }); |
457 | 0 | }}; |
458 | | |
459 | | // get offsets for times |
460 | 0 | RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, timeout); |
461 | 0 | if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) { |
462 | 0 | std::stringstream ss; |
463 | 0 | ss << "failed to get offsets for times: " << RdKafka::err2str(err); |
464 | 0 | LOG(WARNING) << ss.str(); |
465 | 0 | return Status::InternalError(ss.str()); |
466 | 0 | } |
467 | | |
468 | 0 | for (const auto& topic_partition : topic_partitions) { |
469 | 0 | PIntegerPair pair; |
470 | 0 | pair.set_key(topic_partition->partition()); |
471 | 0 | pair.set_val(topic_partition->offset()); |
472 | 0 | offsets->push_back(std::move(pair)); |
473 | 0 | } |
474 | |
475 | 0 | return Status::OK(); |
476 | 0 | } |
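
A hypothetical caller of the method above: map a wall-clock timestamp (ms since epoch) to a starting offset for each partition.

    std::vector<PIntegerPair> times;
    for (int32_t partition : {0, 1, 2}) {
        PIntegerPair t;
        t.set_key(partition);      // partition id
        t.set_val(1700000000000);  // timestamp in ms; illustrative value
        times.push_back(t);
    }
    std::vector<PIntegerPair> offsets; // filled with <partition, offset>
    RETURN_IF_ERROR(consumer->get_offsets_for_times(times, &offsets, 5000 /* timeout ms */));
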
477 | | |
478 | | // get latest offsets for given partitions |
479 | | Status KafkaDataConsumer::get_latest_offsets_for_partitions( |
480 | | const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets, |
481 | 0 | int timeout) { |
482 | 0 | DBUG_EXECUTE_IF("KafkaDataConsumer.get_offsets_for_partitions.timeout", { |
483 | | // sleep 61s |
484 | 0 | std::this_thread::sleep_for(std::chrono::seconds(61)); |
485 | 0 | }); |
486 | 0 | MonotonicStopWatch watch; |
487 | 0 | watch.start(); |
488 | 0 | for (int32_t partition_id : partition_ids) { |
489 | 0 | int64_t low = 0; |
490 | 0 | int64_t high = 0; |
491 | 0 | auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000); |
492 | 0 | if (UNLIKELY(timeout_ms <= 0)) { |
493 | 0 | return Status::InternalError("get kafka latest offsets for partitions timeout"); |
494 | 0 | } |
495 | | |
496 | 0 | RdKafka::ErrorCode err = |
497 | 0 | _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, timeout_ms); |
498 | 0 | if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) { |
499 | 0 | std::stringstream ss; |
500 | 0 | ss << "failed to get latest offset for partition: " << partition_id |
501 | 0 | << ", err: " << RdKafka::err2str(err); |
502 | 0 | LOG(WARNING) << ss.str(); |
503 | 0 | return Status::InternalError(ss.str()); |
504 | 0 | } |
505 | | |
506 | 0 | PIntegerPair pair; |
507 | 0 | pair.set_key(partition_id); |
508 | 0 | pair.set_val(high); |
509 | 0 | offsets->push_back(std::move(pair)); |
510 | 0 | } |
511 | | |
512 | 0 | return Status::OK(); |
513 | 0 | } |
514 | | |
515 | | Status KafkaDataConsumer::get_real_offsets_for_partitions( |
516 | | const std::vector<PIntegerPair>& offset_flags, std::vector<PIntegerPair>* offsets, |
517 | 0 | int timeout) { |
518 | 0 | DBUG_EXECUTE_IF("KafkaDataConsumer.get_offsets_for_partitions.timeout", { |
519 | | // sleep 61s |
520 | 0 | std::this_thread::sleep_for(std::chrono::seconds(61)); |
521 | 0 | }); |
522 | 0 | MonotonicStopWatch watch; |
523 | 0 | watch.start(); |
524 | 0 | for (const auto& entry : offset_flags) { |
525 | 0 | PIntegerPair pair; |
526 | 0 | if (UNLIKELY(entry.val() >= 0)) { |
527 | 0 | pair.set_key(entry.key()); |
528 | 0 | pair.set_val(entry.val()); |
529 | 0 | offsets->push_back(std::move(pair)); |
530 | 0 | continue; |
531 | 0 | } |
532 | | |
533 | 0 | int64_t low = 0; |
534 | 0 | int64_t high = 0; |
535 | 0 | auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000); |
536 | 0 | if (UNLIKELY(timeout_ms <= 0)) { |
537 | 0 | return Status::InternalError("get kafka real offsets for partitions timeout"); |
538 | 0 | } |
539 | | |
540 | 0 | RdKafka::ErrorCode err = |
541 | 0 | _k_consumer->query_watermark_offsets(_topic, entry.key(), &low, &high, timeout_ms); |
542 | 0 | if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) { |
543 | 0 | std::stringstream ss; |
544 | 0 | ss << "failed to get latest offset for partition: " << entry.key() |
545 | 0 | << ", err: " << RdKafka::err2str(err); |
546 | 0 | LOG(WARNING) << ss.str(); |
547 | 0 | return Status::InternalError(ss.str()); |
548 | 0 | } |
549 | | |
550 | 0 | pair.set_key(entry.key()); |
551 | 0 | if (entry.val() == -1) { |
552 | | // OFFSET_END_VAL = -1 |
553 | 0 | pair.set_val(high); |
554 | 0 | } else if (entry.val() == -2) { |
555 | | // OFFSET_BEGINNING_VAL = -2 |
556 | 0 | pair.set_val(low); |
557 | 0 | } |
558 | 0 | offsets->push_back(std::move(pair)); |
559 | 0 | } |
560 | | |
561 | 0 | return Status::OK(); |
562 | 0 | } |
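
The sentinels handled above follow librdkafka's convention: RdKafka::Topic::OFFSET_END is -1 and RdKafka::Topic::OFFSET_BEGINNING is -2. A tiny helper restating the same resolution rule in isolation:

    // Resolve an offset flag against a partition's watermark offsets.
    int64_t resolve_offset(int64_t flag, int64_t low, int64_t high) {
        if (flag >= 0) return flag;                                // concrete offset
        if (flag == RdKafka::Topic::OFFSET_END) return high;       // -1: latest
        if (flag == RdKafka::Topic::OFFSET_BEGINNING) return low;  // -2: earliest
        return flag; // any other sentinel passes through unchanged
    }
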
563 | | |
564 | 1 | Status KafkaDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) { |
565 | 1 | std::unique_lock<std::mutex> l(_lock); |
566 | 1 | if (!_init) { |
567 | 0 | return Status::InternalError("consumer is not initialized"); |
568 | 0 | } |
569 | | |
570 | 1 | _cancelled = true; |
571 | 1 | LOG(INFO) << "kafka consumer cancelled. " << _id; |
572 | 1 | return Status::OK(); |
573 | 1 | } |
574 | | |
575 | 2 | Status KafkaDataConsumer::reset() { |
576 | 2 | std::unique_lock<std::mutex> l(_lock); |
577 | 2 | _cancelled = false; |
578 | 2 | _k_consumer->unassign(); |
579 | | // reset will be called before this consumer is returned to the pool, |
580 | | // so updating _last_visit_time is reasonable. |
581 | 2 | _last_visit_time = time(nullptr); |
582 | 2 | return Status::OK(); |
583 | 2 | } |
584 | | |
585 | 1 | Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset) { |
586 | | // Use async commit so that it will not block for a long time. |
587 | | // Commit failure has no effect on Doris; subsequent tasks will continue to commit the new offset. |
588 | 1 | RdKafka::ErrorCode err = _k_consumer->commitAsync(offset); |
589 | 1 | if (err != RdKafka::ERR_NO_ERROR) { |
590 | 0 | return Status::InternalError("failed to commit kafka offset : {}", RdKafka::err2str(err)); |
591 | 0 | } |
592 | 1 | return Status::OK(); |
593 | 1 | } |
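
A hypothetical caller of commit(), noting the Kafka convention that the committed offset is the offset of the next message to consume, i.e. last consumed offset + 1:

    std::vector<RdKafka::TopicPartition*> tps;
    tps.push_back(RdKafka::TopicPartition::create("my_topic", /*partition*/ 0,
                                                  /*last consumed*/ 1041 + 1));
    Status st = consumer->commit(tps);
    for (auto* tp : tps) delete tp; // caller keeps ownership of the TopicPartitions
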
594 | | |
595 | | // if the kafka brokers and topic are the same, |
596 | | // we consider this consumer matched, and thus it can be reused. |
597 | 1 | bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) { |
598 | 1 | if (ctx->load_src_type != TLoadSourceType::KAFKA) { |
599 | 0 | return false; |
600 | 0 | } |
601 | 1 | if (_brokers != ctx->kafka_info->brokers || _topic != ctx->kafka_info->topic) { |
602 | 0 | return false; |
603 | 0 | } |
604 | | // check properties |
605 | 1 | return PropertyMatcher::properties_match(_custom_properties, ctx->kafka_info->properties); |
606 | 1 | } |
607 | | |
608 | | // ==================== AWS Kinesis Data Consumer Implementation ==================== |
609 | | |
610 | | KinesisDataConsumer::KinesisDataConsumer(std::shared_ptr<StreamLoadContext> ctx) |
611 | 0 | : _region(ctx->kinesis_info->region), |
612 | 0 | _stream(ctx->kinesis_info->stream), |
613 | 0 | _endpoint(ctx->kinesis_info->endpoint) { |
614 | 0 | VLOG_NOTICE << "construct Kinesis consumer: stream=" << _stream << ", region=" << _region; |
615 | 0 | } |
616 | | |
617 | 0 | KinesisDataConsumer::~KinesisDataConsumer() { |
618 | 0 | VLOG_NOTICE << "destruct Kinesis consumer: stream=" << _stream; |
619 | | // AWS SDK client is managed by a shared_ptr and will be cleaned up automatically |
620 | 0 | } |
621 | | |
622 | 0 | Status KinesisDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) { |
623 | 0 | std::unique_lock<std::mutex> l(_lock); |
624 | 0 | if (_init) { |
625 | 0 | return Status::OK(); // Already initialized (idempotent) |
626 | 0 | } |
627 | | |
628 | | // Store custom properties (AWS credentials, etc.) |
629 | 0 | _custom_properties.insert(ctx->kinesis_info->properties.begin(), |
630 | 0 | ctx->kinesis_info->properties.end()); |
631 | | |
632 | | // Create KinesisConf and configure it |
633 | 0 | _kinesis_conf = std::make_unique<KinesisConf>(); |
634 | 0 | std::string errstr; |
635 | | |
636 | | // Parse and categorize aws.kinesis.* properties into three types |
637 | 0 | for (auto& item : _custom_properties) { |
638 | 0 | if (starts_with(item.first, "aws.kinesis.")) { |
639 | 0 | std::string conf_key = item.first.substr(12); // Remove "aws.kinesis." prefix |
640 | | |
641 | | // Type 2: Frequently-used parameters (explicit members) |
642 | 0 | if (conf_key == "shards") { |
643 | 0 | std::vector<std::string> parts = |
644 | 0 | absl::StrSplit(item.second, ",", absl::SkipWhitespace()); |
645 | 0 | _explicit_shards = std::move(parts); |
646 | 0 | VLOG_NOTICE << "Set explicit shards: " << item.second; |
647 | 0 | } else if (conf_key == "default.pos") { |
648 | 0 | _default_position = item.second; |
649 | 0 | VLOG_NOTICE << "Set default position: " << item.second; |
650 | 0 | } else if (starts_with(conf_key, "shards.pos.")) { |
651 | 0 | std::string shard_id = conf_key.substr(11); // Remove "shards.pos." prefix |
652 | 0 | _shard_positions[shard_id] = item.second; |
653 | 0 | VLOG_NOTICE << "Set shard position: " << shard_id << " = " << item.second; |
654 | 0 | } |
655 | | // Type 3: Less-frequently-used API parameters (KinesisConf determines which API) |
656 | 0 | else { |
657 | 0 | KinesisConf::ConfResult res = _kinesis_conf->set(conf_key, item.second, errstr); |
658 | 0 | if (res == KinesisConf::CONF_INVALID) { |
659 | 0 | return Status::InternalError("Failed to set '{}': {}", conf_key, errstr); |
660 | 0 | } |
661 | | // CONF_UNKNOWN is acceptable (parameter will be ignored) |
662 | 0 | } |
663 | 0 | } |
664 | 0 | } |
665 | | |
666 | | // Create AWS Kinesis client |
667 | 0 | RETURN_IF_ERROR(_create_kinesis_client(ctx)); |
668 | | |
669 | 0 | VLOG_NOTICE << "finished to init Kinesis consumer. stream=" << _stream << ", region=" << _region |
670 | 0 | << ", " << ctx->brief(); |
671 | 0 | _init = true; |
672 | 0 | return Status::OK(); |
673 | 0 | } |
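
A hypothetical property map illustrating the three shapes the loop above picks apart (the prefix lengths check out: "aws.kinesis." is 12 characters, matching substr(12), and "shards.pos." is 11, matching substr(11)). The values are illustrative, not a specification of the accepted formats:

    std::map<std::string, std::string> props = {
            // explicit shard list, comma separated
            {"aws.kinesis.shards", "shardId-000000000000,shardId-000000000001"},
            // default start position for shards without an explicit one
            {"aws.kinesis.default.pos", "TRIM_HORIZON"},
            // per-shard start position ("<sequence-number>" is a placeholder)
            {"aws.kinesis.shards.pos.shardId-000000000000", "<sequence-number>"},
            // anything else under aws.kinesis.* is forwarded to KinesisConf::set()
    };
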
674 | | |
675 | 0 | Status KinesisDataConsumer::_create_kinesis_client(std::shared_ptr<StreamLoadContext> ctx) { |
676 | | // Reuse S3ClientFactory's credential provider logic |
677 | | // This supports all AWS authentication methods: |
678 | | // - Simple AK/SK |
679 | | // - IAM instance profile (EC2) |
680 | | // - STS assume role |
681 | | // - Session tokens |
682 | | // - Environment variables |
683 | | // - Default credential chain |
684 | |
685 | 0 | S3ClientConf s3_conf; |
686 | 0 | s3_conf.region = _region; |
687 | 0 | s3_conf.endpoint = _endpoint; |
688 | |
689 | 0 | auto get_property = [this](const char* key) -> std::string { |
690 | 0 | auto it = _custom_properties.find(key); |
691 | 0 | if (it != _custom_properties.end() && !it->second.empty()) { |
692 | 0 | return it->second; |
693 | 0 | } |
694 | 0 | return ""; |
695 | 0 | }; |
696 | | |
697 | | // Keep one naming convention aligned with FE-side Kinesis properties. |
698 | 0 | s3_conf.ak = get_property("aws.access_key"); |
699 | 0 | s3_conf.sk = get_property("aws.secret_key"); |
700 | 0 | s3_conf.token = get_property("aws.session_key"); |
701 | 0 | s3_conf.role_arn = get_property("aws.role_arn"); |
702 | 0 | s3_conf.external_id = get_property("aws.external.id"); |
703 | |
704 | 0 | const std::string provider = get_property("aws.credentials.provider"); |
705 | 0 | if (!provider.empty()) { |
706 | | // Map provider type string to enum |
707 | 0 | if (provider == "instance_profile") { |
708 | 0 | s3_conf.cred_provider_type = CredProviderType::InstanceProfile; |
709 | 0 | } else if (provider == "env") { |
710 | 0 | s3_conf.cred_provider_type = CredProviderType::Env; |
711 | 0 | } else if (provider == "simple") { |
712 | 0 | s3_conf.cred_provider_type = CredProviderType::Simple; |
713 | 0 | } |
714 | 0 | } |
715 | | |
716 | | // Create AWS ClientConfiguration |
717 | 0 | Aws::Client::ClientConfiguration aws_config = S3ClientFactory::getClientConfiguration(); |
718 | 0 | aws_config.region = _region; |
719 | |
720 | 0 | if (!_endpoint.empty()) { |
721 | 0 | aws_config.endpointOverride = _endpoint; |
722 | 0 | } |
723 | |
724 | 0 | std::string ca_cert_file_path = |
725 | 0 | get_valid_ca_cert_path(doris::split(config::ca_cert_file_paths, ";")); |
726 | 0 | if (!ca_cert_file_path.empty()) { |
727 | 0 | aws_config.caFile = ca_cert_file_path; |
728 | 0 | } |
729 | |
730 | 0 | auto parse_timeout_ms = [](const std::string& timeout_value, const std::string& property_name, |
731 | 0 | long* timeout_ms) -> Status { |
732 | 0 | try { |
733 | 0 | *timeout_ms = std::stol(timeout_value); |
734 | 0 | } catch (const std::exception&) { |
735 | 0 | return Status::InternalError("Invalid value for {}: {}", property_name, timeout_value); |
736 | 0 | } |
737 | 0 | return Status::OK(); |
738 | 0 | }; |
739 | | |
740 | | // Set timeouts from properties or use defaults |
741 | 0 | auto it_request_timeout = _custom_properties.find("aws.request.timeout.ms"); |
742 | 0 | if (it_request_timeout != _custom_properties.end()) { |
743 | 0 | RETURN_IF_ERROR(parse_timeout_ms(it_request_timeout->second, "aws.request.timeout.ms", |
744 | 0 | &aws_config.requestTimeoutMs)); |
745 | 0 | } else { |
746 | 0 | aws_config.requestTimeoutMs = 30000; // 30s default |
747 | 0 | } |
748 | | |
749 | 0 | auto it_conn_timeout = _custom_properties.find("aws.connection.timeout.ms"); |
750 | 0 | if (it_conn_timeout != _custom_properties.end()) { |
751 | 0 | RETURN_IF_ERROR(parse_timeout_ms(it_conn_timeout->second, "aws.connection.timeout.ms", |
752 | 0 | &aws_config.connectTimeoutMs)); |
753 | 0 | } |
754 | | |
755 | | // Get credentials provider (reuses S3 infrastructure) |
756 | 0 | auto credentials_provider = S3ClientFactory::instance().get_aws_credentials_provider(s3_conf); |
757 | | |
758 | | // Create Kinesis client |
759 | 0 | _kinesis_client = |
760 | 0 | std::make_shared<Aws::Kinesis::KinesisClient>(credentials_provider, aws_config); |
761 | |
762 | 0 | if (!_kinesis_client) { |
763 | 0 | return Status::InternalError( |
764 | 0 | "Failed to create AWS Kinesis client for stream: {}, region: {}", _stream, _region); |
765 | 0 | } |
766 | | |
767 | 0 | LOG(INFO) << "Created Kinesis client for stream: " << _stream << ", region: " << _region; |
768 | 0 | return Status::OK(); |
769 | 0 | } |
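
For contrast with the factory-based path above, a minimal standalone sketch that builds a Kinesis client with static credentials only; it assumes Aws::InitAPI() has already been called once at process startup.

    #include <aws/core/Aws.h>
    #include <aws/core/auth/AWSCredentialsProviders.h>
    #include <aws/core/client/ClientConfiguration.h>
    #include <aws/kinesis/KinesisClient.h>

    std::shared_ptr<Aws::Kinesis::KinesisClient> make_client(const Aws::String& region,
                                                             const Aws::String& ak,
                                                             const Aws::String& sk) {
        Aws::Client::ClientConfiguration cfg;
        cfg.region = region;
        cfg.requestTimeoutMs = 30000; // same 30s default used above
        auto creds = std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(ak, sk);
        return std::make_shared<Aws::Kinesis::KinesisClient>(creds, cfg);
    }
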
770 | | |
771 | | Status KinesisDataConsumer::assign_shards( |
772 | | const std::map<std::string, std::string>& shard_sequence_numbers, |
773 | 0 | const std::string& stream_name, std::shared_ptr<StreamLoadContext> ctx) { |
774 | 0 | DORIS_CHECK(_kinesis_client); |
775 | | |
776 | 0 | std::stringstream ss; |
777 | 0 | ss << "Assigning shards to Kinesis consumer " << _id << ": "; |
778 | |
779 | 0 | for (auto& entry : shard_sequence_numbers) { |
780 | 0 | const std::string& shard_id = entry.first; |
781 | 0 | const std::string& sequence_number = entry.second; |
782 | | |
783 | | // Get shard iterator for this shard |
784 | 0 | std::string iterator; |
785 | 0 | RETURN_IF_ERROR(_get_shard_iterator(shard_id, sequence_number, &iterator)); |
786 | | |
787 | 0 | _shard_iterators[shard_id] = iterator; |
788 | 0 | _consuming_shard_ids.insert(shard_id); |
789 | |
790 | 0 | ss << "[" << shard_id << ": " << sequence_number << "] "; |
791 | 0 | } |
792 | | |
793 | 0 | LOG(INFO) << ss.str(); |
794 | 0 | return Status::OK(); |
795 | 0 | } |
796 | | |
797 | | Status KinesisDataConsumer::_get_shard_iterator(const std::string& shard_id, |
798 | | const std::string& sequence_number, |
799 | 0 | std::string* iterator) { |
800 | 0 | Aws::Kinesis::Model::GetShardIteratorRequest request; |
801 | | |
802 | | // Apply all configurations through KinesisConf |
803 | 0 | DCHECK(_kinesis_conf != nullptr); |
804 | 0 | Status st = _kinesis_conf->apply_to_get_shard_iterator_request(request, _stream, shard_id, |
805 | 0 | sequence_number); |
806 | 0 | if (!st.ok()) { |
807 | 0 | return Status::InternalError( |
808 | 0 | "Failed to apply Kinesis config to GetShardIteratorRequest: {}", st.to_string()); |
809 | 0 | } |
810 | | |
811 | 0 | auto outcome = _kinesis_client->GetShardIterator(request); |
812 | 0 | if (!outcome.IsSuccess()) { |
813 | 0 | auto& error = outcome.GetError(); |
814 | 0 | return Status::InternalError("Failed to get shard iterator for shard {}: {} ({})", shard_id, |
815 | 0 | error.GetMessage(), static_cast<int>(error.GetErrorType())); |
816 | 0 | } |
817 | | |
818 | 0 | *iterator = outcome.GetResult().GetShardIterator(); |
819 | 0 | VLOG_NOTICE << "Got shard iterator for shard: " << shard_id; |
820 | 0 | return Status::OK(); |
821 | 0 | } |
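
One plausible expansion of the request that KinesisConf builds above, assuming the common resume semantics (this is an assumption about apply_to_get_shard_iterator_request, not a copy of it): start strictly after the last stored sequence number, or from TRIM_HORIZON when no progress exists yet.

    Aws::Kinesis::Model::GetShardIteratorRequest req;
    req.SetStreamName(stream);
    req.SetShardId(shard_id);
    if (sequence_number.empty()) {
        // no saved progress: read from the oldest retained record
        req.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::TRIM_HORIZON);
    } else {
        // resume strictly after the last record already processed
        req.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::AFTER_SEQUENCE_NUMBER);
        req.SetStartingSequenceNumber(sequence_number);
    }
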
822 | | |
823 | | Status KinesisDataConsumer::group_consume( |
824 | | BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue, |
825 | 0 | int64_t max_running_time_ms) { |
826 | 0 | static constexpr int INTER_SHARD_SLEEP_MS = 10; // Small sleep between shards |
827 | 0 | static constexpr int MIN_INTERVAL_BETWEEN_ROUNDS_MS = 200; // Min 200ms between rounds |
828 | |
829 | 0 | int64_t left_time = max_running_time_ms; |
830 | 0 | LOG(INFO) << "start Kinesis consumer: " << _id << ", grp: " << _grp_id |
831 | 0 | << ", stream: " << _stream << ", max running time(ms): " << left_time; |
832 | |
833 | 0 | int64_t received_rows = 0; |
834 | 0 | int64_t put_rows = 0; |
835 | 0 | RetryPolicy retry_policy(3, 200); |
836 | 0 | ThrottleBackoff throttle_backoff(1000, 10000); |
837 | 0 | Status st = Status::OK(); |
838 | 0 | bool done = false; |
839 | |
840 | 0 | MonotonicStopWatch consumer_watch; |
841 | 0 | MonotonicStopWatch watch; |
842 | 0 | watch.start(); |
843 | |
844 | 0 | while (true) { |
845 | | // Check cancellation flag |
846 | 0 | { |
847 | 0 | std::unique_lock<std::mutex> l(_lock); |
848 | 0 | if (_cancelled) { |
849 | 0 | break; |
850 | 0 | } |
851 | 0 | } |
852 | | |
853 | 0 | if (left_time <= 0) { |
854 | 0 | break; |
855 | 0 | } |
856 | | |
857 | | // Round-robin through all active shards |
858 | 0 | for (auto it = _consuming_shard_ids.begin(); it != _consuming_shard_ids.end() && !done;) { |
859 | 0 | const std::string& shard_id = *it; |
860 | 0 | auto iter_it = _shard_iterators.find(shard_id); |
861 | |
862 | 0 | if (iter_it == _shard_iterators.end() || iter_it->second.empty()) { |
863 | | // Shard exhausted (closed due to split/merge), remove from active set |
864 | 0 | LOG(INFO) << "Shard exhausted: " << shard_id; |
865 | 0 | it = _consuming_shard_ids.erase(it); |
866 | 0 | continue; |
867 | 0 | } |
868 | | |
869 | 0 | consumer_watch.start(); |
870 | |
871 | 0 | Aws::Kinesis::Model::GetRecordsRequest request; |
872 | |
873 | 0 | DCHECK(_kinesis_conf != nullptr); |
874 | 0 | st = _kinesis_conf->apply_to_get_records_request(request, iter_it->second); |
875 | 0 | if (!st.ok()) { |
876 | 0 | LOG(WARNING) << "Failed to apply Kinesis config to GetRecordsRequest: " << st; |
877 | 0 | done = true; |
878 | 0 | break; |
879 | 0 | } |
880 | | |
881 | 0 | auto outcome = _kinesis_client->GetRecords(request); |
882 | 0 | consumer_watch.stop(); |
883 | | |
884 | | // Track generic routine load metrics and Kinesis-specific metrics. |
885 | 0 | DorisMetrics::instance()->routine_load_get_msg_count->increment(1); |
886 | 0 | DorisMetrics::instance()->routine_load_get_msg_latency->increment( |
887 | 0 | consumer_watch.elapsed_time() / 1000 / 1000); |
888 | 0 | DorisMetrics::instance()->routine_load_kinesis_get_records_count->increment(1); |
889 | 0 | DorisMetrics::instance()->routine_load_kinesis_get_records_latency->increment( |
890 | 0 | consumer_watch.elapsed_time() / 1000 / 1000); |
891 | |
892 | 0 | if (!outcome.IsSuccess()) { |
893 | 0 | auto& error = outcome.GetError(); |
894 | | |
895 | | // Handle throttling (ProvisionedThroughputExceededException) |
896 | 0 | if (error.GetErrorType() == |
897 | 0 | Aws::Kinesis::KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED) { |
898 | 0 | DorisMetrics::instance()->routine_load_kinesis_throttle_count->increment(1); |
899 | 0 | LOG(INFO) << "Kinesis rate limit exceeded for shard: " << shard_id |
900 | 0 | << ", throttle_count: " << throttle_backoff.throttle_count() |
901 | 0 | << ", backing off"; |
902 | 0 | throttle_backoff.backoff_and_sleep(); |
903 | 0 | ++it; // Move to next shard, will retry this one next round |
904 | 0 | continue; |
905 | 0 | } |
906 | | |
907 | | // Handle retriable errors |
908 | 0 | if (_is_retriable_error(error)) { |
909 | 0 | DorisMetrics::instance()->routine_load_kinesis_retriable_error_count->increment( |
910 | 0 | 1); |
911 | 0 | LOG(INFO) << "Kinesis retriable error for shard " << shard_id << ": " |
912 | 0 | << error.GetMessage() |
913 | 0 | << ", retry times: " << retry_policy.retry_count(); |
914 | 0 | if (retry_policy.should_retry()) { |
915 | 0 | retry_policy.retry_with_backoff(); |
916 | 0 | continue; |
917 | 0 | } |
918 | 0 | } |
919 | | |
920 | | // Fatal error |
921 | 0 | LOG(WARNING) << "Kinesis consume failed for shard " << shard_id << ": " |
922 | 0 | << error.GetMessage() << " (" << static_cast<int>(error.GetErrorType()) |
923 | 0 | << ")"; |
924 | 0 | st = Status::InternalError("Kinesis GetRecords failed for shard {}: {}", shard_id, |
925 | 0 | error.GetMessage()); |
926 | 0 | done = true; |
927 | 0 | break; |
928 | 0 | } |
929 | | |
930 | | // Reset retry counter on success |
931 | 0 | retry_policy.reset(); |
932 | 0 | throttle_backoff.reset(); |
933 | | |
934 | | // Process records - move result to allow moving individual records |
935 | 0 | auto result = outcome.GetResultWithOwnership(); |
936 | 0 | auto millis_behind = result.GetMillisBehindLatest(); |
937 | 0 | std::string next_iterator = result.GetNextShardIterator(); |
938 | 0 | size_t record_count = result.GetRecords().size(); |
939 | 0 | RETURN_IF_ERROR(_process_records(shard_id, std::move(result), queue, &received_rows, |
940 | 0 | &put_rows)); |
941 | | |
942 | | // Track MillisBehindLatest for this shard (used by FE for lag monitoring & scheduling) |
943 | 0 | _millis_behind_latest[shard_id] = millis_behind; |
944 | | |
945 | | // Update shard iterator for next call |
946 | 0 | if (next_iterator.empty()) { |
947 | | // Shard is closed (split/merge), mark as closed and remove from active set |
948 | 0 | LOG(INFO) << "Shard closed: " << shard_id << " (split/merge detected)"; |
949 | 0 | DorisMetrics::instance()->routine_load_kinesis_closed_shard_count->increment(1); |
950 | 0 | _closed_shard_ids.insert(shard_id); |
951 | 0 | _shard_iterators.erase(shard_id); |
952 | 0 | it = _consuming_shard_ids.erase(it); |
953 | 0 | } else { |
954 | | // Update iterator for next consumption |
955 | 0 | _shard_iterators[shard_id] = next_iterator; |
956 | |
957 | 0 | if (record_count == 0) { |
958 | | // No records in this batch - shard has caught up with latest data |
959 | | // Remove from active set for this round (similar to Kafka PARTITION_EOF) |
960 | | // but keep iterator and progress for next task execution |
961 | 0 | LOG(INFO) << "Shard has no new data: " << shard_id |
962 | 0 | << " (MillisBehindLatest=" << millis_behind << ")"; |
963 | 0 | it = _consuming_shard_ids.erase(it); |
964 | 0 | } else { |
965 | 0 | ++it; |
966 | 0 | } |
967 | 0 | } |
968 | | |
969 | | // Check if all shards are exhausted |
970 | 0 | if (_consuming_shard_ids.empty()) { |
971 | 0 | LOG(INFO) << "All shards exhausted for consumer: " << _id; |
972 | 0 | done = true; |
973 | 0 | break; |
974 | 0 | } |
975 | | |
976 | | // Small sleep to avoid tight loop |
977 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(INTER_SHARD_SLEEP_MS)); |
978 | 0 | } |
979 | | |
980 | | // Ensure minimum interval between rounds to respect Kinesis rate limits (5 GetRecords/sec per shard) |
981 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(MIN_INTERVAL_BETWEEN_ROUNDS_MS)); |
982 | |
983 | 0 | left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000; |
984 | 0 | if (done) { |
985 | 0 | break; |
986 | 0 | } |
987 | 0 | } |
988 | | |
989 | 0 | LOG(INFO) << "Kinesis consumer done: " << _id << ", grp: " << _grp_id |
990 | 0 | << ". cancelled: " << _cancelled << ", left time(ms): " << left_time |
991 | 0 | << ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000 |
992 | 0 | << ", consume cost(ms): " << consumer_watch.elapsed_time() / 1000 / 1000 |
993 | 0 | << ", received rows: " << received_rows << ", put rows: " << put_rows; |
994 | |
995 | 0 | return st; |
996 | 0 | } |
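
RetryPolicy and ThrottleBackoff come from consumer_helpers.h. A minimal stand-in for the backoff, ASSUMING its constructor arguments are (initial_ms, max_ms) and that each throttle doubles the sleep up to the cap (the <algorithm>, <chrono>, and <thread> headers it needs are already included at the top of this file):

    class SimpleBackoff {
    public:
        SimpleBackoff(int initial_ms, int max_ms)
                : _initial_ms(initial_ms), _max_ms(max_ms), _cur_ms(initial_ms) {}
        void backoff_and_sleep() {
            std::this_thread::sleep_for(std::chrono::milliseconds(_cur_ms));
            _cur_ms = std::min(_cur_ms * 2, _max_ms); // exponential, capped
            ++_throttle_count;
        }
        void reset() {
            _cur_ms = _initial_ms;
            _throttle_count = 0;
        }
        int throttle_count() const { return _throttle_count; }

    private:
        int _initial_ms;
        int _max_ms;
        int _cur_ms;
        int _throttle_count = 0;
    };
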
997 | | |
998 | | Status KinesisDataConsumer::_process_records( |
999 | | const std::string& shard_id, Aws::Kinesis::Model::GetRecordsResult result, |
1000 | | BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue, int64_t* received_rows, |
1001 | 0 | int64_t* put_rows) { |
1002 | | // result is owned by value, safe to get mutable access to its records |
1003 | 0 | auto records = |
1004 | 0 | std::move(const_cast<Aws::Vector<Aws::Kinesis::Model::Record>&>(result.GetRecords())); |
1005 | 0 | for (auto& record : records) { |
1006 | 0 | DorisMetrics::instance()->routine_load_consume_bytes->increment( |
1007 | 0 | record.GetData().GetLength()); |
1008 | |
1009 | 0 | if (record.GetData().GetLength() == 0) { |
1010 | | // Skip empty records |
1011 | 0 | continue; |
1012 | 0 | } |
1013 | | |
1014 | | // Track the last sequence number for this shard |
1015 | 0 | _committed_sequence_numbers[shard_id] = record.GetSequenceNumber(); |
1016 | | |
1017 | | // Move record into shared_ptr to avoid expensive copy |
1018 | 0 | auto record_ptr = std::make_shared<Aws::Kinesis::Model::Record>(std::move(record)); |
1019 | |
1020 | 0 | if (!queue->controlled_blocking_put(record_ptr, |
1021 | 0 | config::blocking_queue_cv_wait_timeout_ms)) { |
1022 | | // Queue shutdown |
1023 | 0 | return Status::InternalError("Queue shutdown during record processing"); |
1024 | 0 | } |
1025 | | |
1026 | 0 | (*put_rows)++; |
1027 | 0 | (*received_rows)++; |
1028 | 0 | DorisMetrics::instance()->routine_load_consume_rows->increment(1); |
1029 | 0 | } |
1030 | | |
1031 | 0 | return Status::OK(); |
1032 | 0 | } |
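
A hedged sketch of the receiving side of this queue: records travel as shared_ptrs, so no manual delete is needed (unlike the raw Kafka messages above). GetData() yields an Aws::Utils::ByteBuffer holding the payload; blocking_get() is assumed to return false on shutdown.

    std::shared_ptr<Aws::Kinesis::Model::Record> rec;
    while (queue->blocking_get(&rec)) {
        const auto& buf = rec->GetData();
        std::string payload(reinterpret_cast<const char*>(buf.GetUnderlyingData()),
                            buf.GetLength());
        // hand payload to the parser; rec->GetSequenceNumber() keys the progress
    }
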
1033 | | |
1034 | | bool KinesisDataConsumer::_is_retriable_error( |
1035 | 0 | const Aws::Client::AWSError<Aws::Kinesis::KinesisErrors>& error) { |
1036 | 0 | auto error_type = error.GetErrorType(); |
1037 | |
1038 | 0 | return error_type == Aws::Kinesis::KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED || |
1039 | 0 | error_type == Aws::Kinesis::KinesisErrors::SERVICE_UNAVAILABLE || |
1040 | 0 | error_type == Aws::Kinesis::KinesisErrors::INTERNAL_FAILURE || |
1041 | 0 | error_type == Aws::Kinesis::KinesisErrors::NETWORK_CONNECTION || error.ShouldRetry(); |
1042 | 0 | } |
1043 | | |
1044 | 0 | Status KinesisDataConsumer::reset() { |
1045 | 0 | std::unique_lock<std::mutex> l(_lock); |
1046 | 0 | _cancelled = false; |
1047 | 0 | _consuming_shard_ids.clear(); |
1048 | 0 | _shard_iterators.clear(); |
1049 | 0 | _millis_behind_latest.clear(); |
1050 | 0 | _committed_sequence_numbers.clear(); |
1051 | 0 | _closed_shard_ids.clear(); |
1052 | 0 | _last_visit_time = time(nullptr); |
1053 | 0 | LOG(INFO) << "Kinesis consumer reset: " << _id; |
1054 | 0 | return Status::OK(); |
1055 | 0 | } |
1056 | | |
1057 | 0 | Status KinesisDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) { |
1058 | 0 | std::unique_lock<std::mutex> l(_lock); |
1059 | 0 | if (!_init) { |
1060 | 0 | return Status::InternalError("Kinesis consumer is not initialized"); |
1061 | 0 | } |
1062 | 0 | _cancelled = true; |
1063 | 0 | LOG(INFO) << "Kinesis consumer cancelled: " << _id << ", " << ctx->brief(); |
1064 | 0 | return Status::OK(); |
1065 | 0 | } |
1066 | | |
1067 | 0 | bool KinesisDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) { |
1068 | 0 | if (ctx->load_src_type != TLoadSourceType::KINESIS) { |
1069 | 0 | return false; |
1070 | 0 | } |
1071 | | |
1072 | 0 | if (_region != ctx->kinesis_info->region || _stream != ctx->kinesis_info->stream || |
1073 | 0 | _endpoint != ctx->kinesis_info->endpoint) { |
1074 | 0 | return false; |
1075 | 0 | } |
1076 | | |
1077 | | // Check that properties match |
1078 | 0 | return PropertyMatcher::properties_match(_custom_properties, ctx->kinesis_info->properties); |
1079 | 0 | } |
1080 | | |
1081 | 0 | Status KinesisDataConsumer::get_shard_list(std::vector<std::string>* shard_ids) { |
1082 | 0 | DORIS_CHECK(_kinesis_client); |
1083 | | |
1084 | | // If user specified explicit shards, return those |
1085 | 0 | if (!_explicit_shards.empty()) { |
1086 | 0 | *shard_ids = _explicit_shards; |
1087 | 0 | LOG(INFO) << "Using " << shard_ids->size() << " explicit shards for stream: " << _stream; |
1088 | 0 | return Status::OK(); |
1089 | 0 | } |
1090 | | |
1091 | | // Discover all shards |
1092 | 0 | Aws::Kinesis::Model::ListShardsRequest request; |
1093 | |
1094 | 0 | DCHECK(_kinesis_conf != nullptr); |
1095 | 0 | Status st = _kinesis_conf->apply_to_list_shards_request(request, _stream); |
1096 | 0 | if (!st.ok()) { |
1097 | 0 | return Status::InternalError("Failed to apply Kinesis config to ListShardsRequest: {}", |
1098 | 0 | st.to_string()); |
1099 | 0 | } |
1100 | | |
1101 | | // Only return OPEN shards here. FE will keep recently retired parent shards in its |
1102 | | // closed list until they are fully drained, then remove them permanently. Returning |
1103 | | // CLOSED shards from ListShards would make already-drained parents look newly discovered |
1104 | | // and cause them to restart from TRIM_HORIZON. |
1105 | 0 | std::vector<std::string> discovered_shard_ids; |
1106 | 0 | bool saw_any_shard = false; |
1107 | 0 | while (true) { |
1108 | 0 | auto outcome = _kinesis_client->ListShards(request); |
1109 | 0 | if (!outcome.IsSuccess()) { |
1110 | 0 | auto& error = outcome.GetError(); |
1111 | 0 | return Status::InternalError("Failed to list shards for stream {}: {} ({})", _stream, |
1112 | 0 | error.GetMessage(), |
1113 | 0 | static_cast<int>(error.GetErrorType())); |
1114 | 0 | } |
1115 | | |
1116 | 0 | const auto& result = outcome.GetResult(); |
1117 | 0 | if (!result.GetShards().empty()) { |
1118 | 0 | saw_any_shard = true; |
1119 | 0 | } |
1120 | 0 | for (const auto& shard : result.GetShards()) { |
1121 | 0 | const auto& ending_sequence_number = |
1122 | 0 | shard.GetSequenceNumberRange().GetEndingSequenceNumber(); |
1123 | 0 | if (!ending_sequence_number.empty()) { |
1124 | 0 | continue; |
1125 | 0 | } |
1126 | 0 | discovered_shard_ids.emplace_back(shard.GetShardId()); |
1127 | 0 | } |
1128 | |
1129 | 0 | const Aws::String& next_token = result.GetNextToken(); |
1130 | 0 | if (next_token.empty()) { |
1131 | 0 | break; |
1132 | 0 | } |
1133 | | |
1134 | 0 | Aws::Kinesis::Model::ListShardsRequest next_request; |
1135 | | // AWS requires paginated ListShards requests to use NextToken instead of StreamName. |
1136 | 0 | next_request.SetNextToken(next_token); |
1137 | 0 | if (request.MaxResultsHasBeenSet()) { |
1138 | 0 | next_request.SetMaxResults(request.GetMaxResults()); |
1139 | 0 | } |
1140 | 0 | request = std::move(next_request); |
1141 | 0 | } |
1142 | | |
1143 | 0 | if (discovered_shard_ids.empty() && !saw_any_shard) { |
1144 | 0 | return Status::InternalError("No shards found in Kinesis stream: {}", _stream); |
1145 | 0 | } |
1146 | | |
1147 | 0 | *shard_ids = std::move(discovered_shard_ids); |
1148 | 0 | LOG(INFO) << "Found " << shard_ids->size() << " open shards in stream: " << _stream; |
1149 | 0 | return Status::OK(); |
1150 | 0 | } |
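
A minimal standalone sketch of the same pagination rule: AWS rejects a ListShards request that carries both StreamName and NextToken, hence the fresh request per page.

    std::vector<Aws::String> list_open_shards(const Aws::Kinesis::KinesisClient& client,
                                              const Aws::String& stream) {
        std::vector<Aws::String> ids;
        Aws::Kinesis::Model::ListShardsRequest req;
        req.SetStreamName(stream);
        while (true) {
            auto outcome = client.ListShards(req);
            if (!outcome.IsSuccess()) break; // real code should surface the error
            const auto& result = outcome.GetResult();
            for (const auto& shard : result.GetShards()) {
                // an empty EndingSequenceNumber marks a shard that is still open
                if (shard.GetSequenceNumberRange().GetEndingSequenceNumber().empty()) {
                    ids.push_back(shard.GetShardId());
                }
            }
            if (result.GetNextToken().empty()) break;
            Aws::Kinesis::Model::ListShardsRequest next;
            next.SetNextToken(result.GetNextToken()); // token only, no StreamName
            req = std::move(next);
        }
        return ids;
    }
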
1151 | | |
1152 | | } // end namespace doris |