Coverage Report

Created: 2026-05-14 19:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/data_consumer_group.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#include "load/routine_load/data_consumer_group.h"
18
19
#include <gen_cpp/PlanNodes_types.h>
20
#include <stddef.h>
21
22
#include <map>
23
#include <ostream>
24
#include <utility>
25
26
#include "common/logging.h"
27
#include "io/fs/kafka_consumer_pipe.h"
28
#include "io/fs/kinesis_consumer_pipe.h"
29
#include "librdkafka/rdkafkacpp.h"
30
#include "load/routine_load/consumer_group_helpers.h"
31
#include "load/routine_load/data_consumer.h"
32
#include "load/stream_load/stream_load_context.h"
33
#include "util/stopwatch.hpp"
34
35
namespace doris {
36
37
bool DataConsumerGroup::_submit_all_consumers(
38
        std::function<void(std::shared_ptr<DataConsumer>, ConsumeFinishCallback)> consume_fn,
39
1
        std::function<void()> shutdown_fn, Status& result_st) {
40
1
    for (auto& consumer : _consumers) {
41
1
        auto cb = [this, shutdown_fn, &result_st](const Status& st) {
42
1
            std::unique_lock<std::mutex> lock(_mutex);
43
1
            if (--_counter == 0) {
44
1
                shutdown_fn();
45
1
                LOG(INFO) << "all consumers finished, shutdown queue. grp: " << _grp_id;
46
1
            }
47
1
            if (result_st.ok() && !st.ok()) {
48
0
                result_st = st;
49
0
            }
50
1
        };
51
1
        if (!_thread_pool.offer([consume_fn, consumer, cb] { consume_fn(consumer, cb); })) {
52
0
            LOG(WARNING) << "failed to submit consumer: " << consumer->id() << ", grp: " << _grp_id;
53
0
            return false;
54
0
        }
55
1
        VLOG_CRITICAL << "submit consumer: " << consumer->id() << ", grp: " << _grp_id;
56
1
    }
57
1
    return true;
58
1
}
59
60
// Drives the main consume loop for this group: repeatedly dequeues data via
// _dequeue_and_process() until EOS or one of the batch limits (max time,
// max rows, max bytes) is reached, then cancels all consumers, drains the
// thread pool, and finalizes the pipe.
//
// `result_st` carries the first error reported by any consumer thread (it is
// also written by _dequeue_and_process on append failure); if it is non-OK
// when the loop ends, the pipe is cancelled and the error is returned.
Status DataConsumerGroup::_run_consume_loop(std::shared_ptr<StreamLoadContext> ctx,
                                            std::shared_ptr<io::StreamLoadPipe> pipe,
                                            Status& result_st) {
    int64_t left_time = ctx->max_interval_s * 1000;
    int64_t left_rows = ctx->max_batch_rows;
    int64_t left_bytes = ctx->max_batch_size;

    LOG(INFO) << "start consumer group: " << _grp_id << ". max time(ms): " << left_time
              << ", batch rows: " << left_rows << ", batch size: " << left_bytes << ". "
              << ctx->brief();

    MonotonicStopWatch watch;
    watch.start();
    bool eos = false;
    while (true) {
        if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) {
            // BUGFIX: this log field was labeled "blocking get time(us)" but the
            // value printed is pipe->get_queue_size(); label it as the queue size.
            LOG(INFO) << "consumer group done: " << _grp_id
                      << ". consume time(ms)=" << ctx->max_interval_s * 1000 - left_time
                      << ", received rows=" << ctx->max_batch_rows - left_rows
                      << ", received bytes=" << ctx->max_batch_size - left_bytes << ", eos: " << eos
                      << ", left_time: " << left_time << ", left_rows: " << left_rows
                      << ", left_bytes: " << left_bytes
                      << ", queue size: " << pipe->get_queue_size() << ", "
                      << ctx->brief();

            // stop producers first, then tear down the worker pool
            _shutdown_queue();
            for (auto& consumer : _consumers) {
                static_cast<void>(consumer->cancel(ctx));
            }
            _thread_pool.shutdown();
            _thread_pool.join();
            if (!result_st.ok()) {
                // a consumer (or append) failed: propagate the error through the pipe
                pipe->cancel(result_st.to_string());
                return result_st;
            }
            RETURN_IF_ERROR(pipe->finish());
            _on_finish(ctx);
            ctx->receive_bytes = ctx->max_batch_size - left_bytes;
            return Status::OK();
        }

        if (!_dequeue_and_process(pipe.get(), left_rows, left_bytes, result_st)) {
            // queue shut down (all consumers finished) -> finish on next iteration
            eos = true;
        }
        left_time = ctx->max_interval_s * 1000 - watch.elapsed_time() / 1000 / 1000;
    }
}
107
108
1
// Distributes the kafka partitions declared in ctx->kafka_info->begin_offset
// across all consumers round-robin, then tells each consumer which
// (partition -> begin offset) entries it owns for ctx's topic.
Status KafkaDataConsumerGroup::assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx) {
    DCHECK(ctx->kafka_info);
    DCHECK(_consumers.size() >= 1);

    // split the partition/offset map into one bucket per consumer
    const int consumer_size = doris::cast_set<int>(_consumers.size());
    auto divide_parts = WorkPartitioner<int32_t, int64_t>::partition_round_robin(
            ctx->kafka_info->begin_offset, consumer_size);

    // hand each consumer its bucket
    for (int idx = 0; idx < consumer_size; ++idx) {
        auto kafka_consumer = std::static_pointer_cast<KafkaDataConsumer>(_consumers[idx]);
        RETURN_IF_ERROR(kafka_consumer->assign_topic_partitions(divide_parts[idx],
                                                                ctx->kafka_info->topic, ctx));
    }

    return Status::OK();
}
126
127
1
// Destructor: shut the queue down and free any kafka messages that were
// consumed but never handed to the pipe.
KafkaDataConsumerGroup::~KafkaDataConsumerGroup() {
    _queue.shutdown();
    RdKafka::Message* msg = nullptr;
    // blocking_get returns false once the shut-down queue is drained
    while (_queue.blocking_get(&msg)) {
        delete msg;
        msg = nullptr;
    }
    DCHECK(_queue.get_size() == 0);
}
141
142
// Kicks off the whole kafka consume pipeline: submits every consumer to the
// thread pool (each one feeding _queue), runs the shared dequeue loop until a
// batch limit is hit, and finally publishes the committed offsets back into
// ctx->kafka_info.
Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
                                         std::shared_ptr<io::StreamLoadPipe> pipe) {
    DORIS_CHECK(std::dynamic_pointer_cast<io::KafkaConsumerPipe>(pipe) != nullptr);
    Status result_st = Status::OK();
    _cmt_offset = ctx->kafka_info->cmt_offset;
    _format = ctx->format;

    auto consume_task = [this, max_time = ctx->max_interval_s * 1000](
                                std::shared_ptr<DataConsumer> c, ConsumeFinishCallback cb) {
        actual_consume(c, &_queue, max_time, cb);
    };
    auto shutdown_task = [this] { _queue.shutdown(); };
    if (!_submit_all_consumers(consume_task, shutdown_task, result_st)) {
        return Status::InternalError("failed to submit data consumer");
    }
    RETURN_IF_ERROR(_run_consume_loop(ctx, pipe, result_st));
    // publish the offsets advanced by this batch back to the caller
    ctx->kafka_info->cmt_offset = std::move(_cmt_offset);
    return Status::OK();
}
161
162
bool KafkaDataConsumerGroup::_dequeue_and_process(io::StreamLoadPipe* pipe, int64_t& left_rows,
163
1
                                                  int64_t& left_bytes, Status& result_st) {
164
1
    RdKafka::Message* msg = nullptr;
165
1
    if (!_queue.controlled_blocking_get(&msg, config::blocking_queue_cv_wait_timeout_ms)) {
166
1
        return false;
167
1
    }
168
0
    Defer delete_msg {[msg] { delete msg; }};
169
0
    VLOG_NOTICE << "get kafka message, partition: " << msg->partition()
170
0
                << ", offset: " << msg->offset() << ", len: " << msg->len();
171
172
0
    if (msg->err() == RdKafka::ERR__PARTITION_EOF) {
173
0
        if (msg->offset() > 0) {
174
0
            _cmt_offset[msg->partition()] = msg->offset() - 1;
175
0
        }
176
0
        return true;
177
0
    }
178
179
0
    auto append_fn = FormatAppender::get_append_function<io::StreamLoadPipe>(_format);
180
0
    Status st = (pipe->*append_fn)(static_cast<const char*>(msg->payload()),
181
0
                                   static_cast<size_t>(msg->len()));
182
0
    if (st.ok()) {
183
0
        left_rows--;
184
0
        left_bytes -= msg->len();
185
0
        _cmt_offset[msg->partition()] = msg->offset();
186
0
        VLOG_NOTICE << "consume partition[" << msg->partition() << " - " << msg->offset() << "]";
187
0
    } else {
188
0
        LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id;
189
0
        std::unique_lock<std::mutex> lock(_mutex);
190
0
        if (result_st.ok()) {
191
0
            result_st = st;
192
0
        }
193
0
    }
194
0
    return true;
195
0
}
196
197
1
// Intentionally empty: the kafka offsets are published by start_all(), which
// moves _cmt_offset into ctx->kafka_info after _run_consume_loop returns.
void KafkaDataConsumerGroup::_on_finish(std::shared_ptr<StreamLoadContext> ctx) {}
200
201
void KafkaDataConsumerGroup::actual_consume(std::shared_ptr<DataConsumer> consumer,
202
                                            BlockingQueue<RdKafka::Message*>* queue,
203
1
                                            int64_t max_running_time_ms, ConsumeFinishCallback cb) {
204
1
    Status st = std::static_pointer_cast<KafkaDataConsumer>(consumer)->group_consume(
205
1
            queue, max_running_time_ms);
206
1
    cb(st);
207
1
}
208
209
0
// Splits the kinesis shards listed in ctx->kinesis_info->begin_sequence_number
// across all consumers round-robin, then assigns each consumer its share for
// ctx's stream.
Status KinesisDataConsumerGroup::assign_stream_shards(std::shared_ptr<StreamLoadContext> ctx) {
    DCHECK(ctx->kinesis_info);
    DCHECK(_consumers.size() >= 1);

    // split the shard/sequence-number map into one bucket per consumer
    const int consumer_size = doris::cast_set<int>(_consumers.size());
    auto divide_shards = WorkPartitioner<std::string, std::string>::partition_round_robin(
            ctx->kinesis_info->begin_sequence_number, consumer_size);

    // hand each consumer its bucket
    for (int idx = 0; idx < consumer_size; ++idx) {
        auto kinesis_consumer = std::static_pointer_cast<KinesisDataConsumer>(_consumers[idx]);
        RETURN_IF_ERROR(kinesis_consumer->assign_shards(divide_shards[idx],
                                                        ctx->kinesis_info->stream, ctx));
    }

    return Status::OK();
}
226
227
0
// Destructor: shut the queue down and release any kinesis records still queued.
KinesisDataConsumerGroup::~KinesisDataConsumerGroup() {
    _queue.shutdown();
    std::shared_ptr<Aws::Kinesis::Model::Record> record;
    // blocking_get returns false once the shut-down queue is drained
    while (_queue.blocking_get(&record)) {
        record.reset();
    }
    DCHECK(_queue.get_size() == 0);
}
239
240
// Starts the kinesis consume pipeline: submits every consumer to the thread
// pool (each one feeding _queue), then drives the shared dequeue loop until a
// batch limit is hit. Per-shard progress is merged into ctx by _on_finish().
Status KinesisDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
                                           std::shared_ptr<io::StreamLoadPipe> pipe) {
    DORIS_CHECK(std::dynamic_pointer_cast<io::KinesisConsumerPipe>(pipe) != nullptr);
    Status result_st = Status::OK();
    _format = ctx->format;

    auto consume_task = [this, max_time = ctx->max_interval_s * 1000](
                                std::shared_ptr<DataConsumer> c, ConsumeFinishCallback cb) {
        actual_consume(c, &_queue, max_time, cb);
    };
    auto shutdown_task = [this] { _queue.shutdown(); };
    if (!_submit_all_consumers(consume_task, shutdown_task, result_st)) {
        return Status::InternalError("failed to submit kinesis data consumer");
    }
    return _run_consume_loop(ctx, pipe, result_st);
}
256
257
bool KinesisDataConsumerGroup::_dequeue_and_process(io::StreamLoadPipe* pipe, int64_t& left_rows,
258
0
                                                    int64_t& left_bytes, Status& result_st) {
259
0
    std::shared_ptr<Aws::Kinesis::Model::Record> record;
260
0
    if (!_queue.controlled_blocking_get(&record, config::blocking_queue_cv_wait_timeout_ms)) {
261
0
        return false;
262
0
    }
263
0
    auto& data = record->GetData();
264
0
    const char* payload = reinterpret_cast<const char*>(data.GetUnderlyingData());
265
0
    size_t len = data.GetLength();
266
0
    VLOG_NOTICE << "get kinesis record, seq: " << record->GetSequenceNumber() << ", len: " << len;
267
268
0
    auto append_fn = FormatAppender::get_append_function<io::StreamLoadPipe>(_format);
269
0
    Status st = (pipe->*append_fn)(payload, len);
270
0
    if (st.ok()) {
271
0
        left_rows--;
272
0
        left_bytes -= len;
273
0
        VLOG_NOTICE << "consume kinesis record [seq=" << record->GetSequenceNumber() << "]";
274
0
    } else {
275
0
        LOG(WARNING) << "failed to append kinesis record to pipe. grp: " << _grp_id;
276
0
        std::unique_lock<std::mutex> lock(_mutex);
277
0
        if (result_st.ok()) {
278
0
            result_st = st;
279
0
        }
280
0
    }
281
0
    return true;
282
0
}
283
284
0
void KinesisDataConsumerGroup::_on_finish(std::shared_ptr<StreamLoadContext> ctx) {
285
0
    for (auto& consumer : _consumers) {
286
0
        auto kinesis_consumer = std::static_pointer_cast<KinesisDataConsumer>(consumer);
287
0
        for (auto& [shard_id, seq_num] : kinesis_consumer->get_committed_sequence_numbers()) {
288
0
            ctx->kinesis_info->cmt_sequence_number[shard_id] = seq_num;
289
0
        }
290
0
        for (auto& [shard_id, millis] : kinesis_consumer->get_millis_behind_latest()) {
291
0
            auto [it, inserted] = ctx->kinesis_info->millis_behind_latest.emplace(shard_id, millis);
292
0
            if (!inserted && it->second < millis) {
293
0
                it->second = millis;
294
0
            }
295
0
        }
296
0
        for (auto& shard_id : kinesis_consumer->get_closed_shard_ids()) {
297
0
            ctx->kinesis_info->closed_shard_ids.insert(shard_id);
298
0
        }
299
0
    }
300
0
}
301
302
void KinesisDataConsumerGroup::actual_consume(
303
        std::shared_ptr<DataConsumer> consumer,
304
        BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue,
305
0
        int64_t max_running_time_ms, ConsumeFinishCallback cb) {
306
0
    Status st = std::static_pointer_cast<KinesisDataConsumer>(consumer)->group_consume(
307
0
            queue, max_running_time_ms);
308
0
    cb(st);
309
0
}
310
311
} // namespace doris