Coverage Report

Created: 2026-04-14 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/data_consumer_group.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#include "load/routine_load/data_consumer_group.h"
18
19
#include <gen_cpp/PlanNodes_types.h>
20
#include <stddef.h>
21
22
#include <map>
23
#include <ostream>
24
#include <string>
25
#include <utility>
26
27
#include "common/logging.h"
28
#include "librdkafka/rdkafkacpp.h"
29
#include "load/routine_load/data_consumer.h"
30
#include "load/stream_load/stream_load_context.h"
31
#include "util/stopwatch.hpp"
32
33
namespace doris {
34
35
1
// Distributes the (partition, begin offset) pairs of ctx->kafka_info->begin_offset
// round-robin over the consumers of this group: the i-th pair in iteration order
// goes to consumer i % N. Each consumer is then assigned its share on
// ctx->kafka_info->topic. Returns the first error reported by a consumer's
// assign_topic_partitions(), or OK.
Status KafkaDataConsumerGroup::assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx) {
    DCHECK(ctx->kafka_info);
    DCHECK(_consumers.size() >= 1);
    // DCHECKs are compiled out in release builds. Without these guards a missing
    // kafka_info would be dereferenced, and an empty group would evaluate
    // `i % 0` below -- both undefined behavior.
    if (!ctx->kafka_info) {
        return Status::InternalError("kafka info is not set for consumer group");
    }
    if (_consumers.empty()) {
        return Status::InternalError("no data consumer in consumer group");
    }

    // divide partitions: the i-th (partition, begin offset) pair goes to consumer i % N
    int consumer_size = doris::cast_set<int>(_consumers.size());
    std::vector<std::map<int32_t, int64_t>> divide_parts(consumer_size);
    int i = 0;
    for (const auto& [partition, begin_offset] : ctx->kafka_info->begin_offset) {
        divide_parts[i % consumer_size].emplace(partition, begin_offset);
        ++i;
    }

    // assign partitions to consumers equally
    for (int j = 0; j < consumer_size; ++j) {
        RETURN_IF_ERROR(
                std::static_pointer_cast<KafkaDataConsumer>(_consumers[j])
                        ->assign_topic_partitions(divide_parts[j], ctx->kafka_info->topic, ctx));
    }

    return Status::OK();
}
58
59
1
// Frees every Kafka message still buffered in the group's queue.
// The queue is shut down first; the drain loop relies on blocking_get()
// returning false once the shut-down queue has no more elements.
KafkaDataConsumerGroup::~KafkaDataConsumerGroup() {
    _queue.shutdown();

    RdKafka::Message* leftover = nullptr;
    while (_queue.blocking_get(&leftover)) {
        // each dequeued message is owned here and must be released
        delete leftover;
    }

    DCHECK(_queue.get_size() == 0);
}
73
74
// Starts every consumer of this group on the group's thread pool. It then drains
// the shared message queue into `kafka_pipe` until one of these happens:
//   - the queue is shut down and empty,
//   - ctx->max_interval_s elapses,
//   - ctx->max_batch_rows messages have been appended,
//   - ctx->max_batch_size bytes have been appended,
//   - appending to the pipe fails.
//
// On success the pipe is finished, the consumed offsets are written back to
// ctx->kafka_info->cmt_offset, and ctx->receive_bytes is set. If a consumer or
// the pipe reported an error, the pipe is cancelled and the first such error
// is returned.
//
// Every return path stops and joins all consumer threads first. Their finish
// callback captures the local `result_st` by reference, so no consumer thread
// may outlive this stack frame.
Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
                                         std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
    // First error seen by a consumer thread or by the pipe. Written under
    // `_mutex`; read below only after all consumer threads have been joined.
    Status result_st = Status::OK();

    // Shuts down the queue, cancels all consumers and waits for every submitted
    // consumer thread to exit.
    auto stop_all_consumers = [this, &ctx]() {
        _queue.shutdown();
        for (auto& consumer : _consumers) {
            static_cast<void>(consumer->cancel(ctx));
        }
        _thread_pool.shutdown();
        _thread_pool.join();
    };

    // start all consumers
    for (auto& consumer : _consumers) {
        if (!_thread_pool.offer(std::bind<void>(
                    &KafkaDataConsumerGroup::actual_consume, this, consumer, &_queue,
                    ctx->max_interval_s * 1000, [this, &result_st](const Status& st) {
                        std::unique_lock<std::mutex> lock(_mutex);
                        _counter--;
                        VLOG_CRITICAL << "group counter is: " << _counter << ", grp: " << _grp_id;
                        if (_counter == 0) {
                            _queue.shutdown();
                            LOG(INFO) << "all consumers are finished. shutdown queue. group id: "
                                      << _grp_id;
                        }
                        if (result_st.ok() && !st.ok()) {
                            result_st = st;
                        }
                    }))) {
            LOG(WARNING) << "failed to submit data consumer: " << consumer->id()
                         << ", group id: " << _grp_id;
            // Consumers submitted before this one are already running, and their
            // callback writes to `result_st`. Stop and join them before this
            // frame (and `result_st`) goes away.
            stop_all_consumers();
            return Status::InternalError("failed to submit data consumer");
        }
        VLOG_CRITICAL << "submit a data consumer: " << consumer->id()
                      << ", group id: " << _grp_id;
    }

    // consuming from queue and put data to stream load pipe
    int64_t left_time = ctx->max_interval_s * 1000;
    int64_t left_rows = ctx->max_batch_rows;
    int64_t left_bytes = ctx->max_batch_size;

    LOG(INFO) << "start consumer group: " << _grp_id << ". max time(ms): " << left_time
              << ", batch rows: " << left_rows << ", batch size: " << left_bytes << ". "
              << ctx->brief();

    // Work on a copy; it is published to ctx only on success.
    std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset;

    // Resolve the append function once instead of checking the format per message.
    Status (io::KafkaConsumerPipe::*append_data)(const char* data, size_t size);
    if (ctx->format == TFileFormatType::FORMAT_JSON) {
        append_data = &io::KafkaConsumerPipe::append_json;
    } else {
        append_data = &io::KafkaConsumerPipe::append_with_line_delimiter;
    }

    MonotonicStopWatch watch;
    watch.start();
    bool eos = false;
    while (!eos && left_time > 0 && left_rows > 0 && left_bytes > 0) {
        RdKafka::Message* msg = nullptr;
        bool res = _queue.controlled_blocking_get(&msg, config::blocking_queue_cv_wait_timeout_ms);
        if (res) {
            // The dequeued message is owned here and must be freed on every path.
            Defer delete_msg {[msg]() { delete msg; }};
            VLOG_NOTICE << "get kafka message"
                        << ", partition: " << msg->partition() << ", offset: " << msg->offset()
                        << ", len: " << msg->len();

            if (msg->err() == RdKafka::ERR__PARTITION_EOF) {
                if (msg->offset() > 0) {
                    cmt_offset[msg->partition()] = msg->offset() - 1;
                }
            } else {
                Status st = (kafka_pipe.get()->*append_data)(
                        static_cast<const char*>(msg->payload()), static_cast<size_t>(msg->len()));
                if (st.ok()) {
                    left_rows--;
                    left_bytes -= msg->len();
                    cmt_offset[msg->partition()] = msg->offset();
                    VLOG_NOTICE << "consume partition[" << msg->partition() << " - "
                                << msg->offset() << "]";
                } else {
                    // failed to append this msg, we must stop
                    LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id;
                    eos = true;
                    {
                        std::unique_lock<std::mutex> lock(_mutex);
                        if (result_st.ok()) {
                            result_st = st;
                        }
                    }
                }
            }
        } else {
            // queue is empty and shutdown
            eos = true;
        }

        left_time = ctx->max_interval_s * 1000 - watch.elapsed_time() / 1000 / 1000;
    }

    LOG(INFO) << "consumer group done: " << _grp_id
              << ". consume time(ms)=" << ctx->max_interval_s * 1000 - left_time
              << ", received rows=" << ctx->max_batch_rows - left_rows
              << ", received bytes=" << ctx->max_batch_size - left_bytes << ", eos: " << eos
              << ", left_time: " << left_time << ", left_rows: " << left_rows
              << ", left_bytes: " << left_bytes
              << ", blocking get time(us): " << _queue.total_get_wait_time() / 1000
              << ", blocking put time(us): " << _queue.total_put_wait_time() / 1000 << ", "
              << ctx->brief();

    // shutdown queue, cancel all consumers and wait for all threads to finish
    stop_all_consumers();
    if (!result_st.ok()) {
        kafka_pipe->cancel(result_st.to_string());
        return result_st;
    }
    static_cast<void>(kafka_pipe->finish());
    ctx->kafka_info->cmt_offset = std::move(cmt_offset);
    ctx->receive_bytes = ctx->max_batch_size - left_bytes;
    return Status::OK();
}
202
203
void KafkaDataConsumerGroup::actual_consume(std::shared_ptr<DataConsumer> consumer,
204
                                            BlockingQueue<RdKafka::Message*>* queue,
205
1
                                            int64_t max_running_time_ms, ConsumeFinishCallback cb) {
206
1
    Status st = std::static_pointer_cast<KafkaDataConsumer>(consumer)->group_consume(
207
1
            queue, max_running_time_ms);
208
1
    cb(st);
209
1
}
210
211
} // namespace doris