Coverage Report

Created: 2024-11-20 19:28

/root/doris/be/src/util/mem_info.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/mem-info.cc
19
// and modified by Doris
20
21
#include "mem_info.h"
22
23
#include "gutil/strings/split.h"
24
25
#ifdef __APPLE__
26
#include <sys/sysctl.h>
27
#endif
28
29
#include <bvar/bvar.h>
30
#include <fmt/format.h>
31
#include <gen_cpp/Metrics_types.h>
32
#include <gen_cpp/segment_v2.pb.h>
33
#include <jemalloc/jemalloc.h>
34
35
#include <algorithm>
36
#include <boost/algorithm/string/trim.hpp>
37
#include <fstream>
38
#include <unordered_map>
39
40
#include "common/cgroup_memory_ctl.h"
41
#include "common/config.h"
42
#include "common/status.h"
43
#include "runtime/memory/global_memory_arbitrator.h"
44
#include "util/cgroup_util.h"
45
#include "util/parse_util.h"
46
#include "util/pretty_printer.h"
47
#include "util/string_parser.hpp"
48
49
namespace doris {
50
51
// File-local bvar adders that export memory statistics under the names given as constructor
// arguments. MemInfo::refresh_memory_bvar() updates each one by adding
// (latest value - adder.get_value()), i.e. they are driven like gauges rather than as
// ever-growing sums.
static bvar::Adder<int64_t> memory_jemalloc_cache_bytes("memory_jemalloc_cache_bytes");
52
static bvar::Adder<int64_t> memory_jemalloc_dirty_pages_bytes("memory_jemalloc_dirty_pages_bytes");
53
static bvar::Adder<int64_t> memory_jemalloc_metadata_bytes("memory_jemalloc_metadata_bytes");
54
static bvar::Adder<int64_t> memory_jemalloc_virtual_bytes("memory_jemalloc_virtual_bytes");
55
static bvar::Adder<int64_t> memory_cgroup_usage_bytes("memory_cgroup_usage_bytes");
56
static bvar::Adder<int64_t> memory_sys_available_bytes("memory_sys_available_bytes");
57
// The following four mirror values read from GlobalMemoryArbitrator.
static bvar::Adder<int64_t> memory_arbitrator_sys_available_bytes(
58
        "memory_arbitrator_sys_available_bytes");
59
static bvar::Adder<int64_t> memory_arbitrator_process_usage_bytes(
60
        "memory_arbitrator_process_usage_bytes");
61
static bvar::Adder<int64_t> memory_arbitrator_reserve_memory_bytes(
62
        "memory_arbitrator_reserve_memory_bytes");
63
static bvar::Adder<int64_t> memory_arbitrator_refresh_interval_growth_bytes(
64
        "memory_arbitrator_refresh_interval_growth_bytes");
65
66
// Definitions and initial values of MemInfo's static state.
// _s_initialized becomes true at the end of init().
bool MemInfo::_s_initialized = false;
67
// Physical memory and the limits derived from it stay at INT64_MAX until
// refresh_proc_meminfo() (Linux) or init() (macOS) stores real values.
std::atomic<int64_t> MemInfo::_s_physical_mem = std::numeric_limits<int64_t>::max();
68
std::atomic<int64_t> MemInfo::_s_mem_limit = std::numeric_limits<int64_t>::max();
69
std::atomic<int64_t> MemInfo::_s_soft_mem_limit = std::numeric_limits<int64_t>::max();
70
71
// Allocator statistics written by refresh_allocator_mem(); the dirty page limit is parsed
// from config in refresh_proc_meminfo().
std::atomic<int64_t> MemInfo::_s_allocator_cache_mem = 0;
72
std::atomic<int64_t> MemInfo::_s_allocator_metadata_mem = 0;
73
std::atomic<int64_t> MemInfo::_s_je_dirty_pages_mem = std::numeric_limits<int64_t>::min();
74
std::atomic<int64_t> MemInfo::_s_je_dirty_pages_mem_limit = std::numeric_limits<int64_t>::max();
75
std::atomic<int64_t> MemInfo::_s_virtual_memory_used = 0;
76
77
// cgroup state maintained by refresh_proc_meminfo(). INT64_MAX / INT64_MIN act as the
// "unknown" markers for limit / usage. _s_cgroup_mem_refresh_state is true only when cgroup
// info is enabled, a limit is known and the usage read succeeded.
// _s_cgroup_mem_refresh_wait_times is set negative after each limit lookup and incremented
// per call; the limit is looked up again once it is >= 0.
int64_t MemInfo::_s_cgroup_mem_limit = std::numeric_limits<int64_t>::max();
78
int64_t MemInfo::_s_cgroup_mem_usage = std::numeric_limits<int64_t>::min();
79
bool MemInfo::_s_cgroup_mem_refresh_state = false;
80
int64_t MemInfo::_s_cgroup_mem_refresh_wait_times = 0;
81
82
// Key -> value pairs parsed from /proc/meminfo; values with a "kB" suffix are stored in bytes.
static std::unordered_map<std::string, int64_t> _mem_info_bytes;
83
// -1 until refresh_proc_meminfo() stores a value.
std::atomic<int64_t> MemInfo::_s_sys_mem_available = -1;
84
// Water marks are computed in init(), and only when /proc/meminfo provides MemAvailable.
int64_t MemInfo::_s_sys_mem_available_low_water_mark = std::numeric_limits<int64_t>::min();
85
int64_t MemInfo::_s_sys_mem_available_warning_water_mark = std::numeric_limits<int64_t>::min();
86
// GC sizes are parsed from config in refresh_proc_meminfo(); -1 until then.
std::atomic<int64_t> MemInfo::_s_process_minor_gc_size = -1;
87
std::atomic<int64_t> MemInfo::_s_process_full_gc_size = -1;
88
// NOTE(review): presumably used to signal a thread that purges jemalloc dirty pages; the
// users of these three objects are not in this file -- confirm against the callers.
std::mutex MemInfo::je_purge_dirty_pages_lock;
89
std::condition_variable MemInfo::je_purge_dirty_pages_cv;
90
std::atomic<bool> MemInfo::je_purge_dirty_pages_notify {false};
91
92
0
void MemInfo::refresh_allocator_mem() {
93
0
#if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER)
94
#elif defined(USE_JEMALLOC)
95
    // jemalloc mallctl refer to : https://jemalloc.net/jemalloc.3.html
96
    // https://www.bookstack.cn/read/aliyun-rds-core/4a0cdf677f62feb3.md
97
    //  Check the Doris BE web page `http://ip:webserver_port/memz` to get the Jemalloc Profile.
98
99
    // 'epoch' is a special mallctl -- it updates the statistics. Without it, all
100
    // the following calls will return stale values. It increments and returns
101
    // the current epoch number, which might be useful to log as a sanity check.
102
    uint64_t epoch = 0;
103
    size_t sz = sizeof(epoch);
104
    jemallctl("epoch", &epoch, &sz, &epoch, sz);
105
106
    // Number of extents of the given type in this arena in the bucket corresponding to page size index.
107
    // Large size class starts at 16384, the extents have three sizes before 16384: 4096, 8192, and 12288, so + 3
108
    int64_t dirty_pages_bytes = 0;
109
    for (unsigned i = 0; i < get_je_unsigned_metrics("arenas.nlextents") + 3; i++) {
110
        dirty_pages_bytes += get_je_all_arena_extents_metrics(i, "dirty_bytes");
111
    }
112
    _s_je_dirty_pages_mem.store(dirty_pages_bytes, std::memory_order_relaxed);
113
114
    // Doris uses Jemalloc as default Allocator, Jemalloc Cache consists of two parts:
115
    // - Thread Cache, cache a specified number of Pages in Thread Cache.
116
    // - Dirty Page, memory Page that can be reused in all Arenas.
117
    _s_allocator_cache_mem.store(get_je_all_arena_metrics("tcache_bytes") + dirty_pages_bytes,
118
                                 std::memory_order_relaxed);
119
    // Total number of bytes dedicated to metadata, which comprise base allocations used
120
    // for bootstrap-sensitive allocator metadata structures.
121
    _s_allocator_metadata_mem.store(get_je_metrics("stats.metadata"), std::memory_order_relaxed);
122
    _s_virtual_memory_used.store(get_je_metrics("stats.mapped"), std::memory_order_relaxed);
123
#else
124
    _s_allocator_cache_mem.store(get_tc_metrics("tcmalloc.pageheap_free_bytes") +
125
                                         get_tc_metrics("tcmalloc.central_cache_free_bytes") +
126
                                         get_tc_metrics("tcmalloc.transfer_cache_free_bytes") +
127
                                         get_tc_metrics("tcmalloc.thread_cache_free_bytes"),
128
                                 std::memory_order_relaxed);
129
    _s_virtual_memory_used.store(get_tc_metrics("generic.total_physical_bytes") +
130
                                         get_tc_metrics("tcmalloc.pageheap_unmapped_bytes"),
131
                                 std::memory_order_relaxed);
132
#endif
133
0
}
134
135
0
void MemInfo::refresh_memory_bvar() {
136
0
    memory_jemalloc_cache_bytes << MemInfo::allocator_cache_mem() -
137
0
                                           memory_jemalloc_cache_bytes.get_value();
138
0
    memory_jemalloc_dirty_pages_bytes
139
0
            << MemInfo::je_dirty_pages_mem() - memory_jemalloc_dirty_pages_bytes.get_value();
140
0
    memory_jemalloc_metadata_bytes
141
0
            << MemInfo::allocator_metadata_mem() - memory_jemalloc_metadata_bytes.get_value();
142
0
    memory_jemalloc_virtual_bytes << MemInfo::allocator_virtual_mem() -
143
0
                                             memory_jemalloc_virtual_bytes.get_value();
144
145
0
    memory_cgroup_usage_bytes << _s_cgroup_mem_usage - memory_cgroup_usage_bytes.get_value();
146
0
    memory_sys_available_bytes << _s_sys_mem_available - memory_sys_available_bytes.get_value();
147
148
0
    memory_arbitrator_sys_available_bytes
149
0
            << GlobalMemoryArbitrator::sys_mem_available() -
150
0
                       memory_arbitrator_sys_available_bytes.get_value();
151
0
    memory_arbitrator_process_usage_bytes
152
0
            << GlobalMemoryArbitrator::process_memory_usage() -
153
0
                       memory_arbitrator_process_usage_bytes.get_value();
154
0
    memory_arbitrator_reserve_memory_bytes
155
0
            << GlobalMemoryArbitrator::process_reserved_memory() -
156
0
                       memory_arbitrator_reserve_memory_bytes.get_value();
157
0
    memory_arbitrator_refresh_interval_growth_bytes
158
0
            << GlobalMemoryArbitrator::refresh_interval_memory_growth -
159
0
                       memory_arbitrator_refresh_interval_growth_bytes.get_value();
160
0
}
161
162
#ifndef __APPLE__
163
1
void MemInfo::refresh_proc_meminfo() {
164
1
    std::ifstream meminfo("/proc/meminfo", std::ios::in);
165
1
    std::string line;
166
167
55
    while (meminfo.good() && !meminfo.eof()) {
168
54
        getline(meminfo, line);
169
54
        std::vector<std::string> fields = strings::Split(line, " ", strings::SkipWhitespace());
170
54
        if (fields.size() < 2) {
171
1
            continue;
172
1
        }
173
53
        std::string key = fields[0].substr(0, fields[0].size() - 1);
174
175
53
        StringParser::ParseResult result;
176
53
        auto mem_value =
177
53
                StringParser::string_to_int<int64_t>(fields[1].data(), fields[1].size(), &result);
178
179
53
        if (result == StringParser::PARSE_SUCCESS) {
180
53
            if (fields.size() == 2) {
181
4
                _mem_info_bytes[key] = mem_value;
182
49
            } else if (fields[2] == "kB") {
183
49
                _mem_info_bytes[key] = mem_value * 1024L;
184
49
            }
185
53
        }
186
53
    }
187
1
    if (meminfo.is_open()) {
188
1
        meminfo.close();
189
1
    }
190
191
    // refresh cgroup memory
192
1
    if (config::enable_use_cgroup_memory_info) {
193
1
        if (_s_cgroup_mem_refresh_wait_times >= 0) {
194
1
            auto status = CGroupMemoryCtl::find_cgroup_mem_limit(&_s_cgroup_mem_limit);
195
1
            if (!status.ok()) {
196
0
                _s_cgroup_mem_limit = std::numeric_limits<int64_t>::max();
197
                // find cgroup limit failed, wait 300s, 1000 * 100ms.
198
0
                _s_cgroup_mem_refresh_wait_times = -3000;
199
0
                LOG(INFO) << "Refresh cgroup memory limit failed, refresh again after 300s, cgroup "
200
0
                             "mem limit: "
201
0
                          << _s_cgroup_mem_limit;
202
1
            } else {
203
                // wait 10s, 100 * 100ms, avoid too frequently.
204
1
                _s_cgroup_mem_refresh_wait_times = -100;
205
1
            }
206
1
        } else {
207
0
            _s_cgroup_mem_refresh_wait_times++;
208
0
        }
209
210
1
        if (_s_cgroup_mem_limit != std::numeric_limits<int64_t>::max()) {
211
1
            auto status = CGroupMemoryCtl::find_cgroup_mem_usage(&_s_cgroup_mem_usage);
212
1
            if (!status.ok()) {
213
0
                _s_cgroup_mem_usage = std::numeric_limits<int64_t>::min();
214
0
                _s_cgroup_mem_refresh_state = false;
215
1
            } else {
216
1
                _s_cgroup_mem_refresh_state = true;
217
1
            }
218
1
        } else {
219
0
            _s_cgroup_mem_refresh_state = false;
220
0
        }
221
1
    } else {
222
0
        _s_cgroup_mem_refresh_state = false;
223
0
    }
224
225
    // 1. calculate physical_mem
226
1
    int64_t physical_mem = -1;
227
228
1
    physical_mem = _mem_info_bytes["MemTotal"];
229
1
    if (_s_cgroup_mem_refresh_state) {
230
        // In theory, always cgroup_mem_limit < physical_mem
231
1
        if (physical_mem < 0) {
232
0
            physical_mem = _s_cgroup_mem_limit;
233
1
        } else {
234
1
            physical_mem = std::min(physical_mem, _s_cgroup_mem_limit);
235
1
        }
236
1
    }
237
238
1
    if (physical_mem <= 0) {
239
0
        LOG(WARNING)
240
0
                << "Could not determine amount of physical memory on this machine, physical_mem: "
241
0
                << physical_mem;
242
0
    }
243
244
    // 2. if physical_mem changed, refresh mem limit and gc size.
245
1
    if (physical_mem > 0 && _s_physical_mem.load(std::memory_order_relaxed) != physical_mem) {
246
1
        _s_physical_mem.store(physical_mem);
247
248
1
        bool is_percent = true;
249
1
        _s_mem_limit.store(
250
1
                ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent));
251
1
        if (_s_mem_limit <= 0) {
252
0
            LOG(WARNING) << "Failed to parse mem limit from '" + config::mem_limit + "'.";
253
0
        }
254
1
        if (_s_mem_limit > _s_physical_mem) {
255
0
            LOG(WARNING) << "Memory limit " << PrettyPrinter::print(_s_mem_limit, TUnit::BYTES)
256
0
                         << " exceeds physical memory of "
257
0
                         << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES)
258
0
                         << ". Using physical memory instead";
259
0
            _s_mem_limit.store(_s_physical_mem);
260
0
        }
261
1
        _s_soft_mem_limit.store(int64_t(_s_mem_limit * config::soft_mem_limit_frac));
262
263
1
        _s_process_minor_gc_size.store(ParseUtil::parse_mem_spec(config::process_minor_gc_size, -1,
264
1
                                                                 _s_mem_limit, &is_percent));
265
1
        _s_process_full_gc_size.store(ParseUtil::parse_mem_spec(config::process_full_gc_size, -1,
266
1
                                                                _s_mem_limit, &is_percent));
267
1
        _s_je_dirty_pages_mem_limit.store(ParseUtil::parse_mem_spec(
268
1
                config::je_dirty_pages_mem_limit_percent, -1, _s_mem_limit, &is_percent));
269
1
    }
270
271
    // 3. refresh process available memory
272
1
    int64_t mem_available = -1;
273
1
    if (_mem_info_bytes.find("MemAvailable") != _mem_info_bytes.end()) {
274
1
        mem_available = _mem_info_bytes["MemAvailable"];
275
1
    }
276
1
    if (_s_cgroup_mem_refresh_state) {
277
1
        if (mem_available < 0) {
278
0
            mem_available = _s_cgroup_mem_limit - _s_cgroup_mem_usage;
279
1
        } else {
280
1
            mem_available = std::min(mem_available, _s_cgroup_mem_limit - _s_cgroup_mem_usage);
281
1
        }
282
1
    }
283
1
    if (mem_available < 0) {
284
0
        LOG(WARNING) << "Failed to get available memory, set MAX_INT.";
285
0
        mem_available = std::numeric_limits<int64_t>::max();
286
0
    }
287
1
    if (_s_sys_mem_available.load(std::memory_order_relaxed) != mem_available) {
288
1
        _s_sys_mem_available.store(mem_available);
289
1
    }
290
1
}
291
292
1
void MemInfo::init() {
293
1
    refresh_proc_meminfo();
294
295
1
    std::string line;
296
1
    int64_t _s_vm_min_free_kbytes = 0;
297
1
    std::ifstream vminfo("/proc/sys/vm/min_free_kbytes", std::ios::in);
298
1
    if (vminfo.good() && !vminfo.eof()) {
299
1
        getline(vminfo, line);
300
1
        boost::algorithm::trim(line);
301
1
        StringParser::ParseResult result;
302
1
        auto mem_value = StringParser::string_to_int<int64_t>(line.data(), line.size(), &result);
303
304
1
        if (result == StringParser::PARSE_SUCCESS) {
305
1
            _s_vm_min_free_kbytes = mem_value * 1024L;
306
1
        }
307
1
    }
308
1
    if (vminfo.is_open()) {
309
1
        vminfo.close();
310
1
    }
311
312
    // Redhat 4.x OS, `/proc/meminfo` has no `MemAvailable`.
313
1
    if (_mem_info_bytes.find("MemAvailable") != _mem_info_bytes.end()) {
314
        // MemAvailable = MemFree - LowWaterMark + (PageCache - min(PageCache / 2, LowWaterMark))
315
        // LowWaterMark = /proc/sys/vm/min_free_kbytes
316
        // Ref:
317
        // https://serverfault.com/questions/940196/why-is-memavailable-a-lot-less-than-memfreebufferscached
318
        // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=34e431b0ae398fc54ea69ff85ec700722c9da773
319
        //
320
        // upper sys_mem_available_low_water_mark, avoid wasting too much memory.
321
1
        _s_sys_mem_available_low_water_mark = std::max<int64_t>(
322
1
                std::min<int64_t>(std::min<int64_t>(_s_physical_mem - _s_mem_limit,
323
1
                                                    int64_t(_s_physical_mem * 0.05)),
324
1
                                  config::max_sys_mem_available_low_water_mark_bytes),
325
1
                0);
326
1
        _s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark * 2;
327
1
    }
328
329
1
    std::ifstream sys_transparent_hugepage("/sys/kernel/mm/transparent_hugepage/enabled",
330
1
                                           std::ios::in);
331
1
    std::string hugepage_enable;
332
    // If file not exist, getline returns an empty string.
333
1
    getline(sys_transparent_hugepage, hugepage_enable);
334
1
    if (sys_transparent_hugepage.is_open()) {
335
1
        sys_transparent_hugepage.close();
336
1
    }
337
1
    if (hugepage_enable == "[always] madvise never") {
338
0
        std::cout << "[WARNING!] /sys/kernel/mm/transparent_hugepage/enabled: " << hugepage_enable
339
0
                  << ", Doris not recommend turning on THP, which may cause the BE process to use "
340
0
                     "more memory and cannot be freed in time. Turn off THP: `echo madvise | sudo "
341
0
                     "tee /sys/kernel/mm/transparent_hugepage/enabled`"
342
0
                  << std::endl;
343
0
    }
344
345
    // Expect vm overcommit memory value to be 1, system will no longer throw bad_alloc, memory alloc are always accepted,
346
    // memory limit check is handed over to Doris Allocator, make sure throw exception position is controllable,
347
    // otherwise bad_alloc can be thrown anywhere and it will be difficult to achieve exception safety.
348
1
    std::ifstream sys_vm("/proc/sys/vm/overcommit_memory", std::ios::in);
349
1
    std::string vm_overcommit;
350
1
    getline(sys_vm, vm_overcommit);
351
1
    if (sys_vm.is_open()) {
352
1
        sys_vm.close();
353
1
    }
354
1
    if (!vm_overcommit.empty() && std::stoi(vm_overcommit) == 2) {
355
0
        std::cout << "[WARNING!] /proc/sys/vm/overcommit_memory: " << vm_overcommit
356
0
                  << ", expect is 1, memory limit check is handed over to Doris Allocator, "
357
0
                     "otherwise BE may crash even with remaining memory"
358
0
                  << std::endl;
359
0
    }
360
361
1
    LOG(INFO) << "Physical Memory: " << _mem_info_bytes["MemTotal"]
362
1
              << ", BE Available Physical Memory(consider cgroup): "
363
1
              << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << ", Mem Limit: "
364
1
              << PrettyPrinter::print(_s_mem_limit.load(std::memory_order_relaxed), TUnit::BYTES)
365
1
              << ", origin config value: " << config::mem_limit
366
1
              << ", System Mem Available Min Reserve: "
367
1
              << PrettyPrinter::print(_s_sys_mem_available_low_water_mark, TUnit::BYTES)
368
1
              << ", Vm Min Free KBytes: "
369
1
              << PrettyPrinter::print(_s_vm_min_free_kbytes, TUnit::BYTES)
370
1
              << ", Vm Overcommit Memory: " << vm_overcommit;
371
1
    _s_initialized = true;
372
1
}
373
#else
374
// Variant compiled when __APPLE__ is defined: it does nothing, so the values that the
// non-Apple implementation derives from /proc/meminfo and cgroup are never refreshed here.
void MemInfo::refresh_proc_meminfo() {}
375
376
void MemInfo::init() {
377
    size_t size = sizeof(_s_physical_mem);
378
    if (sysctlbyname("hw.memsize", &_s_physical_mem, &size, nullptr, 0) != 0) {
379
        LOG(WARNING) << "Could not determine amount of physical memory on this machine.";
380
        _s_physical_mem = -1;
381
    }
382
383
    bool is_percent = true;
384
    _s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent);
385
    _s_soft_mem_limit = static_cast<int64_t>(_s_mem_limit * config::soft_mem_limit_frac);
386
387
    LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES);
388
    _s_initialized = true;
389
}
390
#endif
391
392
0
std::string MemInfo::debug_string() {
393
0
    DCHECK(_s_initialized);
394
0
    std::stringstream stream;
395
0
    stream << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES)
396
0
           << std::endl;
397
0
    stream << "Memory Limt: " << PrettyPrinter::print(_s_mem_limit, TUnit::BYTES) << std::endl;
398
0
    stream << "CGroup Info: " << doris::CGroupMemoryCtl::debug_string() << std::endl;
399
0
    return stream.str();
400
0
}
401
402
} // namespace doris