be/src/load/channel/load_channel_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/channel/load_channel_mgr.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/internal_service.pb.h> |
22 | | |
23 | | #include <algorithm> |
24 | | // IWYU pragma: no_include <bits/chrono.h> |
25 | | #include <chrono> // IWYU pragma: keep |
26 | | #include <ctime> |
27 | | #include <memory> |
28 | | #include <ostream> |
29 | | #include <string> |
30 | | #include <vector> |
31 | | |
32 | | #include "common/config.h" |
33 | | #include "common/logging.h" |
34 | | #include "common/metrics/doris_metrics.h" |
35 | | #include "common/metrics/metrics.h" |
36 | | #include "load/channel/load_channel.h" |
37 | | #include "runtime/exec_env.h" |
38 | | #include "util/thread.h" |
39 | | |
40 | | namespace doris { |
41 | | |
42 | | #ifndef BE_TEST |
43 | | constexpr uint32_t START_BG_INTERVAL = 60; |
44 | | #else |
45 | | constexpr uint32_t START_BG_INTERVAL = 1; |
46 | | #endif |
47 | | |
48 | | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(load_channel_count, MetricUnit::NOUNIT); |
49 | | DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(load_channel_mem_consumption, MetricUnit::BYTES, "", |
50 | | mem_consumption, Labels({{"type", "load"}})); |
51 | | |
52 | 318 | static int64_t calc_channel_timeout_s(int64_t timeout_in_req_s) { |
53 | 318 | int64_t load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec; |
54 | 318 | if (timeout_in_req_s > 0) { |
55 | 318 | load_channel_timeout_s = std::max<int64_t>(load_channel_timeout_s, timeout_in_req_s); |
56 | 318 | } |
57 | 318 | return load_channel_timeout_s; |
58 | 318 | } |
59 | | |
60 | 6 | LoadChannelMgr::LoadChannelMgr() : _stop_background_threads_latch(1) { |
61 | 6 | REGISTER_HOOK_METRIC(load_channel_count, [this]() { |
62 | | // std::lock_guard<std::mutex> l(_lock); |
63 | 6 | return _load_channels.size(); |
64 | 6 | }); |
65 | 6 | } |
66 | | |
67 | 3 | void LoadChannelMgr::stop() { |
68 | 3 | DEREGISTER_HOOK_METRIC(load_channel_count); |
69 | 3 | DEREGISTER_HOOK_METRIC(load_channel_mem_consumption); |
70 | 3 | _stop_background_threads_latch.count_down(); |
71 | 3 | if (_load_channels_clean_thread) { |
72 | 3 | _load_channels_clean_thread->join(); |
73 | 3 | } |
74 | 3 | } |
75 | | |
76 | 6 | Status LoadChannelMgr::init(int64_t process_mem_limit) { |
77 | 6 | _load_state_channels = std::make_unique<LoadStateChannelCache>(1024); |
78 | 6 | RETURN_IF_ERROR(_start_bg_worker()); |
79 | 6 | return Status::OK(); |
80 | 6 | } |
81 | | |
82 | 556 | Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { |
83 | 556 | UniqueId load_id(params.id()); |
84 | 556 | std::shared_ptr<LoadChannel> channel; |
85 | 556 | { |
86 | 556 | std::lock_guard<std::mutex> l(_lock); |
87 | 556 | auto it = _load_channels.find(load_id); |
88 | 556 | if (it != _load_channels.end()) { |
89 | 238 | channel = it->second; |
90 | 318 | } else { |
91 | | // create a new load channel |
92 | 318 | int64_t timeout_in_req_s = |
93 | 318 | params.has_load_channel_timeout_s() ? params.load_channel_timeout_s() : -1; |
94 | 318 | int64_t channel_timeout_s = calc_channel_timeout_s(timeout_in_req_s); |
95 | 318 | bool is_high_priority = (params.has_is_high_priority() && params.is_high_priority()); |
96 | | |
97 | 318 | int64_t wg_id = -1; |
98 | 318 | if (params.has_workload_group_id()) { |
99 | 318 | wg_id = params.workload_group_id(); |
100 | 318 | } |
101 | 318 | channel.reset(new LoadChannel(load_id, channel_timeout_s, is_high_priority, |
102 | 318 | params.sender_ip(), params.backend_id(), |
103 | 318 | params.enable_profile(), wg_id)); |
104 | 318 | _load_channels.insert({load_id, channel}); |
105 | 318 | } |
106 | 556 | } |
107 | | |
108 | 556 | RETURN_IF_ERROR(channel->open(params)); |
109 | | |
110 | 556 | return Status::OK(); |
111 | 556 | } |
112 | | |
113 | | Status LoadChannelMgr::_get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof, |
114 | | const UniqueId& load_id, |
115 | 556 | const PTabletWriterAddBlockRequest& request) { |
116 | 556 | is_eof = false; |
117 | 556 | std::lock_guard<std::mutex> l(_lock); |
118 | 556 | auto it = _load_channels.find(load_id); |
119 | 556 | if (it == _load_channels.end()) { |
120 | 0 | Cache::Handle* handle = _load_state_channels->lookup(load_id.to_string()); |
121 | 0 | if (handle != nullptr) { |
122 | | // load is cancelled |
123 | 0 | if (auto* value = _load_state_channels->value(handle); value != nullptr) { |
124 | 0 | const auto& cancel_reason = reinterpret_cast<CacheValue*>(value)->_cancel_reason; |
125 | 0 | _load_state_channels->release(handle); |
126 | 0 | if (!cancel_reason.empty()) { |
127 | 0 | LOG(INFO) << fmt::format( |
128 | 0 | "The channel has been cancelled, load_id = {}, error = {}", |
129 | 0 | print_id(load_id), cancel_reason); |
130 | 0 | return Status::Cancelled(cancel_reason); |
131 | 0 | } |
132 | 0 | } else { |
133 | | // load is success, success only when eos be true |
134 | 0 | _load_state_channels->release(handle); |
135 | 0 | if (request.has_eos() && request.eos()) { |
136 | 0 | is_eof = true; |
137 | 0 | return Status::OK(); |
138 | 0 | } |
139 | 0 | } |
140 | 0 | } |
141 | | |
142 | 0 | return Status::InternalError<false>( |
143 | 0 | "Fail to add batch in load channel: unknown load_id={}. " |
144 | 0 | "This may be due to a BE restart. Please retry the load.", |
145 | 0 | load_id.to_string()); |
146 | 0 | } |
147 | 556 | channel = it->second; |
148 | 556 | return Status::OK(); |
149 | 556 | } |
150 | | |
151 | | Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request, |
152 | 556 | PTabletWriterAddBlockResult* response) { |
153 | 556 | UniqueId load_id(request.id()); |
154 | | // 1. get load channel |
155 | 556 | std::shared_ptr<LoadChannel> channel; |
156 | 556 | bool is_eof; |
157 | 556 | auto status = _get_load_channel(channel, is_eof, load_id, request); |
158 | 556 | if (!status.ok() || is_eof) { |
159 | 0 | return status; |
160 | 0 | } |
161 | 556 | SCOPED_TIMER(channel->get_mgr_add_batch_timer()); |
162 | | |
163 | 556 | if (!channel->is_high_priority()) { |
164 | | // 2. check if mem consumption exceed limit |
165 | | // If this is a high priority load task, do not handle this. |
166 | | // because this may block for a while, which may lead to rpc timeout. |
167 | 464 | SCOPED_TIMER(channel->get_handle_mem_limit_timer()); |
168 | 464 | ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush( |
169 | 464 | [channel]() { return channel->is_cancelled(); }); |
170 | 464 | if (channel->is_cancelled()) { |
171 | 0 | return Status::Cancelled("LoadChannel has been cancelled: {}.", load_id.to_string()); |
172 | 0 | } |
173 | 464 | } |
174 | | |
175 | | // 3. add batch to load channel |
176 | | // batch may not exist in request(eg: eos request without batch), |
177 | | // this case will be handled in load channel's add batch method. |
178 | 556 | Status st = channel->add_batch(request, response); |
179 | 556 | if (UNLIKELY(!st.ok())) { |
180 | 0 | RETURN_IF_ERROR(channel->cancel()); |
181 | 0 | return st; |
182 | 0 | } |
183 | | |
184 | | // 4. handle finish |
185 | 556 | if (channel->is_finished()) { |
186 | 318 | _finish_load_channel(load_id); |
187 | 318 | } |
188 | 556 | return Status::OK(); |
189 | 556 | } |
190 | | |
191 | 318 | void LoadChannelMgr::_finish_load_channel(const UniqueId load_id) { |
192 | 318 | VLOG_NOTICE << "removing load channel " << load_id << " because it's finished"; |
193 | 318 | { |
194 | 318 | std::lock_guard<std::mutex> l(_lock); |
195 | 318 | if (_load_channels.contains(load_id)) { |
196 | 318 | _load_channels.erase(load_id); |
197 | 318 | } |
198 | 318 | auto* handle = _load_state_channels->insert(load_id.to_string(), nullptr, 1, 1); |
199 | 318 | _load_state_channels->release(handle); |
200 | 318 | } |
201 | 318 | VLOG_CRITICAL << "removed load channel " << load_id; |
202 | 318 | } |
203 | | |
204 | 0 | Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) { |
205 | 0 | UniqueId load_id(params.id()); |
206 | 0 | std::shared_ptr<LoadChannel> cancelled_channel; |
207 | 0 | { |
208 | 0 | std::lock_guard<std::mutex> l(_lock); |
209 | 0 | if (_load_channels.contains(load_id)) { |
210 | 0 | cancelled_channel = _load_channels[load_id]; |
211 | 0 | _load_channels.erase(load_id); |
212 | 0 | } |
213 | | // We just need to record the first cancel msg |
214 | 0 | auto* existing_handle = _load_state_channels->lookup(load_id.to_string()); |
215 | 0 | if (existing_handle == nullptr) { |
216 | 0 | if (params.has_cancel_reason() && !params.cancel_reason().empty()) { |
217 | 0 | std::unique_ptr<CacheValue> cancel_reason_ptr = std::make_unique<CacheValue>(); |
218 | 0 | cancel_reason_ptr->_cancel_reason = params.cancel_reason(); |
219 | 0 | size_t cache_capacity = |
220 | 0 | cancel_reason_ptr->_cancel_reason.capacity() + sizeof(CacheValue); |
221 | 0 | auto* handle = _load_state_channels->insert( |
222 | 0 | load_id.to_string(), cancel_reason_ptr.get(), 1, cache_capacity); |
223 | 0 | cancel_reason_ptr.release(); |
224 | 0 | _load_state_channels->release(handle); |
225 | 0 | LOG(INFO) << fmt::format("load_id = {}, record_error reason = {}", |
226 | 0 | print_id(load_id), params.cancel_reason()); |
227 | 0 | } |
228 | 0 | } else { |
229 | 0 | _load_state_channels->release(existing_handle); |
230 | 0 | } |
231 | 0 | } |
232 | |
|
233 | 0 | if (cancelled_channel != nullptr) { |
234 | 0 | RETURN_IF_ERROR(cancelled_channel->cancel()); |
235 | 0 | LOG(INFO) << "load channel has been cancelled: " << load_id; |
236 | 0 | } |
237 | | |
238 | 0 | return Status::OK(); |
239 | 0 | } |
240 | | |
241 | 6 | Status LoadChannelMgr::_start_bg_worker() { |
242 | 6 | RETURN_IF_ERROR(Thread::create( |
243 | 6 | "LoadChannelMgr", "cancel_timeout_load_channels", |
244 | 6 | [this]() { |
245 | 6 | while (!_stop_background_threads_latch.wait_for( |
246 | 6 | std::chrono::seconds(START_BG_INTERVAL))) { |
247 | 6 | static_cast<void>(_start_load_channels_clean()); |
248 | 6 | } |
249 | 6 | }, |
250 | 6 | &_load_channels_clean_thread)); |
251 | | |
252 | 6 | return Status::OK(); |
253 | 6 | } |
254 | | |
255 | 149 | Status LoadChannelMgr::_start_load_channels_clean() { |
256 | 149 | std::vector<std::shared_ptr<LoadChannel>> need_delete_channels; |
257 | 149 | LOG(INFO) << "cleaning timed out load channels"; |
258 | 149 | time_t now = time(nullptr); |
259 | 149 | { |
260 | 149 | std::vector<UniqueId> need_delete_channel_ids; |
261 | 149 | std::lock_guard<std::mutex> l(_lock); |
262 | 149 | int i = 0; |
263 | 149 | for (auto& kv : _load_channels) { |
264 | 38 | VLOG_CRITICAL << "load channel[" << i++ << "]: " << *(kv.second); |
265 | 38 | time_t last_updated_time = kv.second->last_updated_time(); |
266 | 38 | if (difftime(now, last_updated_time) >= kv.second->timeout()) { |
267 | 0 | need_delete_channel_ids.emplace_back(kv.first); |
268 | 0 | need_delete_channels.emplace_back(kv.second); |
269 | 0 | } |
270 | 38 | } |
271 | | |
272 | 149 | for (auto& key : need_delete_channel_ids) { |
273 | 0 | _load_channels.erase(key); |
274 | 0 | LOG(INFO) << "erase timeout load channel: " << key; |
275 | 0 | } |
276 | 149 | } |
277 | | |
278 | | // we must cancel these load channels before destroying them. |
279 | | // otherwise some object may be invalid before trying to visit it. |
280 | | // eg: MemTracker in load channel |
281 | 149 | for (auto& channel : need_delete_channels) { |
282 | 0 | RETURN_IF_ERROR(channel->cancel()); |
283 | 0 | LOG(INFO) << "load channel has been safely deleted: " << channel->load_id() |
284 | 0 | << ", timeout(s): " << channel->timeout(); |
285 | 0 | } |
286 | | |
287 | 149 | return Status::OK(); |
288 | 149 | } |
289 | | } // namespace doris |