be/src/load/routine_load/data_consumer_pool.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/routine_load/data_consumer_pool.h" |
19 | | |
20 | | #include <gen_cpp/Types_types.h> |
21 | | |
22 | | #include <algorithm> |
23 | | // IWYU pragma: no_include <bits/chrono.h> |
24 | | #include <chrono> // IWYU pragma: keep |
25 | | #include <ctime> |
26 | | #include <iterator> |
27 | | #include <map> |
28 | | #include <ostream> |
29 | | #include <vector> |
30 | | |
31 | | #include "common/config.h" |
32 | | #include "common/logging.h" |
33 | | #include "common/status.h" |
34 | | #include "load/routine_load/data_consumer.h" |
35 | | #include "load/routine_load/data_consumer_group.h" |
36 | | #include "load/stream_load/stream_load_context.h" |
37 | | #include "util/uid_util.h" |
38 | | |
39 | | namespace doris { |
40 | | |
41 | | Status DataConsumerPool::get_consumer(std::shared_ptr<StreamLoadContext> ctx, |
42 | 2 | std::shared_ptr<DataConsumer>* ret) { |
43 | 2 | std::unique_lock<std::mutex> l(_lock); |
44 | | |
45 | | // check if there is an available consumer. |
46 | | // if has, return it, also remove it from the pool |
47 | 2 | auto iter = std::begin(_pool); |
48 | 3 | while (iter != std::end(_pool)) { |
49 | 1 | if ((*iter)->match(ctx)) { |
50 | 0 | VLOG_NOTICE << "get an available data consumer from pool: " << (*iter)->id(); |
51 | 0 | static_cast<void>((*iter)->reset()); |
52 | 0 | *ret = *iter; |
53 | 0 | iter = _pool.erase(iter); |
54 | 0 | return Status::OK(); |
55 | 1 | } else { |
56 | 1 | ++iter; |
57 | 1 | } |
58 | 1 | } |
59 | | |
60 | | // no available consumer, create a new one |
61 | 2 | std::shared_ptr<DataConsumer> consumer; |
62 | 2 | switch (ctx->load_src_type) { |
63 | 2 | case TLoadSourceType::KAFKA: |
64 | 2 | consumer = std::make_shared<KafkaDataConsumer>(ctx); |
65 | 2 | break; |
66 | 0 | default: |
67 | 0 | return Status::InternalError("PAUSE: unknown routine load task type: {}", ctx->load_type); |
68 | 2 | } |
69 | | |
70 | | // init the consumer |
71 | 2 | RETURN_IF_ERROR(consumer->init(ctx)); |
72 | | |
73 | 2 | VLOG_NOTICE << "create new data consumer: " << consumer->id(); |
74 | 2 | *ret = consumer; |
75 | 2 | return Status::OK(); |
76 | 2 | } |
77 | | |
78 | | Status DataConsumerPool::get_consumer_grp(std::shared_ptr<StreamLoadContext> ctx, |
79 | 1 | std::shared_ptr<DataConsumerGroup>* ret) { |
80 | 1 | if (ctx->load_src_type != TLoadSourceType::KAFKA) { |
81 | 0 | return Status::InternalError( |
82 | 0 | "PAUSE: Currently only support consumer group for Kafka data source"); |
83 | 0 | } |
84 | 1 | DCHECK(ctx->kafka_info); |
85 | | |
86 | 1 | if (ctx->kafka_info->begin_offset.size() == 0) { |
87 | 0 | return Status::InternalError("PAUSE: The size of begin_offset of task should not be 0."); |
88 | 0 | } |
89 | | |
90 | | // one data consumer group contains at least one data consumers. |
91 | 1 | int max_consumer_num = config::max_consumer_num_per_group; |
92 | 1 | size_t consumer_num = std::min((size_t)max_consumer_num, ctx->kafka_info->begin_offset.size()); |
93 | | |
94 | 1 | std::shared_ptr<KafkaDataConsumerGroup> grp = |
95 | 1 | std::make_shared<KafkaDataConsumerGroup>(consumer_num); |
96 | | |
97 | 2 | for (int i = 0; i < consumer_num; ++i) { |
98 | 1 | std::shared_ptr<DataConsumer> consumer; |
99 | 1 | RETURN_IF_ERROR(get_consumer(ctx, &consumer)); |
100 | 1 | grp->add_consumer(consumer); |
101 | 1 | } |
102 | | |
103 | 1 | LOG(INFO) << "get consumer group " << grp->grp_id() << " with " << consumer_num << " consumers"; |
104 | 1 | *ret = grp; |
105 | 1 | return Status::OK(); |
106 | 1 | } |
107 | | |
108 | 2 | void DataConsumerPool::return_consumer(std::shared_ptr<DataConsumer> consumer) { |
109 | 2 | std::unique_lock<std::mutex> l(_lock); |
110 | | |
111 | 2 | if (_pool.size() == config::routine_load_consumer_pool_size) { |
112 | 0 | VLOG_NOTICE << "data consumer pool is full: " << _pool.size() << "-" |
113 | 0 | << config::routine_load_consumer_pool_size |
114 | 0 | << ", discard the returned consumer: " << consumer->id(); |
115 | 0 | return; |
116 | 0 | } |
117 | | |
118 | 2 | static_cast<void>(consumer->reset()); |
119 | 2 | _pool.push_back(consumer); |
120 | 2 | VLOG_NOTICE << "return the data consumer: " << consumer->id() |
121 | 0 | << ", current pool size: " << _pool.size(); |
122 | 2 | } |
123 | | |
124 | 1 | void DataConsumerPool::return_consumers(DataConsumerGroup* grp) { |
125 | 1 | for (std::shared_ptr<DataConsumer> consumer : grp->consumers()) { |
126 | 1 | return_consumer(consumer); |
127 | 1 | } |
128 | 1 | } |
129 | | |
130 | 1 | Status DataConsumerPool::start_bg_worker() { |
131 | 1 | RETURN_IF_ERROR(Thread::create( |
132 | 1 | "ResultBufferMgr", "clean_idle_consumer", |
133 | 1 | [this]() { |
134 | 1 | do { |
135 | 1 | _clean_idle_consumer_bg(); |
136 | 1 | } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(60))); |
137 | 1 | }, |
138 | 1 | &_clean_idle_consumer_thread)); |
139 | 1 | return Status::OK(); |
140 | 1 | } |
141 | | |
142 | 1 | void DataConsumerPool::_clean_idle_consumer_bg() { |
143 | 1 | const static int32_t max_idle_time_second = 600; |
144 | | |
145 | 1 | std::unique_lock<std::mutex> l(_lock); |
146 | 1 | time_t now = time(nullptr); |
147 | | |
148 | 1 | auto iter = std::begin(_pool); |
149 | 1 | while (iter != std::end(_pool)) { |
150 | 0 | if (difftime(now, (*iter)->last_visit_time()) >= max_idle_time_second) { |
151 | 0 | LOG(INFO) << "remove data consumer " << (*iter)->id() |
152 | 0 | << ", since it last visit: " << (*iter)->last_visit_time() |
153 | 0 | << ", now: " << now; |
154 | 0 | iter = _pool.erase(iter); |
155 | 0 | } else { |
156 | 0 | ++iter; |
157 | 0 | } |
158 | 0 | } |
159 | 1 | } |
160 | | |
161 | | } // end namespace doris |