Coverage Report

Created: 2026-03-15 08:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/data_consumer_group.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stdint.h>
21
22
#include <functional>
23
#include <memory>
24
#include <mutex>
25
#include <vector>
26
27
#include "common/cast_set.h"
28
#include "common/status.h"
29
#include "io/fs/kafka_consumer_pipe.h"
30
#include "load/routine_load/data_consumer.h"
31
#include "util/blocking_queue.hpp"
32
#include "util/uid_util.h"
33
#include "util/work_thread_pool.hpp"
34
35
namespace RdKafka {
36
class Message;
37
} // namespace RdKafka
38
39
namespace doris {
40
#include "common/compile_check_begin.h"
41
class StreamLoadContext;
42
43
// A DataConsumerGroup holds a group of DataConsumer objects that all carry the same group id.
44
// These data consumers share the same stream load pipe.
45
// This class is not thread safe.
46
class DataConsumerGroup {
47
public:
48
    typedef std::function<void(const Status&)> ConsumeFinishCallback; // callback invoked with a const Status&
49
50
    DataConsumerGroup(size_t consumer_num) // NOTE(review): single-argument ctor is not `explicit`
51
1
            : _grp_id(UniqueId::gen_uid()), // every group gets a freshly generated id
52
1
              _thread_pool(doris::cast_set<uint32_t>(consumer_num), // consumer_num is passed for both numeric pool arguments
53
1
                           doris::cast_set<uint32_t>(consumer_num), "data_consumer"), // presumably thread count and queue size - confirm against PriorityThreadPool
54
1
              _counter(0) {} // counter starts at zero; add_consumer() increments it
55
56
1
    virtual ~DataConsumerGroup() { _consumers.clear(); } // releases the shared_ptr references held by this group
57
58
1
    const UniqueId& grp_id() { return _grp_id; } // id generated in the constructor; NOTE(review): accessor is not const-qualified
59
60
1
    const std::vector<std::shared_ptr<DataConsumer>>& consumers() { return _consumers; } // read-only view of the added consumers
61
62
1
    void add_consumer(std::shared_ptr<DataConsumer> consumer) { // add one consumer to this group
63
1
        consumer->set_grp(_grp_id); // tag the consumer with this group's id
64
1
        _consumers.push_back(consumer);
65
1
        ++_counter; // NOTE(review): modified here without taking _mutex
66
1
    }
67
68
    // start all consumers; this base implementation does nothing and returns Status::OK()
69
    virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx,
70
0
                             std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
71
0
        return Status::OK();
72
0
    }
73
74
protected:
75
    UniqueId _grp_id; // set once in the constructor via UniqueId::gen_uid()
76
    std::vector<std::shared_ptr<DataConsumer>> _consumers; // consumers added through add_consumer()
77
    // thread pool used to run each consumer on its own thread
78
    PriorityThreadPool _thread_pool;
79
    // mutex to protect the counter.
80
    // the counter starts at 0 and is incremented by add_consumer(), so it equals the number of consumers added.
81
    // once a consumer is done, the counter is decreased.
82
    // when the counter becomes zero, the queue is shut down to finish (NOTE(review): done outside this class body - confirm in the implementation)
83
    std::mutex _mutex;
84
    int _counter;
85
};
86
87
// DataConsumerGroup for kafka: all consumers deliver RdKafka::Message pointers into one shared blocking queue.
88
class KafkaDataConsumerGroup : public DataConsumerGroup {
89
public:
90
1
    KafkaDataConsumerGroup(size_t consumer_num) : DataConsumerGroup(consumer_num), _queue(500) {} // NOTE(review): 500 is presumably the queue capacity - confirm against BlockingQueue
91
92
    virtual ~KafkaDataConsumerGroup();
93
94
    Status start_all(std::shared_ptr<StreamLoadContext> ctx,
95
                     std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) override;
96
    // assign topic partitions to all consumers equally
97
    Status assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx);
98
99
private:
100
    // start a single consumer; NOTE(review): presumably it fills `queue`, stops after max_running_time_ms and reports its final Status through `cb` - confirm in the .cpp
101
    void actual_consume(std::shared_ptr<DataConsumer> consumer,
102
                        BlockingQueue<RdKafka::Message*>* queue, int64_t max_running_time_ms,
103
                        ConsumeFinishCallback cb);
104
105
private:
106
    // blocking queue that receives messages from all consumers of this group
107
    BlockingQueue<RdKafka::Message*> _queue;
108
};
109
#include "common/compile_check_end.h"
110
111
} // end namespace doris