Coverage Report

Created: 2026-05-15 01:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/data_consumer_pool.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/routine_load/data_consumer_pool.h"
19
20
#include <gen_cpp/Types_types.h>
21
22
#include <algorithm>
23
// IWYU pragma: no_include <bits/chrono.h>
24
#include <chrono> // IWYU pragma: keep
25
#include <ctime>
26
#include <iterator>
27
#include <map>
28
#include <ostream>
29
#include <vector>
30
31
#include "common/config.h"
32
#include "common/logging.h"
33
#include "common/status.h"
34
#include "load/routine_load/data_consumer.h"
35
#include "load/routine_load/data_consumer_group.h"
36
#include "load/stream_load/stream_load_context.h"
37
#include "util/uid_util.h"
38
39
namespace doris {
40
41
// Hands out a DataConsumer suitable for the given load context.
//
// While holding `_lock`, the pool is searched for the first pooled consumer
// whose match(ctx) returns true. If one is found it is reset, removed from
// the pool and handed back through `ret`. Otherwise a new consumer is
// created according to ctx->load_src_type (KAFKA or KINESIS), initialized
// with init(ctx) and handed back; a freshly created consumer is NOT inserted
// into the pool here.
//
// @param ctx  load context; its load_src_type selects the consumer class.
// @param ret  out-parameter that receives the consumer on success.
// @return Status::OK() on success, InternalError for an unsupported source
//         type, or whatever error init() reports.
Status DataConsumerPool::get_consumer(std::shared_ptr<StreamLoadContext> ctx,
                                      std::shared_ptr<DataConsumer>* ret) {
    std::unique_lock<std::mutex> l(_lock);

    // Reuse a pooled consumer when one matches this context.
    auto matched = std::find_if(std::begin(_pool), std::end(_pool),
                                [&ctx](const auto& pooled) { return pooled->match(ctx); });
    if (matched != std::end(_pool)) {
        VLOG_NOTICE << "get an available data consumer from pool: " << (*matched)->id();
        // The result of reset() is deliberately ignored, as before.
        static_cast<void>((*matched)->reset());
        *ret = *matched;
        _pool.erase(matched);
        return Status::OK();
    }

    // No reusable consumer: create a new one for the requested source type.
    std::shared_ptr<DataConsumer> consumer;
    switch (ctx->load_src_type) {
    case TLoadSourceType::KAFKA:
        consumer = std::make_shared<KafkaDataConsumer>(ctx);
        break;
    case TLoadSourceType::KINESIS:
        consumer = std::make_shared<KinesisDataConsumer>(ctx);
        break;
    default:
        // Report the value the switch actually dispatched on (load_src_type),
        // not the unrelated load_type field.
        return Status::InternalError("PAUSE: unknown routine load task type: {}",
                                     ctx->load_src_type);
    }

    // init the consumer (still under _lock, as in the original code)
    RETURN_IF_ERROR(consumer->init(ctx));

    VLOG_NOTICE << "create new data consumer: " << consumer->id();
    *ret = consumer;
    return Status::OK();
}
80
81
// Builds a consumer group for the given load context.
//
// The group size starts at config::max_consumer_num_per_group and is capped
// by the number of entries in the task's begin_offset (Kafka) or
// begin_sequence_number (Kinesis) container. Each member of the group is
// obtained through get_consumer(), so pooled consumers are reused when
// they match.
//
// @param ctx  load context; its load_src_type selects the group class.
// @param ret  out-parameter that receives the group on success.
// @return Status::OK() on success; InternalError when the task carries no
//         begin offsets / sequence numbers; Cancelled for an unsupported
//         source type; or the first error returned by get_consumer().
Status DataConsumerPool::get_consumer_grp(std::shared_ptr<StreamLoadContext> ctx,
                                          std::shared_ptr<DataConsumerGroup>* ret) {
    size_t consumer_num = config::max_consumer_num_per_group;
    std::shared_ptr<DataConsumerGroup> grp;
    switch (ctx->load_src_type) {
    case TLoadSourceType::KAFKA: {
        DCHECK(ctx->kafka_info);
        if (ctx->kafka_info->begin_offset.size() == 0) {
            return Status::InternalError(
                    "PAUSE: The size of begin_offset of task should not be 0.");
        }
        consumer_num = std::min(consumer_num, ctx->kafka_info->begin_offset.size());
        grp = std::make_shared<KafkaDataConsumerGroup>(consumer_num);
        break;
    }
    case TLoadSourceType::KINESIS: {
        DCHECK(ctx->kinesis_info);
        if (ctx->kinesis_info->begin_sequence_number.size() == 0) {
            return Status::InternalError(
                    "PAUSE: The size of begin_sequence_number of task should not be 0.");
        }
        consumer_num = std::min(consumer_num, ctx->kinesis_info->begin_sequence_number.size());
        grp = std::make_shared<KinesisDataConsumerGroup>(consumer_num);
        break;
    }
    default:
        // Report the value the switch actually dispatched on (load_src_type),
        // not the unrelated load_type field.
        return Status::Cancelled("unknown routine load task type: {}", ctx->load_src_type);
    }

    // size_t index: consumer_num is unsigned, so avoid a signed/unsigned comparison.
    for (size_t i = 0; i < consumer_num; ++i) {
        std::shared_ptr<DataConsumer> consumer;
        RETURN_IF_ERROR(get_consumer(ctx, &consumer));
        grp->add_consumer(consumer);
    }

    LOG(INFO) << "get consumer group " << grp->grp_id() << " with " << consumer_num << " consumers";
    *ret = grp;
    return Status::OK();
}
120
121
2
void DataConsumerPool::return_consumer(std::shared_ptr<DataConsumer> consumer) {
122
2
    std::unique_lock<std::mutex> l(_lock);
123
124
2
    if (_pool.size() == config::routine_load_consumer_pool_size) {
125
0
        VLOG_NOTICE << "data consumer pool is full: " << _pool.size() << "-"
126
0
                    << config::routine_load_consumer_pool_size
127
0
                    << ", discard the returned consumer: " << consumer->id();
128
0
        return;
129
0
    }
130
131
2
    static_cast<void>(consumer->reset());
132
2
    _pool.push_back(consumer);
133
2
    VLOG_NOTICE << "return the data consumer: " << consumer->id()
134
0
                << ", current pool size: " << _pool.size();
135
2
}
136
137
1
// Returns every consumer held by `grp` to the pool, one at a time, via
// return_consumer().
//
// @param grp  the group whose consumers are handed back; it is dereferenced
//             unconditionally, so it must not be null.
void DataConsumerPool::return_consumers(DataConsumerGroup* grp) {
    // Bind by const reference: return_consumer() already takes its argument
    // by value, so copying the shared_ptr here as well would only add an
    // extra atomic refcount round-trip per element.
    for (const auto& consumer : grp->consumers()) {
        return_consumer(consumer);
    }
}
142
143
1
// Starts the background thread that periodically evicts idle consumers.
//
// The thread runs _clean_idle_consumer_bg() once immediately and then again
// each time a 60-second wait on `_stop_background_threads_latch` returns
// false; it exits as soon as that wait returns true.
//
// @return Status::OK() if the thread was created, otherwise the error from
//         Thread::create().
Status DataConsumerPool::start_bg_worker() {
    auto cleaner_loop = [this]() {
        _clean_idle_consumer_bg();
        while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(60))) {
            _clean_idle_consumer_bg();
        }
    };

    // NOTE(review): the thread category "ResultBufferMgr" looks copy-pasted
    // from another component; kept unchanged here -- confirm before renaming.
    RETURN_IF_ERROR(Thread::create("ResultBufferMgr", "clean_idle_consumer", cleaner_loop,
                                   &_clean_idle_consumer_thread));
    return Status::OK();
}
154
155
1
void DataConsumerPool::_clean_idle_consumer_bg() {
156
1
    const static int32_t max_idle_time_second = 600;
157
158
1
    std::unique_lock<std::mutex> l(_lock);
159
1
    time_t now = time(nullptr);
160
161
1
    auto iter = std::begin(_pool);
162
1
    while (iter != std::end(_pool)) {
163
0
        if (difftime(now, (*iter)->last_visit_time()) >= max_idle_time_second) {
164
0
            LOG(INFO) << "remove data consumer " << (*iter)->id()
165
0
                      << ", since it last visit: " << (*iter)->last_visit_time()
166
0
                      << ", now: " << now;
167
0
            iter = _pool.erase(iter);
168
0
        } else {
169
0
            ++iter;
170
0
        }
171
0
    }
172
1
}
173
174
} // end namespace doris