Coverage Report

Created: 2026-03-16 01:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/data_consumer_pool.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/routine_load/data_consumer_pool.h"
19
20
#include <gen_cpp/Types_types.h>
21
22
#include <algorithm>
23
// IWYU pragma: no_include <bits/chrono.h>
24
#include <chrono> // IWYU pragma: keep
25
#include <ctime>
26
#include <iterator>
27
#include <map>
28
#include <ostream>
29
#include <vector>
30
31
#include "common/config.h"
32
#include "common/logging.h"
33
#include "common/status.h"
34
#include "load/routine_load/data_consumer.h"
35
#include "load/routine_load/data_consumer_group.h"
36
#include "load/stream_load/stream_load_context.h"
37
#include "util/uid_util.h"
38
39
namespace doris {
40
41
// Hands out a data consumer for `ctx` through `ret`.
//
// First looks for a pooled consumer whose match(ctx) is true; such a consumer is
// reset, removed from the pool and returned. Otherwise a new consumer is created
// for ctx->load_src_type (only KAFKA is supported) and init(ctx) is called on it.
//
// Returns OK on success, the init() error if initialization fails, or an
// InternalError prefixed with "PAUSE:" for an unsupported load source type.
Status DataConsumerPool::get_consumer(std::shared_ptr<StreamLoadContext> ctx,
                                      std::shared_ptr<DataConsumer>* ret) {
    std::unique_lock<std::mutex> l(_lock);

    // Reuse an idle pooled consumer if one matches this context; it leaves the pool.
    auto iter = std::begin(_pool);
    while (iter != std::end(_pool)) {
        if ((*iter)->match(ctx)) {
            VLOG_NOTICE << "get an available data consumer from pool: " << (*iter)->id();
            static_cast<void>((*iter)->reset());
            *ret = *iter;
            _pool.erase(iter);
            return Status::OK();
        }
        ++iter;
    }

    // Nothing below touches _pool, so release the lock before constructing and
    // initializing a new consumer instead of blocking other callers meanwhile.
    l.unlock();

    // No available consumer: create a new one.
    std::shared_ptr<DataConsumer> consumer;
    switch (ctx->load_src_type) {
    case TLoadSourceType::KAFKA:
        consumer = std::make_shared<KafkaDataConsumer>(ctx);
        break;
    default:
        // Report the value that was actually switched on (load_src_type), so the
        // message identifies which source type is unsupported.
        return Status::InternalError("PAUSE: unknown routine load task type: {}",
                                     ctx->load_src_type);
    }

    // Initialize the consumer; on failure it is simply dropped.
    RETURN_IF_ERROR(consumer->init(ctx));

    VLOG_NOTICE << "create new data consumer: " << consumer->id();
    *ret = consumer;
    return Status::OK();
}
77
78
// Builds a consumer group for a Kafka routine load task and returns it via `ret`.
//
// The group gets one consumer per entry of ctx->kafka_info->begin_offset, capped
// at config::max_consumer_num_per_group (treated as at least 1). Each consumer is
// obtained through get_consumer(), so pooled consumers are reused when they match.
//
// Returns an InternalError prefixed with "PAUSE:" for non-Kafka sources or an
// empty begin_offset, or the first error returned by get_consumer().
Status DataConsumerPool::get_consumer_grp(std::shared_ptr<StreamLoadContext> ctx,
                                          std::shared_ptr<DataConsumerGroup>* ret) {
    if (ctx->load_src_type != TLoadSourceType::KAFKA) {
        return Status::InternalError(
                "PAUSE: Currently only support consumer group for Kafka data source");
    }
    DCHECK(ctx->kafka_info);

    if (ctx->kafka_info->begin_offset.empty()) {
        return Status::InternalError("PAUSE: The size of begin_offset of task should not be 0.");
    }

    // One data consumer group contains at least one data consumer and never more
    // consumers than begin_offset entries. Clamp the config to >= 1: a value of 0
    // would otherwise produce an empty group, and a negative value would wrap to a
    // huge size_t when cast.
    const int max_consumer_num = std::max<int>(1, config::max_consumer_num_per_group);
    const size_t consumer_num = std::min(static_cast<size_t>(max_consumer_num),
                                         ctx->kafka_info->begin_offset.size());

    auto grp = std::make_shared<KafkaDataConsumerGroup>(consumer_num);

    // size_t index avoids the signed/unsigned comparison against consumer_num.
    for (size_t i = 0; i < consumer_num; ++i) {
        std::shared_ptr<DataConsumer> consumer;
        RETURN_IF_ERROR(get_consumer(ctx, &consumer));
        grp->add_consumer(consumer);
    }

    LOG(INFO) << "get consumer group " << grp->grp_id() << " with " << consumer_num << " consumers";
    *ret = grp;
    return Status::OK();
}
107
108
2
void DataConsumerPool::return_consumer(std::shared_ptr<DataConsumer> consumer) {
109
2
    std::unique_lock<std::mutex> l(_lock);
110
111
2
    if (_pool.size() == config::routine_load_consumer_pool_size) {
112
0
        VLOG_NOTICE << "data consumer pool is full: " << _pool.size() << "-"
113
0
                    << config::routine_load_consumer_pool_size
114
0
                    << ", discard the returned consumer: " << consumer->id();
115
0
        return;
116
0
    }
117
118
2
    static_cast<void>(consumer->reset());
119
2
    _pool.push_back(consumer);
120
2
    VLOG_NOTICE << "return the data consumer: " << consumer->id()
121
0
                << ", current pool size: " << _pool.size();
122
2
}
123
124
1
// Returns every consumer of `grp` to the pool via return_consumer(); consumers
// that do not fit are discarded there.
void DataConsumerPool::return_consumers(DataConsumerGroup* grp) {
    // Iterate by const reference: copying each shared_ptr here would add an atomic
    // refcount increment/decrement per element, and return_consumer() already
    // takes its own copy.
    for (const auto& consumer : grp->consumers()) {
        return_consumer(consumer);
    }
}
129
130
1
// Starts the background thread that evicts idle consumers from the pool.
//
// The thread runs _clean_idle_consumer_bg() once immediately. It repeats it after
// each 60-second wait_for() on _stop_background_threads_latch that returns false,
// and exits once that wait returns true. The handle is stored in
// _clean_idle_consumer_thread.
Status DataConsumerPool::start_bg_worker() {
    // Thread category previously read "ResultBufferMgr" (copy-paste from another
    // component), which mislabelled this thread in thread listings.
    RETURN_IF_ERROR(Thread::create(
            "DataConsumerPool", "clean_idle_consumer",
            [this]() {
                do {
                    _clean_idle_consumer_bg();
                } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(60)));
            },
            &_clean_idle_consumer_thread));
    return Status::OK();
}
141
142
1
void DataConsumerPool::_clean_idle_consumer_bg() {
143
1
    const static int32_t max_idle_time_second = 600;
144
145
1
    std::unique_lock<std::mutex> l(_lock);
146
1
    time_t now = time(nullptr);
147
148
1
    auto iter = std::begin(_pool);
149
1
    while (iter != std::end(_pool)) {
150
0
        if (difftime(now, (*iter)->last_visit_time()) >= max_idle_time_second) {
151
0
            LOG(INFO) << "remove data consumer " << (*iter)->id()
152
0
                      << ", since it last visit: " << (*iter)->last_visit_time()
153
0
                      << ", now: " << now;
154
0
            iter = _pool.erase(iter);
155
0
        } else {
156
0
            ++iter;
157
0
        }
158
0
    }
159
1
}
160
161
} // end namespace doris