Coverage Report

Created: 2026-05-17 12:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/data_consumer_group.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stdint.h>

#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

#include "common/cast_set.h"
#include "common/status.h"
#include "load/routine_load/data_consumer.h"
#include "util/blocking_queue.hpp"
#include "util/uid_util.h"
#include "util/work_thread_pool.hpp"
33
34
namespace RdKafka {
35
class Message;
36
} // namespace RdKafka
37
38
namespace doris {
39
class StreamLoadContext;
40
41
// data consumer group saves a group of data consumers.
42
// These data consumers share the same stream load pipe.
43
// This class is not thread safe.
44
class DataConsumerGroup {
45
public:
46
    typedef std::function<void(const Status&)> ConsumeFinishCallback;
47
48
    DataConsumerGroup(size_t consumer_num)
49
1
            : _grp_id(UniqueId::gen_uid()),
50
1
              _thread_pool(doris::cast_set<uint32_t>(consumer_num),
51
1
                           doris::cast_set<uint32_t>(consumer_num), "data_consumer"),
52
1
              _counter(0) {}
53
54
1
    virtual ~DataConsumerGroup() { _consumers.clear(); }
55
56
1
    const UniqueId& grp_id() { return _grp_id; }
57
58
1
    const std::vector<std::shared_ptr<DataConsumer>>& consumers() { return _consumers; }
59
60
1
    void add_consumer(std::shared_ptr<DataConsumer> consumer) {
61
1
        consumer->set_grp(_grp_id);
62
1
        _consumers.push_back(consumer);
63
1
        ++_counter;
64
1
    }
65
66
    // start all consumers
67
    virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx,
68
0
                             std::shared_ptr<io::StreamLoadPipe> pipe) {
69
0
        return Status::OK();
70
0
    }
71
72
protected:
73
    // Submit all consumers to thread pool.
74
    // consume_fn: wraps actual_consume per consumer.
75
    // shutdown_fn: called when last consumer finishes (shuts down queue).
76
    // Returns false if any submission fails.
77
    bool _submit_all_consumers(
78
            std::function<void(std::shared_ptr<DataConsumer>, ConsumeFinishCallback)> consume_fn,
79
            std::function<void()> shutdown_fn, Status& result_st);
80
81
    // Shared consumption loop skeleton. Calls _dequeue_and_process per iteration.
82
    Status _run_consume_loop(std::shared_ptr<StreamLoadContext> ctx,
83
                             std::shared_ptr<io::StreamLoadPipe> pipe, Status& result_st);
84
85
    // Dequeue one item and append to pipe. Update left_rows/left_bytes.
86
    // Returns false → queue empty/shutdown (eos). Returns true → continue.
87
    virtual bool _dequeue_and_process(io::StreamLoadPipe* pipe, int64_t& left_rows,
88
                                      int64_t& left_bytes, Status& result_st) = 0;
89
90
    // Shutdown the subclass queue. Called at loop exit.
91
    virtual void _shutdown_queue() = 0;
92
93
    // Called after successful finish. Override to collect post-consume state.
94
0
    virtual void _on_finish(std::shared_ptr<StreamLoadContext> ctx) {}
95
96
    UniqueId _grp_id;
97
    std::vector<std::shared_ptr<DataConsumer>> _consumers;
98
    // thread pool to run each consumer in multi thread
99
    PriorityThreadPool _thread_pool;
100
    // mutex to protect counter.
101
    // the counter is init as the number of consumers.
102
    // once a consumer is done, decrease the counter.
103
    // when the counter becomes zero, shutdown the queue to finish
104
    std::mutex _mutex;
105
    int _counter;
106
};
107
108
// for kafka
109
class KafkaDataConsumerGroup : public DataConsumerGroup {
110
public:
111
1
    KafkaDataConsumerGroup(size_t consumer_num) : DataConsumerGroup(consumer_num), _queue(500) {}
112
113
    ~KafkaDataConsumerGroup() override;
114
115
    Status start_all(std::shared_ptr<StreamLoadContext> ctx,
116
                     std::shared_ptr<io::StreamLoadPipe> pipe) override;
117
    // assign topic partitions to all consumers equally
118
    Status assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx);
119
120
    // start a single consumer
121
    void actual_consume(std::shared_ptr<DataConsumer> consumer,
122
                        BlockingQueue<RdKafka::Message*>* queue, int64_t max_running_time_ms,
123
                        ConsumeFinishCallback cb);
124
125
private:
126
    bool _dequeue_and_process(io::StreamLoadPipe* pipe, int64_t& left_rows, int64_t& left_bytes,
127
                              Status& result_st) override;
128
1
    void _shutdown_queue() override { _queue.shutdown(); }
129
    void _on_finish(std::shared_ptr<StreamLoadContext> ctx) override;
130
131
    BlockingQueue<RdKafka::Message*> _queue;
132
    std::map<int32_t, int64_t> _cmt_offset;
133
    TFileFormatType::type _format;
134
};
135
136
// for kinesis
137
class KinesisDataConsumerGroup : public DataConsumerGroup {
138
public:
139
0
    KinesisDataConsumerGroup(size_t consumer_num) : DataConsumerGroup(consumer_num), _queue(500) {}
140
141
    ~KinesisDataConsumerGroup() override;
142
143
    Status start_all(std::shared_ptr<StreamLoadContext> ctx,
144
                     std::shared_ptr<io::StreamLoadPipe> pipe) override;
145
146
    Status assign_stream_shards(std::shared_ptr<StreamLoadContext> ctx);
147
148
private:
149
    void actual_consume(std::shared_ptr<DataConsumer> consumer,
150
                        BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue,
151
                        int64_t max_running_time_ms, ConsumeFinishCallback cb);
152
153
    bool _dequeue_and_process(io::StreamLoadPipe* pipe, int64_t& left_rows, int64_t& left_bytes,
154
                              Status& result_st) override;
155
0
    void _shutdown_queue() override { _queue.shutdown(); }
156
    void _on_finish(std::shared_ptr<StreamLoadContext> ctx) override;
157
158
    BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>> _queue;
159
    TFileFormatType::type _format;
160
};
161
162
} // end namespace doris