be/src/runtime/memory/mem_tracker_limiter.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>

#include <atomic>
#include <cstdint>
// IWYU pragma: no_include <bits/std_abs.h>
#include <cmath> // IWYU pragma: keep
#include <list>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <unordered_map>
#include <vector>

#include "common/config.h"
#include "common/status.h"
#include "runtime/memory/mem_counter.h"
#include "runtime/memory/mem_tracker.h"
#include "util/string_util.h"
#include "util/uid_util.h"
41 | | |
42 | | namespace doris { |
43 | | |
class MemTrackerLimiter;
class RuntimeProfile;

// Number of tracker groups. Presumably this is the size of
// ExecEnv::mem_tracker_limiter_pool (one TrackerLimiterGroup per slot) -- TODO confirm.
constexpr size_t MEM_TRACKER_GROUP_NUM {1000};
48 | | |
49 | | struct TrackerLimiterGroup { |
50 | | // Note! in order to enable ExecEnv::mem_tracker_limiter_pool support resize, |
51 | | // the copy construction of TrackerLimiterGroup is disabled. |
52 | | // so cannot copy TrackerLimiterGroup anywhere, should use reference. |
53 | 1 | TrackerLimiterGroup() = default; |
54 | 0 | TrackerLimiterGroup(TrackerLimiterGroup&&) noexcept {} |
55 | 1.00k | TrackerLimiterGroup(const TrackerLimiterGroup&) {} |
56 | 0 | TrackerLimiterGroup& operator=(const TrackerLimiterGroup&) { return *this; } |
57 | | |
58 | | std::list<std::weak_ptr<MemTrackerLimiter>> trackers; |
59 | | std::mutex group_lock; |
60 | | }; |
61 | | |
62 | | /* |
63 | | * Track and limit the memory usage of process and query. |
64 | | * |
65 | | * Usually, put Query MemTrackerLimiter into SCOPED_ATTACH_TASK when the thread starts, |
66 | | * all memory used by this thread will be recorded on this Query. |
67 | | * |
68 | | * This class is thread-safe. |
69 | | */ |
70 | | class MemTrackerLimiter final { |
71 | | public: |
72 | | /* |
73 | | * Part 1, Type definition |
74 | | */ |
75 | | |
76 | | enum class Type { |
77 | | GLOBAL = 0, // Life cycle is the same as the process, except cache and metadata. |
78 | | QUERY = 1, // Count the memory consumption of all Query tasks. |
79 | | LOAD = 2, // Count the memory consumption of all Load tasks. |
80 | | COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks. |
81 | | SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks. |
82 | | METADATA = 5, // Count the memory consumption of all Metadata. |
83 | | CACHE = 6, // Count the memory consumption of all Cache. |
84 | | OTHER = 7, // Count the memory consumption of all other tasks, such as Clone, Snapshot, etc.. |
85 | | }; |
86 | | |
87 | 0 | static std::string type_string(Type type) { |
88 | 0 | switch (type) { |
89 | 0 | case Type::GLOBAL: |
90 | 0 | return "global"; |
91 | 0 | case Type::QUERY: |
92 | 0 | return "query"; |
93 | 0 | case Type::LOAD: |
94 | 0 | return "load"; |
95 | 0 | case Type::COMPACTION: |
96 | 0 | return "compaction"; |
97 | 0 | case Type::SCHEMA_CHANGE: |
98 | 0 | return "schema_change"; |
99 | 0 | case Type::METADATA: |
100 | 0 | return "metadata"; |
101 | 0 | case Type::CACHE: |
102 | 0 | return "cache"; |
103 | 0 | case Type::OTHER: |
104 | 0 | return "other_task"; |
105 | 0 | default: |
106 | 0 | LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast<int>(type); |
107 | 0 | } |
108 | 0 | LOG(FATAL) << "__builtin_unreachable"; |
109 | 0 | __builtin_unreachable(); |
110 | 0 | } |
111 | | |
112 | | /* |
113 | | * Part 2, Constructors and property methods |
114 | | */ |
115 | | |
116 | | static std::shared_ptr<MemTrackerLimiter> create_shared(MemTrackerLimiter::Type type, |
117 | | const std::string& label, |
118 | | int64_t byte_limit = -1); |
119 | | // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics. |
120 | | MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit); |
121 | | ~MemTrackerLimiter(); |
122 | | |
123 | 0 | Type type() const { return _type; } |
124 | 40 | const std::string& label() const { return _label; } |
125 | 0 | int64_t group_num() const { return _group_num; } |
126 | 245k | int64_t limit() const { return _limit; } |
127 | 48 | void set_limit(int64_t new_mem_limit) { _limit = new_mem_limit; } |
128 | 122k | void set_enable_check_limit(bool enable_check_limit) { |
129 | 122k | _enable_check_limit = enable_check_limit; |
130 | 122k | } |
131 | | Status check_limit(int64_t bytes = 0); |
132 | | // Log the memory usage when memory limit is exceeded. |
133 | | std::string tracker_limit_exceeded_str(); |
134 | | |
135 | | static void clean_tracker_limiter_group(); |
136 | | |
137 | | /* |
138 | | * Part 3, Memory tracking method (use carefully!) |
139 | | * |
140 | | * Note: Only memory not allocated by Doris Allocator can be tracked by manually calling consume() and release(). |
141 | | * Memory allocated by Doris Allocator needs to be tracked using SCOPED_ATTACH_TASK or |
142 | | * SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER. |
143 | | */ |
144 | | |
145 | 609k | int64_t consumption() const { return _mem_counter.current_value(); } |
146 | 122k | int64_t peak_consumption() const { return _mem_counter.peak_value(); } |
147 | | |
148 | | // Use carefully! only memory that cannot be allocated using Doris Allocator needs to be consumed manually. |
149 | | // Ideally, all memory should use Doris Allocator. |
150 | 1.09M | void consume(int64_t bytes) { _mem_counter.add(bytes); } |
151 | | |
152 | 0 | void consume_no_update_peak(int64_t bytes) { _mem_counter.add_no_update_peak(bytes); } |
153 | | |
154 | 317k | void release(int64_t bytes) { _mem_counter.sub(bytes); } |
155 | | |
156 | 15 | bool try_consume(int64_t bytes) { |
157 | 15 | if (UNLIKELY(bytes == 0)) { |
158 | 0 | return true; |
159 | 0 | } |
160 | 15 | if (limit() >= 0) { |
161 | 6 | return _mem_counter.try_add(bytes, _limit); |
162 | 9 | } else { |
163 | 9 | _mem_counter.add(bytes); |
164 | 9 | return true; |
165 | 9 | } |
166 | 15 | } |
167 | | |
168 | 0 | void set_consumption(int64_t bytes) { _mem_counter.set(bytes); } |
169 | | |
170 | | // Transfer 'bytes' of consumption from this tracker to 'dst'. |
171 | 0 | void transfer_to(int64_t size, MemTrackerLimiter* dst) { |
172 | 0 | if (label() == dst->label()) { |
173 | 0 | return; |
174 | 0 | } |
175 | 0 | cache_consume(-size); |
176 | 0 | dst->cache_consume(size); |
177 | 0 | } |
178 | | |
179 | | // If need to consume the tracker frequently, use it |
180 | | void cache_consume(int64_t bytes); |
181 | | |
182 | | /* |
183 | | * Part 4, Reserved memory tracking method |
184 | | */ |
185 | | |
186 | 609k | int64_t reserved_consumption() const { return _reserved_counter.current_value(); } |
187 | 0 | int64_t reserved_peak_consumption() const { return _reserved_counter.peak_value(); } |
188 | | |
189 | 0 | void reserve(int64_t bytes) { |
190 | 0 | if (UNLIKELY(bytes == 0)) { |
191 | 0 | return; |
192 | 0 | } |
193 | 0 | _mem_counter.add(bytes); |
194 | 0 | _reserved_counter.add(bytes); |
195 | 0 | } |
196 | | |
197 | 15 | bool try_reserve(int64_t bytes) { |
198 | 15 | if (try_consume(bytes)) { |
199 | 14 | _reserved_counter.add(bytes); |
200 | 14 | return true; |
201 | 14 | } else { |
202 | 1 | return false; |
203 | 1 | } |
204 | 15 | } |
205 | | |
206 | 24 | void shrink_reserved(int64_t bytes) { |
207 | 24 | _reserved_counter.sub(bytes); |
208 | 24 | DCHECK(reserved_consumption() >= 0); |
209 | 24 | } |
210 | | |
211 | | /* |
212 | | * Part 5, Memory profile and log method |
213 | | */ |
214 | | RuntimeProfile* make_profile(RuntimeProfile* profile) const; |
215 | | std::string make_profile_str() const; |
216 | | static void make_type_trackers_profile(RuntimeProfile* profile, MemTrackerLimiter::Type type); |
217 | | static std::string make_type_trackers_profile_str(MemTrackerLimiter::Type type); |
218 | | static void make_top_consumption_tasks_tracker_profile(RuntimeProfile* profile, int top_num); |
219 | | static void make_all_tasks_tracker_profile(RuntimeProfile* profile); |
220 | | |
221 | 62 | std::shared_ptr<MemTrackerLimiter> write_tracker() { return _write_tracker; } |
222 | | |
223 | | void print_log_usage(const std::string& msg); |
224 | 0 | void enable_print_log_usage() { _enable_print_log_usage = true; } |
225 | | |
226 | | /* |
227 | | * Part 6, Memory debug method |
228 | | */ |
229 | | |
230 | | void add_address_sanitizers(void* buf, size_t size); |
231 | | void remove_address_sanitizers(void* buf, size_t size); |
232 | | bool is_group_commit_load {false}; |
233 | | |
234 | | private: |
235 | | // When the accumulated untracked memory value exceeds the upper limit, |
236 | | // the current value is returned and set to 0. |
237 | | // Thread safety. |
238 | | int64_t add_untracked_mem(int64_t bytes); |
239 | | |
240 | | /* |
241 | | * Part 8, Property definition |
242 | | */ |
243 | | |
244 | | Type _type; |
245 | | |
246 | | // label used in the make snapshot, not guaranteed unique. |
247 | | std::string _label; |
248 | | // For generate runtime profile, profile name must be unique. |
249 | | UniqueId _uid; |
250 | | |
251 | | MemCounter _mem_counter; |
252 | | MemCounter _reserved_counter; |
253 | | |
254 | | // Limit on memory consumption, in bytes. |
255 | | std::atomic<int64_t> _limit; |
256 | | bool _enable_check_limit = true; |
257 | | |
258 | | // Group number in mem_tracker_limiter_pool and mem_tracker_pool, generated by the timestamp. |
259 | | int64_t _group_num; |
260 | | |
261 | | // Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate |
262 | | // to avoid frequent calls to consume/release of MemTracker. |
263 | | std::atomic<int64_t> _untracked_mem = 0; |
264 | | |
265 | | // Avoid frequent printing. |
266 | | bool _enable_print_log_usage = false; |
267 | | |
268 | | std::shared_ptr<MemTrackerLimiter> _write_tracker; |
269 | | |
270 | | struct AddressSanitizer { |
271 | | size_t size; |
272 | | std::string stack_trace; |
273 | | }; |
274 | | |
275 | | std::string print_address_sanitizers(); |
276 | | bool open_memory_tracker_inaccurate_detect(); |
277 | | std::mutex _address_sanitizers_mtx; |
278 | | std::unordered_map<void*, AddressSanitizer> _address_sanitizers; |
279 | | std::vector<std::string> _error_address_sanitizers; |
280 | | }; |
281 | | |
282 | 0 | inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) { |
283 | 0 | _untracked_mem += bytes; |
284 | 0 | if (std::abs(_untracked_mem) >= config::mem_tracker_consume_min_size_bytes) { |
285 | 0 | return _untracked_mem.exchange(0); |
286 | 0 | } |
287 | 0 | return 0; |
288 | 0 | } |
289 | | |
290 | 0 | inline void MemTrackerLimiter::cache_consume(int64_t bytes) { |
291 | 0 | if (bytes == 0) { |
292 | 0 | return; |
293 | 0 | } |
294 | 0 | int64_t consume_bytes = add_untracked_mem(bytes); |
295 | 0 | consume(consume_bytes); |
296 | 0 | } |
297 | | |
298 | 2.74M | inline Status MemTrackerLimiter::check_limit(int64_t bytes) { |
299 | 2.74M | if (bytes <= 0 || !_enable_check_limit || _limit <= 0) { |
300 | 2.74M | return Status::OK(); |
301 | 2.74M | } |
302 | | |
303 | | // If reserve not enabled, then should check limit here to kill the query when limit exceed. |
304 | | // For insert into select or pure load job, its memtable is accounted in a seperate memtracker limiter, |
305 | | // and its reserve is set to true. So that it will not reach this logic. |
306 | | // Only query and load job has exec_mem_limit and the _limit > 0, other memtracker limiter's _limit is -1 so |
307 | | // it will not take effect. |
308 | 0 | if (consumption() + bytes > _limit) { |
309 | 0 | return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, {}", |
310 | 0 | PrettyPrinter::print_bytes(bytes), |
311 | 0 | tracker_limit_exceeded_str())); |
312 | 0 | } |
313 | 0 | return Status::OK(); |
314 | 0 | } |
315 | | |
316 | | } // namespace doris |