Coverage Report

Created: 2026-03-16 19:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/data_consumer_group.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#include "load/routine_load/data_consumer_group.h"
18
19
#include <gen_cpp/PlanNodes_types.h>
20
#include <stddef.h>
21
22
#include <map>
23
#include <ostream>
24
#include <string>
25
#include <utility>
26
27
#include "common/logging.h"
28
#include "librdkafka/rdkafkacpp.h"
29
#include "load/routine_load/data_consumer.h"
30
#include "load/stream_load/stream_load_context.h"
31
#include "util/stopwatch.hpp"
32
33
namespace doris {
34
#include "common/compile_check_begin.h"
35
36
1
// Spread the job's kafka <partition, begin offset> pairs over the group's
// consumers round-robin, then hand each consumer its share of the topic.
Status KafkaDataConsumerGroup::assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx) {
    DCHECK(ctx->kafka_info);
    DCHECK(_consumers.size() >= 1);

    // Deal the partitions into one bucket per consumer, round-robin.
    const int num_consumers = doris::cast_set<int>(_consumers.size());
    std::vector<std::map<int32_t, int64_t>> buckets(num_consumers);
    int dealt = 0;
    for (const auto& [partition, begin_offset] : ctx->kafka_info->begin_offset) {
        buckets[dealt % num_consumers].emplace(partition, begin_offset);
        ++dealt;
    }

    // Assign each bucket to its consumer; stop at the first failure.
    for (int idx = 0; idx < num_consumers; ++idx) {
        auto kafka_consumer = std::static_pointer_cast<KafkaDataConsumer>(_consumers[idx]);
        RETURN_IF_ERROR(kafka_consumer->assign_topic_partitions(buckets[idx],
                                                                ctx->kafka_info->topic, ctx));
    }

    return Status::OK();
}
59
60
1
// Drain and free every kafka message still sitting in the queue before the
// group goes away.
KafkaDataConsumerGroup::~KafkaDataConsumerGroup() {
    // Shut the queue down first so blocking_get() returns false once empty
    // instead of blocking forever.
    _queue.shutdown();
    RdKafka::Message* msg = nullptr;
    while (_queue.blocking_get(&msg)) {
        delete msg;
        msg = nullptr;
    }
    DCHECK(_queue.get_size() == 0);
}
74
75
// Run the whole consumer group for one load batch:
//  1. submit every consumer to the thread pool; each pushes messages into
//     the shared _queue and reports its final status via a callback;
//  2. in this thread, pop messages from _queue and append them to
//     `kafka_pipe` until eos / time / row / byte budget is exhausted;
//  3. shut everything down and publish the consumed offsets back into `ctx`.
// Returns the first consumer/append error observed, or OK.
Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
                                         std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
    // First error seen by any consumer callback (written under _mutex) or by
    // the append loop below. Captured by reference in the callbacks, so it
    // must outlive the thread pool work — guaranteed by the join before return.
    Status result_st = Status::OK();
    // start all consumers
    for (auto& consumer : _consumers) {
        if (!_thread_pool.offer(std::bind<void>(
                    &KafkaDataConsumerGroup::actual_consume, this, consumer, &_queue,
                    ctx->max_interval_s * 1000, [this, &result_st](const Status& st) {
                        // Finish callback: runs once per consumer when it stops.
                        std::unique_lock<std::mutex> lock(_mutex);
                        _counter--;
                        VLOG_CRITICAL << "group counter is: " << _counter << ", grp: " << _grp_id;
                        if (_counter == 0) {
                            // Last consumer done — shut the queue so the main
                            // loop's blocking_get() drains out and sets eos.
                            _queue.shutdown();
                            LOG(INFO) << "all consumers are finished. shutdown queue. group id: "
                                      << _grp_id;
                        }
                        // Keep only the first failure.
                        if (result_st.ok() && !st.ok()) {
                            result_st = st;
                        }
                    }))) {
            LOG(WARNING) << "failed to submit data consumer: " << consumer->id()
                         << ", group id: " << _grp_id;
            return Status::InternalError("failed to submit data consumer");
        } else {
            VLOG_CRITICAL << "submit a data consumer: " << consumer->id()
                          << ", group id: " << _grp_id;
        }
    }

    // consuming from queue and put data to stream load pipe.
    // Remaining budgets for this batch; the loop stops when any hits zero.
    int64_t left_time = ctx->max_interval_s * 1000;
    int64_t left_rows = ctx->max_batch_rows;
    int64_t left_bytes = ctx->max_batch_size;

    LOG(INFO) << "start consumer group: " << _grp_id << ". max time(ms): " << left_time
              << ", batch rows: " << left_rows << ", batch size: " << left_bytes << ". "
              << ctx->brief();

    // Work on a local copy of the committed offsets; only publish it back to
    // ctx on a fully successful batch.
    std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset;

    // Resolve the append member function once, outside the hot loop,
    // instead of branching on the format per message (improves performance).
    Status (io::KafkaConsumerPipe::*append_data)(const char* data, size_t size);
    if (ctx->format == TFileFormatType::FORMAT_JSON) {
        append_data = &io::KafkaConsumerPipe::append_json;
    } else {
        append_data = &io::KafkaConsumerPipe::append_with_line_delimiter;
    }

    MonotonicStopWatch watch;
    watch.start();
    bool eos = false;
    while (true) {
        // Exit path: eos (queue shut down or append failure) or any budget spent.
        if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) {
            LOG(INFO) << "consumer group done: " << _grp_id
                      << ". consume time(ms)=" << ctx->max_interval_s * 1000 - left_time
                      << ", received rows=" << ctx->max_batch_rows - left_rows
                      << ", received bytes=" << ctx->max_batch_size - left_bytes << ", eos: " << eos
                      << ", left_time: " << left_time << ", left_rows: " << left_rows
                      << ", left_bytes: " << left_bytes
                      << ", blocking get time(us): " << _queue.total_get_wait_time() / 1000
                      << ", blocking put time(us): " << _queue.total_put_wait_time() / 1000 << ", "
                      << ctx->brief();

            // shutdown queue so producers stop blocking on put
            _queue.shutdown();
            // cancel all consumers
            for (auto& consumer : _consumers) {
                static_cast<void>(consumer->cancel(ctx));
            }

            // waiting all threads finished — after join, the finish callbacks
            // have all run and result_st is safe to read without the lock.
            _thread_pool.shutdown();
            _thread_pool.join();
            if (!result_st.ok()) {
                kafka_pipe->cancel(result_st.to_string());
                return result_st;
            }
            static_cast<void>(kafka_pipe->finish());
            // Publish consumed offsets and byte count back to the context.
            ctx->kafka_info->cmt_offset = std::move(cmt_offset);
            ctx->receive_bytes = ctx->max_batch_size - left_bytes;
            return Status::OK();
        }

        RdKafka::Message* msg;
        // Timed get: wake up periodically even when the queue is idle so the
        // time budget above is re-checked.
        bool res = _queue.controlled_blocking_get(&msg, config::blocking_queue_cv_wait_timeout_ms);
        if (res) {
            // msg must be freed whichever branch below we take
            Defer delete_msg {[msg]() { delete msg; }};
            VLOG_NOTICE << "get kafka message"
                        << ", partition: " << msg->partition() << ", offset: " << msg->offset()
                        << ", len: " << msg->len();

            if (msg->err() == RdKafka::ERR__PARTITION_EOF) {
                // Reached end of partition: remember the last real offset
                // (EOF marker's offset is one past the last message).
                if (msg->offset() > 0) {
                    cmt_offset[msg->partition()] = msg->offset() - 1;
                }
            } else {
                Status st = (kafka_pipe.get()->*append_data)(
                        static_cast<const char*>(msg->payload()), static_cast<size_t>(msg->len()));
                if (st.ok()) {
                    left_rows--;
                    left_bytes -= msg->len();
                    cmt_offset[msg->partition()] = msg->offset();
                    VLOG_NOTICE << "consume partition[" << msg->partition() << " - "
                                << msg->offset() << "]";
                } else {
                    // failed to append this msg, we must stop the whole batch
                    LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id;
                    eos = true;
                    {
                        // Record the error under the same lock the consumer
                        // callbacks use; first error wins.
                        std::unique_lock<std::mutex> lock(_mutex);
                        if (result_st.ok()) {
                            result_st = st;
                        }
                    }
                }
            }
        } else {
            // queue is empty and shutdown — all consumers finished
            eos = true;
        }

        // Recompute remaining time from the wall clock (ns -> ms).
        left_time = ctx->max_interval_s * 1000 - watch.elapsed_time() / 1000 / 1000;
    }

    // Unreachable: the loop only exits via the returns above.
    return Status::OK();
}
203
204
void KafkaDataConsumerGroup::actual_consume(std::shared_ptr<DataConsumer> consumer,
205
                                            BlockingQueue<RdKafka::Message*>* queue,
206
1
                                            int64_t max_running_time_ms, ConsumeFinishCallback cb) {
207
1
    Status st = std::static_pointer_cast<KafkaDataConsumer>(consumer)->group_consume(
208
1
            queue, max_running_time_ms);
209
1
    cb(st);
210
1
}
211
#include "common/compile_check_end.h"
212
213
} // namespace doris