Coverage Report

Created: 2026-04-14 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/memory/mem_tracker_limiter.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/memory/mem_tracker_limiter.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/types.pb.h>
22
23
#include <functional>
24
#include <mutex>
25
#include <queue>
26
#include <utility>
27
28
#include "common/config.h"
29
#include "runtime/exec_env.h"
30
#include "runtime/fragment_mgr.h"
31
#include "runtime/memory/global_memory_arbitrator.h"
32
#include "runtime/runtime_profile.h"
33
#include "runtime/thread_context.h"
34
#include "runtime/workload_group/workload_group.h"
35
#include "service/backend_options.h"
36
#include "util/mem_info.h"
37
38
namespace doris {
39
40
// bvar gauge of live MemTrackerLimiter objects: incremented by 1 in the constructor and
// decremented by 1 in the destructor.
static bvar::Adder<int64_t> memory_memtrackerlimiter_cnt("memory_memtrackerlimiter_cnt");

// Round-robin counter the constructor uses to spread trackers that are not GLOBAL, METADATA or
// CACHE over the task groups (indices 3 .. MEM_TRACKER_GROUP_NUM - 1).
std::atomic<long> mem_tracker_limiter_group_counter{0};
43
44
608k
MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit) {
    // -1 is the "no limit" sentinel; anything smaller is a caller bug.
    DCHECK_GE(byte_limit, -1);
    _type = type;
    _label = label;
    _limit = byte_limit;
    _uid = UniqueId::gen_uid();

    // Groups 0, 1 and 2 are reserved for GLOBAL, METADATA and CACHE trackers respectively.
    // Every other type is assigned round-robin to one of the remaining
    // MEM_TRACKER_GROUP_NUM - 3 groups.
    switch (_type) {
    case Type::GLOBAL:
        _group_num = 0;
        break;
    case Type::METADATA:
        _group_num = 1;
        break;
    case Type::CACHE:
        _group_num = 2;
        break;
    default: {
        constexpr int kReservedGroups = 3;
        _group_num = mem_tracker_limiter_group_counter.fetch_add(1) %
                             (MEM_TRACKER_GROUP_NUM - kReservedGroups) +
                     kReservedGroups;
        break;
    }
    }

    memory_memtrackerlimiter_cnt << 1;
}
62
63
std::shared_ptr<MemTrackerLimiter> MemTrackerLimiter::create_shared(MemTrackerLimiter::Type type,
64
                                                                    const std::string& label,
65
304k
                                                                    int64_t byte_limit) {
66
304k
    auto tracker = std::make_shared<MemTrackerLimiter>(type, label, byte_limit);
67
    // Write tracker is only used to tracker the size, memtable has a separate logic to deal with memory flush,
68
    // so limit == -1, so that should not check the limit in memtracker.
69
304k
    auto write_tracker = std::make_shared<MemTrackerLimiter>(type, "Memtable#" + label, -1);
70
304k
    tracker->_write_tracker.swap(write_tracker);
71
#ifndef BE_TEST
72
    DCHECK(ExecEnv::tracking_memory());
73
    std::lock_guard<std::mutex> l(
74
            ExecEnv::GetInstance()->mem_tracker_limiter_pool[tracker->group_num()].group_lock);
75
    ExecEnv::GetInstance()->mem_tracker_limiter_pool[tracker->group_num()].trackers.insert(
76
            ExecEnv::GetInstance()->mem_tracker_limiter_pool[tracker->group_num()].trackers.end(),
77
            tracker);
78
#endif
79
304k
    return tracker;
80
304k
}
81
82
608k
bool MemTrackerLimiter::open_memory_tracker_inaccurate_detect() {
83
608k
    return doris::config::crash_in_memory_tracker_inaccurate &&
84
608k
           (_type == Type::COMPACTION || _type == Type::SCHEMA_CHANGE || _type == Type::QUERY ||
85
0
            (_type == Type::LOAD && !is_group_commit_load));
86
608k
}
87
88
608k
MemTrackerLimiter::~MemTrackerLimiter() {
    // Fold any pending `_untracked_mem` into the counter so the balance check below sees it.
    consume(_untracked_mem);
    // Explanation appended to the FATAL message when a non-zero balance is found at destruction.
    // Note the trailing space after "more information." so that point 1 is not glued to it.
    static const std::string mem_tracker_inaccurate_msg =
            "mem tracker not equal to 0 when mem tracker destruct, this usually means that "
            "memory tracking is inaccurate and SCOPED_ATTACH_TASK and "
            "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. "
            "If the log is truncated, search for `Address Sanitizer` in the be.INFO log to see "
            "more information. "
            "1. For query and load, memory leaks may have occurred, it is expected that the query "
            "mem tracker will be bound to the thread context using SCOPED_ATTACH_TASK and "
            "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc and free. "
            "2. If a memory alloc is recorded by this tracker, it is expected that be "
            "recorded in this tracker when memory is freed. "
            "3. Merge the remaining memory tracking value by "
            "this tracker into Orphan, if you observe that Orphan is not equal to 0 in the mem "
            "tracker web or log, this indicates that there may be a memory leak. "
            "4. If you need to "
            "transfer memory tracking value between two trackers, can use transfer_to.";
    if (consumption() != 0) {
        if (open_memory_tracker_inaccurate_detect()) {
            // Maybe the memory is recorded in orphan memory tracker, so that we print the stack trace in
            // orphan memory tracker first to help us debug.
            LOG(INFO) << "Orphan memory tracker consumption: "
                      << ExecEnv::GetInstance()->orphan_mem_tracker()->print_address_sanitizers();
            std::string err_msg = fmt::format(
                    "mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", label(),
                    consumption(), peak_consumption(), mem_tracker_inaccurate_msg);
            LOG(FATAL) << err_msg << print_address_sanitizers();
        }
        // Without crash detection, move the leftover balance into the orphan tracker (a non-zero
        // Orphan signals a possible leak, see point 3 above), then zero this counter.
        if (ExecEnv::tracking_memory()) {
            ExecEnv::GetInstance()->orphan_mem_tracker()->consume(consumption());
        }
        _mem_counter.set(0);
    } else if (open_memory_tracker_inaccurate_detect() && !_address_sanitizers.empty()) {
        // Maybe the memory is recorded in orphan memory tracker, so that we print the stack trace in
        // orphan memory tracker first to help us debug.
        LOG(INFO) << "Orphan memory tracker consumption: "
                  << ExecEnv::GetInstance()->orphan_mem_tracker()->print_address_sanitizers();
        LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty"
                   << ", mem tracker label: " << _label
                   << ", peak consumption: " << peak_consumption() << print_address_sanitizers();
    }
    DCHECK_EQ(reserved_consumption(), 0);
    memory_memtrackerlimiter_cnt << -1;
}
133
134
0
void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
135
0
    if (open_memory_tracker_inaccurate_detect()) {
136
0
        std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
137
0
        auto it = _address_sanitizers.find(buf);
138
0
        if (it != _address_sanitizers.end()) {
139
0
            _error_address_sanitizers.emplace_back(
140
0
                    fmt::format("[Address Sanitizer] memory buf repeat add, mem tracker label: {}, "
141
0
                                "consumption: {}, peak consumption: {}, buf: {}, size: {}, old "
142
0
                                "buf: {}, old size: {}, new stack_trace: {}, old stack_trace: {}.",
143
0
                                _label, consumption(), peak_consumption(), buf, size, it->first,
144
0
                                it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"),
145
0
                                it->second.stack_trace));
146
0
        }
147
148
        // if alignment not equal to 0, maybe usable_size > size.
149
0
        AddressSanitizer as = {size, doris::config::enable_address_sanitizers_with_stack_trace
150
0
                                             ? get_stack_trace(1, "DISABLED")
151
0
                                             : ""};
152
0
        _address_sanitizers.emplace(buf, as);
153
0
    }
154
0
}
155
156
0
void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
157
0
    if (open_memory_tracker_inaccurate_detect()) {
158
0
        std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
159
0
        auto it = _address_sanitizers.find(buf);
160
0
        if (it != _address_sanitizers.end()) {
161
0
            if (it->second.size != size) {
162
0
                _error_address_sanitizers.emplace_back(fmt::format(
163
0
                        "[Address Sanitizer] free memory buf size inaccurate, mem tracker label: "
164
0
                        "{}, consumption: {}, peak consumption: {}, buf: {}, size: {}, old buf: "
165
0
                        "{}, old size: {}, new stack_trace: {}, old stack_trace: {}.",
166
0
                        _label, consumption(), peak_consumption(), buf, size, it->first,
167
0
                        it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"),
168
0
                        it->second.stack_trace));
169
0
            }
170
0
            _address_sanitizers.erase(buf);
171
0
        } else {
172
0
            _error_address_sanitizers.emplace_back(fmt::format(
173
0
                    "[Address Sanitizer] memory buf not exist, mem tracker label: {}, consumption: "
174
0
                    "{}, peak consumption: {}, buf: {}, size: {}, stack_trace: {}.",
175
0
                    _label, consumption(), peak_consumption(), buf, size,
176
0
                    get_stack_trace(1, "FULL_WITH_INLINE")));
177
0
        }
178
0
    }
179
0
}
180
181
0
std::string MemTrackerLimiter::print_address_sanitizers() {
182
0
    std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
183
0
    std::string detail = "[Address Sanitizer]:";
184
0
    detail += "\n memory not be freed:";
185
0
    for (const auto& it : _address_sanitizers) {
186
0
        auto msg = fmt::format(
187
0
                "\n    [Address Sanitizer] buf not be freed, mem tracker label: {}, consumption: "
188
0
                "{}, peak consumption: {}, buf: {}, size {}, strack trace: {}",
189
0
                _label, consumption(), peak_consumption(), it.first, it.second.size,
190
0
                it.second.stack_trace);
191
0
        LOG(INFO) << msg;
192
0
        detail += msg;
193
0
    }
194
0
    detail += "\n incorrect memory alloc and free:";
195
0
    for (const auto& err_msg : _error_address_sanitizers) {
196
0
        LOG(INFO) << err_msg;
197
0
        detail += fmt::format("\n    {}", err_msg);
198
0
    }
199
0
    return detail;
200
0
}
201
202
0
// Adds a child of `profile` named "<label>@<type>@id=<uid>" that holds this tracker's memory
// usage, its limit (when limit() >= 0) and its reserved memory (when the reserved peak is
// non-zero). Returns the new child.
RuntimeProfile* MemTrackerLimiter::make_profile(RuntimeProfile* profile) const {
    const std::string child_name =
            fmt::format("{}@{}@id={}", _label, type_string(_type), _uid.to_string());
    RuntimeProfile* snapshot = profile->create_child(child_name, true, false);

    // Peak is written before the current value. Presumably the high-water-mark counter keeps the
    // max as its peak and the last value as current (verify against RuntimeProfile).
    RuntimeProfile::HighWaterMarkCounter* memory_counter =
            snapshot->AddHighWaterMarkCounter("Memory", TUnit::BYTES);
    COUNTER_SET(memory_counter, peak_consumption());
    COUNTER_SET(memory_counter, consumption());

    const bool has_limit = limit() >= 0;
    if (has_limit) {
        RuntimeProfile::Counter* limit_counter = ADD_COUNTER(snapshot, "Limit", TUnit::BYTES);
        COUNTER_SET(limit_counter, _limit);
    }

    const bool ever_reserved = reserved_peak_consumption() != 0;
    if (ever_reserved) {
        RuntimeProfile::HighWaterMarkCounter* reserved_counter =
                snapshot->AddHighWaterMarkCounter("ReservedMemory", TUnit::BYTES);
        COUNTER_SET(reserved_counter, reserved_peak_consumption());
        COUNTER_SET(reserved_counter, reserved_consumption());
    }
    return snapshot;
}
222
223
0
std::string MemTrackerLimiter::make_profile_str() const {
224
0
    std::unique_ptr<RuntimeProfile> profile_snapshot =
225
0
            std::make_unique<RuntimeProfile>("MemTrackerSnapshot");
226
0
    make_profile(profile_snapshot.get());
227
0
    std::stringstream ss;
228
0
    profile_snapshot->pretty_print(&ss);
229
0
    return ss.str();
230
0
}
231
232
0
void MemTrackerLimiter::clean_tracker_limiter_group() {
233
#ifndef BE_TEST
234
    if (ExecEnv::tracking_memory()) {
235
        for (auto& group : ExecEnv::GetInstance()->mem_tracker_limiter_pool) {
236
            std::lock_guard<std::mutex> l(group.group_lock);
237
            auto it = group.trackers.begin();
238
            while (it != group.trackers.end()) {
239
                if ((*it).expired()) {
240
                    it = group.trackers.erase(it);
241
                } else {
242
                    ++it;
243
                }
244
            }
245
        }
246
    }
247
#endif
248
0
}
249
250
// Adds a profile child (via make_profile) to `profile` for every live tracker of `type`.
void MemTrackerLimiter::make_type_trackers_profile(RuntimeProfile* profile,
                                                   MemTrackerLimiter::Type type) {
    auto& pool = ExecEnv::GetInstance()->mem_tracker_limiter_pool;

    // Snapshots every live tracker in group `group_idx` into `profile`. When `match_type` is
    // true, only trackers whose type equals `type` are included, because task groups (index >= 3)
    // mix trackers of several types.
    auto profile_group = [&](size_t group_idx, bool match_type) {
        std::lock_guard<std::mutex> l(pool[group_idx].group_lock);
        // Bind by const reference: copying a weak_ptr costs an atomic ref-count update.
        for (const auto& tracker_wptr : pool[group_idx].trackers) {
            auto tracker = tracker_wptr.lock();
            if (tracker != nullptr && (!match_type || tracker->type() == type)) {
                tracker->make_profile(profile);
            }
        }
    };

    // Groups 0, 1 and 2 hold only GLOBAL, METADATA and CACHE trackers respectively.
    if (type == Type::GLOBAL) {
        profile_group(0, false);
    } else if (type == Type::METADATA) {
        profile_group(1, false);
    } else if (type == Type::CACHE) {
        profile_group(2, false);
    } else {
        for (size_t i = 3; i < pool.size(); ++i) {
            profile_group(i, true);
        }
    }
}
292
293
0
// Text form of make_type_trackers_profile(): renders all trackers of `type` under a
// "TypeMemTrackersSnapshot" root and returns the pretty-printed result.
std::string MemTrackerLimiter::make_type_trackers_profile_str(MemTrackerLimiter::Type type) {
    auto root = std::make_unique<RuntimeProfile>("TypeMemTrackersSnapshot");
    make_type_trackers_profile(root.get(), type);
    std::stringstream text;
    root->pretty_print(&text);
    return text.str();
}
301
302
void MemTrackerLimiter::make_top_consumption_tasks_tracker_profile(RuntimeProfile* profile,
303
0
                                                                   int top_num) {
304
0
    std::unique_ptr<RuntimeProfile> tmp_profile_snapshot =
305
0
            std::make_unique<RuntimeProfile>("tmpSnapshot");
306
0
    std::priority_queue<std::pair<int64_t, RuntimeProfile*>> max_pq;
307
    // start from 3, not include global/metadata/cache type.
308
0
    for (unsigned i = 3; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) {
309
0
        std::lock_guard<std::mutex> l(
310
0
                ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
311
0
        for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
312
0
            auto tracker = trackerWptr.lock();
313
0
            if (tracker != nullptr) {
314
0
                auto* profile_snapshot = tracker->make_profile(tmp_profile_snapshot.get());
315
0
                max_pq.emplace(tracker->consumption(), profile_snapshot);
316
0
            }
317
0
        }
318
0
    }
319
320
0
    while (!max_pq.empty() && top_num > 0) {
321
0
        RuntimeProfile* profile_snapshot =
322
0
                profile->create_child(max_pq.top().second->name(), true, false);
323
0
        profile_snapshot->merge(max_pq.top().second);
324
0
        top_num--;
325
0
        max_pq.pop();
326
0
    }
327
0
}
328
329
0
// Adds one child per task type to `profile` (QueryTasks, LoadTasks, CompactionTasks,
// SchemaChangeTasks, OtherTasks). Every live tracker in the task groups (>= 3) is then
// profiled under the child of its type.
void MemTrackerLimiter::make_all_tasks_tracker_profile(RuntimeProfile* profile) {
    std::unordered_map<Type, RuntimeProfile*> types_profile;
    types_profile[Type::QUERY] = profile->create_child("QueryTasks", true, false);
    types_profile[Type::LOAD] = profile->create_child("LoadTasks", true, false);
    types_profile[Type::COMPACTION] = profile->create_child("CompactionTasks", true, false);
    types_profile[Type::SCHEMA_CHANGE] = profile->create_child("SchemaChangeTasks", true, false);
    types_profile[Type::OTHER] = profile->create_child("OtherTasks", true, false);

    auto& pool = ExecEnv::GetInstance()->mem_tracker_limiter_pool;
    // start from 3, not include global/metadata/cache type.
    for (size_t i = 3; i < pool.size(); ++i) {
        std::lock_guard<std::mutex> l(pool[i].group_lock);
        // Bind by const reference: copying a weak_ptr costs an atomic ref-count update.
        for (const auto& tracker_wptr : pool[i].trackers) {
            auto tracker = tracker_wptr.lock();
            if (tracker == nullptr) {
                continue;
            }
            // A ResultBlockBuffer tracker can outlive its query by about 5 minutes. Skip it while its
            // consumption is 0 so that empty result buffers are not shown in the profile.
            // Matching on the label prefix is admittedly tricky.
            if (tracker->consumption() == 0 && tracker->label().starts_with("ResultBlockBuffer")) {
                continue;
            }
            // Use find() instead of operator[]: for a type without a pre-created parent,
            // operator[] would insert a nullptr that make_profile() then dereferences.
            auto parent_it = types_profile.find(tracker->type());
            if (parent_it == types_profile.end()) {
                continue;
            }
            tracker->make_profile(parent_it->second);
        }
    }
}
355
356
0
void MemTrackerLimiter::print_log_usage(const std::string& msg) {
357
0
    if (_enable_print_log_usage) {
358
0
        _enable_print_log_usage = false;
359
0
        std::string detail = msg;
360
0
        detail += "\nProcess Memory Summary: " + GlobalMemoryArbitrator::process_mem_log_str();
361
0
        detail += "\n" + make_profile_str();
362
0
        LOG(WARNING) << detail;
363
0
    }
364
0
}
365
366
0
std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
367
0
    std::string err_msg = fmt::format(
368
0
            "memory tracker limit exceeded, tracker label:{}, type:{}, limit "
369
0
            "{}, peak used {}, current used {}. backend {}, {}.",
370
0
            label(), type_string(_type), PrettyPrinter::print_bytes(limit()),
371
0
            PrettyPrinter::print_bytes(peak_consumption()),
372
0
            PrettyPrinter::print_bytes(consumption()), BackendOptions::get_localhost(),
373
0
            GlobalMemoryArbitrator::process_memory_used_str());
374
0
    if (_type == Type::QUERY || _type == Type::LOAD) {
375
0
        err_msg += fmt::format(
376
0
                " exec node:<{}>, can `set exec_mem_limit` to change limit, details see be.INFO.",
377
0
                doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label());
378
0
    } else if (_type == Type::SCHEMA_CHANGE) {
379
0
        err_msg += fmt::format(
380
0
                " can modify `memory_limitation_per_thread_for_schema_change_bytes` in be.conf to "
381
0
                "change limit, details see be.INFO.");
382
0
    }
383
0
    return err_msg;
384
0
}
385
386
} // namespace doris