Coverage Report

Created: 2026-03-14 13:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/data_consumer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stdint.h>
21
22
#include <ctime>
23
#include <map>
24
#include <memory>
25
#include <mutex>
26
#include <ostream>
27
#include <string>
28
#include <unordered_map>
29
#include <vector>
30
31
#include "common/logging.h"
32
#include "common/status.h"
33
#include "librdkafka/rdkafkacpp.h"
34
#include "load/stream_load/stream_load_context.h"
35
#include "util/uid_util.h"
36
37
namespace doris {
38
39
template <typename T>
40
class BlockingQueue;
41
42
class DataConsumer {
43
public:
44
    DataConsumer()
45
78
            : _id(UniqueId::gen_uid()),
46
78
              _grp_id(UniqueId::gen_uid()),
47
78
              _has_grp(false),
48
78
              _init(false),
49
78
              _cancelled(false),
50
78
              _last_visit_time(0) {}
51
52
78
    virtual ~DataConsumer() {}
53
54
    // init the consumer with the given parameters
55
    virtual Status init(std::shared_ptr<StreamLoadContext> ctx) = 0;
56
    // start consuming
57
    virtual Status consume(std::shared_ptr<StreamLoadContext> ctx) = 0;
58
    // cancel the consuming process.
59
    // if the consumer is not initialized, or the consuming
60
    // process is already finished, calling cancel() will
61
    // return ERROR
62
    virtual Status cancel(std::shared_ptr<StreamLoadContext> ctx) = 0;
63
    // reset the data consumer before being reused
64
    virtual Status reset() = 0;
65
    // return true if the consumer matches the need
66
    virtual bool match(std::shared_ptr<StreamLoadContext> ctx) = 0;
67
68
71
    const UniqueId& id() { return _id; }
69
853
    time_t last_visit_time() { return _last_visit_time; }
70
71
    void set_grp(const UniqueId& grp_id) {
71
71
        _grp_id = grp_id;
72
71
        _has_grp = true;
73
71
    }
74
75
protected:
76
    UniqueId _id;
77
    UniqueId _grp_id;
78
    bool _has_grp;
79
80
    // lock to protect the following bools
81
    std::mutex _lock;
82
    bool _init;
83
    bool _cancelled;
84
    time_t _last_visit_time;
85
};
86
87
class PIntegerPair;
88
89
class KafkaEventCb : public RdKafka::EventCb {
90
public:
91
247
    void event_cb(RdKafka::Event& event) {
92
247
        switch (event.type()) {
93
7
        case RdKafka::Event::EVENT_ERROR:
94
7
            LOG(INFO) << "kafka error: " << RdKafka::err2str(event.err())
95
7
                      << ", event: " << event.str();
96
7
            break;
97
0
        case RdKafka::Event::EVENT_STATS:
98
0
            LOG(INFO) << "kafka stats: " << event.str();
99
0
            break;
100
101
240
        case RdKafka::Event::EVENT_LOG:
102
240
            LOG(INFO) << "kafka log-" << event.severity() << "-" << event.fac().c_str()
103
240
                      << ", event: " << event.str();
104
240
            break;
105
106
0
        case RdKafka::Event::EVENT_THROTTLE:
107
0
            LOG(INFO) << "kafka throttled: " << event.throttle_time() << "ms by "
108
0
                      << event.broker_name() << " id " << (int)event.broker_id();
109
0
            break;
110
111
0
        default:
112
0
            LOG(INFO) << "kafka event: " << event.type()
113
0
                      << ", err: " << RdKafka::err2str(event.err()) << ", event: " << event.str();
114
0
            break;
115
247
        }
116
247
    }
117
};
118
119
class KafkaDataConsumer : public DataConsumer {
120
public:
121
    KafkaDataConsumer(std::shared_ptr<StreamLoadContext> ctx)
122
78
            : _brokers(ctx->kafka_info->brokers), _topic(ctx->kafka_info->topic) {}
123
124
78
    virtual ~KafkaDataConsumer() {
125
78
        VLOG_NOTICE << "deconstruct consumer";
126
78
        if (_k_consumer) {
127
78
            _k_consumer->close();
128
78
            delete _k_consumer;
129
78
            _k_consumer = nullptr;
130
78
        }
131
78
    }
132
133
    Status init(std::shared_ptr<StreamLoadContext> ctx) override;
134
    // TODO(cmy): the single consumer start method is not implemented yet; group_consume is used instead
135
0
    Status consume(std::shared_ptr<StreamLoadContext> ctx) override { return Status::OK(); }
136
    Status cancel(std::shared_ptr<StreamLoadContext> ctx) override;
137
    // reassign partition topics
138
    virtual Status reset() override;
139
    bool match(std::shared_ptr<StreamLoadContext> ctx) override;
140
    // commit kafka offset
141
    Status commit(std::vector<RdKafka::TopicPartition*>& offset);
142
143
    Status assign_topic_partitions(const std::map<int32_t, int64_t>& begin_partition_offset,
144
                                   const std::string& topic,
145
                                   std::shared_ptr<StreamLoadContext> ctx);
146
147
    // start the consumer and put msgs to queue
148
    Status group_consume(BlockingQueue<RdKafka::Message*>* queue, int64_t max_running_time_ms);
149
150
    // get the partition ids of the topic
151
    Status get_partition_meta(std::vector<int32_t>* partition_ids);
152
    // get offsets for times
153
    Status get_offsets_for_times(const std::vector<PIntegerPair>& times,
154
                                 std::vector<PIntegerPair>* offsets, int timeout);
155
    // get latest offsets for partitions
156
    Status get_latest_offsets_for_partitions(const std::vector<int32_t>& partition_ids,
157
                                             std::vector<PIntegerPair>* offsets, int timeout);
158
    // get real offsets for partitions from the given offset flags
159
    Status get_real_offsets_for_partitions(const std::vector<PIntegerPair>& offset_flags,
160
                                           std::vector<PIntegerPair>* offsets, int timeout);
161
162
private:
163
    std::string _brokers;
164
    std::string _topic;
165
    std::unordered_map<std::string, std::string> _custom_properties;
166
    std::set<int32_t> _consuming_partition_ids;
167
168
    KafkaEventCb _k_event_cb;
169
    RdKafka::KafkaConsumer* _k_consumer = nullptr;
170
};
171
172
} // end namespace doris