Coverage Report

Created: 2026-03-15 08:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/memory/mem_tracker_limiter.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Metrics_types.h>
21
#include <gen_cpp/Types_types.h>
22
#include <glog/logging.h>
23
24
#include <atomic>
25
#include <cstdint>
26
// IWYU pragma: no_include <bits/std_abs.h>
27
#include <cmath> // IWYU pragma: keep
28
#include <list>
29
#include <memory>
30
#include <ostream>
31
#include <string>
32
#include <unordered_map>
33
#include <vector>
34
35
#include "common/config.h"
36
#include "common/status.h"
37
#include "runtime/memory/mem_counter.h"
38
#include "runtime/memory/mem_tracker.h"
39
#include "util/string_util.h"
40
#include "util/uid_util.h"
41
42
namespace doris {
43
#include "common/compile_check_begin.h"
44
45
class RuntimeProfile;
46
class MemTrackerLimiter;
47
48
// Number of tracker-limiter groups. Declared `inline` so that every translation unit
// including this header shares one definition instead of getting its own internal-linkage copy.
inline constexpr size_t MEM_TRACKER_GROUP_NUM = 1000;
49
50
struct TrackerLimiterGroup {
51
    // Note! in order to enable ExecEnv::mem_tracker_limiter_pool support resize,
52
    // the copy construction of TrackerLimiterGroup is disabled.
53
    // so cannot copy TrackerLimiterGroup anywhere, should use reference.
54
1
    TrackerLimiterGroup() = default;
55
0
    TrackerLimiterGroup(TrackerLimiterGroup&&) noexcept {}
56
1.00k
    TrackerLimiterGroup(const TrackerLimiterGroup&) {}
57
0
    TrackerLimiterGroup& operator=(const TrackerLimiterGroup&) { return *this; }
58
59
    std::list<std::weak_ptr<MemTrackerLimiter>> trackers;
60
    std::mutex group_lock;
61
};
62
63
/*
64
 * Track and limit the memory usage of process and query.
65
 *
66
 * Usually, put Query MemTrackerLimiter into SCOPED_ATTACH_TASK when the thread starts,
67
 * all memory used by this thread will be recorded on this Query.
68
 *
69
 * This class is thread-safe.
70
*/
71
class MemTrackerLimiter final {
72
public:
73
    /*
74
    * Part 1, Type definition
75
    */
76
77
    enum class Type {
78
        GLOBAL = 0,        // Life cycle is the same as the process, except cache and metadata.
79
        QUERY = 1,         // Count the memory consumption of all Query tasks.
80
        LOAD = 2,          // Count the memory consumption of all Load tasks.
81
        COMPACTION = 3,    // Count the memory consumption of all Base and Cumulative tasks.
82
        SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks.
83
        METADATA = 5,      // Count the memory consumption of all Metadata.
84
        CACHE = 6,         // Count the memory consumption of all Cache.
85
        OTHER = 7, // Count the memory consumption of all other tasks, such as Clone, Snapshot, etc..
86
    };
87
88
0
    static std::string type_string(Type type) {
89
0
        switch (type) {
90
0
        case Type::GLOBAL:
91
0
            return "global";
92
0
        case Type::QUERY:
93
0
            return "query";
94
0
        case Type::LOAD:
95
0
            return "load";
96
0
        case Type::COMPACTION:
97
0
            return "compaction";
98
0
        case Type::SCHEMA_CHANGE:
99
0
            return "schema_change";
100
0
        case Type::METADATA:
101
0
            return "metadata";
102
0
        case Type::CACHE:
103
0
            return "cache";
104
0
        case Type::OTHER:
105
0
            return "other_task";
106
0
        default:
107
0
            LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast<int>(type);
108
0
        }
109
0
        LOG(FATAL) << "__builtin_unreachable";
110
0
        __builtin_unreachable();
111
0
    }
112
113
    /*
114
    * Part 2, Constructors and property methods
115
    */
116
117
    static std::shared_ptr<MemTrackerLimiter> create_shared(MemTrackerLimiter::Type type,
118
                                                            const std::string& label,
119
                                                            int64_t byte_limit = -1);
120
    // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics.
121
    MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit);
122
    ~MemTrackerLimiter();
123
124
0
    Type type() const { return _type; }
125
21
    const std::string& label() const { return _label; }
126
0
    int64_t group_num() const { return _group_num; }
127
244k
    int64_t limit() const { return _limit; }
128
25
    void set_limit(int64_t new_mem_limit) { _limit = new_mem_limit; }
129
121k
    void set_enable_check_limit(bool enable_check_limit) {
130
121k
        _enable_check_limit = enable_check_limit;
131
121k
    }
132
    Status check_limit(int64_t bytes = 0);
133
    // Log the memory usage when memory limit is exceeded.
134
    std::string tracker_limit_exceeded_str();
135
136
    static void clean_tracker_limiter_group();
137
138
    /*
139
    * Part 3, Memory tracking method (use carefully!)
140
    *
141
    * Note: Only memory not allocated by Doris Allocator can be tracked by manually calling consume() and release().
142
    *       Memory allocated by Doris Allocator needs to be tracked using SCOPED_ATTACH_TASK or
143
    *       SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER.
144
    */
145
146
608k
    int64_t consumption() const { return _mem_counter.current_value(); }
147
122k
    int64_t peak_consumption() const { return _mem_counter.peak_value(); }
148
149
    // Use carefully! only memory that cannot be allocated using Doris Allocator needs to be consumed manually.
150
    // Ideally, all memory should use Doris Allocator.
151
1.08M
    void consume(int64_t bytes) { _mem_counter.add(bytes); }
152
153
0
    void consume_no_update_peak(int64_t bytes) { _mem_counter.add_no_update_peak(bytes); }
154
155
317k
    void release(int64_t bytes) { _mem_counter.sub(bytes); }
156
157
13
    bool try_consume(int64_t bytes) {
158
13
        if (UNLIKELY(bytes == 0)) {
159
0
            return true;
160
0
        }
161
13
        if (limit() >= 0) {
162
4
            return _mem_counter.try_add(bytes, _limit);
163
9
        } else {
164
9
            _mem_counter.add(bytes);
165
9
            return true;
166
9
        }
167
13
    }
168
169
0
    void set_consumption(int64_t bytes) { _mem_counter.set(bytes); }
170
171
    // Transfer 'bytes' of consumption from this tracker to 'dst'.
172
0
    void transfer_to(int64_t size, MemTrackerLimiter* dst) {
173
0
        if (label() == dst->label()) {
174
0
            return;
175
0
        }
176
0
        cache_consume(-size);
177
0
        dst->cache_consume(size);
178
0
    }
179
180
    // If need to consume the tracker frequently, use it
181
    void cache_consume(int64_t bytes);
182
183
    /*
184
    * Part 4, Reserved memory tracking method
185
    */
186
187
608k
    int64_t reserved_consumption() const { return _reserved_counter.current_value(); }
188
0
    int64_t reserved_peak_consumption() const { return _reserved_counter.peak_value(); }
189
190
12
    void reserve(int64_t bytes) {
191
12
        if (UNLIKELY(bytes == 0)) {
192
0
            return;
193
0
        }
194
12
        _mem_counter.add(bytes);
195
12
        _reserved_counter.add(bytes);
196
12
    }
197
198
13
    bool try_reserve(int64_t bytes) {
199
13
        if (try_consume(bytes)) {
200
12
            _reserved_counter.add(bytes);
201
12
            return true;
202
12
        } else {
203
1
            return false;
204
1
        }
205
13
    }
206
207
34
    void shrink_reserved(int64_t bytes) {
208
34
        _reserved_counter.sub(bytes);
209
34
        DCHECK(reserved_consumption() >= 0);
210
34
    }
211
212
    /*
213
    * Part 5, Memory profile and log method
214
    */
215
    RuntimeProfile* make_profile(RuntimeProfile* profile) const;
216
    std::string make_profile_str() const;
217
    static void make_type_trackers_profile(RuntimeProfile* profile, MemTrackerLimiter::Type type);
218
    static std::string make_type_trackers_profile_str(MemTrackerLimiter::Type type);
219
    static void make_top_consumption_tasks_tracker_profile(RuntimeProfile* profile, int top_num);
220
    static void make_all_tasks_tracker_profile(RuntimeProfile* profile);
221
222
62
    std::shared_ptr<MemTrackerLimiter> write_tracker() { return _write_tracker; }
223
224
    void print_log_usage(const std::string& msg);
225
0
    void enable_print_log_usage() { _enable_print_log_usage = true; }
226
227
    /*
228
    * Part 6, Memory debug method
229
    */
230
231
    void add_address_sanitizers(void* buf, size_t size);
232
    void remove_address_sanitizers(void* buf, size_t size);
233
    bool is_group_commit_load {false};
234
235
private:
236
    // When the accumulated untracked memory value exceeds the upper limit,
237
    // the current value is returned and set to 0.
238
    // Thread safety.
239
    int64_t add_untracked_mem(int64_t bytes);
240
241
    /*
242
    * Part 8, Property definition
243
    */
244
245
    Type _type;
246
247
    // label used in the make snapshot, not guaranteed unique.
248
    std::string _label;
249
    // For generate runtime profile, profile name must be unique.
250
    UniqueId _uid;
251
252
    MemCounter _mem_counter;
253
    MemCounter _reserved_counter;
254
255
    // Limit on memory consumption, in bytes.
256
    std::atomic<int64_t> _limit;
257
    bool _enable_check_limit = true;
258
259
    // Group number in mem_tracker_limiter_pool and mem_tracker_pool, generated by the timestamp.
260
    int64_t _group_num;
261
262
    // Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate
263
    // to avoid frequent calls to consume/release of MemTracker.
264
    std::atomic<int64_t> _untracked_mem = 0;
265
266
    // Avoid frequent printing.
267
    bool _enable_print_log_usage = false;
268
269
    std::shared_ptr<MemTrackerLimiter> _write_tracker;
270
271
    struct AddressSanitizer {
272
        size_t size;
273
        std::string stack_trace;
274
    };
275
276
    std::string print_address_sanitizers();
277
    bool open_memory_tracker_inaccurate_detect();
278
    std::mutex _address_sanitizers_mtx;
279
    std::unordered_map<void*, AddressSanitizer> _address_sanitizers;
280
    std::vector<std::string> _error_address_sanitizers;
281
};
282
283
0
inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) {
284
0
    _untracked_mem += bytes;
285
0
    if (std::abs(_untracked_mem) >= config::mem_tracker_consume_min_size_bytes) {
286
0
        return _untracked_mem.exchange(0);
287
0
    }
288
0
    return 0;
289
0
}
290
291
0
inline void MemTrackerLimiter::cache_consume(int64_t bytes) {
292
0
    if (bytes == 0) {
293
0
        return;
294
0
    }
295
0
    int64_t consume_bytes = add_untracked_mem(bytes);
296
0
    consume(consume_bytes);
297
0
}
298
299
2.84M
// Checks whether consuming `bytes` more would push this tracker over its limit.
// Returns Status::OK() when `bytes` <= 0, when limit checking is disabled, when the limit is
// <= 0, or when consumption() + bytes stays within the limit; otherwise returns
// Status::MemoryLimitExceeded carrying the failed size and tracker_limit_exceeded_str().
inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
    if (bytes <= 0 || !_enable_check_limit) {
        return Status::OK();
    }
    // Snapshot the atomic limit once, so that the "has a limit" test and the comparison below
    // use the same value even if set_limit() runs concurrently.
    const int64_t limit_snapshot = _limit.load();
    if (limit_snapshot <= 0) {
        return Status::OK();
    }

    // If reserve not enabled, then should check limit here to kill the query when limit exceed.
    // For insert into select or pure load job, its memtable is accounted in a separate memtracker limiter,
    // and its reserve is set to true. So that it will not reach this logic.
    // Only query and load job has exec_mem_limit and the _limit > 0, other memtracker limiter's _limit is -1 so
    // it will not take effect.
    if (consumption() + bytes > limit_snapshot) {
        return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, {}",
                                                       PrettyPrinter::print_bytes(bytes),
                                                       tracker_limit_exceeded_str()));
    }
    return Status::OK();
}
316
317
#include "common/compile_check_end.h"
318
} // namespace doris