Coverage Report

Created: 2026-03-16 21:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/memory/memory_reclamation.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/memory/memory_reclamation.h"
19
20
#include <unordered_map>
21
22
#include "runtime/exec_env.h"
23
#include "runtime/memory/global_memory_arbitrator.h"
24
#include "runtime/memory/jemalloc_control.h"
25
#include "runtime/memory/mem_tracker_limiter.h"
26
#include "runtime/runtime_profile.h"
27
#include "runtime/runtime_query_statistics_mgr.h"
28
#include "runtime/workload_group/workload_group.h"
29
#include "util/mem_info.h"
30
#include "util/stopwatch.hpp"
31
32
namespace doris {
33
#include "common/compile_check_begin.h"
34
35
int64_t MemoryReclamation::revoke_tasks_memory(
36
        int64_t need_free_mem, const std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs,
37
        const std::string& revoke_reason, RuntimeProfile* profile, PriorityCmpFunc priority_cmp,
38
27
        std::vector<FilterFunc> filters, ActionFunc action) {
39
27
    if (need_free_mem <= 0) {
40
1
        return 0;
41
1
    }
42
26
    auto process_mem_stat = GlobalMemoryArbitrator::process_mem_log_str();
43
26
    auto process_mem_stat_to_client = fmt::format(
44
26
            "os physical memory {}. {}, limit {}. {}, low water mark {}.",
45
26
            PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
46
26
            GlobalMemoryArbitrator::process_memory_used_str(), MemInfo::mem_limit_str(),
47
26
            GlobalMemoryArbitrator::sys_mem_available_str(),
48
26
            PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
49
26
    RuntimeProfile::Counter* request_revoke_tasks_counter =
50
26
            ADD_COUNTER(profile, "RequestRevokeTasksNum", TUnit::UNIT);
51
26
    RuntimeProfile::Counter* revocable_tasks_counter =
52
26
            ADD_COUNTER(profile, "RevocableTasksNum", TUnit::UNIT);
53
26
    RuntimeProfile::Counter* this_time_revoked_tasks_counter =
54
26
            ADD_COUNTER(profile, "ThisTimeRevokedTasksNum", TUnit::UNIT);
55
26
    RuntimeProfile::Counter* skip_cancelling_tasks_counter =
56
26
            ADD_COUNTER(profile, "SkipCancellingTasksNum", TUnit::UNIT);
57
26
    RuntimeProfile::Counter* keep_wait_cancelling_tasks_counter =
58
26
            ADD_COUNTER(profile, "KeepWaitCancellingTasksNum", TUnit::UNIT);
59
26
    RuntimeProfile::Counter* freed_memory_counter =
60
26
            ADD_COUNTER(profile, "FreedMemory", TUnit::BYTES);
61
26
    RuntimeProfile::Counter* filter_cost_time = ADD_TIMER(profile, "FilterCostTime");
62
26
    RuntimeProfile::Counter* revoke_cost_time = ADD_TIMER(profile, "revokeCostTime");
63
64
26
    std::priority_queue<std::pair<int64_t, std::shared_ptr<ResourceContext>>>
65
26
            revocable_resource_ctxs;
66
26
    std::vector<std::string> this_time_revoked_tasks;
67
26
    std::vector<std::string> skip_cancelling_tasks;
68
26
    std::vector<std::string> keep_wait_cancelling_tasks;
69
70
26
    auto config_str = fmt::format(
71
26
            "need free memory: {}, request revoke tasks: {}, revoke reason: {}, priority compare "
72
26
            "function: {}, filter function: {}, action function: {}. {}",
73
26
            need_free_mem, resource_ctxs.size(), revoke_reason,
74
26
            priority_cmp_func_string(priority_cmp), filter_func_string(filters),
75
26
            action_func_string(action), process_mem_stat);
76
26
    LOG(INFO) << fmt::format("[MemoryGC] start revoke_tasks_memory, {}.", config_str);
77
26
    Defer defer {[&]() {
78
26
        LOG(INFO) << fmt::format(
79
26
                "[MemoryGC] end revoke_tasks_memory, {}. freed memory: {}, "
80
26
                "revocable tasks: {}, this time revoked tasks: {}, consist of: [{}], call revoke "
81
26
                "func cost(us): {}."
82
26
                " some tasks is being canceled and has not been completed yet, among them, skip "
83
26
                "canceling tasks: {}(cancel cost too long, not counted in freed memory), consist "
84
26
                "of: [{}], keep wait canceling tasks: {}(counted in freed memory), consist of: "
85
26
                "[{}], filter cost(us): {}.",
86
26
                config_str, freed_memory_counter->value(), revocable_tasks_counter->value(),
87
26
                this_time_revoked_tasks_counter->value(), join(this_time_revoked_tasks, " | "),
88
26
                revoke_cost_time->value(), skip_cancelling_tasks_counter->value(),
89
26
                join(skip_cancelling_tasks, " | "), keep_wait_cancelling_tasks_counter->value(),
90
26
                join(keep_wait_cancelling_tasks, " | "), filter_cost_time->value());
91
26
    }};
92
93
26
    {
94
26
        SCOPED_TIMER(filter_cost_time);
95
68
        for (auto resource_ctx : resource_ctxs) {
96
68
            bool is_filtered = false;
97
68
            for (auto filter : filters) {
98
61
                if (!FilterFuncImpl[filter](resource_ctx.get())) {
99
6
                    is_filtered = true;
100
6
                    break;
101
6
                }
102
61
            }
103
68
            if (is_filtered) {
104
6
                continue;
105
6
            }
106
107
            // skip cancelling tasks
108
62
            if (resource_ctx->task_controller()->is_cancelled()) {
109
                // for the query being canceled,
110
                // if (current time - cancel start time) < 3s (revoke_memory_max_tolerance_ms), the query memory is counted in `freed_memory`,
111
                // and the query memory is expected to be released soon.
112
                // if > 3s, the query memory will not be counted in `freed_memory`,
113
                // and the query may be blocked during the cancel process. skip this query and continue to cancel other queries.
114
24
                if (MonotonicMillis() - resource_ctx->task_controller()->cancelled_time() >
115
24
                    config::revoke_memory_max_tolerance_ms) {
116
9
                    skip_cancelling_tasks.push_back(
117
9
                            resource_ctx->task_controller()->debug_string());
118
15
                } else {
119
15
                    keep_wait_cancelling_tasks.push_back(
120
15
                            resource_ctx->task_controller()->debug_string());
121
15
                    COUNTER_UPDATE(freed_memory_counter,
122
15
                                   resource_ctx->memory_context()->current_memory_bytes());
123
15
                }
124
24
                is_filtered = true;
125
24
            }
126
127
            // TODO, if ActionFunc::SPILL, should skip spilling tasks.
128
129
62
            if (is_filtered) {
130
24
                continue;
131
24
            }
132
38
            int64_t weight = PriorityCmpFuncImpl[priority_cmp](resource_ctx.get());
133
38
            if (weight != -1) {
134
25
                revocable_resource_ctxs.emplace(weight, resource_ctx);
135
25
            }
136
38
        }
137
26
    }
138
139
26
    COUNTER_UPDATE(request_revoke_tasks_counter, resource_ctxs.size());
140
26
    COUNTER_UPDATE(revocable_tasks_counter, revocable_resource_ctxs.size());
141
26
    COUNTER_UPDATE(skip_cancelling_tasks_counter, skip_cancelling_tasks.size());
142
26
    COUNTER_UPDATE(keep_wait_cancelling_tasks_counter, keep_wait_cancelling_tasks.size());
143
144
26
    if (revocable_resource_ctxs.empty()) {
145
11
        return freed_memory_counter->value();
146
11
    }
147
148
15
    {
149
15
        SCOPED_TIMER(revoke_cost_time);
150
17
        while (!revocable_resource_ctxs.empty()) {
151
15
            auto resource_ctx = revocable_resource_ctxs.top().second;
152
15
            std::string task_revoke_reason = fmt::format(
153
15
                    "{} {} task: {}. because {}. in backend {}, {} execute again after enough "
154
15
                    "memory, details see be.INFO.",
155
15
                    action_func_string(action), priority_cmp_func_string(priority_cmp),
156
15
                    resource_ctx->memory_context()->debug_string(), revoke_reason,
157
15
                    BackendOptions::get_localhost(), process_mem_stat_to_client);
158
15
            if (ActionFuncImpl[action](resource_ctx.get(),
159
15
                                       Status::MemoryLimitExceeded(task_revoke_reason))) {
160
15
                this_time_revoked_tasks.push_back(resource_ctx->task_controller()->debug_string());
161
15
                COUNTER_UPDATE(freed_memory_counter,
162
15
                               resource_ctx->memory_context()->current_memory_bytes());
163
15
                COUNTER_UPDATE(this_time_revoked_tasks_counter, 1);
164
15
                if (freed_memory_counter->value() > need_free_mem) {
165
13
                    break;
166
13
                }
167
15
            }
168
2
            revocable_resource_ctxs.pop();
169
2
        }
170
15
    }
171
15
    return freed_memory_counter->value();
172
26
}
173
174
// step1: free process top memory query
175
// step2: free process top memory load, load retries are more expensive, so revoke at the end.
176
0
bool MemoryReclamation::revoke_process_memory(const std::string& revoke_reason) {
177
0
    MonotonicStopWatch watch;
178
0
    watch.start();
179
0
    int64_t freed_mem = 0;
180
0
    std::unique_ptr<RuntimeProfile> profile =
181
0
            std::make_unique<RuntimeProfile>("RevokeProcessMemory");
182
183
0
    LOG(INFO) << fmt::format(
184
0
            "[MemoryGC] start MemoryReclamation::revoke_process_memory, {}, need free size: {}.",
185
0
            GlobalMemoryArbitrator::process_mem_log_str(),
186
0
            PrettyPrinter::print_bytes(MemInfo::process_full_gc_size()));
187
0
    Defer defer {[&]() {
188
0
        std::stringstream ss;
189
0
        profile->pretty_print(&ss);
190
0
        LOG(INFO) << fmt::format(
191
0
                "[MemoryGC] end MemoryReclamation::revoke_process_memory, {}, need free size: {}, "
192
0
                "free Memory {}. cost(us): {}, details: {}",
193
0
                GlobalMemoryArbitrator::process_mem_log_str(),
194
0
                PrettyPrinter::print_bytes(MemInfo::process_full_gc_size()),
195
0
                PrettyPrinter::print_bytes(freed_mem), watch.elapsed_time() / 1000, ss.str());
196
0
    }};
197
198
    // step1: start canceling from the query with the largest memory usage until the memory of process_full_gc_size is freed.
199
0
    VLOG_DEBUG << fmt::format(
200
0
            "[MemoryGC] before free top memory query in revoke process memory, Type:{}, Memory "
201
0
            "Tracker "
202
0
            "Summary: {}",
203
0
            MemTrackerLimiter::type_string(MemTrackerLimiter::Type::QUERY),
204
0
            MemTrackerLimiter::make_type_trackers_profile_str(MemTrackerLimiter::Type::QUERY));
205
0
    RuntimeProfile* free_top_query_profile =
206
0
            profile->create_child("FreeTopMemoryQuery", true, true);
207
0
    std::vector<std::shared_ptr<ResourceContext>> resource_ctxs;
208
0
    ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_tasks_resource_context(
209
0
            resource_ctxs);
210
0
    freed_mem +=
211
0
            revoke_tasks_memory(MemInfo::process_full_gc_size() - freed_mem, resource_ctxs,
212
0
                                revoke_reason, free_top_query_profile, PriorityCmpFunc::TOP_MEMORY,
213
0
                                {FilterFunc::IS_QUERY}, ActionFunc::CANCEL);
214
0
    if (freed_mem > MemInfo::process_full_gc_size()) {
215
0
        return true;
216
0
    }
217
218
    // step2: start canceling from the load with the largest memory usage until the memory of process_full_gc_size is freed.
219
0
    VLOG_DEBUG << fmt::format(
220
0
            "[MemoryGC] before free top memory load in revoke process memory, Type:{}, Memory "
221
0
            "Tracker "
222
0
            "Summary: {}",
223
0
            MemTrackerLimiter::type_string(MemTrackerLimiter::Type::LOAD),
224
0
            MemTrackerLimiter::make_type_trackers_profile_str(MemTrackerLimiter::Type::LOAD));
225
0
    RuntimeProfile* free_top_load_profile = profile->create_child("FreeTopMemoryLoad", true, true);
226
0
    freed_mem +=
227
0
            revoke_tasks_memory(MemInfo::process_full_gc_size() - freed_mem, resource_ctxs,
228
0
                                revoke_reason, free_top_load_profile, PriorityCmpFunc::TOP_MEMORY,
229
0
                                {FilterFunc::IS_LOAD}, ActionFunc::CANCEL);
230
0
    return freed_mem > MemInfo::process_full_gc_size();
231
0
}
232
233
0
// Returns jemalloc dirty pages to the OS. The body exists only when USE_JEMALLOC is defined;
// otherwise this function does nothing.
void MemoryReclamation::je_purge_dirty_pages() {
#ifdef USE_JEMALLOC
    // Both switches must allow it: memory GC not disabled, and dirty-page purging enabled.
    const bool purge_allowed = !config::disable_memory_gc && config::enable_je_purge_dirty_pages;
    if (!purge_allowed) {
        return;
    }
    std::unique_lock<std::mutex> purge_guard(JemallocControl::je_purge_dirty_pages_lock);

    // A full purge of all arenas is very expensive, so it runs only when the notify flag has
    // been raised (per the original note: after process memory changed by about 256M).
    // Without the flag, fall back to the cheaper decay.
    if (!JemallocControl::je_purge_dirty_pages_notify.load(std::memory_order_relaxed)) {
        JemallocControl::je_decay_all_arena_dirty_pages();
        return;
    }
    JemallocControl::je_purge_all_arena_dirty_pages();
    // Clear the flag only after the purge, so the next call decays until notified again.
    JemallocControl::je_purge_dirty_pages_notify.store(false, std::memory_order_relaxed);
#endif
}
250
251
#include "common/compile_check_end.h"
252
} // namespace doris