be/src/load/routine_load/data_consumer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/routine_load/data_consumer.h" |
19 | | |
20 | | #include <absl/strings/str_split.h> |
21 | | #include <gen_cpp/Types_types.h> |
22 | | #include <gen_cpp/internal_service.pb.h> |
23 | | #include <librdkafka/rdkafkacpp.h> |
24 | | |
25 | | #include <algorithm> |
26 | | // IWYU pragma: no_include <bits/chrono.h> |
27 | | #include <chrono> // IWYU pragma: keep |
28 | | #include <string> |
29 | | #include <thread> |
30 | | #include <utility> |
31 | | #include <vector> |
32 | | |
33 | | #include "common/config.h" |
34 | | #include "common/metrics/doris_metrics.h" |
35 | | #include "common/status.h" |
36 | | #include "runtime/aws_msk_iam_auth.h" |
37 | | #include "runtime/exec_env.h" |
38 | | #include "runtime/small_file_mgr.h" |
39 | | #include "service/backend_options.h" |
40 | | #include "util/blocking_queue.hpp" |
41 | | #include "util/debug_points.h" |
42 | | #include "util/defer_op.h" |
43 | | #include "util/stopwatch.hpp" |
44 | | #include "util/string_util.h" |
45 | | #include "util/uid_util.h" |
46 | | |
47 | | namespace doris { |
48 | | |
49 | | static const std::string PROP_GROUP_ID = "group.id"; |
50 | | // init kafka consumer will only set common configs such as |
51 | | // brokers, groupid |
// Initialize the librdkafka consumer held by this object.
// Only global/common settings are applied here (brokers, group id, user supplied
// properties, optional AWS MSK IAM OAuth); topic partitions are assigned later
// via assign_topic_partitions(). Idempotent: a second call returns OK at once.
// Error messages are prefixed with "PAUSE:" so the FE pauses the routine load job.
Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
    std::unique_lock<std::mutex> l(_lock);
    if (_init) {
        // this consumer has already been initialized.
        return Status::OK();
    }

    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    // conf has to be deleted finally
    Defer delete_conf {[conf]() { delete conf; }};

    // errstr is shared by every set_conf() call below; librdkafka writes into it
    // only when a set fails.
    std::string errstr;
    auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
        RdKafka::Conf::ConfResult res = conf->set(conf_key, conf_val, errstr);
        if (res == RdKafka::Conf::CONF_UNKNOWN) {
            // ignore unknown config
            return Status::OK();
        } else if (errstr.find("not supported") != std::string::npos) {
            // some java-only properties may be passed to here, and librdkafak will return 'xxx' not supported
            // ignore it
            return Status::OK();
        } else if (res != RdKafka::Conf::CONF_OK) {
            std::stringstream ss;
            ss << "PAUSE: failed to set '" << conf_key << "', value: '" << conf_val
               << "', err: " << errstr;
            LOG(WARNING) << ss.str();
            return Status::InternalError(ss.str());
        }
        VLOG_NOTICE << "set " << conf_key << ": " << conf_val;
        return Status::OK();
    };

    RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers));
    // emit ERR__PARTITION_EOF events so group_consume() can detect end of partition
    RETURN_IF_ERROR(set_conf("enable.partition.eof", "true"));
    // offsets are stored/committed explicitly by commit(), not on every consume
    RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
    // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
    RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
    // fail (rather than silently jump) when the requested offset is unavailable
    RETURN_IF_ERROR(set_conf("auto.offset.reset", "error"));
    RETURN_IF_ERROR(set_conf("socket.keepalive.enable", "true"));
    RETURN_IF_ERROR(set_conf("reconnect.backoff.ms", "100"));
    RETURN_IF_ERROR(set_conf("reconnect.backoff.max.ms", "10000"));
    RETURN_IF_ERROR(set_conf("api.version.request", config::kafka_api_version_request));
    RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0"));
    RETURN_IF_ERROR(set_conf("broker.version.fallback", config::kafka_broker_version_fallback));
    RETURN_IF_ERROR(set_conf("broker.address.ttl", "0"));
    if (config::kafka_debug != "disable") {
        RETURN_IF_ERROR(set_conf("debug", config::kafka_debug));
    }

    // Apply the user-specified properties. They are also cached in
    // _custom_properties so match() can later decide whether this pooled
    // consumer is reusable for another job.
    for (auto& item : ctx->kafka_info->properties) {
        _custom_properties.emplace(item.first, item.second);

        // AWS properties (aws.*) are Doris-specific for MSK IAM authentication
        // and should not be passed to librdkafka
        if (starts_with(item.first, "aws.")) {
            LOG(INFO) << "Skipping AWS property for librdkafka: " << item.first;
            continue;
        }

        if (starts_with(item.second, "FILE:")) {
            // file property should has format: FILE:file_id:md5
            std::vector<std::string> parts =
                    absl::StrSplit(item.second, ":", absl::SkipWhitespace());
            if (parts.size() != 3) {
                return Status::InternalError("PAUSE: Invalid file property of kafka: " +
                                             item.second);
            }
            // NOTE(review): std::stol throws if parts[1] is not numeric; presumably
            // the FE validates the file id before sending it here — confirm.
            int64_t file_id = std::stol(parts[1]);
            std::string file_path;
            // resolve the file id/md5 to a local path of the downloaded small file
            Status st = ctx->exec_env()->small_file_mgr()->get_file(file_id, parts[2], &file_path);
            if (!st.ok()) {
                return Status::InternalError("PAUSE: failed to get file for config: {}, error: {}",
                                             item.first, st.to_string());
            }
            RETURN_IF_ERROR(set_conf(item.first, file_path));
        } else {
            RETURN_IF_ERROR(set_conf(item.first, item.second));
        }
    }

    // if not specified group id, generate a random one.
    // ATTN: In the new version, we have set a group.id on the FE side for jobs that have not set a groupid,
    // but in order to ensure compatibility, we still do a check here.
    if (!_custom_properties.contains(PROP_GROUP_ID)) {
        std::stringstream ss;
        ss << BackendOptions::get_localhost() << "_";
        std::string group_id = ss.str() + UniqueId::gen_uid().to_string();
        RETURN_IF_ERROR(set_conf(PROP_GROUP_ID, group_id));
        _custom_properties.emplace(PROP_GROUP_ID, group_id);
    }
    LOG(INFO) << "init kafka consumer with group id: " << _custom_properties[PROP_GROUP_ID];

    // register the event callback (connection errors etc.) owned by this object
    if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
        std::stringstream ss;
        ss << "PAUSE: failed to set 'event_cb'";
        LOG(WARNING) << ss.str();
        return Status::InternalError(ss.str());
    }

    // Set up AWS MSK IAM authentication if configured
    // (returns nullptr when the aws.* properties are not present)
    _aws_msk_oauth_callback = AwsMskIamOAuthCallback::create_from_properties(
            _custom_properties, ctx->kafka_info->brokers);
    if (_aws_msk_oauth_callback) {
        // Enable SASL queue to support background callbacks
        if (conf->enable_sasl_queue(true, errstr) != RdKafka::Conf::CONF_OK) {
            LOG(WARNING) << "PAUSE: failed to enable SASL queue: " << errstr;
            return Status::InternalError("PAUSE: failed to enable SASL queue: " + errstr);
        }

        if (conf->set("oauthbearer_token_refresh_cb", _aws_msk_oauth_callback.get(), errstr) !=
            RdKafka::Conf::CONF_OK) {
            LOG(WARNING) << "PAUSE: failed to set OAuth callback: " << errstr;
            return Status::InternalError("PAUSE: failed to set OAuth callback: " + errstr);
        }
        LOG(INFO) << "AWS MSK IAM authentication enabled successfully";
    }

    // create consumer
    _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
    if (!_k_consumer) {
        LOG(WARNING) << "PAUSE: failed to create kafka consumer: " << errstr;
        return Status::InternalError("PAUSE: failed to create kafka consumer: " + errstr);
    }

    // If AWS MSK IAM auth is enabled, inject initial token and enable background refresh
    if (_aws_msk_oauth_callback) {
        RETURN_IF_ERROR(_aws_msk_oauth_callback->refresh_now(_k_consumer));

        std::unique_ptr<RdKafka::Error> bg_err(_k_consumer->sasl_background_callbacks_enable());
        if (bg_err) {
            return Status::InternalError("Failed to enable SASL background callbacks: " +
                                         bg_err->str());
        }
        LOG(INFO) << "AWS MSK IAM: initial token set, background refresh enabled";
    }

    VLOG_NOTICE << "finished to init kafka consumer. " << ctx->brief();

    _init = true;
    return Status::OK();
}
194 | | |
195 | | Status KafkaDataConsumer::assign_topic_partitions( |
196 | | const std::map<int32_t, int64_t>& begin_partition_offset, const std::string& topic, |
197 | 71 | std::shared_ptr<StreamLoadContext> ctx) { |
198 | 71 | DCHECK(_k_consumer); |
199 | | // create TopicPartitions |
200 | 71 | std::stringstream ss; |
201 | 71 | std::vector<RdKafka::TopicPartition*> topic_partitions; |
202 | 71 | for (auto& entry : begin_partition_offset) { |
203 | 71 | RdKafka::TopicPartition* tp1 = |
204 | 71 | RdKafka::TopicPartition::create(topic, entry.first, entry.second); |
205 | 71 | topic_partitions.push_back(tp1); |
206 | 71 | _consuming_partition_ids.insert(entry.first); |
207 | 71 | ss << "[" << entry.first << ": " << entry.second << "] "; |
208 | 71 | } |
209 | | |
210 | 71 | LOG(INFO) << "consumer: " << _id << ", grp: " << _grp_id |
211 | 71 | << " assign topic partitions: " << topic << ", " << ss.str(); |
212 | | |
213 | | // delete TopicPartition finally |
214 | 71 | Defer delete_tp {[&topic_partitions]() { |
215 | 71 | std::for_each(topic_partitions.begin(), topic_partitions.end(), |
216 | 71 | [](RdKafka::TopicPartition* tp1) { delete tp1; }); |
217 | 71 | }}; |
218 | | |
219 | | // assign partition |
220 | 71 | RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions); |
221 | 71 | if (err) { |
222 | 0 | LOG(WARNING) << "failed to assign topic partitions: " << ctx->brief(true) |
223 | 0 | << ", err: " << RdKafka::err2str(err); |
224 | 0 | _k_consumer->unassign(); |
225 | 0 | return Status::InternalError("failed to assign topic partitions"); |
226 | 0 | } |
227 | | |
228 | 71 | return Status::OK(); |
229 | 71 | } |
230 | | |
// Consume messages from the assigned partitions and push them into `queue`
// until one of: the consumer is cancelled, `max_running_time_ms` elapses,
// all assigned partitions reach EOF, the queue is shut down, or a fatal
// kafka error occurs. Returns OK on cancel/timeout/EOF; an error status on
// fatal kafka errors. Messages pushed into the queue are OWNED by the queue
// consumer afterwards (msg.release() below hands off ownership).
Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
                                        int64_t max_running_time_ms) {
    static constexpr int MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE = 3;
    int64_t left_time = max_running_time_ms;
    LOG(INFO) << "start kafka consumer: " << _id << ", grp: " << _grp_id
              << ", max running time(ms): " << left_time;

    int64_t received_rows = 0; // messages successfully fetched from kafka
    int64_t put_rows = 0;      // messages actually handed to the queue
    int32_t retry_times = 0;   // consecutive-ish transport failure counter
    Status st = Status::OK();
    MonotonicStopWatch consumer_watch; // time spent inside consume() only
    MonotonicStopWatch watch;          // total wall time of this call
    watch.start();
    while (true) {
        {
            // only _cancelled needs the lock; the rest of the loop runs unlocked
            std::unique_lock<std::mutex> l(_lock);
            if (_cancelled) {
                break;
            }
        }

        if (left_time <= 0) {
            break;
        }

        bool done = false;
        // consume 1 message at a time
        consumer_watch.start();
        std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */));
        consumer_watch.stop();
        DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
        DorisMetrics::instance()->routine_load_get_msg_latency->increment(
                consumer_watch.elapsed_time() / 1000 / 1000);
        // debug hook: simulate an offset-out-of-range failure in tests
        DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
            done = true;
            std::stringstream ss;
            ss << "Offset out of range"
               << ", consume partition " << msg->partition() << ", consume offset "
               << msg->offset();
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
            st = Status::InternalError<false>(ss.str());
            break;
        });
        switch (msg->err()) {
        case RdKafka::ERR_NO_ERROR:
            // a partition may produce data again after we saw its EOF; track it again
            if (_consuming_partition_ids.count(msg->partition()) <= 0) {
                _consuming_partition_ids.insert(msg->partition());
            }
            DorisMetrics::instance()->routine_load_consume_bytes->increment(msg->len());
            if (msg->len() == 0) {
                // ignore msg with length 0.
                // put empty msg into queue will cause the load process shutting down.
                break;
            } else if (!queue->controlled_blocking_put(msg.get(),
                                                       config::blocking_queue_cv_wait_timeout_ms)) {
                // queue is shutdown
                done = true;
            } else {
                ++put_rows;
                msg.release(); // release the ownership, msg will be deleted after being processed
            }
            ++received_rows;
            DorisMetrics::instance()->routine_load_consume_rows->increment(1);
            break;
        case RdKafka::ERR__TIMED_OUT:
            // leave the status as OK, because this may happened
            // if there is no data in kafka.
            LOG(INFO) << "kafka consume timeout: " << _id;
            break;
        case RdKafka::ERR__TRANSPORT:
            LOG(INFO) << "kafka consume Disconnected: " << _id
                      << ", retry times: " << retry_times++;
            if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) {
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                break;
            }
            // too many transport failures: treat like EOF of this partition below
            [[fallthrough]];
        case RdKafka::ERR__PARTITION_EOF: {
            VLOG_NOTICE << "consumer meet partition eof: " << _id
                        << " partition offset: " << msg->offset();
            _consuming_partition_ids.erase(msg->partition());
            // the EOF message itself is pushed to the queue so downstream can
            // observe the partition boundary
            if (!queue->controlled_blocking_put(msg.get(),
                                               config::blocking_queue_cv_wait_timeout_ms)) {
                done = true;
            } else if (_consuming_partition_ids.size() <= 0) {
                LOG(INFO) << "all partitions meet eof: " << _id;
                msg.release();
                done = true;
            } else {
                msg.release();
            }
            break;
        }
        case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
            done = true;
            std::stringstream ss;
            ss << msg->errstr() << ", consume partition " << msg->partition() << ", consume offset "
               << msg->offset();
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
            st = Status::InternalError<false>(ss.str());
            break;
        }
        default:
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
            done = true;
            st = Status::InternalError<false>(msg->errstr());
            break;
        }

        left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000;
        if (done) {
            break;
        }
    }

    LOG(INFO) << "kafka consumer done: " << _id << ", grp: " << _grp_id
              << ". cancelled: " << _cancelled << ", left time(ms): " << left_time
              << ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000
              << ", consume cost(ms): " << consumer_watch.elapsed_time() / 1000 / 1000
              << ", received rows: " << received_rows << ", put rows: " << put_rows;

    return st;
}
355 | | |
356 | 74 | Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids) { |
357 | | // create topic conf |
358 | 74 | RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); |
359 | 74 | Defer delete_conf {[tconf]() { delete tconf; }}; |
360 | | |
361 | | // create topic |
362 | 74 | std::string errstr; |
363 | 74 | RdKafka::Topic* topic = RdKafka::Topic::create(_k_consumer, _topic, tconf, errstr); |
364 | 74 | if (topic == nullptr) { |
365 | 0 | std::stringstream ss; |
366 | 0 | ss << "failed to create topic: " << errstr; |
367 | 0 | LOG(WARNING) << ss.str(); |
368 | 0 | return Status::InternalError(ss.str()); |
369 | 0 | } |
370 | | |
371 | 74 | Defer delete_topic {[topic]() { delete topic; }}; |
372 | | |
373 | | // get topic metadata |
374 | 74 | RdKafka::Metadata* metadata = nullptr; |
375 | 74 | RdKafka::ErrorCode err = |
376 | 74 | _k_consumer->metadata(false /* for this topic */, topic, &metadata, 5000); |
377 | 74 | if (err != RdKafka::ERR_NO_ERROR) { |
378 | 0 | std::stringstream ss; |
379 | 0 | ss << "failed to get partition meta: " << RdKafka::err2str(err); |
380 | 0 | LOG(WARNING) << ss.str(); |
381 | 0 | return Status::InternalError(ss.str()); |
382 | 0 | } |
383 | | |
384 | 74 | Defer delete_meta {[metadata]() { delete metadata; }}; |
385 | | |
386 | | // get partition ids |
387 | 74 | RdKafka::Metadata::TopicMetadataIterator it; |
388 | 145 | for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) { |
389 | 74 | if ((*it)->topic() != _topic) { |
390 | 0 | continue; |
391 | 0 | } |
392 | | |
393 | 74 | if ((*it)->err() != RdKafka::ERR_NO_ERROR) { |
394 | 3 | std::stringstream ss; |
395 | 3 | ss << "error: " << err2str((*it)->err()); |
396 | 3 | if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) { |
397 | 0 | ss << ", try again"; |
398 | 0 | } |
399 | 3 | LOG(WARNING) << ss.str(); |
400 | 3 | return Status::InternalError(ss.str()); |
401 | 3 | } |
402 | | |
403 | 71 | RdKafka::TopicMetadata::PartitionMetadataIterator ip; |
404 | 142 | for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) { |
405 | 71 | partition_ids->push_back((*ip)->id()); |
406 | 71 | } |
407 | 71 | } |
408 | | |
409 | 71 | if (partition_ids->empty()) { |
410 | 0 | return Status::InternalError("no partition in this topic"); |
411 | 0 | } |
412 | | |
413 | 71 | return Status::OK(); |
414 | 71 | } |
415 | | |
416 | | // get offsets of each partition for times. |
417 | | // The input parameter "times" holds <partition, timestamps> |
418 | | // The output parameter "offsets" returns <partition, offsets> |
419 | | // |
420 | | // The returned offset for each partition is the earliest offset whose |
421 | | // timestamp is greater than or equal to the given timestamp in the |
422 | | // corresponding partition. |
423 | | // See librdkafka/rdkafkacpp.h##offsetsForTimes() |
424 | | Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& times, |
425 | 1 | std::vector<PIntegerPair>* offsets, int timeout) { |
426 | | // create topic partition |
427 | 1 | std::vector<RdKafka::TopicPartition*> topic_partitions; |
428 | 1 | for (const auto& entry : times) { |
429 | 1 | RdKafka::TopicPartition* tp1 = |
430 | 1 | RdKafka::TopicPartition::create(_topic, entry.key(), entry.val()); |
431 | 1 | topic_partitions.push_back(tp1); |
432 | 1 | } |
433 | | // delete TopicPartition finally |
434 | 1 | Defer delete_tp {[&topic_partitions]() { |
435 | 1 | std::for_each(topic_partitions.begin(), topic_partitions.end(), |
436 | 1 | [](RdKafka::TopicPartition* tp1) { delete tp1; }); |
437 | 1 | }}; |
438 | | |
439 | | // get offsets for times |
440 | 1 | RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, timeout); |
441 | 1 | if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) { |
442 | 0 | std::stringstream ss; |
443 | 0 | ss << "failed to get offsets for times: " << RdKafka::err2str(err); |
444 | 0 | LOG(WARNING) << ss.str(); |
445 | 0 | return Status::InternalError(ss.str()); |
446 | 0 | } |
447 | | |
448 | 1 | for (const auto& topic_partition : topic_partitions) { |
449 | 1 | PIntegerPair pair; |
450 | 1 | pair.set_key(topic_partition->partition()); |
451 | 1 | pair.set_val(topic_partition->offset()); |
452 | 1 | offsets->push_back(std::move(pair)); |
453 | 1 | } |
454 | | |
455 | 1 | return Status::OK(); |
456 | 1 | } |
457 | | |
458 | | // get latest offsets for given partitions |
459 | | Status KafkaDataConsumer::get_latest_offsets_for_partitions( |
460 | | const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets, |
461 | 355 | int timeout) { |
462 | 355 | DBUG_EXECUTE_IF("KafkaDataConsumer.get_offsets_for_partitions.timeout", { |
463 | | // sleep 61s |
464 | 355 | std::this_thread::sleep_for(std::chrono::seconds(61)); |
465 | 355 | }); |
466 | 355 | MonotonicStopWatch watch; |
467 | 355 | watch.start(); |
468 | 355 | for (int32_t partition_id : partition_ids) { |
469 | 355 | int64_t low = 0; |
470 | 355 | int64_t high = 0; |
471 | 355 | auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000); |
472 | 355 | if (UNLIKELY(timeout_ms <= 0)) { |
473 | 0 | return Status::InternalError("get kafka latest offsets for partitions timeout"); |
474 | 0 | } |
475 | | |
476 | 355 | RdKafka::ErrorCode err = |
477 | 355 | _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, timeout_ms); |
478 | 355 | if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) { |
479 | 0 | std::stringstream ss; |
480 | 0 | ss << "failed to get latest offset for partition: " << partition_id |
481 | 0 | << ", err: " << RdKafka::err2str(err); |
482 | 0 | LOG(WARNING) << ss.str(); |
483 | 0 | return Status::InternalError(ss.str()); |
484 | 0 | } |
485 | | |
486 | 355 | PIntegerPair pair; |
487 | 355 | pair.set_key(partition_id); |
488 | 355 | pair.set_val(high); |
489 | 355 | offsets->push_back(std::move(pair)); |
490 | 355 | } |
491 | | |
492 | 355 | return Status::OK(); |
493 | 355 | } |
494 | | |
495 | | Status KafkaDataConsumer::get_real_offsets_for_partitions( |
496 | | const std::vector<PIntegerPair>& offset_flags, std::vector<PIntegerPair>* offsets, |
497 | 65 | int timeout) { |
498 | 65 | DBUG_EXECUTE_IF("KafkaDataConsumer.get_offsets_for_partitions.timeout", { |
499 | | // sleep 61s |
500 | 65 | std::this_thread::sleep_for(std::chrono::seconds(61)); |
501 | 65 | }); |
502 | 65 | MonotonicStopWatch watch; |
503 | 65 | watch.start(); |
504 | 65 | for (const auto& entry : offset_flags) { |
505 | 65 | PIntegerPair pair; |
506 | 65 | if (UNLIKELY(entry.val() >= 0)) { |
507 | 0 | pair.set_key(entry.key()); |
508 | 0 | pair.set_val(entry.val()); |
509 | 0 | offsets->push_back(std::move(pair)); |
510 | 0 | continue; |
511 | 0 | } |
512 | | |
513 | 65 | int64_t low = 0; |
514 | 65 | int64_t high = 0; |
515 | 65 | auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000); |
516 | 65 | if (UNLIKELY(timeout_ms <= 0)) { |
517 | 0 | return Status::InternalError("get kafka real offsets for partitions timeout"); |
518 | 0 | } |
519 | | |
520 | 65 | RdKafka::ErrorCode err = |
521 | 65 | _k_consumer->query_watermark_offsets(_topic, entry.key(), &low, &high, timeout_ms); |
522 | 65 | if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) { |
523 | 0 | std::stringstream ss; |
524 | 0 | ss << "failed to get latest offset for partition: " << entry.key() |
525 | 0 | << ", err: " << RdKafka::err2str(err); |
526 | 0 | LOG(WARNING) << ss.str(); |
527 | 0 | return Status::InternalError(ss.str()); |
528 | 0 | } |
529 | | |
530 | 65 | pair.set_key(entry.key()); |
531 | 65 | if (entry.val() == -1) { |
532 | | // OFFSET_END_VAL = -1 |
533 | 5 | pair.set_val(high); |
534 | 60 | } else if (entry.val() == -2) { |
535 | | // OFFSET_BEGINNING_VAL = -2 |
536 | 60 | pair.set_val(low); |
537 | 60 | } |
538 | 65 | offsets->push_back(std::move(pair)); |
539 | 65 | } |
540 | | |
541 | 65 | return Status::OK(); |
542 | 65 | } |
543 | | |
544 | 71 | Status KafkaDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) { |
545 | 71 | std::unique_lock<std::mutex> l(_lock); |
546 | 71 | if (!_init) { |
547 | 0 | return Status::InternalError("consumer is not initialized"); |
548 | 0 | } |
549 | | |
550 | 71 | _cancelled = true; |
551 | 71 | LOG(INFO) << "kafka consumer cancelled. " << _id; |
552 | 71 | return Status::OK(); |
553 | 71 | } |
554 | | |
555 | 1.18k | Status KafkaDataConsumer::reset() { |
556 | 1.18k | std::unique_lock<std::mutex> l(_lock); |
557 | 1.18k | _cancelled = false; |
558 | 1.18k | _k_consumer->unassign(); |
559 | | // reset will be called before this consumer being returned to the pool. |
560 | | // so update _last_visit_time is reasonable. |
561 | 1.18k | _last_visit_time = time(nullptr); |
562 | 1.18k | return Status::OK(); |
563 | 1.18k | } |
564 | | |
565 | 69 | Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset) { |
566 | | // Use async commit so that it will not block for a long time. |
567 | | // Commit failure has no effect on Doris, subsequent tasks will continue to commit the new offset |
568 | 69 | RdKafka::ErrorCode err = _k_consumer->commitAsync(offset); |
569 | 69 | if (err != RdKafka::ERR_NO_ERROR) { |
570 | 0 | return Status::InternalError("failed to commit kafka offset : {}", RdKafka::err2str(err)); |
571 | 0 | } |
572 | 69 | return Status::OK(); |
573 | 69 | } |
574 | | |
575 | | // if the kafka brokers and topic are same, |
576 | | // we considered this consumer as matched, thus can be reused. |
577 | 13.8k | bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) { |
578 | 13.8k | if (ctx->load_src_type != TLoadSourceType::KAFKA) { |
579 | 0 | return false; |
580 | 0 | } |
581 | 13.8k | if (_brokers != ctx->kafka_info->brokers || _topic != ctx->kafka_info->topic) { |
582 | 12.3k | return false; |
583 | 12.3k | } |
584 | | // check properties |
585 | 1.48k | if (_custom_properties.size() != ctx->kafka_info->properties.size()) { |
586 | 30 | return false; |
587 | 30 | } |
588 | 1.46k | for (auto& item : ctx->kafka_info->properties) { |
589 | 1.46k | std::unordered_map<std::string, std::string>::const_iterator itr = |
590 | 1.46k | _custom_properties.find(item.first); |
591 | 1.46k | if (itr == _custom_properties.end()) { |
592 | 0 | return false; |
593 | 0 | } |
594 | | |
595 | 1.46k | if (itr->second != item.second) { |
596 | 900 | return false; |
597 | 900 | } |
598 | 1.46k | } |
599 | 558 | return true; |
600 | 1.45k | } |
601 | | |
602 | | } // end namespace doris |