Coverage Report

Created: 2026-03-14 06:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/data_consumer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/routine_load/data_consumer.h"
19
20
#include <absl/strings/str_split.h>
21
#include <gen_cpp/Types_types.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <librdkafka/rdkafkacpp.h>
24
25
#include <algorithm>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>
32
33
#include "common/config.h"
34
#include "common/metrics/doris_metrics.h"
35
#include "common/status.h"
36
#include "runtime/exec_env.h"
37
#include "runtime/small_file_mgr.h"
38
#include "service/backend_options.h"
39
#include "util/blocking_queue.hpp"
40
#include "util/debug_points.h"
41
#include "util/defer_op.h"
42
#include "util/stopwatch.hpp"
43
#include "util/string_util.h"
44
#include "util/uid_util.h"
45
46
namespace doris {
47
48
static const std::string PROP_GROUP_ID = "group.id";
49
// init kafka consumer will only set common configs such as
50
// brokers, groupid
51
2
Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
52
2
    std::unique_lock<std::mutex> l(_lock);
53
2
    if (_init) {
54
        // this consumer has already been initialized.
55
0
        return Status::OK();
56
0
    }
57
58
2
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
59
60
    // conf has to be deleted finally
61
2
    Defer delete_conf {[conf]() { delete conf; }};
62
63
2
    std::string errstr;
64
26
    auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
65
26
        RdKafka::Conf::ConfResult res = conf->set(conf_key, conf_val, errstr);
66
26
        if (res == RdKafka::Conf::CONF_UNKNOWN) {
67
            // ignore unknown config
68
0
            return Status::OK();
69
26
        } else if (errstr.find("not supported") != std::string::npos) {
70
            // some java-only properties may be passed to here, and librdkafak will return 'xxx' not supported
71
            // ignore it
72
0
            return Status::OK();
73
26
        } else if (res != RdKafka::Conf::CONF_OK) {
74
0
            std::stringstream ss;
75
0
            ss << "PAUSE: failed to set '" << conf_key << "', value: '" << conf_val
76
0
               << "', err: " << errstr;
77
0
            LOG(WARNING) << ss.str();
78
0
            return Status::InternalError(ss.str());
79
0
        }
80
26
        VLOG_NOTICE << "set " << conf_key << ": " << conf_val;
81
26
        return Status::OK();
82
26
    };
83
84
2
    RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers));
85
2
    RETURN_IF_ERROR(set_conf("enable.partition.eof", "true"));
86
2
    RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
87
    // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
88
2
    RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
89
2
    RETURN_IF_ERROR(set_conf("auto.offset.reset", "error"));
90
2
    RETURN_IF_ERROR(set_conf("socket.keepalive.enable", "true"));
91
2
    RETURN_IF_ERROR(set_conf("reconnect.backoff.ms", "100"));
92
2
    RETURN_IF_ERROR(set_conf("reconnect.backoff.max.ms", "10000"));
93
2
    RETURN_IF_ERROR(set_conf("api.version.request", config::kafka_api_version_request));
94
2
    RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0"));
95
2
    RETURN_IF_ERROR(set_conf("broker.version.fallback", config::kafka_broker_version_fallback));
96
2
    RETURN_IF_ERROR(set_conf("broker.address.ttl", "0"));
97
2
    if (config::kafka_debug != "disable") {
98
0
        RETURN_IF_ERROR(set_conf("debug", config::kafka_debug));
99
0
    }
100
101
2
    for (auto& item : ctx->kafka_info->properties) {
102
0
        if (starts_with(item.second, "FILE:")) {
103
            // file property should has format: FILE:file_id:md5
104
0
            std::vector<std::string> parts =
105
0
                    absl::StrSplit(item.second, ":", absl::SkipWhitespace());
106
0
            if (parts.size() != 3) {
107
0
                return Status::InternalError("PAUSE: Invalid file property of kafka: " +
108
0
                                             item.second);
109
0
            }
110
0
            int64_t file_id = std::stol(parts[1]);
111
0
            std::string file_path;
112
0
            Status st = ctx->exec_env()->small_file_mgr()->get_file(file_id, parts[2], &file_path);
113
0
            if (!st.ok()) {
114
0
                return Status::InternalError("PAUSE: failed to get file for config: {}, error: {}",
115
0
                                             item.first, st.to_string());
116
0
            }
117
0
            RETURN_IF_ERROR(set_conf(item.first, file_path));
118
0
        } else {
119
0
            RETURN_IF_ERROR(set_conf(item.first, item.second));
120
0
        }
121
0
        _custom_properties.emplace(item.first, item.second);
122
0
    }
123
124
    // if not specified group id, generate a random one.
125
    // ATTN: In the new version, we have set a group.id on the FE side for jobs that have not set a groupid,
126
    // but in order to ensure compatibility, we still do a check here.
127
2
    if (_custom_properties.find(PROP_GROUP_ID) == _custom_properties.end()) {
128
2
        std::stringstream ss;
129
2
        ss << BackendOptions::get_localhost() << "_";
130
2
        std::string group_id = ss.str() + UniqueId::gen_uid().to_string();
131
2
        RETURN_IF_ERROR(set_conf(PROP_GROUP_ID, group_id));
132
2
        _custom_properties.emplace(PROP_GROUP_ID, group_id);
133
2
    }
134
2
    LOG(INFO) << "init kafka consumer with group id: " << _custom_properties[PROP_GROUP_ID];
135
136
2
    if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
137
0
        std::stringstream ss;
138
0
        ss << "PAUSE: failed to set 'event_cb'";
139
0
        LOG(WARNING) << ss.str();
140
0
        return Status::InternalError(ss.str());
141
0
    }
142
143
    // create consumer
144
2
    _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
145
2
    if (!_k_consumer) {
146
0
        LOG(WARNING) << "PAUSE: failed to create kafka consumer: " << errstr;
147
0
        return Status::InternalError("PAUSE: failed to create kafka consumer: " + errstr);
148
0
    }
149
150
2
    VLOG_NOTICE << "finished to init kafka consumer. " << ctx->brief();
151
152
2
    _init = true;
153
2
    return Status::OK();
154
2
}
155
156
// Assign this consumer to `topic`. Each partition in `begin_partition_offset`
// (partition id -> offset) starts consuming at the given offset, and each
// partition id is added to _consuming_partition_ids. If the assignment fails,
// the consumer is unassigned and an error is returned.
Status KafkaDataConsumer::assign_topic_partitions(
        const std::map<int32_t, int64_t>& begin_partition_offset, const std::string& topic,
        std::shared_ptr<StreamLoadContext> ctx) {
    DCHECK(_k_consumer);

    // Build one TopicPartition per requested partition, plus a readable summary.
    std::stringstream partitions_desc;
    std::vector<RdKafka::TopicPartition*> topic_partitions;
    for (const auto& [partition_id, start_offset] : begin_partition_offset) {
        topic_partitions.push_back(
                RdKafka::TopicPartition::create(topic, partition_id, start_offset));
        _consuming_partition_ids.insert(partition_id);
        partitions_desc << "[" << partition_id << ": " << start_offset << "] ";
    }

    LOG(INFO) << "consumer: " << _id << ", grp: " << _grp_id
              << " assign topic partitions: " << topic << ", " << partitions_desc.str();

    // The TopicPartition objects are owned here and freed on every return path.
    Defer delete_tp {[&topic_partitions]() {
        for (RdKafka::TopicPartition* partition : topic_partitions) {
            delete partition;
        }
    }};

    const RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions);
    if (err == RdKafka::ERR_NO_ERROR) {
        return Status::OK();
    }

    LOG(WARNING) << "failed to assign topic partitions: " << ctx->brief(true)
                 << ", err: " << RdKafka::err2str(err);
    _k_consumer->unassign();
    return Status::InternalError("failed to assign topic partitions");
}
191
192
// Consume messages one at a time from the assigned partitions and hand them to
// `queue`. The loop stops when one of these happens:
//   - the consumer is cancelled (cancel() sets _cancelled),
//   - max_running_time_ms has elapsed,
//   - the queue refuses a message (controlled_blocking_put() returns false),
//   - every partition in _consuming_partition_ids has reported EOF, or
//   - a consume error that is not retried occurs. This is the only case that
//     returns a non-OK Status.
// A message that was put into the queue is released here, so its ownership
// passes to the queue's reader. Every other message is deleted by its unique_ptr.
Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
                                        int64_t max_running_time_ms) {
    static constexpr int MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE = 3;
    int64_t left_time = max_running_time_ms;
    LOG(INFO) << "start kafka consumer: " << _id << ", grp: " << _grp_id
              << ", max running time(ms): " << left_time;

    int64_t received_rows = 0;
    int64_t put_rows = 0;
    int32_t retry_times = 0;
    Status st = Status::OK();
    // Reused across iterations; its elapsed time is logged at the end as the
    // overall consume cost.
    MonotonicStopWatch consumer_watch;
    MonotonicStopWatch watch;
    watch.start();
    while (true) {
        {
            std::unique_lock<std::mutex> l(_lock);
            if (_cancelled) {
                break;
            }
        }

        if (left_time <= 0) {
            break;
        }

        bool done = false;
        // consume 1 message at a time.
        // msg_watch times this single consume() call only. consumer_watch is
        // reused across iterations for the overall consume cost, so its elapsed
        // time is not the latency of one call.
        MonotonicStopWatch msg_watch;
        msg_watch.start();
        consumer_watch.start();
        std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */));
        consumer_watch.stop();
        msg_watch.stop();
        DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
        DorisMetrics::instance()->routine_load_get_msg_latency->increment(
                msg_watch.elapsed_time() / 1000 / 1000);
        DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
            done = true;
            std::stringstream ss;
            ss << "Offset out of range"
               << ", consume partition " << msg->partition() << ", consume offset "
               << msg->offset();
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
            st = Status::InternalError<false>(ss.str());
            break;
        });
        switch (msg->err()) {
        case RdKafka::ERR_NO_ERROR:
            if (_consuming_partition_ids.count(msg->partition()) == 0) {
                _consuming_partition_ids.insert(msg->partition());
            }
            DorisMetrics::instance()->routine_load_consume_bytes->increment(msg->len());
            if (msg->len() == 0) {
                // ignore msg with length 0.
                // put empty msg into queue will cause the load process shutting down.
                break;
            } else if (!queue->controlled_blocking_put(msg.get(),
                                                       config::blocking_queue_cv_wait_timeout_ms)) {
                // queue is shutdown
                done = true;
            } else {
                ++put_rows;
                msg.release(); // release the ownership, msg will be deleted after being processed
            }
            ++received_rows;
            DorisMetrics::instance()->routine_load_consume_rows->increment(1);
            break;
        case RdKafka::ERR__TIMED_OUT:
            // leave the status as OK, because this may happen
            // if there is no data in kafka.
            LOG(INFO) << "kafka consume timeout: " << _id;
            break;
        case RdKafka::ERR__TRANSPORT:
            LOG(INFO) << "kafka consume Disconnected: " << _id
                      << ", retry times: " << retry_times++;
            if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) {
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                break;
            }
            // Retries exhausted: handle the message like a partition EOF.
            [[fallthrough]];
        case RdKafka::ERR__PARTITION_EOF: {
            VLOG_NOTICE << "consumer meet partition eof: " << _id
                        << " partition offset: " << msg->offset();
            _consuming_partition_ids.erase(msg->partition());
            if (!queue->controlled_blocking_put(msg.get(),
                                                config::blocking_queue_cv_wait_timeout_ms)) {
                done = true;
            } else if (_consuming_partition_ids.empty()) {
                LOG(INFO) << "all partitions meet eof: " << _id;
                msg.release();
                done = true;
            } else {
                msg.release();
            }
            break;
        }
        case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
            done = true;
            std::stringstream ss;
            ss << msg->errstr() << ", consume partition " << msg->partition() << ", consume offset "
               << msg->offset();
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
            st = Status::InternalError<false>(ss.str());
            break;
        }
        default:
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
            done = true;
            st = Status::InternalError<false>(msg->errstr());
            break;
        }

        left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000;
        if (done) {
            break;
        }
    }

    // Read the flag under the same lock cancel() uses to write it.
    bool cancelled = false;
    {
        std::unique_lock<std::mutex> l(_lock);
        cancelled = _cancelled;
    }
    LOG(INFO) << "kafka consumer done: " << _id << ", grp: " << _grp_id
              << ". cancelled: " << cancelled << ", left time(ms): " << left_time
              << ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000
              << ", consume cost(ms): " << consumer_watch.elapsed_time() / 1000 / 1000
              << ", received rows: " << received_rows << ", put rows: " << put_rows;

    return st;
}
316
317
0
// Append the partition ids of `_topic` to `partition_ids`, using the broker
// metadata for that topic (5000 ms request timeout).
// Fails when the topic handle cannot be created, when the metadata request
// fails, when the topic's metadata entry carries an error, or when no partition
// id was collected.
Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids) {
    // Topic handle used to scope the metadata request to `_topic`.
    std::unique_ptr<RdKafka::Conf> topic_conf(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));

    std::string errstr;
    std::unique_ptr<RdKafka::Topic> topic_handle(
            RdKafka::Topic::create(_k_consumer, _topic, topic_conf.get(), errstr));
    if (!topic_handle) {
        std::stringstream ss;
        ss << "failed to create topic: " << errstr;
        LOG(WARNING) << ss.str();
        return Status::InternalError(ss.str());
    }

    RdKafka::Metadata* raw_metadata = nullptr;
    const RdKafka::ErrorCode err = _k_consumer->metadata(
            false /* for this topic */, topic_handle.get(), &raw_metadata, 5000);
    if (err != RdKafka::ERR_NO_ERROR) {
        std::stringstream ss;
        ss << "failed to get partition meta: " << RdKafka::err2str(err);
        LOG(WARNING) << ss.str();
        return Status::InternalError(ss.str());
    }
    std::unique_ptr<RdKafka::Metadata> metadata(raw_metadata);

    for (const RdKafka::TopicMetadata* topic_meta : *metadata->topics()) {
        if (topic_meta->topic() != _topic) {
            continue;
        }

        const RdKafka::ErrorCode topic_err = topic_meta->err();
        if (topic_err != RdKafka::ERR_NO_ERROR) {
            std::stringstream ss;
            ss << "error: " << RdKafka::err2str(topic_err);
            if (topic_err == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
                ss << ", try again";
            }
            LOG(WARNING) << ss.str();
            return Status::InternalError(ss.str());
        }

        for (const RdKafka::PartitionMetadata* partition_meta : *topic_meta->partitions()) {
            partition_ids->push_back(partition_meta->id());
        }
    }

    return partition_ids->empty() ? Status::InternalError("no partition in this topic")
                                  : Status::OK();
}
376
377
// get offsets of each partition for times.
378
// The input parameter "times" holds <partition, timestamps>
379
// The output parameter "offsets" returns <partition, offsets>
380
//
381
// The returned offset for each partition is the earliest offset whose
382
// timestamp is greater than or equal to the given timestamp in the
383
// corresponding partition.
384
// See librdkafka/rdkafkacpp.h##offsetsForTimes()
385
Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& times,
386
0
                                                std::vector<PIntegerPair>* offsets, int timeout) {
387
    // create topic partition
388
0
    std::vector<RdKafka::TopicPartition*> topic_partitions;
389
0
    for (const auto& entry : times) {
390
0
        RdKafka::TopicPartition* tp1 =
391
0
                RdKafka::TopicPartition::create(_topic, entry.key(), entry.val());
392
0
        topic_partitions.push_back(tp1);
393
0
    }
394
    // delete TopicPartition finally
395
0
    Defer delete_tp {[&topic_partitions]() {
396
0
        std::for_each(topic_partitions.begin(), topic_partitions.end(),
397
0
                      [](RdKafka::TopicPartition* tp1) { delete tp1; });
398
0
    }};
399
400
    // get offsets for times
401
0
    RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, timeout);
402
0
    if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
403
0
        std::stringstream ss;
404
0
        ss << "failed to get offsets for times: " << RdKafka::err2str(err);
405
0
        LOG(WARNING) << ss.str();
406
0
        return Status::InternalError(ss.str());
407
0
    }
408
409
0
    for (const auto& topic_partition : topic_partitions) {
410
0
        PIntegerPair pair;
411
0
        pair.set_key(topic_partition->partition());
412
0
        pair.set_val(topic_partition->offset());
413
0
        offsets->push_back(std::move(pair));
414
0
    }
415
416
0
    return Status::OK();
417
0
}
418
419
// get latest offsets for given partitions
420
Status KafkaDataConsumer::get_latest_offsets_for_partitions(
421
        const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets,
422
0
        int timeout) {
423
0
    DBUG_EXECUTE_IF("KafkaDataConsumer.get_latest_offsets_for_partitions.timeout", {
424
        // sleep 60s
425
0
        std::this_thread::sleep_for(std::chrono::seconds(60));
426
0
    });
427
0
    MonotonicStopWatch watch;
428
0
    watch.start();
429
0
    for (int32_t partition_id : partition_ids) {
430
0
        int64_t low = 0;
431
0
        int64_t high = 0;
432
0
        auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000);
433
0
        if (UNLIKELY(timeout_ms <= 0)) {
434
0
            return Status::InternalError("get kafka latest offsets for partitions timeout");
435
0
        }
436
437
0
        RdKafka::ErrorCode err =
438
0
                _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, timeout_ms);
439
0
        if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
440
0
            std::stringstream ss;
441
0
            ss << "failed to get latest offset for partition: " << partition_id
442
0
               << ", err: " << RdKafka::err2str(err);
443
0
            LOG(WARNING) << ss.str();
444
0
            return Status::InternalError(ss.str());
445
0
        }
446
447
0
        PIntegerPair pair;
448
0
        pair.set_key(partition_id);
449
0
        pair.set_val(high);
450
0
        offsets->push_back(std::move(pair));
451
0
    }
452
453
0
    return Status::OK();
454
0
}
455
456
// Resolve offset flags into concrete offsets. Each input pair is
// <partition id, offset or flag>, and one pair per input is appended to `offsets`:
//   val >= 0                     -> already a concrete offset, passed through;
//   val == -1 (OFFSET_END_VAL)   -> the partition's high watermark;
//   val == -2 (OFFSET_BEGINNING) -> the partition's low watermark.
// Any other negative value is rejected instead of producing a pair whose value
// was never set. `timeout` (ms) is one budget shared by all watermark queries.
Status KafkaDataConsumer::get_real_offsets_for_partitions(
        const std::vector<PIntegerPair>& offset_flags, std::vector<PIntegerPair>* offsets,
        int timeout) {
    static constexpr int64_t OFFSET_END_VAL = -1;
    static constexpr int64_t OFFSET_BEGINNING_VAL = -2;

    MonotonicStopWatch watch;
    watch.start();
    for (const auto& entry : offset_flags) {
        PIntegerPair pair;
        if (UNLIKELY(entry.val() >= 0)) {
            pair.set_key(entry.key());
            pair.set_val(entry.val());
            offsets->push_back(std::move(pair));
            continue;
        }

        if (UNLIKELY(entry.val() != OFFSET_END_VAL && entry.val() != OFFSET_BEGINNING_VAL)) {
            return Status::InternalError("unsupported offset flag {} for partition {}",
                                         entry.val(), entry.key());
        }

        int64_t low = 0;
        int64_t high = 0;
        auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000);
        if (UNLIKELY(timeout_ms <= 0)) {
            return Status::InternalError("get kafka real offsets for partitions timeout");
        }

        RdKafka::ErrorCode err =
                _k_consumer->query_watermark_offsets(_topic, entry.key(), &low, &high, timeout_ms);
        if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
            std::stringstream ss;
            ss << "failed to get latest offset for partition: " << entry.key()
               << ", err: " << RdKafka::err2str(err);
            LOG(WARNING) << ss.str();
            return Status::InternalError(ss.str());
        }

        pair.set_key(entry.key());
        pair.set_val(entry.val() == OFFSET_END_VAL ? high : low);
        offsets->push_back(std::move(pair));
    }

    return Status::OK();
}
500
501
1
// Set the cancel flag under _lock. A running group_consume() checks this flag
// at the top of every iteration and stops. Fails if the consumer was never
// initialized.
Status KafkaDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) {
    std::unique_lock<std::mutex> l(_lock);
    if (_init) {
        _cancelled = true;
        LOG(INFO) << "kafka consumer cancelled. " << _id;
        return Status::OK();
    }
    return Status::InternalError("consumer is not initialized");
}
511
512
2
// Reset per-task state so the consumer can be reused. Under _lock, this clears
// the cancel flag, drops the partition assignment together with the matching
// _consuming_partition_ids bookkeeping, and refreshes _last_visit_time.
Status KafkaDataConsumer::reset() {
    std::unique_lock<std::mutex> l(_lock);
    _cancelled = false;
    _k_consumer->unassign();
    // The assignment has just been dropped, so no partition is consumed any more.
    // Ids left over from partitions that had not reached EOF would otherwise keep
    // group_consume() from ever seeing "all partitions meet eof" for the next
    // task that reuses this consumer.
    _consuming_partition_ids.clear();
    // reset will be called before this consumer being returned to the pool.
    // so update _last_visit_time is reasonable.
    _last_visit_time = time(nullptr);
    return Status::OK();
}
521
522
1
// Commit the given partition offsets to Kafka.
// An async commit is used so the call never blocks for long. A failed commit
// does not affect Doris: later tasks will commit newer offsets anyway.
Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset) {
    const RdKafka::ErrorCode commit_err = _k_consumer->commitAsync(offset);
    if (commit_err == RdKafka::ERR_NO_ERROR) {
        return Status::OK();
    }
    return Status::InternalError("failed to commit kafka offset : {}",
                                 RdKafka::err2str(commit_err));
}
531
532
// if the kafka brokers and topic are same,
533
// we considered this consumer as matched, thus can be reused.
534
1
bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
535
1
    if (ctx->load_src_type != TLoadSourceType::KAFKA) {
536
0
        return false;
537
0
    }
538
1
    if (_brokers != ctx->kafka_info->brokers || _topic != ctx->kafka_info->topic) {
539
0
        return false;
540
0
    }
541
    // check properties
542
1
    if (_custom_properties.size() != ctx->kafka_info->properties.size()) {
543
1
        return false;
544
1
    }
545
0
    for (auto& item : ctx->kafka_info->properties) {
546
0
        std::unordered_map<std::string, std::string>::const_iterator itr =
547
0
                _custom_properties.find(item.first);
548
0
        if (itr == _custom_properties.end()) {
549
0
            return false;
550
0
        }
551
552
0
        if (itr->second != item.second) {
553
0
            return false;
554
0
        }
555
0
    }
556
0
    return true;
557
0
}
558
559
} // end namespace doris