be/src/load/routine_load/data_consumer.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <stdint.h>

#include <ctime>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "common/logging.h"
#include "common/status.h"
#include "librdkafka/rdkafkacpp.h"
#include "load/stream_load/stream_load_context.h"
#include "util/uid_util.h"
36 | | |
37 | | namespace doris { |
38 | | |
39 | | template <typename T> |
40 | | class BlockingQueue; |
41 | | |
42 | | class DataConsumer { |
43 | | public: |
44 | | DataConsumer() |
45 | 78 | : _id(UniqueId::gen_uid()), |
46 | 78 | _grp_id(UniqueId::gen_uid()), |
47 | 78 | _has_grp(false), |
48 | 78 | _init(false), |
49 | 78 | _cancelled(false), |
50 | 78 | _last_visit_time(0) {} |
51 | | |
52 | 78 | virtual ~DataConsumer() {} |
53 | | |
54 | | // init the consumer with the given parameters |
55 | | virtual Status init(std::shared_ptr<StreamLoadContext> ctx) = 0; |
56 | | // start consuming |
57 | | virtual Status consume(std::shared_ptr<StreamLoadContext> ctx) = 0; |
58 | | // cancel the consuming process. |
59 | | // if the consumer is not initialized, or the consuming |
60 | | // process is already finished, call cancel() will |
61 | | // return ERROR |
62 | | virtual Status cancel(std::shared_ptr<StreamLoadContext> ctx) = 0; |
63 | | // reset the data consumer before being reused |
64 | | virtual Status reset() = 0; |
65 | | // return true the if the consumer match the need |
66 | | virtual bool match(std::shared_ptr<StreamLoadContext> ctx) = 0; |
67 | | |
68 | 71 | const UniqueId& id() { return _id; } |
69 | 853 | time_t last_visit_time() { return _last_visit_time; } |
70 | 71 | void set_grp(const UniqueId& grp_id) { |
71 | 71 | _grp_id = grp_id; |
72 | 71 | _has_grp = true; |
73 | 71 | } |
74 | | |
75 | | protected: |
76 | | UniqueId _id; |
77 | | UniqueId _grp_id; |
78 | | bool _has_grp; |
79 | | |
80 | | // lock to protect the following bools |
81 | | std::mutex _lock; |
82 | | bool _init; |
83 | | bool _cancelled; |
84 | | time_t _last_visit_time; |
85 | | }; |
86 | | |
87 | | class PIntegerPair; |
88 | | |
89 | | class KafkaEventCb : public RdKafka::EventCb { |
90 | | public: |
91 | 247 | void event_cb(RdKafka::Event& event) { |
92 | 247 | switch (event.type()) { |
93 | 7 | case RdKafka::Event::EVENT_ERROR: |
94 | 7 | LOG(INFO) << "kafka error: " << RdKafka::err2str(event.err()) |
95 | 7 | << ", event: " << event.str(); |
96 | 7 | break; |
97 | 0 | case RdKafka::Event::EVENT_STATS: |
98 | 0 | LOG(INFO) << "kafka stats: " << event.str(); |
99 | 0 | break; |
100 | | |
101 | 240 | case RdKafka::Event::EVENT_LOG: |
102 | 240 | LOG(INFO) << "kafka log-" << event.severity() << "-" << event.fac().c_str() |
103 | 240 | << ", event: " << event.str(); |
104 | 240 | break; |
105 | | |
106 | 0 | case RdKafka::Event::EVENT_THROTTLE: |
107 | 0 | LOG(INFO) << "kafka throttled: " << event.throttle_time() << "ms by " |
108 | 0 | << event.broker_name() << " id " << (int)event.broker_id(); |
109 | 0 | break; |
110 | | |
111 | 0 | default: |
112 | 0 | LOG(INFO) << "kafka event: " << event.type() |
113 | 0 | << ", err: " << RdKafka::err2str(event.err()) << ", event: " << event.str(); |
114 | 0 | break; |
115 | 247 | } |
116 | 247 | } |
117 | | }; |
118 | | |
119 | | class KafkaDataConsumer : public DataConsumer { |
120 | | public: |
121 | | KafkaDataConsumer(std::shared_ptr<StreamLoadContext> ctx) |
122 | 78 | : _brokers(ctx->kafka_info->brokers), _topic(ctx->kafka_info->topic) {} |
123 | | |
124 | 78 | virtual ~KafkaDataConsumer() { |
125 | 78 | VLOG_NOTICE << "deconstruct consumer"; |
126 | 78 | if (_k_consumer) { |
127 | 78 | _k_consumer->close(); |
128 | 78 | delete _k_consumer; |
129 | 78 | _k_consumer = nullptr; |
130 | 78 | } |
131 | 78 | } |
132 | | |
133 | | Status init(std::shared_ptr<StreamLoadContext> ctx) override; |
134 | | // TODO(cmy): currently do not implement single consumer start method, using group_consume |
135 | 0 | Status consume(std::shared_ptr<StreamLoadContext> ctx) override { return Status::OK(); } |
136 | | Status cancel(std::shared_ptr<StreamLoadContext> ctx) override; |
137 | | // reassign partition topics |
138 | | virtual Status reset() override; |
139 | | bool match(std::shared_ptr<StreamLoadContext> ctx) override; |
140 | | // commit kafka offset |
141 | | Status commit(std::vector<RdKafka::TopicPartition*>& offset); |
142 | | |
143 | | Status assign_topic_partitions(const std::map<int32_t, int64_t>& begin_partition_offset, |
144 | | const std::string& topic, |
145 | | std::shared_ptr<StreamLoadContext> ctx); |
146 | | |
147 | | // start the consumer and put msgs to queue |
148 | | Status group_consume(BlockingQueue<RdKafka::Message*>* queue, int64_t max_running_time_ms); |
149 | | |
150 | | // get the partitions ids of the topic |
151 | | Status get_partition_meta(std::vector<int32_t>* partition_ids); |
152 | | // get offsets for times |
153 | | Status get_offsets_for_times(const std::vector<PIntegerPair>& times, |
154 | | std::vector<PIntegerPair>* offsets, int timeout); |
155 | | // get latest offsets for partitions |
156 | | Status get_latest_offsets_for_partitions(const std::vector<int32_t>& partition_ids, |
157 | | std::vector<PIntegerPair>* offsets, int timeout); |
158 | | // get offsets for times |
159 | | Status get_real_offsets_for_partitions(const std::vector<PIntegerPair>& offset_flags, |
160 | | std::vector<PIntegerPair>* offsets, int timeout); |
161 | | |
162 | | private: |
163 | | std::string _brokers; |
164 | | std::string _topic; |
165 | | std::unordered_map<std::string, std::string> _custom_properties; |
166 | | std::set<int32_t> _consuming_partition_ids; |
167 | | |
168 | | KafkaEventCb _k_event_cb; |
169 | | RdKafka::KafkaConsumer* _k_consumer = nullptr; |
170 | | }; |
171 | | |
172 | | } // end namespace doris |