be/src/runtime/memory/mem_tracker_limiter.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>

#include <atomic>
#include <cstdint>
// IWYU pragma: no_include <bits/std_abs.h>
#include <cmath> // IWYU pragma: keep
#include <list>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <unordered_map>
#include <vector>

#include "common/config.h"
#include "common/status.h"
#include "runtime/memory/mem_counter.h"
#include "runtime/memory/mem_tracker.h"
#include "util/string_util.h"
#include "util/uid_util.h"
41 | | |
42 | | namespace doris { |
43 | | #include "common/compile_check_begin.h" |
44 | | |
// Forward declarations used by the declarations below.
class MemTrackerLimiter;
class RuntimeProfile;

// Number of tracker groups. Presumably the size of the TrackerLimiterGroup pool
// (ExecEnv::mem_tracker_limiter_pool) — confirm at the use site.
constexpr size_t MEM_TRACKER_GROUP_NUM {1000};
49 | | |
50 | | struct TrackerLimiterGroup { |
51 | | // Note! in order to enable ExecEnv::mem_tracker_limiter_pool support resize, |
52 | | // the copy construction of TrackerLimiterGroup is disabled. |
53 | | // so cannot copy TrackerLimiterGroup anywhere, should use reference. |
54 | 1 | TrackerLimiterGroup() = default; |
55 | 0 | TrackerLimiterGroup(TrackerLimiterGroup&&) noexcept {} |
56 | 1.00k | TrackerLimiterGroup(const TrackerLimiterGroup&) {} |
57 | 0 | TrackerLimiterGroup& operator=(const TrackerLimiterGroup&) { return *this; } |
58 | | |
59 | | std::list<std::weak_ptr<MemTrackerLimiter>> trackers; |
60 | | std::mutex group_lock; |
61 | | }; |
62 | | |
63 | | /* |
64 | | * Track and limit the memory usage of process and query. |
65 | | * |
66 | | * Usually, put Query MemTrackerLimiter into SCOPED_ATTACH_TASK when the thread starts, |
67 | | * all memory used by this thread will be recorded on this Query. |
68 | | * |
69 | | * This class is thread-safe. |
70 | | */ |
71 | | class MemTrackerLimiter final { |
72 | | public: |
73 | | /* |
74 | | * Part 1, Type definition |
75 | | */ |
76 | | |
77 | | enum class Type { |
78 | | GLOBAL = 0, // Life cycle is the same as the process, except cache and metadata. |
79 | | QUERY = 1, // Count the memory consumption of all Query tasks. |
80 | | LOAD = 2, // Count the memory consumption of all Load tasks. |
81 | | COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks. |
82 | | SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks. |
83 | | METADATA = 5, // Count the memory consumption of all Metadata. |
84 | | CACHE = 6, // Count the memory consumption of all Cache. |
85 | | OTHER = 7, // Count the memory consumption of all other tasks, such as Clone, Snapshot, etc.. |
86 | | }; |
87 | | |
88 | 0 | static std::string type_string(Type type) { |
89 | 0 | switch (type) { |
90 | 0 | case Type::GLOBAL: |
91 | 0 | return "global"; |
92 | 0 | case Type::QUERY: |
93 | 0 | return "query"; |
94 | 0 | case Type::LOAD: |
95 | 0 | return "load"; |
96 | 0 | case Type::COMPACTION: |
97 | 0 | return "compaction"; |
98 | 0 | case Type::SCHEMA_CHANGE: |
99 | 0 | return "schema_change"; |
100 | 0 | case Type::METADATA: |
101 | 0 | return "metadata"; |
102 | 0 | case Type::CACHE: |
103 | 0 | return "cache"; |
104 | 0 | case Type::OTHER: |
105 | 0 | return "other_task"; |
106 | 0 | default: |
107 | 0 | LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast<int>(type); |
108 | 0 | } |
109 | 0 | LOG(FATAL) << "__builtin_unreachable"; |
110 | 0 | __builtin_unreachable(); |
111 | 0 | } |
112 | | |
113 | | /* |
114 | | * Part 2, Constructors and property methods |
115 | | */ |
116 | | |
117 | | static std::shared_ptr<MemTrackerLimiter> create_shared(MemTrackerLimiter::Type type, |
118 | | const std::string& label, |
119 | | int64_t byte_limit = -1); |
120 | | // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics. |
121 | | MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit); |
122 | | ~MemTrackerLimiter(); |
123 | | |
124 | 0 | Type type() const { return _type; } |
125 | 21 | const std::string& label() const { return _label; } |
126 | 0 | int64_t group_num() const { return _group_num; } |
127 | 244k | int64_t limit() const { return _limit; } |
128 | 25 | void set_limit(int64_t new_mem_limit) { _limit = new_mem_limit; } |
129 | 121k | void set_enable_check_limit(bool enable_check_limit) { |
130 | 121k | _enable_check_limit = enable_check_limit; |
131 | 121k | } |
132 | | Status check_limit(int64_t bytes = 0); |
133 | | // Log the memory usage when memory limit is exceeded. |
134 | | std::string tracker_limit_exceeded_str(); |
135 | | |
136 | | static void clean_tracker_limiter_group(); |
137 | | |
138 | | /* |
139 | | * Part 3, Memory tracking method (use carefully!) |
140 | | * |
141 | | * Note: Only memory not allocated by Doris Allocator can be tracked by manually calling consume() and release(). |
142 | | * Memory allocated by Doris Allocator needs to be tracked using SCOPED_ATTACH_TASK or |
143 | | * SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER. |
144 | | */ |
145 | | |
146 | 608k | int64_t consumption() const { return _mem_counter.current_value(); } |
147 | 122k | int64_t peak_consumption() const { return _mem_counter.peak_value(); } |
148 | | |
149 | | // Use carefully! only memory that cannot be allocated using Doris Allocator needs to be consumed manually. |
150 | | // Ideally, all memory should use Doris Allocator. |
151 | 1.08M | void consume(int64_t bytes) { _mem_counter.add(bytes); } |
152 | | |
153 | 0 | void consume_no_update_peak(int64_t bytes) { _mem_counter.add_no_update_peak(bytes); } |
154 | | |
155 | 317k | void release(int64_t bytes) { _mem_counter.sub(bytes); } |
156 | | |
157 | 13 | bool try_consume(int64_t bytes) { |
158 | 13 | if (UNLIKELY(bytes == 0)) { |
159 | 0 | return true; |
160 | 0 | } |
161 | 13 | if (limit() >= 0) { |
162 | 4 | return _mem_counter.try_add(bytes, _limit); |
163 | 9 | } else { |
164 | 9 | _mem_counter.add(bytes); |
165 | 9 | return true; |
166 | 9 | } |
167 | 13 | } |
168 | | |
169 | 0 | void set_consumption(int64_t bytes) { _mem_counter.set(bytes); } |
170 | | |
171 | | // Transfer 'bytes' of consumption from this tracker to 'dst'. |
172 | 0 | void transfer_to(int64_t size, MemTrackerLimiter* dst) { |
173 | 0 | if (label() == dst->label()) { |
174 | 0 | return; |
175 | 0 | } |
176 | 0 | cache_consume(-size); |
177 | 0 | dst->cache_consume(size); |
178 | 0 | } |
179 | | |
180 | | // If need to consume the tracker frequently, use it |
181 | | void cache_consume(int64_t bytes); |
182 | | |
183 | | /* |
184 | | * Part 4, Reserved memory tracking method |
185 | | */ |
186 | | |
187 | 608k | int64_t reserved_consumption() const { return _reserved_counter.current_value(); } |
188 | 0 | int64_t reserved_peak_consumption() const { return _reserved_counter.peak_value(); } |
189 | | |
190 | 12 | void reserve(int64_t bytes) { |
191 | 12 | if (UNLIKELY(bytes == 0)) { |
192 | 0 | return; |
193 | 0 | } |
194 | 12 | _mem_counter.add(bytes); |
195 | 12 | _reserved_counter.add(bytes); |
196 | 12 | } |
197 | | |
198 | 13 | bool try_reserve(int64_t bytes) { |
199 | 13 | if (try_consume(bytes)) { |
200 | 12 | _reserved_counter.add(bytes); |
201 | 12 | return true; |
202 | 12 | } else { |
203 | 1 | return false; |
204 | 1 | } |
205 | 13 | } |
206 | | |
207 | 34 | void shrink_reserved(int64_t bytes) { |
208 | 34 | _reserved_counter.sub(bytes); |
209 | 34 | DCHECK(reserved_consumption() >= 0); |
210 | 34 | } |
211 | | |
212 | | /* |
213 | | * Part 5, Memory profile and log method |
214 | | */ |
215 | | RuntimeProfile* make_profile(RuntimeProfile* profile) const; |
216 | | std::string make_profile_str() const; |
217 | | static void make_type_trackers_profile(RuntimeProfile* profile, MemTrackerLimiter::Type type); |
218 | | static std::string make_type_trackers_profile_str(MemTrackerLimiter::Type type); |
219 | | static void make_top_consumption_tasks_tracker_profile(RuntimeProfile* profile, int top_num); |
220 | | static void make_all_tasks_tracker_profile(RuntimeProfile* profile); |
221 | | |
222 | 62 | std::shared_ptr<MemTrackerLimiter> write_tracker() { return _write_tracker; } |
223 | | |
224 | | void print_log_usage(const std::string& msg); |
225 | 0 | void enable_print_log_usage() { _enable_print_log_usage = true; } |
226 | | |
227 | | /* |
228 | | * Part 6, Memory debug method |
229 | | */ |
230 | | |
231 | | void add_address_sanitizers(void* buf, size_t size); |
232 | | void remove_address_sanitizers(void* buf, size_t size); |
233 | | bool is_group_commit_load {false}; |
234 | | |
235 | | private: |
236 | | // When the accumulated untracked memory value exceeds the upper limit, |
237 | | // the current value is returned and set to 0. |
238 | | // Thread safety. |
239 | | int64_t add_untracked_mem(int64_t bytes); |
240 | | |
241 | | /* |
242 | | * Part 8, Property definition |
243 | | */ |
244 | | |
245 | | Type _type; |
246 | | |
247 | | // label used in the make snapshot, not guaranteed unique. |
248 | | std::string _label; |
249 | | // For generate runtime profile, profile name must be unique. |
250 | | UniqueId _uid; |
251 | | |
252 | | MemCounter _mem_counter; |
253 | | MemCounter _reserved_counter; |
254 | | |
255 | | // Limit on memory consumption, in bytes. |
256 | | std::atomic<int64_t> _limit; |
257 | | bool _enable_check_limit = true; |
258 | | |
259 | | // Group number in mem_tracker_limiter_pool and mem_tracker_pool, generated by the timestamp. |
260 | | int64_t _group_num; |
261 | | |
262 | | // Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate |
263 | | // to avoid frequent calls to consume/release of MemTracker. |
264 | | std::atomic<int64_t> _untracked_mem = 0; |
265 | | |
266 | | // Avoid frequent printing. |
267 | | bool _enable_print_log_usage = false; |
268 | | |
269 | | std::shared_ptr<MemTrackerLimiter> _write_tracker; |
270 | | |
271 | | struct AddressSanitizer { |
272 | | size_t size; |
273 | | std::string stack_trace; |
274 | | }; |
275 | | |
276 | | std::string print_address_sanitizers(); |
277 | | bool open_memory_tracker_inaccurate_detect(); |
278 | | std::mutex _address_sanitizers_mtx; |
279 | | std::unordered_map<void*, AddressSanitizer> _address_sanitizers; |
280 | | std::vector<std::string> _error_address_sanitizers; |
281 | | }; |
282 | | |
283 | 0 | inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) { |
284 | 0 | _untracked_mem += bytes; |
285 | 0 | if (std::abs(_untracked_mem) >= config::mem_tracker_consume_min_size_bytes) { |
286 | 0 | return _untracked_mem.exchange(0); |
287 | 0 | } |
288 | 0 | return 0; |
289 | 0 | } |
290 | | |
291 | 0 | inline void MemTrackerLimiter::cache_consume(int64_t bytes) { |
292 | 0 | if (bytes == 0) { |
293 | 0 | return; |
294 | 0 | } |
295 | 0 | int64_t consume_bytes = add_untracked_mem(bytes); |
296 | 0 | consume(consume_bytes); |
297 | 0 | } |
298 | | |
299 | 2.84M | inline Status MemTrackerLimiter::check_limit(int64_t bytes) { |
300 | 2.84M | if (bytes <= 0 || !_enable_check_limit || _limit <= 0) { |
301 | 2.84M | return Status::OK(); |
302 | 2.84M | } |
303 | | |
304 | | // If reserve not enabled, then should check limit here to kill the query when limit exceed. |
305 | | // For insert into select or pure load job, its memtable is accounted in a seperate memtracker limiter, |
306 | | // and its reserve is set to true. So that it will not reach this logic. |
307 | | // Only query and load job has exec_mem_limit and the _limit > 0, other memtracker limiter's _limit is -1 so |
308 | | // it will not take effect. |
309 | 0 | if (consumption() + bytes > _limit) { |
310 | 0 | return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, {}", |
311 | 0 | PrettyPrinter::print_bytes(bytes), |
312 | 0 | tracker_limit_exceeded_str())); |
313 | 0 | } |
314 | 0 | return Status::OK(); |
315 | 0 | } |
316 | | |
317 | | #include "common/compile_check_end.h" |
318 | | } // namespace doris |