be/src/load/routine_load/data_consumer_group.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | #include "load/routine_load/data_consumer_group.h" |
18 | | |
19 | | #include <gen_cpp/PlanNodes_types.h> |
20 | | #include <stddef.h> |
21 | | |
22 | | #include <map> |
23 | | #include <ostream> |
24 | | #include <utility> |
25 | | |
26 | | #include "common/logging.h" |
27 | | #include "io/fs/kafka_consumer_pipe.h" |
28 | | #include "io/fs/kinesis_consumer_pipe.h" |
29 | | #include "librdkafka/rdkafkacpp.h" |
30 | | #include "load/routine_load/consumer_group_helpers.h" |
31 | | #include "load/routine_load/data_consumer.h" |
32 | | #include "load/stream_load/stream_load_context.h" |
33 | | #include "util/stopwatch.hpp" |
34 | | |
35 | | namespace doris { |
36 | | |
37 | | bool DataConsumerGroup::_submit_all_consumers( |
38 | | std::function<void(std::shared_ptr<DataConsumer>, ConsumeFinishCallback)> consume_fn, |
39 | 1 | std::function<void()> shutdown_fn, Status& result_st) { |
40 | 1 | for (auto& consumer : _consumers) { |
41 | 1 | auto cb = [this, shutdown_fn, &result_st](const Status& st) { |
42 | 1 | std::unique_lock<std::mutex> lock(_mutex); |
43 | 1 | if (--_counter == 0) { |
44 | 1 | shutdown_fn(); |
45 | 1 | LOG(INFO) << "all consumers finished, shutdown queue. grp: " << _grp_id; |
46 | 1 | } |
47 | 1 | if (result_st.ok() && !st.ok()) { |
48 | 0 | result_st = st; |
49 | 0 | } |
50 | 1 | }; |
51 | 1 | if (!_thread_pool.offer([consume_fn, consumer, cb] { consume_fn(consumer, cb); })) { |
52 | 0 | LOG(WARNING) << "failed to submit consumer: " << consumer->id() << ", grp: " << _grp_id; |
53 | 0 | return false; |
54 | 0 | } |
55 | 1 | VLOG_CRITICAL << "submit consumer: " << consumer->id() << ", grp: " << _grp_id; |
56 | 1 | } |
57 | 1 | return true; |
58 | 1 | } |
59 | | |
60 | | Status DataConsumerGroup::_run_consume_loop(std::shared_ptr<StreamLoadContext> ctx, |
61 | | std::shared_ptr<io::StreamLoadPipe> pipe, |
62 | 1 | Status& result_st) { |
63 | 1 | int64_t left_time = ctx->max_interval_s * 1000; |
64 | 1 | int64_t left_rows = ctx->max_batch_rows; |
65 | 1 | int64_t left_bytes = ctx->max_batch_size; |
66 | | |
67 | 1 | LOG(INFO) << "start consumer group: " << _grp_id << ". max time(ms): " << left_time |
68 | 1 | << ", batch rows: " << left_rows << ", batch size: " << left_bytes << ". " |
69 | 1 | << ctx->brief(); |
70 | | |
71 | 1 | MonotonicStopWatch watch; |
72 | 1 | watch.start(); |
73 | 1 | bool eos = false; |
74 | 2 | while (true) { |
75 | 2 | if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) { |
76 | 1 | LOG(INFO) << "consumer group done: " << _grp_id |
77 | 1 | << ". consume time(ms)=" << ctx->max_interval_s * 1000 - left_time |
78 | 1 | << ", received rows=" << ctx->max_batch_rows - left_rows |
79 | 1 | << ", received bytes=" << ctx->max_batch_size - left_bytes << ", eos: " << eos |
80 | 1 | << ", left_time: " << left_time << ", left_rows: " << left_rows |
81 | 1 | << ", left_bytes: " << left_bytes |
82 | 1 | << ", blocking get time(us): " << pipe->get_queue_size() << ", " |
83 | 1 | << ctx->brief(); |
84 | | |
85 | 1 | _shutdown_queue(); |
86 | 1 | for (auto& consumer : _consumers) { |
87 | 1 | static_cast<void>(consumer->cancel(ctx)); |
88 | 1 | } |
89 | 1 | _thread_pool.shutdown(); |
90 | 1 | _thread_pool.join(); |
91 | 1 | if (!result_st.ok()) { |
92 | 0 | pipe->cancel(result_st.to_string()); |
93 | 0 | return result_st; |
94 | 0 | } |
95 | 1 | RETURN_IF_ERROR(pipe->finish()); |
96 | 1 | _on_finish(ctx); |
97 | 1 | ctx->receive_bytes = ctx->max_batch_size - left_bytes; |
98 | 1 | return Status::OK(); |
99 | 1 | } |
100 | | |
101 | 1 | if (!_dequeue_and_process(pipe.get(), left_rows, left_bytes, result_st)) { |
102 | 1 | eos = true; |
103 | 1 | } |
104 | 1 | left_time = ctx->max_interval_s * 1000 - watch.elapsed_time() / 1000 / 1000; |
105 | 1 | } |
106 | 1 | } |
107 | | |
108 | 1 | Status KafkaDataConsumerGroup::assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx) { |
109 | 1 | DCHECK(ctx->kafka_info); |
110 | 1 | DCHECK(_consumers.size() >= 1); |
111 | | |
112 | | // divide partitions using round-robin partitioner |
113 | 1 | int consumer_size = doris::cast_set<int>(_consumers.size()); |
114 | 1 | auto divide_parts = WorkPartitioner<int32_t, int64_t>::partition_round_robin( |
115 | 1 | ctx->kafka_info->begin_offset, consumer_size); |
116 | | |
117 | | // assign partitions to consumers equally |
118 | 2 | for (int j = 0; j < consumer_size; ++j) { |
119 | 1 | RETURN_IF_ERROR( |
120 | 1 | std::static_pointer_cast<KafkaDataConsumer>(_consumers[j]) |
121 | 1 | ->assign_topic_partitions(divide_parts[j], ctx->kafka_info->topic, ctx)); |
122 | 1 | } |
123 | | |
124 | 1 | return Status::OK(); |
125 | 1 | } |
126 | | |
127 | 1 | KafkaDataConsumerGroup::~KafkaDataConsumerGroup() { |
128 | | // clean the msgs left in queue |
129 | 1 | _queue.shutdown(); |
130 | 1 | while (true) { |
131 | 1 | RdKafka::Message* msg; |
132 | 1 | if (_queue.blocking_get(&msg)) { |
133 | 0 | delete msg; |
134 | 0 | msg = nullptr; |
135 | 1 | } else { |
136 | 1 | break; |
137 | 1 | } |
138 | 1 | } |
139 | 1 | DCHECK(_queue.get_size() == 0); |
140 | 1 | } |
141 | | |
142 | | Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx, |
143 | 1 | std::shared_ptr<io::StreamLoadPipe> pipe) { |
144 | 1 | DORIS_CHECK(std::dynamic_pointer_cast<io::KafkaConsumerPipe>(pipe) != nullptr); |
145 | 1 | Status result_st = Status::OK(); |
146 | 1 | _cmt_offset = ctx->kafka_info->cmt_offset; |
147 | 1 | _format = ctx->format; |
148 | | |
149 | 1 | if (!_submit_all_consumers( |
150 | 1 | [this, max_time = ctx->max_interval_s * 1000](std::shared_ptr<DataConsumer> c, |
151 | 1 | ConsumeFinishCallback cb) { |
152 | 1 | actual_consume(c, &_queue, max_time, cb); |
153 | 1 | }, |
154 | 1 | [this] { _queue.shutdown(); }, result_st)) { |
155 | 0 | return Status::InternalError("failed to submit data consumer"); |
156 | 0 | } |
157 | 1 | RETURN_IF_ERROR(_run_consume_loop(ctx, pipe, result_st)); |
158 | 1 | ctx->kafka_info->cmt_offset = std::move(_cmt_offset); |
159 | 1 | return Status::OK(); |
160 | 1 | } |
161 | | |
// Pull one kafka message off the shared queue and append its payload to the
// pipe. Returns false only when the queue has been shut down (the caller
// treats this as end-of-stream); returns true otherwise, even on append
// failure — the error is recorded into result_st for the caller to inspect.
bool KafkaDataConsumerGroup::_dequeue_and_process(io::StreamLoadPipe* pipe, int64_t& left_rows,
                                                  int64_t& left_bytes, Status& result_st) {
    RdKafka::Message* msg = nullptr;
    if (!_queue.controlled_blocking_get(&msg, config::blocking_queue_cv_wait_timeout_ms)) {
        return false;
    }
    // The queue hands over ownership; free the message on every exit path.
    Defer delete_msg {[msg] { delete msg; }};
    VLOG_NOTICE << "get kafka message, partition: " << msg->partition()
                << ", offset: " << msg->offset() << ", len: " << msg->len();

    if (msg->err() == RdKafka::ERR__PARTITION_EOF) {
        // End of a partition: the EOF event's offset presumably points one
        // past the last real record (librdkafka convention), hence the -1.
        if (msg->offset() > 0) {
            _cmt_offset[msg->partition()] = msg->offset() - 1;
        }
        return true;
    }

    // Append the raw payload with the format-specific append function.
    auto append_fn = FormatAppender::get_append_function<io::StreamLoadPipe>(_format);
    Status st = (pipe->*append_fn)(static_cast<const char*>(msg->payload()),
                                   static_cast<size_t>(msg->len()));
    if (st.ok()) {
        left_rows--;
        left_bytes -= msg->len();
        // Only advance the commit offset once the data safely entered the pipe.
        _cmt_offset[msg->partition()] = msg->offset();
        VLOG_NOTICE << "consume partition[" << msg->partition() << " - " << msg->offset() << "]";
    } else {
        LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id;
        // Keep only the first error; _mutex also guards result_st against
        // the consumer completion callbacks that may write it concurrently.
        std::unique_lock<std::mutex> lock(_mutex);
        if (result_st.ok()) {
            result_st = st;
        }
    }
    return true;
}
196 | | |
// Intentionally empty: the committed kafka offsets are handed back to the
// context by start_all(), which moves _cmt_offset into ctx->kafka_info
// after _run_consume_loop returns, so no per-finish work is needed here.
void KafkaDataConsumerGroup::_on_finish(std::shared_ptr<StreamLoadContext> ctx) {
    // cmt_offset is moved back in start_all after _run_consume_loop returns
}
200 | | |
201 | | void KafkaDataConsumerGroup::actual_consume(std::shared_ptr<DataConsumer> consumer, |
202 | | BlockingQueue<RdKafka::Message*>* queue, |
203 | 1 | int64_t max_running_time_ms, ConsumeFinishCallback cb) { |
204 | 1 | Status st = std::static_pointer_cast<KafkaDataConsumer>(consumer)->group_consume( |
205 | 1 | queue, max_running_time_ms); |
206 | 1 | cb(st); |
207 | 1 | } |
208 | | |
209 | 0 | Status KinesisDataConsumerGroup::assign_stream_shards(std::shared_ptr<StreamLoadContext> ctx) { |
210 | 0 | DCHECK(ctx->kinesis_info); |
211 | 0 | DCHECK(_consumers.size() >= 1); |
212 | | |
213 | | // divide shards using round-robin partitioner |
214 | 0 | int consumer_size = doris::cast_set<int>(_consumers.size()); |
215 | 0 | auto divide_shards = WorkPartitioner<std::string, std::string>::partition_round_robin( |
216 | 0 | ctx->kinesis_info->begin_sequence_number, consumer_size); |
217 | | |
218 | | // assign shards to consumers equally |
219 | 0 | for (int j = 0; j < consumer_size; ++j) { |
220 | 0 | RETURN_IF_ERROR(std::static_pointer_cast<KinesisDataConsumer>(_consumers[j]) |
221 | 0 | ->assign_shards(divide_shards[j], ctx->kinesis_info->stream, ctx)); |
222 | 0 | } |
223 | | |
224 | 0 | return Status::OK(); |
225 | 0 | } |
226 | | |
227 | 0 | KinesisDataConsumerGroup::~KinesisDataConsumerGroup() { |
228 | 0 | _queue.shutdown(); |
229 | 0 | while (true) { |
230 | 0 | std::shared_ptr<Aws::Kinesis::Model::Record> record; |
231 | 0 | if (_queue.blocking_get(&record)) { |
232 | 0 | record.reset(); |
233 | 0 | } else { |
234 | 0 | break; |
235 | 0 | } |
236 | 0 | } |
237 | 0 | DCHECK(_queue.get_size() == 0); |
238 | 0 | } |
239 | | |
240 | | Status KinesisDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx, |
241 | 0 | std::shared_ptr<io::StreamLoadPipe> pipe) { |
242 | 0 | DORIS_CHECK(std::dynamic_pointer_cast<io::KinesisConsumerPipe>(pipe) != nullptr); |
243 | 0 | Status result_st = Status::OK(); |
244 | 0 | _format = ctx->format; |
245 | |
|
246 | 0 | if (!_submit_all_consumers( |
247 | 0 | [this, max_time = ctx->max_interval_s * 1000](std::shared_ptr<DataConsumer> c, |
248 | 0 | ConsumeFinishCallback cb) { |
249 | 0 | actual_consume(c, &_queue, max_time, cb); |
250 | 0 | }, |
251 | 0 | [this] { _queue.shutdown(); }, result_st)) { |
252 | 0 | return Status::InternalError("failed to submit kinesis data consumer"); |
253 | 0 | } |
254 | 0 | return _run_consume_loop(ctx, pipe, result_st); |
255 | 0 | } |
256 | | |
257 | | bool KinesisDataConsumerGroup::_dequeue_and_process(io::StreamLoadPipe* pipe, int64_t& left_rows, |
258 | 0 | int64_t& left_bytes, Status& result_st) { |
259 | 0 | std::shared_ptr<Aws::Kinesis::Model::Record> record; |
260 | 0 | if (!_queue.controlled_blocking_get(&record, config::blocking_queue_cv_wait_timeout_ms)) { |
261 | 0 | return false; |
262 | 0 | } |
263 | 0 | auto& data = record->GetData(); |
264 | 0 | const char* payload = reinterpret_cast<const char*>(data.GetUnderlyingData()); |
265 | 0 | size_t len = data.GetLength(); |
266 | 0 | VLOG_NOTICE << "get kinesis record, seq: " << record->GetSequenceNumber() << ", len: " << len; |
267 | |
|
268 | 0 | auto append_fn = FormatAppender::get_append_function<io::StreamLoadPipe>(_format); |
269 | 0 | Status st = (pipe->*append_fn)(payload, len); |
270 | 0 | if (st.ok()) { |
271 | 0 | left_rows--; |
272 | 0 | left_bytes -= len; |
273 | 0 | VLOG_NOTICE << "consume kinesis record [seq=" << record->GetSequenceNumber() << "]"; |
274 | 0 | } else { |
275 | 0 | LOG(WARNING) << "failed to append kinesis record to pipe. grp: " << _grp_id; |
276 | 0 | std::unique_lock<std::mutex> lock(_mutex); |
277 | 0 | if (result_st.ok()) { |
278 | 0 | result_st = st; |
279 | 0 | } |
280 | 0 | } |
281 | 0 | return true; |
282 | 0 | } |
283 | | |
284 | 0 | void KinesisDataConsumerGroup::_on_finish(std::shared_ptr<StreamLoadContext> ctx) { |
285 | 0 | for (auto& consumer : _consumers) { |
286 | 0 | auto kinesis_consumer = std::static_pointer_cast<KinesisDataConsumer>(consumer); |
287 | 0 | for (auto& [shard_id, seq_num] : kinesis_consumer->get_committed_sequence_numbers()) { |
288 | 0 | ctx->kinesis_info->cmt_sequence_number[shard_id] = seq_num; |
289 | 0 | } |
290 | 0 | for (auto& [shard_id, millis] : kinesis_consumer->get_millis_behind_latest()) { |
291 | 0 | auto [it, inserted] = ctx->kinesis_info->millis_behind_latest.emplace(shard_id, millis); |
292 | 0 | if (!inserted && it->second < millis) { |
293 | 0 | it->second = millis; |
294 | 0 | } |
295 | 0 | } |
296 | 0 | for (auto& shard_id : kinesis_consumer->get_closed_shard_ids()) { |
297 | 0 | ctx->kinesis_info->closed_shard_ids.insert(shard_id); |
298 | 0 | } |
299 | 0 | } |
300 | 0 | } |
301 | | |
302 | | void KinesisDataConsumerGroup::actual_consume( |
303 | | std::shared_ptr<DataConsumer> consumer, |
304 | | BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue, |
305 | 0 | int64_t max_running_time_ms, ConsumeFinishCallback cb) { |
306 | 0 | Status st = std::static_pointer_cast<KinesisDataConsumer>(consumer)->group_consume( |
307 | 0 | queue, max_running_time_ms); |
308 | 0 | cb(st); |
309 | 0 | } |
310 | | |
311 | | } // namespace doris |