Coverage Report

Created: 2026-05-14 18:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/data_consumer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/routine_load/data_consumer.h"
19
20
#include <absl/strings/str_split.h>
21
#include <gen_cpp/Types_types.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <librdkafka/rdkafkacpp.h>
24
25
// AWS Kinesis SDK includes
26
#include <aws/core/client/ClientConfiguration.h>
27
#include <aws/core/utils/Outcome.h>
28
#include <aws/kinesis/KinesisClient.h>
29
#include <aws/kinesis/model/GetRecordsRequest.h>
30
#include <aws/kinesis/model/GetRecordsResult.h>
31
#include <aws/kinesis/model/GetShardIteratorRequest.h>
32
#include <aws/kinesis/model/GetShardIteratorResult.h>
33
#include <aws/kinesis/model/ListShardsRequest.h>
34
#include <aws/kinesis/model/ListShardsResult.h>
35
#include <aws/kinesis/model/Record.h>
36
#include <aws/kinesis/model/ShardIteratorType.h>
37
38
#include <algorithm>
39
// IWYU pragma: no_include <bits/chrono.h>
40
#include <chrono> // IWYU pragma: keep
41
#include <string>
42
#include <thread>
43
#include <utility>
44
#include <vector>
45
46
#include "common/config.h"
47
#include "common/metrics/doris_metrics.h"
48
#include "common/status.h"
49
#include "load/routine_load/consumer_helpers.h"
50
#include "load/routine_load/kinesis_conf.h"
51
#include "runtime/aws_msk_iam_auth.h"
52
#include "runtime/exec_env.h"
53
#include "runtime/small_file_mgr.h"
54
#include "service/backend_options.h"
55
#include "util/blocking_queue.hpp"
56
#include "util/debug_points.h"
57
#include "util/defer_op.h"
58
#include "util/s3_util.h"
59
#include "util/stopwatch.hpp"
60
#include "util/string_util.h"
61
#include "util/uid_util.h"
62
63
namespace doris {
64
65
static const std::string PROP_GROUP_ID = "group.id";
66
// init kafka consumer will only set common configs such as
67
// brokers, groupid
68
77
Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
69
77
    std::unique_lock<std::mutex> l(_lock);
70
77
    if (_init) {
71
        // this consumer has already been initialized.
72
0
        return Status::OK();
73
0
    }
74
75
77
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
76
77
    // conf has to be deleted finally
78
77
    Defer delete_conf {[conf]() { delete conf; }};
79
80
77
    std::string errstr;
81
1.00k
    auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
82
1.00k
        RdKafka::Conf::ConfResult res = conf->set(conf_key, conf_val, errstr);
83
1.00k
        if (res == RdKafka::Conf::CONF_UNKNOWN) {
84
            // ignore unknown config
85
0
            return Status::OK();
86
1.00k
        } else if (errstr.find("not supported") != std::string::npos) {
87
            // some java-only properties may be passed here, and librdkafka will return 'xxx' not supported
88
            // ignore it
89
0
            return Status::OK();
90
1.00k
        } else if (res != RdKafka::Conf::CONF_OK) {
91
0
            std::stringstream ss;
92
0
            ss << "PAUSE: failed to set '" << conf_key << "', value: '" << conf_val
93
0
               << "', err: " << errstr;
94
0
            LOG(WARNING) << ss.str();
95
0
            return Status::InternalError(ss.str());
96
0
        }
97
1.00k
        VLOG_NOTICE << "set " << conf_key << ": " << conf_val;
98
1.00k
        return Status::OK();
99
1.00k
    };
100
101
77
    RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers));
102
77
    RETURN_IF_ERROR(set_conf("enable.partition.eof", "true"));
103
77
    RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
104
    // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
105
77
    RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
106
77
    RETURN_IF_ERROR(set_conf("auto.offset.reset", "error"));
107
77
    RETURN_IF_ERROR(set_conf("socket.keepalive.enable", "true"));
108
77
    RETURN_IF_ERROR(set_conf("reconnect.backoff.ms", "100"));
109
77
    RETURN_IF_ERROR(set_conf("reconnect.backoff.max.ms", "10000"));
110
77
    RETURN_IF_ERROR(set_conf("api.version.request", config::kafka_api_version_request));
111
77
    RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0"));
112
77
    RETURN_IF_ERROR(set_conf("broker.version.fallback", config::kafka_broker_version_fallback));
113
77
    RETURN_IF_ERROR(set_conf("broker.address.ttl", "0"));
114
77
    if (config::kafka_debug != "disable") {
115
0
        RETURN_IF_ERROR(set_conf("debug", config::kafka_debug));
116
0
    }
117
118
77
    for (auto& item : ctx->kafka_info->properties) {
119
73
        _custom_properties.emplace(item.first, item.second);
120
121
        // AWS properties (aws.*) are Doris-specific for MSK IAM authentication
122
        // and should not be passed to librdkafka
123
73
        if (starts_with(item.first, "aws.")) {
124
0
            LOG(INFO) << "Skipping AWS property for librdkafka: " << item.first;
125
0
            continue;
126
0
        }
127
128
73
        if (starts_with(item.second, "FILE:")) {
129
            // file property should have the format: FILE:file_id:md5
130
0
            std::vector<std::string> parts =
131
0
                    absl::StrSplit(item.second, ":", absl::SkipWhitespace());
132
0
            if (parts.size() != 3) {
133
0
                return Status::InternalError("PAUSE: Invalid file property of kafka: " +
134
0
                                             item.second);
135
0
            }
136
0
            int64_t file_id = std::stol(parts[1]);
137
0
            std::string file_path;
138
0
            Status st = ctx->exec_env()->small_file_mgr()->get_file(file_id, parts[2], &file_path);
139
0
            if (!st.ok()) {
140
0
                return Status::InternalError("PAUSE: failed to get file for config: {}, error: {}",
141
0
                                             item.first, st.to_string());
142
0
            }
143
0
            RETURN_IF_ERROR(set_conf(item.first, file_path));
144
73
        } else {
145
73
            RETURN_IF_ERROR(set_conf(item.first, item.second));
146
73
        }
147
73
    }
148
149
    // if not specified group id, generate a random one.
150
    // ATTN: In the new version, we have set a group.id on the FE side for jobs that have not set a groupid,
151
    // but in order to ensure compatibility, we still do a check here.
152
77
    if (!_custom_properties.contains(PROP_GROUP_ID)) {
153
5
        std::stringstream ss;
154
5
        ss << BackendOptions::get_localhost() << "_";
155
5
        std::string group_id = ss.str() + UniqueId::gen_uid().to_string();
156
5
        RETURN_IF_ERROR(set_conf(PROP_GROUP_ID, group_id));
157
5
        _custom_properties.emplace(PROP_GROUP_ID, group_id);
158
5
    }
159
77
    LOG(INFO) << "init kafka consumer with group id: " << _custom_properties[PROP_GROUP_ID];
160
161
77
    if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
162
0
        std::stringstream ss;
163
0
        ss << "PAUSE: failed to set 'event_cb'";
164
0
        LOG(WARNING) << ss.str();
165
0
        return Status::InternalError(ss.str());
166
0
    }
167
168
    // Set up AWS MSK IAM authentication if configured
169
77
    _aws_msk_oauth_callback = AwsMskIamOAuthCallback::create_from_properties(
170
77
            _custom_properties, ctx->kafka_info->brokers);
171
77
    if (_aws_msk_oauth_callback) {
172
        // Enable SASL queue to support background callbacks
173
0
        if (conf->enable_sasl_queue(true, errstr) != RdKafka::Conf::CONF_OK) {
174
0
            LOG(WARNING) << "PAUSE: failed to enable SASL queue: " << errstr;
175
0
            return Status::InternalError("PAUSE: failed to enable SASL queue: " + errstr);
176
0
        }
177
178
0
        if (conf->set("oauthbearer_token_refresh_cb", _aws_msk_oauth_callback.get(), errstr) !=
179
0
            RdKafka::Conf::CONF_OK) {
180
0
            LOG(WARNING) << "PAUSE: failed to set OAuth callback: " << errstr;
181
0
            return Status::InternalError("PAUSE: failed to set OAuth callback: " + errstr);
182
0
        }
183
0
        LOG(INFO) << "AWS MSK IAM authentication enabled successfully";
184
0
    }
185
186
    // create consumer
187
77
    _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
188
77
    if (!_k_consumer) {
189
0
        LOG(WARNING) << "PAUSE: failed to create kafka consumer: " << errstr;
190
0
        return Status::InternalError("PAUSE: failed to create kafka consumer: " + errstr);
191
0
    }
192
193
    // If AWS MSK IAM auth is enabled, inject initial token and enable background refresh
194
77
    if (_aws_msk_oauth_callback) {
195
0
        RETURN_IF_ERROR(_aws_msk_oauth_callback->refresh_now(_k_consumer));
196
197
0
        std::unique_ptr<RdKafka::Error> bg_err(_k_consumer->sasl_background_callbacks_enable());
198
0
        if (bg_err) {
199
0
            return Status::InternalError("Failed to enable SASL background callbacks: " +
200
0
                                         bg_err->str());
201
0
        }
202
0
        LOG(INFO) << "AWS MSK IAM: initial token set, background refresh enabled";
203
0
    }
204
205
77
    VLOG_NOTICE << "finished to init kafka consumer. " << ctx->brief();
206
207
77
    _init = true;
208
77
    return Status::OK();
209
77
}
210
211
Status KafkaDataConsumer::assign_topic_partitions(
212
        const std::map<int32_t, int64_t>& begin_partition_offset, const std::string& topic,
213
71
        std::shared_ptr<StreamLoadContext> ctx) {
214
71
    DCHECK(_k_consumer);
215
    // create TopicPartitions
216
71
    std::stringstream ss;
217
71
    std::vector<RdKafka::TopicPartition*> topic_partitions;
218
71
    for (auto& entry : begin_partition_offset) {
219
71
        RdKafka::TopicPartition* tp1 =
220
71
                RdKafka::TopicPartition::create(topic, entry.first, entry.second);
221
71
        topic_partitions.push_back(tp1);
222
71
        _consuming_partition_ids.insert(entry.first);
223
71
        ss << "[" << entry.first << ": " << entry.second << "] ";
224
71
    }
225
226
71
    LOG(INFO) << "consumer: " << _id << ", grp: " << _grp_id
227
71
              << " assign topic partitions: " << topic << ", " << ss.str();
228
229
    // delete TopicPartition finally
230
71
    Defer delete_tp {[&topic_partitions]() {
231
71
        std::for_each(topic_partitions.begin(), topic_partitions.end(),
232
71
                      [](RdKafka::TopicPartition* tp1) { delete tp1; });
233
71
    }};
234
235
    // assign partition
236
71
    RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions);
237
71
    if (err) {
238
0
        LOG(WARNING) << "failed to assign topic partitions: " << ctx->brief(true)
239
0
                     << ", err: " << RdKafka::err2str(err);
240
0
        _k_consumer->unassign();
241
0
        return Status::InternalError("failed to assign topic partitions");
242
0
    }
243
244
71
    return Status::OK();
245
71
}
246
247
Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
248
71
                                        int64_t max_running_time_ms) {
249
71
    int64_t left_time = max_running_time_ms;
250
71
    LOG(INFO) << "start kafka consumer: " << _id << ", grp: " << _grp_id
251
71
              << ", max running time(ms): " << left_time;
252
253
71
    int64_t received_rows = 0;
254
71
    int64_t put_rows = 0;
255
71
    RetryPolicy retry_policy(3, 200);
256
71
    Status st = Status::OK();
257
71
    MonotonicStopWatch consumer_watch;
258
71
    MonotonicStopWatch watch;
259
71
    watch.start();
260
261
741
    while (true) {
262
741
        {
263
741
            std::unique_lock<std::mutex> l(_lock);
264
741
            if (_cancelled) {
265
0
                break;
266
0
            }
267
741
        }
268
269
741
        if (left_time <= 0) {
270
1
            break;
271
1
        }
272
273
740
        bool done = false;
274
        // consume 1 message at a time
275
740
        consumer_watch.start();
276
740
        std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */));
277
740
        consumer_watch.stop();
278
279
740
        DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
280
740
        DorisMetrics::instance()->routine_load_get_msg_latency->increment(
281
740
                consumer_watch.elapsed_time() / 1000 / 1000);
282
283
740
        DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
284
740
            done = true;
285
740
            std::stringstream ss;
286
740
            ss << "Offset out of range"
287
740
               << ", consume partition " << msg->partition() << ", consume offset "
288
740
               << msg->offset();
289
740
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
290
740
            st = Status::InternalError<false>(ss.str());
291
740
            break;
292
740
        });
293
294
740
        switch (msg->err()) {
295
665
        case RdKafka::ERR_NO_ERROR:
296
665
            retry_policy.reset();
297
665
            if (_consuming_partition_ids.count(msg->partition()) <= 0) {
298
0
                _consuming_partition_ids.insert(msg->partition());
299
0
            }
300
665
            DorisMetrics::instance()->routine_load_consume_bytes->increment(msg->len());
301
665
            if (msg->len() == 0) {
302
                // ignore msg with length 0.
303
                // putting an empty msg into the queue would cause the load process to shut down.
304
0
                break;
305
665
            } else if (!queue->controlled_blocking_put(msg.get(),
306
665
                                                       config::blocking_queue_cv_wait_timeout_ms)) {
307
                // queue is shutdown
308
0
                done = true;
309
665
            } else {
310
665
                ++put_rows;
311
665
                msg.release(); // release the ownership, msg will be deleted after being processed
312
665
            }
313
665
            ++received_rows;
314
665
            DorisMetrics::instance()->routine_load_consume_rows->increment(1);
315
665
            break;
316
5
        case RdKafka::ERR__TIMED_OUT:
317
            // leave the status as OK, because this may happen
318
            // if there is no data in kafka.
319
5
            LOG(INFO) << "kafka consume timeout: " << _id;
320
5
            break;
321
0
        case RdKafka::ERR__TRANSPORT:
322
0
            LOG(INFO) << "kafka consume Disconnected: " << _id
323
0
                      << ", retry times: " << retry_policy.retry_count();
324
0
            if (retry_policy.should_retry()) {
325
0
                retry_policy.retry_with_backoff();
326
0
                break;
327
0
            }
328
0
            [[fallthrough]];
329
70
        case RdKafka::ERR__PARTITION_EOF: {
330
70
            VLOG_NOTICE << "consumer meet partition eof: " << _id
331
0
                        << " partition offset: " << msg->offset();
332
70
            _consuming_partition_ids.erase(msg->partition());
333
70
            if (!queue->controlled_blocking_put(msg.get(),
334
70
                                                config::blocking_queue_cv_wait_timeout_ms)) {
335
0
                done = true;
336
70
            } else if (_consuming_partition_ids.size() <= 0) {
337
70
                LOG(INFO) << "all partitions meet eof: " << _id;
338
70
                msg.release();
339
70
                done = true;
340
70
            } else {
341
0
                msg.release();
342
0
            }
343
70
            break;
344
0
        }
345
0
        case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
346
0
            done = true;
347
0
            std::stringstream ss;
348
0
            ss << msg->errstr() << ", consume partition " << msg->partition() << ", consume offset "
349
0
               << msg->offset();
350
0
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
351
0
            st = Status::InternalError<false>(ss.str());
352
0
            break;
353
0
        }
354
0
        default:
355
0
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
356
0
            done = true;
357
0
            st = Status::InternalError<false>(msg->errstr());
358
0
            break;
359
740
        }
360
361
740
        left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000;
362
740
        if (done) {
363
70
            break;
364
70
        }
365
740
    }
366
367
71
    LOG(INFO) << "kafka consumer done: " << _id << ", grp: " << _grp_id
368
71
              << ". cancelled: " << _cancelled << ", left time(ms): " << left_time
369
71
              << ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000
370
71
              << ", consume cost(ms): " << consumer_watch.elapsed_time() / 1000 / 1000
371
71
              << ", received rows: " << received_rows << ", put rows: " << put_rows;
372
373
71
    return st;
374
71
}
375
376
75
Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids) {
377
    // create topic conf
378
75
    RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
379
75
    Defer delete_conf {[tconf]() { delete tconf; }};
380
381
    // create topic
382
75
    std::string errstr;
383
75
    RdKafka::Topic* topic = RdKafka::Topic::create(_k_consumer, _topic, tconf, errstr);
384
75
    if (topic == nullptr) {
385
0
        std::stringstream ss;
386
0
        ss << "failed to create topic: " << errstr;
387
0
        LOG(WARNING) << ss.str();
388
0
        return Status::InternalError(ss.str());
389
0
    }
390
391
75
    Defer delete_topic {[topic]() { delete topic; }};
392
393
    // get topic metadata
394
75
    RdKafka::Metadata* metadata = nullptr;
395
75
    RdKafka::ErrorCode err =
396
75
            _k_consumer->metadata(false /* for this topic */, topic, &metadata, 5000);
397
75
    if (err != RdKafka::ERR_NO_ERROR) {
398
0
        std::stringstream ss;
399
0
        ss << "failed to get partition meta: " << RdKafka::err2str(err);
400
0
        LOG(WARNING) << ss.str();
401
0
        return Status::InternalError(ss.str());
402
0
    }
403
404
75
    Defer delete_meta {[metadata]() { delete metadata; }};
405
406
    // get partition ids
407
75
    RdKafka::Metadata::TopicMetadataIterator it;
408
147
    for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {
409
75
        if ((*it)->topic() != _topic) {
410
0
            continue;
411
0
        }
412
413
75
        if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
414
3
            std::stringstream ss;
415
3
            ss << "error: " << err2str((*it)->err());
416
3
            if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
417
0
                ss << ", try again";
418
0
            }
419
3
            LOG(WARNING) << ss.str();
420
3
            return Status::InternalError(ss.str());
421
3
        }
422
423
72
        RdKafka::TopicMetadata::PartitionMetadataIterator ip;
424
144
        for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {
425
72
            partition_ids->push_back((*ip)->id());
426
72
        }
427
72
    }
428
429
72
    if (partition_ids->empty()) {
430
0
        return Status::InternalError("no partition in this topic");
431
0
    }
432
433
72
    return Status::OK();
434
72
}
435
436
// get offsets of each partition for times.
437
// The input parameter "times" holds <partition, timestamps>
438
// The output parameter "offsets" returns <partition, offsets>
439
//
440
// The returned offset for each partition is the earliest offset whose
441
// timestamp is greater than or equal to the given timestamp in the
442
// corresponding partition.
443
// See librdkafka/rdkafkacpp.h##offsetsForTimes()
444
Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& times,
445
1
                                                std::vector<PIntegerPair>* offsets, int timeout) {
446
    // create topic partition
447
1
    std::vector<RdKafka::TopicPartition*> topic_partitions;
448
1
    for (const auto& entry : times) {
449
1
        RdKafka::TopicPartition* tp1 =
450
1
                RdKafka::TopicPartition::create(_topic, entry.key(), entry.val());
451
1
        topic_partitions.push_back(tp1);
452
1
    }
453
    // delete TopicPartition finally
454
1
    Defer delete_tp {[&topic_partitions]() {
455
1
        std::for_each(topic_partitions.begin(), topic_partitions.end(),
456
1
                      [](RdKafka::TopicPartition* tp1) { delete tp1; });
457
1
    }};
458
459
    // get offsets for times
460
1
    RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, timeout);
461
1
    if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
462
0
        std::stringstream ss;
463
0
        ss << "failed to get offsets for times: " << RdKafka::err2str(err);
464
0
        LOG(WARNING) << ss.str();
465
0
        return Status::InternalError(ss.str());
466
0
    }
467
468
1
    for (const auto& topic_partition : topic_partitions) {
469
1
        PIntegerPair pair;
470
1
        pair.set_key(topic_partition->partition());
471
1
        pair.set_val(topic_partition->offset());
472
1
        offsets->push_back(std::move(pair));
473
1
    }
474
475
1
    return Status::OK();
476
1
}
477
478
// get latest offsets for given partitions
479
Status KafkaDataConsumer::get_latest_offsets_for_partitions(
480
        const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets,
481
283
        int timeout) {
482
283
    DBUG_EXECUTE_IF("KafkaDataConsumer.get_offsets_for_partitions.timeout", {
483
        // sleep 61s
484
283
        std::this_thread::sleep_for(std::chrono::seconds(61));
485
283
    });
486
283
    MonotonicStopWatch watch;
487
283
    watch.start();
488
283
    for (int32_t partition_id : partition_ids) {
489
283
        int64_t low = 0;
490
283
        int64_t high = 0;
491
283
        auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000);
492
283
        if (UNLIKELY(timeout_ms <= 0)) {
493
0
            return Status::InternalError("get kafka latest offsets for partitions timeout");
494
0
        }
495
496
283
        RdKafka::ErrorCode err =
497
283
                _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, timeout_ms);
498
283
        if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
499
0
            std::stringstream ss;
500
0
            ss << "failed to get latest offset for partition: " << partition_id
501
0
               << ", err: " << RdKafka::err2str(err);
502
0
            LOG(WARNING) << ss.str();
503
0
            return Status::InternalError(ss.str());
504
0
        }
505
506
283
        PIntegerPair pair;
507
283
        pair.set_key(partition_id);
508
283
        pair.set_val(high);
509
283
        offsets->push_back(std::move(pair));
510
283
    }
511
512
283
    return Status::OK();
513
283
}
514
515
Status KafkaDataConsumer::get_real_offsets_for_partitions(
516
        const std::vector<PIntegerPair>& offset_flags, std::vector<PIntegerPair>* offsets,
517
65
        int timeout) {
518
65
    DBUG_EXECUTE_IF("KafkaDataConsumer.get_offsets_for_partitions.timeout", {
519
        // sleep 61s
520
65
        std::this_thread::sleep_for(std::chrono::seconds(61));
521
65
    });
522
65
    MonotonicStopWatch watch;
523
65
    watch.start();
524
65
    for (const auto& entry : offset_flags) {
525
65
        PIntegerPair pair;
526
65
        if (UNLIKELY(entry.val() >= 0)) {
527
0
            pair.set_key(entry.key());
528
0
            pair.set_val(entry.val());
529
0
            offsets->push_back(std::move(pair));
530
0
            continue;
531
0
        }
532
533
65
        int64_t low = 0;
534
65
        int64_t high = 0;
535
65
        auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000);
536
65
        if (UNLIKELY(timeout_ms <= 0)) {
537
0
            return Status::InternalError("get kafka real offsets for partitions timeout");
538
0
        }
539
540
65
        RdKafka::ErrorCode err =
541
65
                _k_consumer->query_watermark_offsets(_topic, entry.key(), &low, &high, timeout_ms);
542
65
        if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
543
0
            std::stringstream ss;
544
0
            ss << "failed to get latest offset for partition: " << entry.key()
545
0
               << ", err: " << RdKafka::err2str(err);
546
0
            LOG(WARNING) << ss.str();
547
0
            return Status::InternalError(ss.str());
548
0
        }
549
550
65
        pair.set_key(entry.key());
551
65
        if (entry.val() == -1) {
552
            // OFFSET_END_VAL = -1
553
5
            pair.set_val(high);
554
60
        } else if (entry.val() == -2) {
555
            // OFFSET_BEGINNING_VAL = -2
556
60
            pair.set_val(low);
557
60
        }
558
65
        offsets->push_back(std::move(pair));
559
65
    }
560
561
65
    return Status::OK();
562
65
}
563
564
71
Status KafkaDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) {
565
71
    std::unique_lock<std::mutex> l(_lock);
566
71
    if (!_init) {
567
0
        return Status::InternalError("consumer is not initialized");
568
0
    }
569
570
71
    _cancelled = true;
571
71
    LOG(INFO) << "kafka consumer cancelled. " << _id;
572
71
    return Status::OK();
573
71
}
574
575
1.04k
Status KafkaDataConsumer::reset() {
576
1.04k
    std::unique_lock<std::mutex> l(_lock);
577
1.04k
    _cancelled = false;
578
1.04k
    _k_consumer->unassign();
579
    // reset will be called before this consumer is returned to the pool.
580
    // so update _last_visit_time is reasonable.
581
1.04k
    _last_visit_time = time(nullptr);
582
1.04k
    return Status::OK();
583
1.04k
}
584
585
69
Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset) {
586
    // Use async commit so that it will not block for a long time.
587
    // Commit failure has no effect on Doris, subsequent tasks will continue to commit the new offset
588
69
    RdKafka::ErrorCode err = _k_consumer->commitAsync(offset);
589
69
    if (err != RdKafka::ERR_NO_ERROR) {
590
0
        return Status::InternalError("failed to commit kafka offset : {}", RdKafka::err2str(err));
591
0
    }
592
69
    return Status::OK();
593
69
}
594
595
// if the kafka brokers and topic are same,
596
// we considered this consumer as matched, thus can be reused.
597
12.1k
bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
598
12.1k
    if (ctx->load_src_type != TLoadSourceType::KAFKA) {
599
0
        return false;
600
0
    }
601
12.1k
    if (_brokers != ctx->kafka_info->brokers || _topic != ctx->kafka_info->topic) {
602
10.8k
        return false;
603
10.8k
    }
604
    // check properties
605
1.28k
    return PropertyMatcher::properties_match(_custom_properties, ctx->kafka_info->properties);
606
12.1k
}
607
608
// ==================== AWS Kinesis Data Consumer Implementation ====================
609
610
// Construct a consumer bound to one Kinesis stream. Only the connection
// coordinates (region/stream/endpoint) are copied from the context here;
// the AWS client and configuration are created later in init().
KinesisDataConsumer::KinesisDataConsumer(std::shared_ptr<StreamLoadContext> ctx)
        : _region(ctx->kinesis_info->region),
          _stream(ctx->kinesis_info->stream),
          _endpoint(ctx->kinesis_info->endpoint) {
    VLOG_NOTICE << "construct Kinesis consumer: stream=" << _stream << ", region=" << _region;
}
616
617
0
// Destructor. No explicit teardown is required: the Kinesis client is held by
// shared_ptr and is released when the last reference goes away.
KinesisDataConsumer::~KinesisDataConsumer() {
    VLOG_NOTICE << "destruct Kinesis consumer: stream=" << _stream;
    // AWS SDK client managed by shared_ptr, will be automatically cleaned up
}
621
622
0
// Initialize the consumer from the routine-load context:
//  1. copy the job's custom properties,
//  2. categorize every "aws.kinesis.*" property (explicit shard list, default
//     position, per-shard positions, or generic KinesisConf API parameters),
//  3. create the AWS Kinesis client.
// Idempotent: a second call on an initialized consumer returns OK immediately.
// Returns InternalError when a KinesisConf parameter value is invalid or when
// client creation fails.
Status KinesisDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
    std::unique_lock<std::mutex> l(_lock);
    if (_init) {
        return Status::OK(); // Already initialized (idempotent)
    }

    // Store custom properties (AWS credentials, etc.)
    _custom_properties.insert(ctx->kinesis_info->properties.begin(),
                              ctx->kinesis_info->properties.end());

    // Create KinesisConf and configure it
    _kinesis_conf = std::make_unique<KinesisConf>();
    std::string errstr;

    // Parse and categorize aws.kinesis.* properties into three types
    for (auto& item : _custom_properties) {
        if (starts_with(item.first, "aws.kinesis.")) {
            std::string conf_key = item.first.substr(12); // Remove "aws.kinesis." prefix (12 chars)

            // Type 2: Frequently-used parameters (explicit members)
            if (conf_key == "shards") {
                // Comma-separated explicit shard list; whitespace-only entries dropped.
                std::vector<std::string> parts =
                        absl::StrSplit(item.second, ",", absl::SkipWhitespace());
                _explicit_shards = std::move(parts);
                VLOG_NOTICE << "Set explicit shards: " << item.second;
            } else if (conf_key == "default.pos") {
                _default_position = item.second;
                VLOG_NOTICE << "Set default position: " << item.second;
            } else if (starts_with(conf_key, "shards.pos.")) {
                std::string shard_id = conf_key.substr(11); // Remove "shards.pos." prefix (11 chars)
                _shard_positions[shard_id] = item.second;
                VLOG_NOTICE << "Set shard position: " << shard_id << " = " << item.second;
            }
            // Type 3: Less-frequently-used API parameters (KinesisConf determines which API)
            else {
                KinesisConf::ConfResult res = _kinesis_conf->set(conf_key, item.second, errstr);
                if (res == KinesisConf::CONF_INVALID) {
                    // Invalid value for a recognized parameter: fail init.
                    return Status::InternalError("Failed to set '{}': {}", conf_key, errstr);
                }
                // CONF_UNKNOWN is acceptable (parameter will be ignored)
            }
        }
    }

    // Create AWS Kinesis client
    RETURN_IF_ERROR(_create_kinesis_client(ctx));

    VLOG_NOTICE << "finished to init Kinesis consumer. stream=" << _stream << ", region=" << _region
                << ", " << ctx->brief();
    _init = true;
    return Status::OK();
}
674
675
0
// Build the AWS Kinesis client for this consumer: assemble an S3ClientConf
// from the job's "aws.*" properties (so the existing S3 credential-provider
// machinery can be reused), configure region/endpoint/CA/timeouts, and create
// the client. Returns InternalError on unparsable timeout values.
Status KinesisDataConsumer::_create_kinesis_client(std::shared_ptr<StreamLoadContext> ctx) {
    // Reuse S3ClientFactory's credential provider logic
    // This supports all AWS authentication methods:
    // - Simple AK/SK
    // - IAM instance profile (EC2)
    // - STS assume role
    // - Session tokens
    // - Environment variables
    // - Default credential chain

    S3ClientConf s3_conf;
    s3_conf.region = _region;
    s3_conf.endpoint = _endpoint;

    // Returns the property value, treating an empty value the same as absent.
    auto get_property = [this](const char* key) -> std::string {
        auto it = _custom_properties.find(key);
        if (it != _custom_properties.end() && !it->second.empty()) {
            return it->second;
        }
        return "";
    };

    // Keep one naming convention aligned with FE-side Kinesis properties.
    // NOTE(review): the session token is read from "aws.session_key" (not the
    // more common "aws.session_token") — verify this matches the FE property name.
    s3_conf.ak = get_property("aws.access_key");
    s3_conf.sk = get_property("aws.secret_key");
    s3_conf.token = get_property("aws.session_key");
    s3_conf.role_arn = get_property("aws.role_arn");
    s3_conf.external_id = get_property("aws.external.id");

    const std::string provider = get_property("aws.credentials.provider");
    if (!provider.empty()) {
        // Map provider type string to enum
        // (unknown strings silently fall through and keep the default provider type).
        if (provider == "instance_profile") {
            s3_conf.cred_provider_type = CredProviderType::InstanceProfile;
        } else if (provider == "env") {
            s3_conf.cred_provider_type = CredProviderType::Env;
        } else if (provider == "simple") {
            s3_conf.cred_provider_type = CredProviderType::Simple;
        }
    }

    // Create AWS ClientConfiguration
    Aws::Client::ClientConfiguration aws_config = S3ClientFactory::getClientConfiguration();
    aws_config.region = _region;

    // Optional custom endpoint (e.g. LocalStack / VPC endpoint).
    if (!_endpoint.empty()) {
        aws_config.endpointOverride = _endpoint;
    }

    // Pick the first usable CA bundle from the configured candidate paths.
    std::string ca_cert_file_path =
            get_valid_ca_cert_path(doris::split(config::ca_cert_file_paths, ";"));
    if (!ca_cert_file_path.empty()) {
        aws_config.caFile = ca_cert_file_path;
    }

    // Parse a millisecond timeout property into *timeout_ms.
    // NOTE(review): std::stol accepts trailing garbage ("30s" parses as 30) and
    // only throws on a fully non-numeric/out-of-range value — consider stricter
    // validation if exact input checking is desired.
    auto parse_timeout_ms = [](const std::string& timeout_value, const std::string& property_name,
                               long* timeout_ms) -> Status {
        try {
            *timeout_ms = std::stol(timeout_value);
        } catch (const std::exception&) {
            return Status::InternalError("Invalid value for {}: {}", property_name, timeout_value);
        }
        return Status::OK();
    };

    // Set timeouts from properties or use defaults
    auto it_request_timeout = _custom_properties.find("aws.request.timeout.ms");
    if (it_request_timeout != _custom_properties.end()) {
        RETURN_IF_ERROR(parse_timeout_ms(it_request_timeout->second, "aws.request.timeout.ms",
                                         &aws_config.requestTimeoutMs));
    } else {
        aws_config.requestTimeoutMs = 30000; // 30s default
    }

    // Connection timeout: property override only; otherwise keep the factory default.
    auto it_conn_timeout = _custom_properties.find("aws.connection.timeout.ms");
    if (it_conn_timeout != _custom_properties.end()) {
        RETURN_IF_ERROR(parse_timeout_ms(it_conn_timeout->second, "aws.connection.timeout.ms",
                                         &aws_config.connectTimeoutMs));
    }

    // Get credentials provider (reuses S3 infrastructure)
    auto credentials_provider = S3ClientFactory::instance().get_aws_credentials_provider(s3_conf);

    // Create Kinesis client
    _kinesis_client =
            std::make_shared<Aws::Kinesis::KinesisClient>(credentials_provider, aws_config);

    // NOTE(review): std::make_shared throws on allocation failure and never
    // returns null, so this branch is defensive and effectively unreachable.
    if (!_kinesis_client) {
        return Status::InternalError(
                "Failed to create AWS Kinesis client for stream: {}, region: {}", _stream, _region);
    }

    LOG(INFO) << "Created Kinesis client for stream: " << _stream << ", region: " << _region;
    return Status::OK();
}
770
771
// Assign the given shard -> sequence-number map to this consumer: resolve a
// shard iterator for each entry and register the shard as actively consuming.
// Fails fast on the first shard whose iterator cannot be obtained.
// `stream_name` and `ctx` are part of the interface but unused in the body.
Status KinesisDataConsumer::assign_shards(
        const std::map<std::string, std::string>& shard_sequence_numbers,
        const std::string& stream_name, std::shared_ptr<StreamLoadContext> ctx) {
    DORIS_CHECK(_kinesis_client);

    std::stringstream log_msg;
    log_msg << "Assigning shards to Kinesis consumer " << _id << ": ";

    for (const auto& [shard_id, sequence_number] : shard_sequence_numbers) {
        // Resolve an iterator positioned at the recorded sequence number.
        std::string shard_iterator;
        RETURN_IF_ERROR(_get_shard_iterator(shard_id, sequence_number, &shard_iterator));

        _shard_iterators[shard_id] = shard_iterator;
        _consuming_shard_ids.insert(shard_id);

        log_msg << "[" << shard_id << ": " << sequence_number << "] ";
    }

    LOG(INFO) << log_msg.str();
    return Status::OK();
}
796
797
// Resolve a shard iterator for `shard_id` starting from `sequence_number`.
// The iterator type and starting position are decided by KinesisConf (via
// apply_to_get_shard_iterator_request). On success, *iterator receives the
// opaque iterator string; on failure, an InternalError with the AWS message
// and numeric error type is returned.
Status KinesisDataConsumer::_get_shard_iterator(const std::string& shard_id,
                                                const std::string& sequence_number,
                                                std::string* iterator) {
    Aws::Kinesis::Model::GetShardIteratorRequest request;

    // Apply all configurations through KinesisConf
    DCHECK(_kinesis_conf != nullptr);
    Status st = _kinesis_conf->apply_to_get_shard_iterator_request(request, _stream, shard_id,
                                                                   sequence_number);
    if (!st.ok()) {
        return Status::InternalError(
                "Failed to apply Kinesis config to GetShardIteratorRequest: {}", st.to_string());
    }

    // Synchronous AWS API call.
    auto outcome = _kinesis_client->GetShardIterator(request);
    if (!outcome.IsSuccess()) {
        auto& error = outcome.GetError();
        return Status::InternalError("Failed to get shard iterator for shard {}: {} ({})", shard_id,
                                     error.GetMessage(), static_cast<int>(error.GetErrorType()));
    }

    *iterator = outcome.GetResult().GetShardIterator();
    VLOG_NOTICE << "Got shard iterator for shard: " << shard_id;
    return Status::OK();
}
822
823
// Consume records from all assigned shards round-robin and push them into
// `queue`, until one of: the consumer is cancelled, `max_running_time_ms`
// elapses (checked between rounds), all active shards are drained/closed, or
// a fatal (non-retriable) GetRecords error occurs. Throttling and transient
// AWS errors are retried with backoff; per-shard iterators and lag
// (MillisBehindLatest) are updated as records are processed.
// Returns the last Status observed (OK unless a fatal error stopped the loop).
Status KinesisDataConsumer::group_consume(
        BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue,
        int64_t max_running_time_ms) {
    static constexpr int INTER_SHARD_SLEEP_MS = 10;            // Small sleep between shards
    static constexpr int MIN_INTERVAL_BETWEEN_ROUNDS_MS = 200; // Min 200ms between rounds

    int64_t left_time = max_running_time_ms;
    LOG(INFO) << "start Kinesis consumer: " << _id << ", grp: " << _grp_id
              << ", stream: " << _stream << ", max running time(ms): " << left_time;

    int64_t received_rows = 0;
    int64_t put_rows = 0;
    RetryPolicy retry_policy(3, 200);
    ThrottleBackoff throttle_backoff(1000, 10000);
    Status st = Status::OK();
    bool done = false;

    // consumer_watch times the GetRecords calls; watch times the whole loop.
    // NOTE(review): consumer_watch.start() is called once per GetRecords and the
    // latency metrics below add elapsed_time() after every call — if
    // MonotonicStopWatch accumulates across start/stop cycles instead of
    // resetting on start(), the per-call latency metric over-counts. Confirm
    // MonotonicStopWatch semantics.
    MonotonicStopWatch consumer_watch;
    MonotonicStopWatch watch;
    watch.start();

    while (true) {
        // Check cancellation flag
        {
            std::unique_lock<std::mutex> l(_lock);
            if (_cancelled) {
                break;
            }
        }

        // Time budget is re-evaluated only here, between rounds.
        // NOTE(review): one full round over many shards can therefore run past
        // max_running_time_ms — presumably acceptable; confirm.
        if (left_time <= 0) {
            break;
        }

        // Round-robin through all active shards.
        // The loop has no increment clause: `it` is advanced (or erased)
        // explicitly in each branch below.
        for (auto it = _consuming_shard_ids.begin(); it != _consuming_shard_ids.end() && !done;) {
            const std::string& shard_id = *it;
            auto iter_it = _shard_iterators.find(shard_id);

            if (iter_it == _shard_iterators.end() || iter_it->second.empty()) {
                // Shard exhausted (closed due to split/merge), remove from active set
                LOG(INFO) << "Shard exhausted: " << shard_id;
                it = _consuming_shard_ids.erase(it);
                continue;
            }

            consumer_watch.start();

            Aws::Kinesis::Model::GetRecordsRequest request;

            // KinesisConf fills in the request (shard iterator, limits, ...).
            DCHECK(_kinesis_conf != nullptr);
            st = _kinesis_conf->apply_to_get_records_request(request, iter_it->second);
            if (!st.ok()) {
                LOG(WARNING) << "Failed to apply Kinesis config to GetRecordsRequest: " << st;
                done = true;
                break;
            }

            auto outcome = _kinesis_client->GetRecords(request);
            consumer_watch.stop();

            // Track generic routine load metrics and Kinesis-specific metrics.
            DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
            DorisMetrics::instance()->routine_load_get_msg_latency->increment(
                    consumer_watch.elapsed_time() / 1000 / 1000);
            DorisMetrics::instance()->routine_load_kinesis_get_records_count->increment(1);
            DorisMetrics::instance()->routine_load_kinesis_get_records_latency->increment(
                    consumer_watch.elapsed_time() / 1000 / 1000);

            if (!outcome.IsSuccess()) {
                auto& error = outcome.GetError();

                // Handle throttling (ProvisionedThroughputExceededException)
                if (error.GetErrorType() ==
                    Aws::Kinesis::KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED) {
                    DorisMetrics::instance()->routine_load_kinesis_throttle_count->increment(1);
                    LOG(INFO) << "Kinesis rate limit exceeded for shard: " << shard_id
                              << ", throttle_count: " << throttle_backoff.throttle_count()
                              << ", backing off";
                    throttle_backoff.backoff_and_sleep();
                    ++it; // Move to next shard, will retry this one next round
                    continue;
                }

                // Handle retriable errors
                if (_is_retriable_error(error)) {
                    DorisMetrics::instance()->routine_load_kinesis_retriable_error_count->increment(
                            1);
                    LOG(INFO) << "Kinesis retriable error for shard " << shard_id << ": "
                              << error.GetMessage()
                              << ", retry times: " << retry_policy.retry_count();
                    if (retry_policy.should_retry()) {
                        // `it` is intentionally NOT advanced: the same shard is
                        // retried immediately after the backoff.
                        retry_policy.retry_with_backoff();
                        continue;
                    }
                }

                // Fatal error (or retries exhausted): stop the whole consume loop.
                LOG(WARNING) << "Kinesis consume failed for shard " << shard_id << ": "
                             << error.GetMessage() << " (" << static_cast<int>(error.GetErrorType())
                             << ")";
                st = Status::InternalError("Kinesis GetRecords failed for shard {}: {}", shard_id,
                                           error.GetMessage());
                done = true;
                break;
            }

            // Reset retry counter on success
            retry_policy.reset();
            throttle_backoff.reset();

            // Process records - move result to allow moving individual records
            auto result = outcome.GetResultWithOwnership();
            auto millis_behind = result.GetMillisBehindLatest();
            std::string next_iterator = result.GetNextShardIterator();
            size_t record_count = result.GetRecords().size();
            RETURN_IF_ERROR(_process_records(shard_id, std::move(result), queue, &received_rows,
                                             &put_rows));

            // Track MillisBehindLatest for this shard (used by FE for lag monitoring & scheduling)
            _millis_behind_latest[shard_id] = millis_behind;

            // Update shard iterator for next call
            if (next_iterator.empty()) {
                // Shard is closed (split/merge), mark as closed and remove from active set
                LOG(INFO) << "Shard closed: " << shard_id << " (split/merge detected)";
                DorisMetrics::instance()->routine_load_kinesis_closed_shard_count->increment(1);
                _closed_shard_ids.insert(shard_id);
                _shard_iterators.erase(shard_id);
                it = _consuming_shard_ids.erase(it);
            } else {
                // Update iterator for next consumption
                _shard_iterators[shard_id] = next_iterator;

                if (record_count == 0) {
                    // No records in this batch - shard has caught up with latest data
                    // Remove from active set for this round (similar to Kafka PARTITION_EOF)
                    // but keep iterator and progress for next task execution
                    LOG(INFO) << "Shard has no new data: " << shard_id
                              << " (MillisBehindLatest=" << millis_behind << ")";
                    it = _consuming_shard_ids.erase(it);
                } else {
                    ++it;
                }
            }

            // Check if all shards are exhausted
            if (_consuming_shard_ids.empty()) {
                LOG(INFO) << "All shards exhausted for consumer: " << _id;
                done = true;
                break;
            }

            // Small sleep to avoid tight loop
            std::this_thread::sleep_for(std::chrono::milliseconds(INTER_SHARD_SLEEP_MS));
        }

        // Ensure minimum interval between rounds to respect Kinesis rate limits (5 GetRecords/sec per shard)
        std::this_thread::sleep_for(std::chrono::milliseconds(MIN_INTERVAL_BETWEEN_ROUNDS_MS));

        left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000;
        if (done) {
            break;
        }
    }

    LOG(INFO) << "Kinesis consumer done: " << _id << ", grp: " << _grp_id
              << ". cancelled: " << _cancelled << ", left time(ms): " << left_time
              << ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000
              << ", consume cost(ms): " << consumer_watch.elapsed_time() / 1000 / 1000
              << ", received rows: " << received_rows << ", put rows: " << put_rows;

    return st;
}
997
998
// Drain the records of one GetRecords `result` for `shard_id` into the
// blocking queue, updating consume metrics, the per-shard committed sequence
// number, and the caller's row counters. `result` is taken by value so the
// record vector can be moved out of it without copying record payloads.
// Returns InternalError if the queue is shut down mid-processing.
Status KinesisDataConsumer::_process_records(
        const std::string& shard_id, Aws::Kinesis::Model::GetRecordsResult result,
        BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue, int64_t* received_rows,
        int64_t* put_rows) {
    // result is owned by value, safe to get mutable access to its records
    // (the const_cast strips the const that GetRecords()'s accessor adds; the
    // underlying object is not actually const).
    auto records =
            std::move(const_cast<Aws::Vector<Aws::Kinesis::Model::Record>&>(result.GetRecords()));
    for (auto& record : records) {
        // Byte metric counts every record, including the empty ones skipped below.
        DorisMetrics::instance()->routine_load_consume_bytes->increment(
                record.GetData().GetLength());

        if (record.GetData().GetLength() == 0) {
            // Skip empty records
            // NOTE(review): skipped records do not advance
            // _committed_sequence_numbers[shard_id] — confirm this is intended
            // when a batch ends with empty records.
            continue;
        }

        // Track the last sequence number for this shard
        _committed_sequence_numbers[shard_id] = record.GetSequenceNumber();

        // Move record into shared_ptr to avoid expensive copy
        auto record_ptr = std::make_shared<Aws::Kinesis::Model::Record>(std::move(record));

        if (!queue->controlled_blocking_put(record_ptr,
                                            config::blocking_queue_cv_wait_timeout_ms)) {
            // Queue shutdown
            return Status::InternalError("Queue shutdown during record processing");
        }

        (*put_rows)++;
        (*received_rows)++;
        DorisMetrics::instance()->routine_load_consume_rows->increment(1);
    }

    return Status::OK();
}
1033
1034
bool KinesisDataConsumer::_is_retriable_error(
1035
0
        const Aws::Client::AWSError<Aws::Kinesis::KinesisErrors>& error) {
1036
0
    auto error_type = error.GetErrorType();
1037
1038
0
    return error_type == Aws::Kinesis::KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED ||
1039
0
           error_type == Aws::Kinesis::KinesisErrors::SERVICE_UNAVAILABLE ||
1040
0
           error_type == Aws::Kinesis::KinesisErrors::INTERNAL_FAILURE ||
1041
0
           error_type == Aws::Kinesis::KinesisErrors::NETWORK_CONNECTION || error.ShouldRetry();
1042
0
}
1043
1044
0
// Restore the consumer to a clean state before it is returned to the pool.
// All per-task shard progress is discarded; credentials and configuration
// created by init() are kept.
Status KinesisDataConsumer::reset() {
    std::lock_guard<std::mutex> guard(_lock);
    _cancelled = false;
    // Drop every piece of per-shard bookkeeping from the previous task.
    _shard_iterators.clear();
    _consuming_shard_ids.clear();
    _committed_sequence_numbers.clear();
    _millis_behind_latest.clear();
    _closed_shard_ids.clear();
    // reset() runs right before the consumer goes back to the pool, so this is
    // a natural point to refresh _last_visit_time.
    _last_visit_time = time(nullptr);
    LOG(INFO) << "Kinesis consumer reset: " << _id;
    return Status::OK();
}
1056
1057
0
// Mark this consumer as cancelled so group_consume() exits at its next
// cancellation check. Requires a prior successful init().
Status KinesisDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) {
    std::lock_guard<std::mutex> guard(_lock);
    if (_init) {
        _cancelled = true;
        LOG(INFO) << "Kinesis consumer cancelled: " << _id << ", " << ctx->brief();
        return Status::OK();
    }
    return Status::InternalError("Kinesis consumer is not initialized");
}
1066
1067
0
bool KinesisDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
1068
0
    if (ctx->load_src_type != TLoadSourceType::KINESIS) {
1069
0
        return false;
1070
0
    }
1071
1072
0
    if (_region != ctx->kinesis_info->region || _stream != ctx->kinesis_info->stream ||
1073
0
        _endpoint != ctx->kinesis_info->endpoint) {
1074
0
        return false;
1075
0
    }
1076
1077
    // Check that properties match
1078
0
    return PropertyMatcher::properties_match(_custom_properties, ctx->kinesis_info->properties);
1079
0
}
1080
1081
0
// List the shard IDs this job should consume. If the user configured an
// explicit shard list ("aws.kinesis.shards"), that list is returned verbatim.
// Otherwise ListShards is called (with pagination) and only OPEN shards —
// those without an ending sequence number — are returned. Fails if the stream
// has no shards at all; an all-closed stream yields an empty (OK) result.
Status KinesisDataConsumer::get_shard_list(std::vector<std::string>* shard_ids) {
    DORIS_CHECK(_kinesis_client);

    // If user specified explicit shards, return those
    if (!_explicit_shards.empty()) {
        *shard_ids = _explicit_shards;
        LOG(INFO) << "Using " << shard_ids->size() << " explicit shards for stream: " << _stream;
        return Status::OK();
    }

    // Discover all shards
    Aws::Kinesis::Model::ListShardsRequest request;

    DCHECK(_kinesis_conf != nullptr);
    Status st = _kinesis_conf->apply_to_list_shards_request(request, _stream);
    if (!st.ok()) {
        return Status::InternalError("Failed to apply Kinesis config to ListShardsRequest: {}",
                                     st.to_string());
    }

    // Only return OPEN shards here. FE will keep recently retired parent shards in its
    // closed list until they are fully drained, then remove them permanently. Returning
    // CLOSED shards from ListShards would make already-drained parents look newly discovered
    // and cause them to restart from TRIM_HORIZON.
    std::vector<std::string> discovered_shard_ids;
    bool saw_any_shard = false;
    while (true) {
        auto outcome = _kinesis_client->ListShards(request);
        if (!outcome.IsSuccess()) {
            auto& error = outcome.GetError();
            return Status::InternalError("Failed to list shards for stream {}: {} ({})", _stream,
                                         error.GetMessage(),
                                         static_cast<int>(error.GetErrorType()));
        }

        const auto& result = outcome.GetResult();
        // saw_any_shard distinguishes "stream has only closed shards" (OK, empty
        // result) from "stream has no shards at all" (error below).
        if (!result.GetShards().empty()) {
            saw_any_shard = true;
        }
        for (const auto& shard : result.GetShards()) {
            // A non-empty ending sequence number marks a CLOSED shard; skip it.
            const auto& ending_sequence_number =
                    shard.GetSequenceNumberRange().GetEndingSequenceNumber();
            if (!ending_sequence_number.empty()) {
                continue;
            }
            discovered_shard_ids.emplace_back(shard.GetShardId());
        }

        const Aws::String& next_token = result.GetNextToken();
        if (next_token.empty()) {
            break;
        }

        Aws::Kinesis::Model::ListShardsRequest next_request;
        // AWS requires paginated ListShards requests to use NextToken instead of StreamName.
        // NOTE(review): only MaxResults is carried over to the follow-up request;
        // any other fields KinesisConf may have set on the first request would be
        // dropped on subsequent pages — confirm KinesisConf sets nothing else.
        next_request.SetNextToken(next_token);
        if (request.MaxResultsHasBeenSet()) {
            next_request.SetMaxResults(request.GetMaxResults());
        }
        request = std::move(next_request);
    }

    if (discovered_shard_ids.empty() && !saw_any_shard) {
        return Status::InternalError("No shards found in Kinesis stream: {}", _stream);
    }

    *shard_ids = std::move(discovered_shard_ids);
    LOG(INFO) << "Found " << shard_ids->size() << " open shards in stream: " << _stream;
    return Status::OK();
}
1151
1152
} // end namespace doris