Coverage Report

Created: 2026-05-15 00:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/data_consumer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <aws/kinesis/KinesisClient.h>
#include <aws/kinesis/model/GetRecordsRequest.h>
#include <aws/kinesis/model/GetRecordsResult.h>
#include <aws/kinesis/model/GetShardIteratorRequest.h>
#include <aws/kinesis/model/ListShardsRequest.h>
#include <aws/kinesis/model/Record.h>
#include <stdint.h>

#include <ctime>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "common/logging.h"
#include "common/status.h"
#include "librdkafka/rdkafkacpp.h"
#include "load/routine_load/kinesis_conf.h"
#include "load/stream_load/stream_load_context.h"
#include "runtime/aws_msk_iam_auth.h"
#include "util/uid_util.h"
44
45
namespace doris {
46
47
template <typename T>
48
class BlockingQueue;
49
50
class DataConsumer {
51
public:
52
    DataConsumer()
53
2
            : _id(UniqueId::gen_uid()),
54
2
              _grp_id(UniqueId::gen_uid()),
55
2
              _has_grp(false),
56
2
              _init(false),
57
2
              _cancelled(false),
58
2
              _last_visit_time(0) {}
59
60
2
    virtual ~DataConsumer() {}
61
62
    // init the consumer with the given parameters
63
    virtual Status init(std::shared_ptr<StreamLoadContext> ctx) = 0;
64
    // start consuming
65
    virtual Status consume(std::shared_ptr<StreamLoadContext> ctx) = 0;
66
    // cancel the consuming process.
67
    // if the consumer is not initialized, or the consuming
68
    // process is already finished, call cancel() will
69
    // return ERROR
70
    virtual Status cancel(std::shared_ptr<StreamLoadContext> ctx) = 0;
71
    // reset the data consumer before being reused
72
    virtual Status reset() = 0;
73
    // return true the if the consumer match the need
74
    virtual bool match(std::shared_ptr<StreamLoadContext> ctx) = 0;
75
76
0
    const UniqueId& id() { return _id; }
77
0
    time_t last_visit_time() { return _last_visit_time; }
78
1
    void set_grp(const UniqueId& grp_id) {
79
1
        _grp_id = grp_id;
80
1
        _has_grp = true;
81
1
    }
82
83
protected:
84
    UniqueId _id;
85
    UniqueId _grp_id;
86
    bool _has_grp;
87
88
    // lock to protect the following bools
89
    std::mutex _lock;
90
    bool _init;
91
    bool _cancelled;
92
    time_t _last_visit_time;
93
};
94
95
class PIntegerPair;
96
97
class KafkaEventCb : public RdKafka::EventCb {
98
public:
99
19
    void event_cb(RdKafka::Event& event) {
100
19
        switch (event.type()) {
101
7
        case RdKafka::Event::EVENT_ERROR:
102
7
            LOG(INFO) << "kafka error: " << RdKafka::err2str(event.err())
103
7
                      << ", event: " << event.str();
104
7
            break;
105
0
        case RdKafka::Event::EVENT_STATS:
106
0
            LOG(INFO) << "kafka stats: " << event.str();
107
0
            break;
108
109
12
        case RdKafka::Event::EVENT_LOG:
110
12
            LOG(INFO) << "kafka log-" << event.severity() << "-" << event.fac().c_str()
111
12
                      << ", event: " << event.str();
112
12
            break;
113
114
0
        case RdKafka::Event::EVENT_THROTTLE:
115
0
            LOG(INFO) << "kafka throttled: " << event.throttle_time() << "ms by "
116
0
                      << event.broker_name() << " id " << (int)event.broker_id();
117
0
            break;
118
119
0
        default:
120
0
            LOG(INFO) << "kafka event: " << event.type()
121
0
                      << ", err: " << RdKafka::err2str(event.err()) << ", event: " << event.str();
122
0
            break;
123
19
        }
124
19
    }
125
};
126
127
class KafkaDataConsumer : public DataConsumer {
128
public:
129
    KafkaDataConsumer(std::shared_ptr<StreamLoadContext> ctx)
130
2
            : _brokers(ctx->kafka_info->brokers), _topic(ctx->kafka_info->topic) {}
131
132
2
    virtual ~KafkaDataConsumer() {
133
2
        VLOG_NOTICE << "deconstruct consumer";
134
2
        if (_k_consumer) {
135
2
            _k_consumer->close();
136
2
            delete _k_consumer;
137
2
            _k_consumer = nullptr;
138
2
        }
139
2
    }
140
141
    Status init(std::shared_ptr<StreamLoadContext> ctx) override;
142
    // TODO(cmy): currently do not implement single consumer start method, using group_consume
143
0
    Status consume(std::shared_ptr<StreamLoadContext> ctx) override { return Status::OK(); }
144
    Status cancel(std::shared_ptr<StreamLoadContext> ctx) override;
145
    // reassign partition topics
146
    virtual Status reset() override;
147
    bool match(std::shared_ptr<StreamLoadContext> ctx) override;
148
    // commit kafka offset
149
    Status commit(std::vector<RdKafka::TopicPartition*>& offset);
150
151
    Status assign_topic_partitions(const std::map<int32_t, int64_t>& begin_partition_offset,
152
                                   const std::string& topic,
153
                                   std::shared_ptr<StreamLoadContext> ctx);
154
155
    // start the consumer and put msgs to queue
156
    Status group_consume(BlockingQueue<RdKafka::Message*>* queue, int64_t max_running_time_ms);
157
158
    // get the partitions ids of the topic
159
    Status get_partition_meta(std::vector<int32_t>* partition_ids);
160
    // get offsets for times
161
    Status get_offsets_for_times(const std::vector<PIntegerPair>& times,
162
                                 std::vector<PIntegerPair>* offsets, int timeout);
163
    // get latest offsets for partitions
164
    Status get_latest_offsets_for_partitions(const std::vector<int32_t>& partition_ids,
165
                                             std::vector<PIntegerPair>* offsets, int timeout);
166
    // get offsets for times
167
    Status get_real_offsets_for_partitions(const std::vector<PIntegerPair>& offset_flags,
168
                                           std::vector<PIntegerPair>* offsets, int timeout);
169
170
private:
171
    std::string _brokers;
172
    std::string _topic;
173
    std::unordered_map<std::string, std::string> _custom_properties;
174
    std::set<int32_t> _consuming_partition_ids;
175
176
    KafkaEventCb _k_event_cb;
177
    RdKafka::KafkaConsumer* _k_consumer = nullptr;
178
179
    // AWS MSK IAM authentication callback (must outlive _k_consumer)
180
    std::unique_ptr<AwsMskIamOAuthCallback> _aws_msk_oauth_callback;
181
};
182
183
// AWS Kinesis Data Consumer
184
// Consumes data from AWS Kinesis Data Streams for routine load jobs.
185
// Kinesis is similar to Kafka but uses shards instead of partitions
186
// and sequence numbers (strings) instead of offsets (integers).
187
class KinesisDataConsumer : public DataConsumer {
188
public:
189
    KinesisDataConsumer(std::shared_ptr<StreamLoadContext> ctx);
190
    virtual ~KinesisDataConsumer();
191
192
    // DataConsumer interface implementation
193
    Status init(std::shared_ptr<StreamLoadContext> ctx) override;
194
0
    Status consume(std::shared_ptr<StreamLoadContext> ctx) override { return Status::OK(); }
195
    Status cancel(std::shared_ptr<StreamLoadContext> ctx) override;
196
    Status reset() override;
197
    bool match(std::shared_ptr<StreamLoadContext> ctx) override;
198
199
    // Kinesis-specific methods
200
    // Assign shards with their starting sequence numbers
201
    Status assign_shards(const std::map<std::string, std::string>& shard_sequence_numbers,
202
                         const std::string& stream_name, std::shared_ptr<StreamLoadContext> ctx);
203
204
    // Main consumption loop - pulls records from all assigned shards
205
    Status group_consume(BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue,
206
                         int64_t max_running_time_ms);
207
208
    // Get list of shard IDs
209
    Status get_shard_list(std::vector<std::string>* shard_ids);
210
211
private:
212
    // Configuration - Basic AWS settings
213
    std::string _region;
214
    std::string _stream;
215
    std::string _endpoint; // Optional custom endpoint (e.g., LocalStack)
216
217
    // Type 1: Doris-internal parameters (not passed to AWS SDK)
218
    std::unordered_map<std::string, std::string> _doris_internal_properties;
219
220
    // Type 2: Frequently-used AWS parameters (explicit members for performance)
221
    // These are parsed from aws.kinesis.* properties during init()
222
    std::vector<std::string> _explicit_shards; // aws.kinesis.shards (comma-separated)
223
    std::string _default_position;             // aws.kinesis.default.pos (LATEST/TRIM_HORIZON)
224
    std::map<std::string, std::string>
225
            _shard_positions; // aws.kinesis.shards.pos (shard_id:position)
226
227
    // Type 3: Less-frequently-used AWS API parameters (wrapped in KinesisConf)
228
    std::unique_ptr<KinesisConf> _kinesis_conf;
229
230
    // AWS credentials and other properties
231
    std::unordered_map<std::string, std::string> _custom_properties;
232
233
    // Active shards being consumed
234
    std::set<std::string> _consuming_shard_ids;
235
236
    // AWS Kinesis client
237
    std::shared_ptr<Aws::Kinesis::KinesisClient> _kinesis_client;
238
239
    // Shard iterator management
240
    // Kinesis requires shard iterators to consume records
241
    // shard_id -> current shard iterator
242
    std::map<std::string, std::string> _shard_iterators;
243
244
    // Tracks the MillisBehindLatest value per shard from the last GetRecords call.
245
    // Updated during group_consume; read by the task executor to populate ctx after consumption.
246
    std::map<std::string, int64_t> _millis_behind_latest;
247
248
    // Tracks the last consumed sequence number per shard.
249
    // Updated during group_consume via _process_records; read by the consumer group
250
    // to populate ctx->kinesis_info->cmt_sequence_number after consumption.
251
    std::map<std::string, std::string> _committed_sequence_numbers;
252
253
    // Tracks shards that have been closed (split/merge) during consumption.
254
    // FE should remove these shards from its tracking to avoid reassigning them.
255
    std::set<std::string> _closed_shard_ids;
256
257
public:
258
    // Returns the MillisBehindLatest snapshot collected during group_consume.
259
0
    const std::map<std::string, int64_t>& get_millis_behind_latest() const {
260
0
        return _millis_behind_latest;
261
0
    }
262
263
    // Returns the committed sequence numbers per shard collected during group_consume.
264
0
    const std::map<std::string, std::string>& get_committed_sequence_numbers() const {
265
0
        return _committed_sequence_numbers;
266
0
    }
267
268
    // Returns the set of closed shard IDs detected during group_consume.
269
0
    const std::set<std::string>& get_closed_shard_ids() const { return _closed_shard_ids; }
270
271
private:
272
    // Helper methods
273
    // Create and configure AWS Kinesis client with credentials
274
    Status _create_kinesis_client(std::shared_ptr<StreamLoadContext> ctx);
275
276
    // Get shard iterator for a shard at a specific sequence number position
277
    Status _get_shard_iterator(const std::string& shard_id, const std::string& sequence_number,
278
                               std::string* iterator);
279
280
    // Process records from GetRecords result and add to queue
281
    Status _process_records(const std::string& shard_id,
282
                            Aws::Kinesis::Model::GetRecordsResult result,
283
                            BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue,
284
                            int64_t* received_rows, int64_t* put_rows);
285
286
    // Check if an AWS error is retriable (throttling, network, etc.)
287
    bool _is_retriable_error(const Aws::Client::AWSError<Aws::Kinesis::KinesisErrors>& error);
288
};
289
290
} // end namespace doris