be/src/runtime/memory/mem_tracker_limiter.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/memory/mem_tracker_limiter.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/types.pb.h> |
22 | | |
23 | | #include <functional> |
24 | | #include <mutex> |
25 | | #include <queue> |
26 | | #include <utility> |
27 | | |
28 | | #include "common/config.h" |
29 | | #include "runtime/exec_env.h" |
30 | | #include "runtime/fragment_mgr.h" |
31 | | #include "runtime/memory/global_memory_arbitrator.h" |
32 | | #include "runtime/runtime_profile.h" |
33 | | #include "runtime/thread_context.h" |
34 | | #include "runtime/workload_group/workload_group.h" |
35 | | #include "service/backend_options.h" |
36 | | #include "util/mem_info.h" |
37 | | |
38 | | namespace doris { |
39 | | #include "common/compile_check_begin.h" |
40 | | |
41 | | static bvar::Adder<int64_t> memory_memtrackerlimiter_cnt("memory_memtrackerlimiter_cnt"); |
42 | | |
43 | | std::atomic<long> mem_tracker_limiter_group_counter(0); |
44 | | |
45 | 608k | MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit) { |
46 | 608k | DCHECK_GE(byte_limit, -1); |
47 | 608k | _type = type; |
48 | 608k | _label = label; |
49 | 608k | _limit = byte_limit; |
50 | 608k | _uid = UniqueId::gen_uid(); |
51 | 608k | if (_type == Type::GLOBAL) { |
52 | 49 | _group_num = 0; |
53 | 607k | } else if (_type == Type::METADATA) { |
54 | 12 | _group_num = 1; |
55 | 607k | } else if (_type == Type::CACHE) { |
56 | 2.22k | _group_num = 2; |
57 | 605k | } else { |
58 | 605k | _group_num = |
59 | 605k | mem_tracker_limiter_group_counter.fetch_add(1) % (MEM_TRACKER_GROUP_NUM - 3) + 3; |
60 | 605k | } |
61 | 608k | memory_memtrackerlimiter_cnt << 1; |
62 | 608k | } |
63 | | |
64 | | std::shared_ptr<MemTrackerLimiter> MemTrackerLimiter::create_shared(MemTrackerLimiter::Type type, |
65 | | const std::string& label, |
66 | 304k | int64_t byte_limit) { |
67 | 304k | auto tracker = std::make_shared<MemTrackerLimiter>(type, label, byte_limit); |
68 | | // Write tracker is only used to tracker the size, memtable has a separate logic to deal with memory flush, |
69 | | // so limit == -1, so that should not check the limit in memtracker. |
70 | 304k | auto write_tracker = std::make_shared<MemTrackerLimiter>(type, "Memtable#" + label, -1); |
71 | 304k | tracker->_write_tracker.swap(write_tracker); |
72 | | #ifndef BE_TEST |
73 | | DCHECK(ExecEnv::tracking_memory()); |
74 | | std::lock_guard<std::mutex> l( |
75 | | ExecEnv::GetInstance()->mem_tracker_limiter_pool[tracker->group_num()].group_lock); |
76 | | ExecEnv::GetInstance()->mem_tracker_limiter_pool[tracker->group_num()].trackers.insert( |
77 | | ExecEnv::GetInstance()->mem_tracker_limiter_pool[tracker->group_num()].trackers.end(), |
78 | | tracker); |
79 | | #endif |
80 | 304k | return tracker; |
81 | 304k | } |
82 | | |
83 | 608k | bool MemTrackerLimiter::open_memory_tracker_inaccurate_detect() { |
84 | 608k | return doris::config::crash_in_memory_tracker_inaccurate && |
85 | 608k | (_type == Type::COMPACTION || _type == Type::SCHEMA_CHANGE || _type == Type::QUERY || |
86 | 0 | (_type == Type::LOAD && !is_group_commit_load)); |
87 | 608k | } |
88 | | |
89 | 608k | MemTrackerLimiter::~MemTrackerLimiter() { |
90 | 608k | consume(_untracked_mem); |
91 | 608k | static std::string mem_tracker_inaccurate_msg = |
92 | 608k | "mem tracker not equal to 0 when mem tracker destruct, this usually means that " |
93 | 608k | "memory tracking is inaccurate and SCOPED_ATTACH_TASK and " |
94 | 608k | "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. " |
95 | 608k | "If the log is truncated, search for `Address Sanitizer` in the be.INFO log to see " |
96 | 608k | "more information." |
97 | 608k | "1. For query and load, memory leaks may have occurred, it is expected that the query " |
98 | 608k | "mem tracker will be bound to the thread context using SCOPED_ATTACH_TASK and " |
99 | 608k | "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc and free. " |
100 | 608k | "2. If a memory alloc is recorded by this tracker, it is expected that be " |
101 | 608k | "recorded in this tracker when memory is freed. " |
102 | 608k | "3. Merge the remaining memory tracking value by " |
103 | 608k | "this tracker into Orphan, if you observe that Orphan is not equal to 0 in the mem " |
104 | 608k | "tracker web or log, this indicates that there may be a memory leak. " |
105 | 608k | "4. If you need to " |
106 | 608k | "transfer memory tracking value between two trackers, can use transfer_to."; |
107 | 608k | if (consumption() != 0) { |
108 | 72 | if (open_memory_tracker_inaccurate_detect()) { |
109 | | // Maybe the memory is recorded in orphan memory tracker, so that we print the stack trace in |
110 | | // orphan memory tracker first to help us debug. |
111 | 0 | LOG(INFO) << "Orphan memory tracker consumption: " |
112 | 0 | << ExecEnv::GetInstance()->orphan_mem_tracker()->print_address_sanitizers(); |
113 | 0 | std::string err_msg = fmt::format( |
114 | 0 | "mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", label(), |
115 | 0 | consumption(), peak_consumption(), mem_tracker_inaccurate_msg); |
116 | 0 | LOG(FATAL) << err_msg << print_address_sanitizers(); |
117 | 0 | } |
118 | 72 | if (ExecEnv::tracking_memory()) { |
119 | 0 | ExecEnv::GetInstance()->orphan_mem_tracker()->consume(consumption()); |
120 | 0 | } |
121 | 72 | _mem_counter.set(0); |
122 | 607k | } else if (open_memory_tracker_inaccurate_detect() && !_address_sanitizers.empty()) { |
123 | | // Maybe the memory is recorded in orphan memory tracker, so that we print the stack trace in |
124 | | // orphan memory tracker first to help us debug. |
125 | 0 | LOG(INFO) << "Orphan memory tracker consumption: " |
126 | 0 | << ExecEnv::GetInstance()->orphan_mem_tracker()->print_address_sanitizers(); |
127 | 0 | LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. " |
128 | 0 | << ", mem tracker label: " << _label |
129 | 0 | << ", peak consumption: " << peak_consumption() << print_address_sanitizers(); |
130 | 0 | } |
131 | 608k | DCHECK_EQ(reserved_consumption(), 0); |
132 | 608k | memory_memtrackerlimiter_cnt << -1; |
133 | 608k | } |
134 | | |
135 | 0 | void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { |
136 | 0 | if (open_memory_tracker_inaccurate_detect()) { |
137 | 0 | std::lock_guard<std::mutex> l(_address_sanitizers_mtx); |
138 | 0 | auto it = _address_sanitizers.find(buf); |
139 | 0 | if (it != _address_sanitizers.end()) { |
140 | 0 | _error_address_sanitizers.emplace_back( |
141 | 0 | fmt::format("[Address Sanitizer] memory buf repeat add, mem tracker label: {}, " |
142 | 0 | "consumption: {}, peak consumption: {}, buf: {}, size: {}, old " |
143 | 0 | "buf: {}, old size: {}, new stack_trace: {}, old stack_trace: {}.", |
144 | 0 | _label, consumption(), peak_consumption(), buf, size, it->first, |
145 | 0 | it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), |
146 | 0 | it->second.stack_trace)); |
147 | 0 | } |
148 | | |
149 | | // if alignment not equal to 0, maybe usable_size > size. |
150 | 0 | AddressSanitizer as = {size, doris::config::enable_address_sanitizers_with_stack_trace |
151 | 0 | ? get_stack_trace(1, "DISABLED") |
152 | 0 | : ""}; |
153 | 0 | _address_sanitizers.emplace(buf, as); |
154 | 0 | } |
155 | 0 | } |
156 | | |
157 | 0 | void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { |
158 | 0 | if (open_memory_tracker_inaccurate_detect()) { |
159 | 0 | std::lock_guard<std::mutex> l(_address_sanitizers_mtx); |
160 | 0 | auto it = _address_sanitizers.find(buf); |
161 | 0 | if (it != _address_sanitizers.end()) { |
162 | 0 | if (it->second.size != size) { |
163 | 0 | _error_address_sanitizers.emplace_back(fmt::format( |
164 | 0 | "[Address Sanitizer] free memory buf size inaccurate, mem tracker label: " |
165 | 0 | "{}, consumption: {}, peak consumption: {}, buf: {}, size: {}, old buf: " |
166 | 0 | "{}, old size: {}, new stack_trace: {}, old stack_trace: {}.", |
167 | 0 | _label, consumption(), peak_consumption(), buf, size, it->first, |
168 | 0 | it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), |
169 | 0 | it->second.stack_trace)); |
170 | 0 | } |
171 | 0 | _address_sanitizers.erase(buf); |
172 | 0 | } else { |
173 | 0 | _error_address_sanitizers.emplace_back(fmt::format( |
174 | 0 | "[Address Sanitizer] memory buf not exist, mem tracker label: {}, consumption: " |
175 | 0 | "{}, peak consumption: {}, buf: {}, size: {}, stack_trace: {}.", |
176 | 0 | _label, consumption(), peak_consumption(), buf, size, |
177 | 0 | get_stack_trace(1, "FULL_WITH_INLINE"))); |
178 | 0 | } |
179 | 0 | } |
180 | 0 | } |
181 | | |
182 | 0 | std::string MemTrackerLimiter::print_address_sanitizers() { |
183 | 0 | std::lock_guard<std::mutex> l(_address_sanitizers_mtx); |
184 | 0 | std::string detail = "[Address Sanitizer]:"; |
185 | 0 | detail += "\n memory not be freed:"; |
186 | 0 | for (const auto& it : _address_sanitizers) { |
187 | 0 | auto msg = fmt::format( |
188 | 0 | "\n [Address Sanitizer] buf not be freed, mem tracker label: {}, consumption: " |
189 | 0 | "{}, peak consumption: {}, buf: {}, size {}, strack trace: {}", |
190 | 0 | _label, consumption(), peak_consumption(), it.first, it.second.size, |
191 | 0 | it.second.stack_trace); |
192 | 0 | LOG(INFO) << msg; |
193 | 0 | detail += msg; |
194 | 0 | } |
195 | 0 | detail += "\n incorrect memory alloc and free:"; |
196 | 0 | for (const auto& err_msg : _error_address_sanitizers) { |
197 | 0 | LOG(INFO) << err_msg; |
198 | 0 | detail += fmt::format("\n {}", err_msg); |
199 | 0 | } |
200 | 0 | return detail; |
201 | 0 | } |
202 | | |
203 | 0 | RuntimeProfile* MemTrackerLimiter::make_profile(RuntimeProfile* profile) const { |
204 | 0 | RuntimeProfile* profile_snapshot = profile->create_child( |
205 | 0 | fmt::format("{}@{}@id={}", _label, type_string(_type), _uid.to_string()), true, false); |
206 | 0 | RuntimeProfile::HighWaterMarkCounter* usage_counter = |
207 | 0 | profile_snapshot->AddHighWaterMarkCounter("Memory", TUnit::BYTES); |
208 | 0 | COUNTER_SET(usage_counter, peak_consumption()); |
209 | 0 | COUNTER_SET(usage_counter, consumption()); |
210 | 0 | if (limit() >= 0) { |
211 | 0 | RuntimeProfile::Counter* limit_counter = |
212 | 0 | ADD_COUNTER(profile_snapshot, "Limit", TUnit::BYTES); |
213 | 0 | COUNTER_SET(limit_counter, _limit); |
214 | 0 | } |
215 | 0 | if (reserved_peak_consumption() != 0) { |
216 | 0 | RuntimeProfile::HighWaterMarkCounter* reserved_counter = |
217 | 0 | profile_snapshot->AddHighWaterMarkCounter("ReservedMemory", TUnit::BYTES); |
218 | 0 | COUNTER_SET(reserved_counter, reserved_peak_consumption()); |
219 | 0 | COUNTER_SET(reserved_counter, reserved_consumption()); |
220 | 0 | } |
221 | 0 | return profile_snapshot; |
222 | 0 | } |
223 | | |
224 | 0 | std::string MemTrackerLimiter::make_profile_str() const { |
225 | 0 | std::unique_ptr<RuntimeProfile> profile_snapshot = |
226 | 0 | std::make_unique<RuntimeProfile>("MemTrackerSnapshot"); |
227 | 0 | make_profile(profile_snapshot.get()); |
228 | 0 | std::stringstream ss; |
229 | 0 | profile_snapshot->pretty_print(&ss); |
230 | 0 | return ss.str(); |
231 | 0 | } |
232 | | |
233 | 0 | void MemTrackerLimiter::clean_tracker_limiter_group() { |
234 | | #ifndef BE_TEST |
235 | | if (ExecEnv::tracking_memory()) { |
236 | | for (auto& group : ExecEnv::GetInstance()->mem_tracker_limiter_pool) { |
237 | | std::lock_guard<std::mutex> l(group.group_lock); |
238 | | auto it = group.trackers.begin(); |
239 | | while (it != group.trackers.end()) { |
240 | | if ((*it).expired()) { |
241 | | it = group.trackers.erase(it); |
242 | | } else { |
243 | | ++it; |
244 | | } |
245 | | } |
246 | | } |
247 | | } |
248 | | #endif |
249 | 0 | } |
250 | | |
251 | | void MemTrackerLimiter::make_type_trackers_profile(RuntimeProfile* profile, |
252 | 0 | MemTrackerLimiter::Type type) { |
253 | 0 | if (type == Type::GLOBAL) { |
254 | 0 | std::lock_guard<std::mutex> l( |
255 | 0 | ExecEnv::GetInstance()->mem_tracker_limiter_pool[0].group_lock); |
256 | 0 | for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[0].trackers) { |
257 | 0 | auto tracker = trackerWptr.lock(); |
258 | 0 | if (tracker != nullptr) { |
259 | 0 | tracker->make_profile(profile); |
260 | 0 | } |
261 | 0 | } |
262 | 0 | } else if (type == Type::METADATA) { |
263 | 0 | std::lock_guard<std::mutex> l( |
264 | 0 | ExecEnv::GetInstance()->mem_tracker_limiter_pool[1].group_lock); |
265 | 0 | for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[1].trackers) { |
266 | 0 | auto tracker = trackerWptr.lock(); |
267 | 0 | if (tracker != nullptr) { |
268 | 0 | tracker->make_profile(profile); |
269 | 0 | } |
270 | 0 | } |
271 | 0 | } else if (type == Type::CACHE) { |
272 | 0 | std::lock_guard<std::mutex> l( |
273 | 0 | ExecEnv::GetInstance()->mem_tracker_limiter_pool[2].group_lock); |
274 | 0 | for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[2].trackers) { |
275 | 0 | auto tracker = trackerWptr.lock(); |
276 | 0 | if (tracker != nullptr) { |
277 | 0 | tracker->make_profile(profile); |
278 | 0 | } |
279 | 0 | } |
280 | 0 | } else { |
281 | 0 | for (unsigned i = 3; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) { |
282 | 0 | std::lock_guard<std::mutex> l( |
283 | 0 | ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock); |
284 | 0 | for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) { |
285 | 0 | auto tracker = trackerWptr.lock(); |
286 | 0 | if (tracker != nullptr && tracker->type() == type) { |
287 | 0 | tracker->make_profile(profile); |
288 | 0 | } |
289 | 0 | } |
290 | 0 | } |
291 | 0 | } |
292 | 0 | } |
293 | | |
294 | 0 | std::string MemTrackerLimiter::make_type_trackers_profile_str(MemTrackerLimiter::Type type) { |
295 | 0 | std::unique_ptr<RuntimeProfile> profile_snapshot = |
296 | 0 | std::make_unique<RuntimeProfile>("TypeMemTrackersSnapshot"); |
297 | 0 | make_type_trackers_profile(profile_snapshot.get(), type); |
298 | 0 | std::stringstream ss; |
299 | 0 | profile_snapshot->pretty_print(&ss); |
300 | 0 | return ss.str(); |
301 | 0 | } |
302 | | |
303 | | void MemTrackerLimiter::make_top_consumption_tasks_tracker_profile(RuntimeProfile* profile, |
304 | 0 | int top_num) { |
305 | 0 | std::unique_ptr<RuntimeProfile> tmp_profile_snapshot = |
306 | 0 | std::make_unique<RuntimeProfile>("tmpSnapshot"); |
307 | 0 | std::priority_queue<std::pair<int64_t, RuntimeProfile*>> max_pq; |
308 | | // start from 3, not include global/metadata/cache type. |
309 | 0 | for (unsigned i = 3; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) { |
310 | 0 | std::lock_guard<std::mutex> l( |
311 | 0 | ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock); |
312 | 0 | for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) { |
313 | 0 | auto tracker = trackerWptr.lock(); |
314 | 0 | if (tracker != nullptr) { |
315 | 0 | auto* profile_snapshot = tracker->make_profile(tmp_profile_snapshot.get()); |
316 | 0 | max_pq.emplace(tracker->consumption(), profile_snapshot); |
317 | 0 | } |
318 | 0 | } |
319 | 0 | } |
320 | |
|
321 | 0 | while (!max_pq.empty() && top_num > 0) { |
322 | 0 | RuntimeProfile* profile_snapshot = |
323 | 0 | profile->create_child(max_pq.top().second->name(), true, false); |
324 | 0 | profile_snapshot->merge(max_pq.top().second); |
325 | 0 | top_num--; |
326 | 0 | max_pq.pop(); |
327 | 0 | } |
328 | 0 | } |
329 | | |
330 | 0 | void MemTrackerLimiter::make_all_tasks_tracker_profile(RuntimeProfile* profile) { |
331 | 0 | std::unordered_map<Type, RuntimeProfile*> types_profile; |
332 | 0 | types_profile[Type::QUERY] = profile->create_child("QueryTasks", true, false); |
333 | 0 | types_profile[Type::LOAD] = profile->create_child("LoadTasks", true, false); |
334 | 0 | types_profile[Type::COMPACTION] = profile->create_child("CompactionTasks", true, false); |
335 | 0 | types_profile[Type::SCHEMA_CHANGE] = profile->create_child("SchemaChangeTasks", true, false); |
336 | 0 | types_profile[Type::OTHER] = profile->create_child("OtherTasks", true, false); |
337 | | |
338 | | // start from 3, not include global/metadata/cache type. |
339 | 0 | for (unsigned i = 3; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) { |
340 | 0 | std::lock_guard<std::mutex> l( |
341 | 0 | ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock); |
342 | 0 | for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) { |
343 | 0 | auto tracker = trackerWptr.lock(); |
344 | 0 | if (tracker != nullptr) { |
345 | | // ResultBlockBufferBase will continue to exist for 5 minutes after the query ends, even if the |
346 | | // result buffer is empty, and will not be shown in the profile. of course, this code is tricky. |
347 | 0 | if (tracker->consumption() == 0 && |
348 | 0 | tracker->label().starts_with("ResultBlockBuffer")) { |
349 | 0 | continue; |
350 | 0 | } |
351 | 0 | tracker->make_profile(types_profile[tracker->type()]); |
352 | 0 | } |
353 | 0 | } |
354 | 0 | } |
355 | 0 | } |
356 | | |
357 | 0 | void MemTrackerLimiter::print_log_usage(const std::string& msg) { |
358 | 0 | if (_enable_print_log_usage) { |
359 | 0 | _enable_print_log_usage = false; |
360 | 0 | std::string detail = msg; |
361 | 0 | detail += "\nProcess Memory Summary: " + GlobalMemoryArbitrator::process_mem_log_str(); |
362 | 0 | detail += "\n" + make_profile_str(); |
363 | 0 | LOG(WARNING) << detail; |
364 | 0 | } |
365 | 0 | } |
366 | | |
367 | 0 | std::string MemTrackerLimiter::tracker_limit_exceeded_str() { |
368 | 0 | std::string err_msg = fmt::format( |
369 | 0 | "memory tracker limit exceeded, tracker label:{}, type:{}, limit " |
370 | 0 | "{}, peak used {}, current used {}. backend {}, {}.", |
371 | 0 | label(), type_string(_type), PrettyPrinter::print_bytes(limit()), |
372 | 0 | PrettyPrinter::print_bytes(peak_consumption()), |
373 | 0 | PrettyPrinter::print_bytes(consumption()), BackendOptions::get_localhost(), |
374 | 0 | GlobalMemoryArbitrator::process_memory_used_str()); |
375 | 0 | if (_type == Type::QUERY || _type == Type::LOAD) { |
376 | 0 | err_msg += fmt::format( |
377 | 0 | " exec node:<{}>, can `set exec_mem_limit` to change limit, details see be.INFO.", |
378 | 0 | doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label()); |
379 | 0 | } else if (_type == Type::SCHEMA_CHANGE) { |
380 | 0 | err_msg += fmt::format( |
381 | 0 | " can modify `memory_limitation_per_thread_for_schema_change_bytes` in be.conf to " |
382 | 0 | "change limit, details see be.INFO."); |
383 | 0 | } |
384 | 0 | return err_msg; |
385 | 0 | } |
386 | | |
387 | | #include "common/compile_check_end.h" |
388 | | } // namespace doris |