Coverage Report

Created: 2026-03-15 08:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/memtable/memtable_memory_limiter.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/memtable/memtable_memory_limiter.h"
19
20
#include <bvar/bvar.h>
21
22
#include "common/config.h"
23
#include "common/metrics/doris_metrics.h"
24
#include "common/metrics/metrics.h"
25
#include "load/memtable/memtable.h"
26
#include "load/memtable/memtable_writer.h"
27
#include "runtime/workload_group/workload_group_manager.h"
28
#include "util/mem_info.h"
29
30
namespace doris {
31
// Gauge prototype for the hook metric that MemTableMemoryLimiter::init() registers
// (reporting _mem_tracker->consumption()) and the destructor deregisters.
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(memtable_memory_limiter_mem_consumption, MetricUnit::BYTES, "",
32
                                   memtable_memory_limiter_mem_consumption,
33
                                   Labels({{"type", "load"}}));
34
35
// Milliseconds a caller spent in the slow path of handle_memtable_flush().
bvar::LatencyRecorder g_memtable_memory_limit_latency_ms("mm_limiter_limit_time_ms");
36
// +1 when a thread enters the slow path of handle_memtable_flush(), -1 when it leaves.
bvar::Adder<int> g_memtable_memory_limit_waiting_threads("mm_limiter_waiting_threads");
37
// The next four are overwritten by _refresh_mem_tracker() with _active_mem_usage,
// _queue_mem_usage, _flush_mem_usage and their sum (_mem_usage) respectively.
bvar::Status<int64_t> g_memtable_active_memory("mm_limiter_mem_active", 0);
38
bvar::Status<int64_t> g_memtable_write_memory("mm_limiter_mem_write", 0);
39
bvar::Status<int64_t> g_memtable_flush_memory("mm_limiter_mem_flush", 0);
40
bvar::Status<int64_t> g_memtable_load_memory("mm_limiter_mem_load", 0);
41
// Hard and soft load memory limits, published once by init().
bvar::Status<int64_t> g_load_hard_mem_limit("mm_limiter_limit_hard", 0);
42
bvar::Status<int64_t> g_load_soft_mem_limit("mm_limiter_limit_soft", 0);
43
// Incremented by handle_memtable_flush() each time it triggers a flush: the first when
// _hard_limit_reached() was true at that moment, the second otherwise.
bvar::Adder<uint64_t> g_flush_cuz_load_mem_exceed_hard_limit("flush_cuz_hard_limit");
44
bvar::Adder<uint64_t> g_flush_cuz_sys_mem_exceed_soft_limit("flush_cuz_soft_limit");
45
// Updated by _flush_active_memtables() for every writer it asks to flush: a count of
// requests and the active memtable size observed for each request.
bvar::Adder<int> g_memtable_memory_limit_flush_memtable_count("mm_limiter_flush_memtable_count");
46
bvar::LatencyRecorder g_memtable_memory_limit_flush_size_bytes("mm_limiter_flush_size_bytes");
47
48
// Calculate the total memory limit of all load tasks on this BE
49
1
static int64_t calc_process_max_load_memory(int64_t process_mem_limit) {
50
1
    if (process_mem_limit == -1) {
51
        // no limit
52
0
        return -1;
53
0
    }
54
1
    int32_t max_load_memory_percent = config::load_process_max_memory_limit_percent;
55
1
    return process_mem_limit * max_load_memory_percent / 100;
56
1
}
57
58
18
// Nothing to do at construction time; limits, the tracker and the hook metric are
// all set up later by init().
MemTableMemoryLimiter::MemTableMemoryLimiter() = default;
59
60
18
// Remove the hook metric that init() registers, so it cannot outlive this object.
// NOTE(review): this also runs for instances on which init() was never called;
// presumably deregistering an unknown hook is harmless — confirm.
MemTableMemoryLimiter::~MemTableMemoryLimiter() {
    DEREGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption);
}
63
64
1
// Compute the load memory limits from the process limit, publish them, create the
// memtable memory tracker and start the log throttling timer.
//
// process_mem_limit: process-wide memory limit in bytes, or -1 for "no limit".
// Returns Status::OK() unconditionally.
//
// NOTE(review): for process_mem_limit == -1 the hard limit is -1, and the soft limit
// and safe permit are derived from that sentinel by plain integer arithmetic —
// confirm callers never pass -1, or that the resulting limits are intended.
Status MemTableMemoryLimiter::init(int64_t process_mem_limit) {
    // The hard limit comes first: both other thresholds are percentages of it.
    _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
    g_load_hard_mem_limit.set_value(_load_hard_mem_limit);

    _load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100;
    g_load_soft_mem_limit.set_value(_load_soft_mem_limit);

    _load_safe_mem_permit =
            _load_hard_mem_limit * config::load_process_safe_mem_permit_percent / 100;

    // The tracker must exist before the hook below can be invoked.
    _mem_tracker = std::make_unique<MemTracker>("AllMemTableMemory");
    REGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption,
                         [this]() { return _mem_tracker->consumption(); });

    _log_timer.start();
    return Status::OK();
}
77
78
15
void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer) {
79
15
    std::lock_guard<std::mutex> l(_lock);
80
15
    _writers.push_back(writer);
81
15
}
82
83
1
int64_t MemTableMemoryLimiter::_sys_avail_mem_less_than_warning_water_mark() {
84
    // reserve a small amount of memory so we do not trigger MinorGC
85
1
    return doris::MemInfo::sys_mem_available_warning_water_mark() -
86
1
           doris::GlobalMemoryArbitrator::sys_mem_available() +
87
1
           config::memtable_limiter_reserved_memory_bytes;
88
1
}
89
90
1
int64_t MemTableMemoryLimiter::_process_used_mem_more_than_soft_mem_limit() {
91
    // reserve a small amount of memory so we do not trigger MinorGC
92
1
    return GlobalMemoryArbitrator::process_memory_usage() - MemInfo::soft_mem_limit() +
93
1
           config::memtable_limiter_reserved_memory_bytes;
94
1
}
95
96
1
bool MemTableMemoryLimiter::_soft_limit_reached() {
97
1
    return _mem_tracker->consumption() > _load_soft_mem_limit || _hard_limit_reached();
98
1
}
99
100
1
bool MemTableMemoryLimiter::_hard_limit_reached() {
101
1
    return _mem_tracker->consumption() > _load_hard_mem_limit ||
102
1
           _sys_avail_mem_less_than_warning_water_mark() > 0 ||
103
1
           _process_used_mem_more_than_soft_mem_limit() > 0;
104
1
}
105
106
0
bool MemTableMemoryLimiter::_load_usage_low() {
107
0
    return _mem_tracker->consumption() <= _load_safe_mem_permit;
108
0
}
109
110
0
int64_t MemTableMemoryLimiter::_need_flush() {
111
0
    DBUG_EXECUTE_IF("MemTableMemoryLimiter._need_flush.random_flush", {
112
0
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
113
0
            LOG(INFO) << "debug memtable need flush return 1";
114
0
            return 1;
115
0
        }
116
0
    });
117
0
    int64_t limit1 = _mem_tracker->consumption() - _load_soft_mem_limit;
118
0
    int64_t limit2 = _sys_avail_mem_less_than_warning_water_mark();
119
0
    int64_t limit3 = _process_used_mem_more_than_soft_mem_limit();
120
0
    int64_t need_flush = std::max({limit1, limit2, limit3});
121
0
    return need_flush - _queue_mem_usage - _flush_mem_usage;
122
0
}
123
124
1
void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_check) {
125
    // Check the soft limit.
126
1
    DCHECK(_load_soft_mem_limit > 0);
127
1
    do {
128
1
        DBUG_EXECUTE_IF("MemTableMemoryLimiter.handle_memtable_flush.limit_reached", {
129
1
            LOG(INFO) << "debug memtable limit reached";
130
1
            break;
131
1
        });
132
1
        if (!_soft_limit_reached() || _load_usage_low()) {
133
1
            return;
134
1
        }
135
1
    } while (false);
136
0
    MonotonicStopWatch timer;
137
0
    timer.start();
138
0
    std::unique_lock<std::mutex> l(_lock);
139
0
    g_memtable_memory_limit_waiting_threads << 1;
140
0
    bool first = true;
141
0
    do {
142
0
        if (!first) {
143
0
            auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000));
144
0
            if (st == std::cv_status::timeout) {
145
0
                LOG(INFO) << "timeout when waiting for memory hard limit end, try again";
146
0
            }
147
0
        }
148
0
        if (cancel_check && cancel_check()) {
149
0
            LOG(INFO) << "cancelled when waiting for memtable flush";
150
0
            g_memtable_memory_limit_waiting_threads << -1;
151
0
            return;
152
0
        }
153
0
        first = false;
154
0
        int64_t need_flush = _need_flush();
155
0
        if (need_flush > 0) {
156
0
            auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT;
157
0
            LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ? "hard" : "soft")
158
0
                      << ", " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", "
159
0
                      << GlobalMemoryArbitrator::sys_mem_available_details_str()
160
0
                      << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
161
0
                      << ", memtable writers num: " << _writers.size()
162
0
                      << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
163
0
                      << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
164
0
                      << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage)
165
0
                      << ", need flush: " << PrettyPrinter::print_bytes(need_flush);
166
0
            if (VLOG_DEBUG_IS_ON) {
167
0
                auto log_str = doris::ProcessProfile::instance()
168
0
                                       ->memory_profile()
169
0
                                       ->process_memory_detail_str();
170
0
                LOG_LONG_STRING(INFO, log_str);
171
0
            }
172
0
            if (limit == Limit::HARD) {
173
0
                g_flush_cuz_load_mem_exceed_hard_limit << 1;
174
0
            } else if (limit == Limit::SOFT) {
175
0
                g_flush_cuz_sys_mem_exceed_soft_limit << 1;
176
0
            } else {
177
                // will not reach here
178
0
            }
179
0
            _flush_active_memtables(need_flush);
180
0
        }
181
0
    } while (_hard_limit_reached() && !_load_usage_low());
182
0
    g_memtable_memory_limit_waiting_threads << -1;
183
0
    timer.stop();
184
0
    int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
185
0
    g_memtable_memory_limit_latency_ms << time_ms;
186
0
    if (time_ms > 0) {
187
0
        LOG(INFO) << "waited " << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS)
188
0
                  << " for memtable memory limit"
189
0
                  << ", " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", "
190
0
                  << GlobalMemoryArbitrator::sys_mem_available_details_str()
191
0
                  << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
192
0
                  << ", memtable writers num: " << _writers.size()
193
0
                  << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
194
0
                  << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
195
0
                  << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
196
0
    }
197
0
}
198
199
0
int64_t MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) {
200
0
    _refresh_mem_tracker();
201
0
    if (_active_writers.size() == 0) {
202
0
        return 0;
203
0
    }
204
205
0
    using WriterMem = std::pair<std::weak_ptr<MemTableWriter>, int64_t>;
206
0
    auto cmp = [](WriterMem left, WriterMem right) { return left.second < right.second; };
207
0
    std::priority_queue<WriterMem, std::vector<WriterMem>, decltype(cmp)> heap(cmp);
208
209
0
    for (auto writer : _active_writers) {
210
0
        auto w = writer.lock();
211
0
        if (w == nullptr) {
212
0
            continue;
213
0
        }
214
0
        heap.emplace(w, w->active_memtable_mem_consumption());
215
0
    }
216
217
0
    int64_t mem_flushed = 0;
218
0
    int64_t num_flushed = 0;
219
220
0
    while (mem_flushed < need_flush && !heap.empty()) {
221
0
        auto [writer, sort_mem] = heap.top();
222
0
        heap.pop();
223
0
        auto w = writer.lock();
224
0
        if (w == nullptr) {
225
0
            continue;
226
0
        }
227
228
0
        int64_t mem = w->active_memtable_mem_consumption();
229
0
        if (mem < sort_mem * 0.9) {
230
            // if the memtable writer just got flushed, don't flush it again
231
0
            continue;
232
0
        }
233
0
        Status st = w->flush_async();
234
0
        if (!st.ok()) {
235
0
            auto err_msg = fmt::format(
236
0
                    "tablet writer failed to reduce mem consumption by flushing memtable, "
237
0
                    "tablet_id={}, err={}",
238
0
                    w->tablet_id(), st.to_string());
239
0
            LOG(WARNING) << err_msg;
240
0
            static_cast<void>(w->cancel_with_status(st));
241
0
        }
242
0
        mem_flushed += mem;
243
0
        num_flushed += (mem > 0);
244
0
        g_memtable_memory_limit_flush_memtable_count << 1;
245
0
        g_memtable_memory_limit_flush_size_bytes << mem;
246
0
    }
247
0
    LOG(INFO) << "flushed " << num_flushed << " out of " << _active_writers.size()
248
0
              << " active writers, flushed size: " << PrettyPrinter::print_bytes(mem_flushed);
249
0
    return mem_flushed;
250
0
}
251
252
0
void MemTableMemoryLimiter::refresh_mem_tracker() {
253
0
    std::lock_guard<std::mutex> l(_lock);
254
0
    _refresh_mem_tracker();
255
0
    std::stringstream ss;
256
0
    Limit limit = Limit::NONE;
257
0
    if (_soft_limit_reached()) {
258
0
        limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT;
259
0
        ss << "reached " << (limit == Limit::HARD ? "hard" : "soft") << " limit";
260
0
    } else if (_last_limit == Limit::NONE) {
261
0
        return;
262
0
    } else {
263
0
        ss << "ended " << (_last_limit == Limit::HARD ? "hard" : "soft") << " limit";
264
0
    }
265
266
0
    if (_last_limit == limit && _log_timer.elapsed_time() < LOG_INTERVAL) {
267
0
        return;
268
0
    }
269
270
0
    _last_limit = limit;
271
0
    _log_timer.reset();
272
0
    LOG(INFO) << ss.str()
273
0
              << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
274
0
              << ", memtable writers num: " << _writers.size()
275
0
              << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
276
0
              << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
277
0
              << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
278
0
    if (VLOG_DEBUG_IS_ON) {
279
0
        auto log_str =
280
0
                doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str();
281
0
        LOG_LONG_STRING(INFO, log_str);
282
0
    }
283
0
}
284
285
0
void MemTableMemoryLimiter::_refresh_mem_tracker() {
286
0
    _flush_mem_usage = 0;
287
0
    _queue_mem_usage = 0;
288
0
    _active_mem_usage = 0;
289
0
    _active_writers.clear();
290
0
    for (auto it = _writers.begin(); it != _writers.end();) {
291
0
        if (auto writer = it->lock()) {
292
            // The memtable is currently used by writer to insert blocks.
293
0
            auto active_usage = writer->active_memtable_mem_consumption();
294
0
            _active_mem_usage += active_usage;
295
0
            if (active_usage > 0) {
296
0
                _active_writers.push_back(writer);
297
0
            }
298
299
0
            auto flush_usage = writer->mem_consumption(MemType::FLUSH);
300
0
            _flush_mem_usage += flush_usage;
301
302
0
            auto write_usage = writer->mem_consumption(MemType::WRITE_FINISHED);
303
0
            _queue_mem_usage += write_usage;
304
0
            ++it;
305
0
        } else {
306
0
            *it = std::move(_writers.back());
307
0
            _writers.pop_back();
308
0
        }
309
0
    }
310
0
    _mem_usage = _active_mem_usage + _queue_mem_usage + _flush_mem_usage;
311
0
    g_memtable_active_memory.set_value(_active_mem_usage);
312
0
    g_memtable_write_memory.set_value(_queue_mem_usage);
313
0
    g_memtable_flush_memory.set_value(_flush_mem_usage);
314
0
    g_memtable_load_memory.set_value(_mem_usage);
315
0
    VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size();
316
0
    _mem_tracker->set_consumption(_mem_usage);
317
0
    if (!_hard_limit_reached()) {
318
0
        _hard_limit_end_cond.notify_all();
319
0
    }
320
0
}
321
322
} // namespace doris