be/src/load/memtable/memtable_memory_limiter.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/memtable/memtable_memory_limiter.h" |
19 | | |
20 | | #include <bvar/bvar.h> |
21 | | |
22 | | #include "common/config.h" |
23 | | #include "common/metrics/doris_metrics.h" |
24 | | #include "common/metrics/metrics.h" |
25 | | #include "load/memtable/memtable.h" |
26 | | #include "load/memtable/memtable_writer.h" |
27 | | #include "runtime/workload_group/workload_group_manager.h" |
28 | | #include "util/mem_info.h" |
29 | | |
30 | | namespace doris { |
31 | | DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(memtable_memory_limiter_mem_consumption, MetricUnit::BYTES, "", |
32 | | memtable_memory_limiter_mem_consumption, |
33 | | Labels({{"type", "load"}})); |
34 | | |
35 | | bvar::LatencyRecorder g_memtable_memory_limit_latency_ms("mm_limiter_limit_time_ms"); |
36 | | bvar::Adder<int> g_memtable_memory_limit_waiting_threads("mm_limiter_waiting_threads"); |
37 | | bvar::Status<int64_t> g_memtable_active_memory("mm_limiter_mem_active", 0); |
38 | | bvar::Status<int64_t> g_memtable_write_memory("mm_limiter_mem_write", 0); |
39 | | bvar::Status<int64_t> g_memtable_flush_memory("mm_limiter_mem_flush", 0); |
40 | | bvar::Status<int64_t> g_memtable_load_memory("mm_limiter_mem_load", 0); |
41 | | bvar::Status<int64_t> g_load_hard_mem_limit("mm_limiter_limit_hard", 0); |
42 | | bvar::Status<int64_t> g_load_soft_mem_limit("mm_limiter_limit_soft", 0); |
43 | | bvar::Adder<uint64_t> g_flush_cuz_load_mem_exceed_hard_limit("flush_cuz_hard_limit"); |
44 | | bvar::Adder<uint64_t> g_flush_cuz_sys_mem_exceed_soft_limit("flush_cuz_soft_limit"); |
45 | | bvar::Adder<int> g_memtable_memory_limit_flush_memtable_count("mm_limiter_flush_memtable_count"); |
46 | | bvar::LatencyRecorder g_memtable_memory_limit_flush_size_bytes("mm_limiter_flush_size_bytes"); |
47 | | |
48 | | // Calculate the total memory limit of all load tasks on this BE |
49 | 1 | static int64_t calc_process_max_load_memory(int64_t process_mem_limit) { |
50 | 1 | if (process_mem_limit == -1) { |
51 | | // no limit |
52 | 0 | return -1; |
53 | 0 | } |
54 | 1 | int32_t max_load_memory_percent = config::load_process_max_memory_limit_percent; |
55 | 1 | return process_mem_limit * max_load_memory_percent / 100; |
56 | 1 | } |
57 | | |
58 | 18 | MemTableMemoryLimiter::MemTableMemoryLimiter() {} |
59 | | |
60 | 18 | MemTableMemoryLimiter::~MemTableMemoryLimiter() { |
61 | 18 | DEREGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption); |
62 | 18 | } |
63 | | |
64 | 1 | Status MemTableMemoryLimiter::init(int64_t process_mem_limit) { |
65 | 1 | _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit); |
66 | 1 | _load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100; |
67 | 1 | _load_safe_mem_permit = |
68 | 1 | _load_hard_mem_limit * config::load_process_safe_mem_permit_percent / 100; |
69 | 1 | g_load_hard_mem_limit.set_value(_load_hard_mem_limit); |
70 | 1 | g_load_soft_mem_limit.set_value(_load_soft_mem_limit); |
71 | 1 | _mem_tracker = std::make_unique<MemTracker>("AllMemTableMemory"); |
72 | 1 | REGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption, |
73 | 1 | [this]() { return _mem_tracker->consumption(); }); |
74 | 1 | _log_timer.start(); |
75 | 1 | return Status::OK(); |
76 | 1 | } |
77 | | |
78 | 15 | void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer) { |
79 | 15 | std::lock_guard<std::mutex> l(_lock); |
80 | 15 | _writers.push_back(writer); |
81 | 15 | } |
82 | | |
83 | 1 | int64_t MemTableMemoryLimiter::_sys_avail_mem_less_than_warning_water_mark() { |
84 | | // reserve a small amount of memory so we do not trigger MinorGC |
85 | 1 | return doris::MemInfo::sys_mem_available_warning_water_mark() - |
86 | 1 | doris::GlobalMemoryArbitrator::sys_mem_available() + |
87 | 1 | config::memtable_limiter_reserved_memory_bytes; |
88 | 1 | } |
89 | | |
90 | 1 | int64_t MemTableMemoryLimiter::_process_used_mem_more_than_soft_mem_limit() { |
91 | | // reserve a small amount of memory so we do not trigger MinorGC |
92 | 1 | return GlobalMemoryArbitrator::process_memory_usage() - MemInfo::soft_mem_limit() + |
93 | 1 | config::memtable_limiter_reserved_memory_bytes; |
94 | 1 | } |
95 | | |
96 | 1 | bool MemTableMemoryLimiter::_soft_limit_reached() { |
97 | 1 | return _mem_tracker->consumption() > _load_soft_mem_limit || _hard_limit_reached(); |
98 | 1 | } |
99 | | |
100 | 1 | bool MemTableMemoryLimiter::_hard_limit_reached() { |
101 | 1 | return _mem_tracker->consumption() > _load_hard_mem_limit || |
102 | 1 | _sys_avail_mem_less_than_warning_water_mark() > 0 || |
103 | 1 | _process_used_mem_more_than_soft_mem_limit() > 0; |
104 | 1 | } |
105 | | |
106 | 0 | bool MemTableMemoryLimiter::_load_usage_low() { |
107 | 0 | return _mem_tracker->consumption() <= _load_safe_mem_permit; |
108 | 0 | } |
109 | | |
110 | 0 | int64_t MemTableMemoryLimiter::_need_flush() { |
111 | 0 | DBUG_EXECUTE_IF("MemTableMemoryLimiter._need_flush.random_flush", { |
112 | 0 | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
113 | 0 | LOG(INFO) << "debug memtable need flush return 1"; |
114 | 0 | return 1; |
115 | 0 | } |
116 | 0 | }); |
117 | 0 | int64_t limit1 = _mem_tracker->consumption() - _load_soft_mem_limit; |
118 | 0 | int64_t limit2 = _sys_avail_mem_less_than_warning_water_mark(); |
119 | 0 | int64_t limit3 = _process_used_mem_more_than_soft_mem_limit(); |
120 | 0 | int64_t need_flush = std::max({limit1, limit2, limit3}); |
121 | 0 | return need_flush - _queue_mem_usage - _flush_mem_usage; |
122 | 0 | } |
123 | | |
124 | 1 | void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_check) { |
125 | | // Check the soft limit. |
126 | 1 | DCHECK(_load_soft_mem_limit > 0); |
127 | 1 | do { |
128 | 1 | DBUG_EXECUTE_IF("MemTableMemoryLimiter.handle_memtable_flush.limit_reached", { |
129 | 1 | LOG(INFO) << "debug memtable limit reached"; |
130 | 1 | break; |
131 | 1 | }); |
132 | 1 | if (!_soft_limit_reached() || _load_usage_low()) { |
133 | 1 | return; |
134 | 1 | } |
135 | 1 | } while (false); |
136 | 0 | MonotonicStopWatch timer; |
137 | 0 | timer.start(); |
138 | 0 | std::unique_lock<std::mutex> l(_lock); |
139 | 0 | g_memtable_memory_limit_waiting_threads << 1; |
140 | 0 | bool first = true; |
141 | 0 | do { |
142 | 0 | if (!first) { |
143 | 0 | auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000)); |
144 | 0 | if (st == std::cv_status::timeout) { |
145 | 0 | LOG(INFO) << "timeout when waiting for memory hard limit end, try again"; |
146 | 0 | } |
147 | 0 | } |
148 | 0 | if (cancel_check && cancel_check()) { |
149 | 0 | LOG(INFO) << "cancelled when waiting for memtable flush"; |
150 | 0 | g_memtable_memory_limit_waiting_threads << -1; |
151 | 0 | return; |
152 | 0 | } |
153 | 0 | first = false; |
154 | 0 | int64_t need_flush = _need_flush(); |
155 | 0 | if (need_flush > 0) { |
156 | 0 | auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT; |
157 | 0 | LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ? "hard" : "soft") |
158 | 0 | << ", " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", " |
159 | 0 | << GlobalMemoryArbitrator::sys_mem_available_details_str() |
160 | 0 | << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) |
161 | 0 | << ", memtable writers num: " << _writers.size() |
162 | 0 | << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage) |
163 | 0 | << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage) |
164 | 0 | << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) |
165 | 0 | << ", need flush: " << PrettyPrinter::print_bytes(need_flush); |
166 | 0 | if (VLOG_DEBUG_IS_ON) { |
167 | 0 | auto log_str = doris::ProcessProfile::instance() |
168 | 0 | ->memory_profile() |
169 | 0 | ->process_memory_detail_str(); |
170 | 0 | LOG_LONG_STRING(INFO, log_str); |
171 | 0 | } |
172 | 0 | if (limit == Limit::HARD) { |
173 | 0 | g_flush_cuz_load_mem_exceed_hard_limit << 1; |
174 | 0 | } else if (limit == Limit::SOFT) { |
175 | 0 | g_flush_cuz_sys_mem_exceed_soft_limit << 1; |
176 | 0 | } else { |
177 | | // will not reach here |
178 | 0 | } |
179 | 0 | _flush_active_memtables(need_flush); |
180 | 0 | } |
181 | 0 | } while (_hard_limit_reached() && !_load_usage_low()); |
182 | 0 | g_memtable_memory_limit_waiting_threads << -1; |
183 | 0 | timer.stop(); |
184 | 0 | int64_t time_ms = timer.elapsed_time() / 1000 / 1000; |
185 | 0 | g_memtable_memory_limit_latency_ms << time_ms; |
186 | 0 | if (time_ms > 0) { |
187 | 0 | LOG(INFO) << "waited " << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS) |
188 | 0 | << " for memtable memory limit" |
189 | 0 | << ", " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", " |
190 | 0 | << GlobalMemoryArbitrator::sys_mem_available_details_str() |
191 | 0 | << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) |
192 | 0 | << ", memtable writers num: " << _writers.size() |
193 | 0 | << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage) |
194 | 0 | << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage) |
195 | 0 | << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage); |
196 | 0 | } |
197 | 0 | } |
198 | | |
199 | 0 | int64_t MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) { |
200 | 0 | _refresh_mem_tracker(); |
201 | 0 | if (_active_writers.size() == 0) { |
202 | 0 | return 0; |
203 | 0 | } |
204 | | |
205 | 0 | using WriterMem = std::pair<std::weak_ptr<MemTableWriter>, int64_t>; |
206 | 0 | auto cmp = [](WriterMem left, WriterMem right) { return left.second < right.second; }; |
207 | 0 | std::priority_queue<WriterMem, std::vector<WriterMem>, decltype(cmp)> heap(cmp); |
208 | |
|
209 | 0 | for (auto writer : _active_writers) { |
210 | 0 | auto w = writer.lock(); |
211 | 0 | if (w == nullptr) { |
212 | 0 | continue; |
213 | 0 | } |
214 | 0 | heap.emplace(w, w->active_memtable_mem_consumption()); |
215 | 0 | } |
216 | |
|
217 | 0 | int64_t mem_flushed = 0; |
218 | 0 | int64_t num_flushed = 0; |
219 | |
|
220 | 0 | while (mem_flushed < need_flush && !heap.empty()) { |
221 | 0 | auto [writer, sort_mem] = heap.top(); |
222 | 0 | heap.pop(); |
223 | 0 | auto w = writer.lock(); |
224 | 0 | if (w == nullptr) { |
225 | 0 | continue; |
226 | 0 | } |
227 | | |
228 | 0 | int64_t mem = w->active_memtable_mem_consumption(); |
229 | 0 | if (mem < sort_mem * 0.9) { |
230 | | // if the memtable writer just got flushed, don't flush it again |
231 | 0 | continue; |
232 | 0 | } |
233 | 0 | Status st = w->flush_async(); |
234 | 0 | if (!st.ok()) { |
235 | 0 | auto err_msg = fmt::format( |
236 | 0 | "tablet writer failed to reduce mem consumption by flushing memtable, " |
237 | 0 | "tablet_id={}, err={}", |
238 | 0 | w->tablet_id(), st.to_string()); |
239 | 0 | LOG(WARNING) << err_msg; |
240 | 0 | static_cast<void>(w->cancel_with_status(st)); |
241 | 0 | } |
242 | 0 | mem_flushed += mem; |
243 | 0 | num_flushed += (mem > 0); |
244 | 0 | g_memtable_memory_limit_flush_memtable_count << 1; |
245 | 0 | g_memtable_memory_limit_flush_size_bytes << mem; |
246 | 0 | } |
247 | 0 | LOG(INFO) << "flushed " << num_flushed << " out of " << _active_writers.size() |
248 | 0 | << " active writers, flushed size: " << PrettyPrinter::print_bytes(mem_flushed); |
249 | 0 | return mem_flushed; |
250 | 0 | } |
251 | | |
252 | 0 | void MemTableMemoryLimiter::refresh_mem_tracker() { |
253 | 0 | std::lock_guard<std::mutex> l(_lock); |
254 | 0 | _refresh_mem_tracker(); |
255 | 0 | std::stringstream ss; |
256 | 0 | Limit limit = Limit::NONE; |
257 | 0 | if (_soft_limit_reached()) { |
258 | 0 | limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT; |
259 | 0 | ss << "reached " << (limit == Limit::HARD ? "hard" : "soft") << " limit"; |
260 | 0 | } else if (_last_limit == Limit::NONE) { |
261 | 0 | return; |
262 | 0 | } else { |
263 | 0 | ss << "ended " << (_last_limit == Limit::HARD ? "hard" : "soft") << " limit"; |
264 | 0 | } |
265 | | |
266 | 0 | if (_last_limit == limit && _log_timer.elapsed_time() < LOG_INTERVAL) { |
267 | 0 | return; |
268 | 0 | } |
269 | | |
270 | 0 | _last_limit = limit; |
271 | 0 | _log_timer.reset(); |
272 | 0 | LOG(INFO) << ss.str() |
273 | 0 | << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) |
274 | 0 | << ", memtable writers num: " << _writers.size() |
275 | 0 | << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage) |
276 | 0 | << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage) |
277 | 0 | << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage); |
278 | 0 | if (VLOG_DEBUG_IS_ON) { |
279 | 0 | auto log_str = |
280 | 0 | doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str(); |
281 | 0 | LOG_LONG_STRING(INFO, log_str); |
282 | 0 | } |
283 | 0 | } |
284 | | |
285 | 0 | void MemTableMemoryLimiter::_refresh_mem_tracker() { |
286 | 0 | _flush_mem_usage = 0; |
287 | 0 | _queue_mem_usage = 0; |
288 | 0 | _active_mem_usage = 0; |
289 | 0 | _active_writers.clear(); |
290 | 0 | for (auto it = _writers.begin(); it != _writers.end();) { |
291 | 0 | if (auto writer = it->lock()) { |
292 | | // The memtable is currently used by writer to insert blocks. |
293 | 0 | auto active_usage = writer->active_memtable_mem_consumption(); |
294 | 0 | _active_mem_usage += active_usage; |
295 | 0 | if (active_usage > 0) { |
296 | 0 | _active_writers.push_back(writer); |
297 | 0 | } |
298 | |
|
299 | 0 | auto flush_usage = writer->mem_consumption(MemType::FLUSH); |
300 | 0 | _flush_mem_usage += flush_usage; |
301 | |
|
302 | 0 | auto write_usage = writer->mem_consumption(MemType::WRITE_FINISHED); |
303 | 0 | _queue_mem_usage += write_usage; |
304 | 0 | ++it; |
305 | 0 | } else { |
306 | 0 | *it = std::move(_writers.back()); |
307 | 0 | _writers.pop_back(); |
308 | 0 | } |
309 | 0 | } |
310 | 0 | _mem_usage = _active_mem_usage + _queue_mem_usage + _flush_mem_usage; |
311 | 0 | g_memtable_active_memory.set_value(_active_mem_usage); |
312 | 0 | g_memtable_write_memory.set_value(_queue_mem_usage); |
313 | 0 | g_memtable_flush_memory.set_value(_flush_mem_usage); |
314 | 0 | g_memtable_load_memory.set_value(_mem_usage); |
315 | 0 | VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size(); |
316 | 0 | _mem_tracker->set_consumption(_mem_usage); |
317 | 0 | if (!_hard_limit_reached()) { |
318 | 0 | _hard_limit_end_cond.notify_all(); |
319 | 0 | } |
320 | 0 | } |
321 | | |
322 | | } // namespace doris |