Coverage Report

Created: 2026-03-15 08:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/memory/mem_tracker_limiter.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/memory/mem_tracker_limiter.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/types.pb.h>
22
23
#include <functional>
24
#include <mutex>
25
#include <queue>
26
#include <utility>
27
28
#include "common/config.h"
29
#include "runtime/exec_env.h"
30
#include "runtime/fragment_mgr.h"
31
#include "runtime/memory/global_memory_arbitrator.h"
32
#include "runtime/runtime_profile.h"
33
#include "runtime/thread_context.h"
34
#include "runtime/workload_group/workload_group.h"
35
#include "service/backend_options.h"
36
#include "util/mem_info.h"
37
38
namespace doris {
39
#include "common/compile_check_begin.h"
40
41
static bvar::Adder<int64_t> memory_memtrackerlimiter_cnt("memory_memtrackerlimiter_cnt");
42
43
std::atomic<long> mem_tracker_limiter_group_counter(0);
44
45
608k
MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit) {
46
608k
    DCHECK_GE(byte_limit, -1);
47
608k
    _type = type;
48
608k
    _label = label;
49
608k
    _limit = byte_limit;
50
608k
    _uid = UniqueId::gen_uid();
51
608k
    if (_type == Type::GLOBAL) {
52
49
        _group_num = 0;
53
607k
    } else if (_type == Type::METADATA) {
54
12
        _group_num = 1;
55
607k
    } else if (_type == Type::CACHE) {
56
2.22k
        _group_num = 2;
57
605k
    } else {
58
605k
        _group_num =
59
605k
                mem_tracker_limiter_group_counter.fetch_add(1) % (MEM_TRACKER_GROUP_NUM - 3) + 3;
60
605k
    }
61
608k
    memory_memtrackerlimiter_cnt << 1;
62
608k
}
63
64
std::shared_ptr<MemTrackerLimiter> MemTrackerLimiter::create_shared(MemTrackerLimiter::Type type,
65
                                                                    const std::string& label,
66
304k
                                                                    int64_t byte_limit) {
67
304k
    auto tracker = std::make_shared<MemTrackerLimiter>(type, label, byte_limit);
68
    // Write tracker is only used to tracker the size, memtable has a separate logic to deal with memory flush,
69
    // so limit == -1, so that should not check the limit in memtracker.
70
304k
    auto write_tracker = std::make_shared<MemTrackerLimiter>(type, "Memtable#" + label, -1);
71
304k
    tracker->_write_tracker.swap(write_tracker);
72
#ifndef BE_TEST
73
    DCHECK(ExecEnv::tracking_memory());
74
    std::lock_guard<std::mutex> l(
75
            ExecEnv::GetInstance()->mem_tracker_limiter_pool[tracker->group_num()].group_lock);
76
    ExecEnv::GetInstance()->mem_tracker_limiter_pool[tracker->group_num()].trackers.insert(
77
            ExecEnv::GetInstance()->mem_tracker_limiter_pool[tracker->group_num()].trackers.end(),
78
            tracker);
79
#endif
80
304k
    return tracker;
81
304k
}
82
83
608k
bool MemTrackerLimiter::open_memory_tracker_inaccurate_detect() {
84
608k
    return doris::config::crash_in_memory_tracker_inaccurate &&
85
608k
           (_type == Type::COMPACTION || _type == Type::SCHEMA_CHANGE || _type == Type::QUERY ||
86
0
            (_type == Type::LOAD && !is_group_commit_load));
87
608k
}
88
89
608k
MemTrackerLimiter::~MemTrackerLimiter() {
90
608k
    consume(_untracked_mem);
91
608k
    static std::string mem_tracker_inaccurate_msg =
92
608k
            "mem tracker not equal to 0 when mem tracker destruct, this usually means that "
93
608k
            "memory tracking is inaccurate and SCOPED_ATTACH_TASK and "
94
608k
            "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. "
95
608k
            "If the log is truncated, search for `Address Sanitizer` in the be.INFO log to see "
96
608k
            "more information."
97
608k
            "1. For query and load, memory leaks may have occurred, it is expected that the query "
98
608k
            "mem tracker will be bound to the thread context using SCOPED_ATTACH_TASK and "
99
608k
            "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc and free. "
100
608k
            "2. If a memory alloc is recorded by this tracker, it is expected that be "
101
608k
            "recorded in this tracker when memory is freed. "
102
608k
            "3. Merge the remaining memory tracking value by "
103
608k
            "this tracker into Orphan, if you observe that Orphan is not equal to 0 in the mem "
104
608k
            "tracker web or log, this indicates that there may be a memory leak. "
105
608k
            "4. If you need to "
106
608k
            "transfer memory tracking value between two trackers, can use transfer_to.";
107
608k
    if (consumption() != 0) {
108
72
        if (open_memory_tracker_inaccurate_detect()) {
109
            // Maybe the memory is recorded in orphan memory tracker, so that we print the stack trace in
110
            // orphan memory tracker first to help us debug.
111
0
            LOG(INFO) << "Orphan memory tracker consumption: "
112
0
                      << ExecEnv::GetInstance()->orphan_mem_tracker()->print_address_sanitizers();
113
0
            std::string err_msg = fmt::format(
114
0
                    "mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", label(),
115
0
                    consumption(), peak_consumption(), mem_tracker_inaccurate_msg);
116
0
            LOG(FATAL) << err_msg << print_address_sanitizers();
117
0
        }
118
72
        if (ExecEnv::tracking_memory()) {
119
0
            ExecEnv::GetInstance()->orphan_mem_tracker()->consume(consumption());
120
0
        }
121
72
        _mem_counter.set(0);
122
607k
    } else if (open_memory_tracker_inaccurate_detect() && !_address_sanitizers.empty()) {
123
        // Maybe the memory is recorded in orphan memory tracker, so that we print the stack trace in
124
        // orphan memory tracker first to help us debug.
125
0
        LOG(INFO) << "Orphan memory tracker consumption: "
126
0
                  << ExecEnv::GetInstance()->orphan_mem_tracker()->print_address_sanitizers();
127
0
        LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
128
0
                   << ", mem tracker label: " << _label
129
0
                   << ", peak consumption: " << peak_consumption() << print_address_sanitizers();
130
0
    }
131
608k
    DCHECK_EQ(reserved_consumption(), 0);
132
608k
    memory_memtrackerlimiter_cnt << -1;
133
608k
}
134
135
0
void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
136
0
    if (open_memory_tracker_inaccurate_detect()) {
137
0
        std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
138
0
        auto it = _address_sanitizers.find(buf);
139
0
        if (it != _address_sanitizers.end()) {
140
0
            _error_address_sanitizers.emplace_back(
141
0
                    fmt::format("[Address Sanitizer] memory buf repeat add, mem tracker label: {}, "
142
0
                                "consumption: {}, peak consumption: {}, buf: {}, size: {}, old "
143
0
                                "buf: {}, old size: {}, new stack_trace: {}, old stack_trace: {}.",
144
0
                                _label, consumption(), peak_consumption(), buf, size, it->first,
145
0
                                it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"),
146
0
                                it->second.stack_trace));
147
0
        }
148
149
        // if alignment not equal to 0, maybe usable_size > size.
150
0
        AddressSanitizer as = {size, doris::config::enable_address_sanitizers_with_stack_trace
151
0
                                             ? get_stack_trace(1, "DISABLED")
152
0
                                             : ""};
153
0
        _address_sanitizers.emplace(buf, as);
154
0
    }
155
0
}
156
157
0
void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
158
0
    if (open_memory_tracker_inaccurate_detect()) {
159
0
        std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
160
0
        auto it = _address_sanitizers.find(buf);
161
0
        if (it != _address_sanitizers.end()) {
162
0
            if (it->second.size != size) {
163
0
                _error_address_sanitizers.emplace_back(fmt::format(
164
0
                        "[Address Sanitizer] free memory buf size inaccurate, mem tracker label: "
165
0
                        "{}, consumption: {}, peak consumption: {}, buf: {}, size: {}, old buf: "
166
0
                        "{}, old size: {}, new stack_trace: {}, old stack_trace: {}.",
167
0
                        _label, consumption(), peak_consumption(), buf, size, it->first,
168
0
                        it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"),
169
0
                        it->second.stack_trace));
170
0
            }
171
0
            _address_sanitizers.erase(buf);
172
0
        } else {
173
0
            _error_address_sanitizers.emplace_back(fmt::format(
174
0
                    "[Address Sanitizer] memory buf not exist, mem tracker label: {}, consumption: "
175
0
                    "{}, peak consumption: {}, buf: {}, size: {}, stack_trace: {}.",
176
0
                    _label, consumption(), peak_consumption(), buf, size,
177
0
                    get_stack_trace(1, "FULL_WITH_INLINE")));
178
0
        }
179
0
    }
180
0
}
181
182
0
std::string MemTrackerLimiter::print_address_sanitizers() {
183
0
    std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
184
0
    std::string detail = "[Address Sanitizer]:";
185
0
    detail += "\n memory not be freed:";
186
0
    for (const auto& it : _address_sanitizers) {
187
0
        auto msg = fmt::format(
188
0
                "\n    [Address Sanitizer] buf not be freed, mem tracker label: {}, consumption: "
189
0
                "{}, peak consumption: {}, buf: {}, size {}, strack trace: {}",
190
0
                _label, consumption(), peak_consumption(), it.first, it.second.size,
191
0
                it.second.stack_trace);
192
0
        LOG(INFO) << msg;
193
0
        detail += msg;
194
0
    }
195
0
    detail += "\n incorrect memory alloc and free:";
196
0
    for (const auto& err_msg : _error_address_sanitizers) {
197
0
        LOG(INFO) << err_msg;
198
0
        detail += fmt::format("\n    {}", err_msg);
199
0
    }
200
0
    return detail;
201
0
}
202
203
0
RuntimeProfile* MemTrackerLimiter::make_profile(RuntimeProfile* profile) const {
204
0
    RuntimeProfile* profile_snapshot = profile->create_child(
205
0
            fmt::format("{}@{}@id={}", _label, type_string(_type), _uid.to_string()), true, false);
206
0
    RuntimeProfile::HighWaterMarkCounter* usage_counter =
207
0
            profile_snapshot->AddHighWaterMarkCounter("Memory", TUnit::BYTES);
208
0
    COUNTER_SET(usage_counter, peak_consumption());
209
0
    COUNTER_SET(usage_counter, consumption());
210
0
    if (limit() >= 0) {
211
0
        RuntimeProfile::Counter* limit_counter =
212
0
                ADD_COUNTER(profile_snapshot, "Limit", TUnit::BYTES);
213
0
        COUNTER_SET(limit_counter, _limit);
214
0
    }
215
0
    if (reserved_peak_consumption() != 0) {
216
0
        RuntimeProfile::HighWaterMarkCounter* reserved_counter =
217
0
                profile_snapshot->AddHighWaterMarkCounter("ReservedMemory", TUnit::BYTES);
218
0
        COUNTER_SET(reserved_counter, reserved_peak_consumption());
219
0
        COUNTER_SET(reserved_counter, reserved_consumption());
220
0
    }
221
0
    return profile_snapshot;
222
0
}
223
224
0
std::string MemTrackerLimiter::make_profile_str() const {
225
0
    std::unique_ptr<RuntimeProfile> profile_snapshot =
226
0
            std::make_unique<RuntimeProfile>("MemTrackerSnapshot");
227
0
    make_profile(profile_snapshot.get());
228
0
    std::stringstream ss;
229
0
    profile_snapshot->pretty_print(&ss);
230
0
    return ss.str();
231
0
}
232
233
0
void MemTrackerLimiter::clean_tracker_limiter_group() {
234
#ifndef BE_TEST
235
    if (ExecEnv::tracking_memory()) {
236
        for (auto& group : ExecEnv::GetInstance()->mem_tracker_limiter_pool) {
237
            std::lock_guard<std::mutex> l(group.group_lock);
238
            auto it = group.trackers.begin();
239
            while (it != group.trackers.end()) {
240
                if ((*it).expired()) {
241
                    it = group.trackers.erase(it);
242
                } else {
243
                    ++it;
244
                }
245
            }
246
        }
247
    }
248
#endif
249
0
}
250
251
void MemTrackerLimiter::make_type_trackers_profile(RuntimeProfile* profile,
252
0
                                                   MemTrackerLimiter::Type type) {
253
0
    if (type == Type::GLOBAL) {
254
0
        std::lock_guard<std::mutex> l(
255
0
                ExecEnv::GetInstance()->mem_tracker_limiter_pool[0].group_lock);
256
0
        for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[0].trackers) {
257
0
            auto tracker = trackerWptr.lock();
258
0
            if (tracker != nullptr) {
259
0
                tracker->make_profile(profile);
260
0
            }
261
0
        }
262
0
    } else if (type == Type::METADATA) {
263
0
        std::lock_guard<std::mutex> l(
264
0
                ExecEnv::GetInstance()->mem_tracker_limiter_pool[1].group_lock);
265
0
        for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[1].trackers) {
266
0
            auto tracker = trackerWptr.lock();
267
0
            if (tracker != nullptr) {
268
0
                tracker->make_profile(profile);
269
0
            }
270
0
        }
271
0
    } else if (type == Type::CACHE) {
272
0
        std::lock_guard<std::mutex> l(
273
0
                ExecEnv::GetInstance()->mem_tracker_limiter_pool[2].group_lock);
274
0
        for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[2].trackers) {
275
0
            auto tracker = trackerWptr.lock();
276
0
            if (tracker != nullptr) {
277
0
                tracker->make_profile(profile);
278
0
            }
279
0
        }
280
0
    } else {
281
0
        for (unsigned i = 3; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) {
282
0
            std::lock_guard<std::mutex> l(
283
0
                    ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
284
0
            for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
285
0
                auto tracker = trackerWptr.lock();
286
0
                if (tracker != nullptr && tracker->type() == type) {
287
0
                    tracker->make_profile(profile);
288
0
                }
289
0
            }
290
0
        }
291
0
    }
292
0
}
293
294
0
std::string MemTrackerLimiter::make_type_trackers_profile_str(MemTrackerLimiter::Type type) {
295
0
    std::unique_ptr<RuntimeProfile> profile_snapshot =
296
0
            std::make_unique<RuntimeProfile>("TypeMemTrackersSnapshot");
297
0
    make_type_trackers_profile(profile_snapshot.get(), type);
298
0
    std::stringstream ss;
299
0
    profile_snapshot->pretty_print(&ss);
300
0
    return ss.str();
301
0
}
302
303
void MemTrackerLimiter::make_top_consumption_tasks_tracker_profile(RuntimeProfile* profile,
304
0
                                                                   int top_num) {
305
0
    std::unique_ptr<RuntimeProfile> tmp_profile_snapshot =
306
0
            std::make_unique<RuntimeProfile>("tmpSnapshot");
307
0
    std::priority_queue<std::pair<int64_t, RuntimeProfile*>> max_pq;
308
    // start from 3, not include global/metadata/cache type.
309
0
    for (unsigned i = 3; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) {
310
0
        std::lock_guard<std::mutex> l(
311
0
                ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
312
0
        for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
313
0
            auto tracker = trackerWptr.lock();
314
0
            if (tracker != nullptr) {
315
0
                auto* profile_snapshot = tracker->make_profile(tmp_profile_snapshot.get());
316
0
                max_pq.emplace(tracker->consumption(), profile_snapshot);
317
0
            }
318
0
        }
319
0
    }
320
321
0
    while (!max_pq.empty() && top_num > 0) {
322
0
        RuntimeProfile* profile_snapshot =
323
0
                profile->create_child(max_pq.top().second->name(), true, false);
324
0
        profile_snapshot->merge(max_pq.top().second);
325
0
        top_num--;
326
0
        max_pq.pop();
327
0
    }
328
0
}
329
330
0
void MemTrackerLimiter::make_all_tasks_tracker_profile(RuntimeProfile* profile) {
331
0
    std::unordered_map<Type, RuntimeProfile*> types_profile;
332
0
    types_profile[Type::QUERY] = profile->create_child("QueryTasks", true, false);
333
0
    types_profile[Type::LOAD] = profile->create_child("LoadTasks", true, false);
334
0
    types_profile[Type::COMPACTION] = profile->create_child("CompactionTasks", true, false);
335
0
    types_profile[Type::SCHEMA_CHANGE] = profile->create_child("SchemaChangeTasks", true, false);
336
0
    types_profile[Type::OTHER] = profile->create_child("OtherTasks", true, false);
337
338
    // start from 3, not include global/metadata/cache type.
339
0
    for (unsigned i = 3; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) {
340
0
        std::lock_guard<std::mutex> l(
341
0
                ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
342
0
        for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
343
0
            auto tracker = trackerWptr.lock();
344
0
            if (tracker != nullptr) {
345
                // ResultBlockBufferBase will continue to exist for 5 minutes after the query ends, even if the
346
                // result buffer is empty, and will not be shown in the profile. of course, this code is tricky.
347
0
                if (tracker->consumption() == 0 &&
348
0
                    tracker->label().starts_with("ResultBlockBuffer")) {
349
0
                    continue;
350
0
                }
351
0
                tracker->make_profile(types_profile[tracker->type()]);
352
0
            }
353
0
        }
354
0
    }
355
0
}
356
357
0
void MemTrackerLimiter::print_log_usage(const std::string& msg) {
358
0
    if (_enable_print_log_usage) {
359
0
        _enable_print_log_usage = false;
360
0
        std::string detail = msg;
361
0
        detail += "\nProcess Memory Summary: " + GlobalMemoryArbitrator::process_mem_log_str();
362
0
        detail += "\n" + make_profile_str();
363
0
        LOG(WARNING) << detail;
364
0
    }
365
0
}
366
367
0
std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
368
0
    std::string err_msg = fmt::format(
369
0
            "memory tracker limit exceeded, tracker label:{}, type:{}, limit "
370
0
            "{}, peak used {}, current used {}. backend {}, {}.",
371
0
            label(), type_string(_type), PrettyPrinter::print_bytes(limit()),
372
0
            PrettyPrinter::print_bytes(peak_consumption()),
373
0
            PrettyPrinter::print_bytes(consumption()), BackendOptions::get_localhost(),
374
0
            GlobalMemoryArbitrator::process_memory_used_str());
375
0
    if (_type == Type::QUERY || _type == Type::LOAD) {
376
0
        err_msg += fmt::format(
377
0
                " exec node:<{}>, can `set exec_mem_limit` to change limit, details see be.INFO.",
378
0
                doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label());
379
0
    } else if (_type == Type::SCHEMA_CHANGE) {
380
0
        err_msg += fmt::format(
381
0
                " can modify `memory_limitation_per_thread_for_schema_change_bytes` in be.conf to "
382
0
                "change limit, details see be.INFO.");
383
0
    }
384
0
    return err_msg;
385
0
}
386
387
#include "common/compile_check_end.h"
388
} // namespace doris