be/src/load/routine_load/data_consumer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/routine_load/data_consumer.h" |
19 | | |
20 | | #include <absl/strings/str_split.h> |
21 | | #include <gen_cpp/Types_types.h> |
22 | | #include <gen_cpp/internal_service.pb.h> |
23 | | #include <librdkafka/rdkafkacpp.h> |
24 | | |
25 | | #include <algorithm> |
26 | | // IWYU pragma: no_include <bits/chrono.h> |
27 | | #include <chrono> // IWYU pragma: keep |
28 | | #include <string> |
29 | | #include <thread> |
30 | | #include <utility> |
31 | | #include <vector> |
32 | | |
33 | | #include "common/config.h" |
34 | | #include "common/metrics/doris_metrics.h" |
35 | | #include "common/status.h" |
36 | | #include "runtime/exec_env.h" |
37 | | #include "runtime/small_file_mgr.h" |
38 | | #include "service/backend_options.h" |
39 | | #include "util/blocking_queue.hpp" |
40 | | #include "util/debug_points.h" |
41 | | #include "util/defer_op.h" |
42 | | #include "util/stopwatch.hpp" |
43 | | #include "util/string_util.h" |
44 | | #include "util/uid_util.h" |
45 | | |
46 | | namespace doris { |
47 | | |
// Kafka property key for the consumer group id. If the job did not specify
// one, init() generates a random, host-prefixed group id for this consumer.
static const std::string PROP_GROUP_ID = "group.id";
49 | | // init kafka consumer will only set common configs such as |
50 | | // brokers, groupid |
51 | 80 | Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) { |
52 | 80 | std::unique_lock<std::mutex> l(_lock); |
53 | 80 | if (_init) { |
54 | | // this consumer has already been initialized. |
55 | 0 | return Status::OK(); |
56 | 0 | } |
57 | | |
58 | 80 | RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); |
59 | | |
60 | | // conf has to be deleted finally |
61 | 80 | Defer delete_conf {[conf]() { delete conf; }}; |
62 | | |
63 | 80 | std::string errstr; |
64 | 1.04k | auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) { |
65 | 1.04k | RdKafka::Conf::ConfResult res = conf->set(conf_key, conf_val, errstr); |
66 | 1.04k | if (res == RdKafka::Conf::CONF_UNKNOWN) { |
67 | | // ignore unknown config |
68 | 0 | return Status::OK(); |
69 | 1.04k | } else if (errstr.find("not supported") != std::string::npos) { |
70 | | // some java-only properties may be passed to here, and librdkafak will return 'xxx' not supported |
71 | | // ignore it |
72 | 0 | return Status::OK(); |
73 | 1.04k | } else if (res != RdKafka::Conf::CONF_OK) { |
74 | 0 | std::stringstream ss; |
75 | 0 | ss << "PAUSE: failed to set '" << conf_key << "', value: '" << conf_val |
76 | 0 | << "', err: " << errstr; |
77 | 0 | LOG(WARNING) << ss.str(); |
78 | 0 | return Status::InternalError(ss.str()); |
79 | 0 | } |
80 | 1.04k | VLOG_NOTICE << "set " << conf_key << ": " << conf_val; |
81 | 1.04k | return Status::OK(); |
82 | 1.04k | }; |
83 | | |
84 | 80 | RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers)); |
85 | 80 | RETURN_IF_ERROR(set_conf("enable.partition.eof", "true")); |
86 | 80 | RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false")); |
87 | | // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb() |
88 | 80 | RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0")); |
89 | 80 | RETURN_IF_ERROR(set_conf("auto.offset.reset", "error")); |
90 | 80 | RETURN_IF_ERROR(set_conf("socket.keepalive.enable", "true")); |
91 | 80 | RETURN_IF_ERROR(set_conf("reconnect.backoff.ms", "100")); |
92 | 80 | RETURN_IF_ERROR(set_conf("reconnect.backoff.max.ms", "10000")); |
93 | 80 | RETURN_IF_ERROR(set_conf("api.version.request", config::kafka_api_version_request)); |
94 | 80 | RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0")); |
95 | 80 | RETURN_IF_ERROR(set_conf("broker.version.fallback", config::kafka_broker_version_fallback)); |
96 | 80 | RETURN_IF_ERROR(set_conf("broker.address.ttl", "0")); |
97 | 80 | if (config::kafka_debug != "disable") { |
98 | 0 | RETURN_IF_ERROR(set_conf("debug", config::kafka_debug)); |
99 | 0 | } |
100 | | |
101 | 80 | for (auto& item : ctx->kafka_info->properties) { |
102 | 76 | if (starts_with(item.second, "FILE:")) { |
103 | | // file property should has format: FILE:file_id:md5 |
104 | 0 | std::vector<std::string> parts = |
105 | 0 | absl::StrSplit(item.second, ":", absl::SkipWhitespace()); |
106 | 0 | if (parts.size() != 3) { |
107 | 0 | return Status::InternalError("PAUSE: Invalid file property of kafka: " + |
108 | 0 | item.second); |
109 | 0 | } |
110 | 0 | int64_t file_id = std::stol(parts[1]); |
111 | 0 | std::string file_path; |
112 | 0 | Status st = ctx->exec_env()->small_file_mgr()->get_file(file_id, parts[2], &file_path); |
113 | 0 | if (!st.ok()) { |
114 | 0 | return Status::InternalError("PAUSE: failed to get file for config: {}, error: {}", |
115 | 0 | item.first, st.to_string()); |
116 | 0 | } |
117 | 0 | RETURN_IF_ERROR(set_conf(item.first, file_path)); |
118 | 76 | } else { |
119 | 76 | RETURN_IF_ERROR(set_conf(item.first, item.second)); |
120 | 76 | } |
121 | 76 | _custom_properties.emplace(item.first, item.second); |
122 | 76 | } |
123 | | |
124 | | // if not specified group id, generate a random one. |
125 | | // ATTN: In the new version, we have set a group.id on the FE side for jobs that have not set a groupid, |
126 | | // but in order to ensure compatibility, we still do a check here. |
127 | 80 | if (_custom_properties.find(PROP_GROUP_ID) == _custom_properties.end()) { |
128 | 5 | std::stringstream ss; |
129 | 5 | ss << BackendOptions::get_localhost() << "_"; |
130 | 5 | std::string group_id = ss.str() + UniqueId::gen_uid().to_string(); |
131 | 5 | RETURN_IF_ERROR(set_conf(PROP_GROUP_ID, group_id)); |
132 | 5 | _custom_properties.emplace(PROP_GROUP_ID, group_id); |
133 | 5 | } |
134 | 80 | LOG(INFO) << "init kafka consumer with group id: " << _custom_properties[PROP_GROUP_ID]; |
135 | | |
136 | 80 | if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) { |
137 | 0 | std::stringstream ss; |
138 | 0 | ss << "PAUSE: failed to set 'event_cb'"; |
139 | 0 | LOG(WARNING) << ss.str(); |
140 | 0 | return Status::InternalError(ss.str()); |
141 | 0 | } |
142 | | |
143 | | // create consumer |
144 | 80 | _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr); |
145 | 80 | if (!_k_consumer) { |
146 | 0 | LOG(WARNING) << "PAUSE: failed to create kafka consumer: " << errstr; |
147 | 0 | return Status::InternalError("PAUSE: failed to create kafka consumer: " + errstr); |
148 | 0 | } |
149 | | |
150 | 80 | VLOG_NOTICE << "finished to init kafka consumer. " << ctx->brief(); |
151 | | |
152 | 80 | _init = true; |
153 | 80 | return Status::OK(); |
154 | 80 | } |
155 | | |
156 | | Status KafkaDataConsumer::assign_topic_partitions( |
157 | | const std::map<int32_t, int64_t>& begin_partition_offset, const std::string& topic, |
158 | 71 | std::shared_ptr<StreamLoadContext> ctx) { |
159 | 71 | DCHECK(_k_consumer); |
160 | | // create TopicPartitions |
161 | 71 | std::stringstream ss; |
162 | 71 | std::vector<RdKafka::TopicPartition*> topic_partitions; |
163 | 71 | for (auto& entry : begin_partition_offset) { |
164 | 71 | RdKafka::TopicPartition* tp1 = |
165 | 71 | RdKafka::TopicPartition::create(topic, entry.first, entry.second); |
166 | 71 | topic_partitions.push_back(tp1); |
167 | 71 | _consuming_partition_ids.insert(entry.first); |
168 | 71 | ss << "[" << entry.first << ": " << entry.second << "] "; |
169 | 71 | } |
170 | | |
171 | 71 | LOG(INFO) << "consumer: " << _id << ", grp: " << _grp_id |
172 | 71 | << " assign topic partitions: " << topic << ", " << ss.str(); |
173 | | |
174 | | // delete TopicPartition finally |
175 | 71 | Defer delete_tp {[&topic_partitions]() { |
176 | 71 | std::for_each(topic_partitions.begin(), topic_partitions.end(), |
177 | 71 | [](RdKafka::TopicPartition* tp1) { delete tp1; }); |
178 | 71 | }}; |
179 | | |
180 | | // assign partition |
181 | 71 | RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions); |
182 | 71 | if (err) { |
183 | 0 | LOG(WARNING) << "failed to assign topic partitions: " << ctx->brief(true) |
184 | 0 | << ", err: " << RdKafka::err2str(err); |
185 | 0 | _k_consumer->unassign(); |
186 | 0 | return Status::InternalError("failed to assign topic partitions"); |
187 | 0 | } |
188 | | |
189 | 71 | return Status::OK(); |
190 | 71 | } |
191 | | |
// Consume messages from the assigned partitions and push them into `queue`
// until one of: the consumer is cancelled, `max_running_time_ms` elapses,
// all assigned partitions reach EOF, the queue is shut down, or a fatal
// kafka error occurs. Ownership of each successfully enqueued message is
// transferred to the queue consumer (via msg.release()); messages that are
// not enqueued are freed by the unique_ptr at the end of the iteration.
// Returns OK on cancel/timeout/EOF, or an error status on a fatal error.
Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
                                        int64_t max_running_time_ms) {
    static constexpr int MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE = 3;
    int64_t left_time = max_running_time_ms;
    LOG(INFO) << "start kafka consumer: " << _id << ", grp: " << _grp_id
              << ", max running time(ms): " << left_time;

    int64_t received_rows = 0;
    int64_t put_rows = 0;
    int32_t retry_times = 0;
    Status st = Status::OK();
    // consumer_watch measures time spent inside consume() only;
    // watch measures total wall time of this call.
    MonotonicStopWatch consumer_watch;
    MonotonicStopWatch watch;
    watch.start();
    while (true) {
        {
            // Only take the lock for the cancellation check; consume() below
            // must not run under _lock.
            std::unique_lock<std::mutex> l(_lock);
            if (_cancelled) {
                break;
            }
        }

        if (left_time <= 0) {
            break;
        }

        bool done = false;
        // consume 1 message at a time
        consumer_watch.start();
        std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */));
        consumer_watch.stop();
        DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
        DorisMetrics::instance()->routine_load_get_msg_latency->increment(
                consumer_watch.elapsed_time() / 1000 / 1000);
        // Debug point: simulate an offset-out-of-range failure for tests.
        DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
            done = true;
            std::stringstream ss;
            ss << "Offset out of range"
               << ", consume partition " << msg->partition() << ", consume offset "
               << msg->offset();
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
            st = Status::InternalError<false>(ss.str());
            break;
        });
        switch (msg->err()) {
        case RdKafka::ERR_NO_ERROR:
            if (_consuming_partition_ids.count(msg->partition()) <= 0) {
                _consuming_partition_ids.insert(msg->partition());
            }
            DorisMetrics::instance()->routine_load_consume_bytes->increment(msg->len());
            if (msg->len() == 0) {
                // ignore msg with length 0.
                // put empty msg into queue will cause the load process shutting down.
                break;
            } else if (!queue->controlled_blocking_put(msg.get(),
                                                       config::blocking_queue_cv_wait_timeout_ms)) {
                // queue is shutdown
                done = true;
            } else {
                ++put_rows;
                msg.release(); // release the ownership, msg will be deleted after being processed
            }
            ++received_rows;
            DorisMetrics::instance()->routine_load_consume_rows->increment(1);
            break;
        case RdKafka::ERR__TIMED_OUT:
            // leave the status as OK, because this may happened
            // if there is no data in kafka.
            LOG(INFO) << "kafka consume timeout: " << _id;
            break;
        case RdKafka::ERR__TRANSPORT:
            LOG(INFO) << "kafka consume Disconnected: " << _id
                      << ", retry times: " << retry_times++;
            if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) {
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                break;
            }
            // Retries exhausted: deliberately treat the disconnect like a
            // partition EOF so this consumer winds down gracefully.
            [[fallthrough]];
        case RdKafka::ERR__PARTITION_EOF: {
            VLOG_NOTICE << "consumer meet partition eof: " << _id
                        << " partition offset: " << msg->offset();
            _consuming_partition_ids.erase(msg->partition());
            // The EOF message itself is also pushed to the queue so the
            // downstream pipeline can observe the partition boundary.
            if (!queue->controlled_blocking_put(msg.get(),
                                                config::blocking_queue_cv_wait_timeout_ms)) {
                done = true;
            } else if (_consuming_partition_ids.size() <= 0) {
                LOG(INFO) << "all partitions meet eof: " << _id;
                msg.release();
                done = true;
            } else {
                msg.release();
            }
            break;
        }
        case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
            done = true;
            std::stringstream ss;
            ss << msg->errstr() << ", consume partition " << msg->partition() << ", consume offset "
               << msg->offset();
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
            st = Status::InternalError<false>(ss.str());
            break;
        }
        default:
            // Any other kafka error is fatal for this consume round.
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
            done = true;
            st = Status::InternalError<false>(msg->errstr());
            break;
        }

        // Recompute the remaining budget from wall time, not per-iteration cost.
        left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000;
        if (done) {
            break;
        }
    }

    LOG(INFO) << "kafka consumer done: " << _id << ", grp: " << _grp_id
              << ". cancelled: " << _cancelled << ", left time(ms): " << left_time
              << ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000
              << ", consume cost(ms): " << consumer_watch.elapsed_time() / 1000 / 1000
              << ", received rows: " << received_rows << ", put rows: " << put_rows;

    return st;
}
316 | | |
// Fetch the partition ids of `_topic` from the broker's metadata and append
// them to `partition_ids`. Returns an error if the metadata request fails,
// the topic metadata carries an error, or the topic has no partitions.
Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids) {
    // create topic conf
    RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    Defer delete_conf {[tconf]() { delete tconf; }};

    // create topic
    std::string errstr;
    RdKafka::Topic* topic = RdKafka::Topic::create(_k_consumer, _topic, tconf, errstr);
    if (topic == nullptr) {
        std::stringstream ss;
        ss << "failed to create topic: " << errstr;
        LOG(WARNING) << ss.str();
        return Status::InternalError(ss.str());
    }

    Defer delete_topic {[topic]() { delete topic; }};

    // get topic metadata (restricted to this topic only, 5s timeout)
    RdKafka::Metadata* metadata = nullptr;
    RdKafka::ErrorCode err =
            _k_consumer->metadata(false /* for this topic */, topic, &metadata, 5000);
    if (err != RdKafka::ERR_NO_ERROR) {
        std::stringstream ss;
        ss << "failed to get partition meta: " << RdKafka::err2str(err);
        LOG(WARNING) << ss.str();
        return Status::InternalError(ss.str());
    }

    // metadata is allocated by librdkafka and must be freed by us.
    Defer delete_meta {[metadata]() { delete metadata; }};

    // get partition ids
    RdKafka::Metadata::TopicMetadataIterator it;
    for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {
        if ((*it)->topic() != _topic) {
            continue;
        }

        // Per-topic error (e.g. leader election in progress) fails the call.
        if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
            std::stringstream ss;
            ss << "error: " << err2str((*it)->err());
            if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
                ss << ", try again";
            }
            LOG(WARNING) << ss.str();
            return Status::InternalError(ss.str());
        }

        RdKafka::TopicMetadata::PartitionMetadataIterator ip;
        for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {
            partition_ids->push_back((*ip)->id());
        }
    }

    if (partition_ids->empty()) {
        return Status::InternalError("no partition in this topic");
    }

    return Status::OK();
}
376 | | |
377 | | // get offsets of each partition for times. |
378 | | // The input parameter "times" holds <partition, timestamps> |
379 | | // The output parameter "offsets" returns <partition, offsets> |
380 | | // |
381 | | // The returned offset for each partition is the earliest offset whose |
382 | | // timestamp is greater than or equal to the given timestamp in the |
383 | | // corresponding partition. |
384 | | // See librdkafka/rdkafkacpp.h##offsetsForTimes() |
385 | | Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& times, |
386 | 1 | std::vector<PIntegerPair>* offsets, int timeout) { |
387 | | // create topic partition |
388 | 1 | std::vector<RdKafka::TopicPartition*> topic_partitions; |
389 | 1 | for (const auto& entry : times) { |
390 | 1 | RdKafka::TopicPartition* tp1 = |
391 | 1 | RdKafka::TopicPartition::create(_topic, entry.key(), entry.val()); |
392 | 1 | topic_partitions.push_back(tp1); |
393 | 1 | } |
394 | | // delete TopicPartition finally |
395 | 1 | Defer delete_tp {[&topic_partitions]() { |
396 | 1 | std::for_each(topic_partitions.begin(), topic_partitions.end(), |
397 | 1 | [](RdKafka::TopicPartition* tp1) { delete tp1; }); |
398 | 1 | }}; |
399 | | |
400 | | // get offsets for times |
401 | 1 | RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, timeout); |
402 | 1 | if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) { |
403 | 0 | std::stringstream ss; |
404 | 0 | ss << "failed to get offsets for times: " << RdKafka::err2str(err); |
405 | 0 | LOG(WARNING) << ss.str(); |
406 | 0 | return Status::InternalError(ss.str()); |
407 | 0 | } |
408 | | |
409 | 1 | for (const auto& topic_partition : topic_partitions) { |
410 | 1 | PIntegerPair pair; |
411 | 1 | pair.set_key(topic_partition->partition()); |
412 | 1 | pair.set_val(topic_partition->offset()); |
413 | 1 | offsets->push_back(std::move(pair)); |
414 | 1 | } |
415 | | |
416 | 1 | return Status::OK(); |
417 | 1 | } |
418 | | |
419 | | // get latest offsets for given partitions |
420 | | Status KafkaDataConsumer::get_latest_offsets_for_partitions( |
421 | | const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets, |
422 | 4.15k | int timeout) { |
423 | 4.15k | DBUG_EXECUTE_IF("KafkaDataConsumer.get_latest_offsets_for_partitions.timeout", { |
424 | | // sleep 60s |
425 | 4.15k | std::this_thread::sleep_for(std::chrono::seconds(60)); |
426 | 4.15k | }); |
427 | 4.15k | MonotonicStopWatch watch; |
428 | 4.15k | watch.start(); |
429 | 4.15k | for (int32_t partition_id : partition_ids) { |
430 | 4.15k | int64_t low = 0; |
431 | 4.15k | int64_t high = 0; |
432 | 4.15k | auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000); |
433 | 4.15k | if (UNLIKELY(timeout_ms <= 0)) { |
434 | 0 | return Status::InternalError("get kafka latest offsets for partitions timeout"); |
435 | 0 | } |
436 | | |
437 | 4.15k | RdKafka::ErrorCode err = |
438 | 4.15k | _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, timeout_ms); |
439 | 4.15k | if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) { |
440 | 0 | std::stringstream ss; |
441 | 0 | ss << "failed to get latest offset for partition: " << partition_id |
442 | 0 | << ", err: " << RdKafka::err2str(err); |
443 | 0 | LOG(WARNING) << ss.str(); |
444 | 0 | return Status::InternalError(ss.str()); |
445 | 0 | } |
446 | | |
447 | 4.15k | PIntegerPair pair; |
448 | 4.15k | pair.set_key(partition_id); |
449 | 4.15k | pair.set_val(high); |
450 | 4.15k | offsets->push_back(std::move(pair)); |
451 | 4.15k | } |
452 | | |
453 | 4.15k | return Status::OK(); |
454 | 4.15k | } |
455 | | |
456 | | Status KafkaDataConsumer::get_real_offsets_for_partitions( |
457 | | const std::vector<PIntegerPair>& offset_flags, std::vector<PIntegerPair>* offsets, |
458 | 65 | int timeout) { |
459 | 65 | MonotonicStopWatch watch; |
460 | 65 | watch.start(); |
461 | 65 | for (const auto& entry : offset_flags) { |
462 | 65 | PIntegerPair pair; |
463 | 65 | if (UNLIKELY(entry.val() >= 0)) { |
464 | 0 | pair.set_key(entry.key()); |
465 | 0 | pair.set_val(entry.val()); |
466 | 0 | offsets->push_back(std::move(pair)); |
467 | 0 | continue; |
468 | 0 | } |
469 | | |
470 | 65 | int64_t low = 0; |
471 | 65 | int64_t high = 0; |
472 | 65 | auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000); |
473 | 65 | if (UNLIKELY(timeout_ms <= 0)) { |
474 | 0 | return Status::InternalError("get kafka real offsets for partitions timeout"); |
475 | 0 | } |
476 | | |
477 | 65 | RdKafka::ErrorCode err = |
478 | 65 | _k_consumer->query_watermark_offsets(_topic, entry.key(), &low, &high, timeout_ms); |
479 | 65 | if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) { |
480 | 0 | std::stringstream ss; |
481 | 0 | ss << "failed to get latest offset for partition: " << entry.key() |
482 | 0 | << ", err: " << RdKafka::err2str(err); |
483 | 0 | LOG(WARNING) << ss.str(); |
484 | 0 | return Status::InternalError(ss.str()); |
485 | 0 | } |
486 | | |
487 | 65 | pair.set_key(entry.key()); |
488 | 65 | if (entry.val() == -1) { |
489 | | // OFFSET_END_VAL = -1 |
490 | 5 | pair.set_val(high); |
491 | 60 | } else if (entry.val() == -2) { |
492 | | // OFFSET_BEGINNING_VAL = -2 |
493 | 60 | pair.set_val(low); |
494 | 60 | } |
495 | 65 | offsets->push_back(std::move(pair)); |
496 | 65 | } |
497 | | |
498 | 65 | return Status::OK(); |
499 | 65 | } |
500 | | |
501 | 71 | Status KafkaDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) { |
502 | 71 | std::unique_lock<std::mutex> l(_lock); |
503 | 71 | if (!_init) { |
504 | 0 | return Status::InternalError("consumer is not initialized"); |
505 | 0 | } |
506 | | |
507 | 71 | _cancelled = true; |
508 | 71 | LOG(INFO) << "kafka consumer cancelled. " << _id; |
509 | 71 | return Status::OK(); |
510 | 71 | } |
511 | | |
512 | 8.79k | Status KafkaDataConsumer::reset() { |
513 | 8.79k | std::unique_lock<std::mutex> l(_lock); |
514 | 8.79k | _cancelled = false; |
515 | 8.79k | _k_consumer->unassign(); |
516 | | // reset will be called before this consumer being returned to the pool. |
517 | | // so update _last_visit_time is reasonable. |
518 | 8.79k | _last_visit_time = time(nullptr); |
519 | 8.79k | return Status::OK(); |
520 | 8.79k | } |
521 | | |
522 | 69 | Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset) { |
523 | | // Use async commit so that it will not block for a long time. |
524 | | // Commit failure has no effect on Doris, subsequent tasks will continue to commit the new offset |
525 | 69 | RdKafka::ErrorCode err = _k_consumer->commitAsync(offset); |
526 | 69 | if (err != RdKafka::ERR_NO_ERROR) { |
527 | 0 | return Status::InternalError("failed to commit kafka offset : {}", RdKafka::err2str(err)); |
528 | 0 | } |
529 | 69 | return Status::OK(); |
530 | 69 | } |
531 | | |
532 | | // if the kafka brokers and topic are same, |
533 | | // we considered this consumer as matched, thus can be reused. |
534 | 154k | bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) { |
535 | 154k | if (ctx->load_src_type != TLoadSourceType::KAFKA) { |
536 | 0 | return false; |
537 | 0 | } |
538 | 154k | if (_brokers != ctx->kafka_info->brokers || _topic != ctx->kafka_info->topic) { |
539 | 149k | return false; |
540 | 149k | } |
541 | | // check properties |
542 | 5.72k | if (_custom_properties.size() != ctx->kafka_info->properties.size()) { |
543 | 30 | return false; |
544 | 30 | } |
545 | 5.70k | for (auto& item : ctx->kafka_info->properties) { |
546 | 5.70k | std::unordered_map<std::string, std::string>::const_iterator itr = |
547 | 5.70k | _custom_properties.find(item.first); |
548 | 5.70k | if (itr == _custom_properties.end()) { |
549 | 0 | return false; |
550 | 0 | } |
551 | | |
552 | 5.70k | if (itr->second != item.second) { |
553 | 1.33k | return false; |
554 | 1.33k | } |
555 | 5.70k | } |
556 | 4.36k | return true; |
557 | 5.69k | } |
558 | | |
559 | | } // end namespace doris |