Coverage Report

Created: 2026-04-14 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/memory/mem_tracker_limiter.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Metrics_types.h>
21
#include <gen_cpp/Types_types.h>
22
#include <glog/logging.h>
23
24
#include <atomic>
25
#include <cstdint>
26
// IWYU pragma: no_include <bits/std_abs.h>
27
#include <cmath> // IWYU pragma: keep
28
#include <list>
29
#include <memory>
30
#include <ostream>
31
#include <string>
32
#include <unordered_map>
33
#include <vector>
34
35
#include "common/config.h"
36
#include "common/status.h"
37
#include "runtime/memory/mem_counter.h"
38
#include "runtime/memory/mem_tracker.h"
39
#include "util/string_util.h"
40
#include "util/uid_util.h"
41
42
namespace doris {
43
44
class RuntimeProfile;
45
class MemTrackerLimiter;
46
47
constexpr size_t MEM_TRACKER_GROUP_NUM = 1000;
48
49
struct TrackerLimiterGroup {
50
    // Note! in order to enable ExecEnv::mem_tracker_limiter_pool support resize,
51
    // the copy construction of TrackerLimiterGroup is disabled.
52
    // so cannot copy TrackerLimiterGroup anywhere, should use reference.
53
1
    TrackerLimiterGroup() = default;
54
0
    TrackerLimiterGroup(TrackerLimiterGroup&&) noexcept {}
55
1.00k
    TrackerLimiterGroup(const TrackerLimiterGroup&) {}
56
0
    TrackerLimiterGroup& operator=(const TrackerLimiterGroup&) { return *this; }
57
58
    std::list<std::weak_ptr<MemTrackerLimiter>> trackers;
59
    std::mutex group_lock;
60
};
61
62
/*
63
 * Track and limit the memory usage of process and query.
64
 *
65
 * Usually, put Query MemTrackerLimiter into SCOPED_ATTACH_TASK when the thread starts,
66
 * all memory used by this thread will be recorded on this Query.
67
 *
68
 * This class is thread-safe.
69
*/
70
class MemTrackerLimiter final {
71
public:
72
    /*
73
    * Part 1, Type definition
74
    */
75
76
    enum class Type {
77
        GLOBAL = 0,        // Life cycle is the same as the process, except cache and metadata.
78
        QUERY = 1,         // Count the memory consumption of all Query tasks.
79
        LOAD = 2,          // Count the memory consumption of all Load tasks.
80
        COMPACTION = 3,    // Count the memory consumption of all Base and Cumulative tasks.
81
        SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks.
82
        METADATA = 5,      // Count the memory consumption of all Metadata.
83
        CACHE = 6,         // Count the memory consumption of all Cache.
84
        OTHER = 7, // Count the memory consumption of all other tasks, such as Clone, Snapshot, etc..
85
    };
86
87
0
    static std::string type_string(Type type) {
88
0
        switch (type) {
89
0
        case Type::GLOBAL:
90
0
            return "global";
91
0
        case Type::QUERY:
92
0
            return "query";
93
0
        case Type::LOAD:
94
0
            return "load";
95
0
        case Type::COMPACTION:
96
0
            return "compaction";
97
0
        case Type::SCHEMA_CHANGE:
98
0
            return "schema_change";
99
0
        case Type::METADATA:
100
0
            return "metadata";
101
0
        case Type::CACHE:
102
0
            return "cache";
103
0
        case Type::OTHER:
104
0
            return "other_task";
105
0
        default:
106
0
            LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast<int>(type);
107
0
        }
108
0
        LOG(FATAL) << "__builtin_unreachable";
109
0
        __builtin_unreachable();
110
0
    }
111
112
    /*
113
    * Part 2, Constructors and property methods
114
    */
115
116
    static std::shared_ptr<MemTrackerLimiter> create_shared(MemTrackerLimiter::Type type,
117
                                                            const std::string& label,
118
                                                            int64_t byte_limit = -1);
119
    // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics.
120
    MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit);
121
    ~MemTrackerLimiter();
122
123
0
    Type type() const { return _type; }
124
40
    const std::string& label() const { return _label; }
125
0
    int64_t group_num() const { return _group_num; }
126
245k
    int64_t limit() const { return _limit; }
127
48
    void set_limit(int64_t new_mem_limit) { _limit = new_mem_limit; }
128
122k
    void set_enable_check_limit(bool enable_check_limit) {
129
122k
        _enable_check_limit = enable_check_limit;
130
122k
    }
131
    Status check_limit(int64_t bytes = 0);
132
    // Log the memory usage when memory limit is exceeded.
133
    std::string tracker_limit_exceeded_str();
134
135
    static void clean_tracker_limiter_group();
136
137
    /*
138
    * Part 3, Memory tracking method (use carefully!)
139
    *
140
    * Note: Only memory not allocated by Doris Allocator can be tracked by manually calling consume() and release().
141
    *       Memory allocated by Doris Allocator needs to be tracked using SCOPED_ATTACH_TASK or
142
    *       SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER.
143
    */
144
145
609k
    int64_t consumption() const { return _mem_counter.current_value(); }
146
122k
    int64_t peak_consumption() const { return _mem_counter.peak_value(); }
147
148
    // Use carefully! only memory that cannot be allocated using Doris Allocator needs to be consumed manually.
149
    // Ideally, all memory should use Doris Allocator.
150
1.09M
    void consume(int64_t bytes) { _mem_counter.add(bytes); }
151
152
0
    void consume_no_update_peak(int64_t bytes) { _mem_counter.add_no_update_peak(bytes); }
153
154
317k
    void release(int64_t bytes) { _mem_counter.sub(bytes); }
155
156
15
    bool try_consume(int64_t bytes) {
157
15
        if (UNLIKELY(bytes == 0)) {
158
0
            return true;
159
0
        }
160
15
        if (limit() >= 0) {
161
6
            return _mem_counter.try_add(bytes, _limit);
162
9
        } else {
163
9
            _mem_counter.add(bytes);
164
9
            return true;
165
9
        }
166
15
    }
167
168
0
    void set_consumption(int64_t bytes) { _mem_counter.set(bytes); }
169
170
    // Transfer 'bytes' of consumption from this tracker to 'dst'.
171
0
    void transfer_to(int64_t size, MemTrackerLimiter* dst) {
172
0
        if (label() == dst->label()) {
173
0
            return;
174
0
        }
175
0
        cache_consume(-size);
176
0
        dst->cache_consume(size);
177
0
    }
178
179
    // If need to consume the tracker frequently, use it
180
    void cache_consume(int64_t bytes);
181
182
    /*
183
    * Part 4, Reserved memory tracking method
184
    */
185
186
609k
    int64_t reserved_consumption() const { return _reserved_counter.current_value(); }
187
0
    int64_t reserved_peak_consumption() const { return _reserved_counter.peak_value(); }
188
189
0
    void reserve(int64_t bytes) {
190
0
        if (UNLIKELY(bytes == 0)) {
191
0
            return;
192
0
        }
193
0
        _mem_counter.add(bytes);
194
0
        _reserved_counter.add(bytes);
195
0
    }
196
197
15
    bool try_reserve(int64_t bytes) {
198
15
        if (try_consume(bytes)) {
199
14
            _reserved_counter.add(bytes);
200
14
            return true;
201
14
        } else {
202
1
            return false;
203
1
        }
204
15
    }
205
206
24
    void shrink_reserved(int64_t bytes) {
207
24
        _reserved_counter.sub(bytes);
208
24
        DCHECK(reserved_consumption() >= 0);
209
24
    }
210
211
    /*
212
    * Part 5, Memory profile and log method
213
    */
214
    RuntimeProfile* make_profile(RuntimeProfile* profile) const;
215
    std::string make_profile_str() const;
216
    static void make_type_trackers_profile(RuntimeProfile* profile, MemTrackerLimiter::Type type);
217
    static std::string make_type_trackers_profile_str(MemTrackerLimiter::Type type);
218
    static void make_top_consumption_tasks_tracker_profile(RuntimeProfile* profile, int top_num);
219
    static void make_all_tasks_tracker_profile(RuntimeProfile* profile);
220
221
62
    std::shared_ptr<MemTrackerLimiter> write_tracker() { return _write_tracker; }
222
223
    void print_log_usage(const std::string& msg);
224
0
    void enable_print_log_usage() { _enable_print_log_usage = true; }
225
226
    /*
227
    * Part 6, Memory debug method
228
    */
229
230
    void add_address_sanitizers(void* buf, size_t size);
231
    void remove_address_sanitizers(void* buf, size_t size);
232
    bool is_group_commit_load {false};
233
234
private:
235
    // When the accumulated untracked memory value exceeds the upper limit,
236
    // the current value is returned and set to 0.
237
    // Thread safety.
238
    int64_t add_untracked_mem(int64_t bytes);
239
240
    /*
241
    * Part 8, Property definition
242
    */
243
244
    Type _type;
245
246
    // label used in the make snapshot, not guaranteed unique.
247
    std::string _label;
248
    // For generate runtime profile, profile name must be unique.
249
    UniqueId _uid;
250
251
    MemCounter _mem_counter;
252
    MemCounter _reserved_counter;
253
254
    // Limit on memory consumption, in bytes.
255
    std::atomic<int64_t> _limit;
256
    bool _enable_check_limit = true;
257
258
    // Group number in mem_tracker_limiter_pool and mem_tracker_pool, generated by the timestamp.
259
    int64_t _group_num;
260
261
    // Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate
262
    // to avoid frequent calls to consume/release of MemTracker.
263
    std::atomic<int64_t> _untracked_mem = 0;
264
265
    // Avoid frequent printing.
266
    bool _enable_print_log_usage = false;
267
268
    std::shared_ptr<MemTrackerLimiter> _write_tracker;
269
270
    struct AddressSanitizer {
271
        size_t size;
272
        std::string stack_trace;
273
    };
274
275
    std::string print_address_sanitizers();
276
    bool open_memory_tracker_inaccurate_detect();
277
    std::mutex _address_sanitizers_mtx;
278
    std::unordered_map<void*, AddressSanitizer> _address_sanitizers;
279
    std::vector<std::string> _error_address_sanitizers;
280
};
281
282
0
inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) {
283
0
    _untracked_mem += bytes;
284
0
    if (std::abs(_untracked_mem) >= config::mem_tracker_consume_min_size_bytes) {
285
0
        return _untracked_mem.exchange(0);
286
0
    }
287
0
    return 0;
288
0
}
289
290
0
inline void MemTrackerLimiter::cache_consume(int64_t bytes) {
291
0
    if (bytes == 0) {
292
0
        return;
293
0
    }
294
0
    int64_t consume_bytes = add_untracked_mem(bytes);
295
0
    consume(consume_bytes);
296
0
}
297
298
2.74M
inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
299
2.74M
    if (bytes <= 0 || !_enable_check_limit || _limit <= 0) {
300
2.74M
        return Status::OK();
301
2.74M
    }
302
303
    // If reserve not enabled, then should check limit here to kill the query when limit exceed.
304
    // For insert into select or pure load job, its memtable is accounted in a seperate memtracker limiter,
305
    // and its reserve is set to true. So that it will not reach this logic.
306
    // Only query and load job has exec_mem_limit and the _limit > 0, other memtracker limiter's _limit is -1 so
307
    // it will not take effect.
308
0
    if (consumption() + bytes > _limit) {
309
0
        return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, {}",
310
0
                                                       PrettyPrinter::print_bytes(bytes),
311
0
                                                       tracker_limit_exceeded_str()));
312
0
    }
313
0
    return Status::OK();
314
0
}
315
316
} // namespace doris