Coverage Report

Created: 2026-04-14 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/memory/memory_reclamation.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/memory/memory_reclamation.h"
19
20
#include <unordered_map>
21
22
#include "runtime/exec_env.h"
23
#include "runtime/memory/global_memory_arbitrator.h"
24
#include "runtime/memory/jemalloc_control.h"
25
#include "runtime/memory/mem_tracker_limiter.h"
26
#include "runtime/runtime_profile.h"
27
#include "runtime/runtime_query_statistics_mgr.h"
28
#include "runtime/workload_group/workload_group.h"
29
#include "util/mem_info.h"
30
#include "util/stopwatch.hpp"
31
32
namespace doris {
33
34
int64_t MemoryReclamation::revoke_tasks_memory(
35
        int64_t need_free_mem, const std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs,
36
        const std::string& revoke_reason, RuntimeProfile* profile, PriorityCmpFunc priority_cmp,
37
27
        std::vector<FilterFunc> filters, ActionFunc action) {
38
27
    if (need_free_mem <= 0) {
39
1
        return 0;
40
1
    }
41
26
    auto process_mem_stat = GlobalMemoryArbitrator::process_mem_log_str();
42
26
    auto process_mem_stat_to_client = fmt::format(
43
26
            "os physical memory {}. {}, limit {}. {}, low water mark {}.",
44
26
            PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
45
26
            GlobalMemoryArbitrator::process_memory_used_str(), MemInfo::mem_limit_str(),
46
26
            GlobalMemoryArbitrator::sys_mem_available_str(),
47
26
            PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
48
26
    RuntimeProfile::Counter* request_revoke_tasks_counter =
49
26
            ADD_COUNTER(profile, "RequestRevokeTasksNum", TUnit::UNIT);
50
26
    RuntimeProfile::Counter* revocable_tasks_counter =
51
26
            ADD_COUNTER(profile, "RevocableTasksNum", TUnit::UNIT);
52
26
    RuntimeProfile::Counter* this_time_revoked_tasks_counter =
53
26
            ADD_COUNTER(profile, "ThisTimeRevokedTasksNum", TUnit::UNIT);
54
26
    RuntimeProfile::Counter* skip_cancelling_tasks_counter =
55
26
            ADD_COUNTER(profile, "SkipCancellingTasksNum", TUnit::UNIT);
56
26
    RuntimeProfile::Counter* keep_wait_cancelling_tasks_counter =
57
26
            ADD_COUNTER(profile, "KeepWaitCancellingTasksNum", TUnit::UNIT);
58
26
    RuntimeProfile::Counter* freed_memory_counter =
59
26
            ADD_COUNTER(profile, "FreedMemory", TUnit::BYTES);
60
26
    RuntimeProfile::Counter* filter_cost_time = ADD_TIMER(profile, "FilterCostTime");
61
26
    RuntimeProfile::Counter* revoke_cost_time = ADD_TIMER(profile, "revokeCostTime");
62
63
26
    std::priority_queue<std::pair<int64_t, std::shared_ptr<ResourceContext>>>
64
26
            revocable_resource_ctxs;
65
26
    std::vector<std::string> this_time_revoked_tasks;
66
26
    std::vector<std::string> skip_cancelling_tasks;
67
26
    std::vector<std::string> keep_wait_cancelling_tasks;
68
69
26
    auto config_str = fmt::format(
70
26
            "need free memory: {}, request revoke tasks: {}, revoke reason: {}, priority compare "
71
26
            "function: {}, filter function: {}, action function: {}. {}",
72
26
            need_free_mem, resource_ctxs.size(), revoke_reason,
73
26
            priority_cmp_func_string(priority_cmp), filter_func_string(filters),
74
26
            action_func_string(action), process_mem_stat);
75
26
    LOG(INFO) << fmt::format("[MemoryGC] start revoke_tasks_memory, {}.", config_str);
76
26
    Defer defer {[&]() {
77
26
        LOG(INFO) << fmt::format(
78
26
                "[MemoryGC] end revoke_tasks_memory, {}. freed memory: {}, "
79
26
                "revocable tasks: {}, this time revoked tasks: {}, consist of: [{}], call revoke "
80
26
                "func cost(us): {}."
81
26
                " some tasks is being canceled and has not been completed yet, among them, skip "
82
26
                "canceling tasks: {}(cancel cost too long, not counted in freed memory), consist "
83
26
                "of: [{}], keep wait canceling tasks: {}(counted in freed memory), consist of: "
84
26
                "[{}], filter cost(us): {}.",
85
26
                config_str, freed_memory_counter->value(), revocable_tasks_counter->value(),
86
26
                this_time_revoked_tasks_counter->value(), join(this_time_revoked_tasks, " | "),
87
26
                revoke_cost_time->value(), skip_cancelling_tasks_counter->value(),
88
26
                join(skip_cancelling_tasks, " | "), keep_wait_cancelling_tasks_counter->value(),
89
26
                join(keep_wait_cancelling_tasks, " | "), filter_cost_time->value());
90
26
    }};
91
92
26
    {
93
26
        SCOPED_TIMER(filter_cost_time);
94
68
        for (auto resource_ctx : resource_ctxs) {
95
68
            bool is_filtered = false;
96
68
            for (auto filter : filters) {
97
61
                if (!FilterFuncImpl[filter](resource_ctx.get())) {
98
6
                    is_filtered = true;
99
6
                    break;
100
6
                }
101
61
            }
102
68
            if (is_filtered) {
103
6
                continue;
104
6
            }
105
106
            // skip cancelling tasks
107
62
            if (resource_ctx->task_controller()->is_cancelled()) {
108
                // for the query being canceled,
109
                // if (current time - cancel start time) < 3s (revoke_memory_max_tolerance_ms), the query memory is counted in `freed_memory`,
110
                // and the query memory is expected to be released soon.
111
                // if > 3s, the query memory will not be counted in `freed_memory`,
112
                // and the query may be blocked during the cancel process. skip this query and continue to cancel other queries.
113
24
                if (MonotonicMillis() - resource_ctx->task_controller()->cancelled_time() >
114
24
                    config::revoke_memory_max_tolerance_ms) {
115
9
                    skip_cancelling_tasks.push_back(
116
9
                            resource_ctx->task_controller()->debug_string());
117
15
                } else {
118
15
                    keep_wait_cancelling_tasks.push_back(
119
15
                            resource_ctx->task_controller()->debug_string());
120
15
                    COUNTER_UPDATE(freed_memory_counter,
121
15
                                   resource_ctx->memory_context()->current_memory_bytes());
122
15
                }
123
24
                is_filtered = true;
124
24
            }
125
126
            // TODO, if ActionFunc::SPILL, should skip spilling tasks.
127
128
62
            if (is_filtered) {
129
24
                continue;
130
24
            }
131
38
            int64_t weight = PriorityCmpFuncImpl[priority_cmp](resource_ctx.get());
132
38
            if (weight != -1) {
133
25
                revocable_resource_ctxs.emplace(weight, resource_ctx);
134
25
            }
135
38
        }
136
26
    }
137
138
26
    COUNTER_UPDATE(request_revoke_tasks_counter, resource_ctxs.size());
139
26
    COUNTER_UPDATE(revocable_tasks_counter, revocable_resource_ctxs.size());
140
26
    COUNTER_UPDATE(skip_cancelling_tasks_counter, skip_cancelling_tasks.size());
141
26
    COUNTER_UPDATE(keep_wait_cancelling_tasks_counter, keep_wait_cancelling_tasks.size());
142
143
26
    if (revocable_resource_ctxs.empty()) {
144
11
        return freed_memory_counter->value();
145
11
    }
146
147
15
    {
148
15
        SCOPED_TIMER(revoke_cost_time);
149
17
        while (!revocable_resource_ctxs.empty()) {
150
15
            auto resource_ctx = revocable_resource_ctxs.top().second;
151
15
            std::string task_revoke_reason = fmt::format(
152
15
                    "{} {} task: {}. because {}. in backend {}, {} execute again after enough "
153
15
                    "memory, details see be.INFO.",
154
15
                    action_func_string(action), priority_cmp_func_string(priority_cmp),
155
15
                    resource_ctx->memory_context()->debug_string(), revoke_reason,
156
15
                    BackendOptions::get_localhost(), process_mem_stat_to_client);
157
15
            if (ActionFuncImpl[action](resource_ctx.get(),
158
15
                                       Status::MemoryLimitExceeded(task_revoke_reason))) {
159
15
                this_time_revoked_tasks.push_back(resource_ctx->task_controller()->debug_string());
160
15
                COUNTER_UPDATE(freed_memory_counter,
161
15
                               resource_ctx->memory_context()->current_memory_bytes());
162
15
                COUNTER_UPDATE(this_time_revoked_tasks_counter, 1);
163
15
                if (freed_memory_counter->value() > need_free_mem) {
164
13
                    break;
165
13
                }
166
15
            }
167
2
            revocable_resource_ctxs.pop();
168
2
        }
169
15
    }
170
15
    return freed_memory_counter->value();
171
26
}
172
173
// step1: free process top memory query
174
// step2: free process top memory load, load retries are more expensive, so revoke at the end.
175
0
bool MemoryReclamation::revoke_process_memory(const std::string& revoke_reason) {
176
0
    MonotonicStopWatch watch;
177
0
    watch.start();
178
0
    int64_t freed_mem = 0;
179
0
    std::unique_ptr<RuntimeProfile> profile =
180
0
            std::make_unique<RuntimeProfile>("RevokeProcessMemory");
181
182
0
    LOG(INFO) << fmt::format(
183
0
            "[MemoryGC] start MemoryReclamation::revoke_process_memory, {}, need free size: {}.",
184
0
            GlobalMemoryArbitrator::process_mem_log_str(),
185
0
            PrettyPrinter::print_bytes(MemInfo::process_full_gc_size()));
186
0
    Defer defer {[&]() {
187
0
        std::stringstream ss;
188
0
        profile->pretty_print(&ss);
189
0
        LOG(INFO) << fmt::format(
190
0
                "[MemoryGC] end MemoryReclamation::revoke_process_memory, {}, need free size: {}, "
191
0
                "free Memory {}. cost(us): {}, details: {}",
192
0
                GlobalMemoryArbitrator::process_mem_log_str(),
193
0
                PrettyPrinter::print_bytes(MemInfo::process_full_gc_size()),
194
0
                PrettyPrinter::print_bytes(freed_mem), watch.elapsed_time() / 1000, ss.str());
195
0
    }};
196
197
    // step1: start canceling from the query with the largest memory usage until the memory of process_full_gc_size is freed.
198
0
    VLOG_DEBUG << fmt::format(
199
0
            "[MemoryGC] before free top memory query in revoke process memory, Type:{}, Memory "
200
0
            "Tracker "
201
0
            "Summary: {}",
202
0
            MemTrackerLimiter::type_string(MemTrackerLimiter::Type::QUERY),
203
0
            MemTrackerLimiter::make_type_trackers_profile_str(MemTrackerLimiter::Type::QUERY));
204
0
    RuntimeProfile* free_top_query_profile =
205
0
            profile->create_child("FreeTopMemoryQuery", true, true);
206
0
    std::vector<std::shared_ptr<ResourceContext>> resource_ctxs;
207
0
    ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_tasks_resource_context(
208
0
            resource_ctxs);
209
0
    freed_mem +=
210
0
            revoke_tasks_memory(MemInfo::process_full_gc_size() - freed_mem, resource_ctxs,
211
0
                                revoke_reason, free_top_query_profile, PriorityCmpFunc::TOP_MEMORY,
212
0
                                {FilterFunc::IS_QUERY}, ActionFunc::CANCEL);
213
0
    if (freed_mem > MemInfo::process_full_gc_size()) {
214
0
        return true;
215
0
    }
216
217
    // step2: start canceling from the load with the largest memory usage until the memory of process_full_gc_size is freed.
218
0
    VLOG_DEBUG << fmt::format(
219
0
            "[MemoryGC] before free top memory load in revoke process memory, Type:{}, Memory "
220
0
            "Tracker "
221
0
            "Summary: {}",
222
0
            MemTrackerLimiter::type_string(MemTrackerLimiter::Type::LOAD),
223
0
            MemTrackerLimiter::make_type_trackers_profile_str(MemTrackerLimiter::Type::LOAD));
224
0
    RuntimeProfile* free_top_load_profile = profile->create_child("FreeTopMemoryLoad", true, true);
225
0
    freed_mem +=
226
0
            revoke_tasks_memory(MemInfo::process_full_gc_size() - freed_mem, resource_ctxs,
227
0
                                revoke_reason, free_top_load_profile, PriorityCmpFunc::TOP_MEMORY,
228
0
                                {FilterFunc::IS_LOAD}, ActionFunc::CANCEL);
229
0
    return freed_mem > MemInfo::process_full_gc_size();
230
0
}
231
232
0
// Purges or decays jemalloc arena dirty pages. This is a no-op unless the build defines
// USE_JEMALLOC. It also does nothing when `config::disable_memory_gc` is set or
// `config::enable_je_purge_dirty_pages` is off.
// The work runs under JemallocControl::je_purge_dirty_pages_lock:
//  - If `je_purge_dirty_pages_notify` is set, it runs the full purge of all arenas and then
//    clears the flag.
//  - Otherwise it runs the decay of all arenas.
void MemoryReclamation::je_purge_dirty_pages() {
#ifdef USE_JEMALLOC
    const bool purge_enabled =
            !config::disable_memory_gc && config::enable_je_purge_dirty_pages;
    if (!purge_enabled) {
        return;
    }
    std::lock_guard<std::mutex> purge_guard(doris::JemallocControl::je_purge_dirty_pages_lock);

    // Allow `purge_all_arena_dirty_pages` again after the process memory changes by 256M,
    // otherwise execute `decay_all_arena_dirty_pages`, because `purge_all_arena_dirty_pages` is very expensive.
    const bool full_purge_requested =
            doris::JemallocControl::je_purge_dirty_pages_notify.load(std::memory_order_relaxed);
    if (!full_purge_requested) {
        doris::JemallocControl::je_decay_all_arena_dirty_pages();
        return;
    }
    doris::JemallocControl::je_purge_all_arena_dirty_pages();
    doris::JemallocControl::je_purge_dirty_pages_notify.store(false, std::memory_order_relaxed);
#endif
}
249
250
} // namespace doris