Coverage Report

Created: 2026-05-14 14:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/memory/memory_reclamation.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/memory/memory_reclamation.h"
19
20
#include <algorithm>
21
#include <unordered_map>
22
23
#include "runtime/exec_env.h"
24
#include "runtime/memory/global_memory_arbitrator.h"
25
#include "runtime/memory/jemalloc_control.h"
26
#include "runtime/memory/mem_tracker_limiter.h"
27
#include "runtime/runtime_profile.h"
28
#include "runtime/runtime_query_statistics_mgr.h"
29
#include "runtime/workload_group/workload_group.h"
30
#include "util/mem_info.h"
31
#include "util/stopwatch.hpp"
32
33
namespace doris {
34
35
int64_t MemoryReclamation::revoke_tasks_memory(
36
        int64_t need_free_mem, const std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs,
37
        const std::string& revoke_reason, RuntimeProfile* profile, PriorityCmpFunc priority_cmp,
38
27
        std::vector<FilterFunc> filters, ActionFunc action) {
39
27
    if (need_free_mem <= 0) {
40
1
        return 0;
41
1
    }
42
26
    auto process_mem_stat = GlobalMemoryArbitrator::process_mem_log_str();
43
26
    auto process_mem_stat_to_client = fmt::format(
44
26
            "os physical memory {}. {}, limit {}. {}, low water mark {}.",
45
26
            PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
46
26
            GlobalMemoryArbitrator::process_memory_used_str(), MemInfo::mem_limit_str(),
47
26
            GlobalMemoryArbitrator::sys_mem_available_str(),
48
26
            PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
49
26
    RuntimeProfile::Counter* request_revoke_tasks_counter =
50
26
            ADD_COUNTER(profile, "RequestRevokeTasksNum", TUnit::UNIT);
51
26
    RuntimeProfile::Counter* revocable_tasks_counter =
52
26
            ADD_COUNTER(profile, "RevocableTasksNum", TUnit::UNIT);
53
26
    RuntimeProfile::Counter* this_time_revoked_tasks_counter =
54
26
            ADD_COUNTER(profile, "ThisTimeRevokedTasksNum", TUnit::UNIT);
55
26
    RuntimeProfile::Counter* skip_cancelling_tasks_counter =
56
26
            ADD_COUNTER(profile, "SkipCancellingTasksNum", TUnit::UNIT);
57
26
    RuntimeProfile::Counter* keep_wait_cancelling_tasks_counter =
58
26
            ADD_COUNTER(profile, "KeepWaitCancellingTasksNum", TUnit::UNIT);
59
26
    RuntimeProfile::Counter* freed_memory_counter =
60
26
            ADD_COUNTER(profile, "FreedMemory", TUnit::BYTES);
61
26
    RuntimeProfile::Counter* filter_cost_time = ADD_TIMER(profile, "FilterCostTime");
62
26
    RuntimeProfile::Counter* revoke_cost_time = ADD_TIMER(profile, "revokeCostTime");
63
64
26
    std::priority_queue<std::pair<int64_t, std::shared_ptr<ResourceContext>>>
65
26
            revocable_resource_ctxs;
66
26
    std::vector<std::string> this_time_revoked_tasks;
67
26
    std::vector<std::string> skip_cancelling_tasks;
68
26
    std::vector<std::string> keep_wait_cancelling_tasks;
69
70
26
    auto config_str = fmt::format(
71
26
            "need free memory: {}, request revoke tasks: {}, revoke reason: {}, priority compare "
72
26
            "function: {}, filter function: {}, action function: {}. {}",
73
26
            need_free_mem, resource_ctxs.size(), revoke_reason,
74
26
            priority_cmp_func_string(priority_cmp), filter_func_string(filters),
75
26
            action_func_string(action), process_mem_stat);
76
26
    LOG(INFO) << fmt::format("[MemoryGC] start revoke_tasks_memory, {}.", config_str);
77
26
    Defer defer {[&]() {
78
26
        LOG(INFO) << fmt::format(
79
26
                "[MemoryGC] end revoke_tasks_memory, {}. freed memory: {}, "
80
26
                "revocable tasks: {}, this time revoked tasks: {}, consist of: [{}], call revoke "
81
26
                "func cost(us): {}."
82
26
                " some tasks is being canceled and has not been completed yet, among them, skip "
83
26
                "canceling tasks: {}(cancel cost too long, not counted in freed memory), consist "
84
26
                "of: [{}], keep wait canceling tasks: {}(counted in freed memory), consist of: "
85
26
                "[{}], filter cost(us): {}.",
86
26
                config_str, freed_memory_counter->value(), revocable_tasks_counter->value(),
87
26
                this_time_revoked_tasks_counter->value(), join(this_time_revoked_tasks, " | "),
88
26
                revoke_cost_time->value(), skip_cancelling_tasks_counter->value(),
89
26
                join(skip_cancelling_tasks, " | "), keep_wait_cancelling_tasks_counter->value(),
90
26
                join(keep_wait_cancelling_tasks, " | "), filter_cost_time->value());
91
26
    }};
92
93
26
    {
94
26
        SCOPED_TIMER(filter_cost_time);
95
68
        for (auto resource_ctx : resource_ctxs) {
96
68
            bool is_filtered = false;
97
68
            for (auto filter : filters) {
98
61
                if (!FilterFuncImpl[filter](resource_ctx.get())) {
99
6
                    is_filtered = true;
100
6
                    break;
101
6
                }
102
61
            }
103
68
            if (is_filtered) {
104
6
                continue;
105
6
            }
106
107
            // skip cancelling tasks
108
62
            if (resource_ctx->task_controller()->is_cancelled()) {
109
                // for the query being canceled,
110
                // if (current time - cancel start time) < 3s (revoke_memory_max_tolerance_ms), the query memory is counted in `freed_memory`,
111
                // and the query memory is expected to be released soon.
112
                // if > 3s, the query memory will not be counted in `freed_memory`,
113
                // and the query may be blocked during the cancel process. skip this query and continue to cancel other queries.
114
24
                if (MonotonicMillis() - resource_ctx->task_controller()->cancelled_time() >
115
24
                    config::revoke_memory_max_tolerance_ms) {
116
9
                    skip_cancelling_tasks.push_back(
117
9
                            resource_ctx->task_controller()->debug_string());
118
15
                } else {
119
15
                    keep_wait_cancelling_tasks.push_back(
120
15
                            resource_ctx->task_controller()->debug_string());
121
                    // Memory tracker consumption can be slightly negative due to
122
                    // concurrent batched tracking; clamp to 0 for freed memory accounting.
123
15
                    COUNTER_UPDATE(
124
15
                            freed_memory_counter,
125
15
                            std::max(int64_t(0),
126
15
                                     resource_ctx->memory_context()->current_memory_bytes()));
127
15
                }
128
24
                is_filtered = true;
129
24
            }
130
131
            // TODO, if ActionFunc::SPILL, should skip spilling tasks.
132
133
62
            if (is_filtered) {
134
24
                continue;
135
24
            }
136
38
            int64_t weight = PriorityCmpFuncImpl[priority_cmp](resource_ctx.get());
137
38
            if (weight != -1) {
138
25
                revocable_resource_ctxs.emplace(weight, resource_ctx);
139
25
            }
140
38
        }
141
26
    }
142
143
26
    COUNTER_UPDATE(request_revoke_tasks_counter, resource_ctxs.size());
144
26
    COUNTER_UPDATE(revocable_tasks_counter, revocable_resource_ctxs.size());
145
26
    COUNTER_UPDATE(skip_cancelling_tasks_counter, skip_cancelling_tasks.size());
146
26
    COUNTER_UPDATE(keep_wait_cancelling_tasks_counter, keep_wait_cancelling_tasks.size());
147
148
26
    if (revocable_resource_ctxs.empty()) {
149
11
        return freed_memory_counter->value();
150
11
    }
151
152
15
    {
153
15
        SCOPED_TIMER(revoke_cost_time);
154
17
        while (!revocable_resource_ctxs.empty()) {
155
15
            auto resource_ctx = revocable_resource_ctxs.top().second;
156
15
            std::string task_revoke_reason = fmt::format(
157
15
                    "{} {} task: {}. because {}. in backend {}, {} execute again after enough "
158
15
                    "memory, details see be.INFO.",
159
15
                    action_func_string(action), priority_cmp_func_string(priority_cmp),
160
15
                    resource_ctx->memory_context()->debug_string(), revoke_reason,
161
15
                    BackendOptions::get_localhost(), process_mem_stat_to_client);
162
15
            if (ActionFuncImpl[action](resource_ctx.get(),
163
15
                                       Status::MemoryLimitExceeded(task_revoke_reason))) {
164
15
                this_time_revoked_tasks.push_back(resource_ctx->task_controller()->debug_string());
165
                // Memory tracker consumption can be slightly negative due to
166
                // concurrent batched tracking; clamp to 0 for freed memory accounting.
167
15
                COUNTER_UPDATE(freed_memory_counter,
168
15
                               std::max(int64_t(0),
169
15
                                        resource_ctx->memory_context()->current_memory_bytes()));
170
15
                COUNTER_UPDATE(this_time_revoked_tasks_counter, 1);
171
15
                if (freed_memory_counter->value() > need_free_mem) {
172
13
                    break;
173
13
                }
174
15
            }
175
2
            revocable_resource_ctxs.pop();
176
2
        }
177
15
    }
178
15
    return freed_memory_counter->value();
179
26
}
180
181
// step1: free process top memory query
182
// step2: free process top memory load, load retries are more expensive, so revoke at the end.
183
0
bool MemoryReclamation::revoke_process_memory(const std::string& revoke_reason) {
184
0
    MonotonicStopWatch watch;
185
0
    watch.start();
186
0
    int64_t freed_mem = 0;
187
0
    std::unique_ptr<RuntimeProfile> profile =
188
0
            std::make_unique<RuntimeProfile>("RevokeProcessMemory");
189
190
0
    LOG(INFO) << fmt::format(
191
0
            "[MemoryGC] start MemoryReclamation::revoke_process_memory, {}, need free size: {}.",
192
0
            GlobalMemoryArbitrator::process_mem_log_str(),
193
0
            PrettyPrinter::print_bytes(MemInfo::process_full_gc_size()));
194
0
    Defer defer {[&]() {
195
0
        std::stringstream ss;
196
0
        profile->pretty_print(&ss);
197
0
        LOG(INFO) << fmt::format(
198
0
                "[MemoryGC] end MemoryReclamation::revoke_process_memory, {}, need free size: {}, "
199
0
                "free Memory {}. cost(us): {}, details: {}",
200
0
                GlobalMemoryArbitrator::process_mem_log_str(),
201
0
                PrettyPrinter::print_bytes(MemInfo::process_full_gc_size()),
202
0
                PrettyPrinter::print_bytes(freed_mem), watch.elapsed_time() / 1000, ss.str());
203
0
    }};
204
205
    // step1: start canceling from the query with the largest memory usage until the memory of process_full_gc_size is freed.
206
0
    VLOG_DEBUG << fmt::format(
207
0
            "[MemoryGC] before free top memory query in revoke process memory, Type:{}, Memory "
208
0
            "Tracker "
209
0
            "Summary: {}",
210
0
            MemTrackerLimiter::type_string(MemTrackerLimiter::Type::QUERY),
211
0
            MemTrackerLimiter::make_type_trackers_profile_str(MemTrackerLimiter::Type::QUERY));
212
0
    RuntimeProfile* free_top_query_profile =
213
0
            profile->create_child("FreeTopMemoryQuery", true, true);
214
0
    std::vector<std::shared_ptr<ResourceContext>> resource_ctxs;
215
0
    ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_tasks_resource_context(
216
0
            resource_ctxs);
217
0
    freed_mem +=
218
0
            revoke_tasks_memory(MemInfo::process_full_gc_size() - freed_mem, resource_ctxs,
219
0
                                revoke_reason, free_top_query_profile, PriorityCmpFunc::TOP_MEMORY,
220
0
                                {FilterFunc::IS_QUERY}, ActionFunc::CANCEL);
221
0
    if (freed_mem > MemInfo::process_full_gc_size()) {
222
0
        return true;
223
0
    }
224
225
    // step2: start canceling from the load with the largest memory usage until the memory of process_full_gc_size is freed.
226
0
    VLOG_DEBUG << fmt::format(
227
0
            "[MemoryGC] before free top memory load in revoke process memory, Type:{}, Memory "
228
0
            "Tracker "
229
0
            "Summary: {}",
230
0
            MemTrackerLimiter::type_string(MemTrackerLimiter::Type::LOAD),
231
0
            MemTrackerLimiter::make_type_trackers_profile_str(MemTrackerLimiter::Type::LOAD));
232
0
    RuntimeProfile* free_top_load_profile = profile->create_child("FreeTopMemoryLoad", true, true);
233
0
    freed_mem +=
234
0
            revoke_tasks_memory(MemInfo::process_full_gc_size() - freed_mem, resource_ctxs,
235
0
                                revoke_reason, free_top_load_profile, PriorityCmpFunc::TOP_MEMORY,
236
0
                                {FilterFunc::IS_LOAD}, ActionFunc::CANCEL);
237
0
    return freed_mem > MemInfo::process_full_gc_size();
238
0
}
239
240
0
void MemoryReclamation::je_purge_dirty_pages() {
#ifdef USE_JEMALLOC
    // Do nothing when memory GC as a whole, or dirty-page purging specifically, is disabled.
    const bool purge_allowed = !config::disable_memory_gc && config::enable_je_purge_dirty_pages;
    if (!purge_allowed) {
        return;
    }

    // Serialize purge/decay requests on the shared JemallocControl lock.
    std::unique_lock<std::mutex> purge_guard(doris::JemallocControl::je_purge_dirty_pages_lock);

    // `purge_all_arena_dirty_pages` is very expensive, so it only runs when the notify flag is
    // set, and the flag is cleared afterwards. The flag is intended to re-allow a full purge
    // after the process memory changes by 256M; it is set elsewhere (not visible here).
    // Otherwise the cheaper `decay_all_arena_dirty_pages` is used.
    auto& full_purge_notify = doris::JemallocControl::je_purge_dirty_pages_notify;
    if (!full_purge_notify.load(std::memory_order_relaxed)) {
        doris::JemallocControl::je_decay_all_arena_dirty_pages();
        return;
    }
    doris::JemallocControl::je_purge_all_arena_dirty_pages();
    full_purge_notify.store(false, std::memory_order_relaxed);
#endif
}
257
258
} // namespace doris