Coverage Report

Created: 2026-04-11 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/data_consumer_group.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stdint.h>

#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>
26
27
#include "common/cast_set.h"
28
#include "common/status.h"
29
#include "io/fs/kafka_consumer_pipe.h"
30
#include "load/routine_load/data_consumer.h"
31
#include "util/blocking_queue.hpp"
32
#include "util/uid_util.h"
33
#include "util/work_thread_pool.hpp"
34
35
namespace RdKafka {
36
class Message;
37
} // namespace RdKafka
38
39
namespace doris {
40
class StreamLoadContext;
41
42
// data consumer group saves a group of data consumers.
43
// These data consumers share the same stream load pipe.
44
// This class is not thread safe.
45
class DataConsumerGroup {
46
public:
47
    typedef std::function<void(const Status&)> ConsumeFinishCallback;
48
49
    DataConsumerGroup(size_t consumer_num)
50
1
            : _grp_id(UniqueId::gen_uid()),
51
1
              _thread_pool(doris::cast_set<uint32_t>(consumer_num),
52
1
                           doris::cast_set<uint32_t>(consumer_num), "data_consumer"),
53
1
              _counter(0) {}
54
55
1
    virtual ~DataConsumerGroup() { _consumers.clear(); }
56
57
1
    const UniqueId& grp_id() { return _grp_id; }
58
59
1
    const std::vector<std::shared_ptr<DataConsumer>>& consumers() { return _consumers; }
60
61
1
    void add_consumer(std::shared_ptr<DataConsumer> consumer) {
62
1
        consumer->set_grp(_grp_id);
63
1
        _consumers.push_back(consumer);
64
1
        ++_counter;
65
1
    }
66
67
    // start all consumers
68
    virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx,
69
0
                             std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
70
0
        return Status::OK();
71
0
    }
72
73
protected:
74
    UniqueId _grp_id;
75
    std::vector<std::shared_ptr<DataConsumer>> _consumers;
76
    // thread pool to run each consumer in multi thread
77
    PriorityThreadPool _thread_pool;
78
    // mutex to protect counter.
79
    // the counter is init as the number of consumers.
80
    // once a consumer is done, decrease the counter.
81
    // when the counter becomes zero, shutdown the queue to finish
82
    std::mutex _mutex;
83
    int _counter;
84
};
85
86
// for kafka
87
class KafkaDataConsumerGroup : public DataConsumerGroup {
88
public:
89
1
    KafkaDataConsumerGroup(size_t consumer_num) : DataConsumerGroup(consumer_num), _queue(500) {}
90
91
    virtual ~KafkaDataConsumerGroup();
92
93
    Status start_all(std::shared_ptr<StreamLoadContext> ctx,
94
                     std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) override;
95
    // assign topic partitions to all consumers equally
96
    Status assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx);
97
98
private:
99
    // start a single consumer
100
    void actual_consume(std::shared_ptr<DataConsumer> consumer,
101
                        BlockingQueue<RdKafka::Message*>* queue, int64_t max_running_time_ms,
102
                        ConsumeFinishCallback cb);
103
104
private:
105
    // blocking queue to receive msgs from all consumers
106
    BlockingQueue<RdKafka::Message*> _queue;
107
};
108
109
} // end namespace doris