Coverage Report

Created: 2026-04-01 15:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/data_consumer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/routine_load/data_consumer.h"
19
20
#include <absl/strings/str_split.h>
21
#include <gen_cpp/Types_types.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <librdkafka/rdkafkacpp.h>
24
25
#include <algorithm>
26
// IWYU pragma: no_include <bits/chrono.h>
27
#include <chrono> // IWYU pragma: keep
28
#include <string>
29
#include <thread>
30
#include <utility>
31
#include <vector>
32
33
#include "common/config.h"
34
#include "common/metrics/doris_metrics.h"
35
#include "common/status.h"
36
#include "runtime/aws_msk_iam_auth.h"
37
#include "runtime/exec_env.h"
38
#include "runtime/small_file_mgr.h"
39
#include "service/backend_options.h"
40
#include "util/blocking_queue.hpp"
41
#include "util/debug_points.h"
42
#include "util/defer_op.h"
43
#include "util/stopwatch.hpp"
44
#include "util/string_util.h"
45
#include "util/uid_util.h"
46
47
namespace doris {
48
49
static const std::string PROP_GROUP_ID = "group.id";
50
// init kafka consumer will only set common configs such as
51
// brokers, groupid
52
77
Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
53
77
    std::unique_lock<std::mutex> l(_lock);
54
77
    if (_init) {
55
        // this consumer has already been initialized.
56
0
        return Status::OK();
57
0
    }
58
59
77
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
60
61
    // conf has to be deleted finally
62
77
    Defer delete_conf {[conf]() { delete conf; }};
63
64
77
    std::string errstr;
65
1.00k
    auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
66
1.00k
        RdKafka::Conf::ConfResult res = conf->set(conf_key, conf_val, errstr);
67
1.00k
        if (res == RdKafka::Conf::CONF_UNKNOWN) {
68
            // ignore unknown config
69
0
            return Status::OK();
70
1.00k
        } else if (errstr.find("not supported") != std::string::npos) {
71
// some java-only properties may be passed here, and librdkafka will return 'xxx' not supported
72
            // ignore it
73
0
            return Status::OK();
74
1.00k
        } else if (res != RdKafka::Conf::CONF_OK) {
75
0
            std::stringstream ss;
76
0
            ss << "PAUSE: failed to set '" << conf_key << "', value: '" << conf_val
77
0
               << "', err: " << errstr;
78
0
            LOG(WARNING) << ss.str();
79
0
            return Status::InternalError(ss.str());
80
0
        }
81
1.00k
        VLOG_NOTICE << "set " << conf_key << ": " << conf_val;
82
1.00k
        return Status::OK();
83
1.00k
    };
84
85
77
    RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers));
86
77
    RETURN_IF_ERROR(set_conf("enable.partition.eof", "true"));
87
77
    RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
88
    // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
89
77
    RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
90
77
    RETURN_IF_ERROR(set_conf("auto.offset.reset", "error"));
91
77
    RETURN_IF_ERROR(set_conf("socket.keepalive.enable", "true"));
92
77
    RETURN_IF_ERROR(set_conf("reconnect.backoff.ms", "100"));
93
77
    RETURN_IF_ERROR(set_conf("reconnect.backoff.max.ms", "10000"));
94
77
    RETURN_IF_ERROR(set_conf("api.version.request", config::kafka_api_version_request));
95
77
    RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0"));
96
77
    RETURN_IF_ERROR(set_conf("broker.version.fallback", config::kafka_broker_version_fallback));
97
77
    RETURN_IF_ERROR(set_conf("broker.address.ttl", "0"));
98
77
    if (config::kafka_debug != "disable") {
99
0
        RETURN_IF_ERROR(set_conf("debug", config::kafka_debug));
100
0
    }
101
102
77
    for (auto& item : ctx->kafka_info->properties) {
103
73
        _custom_properties.emplace(item.first, item.second);
104
105
        // AWS properties (aws.*) are Doris-specific for MSK IAM authentication
106
        // and should not be passed to librdkafka
107
73
        if (starts_with(item.first, "aws.")) {
108
0
            LOG(INFO) << "Skipping AWS property for librdkafka: " << item.first;
109
0
            continue;
110
0
        }
111
112
73
        if (starts_with(item.second, "FILE:")) {
113
            // file property should has format: FILE:file_id:md5
114
0
            std::vector<std::string> parts =
115
0
                    absl::StrSplit(item.second, ":", absl::SkipWhitespace());
116
0
            if (parts.size() != 3) {
117
0
                return Status::InternalError("PAUSE: Invalid file property of kafka: " +
118
0
                                             item.second);
119
0
            }
120
0
            int64_t file_id = std::stol(parts[1]);
121
0
            std::string file_path;
122
0
            Status st = ctx->exec_env()->small_file_mgr()->get_file(file_id, parts[2], &file_path);
123
0
            if (!st.ok()) {
124
0
                return Status::InternalError("PAUSE: failed to get file for config: {}, error: {}",
125
0
                                             item.first, st.to_string());
126
0
            }
127
0
            RETURN_IF_ERROR(set_conf(item.first, file_path));
128
73
        } else {
129
73
            RETURN_IF_ERROR(set_conf(item.first, item.second));
130
73
        }
131
73
    }
132
133
    // if not specified group id, generate a random one.
134
    // ATTN: In the new version, we have set a group.id on the FE side for jobs that have not set a groupid,
135
    // but in order to ensure compatibility, we still do a check here.
136
77
    if (!_custom_properties.contains(PROP_GROUP_ID)) {
137
5
        std::stringstream ss;
138
5
        ss << BackendOptions::get_localhost() << "_";
139
5
        std::string group_id = ss.str() + UniqueId::gen_uid().to_string();
140
5
        RETURN_IF_ERROR(set_conf(PROP_GROUP_ID, group_id));
141
5
        _custom_properties.emplace(PROP_GROUP_ID, group_id);
142
5
    }
143
77
    LOG(INFO) << "init kafka consumer with group id: " << _custom_properties[PROP_GROUP_ID];
144
145
77
    if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
146
0
        std::stringstream ss;
147
0
        ss << "PAUSE: failed to set 'event_cb'";
148
0
        LOG(WARNING) << ss.str();
149
0
        return Status::InternalError(ss.str());
150
0
    }
151
152
    // Set up AWS MSK IAM authentication if configured
153
77
    _aws_msk_oauth_callback = AwsMskIamOAuthCallback::create_from_properties(
154
77
            _custom_properties, ctx->kafka_info->brokers);
155
77
    if (_aws_msk_oauth_callback) {
156
        // Enable SASL queue to support background callbacks
157
0
        if (conf->enable_sasl_queue(true, errstr) != RdKafka::Conf::CONF_OK) {
158
0
            LOG(WARNING) << "PAUSE: failed to enable SASL queue: " << errstr;
159
0
            return Status::InternalError("PAUSE: failed to enable SASL queue: " + errstr);
160
0
        }
161
162
0
        if (conf->set("oauthbearer_token_refresh_cb", _aws_msk_oauth_callback.get(), errstr) !=
163
0
            RdKafka::Conf::CONF_OK) {
164
0
            LOG(WARNING) << "PAUSE: failed to set OAuth callback: " << errstr;
165
0
            return Status::InternalError("PAUSE: failed to set OAuth callback: " + errstr);
166
0
        }
167
0
        LOG(INFO) << "AWS MSK IAM authentication enabled successfully";
168
0
    }
169
170
    // create consumer
171
77
    _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
172
77
    if (!_k_consumer) {
173
0
        LOG(WARNING) << "PAUSE: failed to create kafka consumer: " << errstr;
174
0
        return Status::InternalError("PAUSE: failed to create kafka consumer: " + errstr);
175
0
    }
176
177
    // If AWS MSK IAM auth is enabled, inject initial token and enable background refresh
178
77
    if (_aws_msk_oauth_callback) {
179
0
        RETURN_IF_ERROR(_aws_msk_oauth_callback->refresh_now(_k_consumer));
180
181
0
        std::unique_ptr<RdKafka::Error> bg_err(_k_consumer->sasl_background_callbacks_enable());
182
0
        if (bg_err) {
183
0
            return Status::InternalError("Failed to enable SASL background callbacks: " +
184
0
                                         bg_err->str());
185
0
        }
186
0
        LOG(INFO) << "AWS MSK IAM: initial token set, background refresh enabled";
187
0
    }
188
189
77
    VLOG_NOTICE << "finished to init kafka consumer. " << ctx->brief();
190
191
77
    _init = true;
192
77
    return Status::OK();
193
77
}
194
195
Status KafkaDataConsumer::assign_topic_partitions(
196
        const std::map<int32_t, int64_t>& begin_partition_offset, const std::string& topic,
197
71
        std::shared_ptr<StreamLoadContext> ctx) {
198
71
    DCHECK(_k_consumer);
199
    // create TopicPartitions
200
71
    std::stringstream ss;
201
71
    std::vector<RdKafka::TopicPartition*> topic_partitions;
202
71
    for (auto& entry : begin_partition_offset) {
203
71
        RdKafka::TopicPartition* tp1 =
204
71
                RdKafka::TopicPartition::create(topic, entry.first, entry.second);
205
71
        topic_partitions.push_back(tp1);
206
71
        _consuming_partition_ids.insert(entry.first);
207
71
        ss << "[" << entry.first << ": " << entry.second << "] ";
208
71
    }
209
210
71
    LOG(INFO) << "consumer: " << _id << ", grp: " << _grp_id
211
71
              << " assign topic partitions: " << topic << ", " << ss.str();
212
213
    // delete TopicPartition finally
214
71
    Defer delete_tp {[&topic_partitions]() {
215
71
        std::for_each(topic_partitions.begin(), topic_partitions.end(),
216
71
                      [](RdKafka::TopicPartition* tp1) { delete tp1; });
217
71
    }};
218
219
    // assign partition
220
71
    RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions);
221
71
    if (err) {
222
0
        LOG(WARNING) << "failed to assign topic partitions: " << ctx->brief(true)
223
0
                     << ", err: " << RdKafka::err2str(err);
224
0
        _k_consumer->unassign();
225
0
        return Status::InternalError("failed to assign topic partitions");
226
0
    }
227
228
71
    return Status::OK();
229
71
}
230
231
Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
232
71
                                        int64_t max_running_time_ms) {
233
71
    static constexpr int MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE = 3;
234
71
    int64_t left_time = max_running_time_ms;
235
71
    LOG(INFO) << "start kafka consumer: " << _id << ", grp: " << _grp_id
236
71
              << ", max running time(ms): " << left_time;
237
238
71
    int64_t received_rows = 0;
239
71
    int64_t put_rows = 0;
240
71
    int32_t retry_times = 0;
241
71
    Status st = Status::OK();
242
71
    MonotonicStopWatch consumer_watch;
243
71
    MonotonicStopWatch watch;
244
71
    watch.start();
245
741
    while (true) {
246
741
        {
247
741
            std::unique_lock<std::mutex> l(_lock);
248
741
            if (_cancelled) {
249
0
                break;
250
0
            }
251
741
        }
252
253
741
        if (left_time <= 0) {
254
1
            break;
255
1
        }
256
257
740
        bool done = false;
258
        // consume 1 message at a time
259
740
        consumer_watch.start();
260
740
        std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */));
261
740
        consumer_watch.stop();
262
740
        DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
263
740
        DorisMetrics::instance()->routine_load_get_msg_latency->increment(
264
740
                consumer_watch.elapsed_time() / 1000 / 1000);
265
740
        DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
266
740
            done = true;
267
740
            std::stringstream ss;
268
740
            ss << "Offset out of range"
269
740
               << ", consume partition " << msg->partition() << ", consume offset "
270
740
               << msg->offset();
271
740
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
272
740
            st = Status::InternalError<false>(ss.str());
273
740
            break;
274
740
        });
275
740
        switch (msg->err()) {
276
665
        case RdKafka::ERR_NO_ERROR:
277
665
            if (_consuming_partition_ids.count(msg->partition()) <= 0) {
278
0
                _consuming_partition_ids.insert(msg->partition());
279
0
            }
280
665
            DorisMetrics::instance()->routine_load_consume_bytes->increment(msg->len());
281
665
            if (msg->len() == 0) {
282
                // ignore msg with length 0.
283
                // put empty msg into queue will cause the load process shutting down.
284
0
                break;
285
665
            } else if (!queue->controlled_blocking_put(msg.get(),
286
665
                                                       config::blocking_queue_cv_wait_timeout_ms)) {
287
                // queue is shutdown
288
0
                done = true;
289
665
            } else {
290
665
                ++put_rows;
291
665
                msg.release(); // release the ownership, msg will be deleted after being processed
292
665
            }
293
665
            ++received_rows;
294
665
            DorisMetrics::instance()->routine_load_consume_rows->increment(1);
295
665
            break;
296
5
        case RdKafka::ERR__TIMED_OUT:
297
            // leave the status as OK, because this may happen
298
            // if there is no data in kafka.
299
5
            LOG(INFO) << "kafka consume timeout: " << _id;
300
5
            break;
301
0
        case RdKafka::ERR__TRANSPORT:
302
0
            LOG(INFO) << "kafka consume Disconnected: " << _id
303
0
                      << ", retry times: " << retry_times++;
304
0
            if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) {
305
0
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
306
0
                break;
307
0
            }
308
0
            [[fallthrough]];
309
70
        case RdKafka::ERR__PARTITION_EOF: {
310
70
            VLOG_NOTICE << "consumer meet partition eof: " << _id
311
0
                        << " partition offset: " << msg->offset();
312
70
            _consuming_partition_ids.erase(msg->partition());
313
70
            if (!queue->controlled_blocking_put(msg.get(),
314
70
                                                config::blocking_queue_cv_wait_timeout_ms)) {
315
0
                done = true;
316
70
            } else if (_consuming_partition_ids.size() <= 0) {
317
70
                LOG(INFO) << "all partitions meet eof: " << _id;
318
70
                msg.release();
319
70
                done = true;
320
70
            } else {
321
0
                msg.release();
322
0
            }
323
70
            break;
324
0
        }
325
0
        case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
326
0
            done = true;
327
0
            std::stringstream ss;
328
0
            ss << msg->errstr() << ", consume partition " << msg->partition() << ", consume offset "
329
0
               << msg->offset();
330
0
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
331
0
            st = Status::InternalError<false>(ss.str());
332
0
            break;
333
0
        }
334
0
        default:
335
0
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
336
0
            done = true;
337
0
            st = Status::InternalError<false>(msg->errstr());
338
0
            break;
339
740
        }
340
341
740
        left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000;
342
740
        if (done) {
343
70
            break;
344
70
        }
345
740
    }
346
347
71
    LOG(INFO) << "kafka consumer done: " << _id << ", grp: " << _grp_id
348
71
              << ". cancelled: " << _cancelled << ", left time(ms): " << left_time
349
71
              << ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000
350
71
              << ", consume cost(ms): " << consumer_watch.elapsed_time() / 1000 / 1000
351
71
              << ", received rows: " << received_rows << ", put rows: " << put_rows;
352
353
71
    return st;
354
71
}
355
356
74
Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids) {
357
    // create topic conf
358
74
    RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
359
74
    Defer delete_conf {[tconf]() { delete tconf; }};
360
361
    // create topic
362
74
    std::string errstr;
363
74
    RdKafka::Topic* topic = RdKafka::Topic::create(_k_consumer, _topic, tconf, errstr);
364
74
    if (topic == nullptr) {
365
0
        std::stringstream ss;
366
0
        ss << "failed to create topic: " << errstr;
367
0
        LOG(WARNING) << ss.str();
368
0
        return Status::InternalError(ss.str());
369
0
    }
370
371
74
    Defer delete_topic {[topic]() { delete topic; }};
372
373
    // get topic metadata
374
74
    RdKafka::Metadata* metadata = nullptr;
375
74
    RdKafka::ErrorCode err =
376
74
            _k_consumer->metadata(false /* for this topic */, topic, &metadata, 5000);
377
74
    if (err != RdKafka::ERR_NO_ERROR) {
378
0
        std::stringstream ss;
379
0
        ss << "failed to get partition meta: " << RdKafka::err2str(err);
380
0
        LOG(WARNING) << ss.str();
381
0
        return Status::InternalError(ss.str());
382
0
    }
383
384
74
    Defer delete_meta {[metadata]() { delete metadata; }};
385
386
    // get partition ids
387
74
    RdKafka::Metadata::TopicMetadataIterator it;
388
145
    for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {
389
74
        if ((*it)->topic() != _topic) {
390
0
            continue;
391
0
        }
392
393
74
        if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
394
3
            std::stringstream ss;
395
3
            ss << "error: " << err2str((*it)->err());
396
3
            if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
397
0
                ss << ", try again";
398
0
            }
399
3
            LOG(WARNING) << ss.str();
400
3
            return Status::InternalError(ss.str());
401
3
        }
402
403
71
        RdKafka::TopicMetadata::PartitionMetadataIterator ip;
404
142
        for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {
405
71
            partition_ids->push_back((*ip)->id());
406
71
        }
407
71
    }
408
409
71
    if (partition_ids->empty()) {
410
0
        return Status::InternalError("no partition in this topic");
411
0
    }
412
413
71
    return Status::OK();
414
71
}
415
416
// get offsets of each partition for times.
417
// The input parameter "times" holds <partition, timestamps>
418
// The output parameter "offsets" returns <partition, offsets>
419
//
420
// The returned offset for each partition is the earliest offset whose
421
// timestamp is greater than or equal to the given timestamp in the
422
// corresponding partition.
423
// See librdkafka/rdkafkacpp.h##offsetsForTimes()
424
Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& times,
425
1
                                                std::vector<PIntegerPair>* offsets, int timeout) {
426
    // create topic partition
427
1
    std::vector<RdKafka::TopicPartition*> topic_partitions;
428
1
    for (const auto& entry : times) {
429
1
        RdKafka::TopicPartition* tp1 =
430
1
                RdKafka::TopicPartition::create(_topic, entry.key(), entry.val());
431
1
        topic_partitions.push_back(tp1);
432
1
    }
433
    // delete TopicPartition finally
434
1
    Defer delete_tp {[&topic_partitions]() {
435
1
        std::for_each(topic_partitions.begin(), topic_partitions.end(),
436
1
                      [](RdKafka::TopicPartition* tp1) { delete tp1; });
437
1
    }};
438
439
    // get offsets for times
440
1
    RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, timeout);
441
1
    if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
442
0
        std::stringstream ss;
443
0
        ss << "failed to get offsets for times: " << RdKafka::err2str(err);
444
0
        LOG(WARNING) << ss.str();
445
0
        return Status::InternalError(ss.str());
446
0
    }
447
448
1
    for (const auto& topic_partition : topic_partitions) {
449
1
        PIntegerPair pair;
450
1
        pair.set_key(topic_partition->partition());
451
1
        pair.set_val(topic_partition->offset());
452
1
        offsets->push_back(std::move(pair));
453
1
    }
454
455
1
    return Status::OK();
456
1
}
457
458
// get latest offsets for given partitions
459
Status KafkaDataConsumer::get_latest_offsets_for_partitions(
460
        const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets,
461
355
        int timeout) {
462
355
    DBUG_EXECUTE_IF("KafkaDataConsumer.get_offsets_for_partitions.timeout", {
463
        // sleep 61s
464
355
        std::this_thread::sleep_for(std::chrono::seconds(61));
465
355
    });
466
355
    MonotonicStopWatch watch;
467
355
    watch.start();
468
355
    for (int32_t partition_id : partition_ids) {
469
355
        int64_t low = 0;
470
355
        int64_t high = 0;
471
355
        auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000);
472
355
        if (UNLIKELY(timeout_ms <= 0)) {
473
0
            return Status::InternalError("get kafka latest offsets for partitions timeout");
474
0
        }
475
476
355
        RdKafka::ErrorCode err =
477
355
                _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, timeout_ms);
478
355
        if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
479
0
            std::stringstream ss;
480
0
            ss << "failed to get latest offset for partition: " << partition_id
481
0
               << ", err: " << RdKafka::err2str(err);
482
0
            LOG(WARNING) << ss.str();
483
0
            return Status::InternalError(ss.str());
484
0
        }
485
486
355
        PIntegerPair pair;
487
355
        pair.set_key(partition_id);
488
355
        pair.set_val(high);
489
355
        offsets->push_back(std::move(pair));
490
355
    }
491
492
355
    return Status::OK();
493
355
}
494
495
Status KafkaDataConsumer::get_real_offsets_for_partitions(
496
        const std::vector<PIntegerPair>& offset_flags, std::vector<PIntegerPair>* offsets,
497
65
        int timeout) {
498
65
    DBUG_EXECUTE_IF("KafkaDataConsumer.get_offsets_for_partitions.timeout", {
499
        // sleep 61s
500
65
        std::this_thread::sleep_for(std::chrono::seconds(61));
501
65
    });
502
65
    MonotonicStopWatch watch;
503
65
    watch.start();
504
65
    for (const auto& entry : offset_flags) {
505
65
        PIntegerPair pair;
506
65
        if (UNLIKELY(entry.val() >= 0)) {
507
0
            pair.set_key(entry.key());
508
0
            pair.set_val(entry.val());
509
0
            offsets->push_back(std::move(pair));
510
0
            continue;
511
0
        }
512
513
65
        int64_t low = 0;
514
65
        int64_t high = 0;
515
65
        auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000);
516
65
        if (UNLIKELY(timeout_ms <= 0)) {
517
0
            return Status::InternalError("get kafka real offsets for partitions timeout");
518
0
        }
519
520
65
        RdKafka::ErrorCode err =
521
65
                _k_consumer->query_watermark_offsets(_topic, entry.key(), &low, &high, timeout_ms);
522
65
        if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
523
0
            std::stringstream ss;
524
0
            ss << "failed to get latest offset for partition: " << entry.key()
525
0
               << ", err: " << RdKafka::err2str(err);
526
0
            LOG(WARNING) << ss.str();
527
0
            return Status::InternalError(ss.str());
528
0
        }
529
530
65
        pair.set_key(entry.key());
531
65
        if (entry.val() == -1) {
532
            // OFFSET_END_VAL = -1
533
5
            pair.set_val(high);
534
60
        } else if (entry.val() == -2) {
535
            // OFFSET_BEGINNING_VAL = -2
536
60
            pair.set_val(low);
537
60
        }
538
65
        offsets->push_back(std::move(pair));
539
65
    }
540
541
65
    return Status::OK();
542
65
}
543
544
71
Status KafkaDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) {
545
71
    std::unique_lock<std::mutex> l(_lock);
546
71
    if (!_init) {
547
0
        return Status::InternalError("consumer is not initialized");
548
0
    }
549
550
71
    _cancelled = true;
551
71
    LOG(INFO) << "kafka consumer cancelled. " << _id;
552
71
    return Status::OK();
553
71
}
554
555
1.18k
Status KafkaDataConsumer::reset() {
556
1.18k
    std::unique_lock<std::mutex> l(_lock);
557
1.18k
    _cancelled = false;
558
1.18k
    _k_consumer->unassign();
559
    // reset will be called before this consumer being returned to the pool.
560
    // so update _last_visit_time is reasonable.
561
1.18k
    _last_visit_time = time(nullptr);
562
1.18k
    return Status::OK();
563
1.18k
}
564
565
69
Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset) {
566
    // Use async commit so that it will not block for a long time.
567
    // Commit failure has no effect on Doris, subsequent tasks will continue to commit the new offset
568
69
    RdKafka::ErrorCode err = _k_consumer->commitAsync(offset);
569
69
    if (err != RdKafka::ERR_NO_ERROR) {
570
0
        return Status::InternalError("failed to commit kafka offset : {}", RdKafka::err2str(err));
571
0
    }
572
69
    return Status::OK();
573
69
}
574
575
// if the kafka brokers and topic are same,
576
// we considered this consumer as matched, thus can be reused.
577
13.8k
bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
578
13.8k
    if (ctx->load_src_type != TLoadSourceType::KAFKA) {
579
0
        return false;
580
0
    }
581
13.8k
    if (_brokers != ctx->kafka_info->brokers || _topic != ctx->kafka_info->topic) {
582
12.3k
        return false;
583
12.3k
    }
584
    // check properties
585
1.48k
    if (_custom_properties.size() != ctx->kafka_info->properties.size()) {
586
30
        return false;
587
30
    }
588
1.46k
    for (auto& item : ctx->kafka_info->properties) {
589
1.46k
        std::unordered_map<std::string, std::string>::const_iterator itr =
590
1.46k
                _custom_properties.find(item.first);
591
1.46k
        if (itr == _custom_properties.end()) {
592
0
            return false;
593
0
        }
594
595
1.46k
        if (itr->second != item.second) {
596
900
            return false;
597
900
        }
598
1.46k
    }
599
558
    return true;
600
1.45k
}
601
602
} // end namespace doris