Coverage Report

Created: 2026-03-16 03:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/data_consumer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/routine_load/data_consumer.h"
19
20
#include <absl/strings/str_split.h>
21
#include <gen_cpp/Types_types.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <librdkafka/rdkafkacpp.h>
24
25
#include <algorithm>
26
// IWYU pragma: no_include <bits/chrono.h>
27
#include <chrono> // IWYU pragma: keep
28
#include <string>
29
#include <thread>
30
#include <utility>
31
#include <vector>
32
33
#include "common/config.h"
34
#include "common/metrics/doris_metrics.h"
35
#include "common/status.h"
36
#include "runtime/aws_msk_iam_auth.h"
37
#include "runtime/exec_env.h"
38
#include "runtime/small_file_mgr.h"
39
#include "service/backend_options.h"
40
#include "util/blocking_queue.hpp"
41
#include "util/debug_points.h"
42
#include "util/defer_op.h"
43
#include "util/stopwatch.hpp"
44
#include "util/string_util.h"
45
#include "util/uid_util.h"
46
47
namespace doris {
48
49
static const std::string PROP_GROUP_ID = "group.id";
50
// init kafka consumer will only set common configs such as
51
// brokers, groupid
52
78
Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
53
78
    std::unique_lock<std::mutex> l(_lock);
54
78
    if (_init) {
55
        // this consumer has already been initialized.
56
0
        return Status::OK();
57
0
    }
58
59
78
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
60
61
    // conf has to be deleted finally
62
78
    Defer delete_conf {[conf]() { delete conf; }};
63
64
78
    std::string errstr;
65
1.01k
    auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
66
1.01k
        RdKafka::Conf::ConfResult res = conf->set(conf_key, conf_val, errstr);
67
1.01k
        if (res == RdKafka::Conf::CONF_UNKNOWN) {
68
            // ignore unknown config
69
0
            return Status::OK();
70
1.01k
        } else if (errstr.find("not supported") != std::string::npos) {
71
            // some java-only properties may be passed here, and librdkafka will return 'xxx' not supported
72
            // ignore it
73
0
            return Status::OK();
74
1.01k
        } else if (res != RdKafka::Conf::CONF_OK) {
75
0
            std::stringstream ss;
76
0
            ss << "PAUSE: failed to set '" << conf_key << "', value: '" << conf_val
77
0
               << "', err: " << errstr;
78
0
            LOG(WARNING) << ss.str();
79
0
            return Status::InternalError(ss.str());
80
0
        }
81
1.01k
        VLOG_NOTICE << "set " << conf_key << ": " << conf_val;
82
1.01k
        return Status::OK();
83
1.01k
    };
84
85
78
    RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers));
86
78
    RETURN_IF_ERROR(set_conf("enable.partition.eof", "true"));
87
78
    RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
88
    // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
89
78
    RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
90
78
    RETURN_IF_ERROR(set_conf("auto.offset.reset", "error"));
91
78
    RETURN_IF_ERROR(set_conf("socket.keepalive.enable", "true"));
92
78
    RETURN_IF_ERROR(set_conf("reconnect.backoff.ms", "100"));
93
78
    RETURN_IF_ERROR(set_conf("reconnect.backoff.max.ms", "10000"));
94
78
    RETURN_IF_ERROR(set_conf("api.version.request", config::kafka_api_version_request));
95
78
    RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0"));
96
78
    RETURN_IF_ERROR(set_conf("broker.version.fallback", config::kafka_broker_version_fallback));
97
78
    RETURN_IF_ERROR(set_conf("broker.address.ttl", "0"));
98
78
    if (config::kafka_debug != "disable") {
99
0
        RETURN_IF_ERROR(set_conf("debug", config::kafka_debug));
100
0
    }
101
102
78
    for (auto& item : ctx->kafka_info->properties) {
103
74
        _custom_properties.emplace(item.first, item.second);
104
105
        // AWS properties (aws.*) are Doris-specific for MSK IAM authentication
106
        // and should not be passed to librdkafka
107
74
        if (starts_with(item.second, "AWS:")) {
108
0
            LOG(INFO) << "Skipping AWS property for librdkafka: " << item.first;
109
0
            continue;
110
0
        }
111
112
74
        if (starts_with(item.second, "FILE:")) {
113
            // file property should have the format: FILE:file_id:md5
114
0
            std::vector<std::string> parts =
115
0
                    absl::StrSplit(item.second, ":", absl::SkipWhitespace());
116
0
            if (parts.size() != 3) {
117
0
                return Status::InternalError("PAUSE: Invalid file property of kafka: " +
118
0
                                             item.second);
119
0
            }
120
0
            int64_t file_id = std::stol(parts[1]);
121
0
            std::string file_path;
122
0
            Status st = ctx->exec_env()->small_file_mgr()->get_file(file_id, parts[2], &file_path);
123
0
            if (!st.ok()) {
124
0
                return Status::InternalError("PAUSE: failed to get file for config: {}, error: {}",
125
0
                                             item.first, st.to_string());
126
0
            }
127
0
            RETURN_IF_ERROR(set_conf(item.first, file_path));
128
74
        } else {
129
74
            RETURN_IF_ERROR(set_conf(item.first, item.second));
130
74
        }
131
74
    }
132
133
    // if not specified group id, generate a random one.
134
    // ATTN: In the new version, we have set a group.id on the FE side for jobs that have not set a groupid,
135
    // but in order to ensure compatibility, we still do a check here.
136
78
    if (!_custom_properties.contains(PROP_GROUP_ID)) {
137
5
        std::stringstream ss;
138
5
        ss << BackendOptions::get_localhost() << "_";
139
5
        std::string group_id = ss.str() + UniqueId::gen_uid().to_string();
140
5
        RETURN_IF_ERROR(set_conf(PROP_GROUP_ID, group_id));
141
5
        _custom_properties.emplace(PROP_GROUP_ID, group_id);
142
5
    }
143
78
    LOG(INFO) << "init kafka consumer with group id: " << _custom_properties[PROP_GROUP_ID];
144
145
78
    if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
146
0
        std::stringstream ss;
147
0
        ss << "PAUSE: failed to set 'event_cb'";
148
0
        LOG(WARNING) << ss.str();
149
0
        return Status::InternalError(ss.str());
150
0
    }
151
152
    // Set up AWS MSK IAM authentication if configured
153
78
    _aws_msk_oauth_callback = AwsMskIamOAuthCallback::create_from_properties(
154
78
            _custom_properties, ctx->kafka_info->brokers);
155
78
    if (_aws_msk_oauth_callback) {
156
0
        if (conf->set("oauthbearer_token_refresh_cb", _aws_msk_oauth_callback.get(), errstr) !=
157
0
            RdKafka::Conf::CONF_OK) {
158
0
            LOG(WARNING) << "PAUSE: failed to set OAuth callback: " << errstr;
159
0
            return Status::InternalError("PAUSE: failed to set OAuth callback: " + errstr);
160
0
        }
161
0
        LOG(INFO) << "AWS MSK IAM authentication enabled successfully";
162
0
    }
163
164
    // create consumer
165
78
    _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
166
78
    if (!_k_consumer) {
167
0
        LOG(WARNING) << "PAUSE: failed to create kafka consumer: " << errstr;
168
0
        return Status::InternalError("PAUSE: failed to create kafka consumer: " + errstr);
169
0
    }
170
171
78
    VLOG_NOTICE << "finished to init kafka consumer. " << ctx->brief();
172
173
78
    _init = true;
174
78
    return Status::OK();
175
78
}
176
177
Status KafkaDataConsumer::assign_topic_partitions(
178
        const std::map<int32_t, int64_t>& begin_partition_offset, const std::string& topic,
179
71
        std::shared_ptr<StreamLoadContext> ctx) {
180
71
    DCHECK(_k_consumer);
181
    // create TopicPartitions
182
71
    std::stringstream ss;
183
71
    std::vector<RdKafka::TopicPartition*> topic_partitions;
184
71
    for (auto& entry : begin_partition_offset) {
185
71
        RdKafka::TopicPartition* tp1 =
186
71
                RdKafka::TopicPartition::create(topic, entry.first, entry.second);
187
71
        topic_partitions.push_back(tp1);
188
71
        _consuming_partition_ids.insert(entry.first);
189
71
        ss << "[" << entry.first << ": " << entry.second << "] ";
190
71
    }
191
192
71
    LOG(INFO) << "consumer: " << _id << ", grp: " << _grp_id
193
71
              << " assign topic partitions: " << topic << ", " << ss.str();
194
195
    // delete TopicPartition finally
196
71
    Defer delete_tp {[&topic_partitions]() {
197
71
        std::for_each(topic_partitions.begin(), topic_partitions.end(),
198
71
                      [](RdKafka::TopicPartition* tp1) { delete tp1; });
199
71
    }};
200
201
    // assign partition
202
71
    RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions);
203
71
    if (err) {
204
0
        LOG(WARNING) << "failed to assign topic partitions: " << ctx->brief(true)
205
0
                     << ", err: " << RdKafka::err2str(err);
206
0
        _k_consumer->unassign();
207
0
        return Status::InternalError("failed to assign topic partitions");
208
0
    }
209
210
71
    return Status::OK();
211
71
}
212
213
Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
214
71
                                        int64_t max_running_time_ms) {
215
71
    static constexpr int MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE = 3;
216
71
    int64_t left_time = max_running_time_ms;
217
71
    LOG(INFO) << "start kafka consumer: " << _id << ", grp: " << _grp_id
218
71
              << ", max running time(ms): " << left_time;
219
220
71
    int64_t received_rows = 0;
221
71
    int64_t put_rows = 0;
222
71
    int32_t retry_times = 0;
223
71
    Status st = Status::OK();
224
71
    MonotonicStopWatch consumer_watch;
225
71
    MonotonicStopWatch watch;
226
71
    watch.start();
227
741
    while (true) {
228
741
        {
229
741
            std::unique_lock<std::mutex> l(_lock);
230
741
            if (_cancelled) {
231
0
                break;
232
0
            }
233
741
        }
234
235
741
        if (left_time <= 0) {
236
1
            break;
237
1
        }
238
239
740
        bool done = false;
240
        // consume 1 message at a time
241
740
        consumer_watch.start();
242
740
        std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */));
243
740
        consumer_watch.stop();
244
740
        DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
245
740
        DorisMetrics::instance()->routine_load_get_msg_latency->increment(
246
740
                consumer_watch.elapsed_time() / 1000 / 1000);
247
740
        DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
248
740
            done = true;
249
740
            std::stringstream ss;
250
740
            ss << "Offset out of range"
251
740
               << ", consume partition " << msg->partition() << ", consume offset "
252
740
               << msg->offset();
253
740
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
254
740
            st = Status::InternalError<false>(ss.str());
255
740
            break;
256
740
        });
257
740
        switch (msg->err()) {
258
665
        case RdKafka::ERR_NO_ERROR:
259
665
            if (_consuming_partition_ids.count(msg->partition()) <= 0) {
260
0
                _consuming_partition_ids.insert(msg->partition());
261
0
            }
262
665
            DorisMetrics::instance()->routine_load_consume_bytes->increment(msg->len());
263
665
            if (msg->len() == 0) {
264
                // ignore msg with length 0.
265
                // putting an empty msg into the queue would cause the load process to shut down.
266
0
                break;
267
665
            } else if (!queue->controlled_blocking_put(msg.get(),
268
665
                                                       config::blocking_queue_cv_wait_timeout_ms)) {
269
                // queue is shutdown
270
0
                done = true;
271
665
            } else {
272
665
                ++put_rows;
273
665
                msg.release(); // release the ownership, msg will be deleted after being processed
274
665
            }
275
665
            ++received_rows;
276
665
            DorisMetrics::instance()->routine_load_consume_rows->increment(1);
277
665
            break;
278
5
        case RdKafka::ERR__TIMED_OUT:
279
            // leave the status as OK, because this may happen
280
            // if there is no data in kafka.
281
5
            LOG(INFO) << "kafka consume timeout: " << _id;
282
5
            break;
283
0
        case RdKafka::ERR__TRANSPORT:
284
0
            LOG(INFO) << "kafka consume Disconnected: " << _id
285
0
                      << ", retry times: " << retry_times++;
286
0
            if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) {
287
0
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
288
0
                break;
289
0
            }
290
0
            [[fallthrough]];
291
70
        case RdKafka::ERR__PARTITION_EOF: {
292
70
            VLOG_NOTICE << "consumer meet partition eof: " << _id
293
0
                        << " partition offset: " << msg->offset();
294
70
            _consuming_partition_ids.erase(msg->partition());
295
70
            if (!queue->controlled_blocking_put(msg.get(),
296
70
                                                config::blocking_queue_cv_wait_timeout_ms)) {
297
0
                done = true;
298
70
            } else if (_consuming_partition_ids.size() <= 0) {
299
70
                LOG(INFO) << "all partitions meet eof: " << _id;
300
70
                msg.release();
301
70
                done = true;
302
70
            } else {
303
0
                msg.release();
304
0
            }
305
70
            break;
306
0
        }
307
0
        case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
308
0
            done = true;
309
0
            std::stringstream ss;
310
0
            ss << msg->errstr() << ", consume partition " << msg->partition() << ", consume offset "
311
0
               << msg->offset();
312
0
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
313
0
            st = Status::InternalError<false>(ss.str());
314
0
            break;
315
0
        }
316
0
        default:
317
0
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
318
0
            done = true;
319
0
            st = Status::InternalError<false>(msg->errstr());
320
0
            break;
321
740
        }
322
323
740
        left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000;
324
740
        if (done) {
325
70
            break;
326
70
        }
327
740
    }
328
329
71
    LOG(INFO) << "kafka consumer done: " << _id << ", grp: " << _grp_id
330
71
              << ". cancelled: " << _cancelled << ", left time(ms): " << left_time
331
71
              << ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000
332
71
              << ", consume cost(ms): " << consumer_watch.elapsed_time() / 1000 / 1000
333
71
              << ", received rows: " << received_rows << ", put rows: " << put_rows;
334
335
71
    return st;
336
71
}
337
338
85
Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids) {
339
85
    if (_aws_msk_oauth_callback) {
340
        // Trigger OAuth token refresh by polling the event loop
341
        // librdkafka's OAUTHBEARER callback is only triggered through consume()/poll()
342
        // Without this, metadata() will fail because broker is waiting for OAuth token
343
0
        LOG(INFO) << "Polling to trigger OAuth token refresh before metadata request";
344
0
        int max_poll_attempts = 10; // 10 × 500ms = 5 seconds max
345
0
        for (int i = 0; i < max_poll_attempts; i++) {
346
0
            RdKafka::Message* msg = _k_consumer->consume(500);
347
0
            if (msg) {
348
0
                delete msg; // We don't expect messages before partition assignment
349
0
            }
350
0
        }
351
0
    }
352
353
    // create topic conf
354
85
    RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
355
85
    Defer delete_conf {[tconf]() { delete tconf; }};
356
357
    // create topic
358
85
    std::string errstr;
359
85
    RdKafka::Topic* topic = RdKafka::Topic::create(_k_consumer, _topic, tconf, errstr);
360
85
    if (topic == nullptr) {
361
0
        std::stringstream ss;
362
0
        ss << "failed to create topic: " << errstr;
363
0
        LOG(WARNING) << ss.str();
364
0
        return Status::InternalError(ss.str());
365
0
    }
366
367
85
    Defer delete_topic {[topic]() { delete topic; }};
368
369
    // get topic metadata
370
85
    RdKafka::Metadata* metadata = nullptr;
371
85
    RdKafka::ErrorCode err =
372
85
            _k_consumer->metadata(false /* for this topic */, topic, &metadata, 5000);
373
85
    if (err != RdKafka::ERR_NO_ERROR) {
374
0
        std::stringstream ss;
375
0
        ss << "failed to get partition meta: " << RdKafka::err2str(err);
376
0
        LOG(WARNING) << ss.str();
377
0
        return Status::InternalError(ss.str());
378
0
    }
379
380
85
    Defer delete_meta {[metadata]() { delete metadata; }};
381
382
    // get partition ids
383
85
    RdKafka::Metadata::TopicMetadataIterator it;
384
167
    for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {
385
85
        if ((*it)->topic() != _topic) {
386
0
            continue;
387
0
        }
388
389
85
        if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
390
3
            std::stringstream ss;
391
3
            ss << "error: " << err2str((*it)->err());
392
3
            if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
393
0
                ss << ", try again";
394
0
            }
395
3
            LOG(WARNING) << ss.str();
396
3
            return Status::InternalError(ss.str());
397
3
        }
398
399
82
        RdKafka::TopicMetadata::PartitionMetadataIterator ip;
400
164
        for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {
401
82
            partition_ids->push_back((*ip)->id());
402
82
        }
403
82
    }
404
405
82
    if (partition_ids->empty()) {
406
0
        return Status::InternalError("no partition in this topic");
407
0
    }
408
409
82
    return Status::OK();
410
82
}
411
412
// get offsets of each partition for times.
413
// The input parameter "times" holds <partition, timestamps>
414
// The output parameter "offsets" returns <partition, offsets>
415
//
416
// The returned offset for each partition is the earliest offset whose
417
// timestamp is greater than or equal to the given timestamp in the
418
// corresponding partition.
419
// See librdkafka/rdkafkacpp.h##offsetsForTimes()
420
Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& times,
421
1
                                                std::vector<PIntegerPair>* offsets, int timeout) {
422
    // create topic partition
423
1
    std::vector<RdKafka::TopicPartition*> topic_partitions;
424
1
    for (const auto& entry : times) {
425
1
        RdKafka::TopicPartition* tp1 =
426
1
                RdKafka::TopicPartition::create(_topic, entry.key(), entry.val());
427
1
        topic_partitions.push_back(tp1);
428
1
    }
429
    // delete TopicPartition finally
430
1
    Defer delete_tp {[&topic_partitions]() {
431
1
        std::for_each(topic_partitions.begin(), topic_partitions.end(),
432
1
                      [](RdKafka::TopicPartition* tp1) { delete tp1; });
433
1
    }};
434
435
    // get offsets for times
436
1
    RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, timeout);
437
1
    if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
438
0
        std::stringstream ss;
439
0
        ss << "failed to get offsets for times: " << RdKafka::err2str(err);
440
0
        LOG(WARNING) << ss.str();
441
0
        return Status::InternalError(ss.str());
442
0
    }
443
444
1
    for (const auto& topic_partition : topic_partitions) {
445
1
        PIntegerPair pair;
446
1
        pair.set_key(topic_partition->partition());
447
1
        pair.set_val(topic_partition->offset());
448
1
        offsets->push_back(std::move(pair));
449
1
    }
450
451
1
    return Status::OK();
452
1
}
453
454
// get latest offsets for given partitions
455
Status KafkaDataConsumer::get_latest_offsets_for_partitions(
456
        const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets,
457
2.92k
        int timeout) {
458
2.92k
    DBUG_EXECUTE_IF("KafkaDataConsumer.get_latest_offsets_for_partitions.timeout", {
459
        // sleep 60s
460
2.92k
        std::this_thread::sleep_for(std::chrono::seconds(60));
461
2.92k
    });
462
2.92k
    MonotonicStopWatch watch;
463
2.92k
    watch.start();
464
2.92k
    for (int32_t partition_id : partition_ids) {
465
2.92k
        int64_t low = 0;
466
2.92k
        int64_t high = 0;
467
2.92k
        auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000);
468
2.92k
        if (UNLIKELY(timeout_ms <= 0)) {
469
0
            return Status::InternalError("get kafka latest offsets for partitions timeout");
470
0
        }
471
472
2.92k
        RdKafka::ErrorCode err =
473
2.92k
                _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, timeout_ms);
474
2.92k
        if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
475
0
            std::stringstream ss;
476
0
            ss << "failed to get latest offset for partition: " << partition_id
477
0
               << ", err: " << RdKafka::err2str(err);
478
0
            LOG(WARNING) << ss.str();
479
0
            return Status::InternalError(ss.str());
480
0
        }
481
482
2.92k
        PIntegerPair pair;
483
2.92k
        pair.set_key(partition_id);
484
2.92k
        pair.set_val(high);
485
2.92k
        offsets->push_back(std::move(pair));
486
2.92k
    }
487
488
2.92k
    return Status::OK();
489
2.92k
}
490
491
Status KafkaDataConsumer::get_real_offsets_for_partitions(
492
        const std::vector<PIntegerPair>& offset_flags, std::vector<PIntegerPair>* offsets,
493
65
        int timeout) {
494
65
    MonotonicStopWatch watch;
495
65
    watch.start();
496
65
    for (const auto& entry : offset_flags) {
497
65
        PIntegerPair pair;
498
65
        if (UNLIKELY(entry.val() >= 0)) {
499
0
            pair.set_key(entry.key());
500
0
            pair.set_val(entry.val());
501
0
            offsets->push_back(std::move(pair));
502
0
            continue;
503
0
        }
504
505
65
        int64_t low = 0;
506
65
        int64_t high = 0;
507
65
        auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000);
508
65
        if (UNLIKELY(timeout_ms <= 0)) {
509
0
            return Status::InternalError("get kafka real offsets for partitions timeout");
510
0
        }
511
512
65
        RdKafka::ErrorCode err =
513
65
                _k_consumer->query_watermark_offsets(_topic, entry.key(), &low, &high, timeout_ms);
514
65
        if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
515
0
            std::stringstream ss;
516
0
            ss << "failed to get latest offset for partition: " << entry.key()
517
0
               << ", err: " << RdKafka::err2str(err);
518
0
            LOG(WARNING) << ss.str();
519
0
            return Status::InternalError(ss.str());
520
0
        }
521
522
65
        pair.set_key(entry.key());
523
65
        if (entry.val() == -1) {
524
            // OFFSET_END_VAL = -1
525
5
            pair.set_val(high);
526
60
        } else if (entry.val() == -2) {
527
            // OFFSET_BEGINNING_VAL = -2
528
60
            pair.set_val(low);
529
60
        }
530
65
        offsets->push_back(std::move(pair));
531
65
    }
532
533
65
    return Status::OK();
534
65
}
535
536
71
Status KafkaDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) {
537
71
    std::unique_lock<std::mutex> l(_lock);
538
71
    if (!_init) {
539
0
        return Status::InternalError("consumer is not initialized");
540
0
    }
541
542
71
    _cancelled = true;
543
71
    LOG(INFO) << "kafka consumer cancelled. " << _id;
544
71
    return Status::OK();
545
71
}
546
547
6.35k
Status KafkaDataConsumer::reset() {
548
6.35k
    std::unique_lock<std::mutex> l(_lock);
549
6.35k
    _cancelled = false;
550
6.35k
    _k_consumer->unassign();
551
    // reset will be called before this consumer being returned to the pool.
552
    // so update _last_visit_time is reasonable.
553
6.35k
    _last_visit_time = time(nullptr);
554
6.35k
    return Status::OK();
555
6.35k
}
556
557
69
Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset) {
558
    // Use async commit so that it will not block for a long time.
559
    // Commit failure has no effect on Doris, subsequent tasks will continue to commit the new offset
560
69
    RdKafka::ErrorCode err = _k_consumer->commitAsync(offset);
561
69
    if (err != RdKafka::ERR_NO_ERROR) {
562
0
        return Status::InternalError("failed to commit kafka offset : {}", RdKafka::err2str(err));
563
0
    }
564
69
    return Status::OK();
565
69
}
566
567
// if the kafka brokers and topic are same,
568
// we considered this consumer as matched, thus can be reused.
569
102k
bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
570
102k
    if (ctx->load_src_type != TLoadSourceType::KAFKA) {
571
0
        return false;
572
0
    }
573
102k
    if (_brokers != ctx->kafka_info->brokers || _topic != ctx->kafka_info->topic) {
574
97.5k
        return false;
575
97.5k
    }
576
    // check properties
577
4.81k
    if (_custom_properties.size() != ctx->kafka_info->properties.size()) {
578
30
        return false;
579
30
    }
580
4.78k
    for (auto& item : ctx->kafka_info->properties) {
581
4.78k
        std::unordered_map<std::string, std::string>::const_iterator itr =
582
4.78k
                _custom_properties.find(item.first);
583
4.78k
        if (itr == _custom_properties.end()) {
584
0
            return false;
585
0
        }
586
587
4.78k
        if (itr->second != item.second) {
588
1.64k
            return false;
589
1.64k
        }
590
4.78k
    }
591
3.14k
    return true;
592
4.78k
}
593
594
} // end namespace doris