be/src/load/routine_load/data_consumer_group.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | #include "load/routine_load/data_consumer_group.h" |
18 | | |
19 | | #include <gen_cpp/PlanNodes_types.h> |
20 | | #include <stddef.h> |
21 | | |
22 | | #include <map> |
23 | | #include <ostream> |
24 | | #include <string> |
25 | | #include <utility> |
26 | | |
27 | | #include "common/logging.h" |
28 | | #include "librdkafka/rdkafkacpp.h" |
29 | | #include "load/routine_load/data_consumer.h" |
30 | | #include "load/stream_load/stream_load_context.h" |
31 | | #include "util/stopwatch.hpp" |
32 | | |
33 | | namespace doris { |
34 | | #include "common/compile_check_begin.h" |
35 | | |
// Distributes the entries of ctx->kafka_info->begin_offset round-robin over
// _consumers (entry i goes to consumer i % consumer count) and then calls
// KafkaDataConsumer::assign_topic_partitions on every consumer with its share,
// ctx->kafka_info->topic and ctx. A consumer whose share is empty is still called.
// Returns Status::OK() when every call succeeds; a failing call is handed to
// RETURN_IF_ERROR (presumably an early return of that status -- macro not shown here).
36 | 1 | Status KafkaDataConsumerGroup::assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx) { |
37 | 1 | DCHECK(ctx->kafka_info); |
38 | 1 | DCHECK(_consumers.size() >= 1); |
39 | |
40 | | // divide partitions: deal the begin_offset entries out one at a time, wrapping at consumer_size |
41 | 1 | int consumer_size = doris::cast_set<int>(_consumers.size()); |
42 | 1 | std::vector<std::map<int32_t, int64_t>> divide_parts(consumer_size); |
43 | 1 | int i = 0; |
44 | 1 | for (auto& kv : ctx->kafka_info->begin_offset) { |
45 | 1 | int idx = i % consumer_size; |
46 | 1 | divide_parts[idx].emplace(kv.first, kv.second); |
47 | 1 | i++; |
48 | 1 | } |
49 | |
50 | | // assign partitions to consumers equally: consumer j receives divide_parts[j] |
51 | 2 | for (int j = 0; j < consumer_size; ++j) { |
52 | 1 | RETURN_IF_ERROR( |
53 | 1 | std::static_pointer_cast<KafkaDataConsumer>(_consumers[j]) |
54 | 1 | ->assign_topic_partitions(divide_parts[j], ctx->kafka_info->topic, ctx)); |
55 | 1 | } |
56 | |
57 | 1 | return Status::OK(); |
58 | 1 | } |
59 | | |
// Destructor: shuts _queue down, deletes every RdKafka::Message pointer that
// blocking_get still hands out, then DCHECKs that the queue size is 0.
// NOTE(review): the loop only terminates if blocking_get returns false once the
// queue is shut down and drained -- confirm against BlockingQueue.
60 | 1 | KafkaDataConsumerGroup::~KafkaDataConsumerGroup() { |
61 | | // clean the msgs left in queue: shut the queue down first, then delete whatever is still buffered |
62 | 1 | _queue.shutdown(); |
63 | 1 | while (true) { |
64 | 1 | RdKafka::Message* msg; |
65 | 1 | if (_queue.blocking_get(&msg)) { |
66 | 0 | delete msg; |
67 | 0 | msg = nullptr; |
68 | 1 | } else { |
69 | 1 | break; |
70 | 1 | } |
71 | 1 | } |
72 | 1 | DCHECK(_queue.get_size() == 0); |
73 | 1 | } |
74 | | |
// Runs one consume round for the group on the calling thread:
//  1. offers one actual_consume task per consumer to _thread_pool; each task's
//     callback decrements _counter under _mutex, shuts _queue down when the
//     counter reaches 0, and records the first non-OK status in result_st;
//  2. pulls messages from _queue and appends their payload to kafka_pipe
//     (append_json for FORMAT_JSON, append_with_line_delimiter otherwise) until
//     eos is set or the time / row / byte budget taken from ctx is used up.
//     A message whose err() is ERR__PARTITION_EOF is not appended; it only
//     records offset() - 1 for its partition, and only when offset() > 0;
//  3. shuts the queue down, cancels every consumer, shuts down and joins the
//     thread pool, then either cancels kafka_pipe and returns result_st, or
//     finishes kafka_pipe, stores the collected offsets in
//     ctx->kafka_info->cmt_offset and the consumed byte count in
//     ctx->receive_bytes, and returns Status::OK().
// Returns Status::InternalError if a consumer task cannot be submitted.
// Units: left_time is in ms (max_interval_s * 1000); watch.elapsed_time() is
// divided by 1000 twice, so it presumably reports ns -- confirm in stopwatch.hpp.
// NOTE(review): the callback captures the local result_st by reference, and the
// failed-offer path returns without shutting down or joining _thread_pool. If
// tasks submitted earlier in the loop are still running at that point they
// would write through a reference to a destroyed local -- confirm and fix.
// NOTE(review): the Status values returned by consumer->cancel() and
// kafka_pipe->finish() are deliberately discarded via static_cast<void>.
75 | | Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx, |
76 | 1 | std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) { |
77 | 1 | Status result_st = Status::OK(); |
78 | | // start all consumers: one actual_consume task per consumer is offered to _thread_pool |
79 | 1 | for (auto& consumer : _consumers) { |
80 | 1 | if (!_thread_pool.offer(std::bind<void>( |
81 | 1 | &KafkaDataConsumerGroup::actual_consume, this, consumer, &_queue, |
82 | 1 | ctx->max_interval_s * 1000, [this, &result_st](const Status& st) { |
83 | 1 | std::unique_lock<std::mutex> lock(_mutex); |
84 | 1 | _counter--; |
85 | 1 | VLOG_CRITICAL << "group counter is: " << _counter << ", grp: " << _grp_id; |
86 | 1 | if (_counter == 0) { |
87 | 1 | _queue.shutdown(); |
88 | 1 | LOG(INFO) << "all consumers are finished. shutdown queue. group id: " |
89 | 1 | << _grp_id; |
90 | 1 | } |
91 | 1 | if (result_st.ok() && !st.ok()) { |
92 | 0 | result_st = st; |
93 | 0 | } |
94 | 1 | }))) { |
95 | 0 | LOG(WARNING) << "failed to submit data consumer: " << consumer->id() |
96 | 0 | << ", group id: " << _grp_id; |
97 | 0 | return Status::InternalError("failed to submit data consumer"); |
98 | 1 | } else { |
99 | 1 | VLOG_CRITICAL << "submit a data consumer: " << consumer->id() |
100 | 0 | << ", group id: " << _grp_id; |
101 | 1 | } |
102 | 1 | } |
103 | |
104 | | // consume from the queue and put data into the stream load pipe; the three budgets below bound this round |
105 | 1 | int64_t left_time = ctx->max_interval_s * 1000; |
106 | 1 | int64_t left_rows = ctx->max_batch_rows; |
107 | 1 | int64_t left_bytes = ctx->max_batch_size; |
108 | |
109 | 1 | LOG(INFO) << "start consumer group: " << _grp_id << ". max time(ms): " << left_time |
110 | 1 | << ", batch rows: " << left_rows << ", batch size: " << left_bytes << ". " |
111 | 1 | << ctx->brief(); |
112 | |
113 | | // work on a local copy of the committed offsets; it is moved back into ctx only on the success path |
114 | 1 | std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset; |
115 | |
116 | | // improve performance: choose the append member function once, outside the consume loop |
117 | 1 | Status (io::KafkaConsumerPipe::*append_data)(const char* data, size_t size); |
118 | 1 | if (ctx->format == TFileFormatType::FORMAT_JSON) { |
119 | 0 | append_data = &io::KafkaConsumerPipe::append_json; |
120 | 1 | } else { |
121 | 1 | append_data = &io::KafkaConsumerPipe::append_with_line_delimiter; |
122 | 1 | } |
123 | |
124 | 1 | MonotonicStopWatch watch; |
125 | 1 | watch.start(); |
126 | 1 | bool eos = false; |
127 | 2 | while (true) { |
128 | 2 | if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) { |
129 | 1 | LOG(INFO) << "consumer group done: " << _grp_id |
130 | 1 | << ". consume time(ms)=" << ctx->max_interval_s * 1000 - left_time |
131 | 1 | << ", received rows=" << ctx->max_batch_rows - left_rows |
132 | 1 | << ", received bytes=" << ctx->max_batch_size - left_bytes << ", eos: " << eos |
133 | 1 | << ", left_time: " << left_time << ", left_rows: " << left_rows |
134 | 1 | << ", left_bytes: " << left_bytes |
135 | 1 | << ", blocking get time(us): " << _queue.total_get_wait_time() / 1000 |
136 | 1 | << ", blocking put time(us): " << _queue.total_put_wait_time() / 1000 << ", " |
137 | 1 | << ctx->brief(); |
138 | |
139 | | // shutdown queue (done before cancelling the consumers and joining the pool) |
140 | 1 | _queue.shutdown(); |
141 | | // cancel all consumers; the returned Status of each cancel() is ignored |
142 | 1 | for (auto& consumer : _consumers) { |
143 | 1 | static_cast<void>(consumer->cancel(ctx)); |
144 | 1 | } |
145 | |
146 | | // wait for all threads to finish; result_st is only read after the join |
147 | 1 | _thread_pool.shutdown(); |
148 | 1 | _thread_pool.join(); |
149 | 1 | if (!result_st.ok()) { |
150 | 0 | kafka_pipe->cancel(result_st.to_string()); |
151 | 0 | return result_st; |
152 | 0 | } |
153 | 1 | static_cast<void>(kafka_pipe->finish()); |
154 | 1 | ctx->kafka_info->cmt_offset = std::move(cmt_offset); |
155 | 1 | ctx->receive_bytes = ctx->max_batch_size - left_bytes; |
156 | 1 | return Status::OK(); |
157 | 1 | } |
158 | |
159 | 1 | RdKafka::Message* msg; |
160 | 1 | bool res = _queue.controlled_blocking_get(&msg, config::blocking_queue_cv_wait_timeout_ms); |
161 | 1 | if (res) { |
162 | | // msg has to be deleted finally (Defer presumably runs the delete when this scope exits) |
163 | 0 | Defer delete_msg {[msg]() { delete msg; }}; |
164 | 0 | VLOG_NOTICE << "get kafka message" |
165 | 0 | << ", partition: " << msg->partition() << ", offset: " << msg->offset() |
166 | 0 | << ", len: " << msg->len(); |
167 |

168 | 0 | if (msg->err() == RdKafka::ERR__PARTITION_EOF) { |
169 | 0 | if (msg->offset() > 0) { |
170 | 0 | cmt_offset[msg->partition()] = msg->offset() - 1; |
171 | 0 | } |
172 | 0 | } else { |
173 | 0 | Status st = (kafka_pipe.get()->*append_data)( |
174 | 0 | static_cast<const char*>(msg->payload()), static_cast<size_t>(msg->len())); |
175 | 0 | if (st.ok()) { |
176 | 0 | left_rows--; |
177 | 0 | left_bytes -= msg->len(); |
178 | 0 | cmt_offset[msg->partition()] = msg->offset(); |
179 | 0 | VLOG_NOTICE << "consume partition[" << msg->partition() << " - " |
180 | 0 | << msg->offset() << "]"; |
181 | 0 | } else { |
182 | | // failed to append this msg, we must stop; keep the first error in result_st under _mutex |
183 | 0 | LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id; |
184 | 0 | eos = true; |
185 | 0 | { |
186 | 0 | std::unique_lock<std::mutex> lock(_mutex); |
187 | 0 | if (result_st.ok()) { |
188 | 0 | result_st = st; |
189 | 0 | } |
190 | 0 | } |
191 | 0 | } |
192 | 0 | } |
193 | 1 | } else { |
194 | | // queue is empty and shutdown (the get returned false), so end the round |
195 | 1 | eos = true; |
196 | 1 | } |
197 | |
198 | 1 | left_time = ctx->max_interval_s * 1000 - watch.elapsed_time() / 1000 / 1000; |
199 | 1 | } |
200 | |
201 | 0 | return Status::OK(); |
202 | 1 | } |
203 | | |
// Task body that start_all binds and offers to the thread pool: casts consumer
// to KafkaDataConsumer, calls group_consume(queue, max_running_time_ms) on it,
// and passes the resulting Status to cb.
204 | | void KafkaDataConsumerGroup::actual_consume(std::shared_ptr<DataConsumer> consumer, |
205 | | BlockingQueue<RdKafka::Message*>* queue, |
206 | 1 | int64_t max_running_time_ms, ConsumeFinishCallback cb) { |
207 | 1 | Status st = std::static_pointer_cast<KafkaDataConsumer>(consumer)->group_consume( |
208 | 1 | queue, max_running_time_ms); |
209 | 1 | cb(st); |
210 | 1 | } |
211 | | #include "common/compile_check_end.h" |
212 | | |
213 | | } // namespace doris |