be/src/runtime/memory/memory_reclamation.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/memory/memory_reclamation.h" |
19 | | |
20 | | #include <unordered_map> |
21 | | |
22 | | #include "runtime/exec_env.h" |
23 | | #include "runtime/memory/global_memory_arbitrator.h" |
24 | | #include "runtime/memory/jemalloc_control.h" |
25 | | #include "runtime/memory/mem_tracker_limiter.h" |
26 | | #include "runtime/runtime_profile.h" |
27 | | #include "runtime/runtime_query_statistics_mgr.h" |
28 | | #include "runtime/workload_group/workload_group.h" |
29 | | #include "util/mem_info.h" |
30 | | #include "util/stopwatch.hpp" |
31 | | |
32 | | namespace doris { |
33 | | #include "common/compile_check_begin.h" |
34 | | |
35 | | int64_t MemoryReclamation::revoke_tasks_memory( |
36 | | int64_t need_free_mem, const std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs, |
37 | | const std::string& revoke_reason, RuntimeProfile* profile, PriorityCmpFunc priority_cmp, |
38 | 27 | std::vector<FilterFunc> filters, ActionFunc action) { |
39 | 27 | if (need_free_mem <= 0) { |
40 | 1 | return 0; |
41 | 1 | } |
42 | 26 | auto process_mem_stat = GlobalMemoryArbitrator::process_mem_log_str(); |
43 | 26 | auto process_mem_stat_to_client = fmt::format( |
44 | 26 | "os physical memory {}. {}, limit {}. {}, low water mark {}.", |
45 | 26 | PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES), |
46 | 26 | GlobalMemoryArbitrator::process_memory_used_str(), MemInfo::mem_limit_str(), |
47 | 26 | GlobalMemoryArbitrator::sys_mem_available_str(), |
48 | 26 | PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES)); |
49 | 26 | RuntimeProfile::Counter* request_revoke_tasks_counter = |
50 | 26 | ADD_COUNTER(profile, "RequestRevokeTasksNum", TUnit::UNIT); |
51 | 26 | RuntimeProfile::Counter* revocable_tasks_counter = |
52 | 26 | ADD_COUNTER(profile, "RevocableTasksNum", TUnit::UNIT); |
53 | 26 | RuntimeProfile::Counter* this_time_revoked_tasks_counter = |
54 | 26 | ADD_COUNTER(profile, "ThisTimeRevokedTasksNum", TUnit::UNIT); |
55 | 26 | RuntimeProfile::Counter* skip_cancelling_tasks_counter = |
56 | 26 | ADD_COUNTER(profile, "SkipCancellingTasksNum", TUnit::UNIT); |
57 | 26 | RuntimeProfile::Counter* keep_wait_cancelling_tasks_counter = |
58 | 26 | ADD_COUNTER(profile, "KeepWaitCancellingTasksNum", TUnit::UNIT); |
59 | 26 | RuntimeProfile::Counter* freed_memory_counter = |
60 | 26 | ADD_COUNTER(profile, "FreedMemory", TUnit::BYTES); |
61 | 26 | RuntimeProfile::Counter* filter_cost_time = ADD_TIMER(profile, "FilterCostTime"); |
62 | 26 | RuntimeProfile::Counter* revoke_cost_time = ADD_TIMER(profile, "revokeCostTime"); |
63 | | |
64 | 26 | std::priority_queue<std::pair<int64_t, std::shared_ptr<ResourceContext>>> |
65 | 26 | revocable_resource_ctxs; |
66 | 26 | std::vector<std::string> this_time_revoked_tasks; |
67 | 26 | std::vector<std::string> skip_cancelling_tasks; |
68 | 26 | std::vector<std::string> keep_wait_cancelling_tasks; |
69 | | |
70 | 26 | auto config_str = fmt::format( |
71 | 26 | "need free memory: {}, request revoke tasks: {}, revoke reason: {}, priority compare " |
72 | 26 | "function: {}, filter function: {}, action function: {}. {}", |
73 | 26 | need_free_mem, resource_ctxs.size(), revoke_reason, |
74 | 26 | priority_cmp_func_string(priority_cmp), filter_func_string(filters), |
75 | 26 | action_func_string(action), process_mem_stat); |
76 | 26 | LOG(INFO) << fmt::format("[MemoryGC] start revoke_tasks_memory, {}.", config_str); |
77 | 26 | Defer defer {[&]() { |
78 | 26 | LOG(INFO) << fmt::format( |
79 | 26 | "[MemoryGC] end revoke_tasks_memory, {}. freed memory: {}, " |
80 | 26 | "revocable tasks: {}, this time revoked tasks: {}, consist of: [{}], call revoke " |
81 | 26 | "func cost(us): {}." |
82 | 26 | " some tasks is being canceled and has not been completed yet, among them, skip " |
83 | 26 | "canceling tasks: {}(cancel cost too long, not counted in freed memory), consist " |
84 | 26 | "of: [{}], keep wait canceling tasks: {}(counted in freed memory), consist of: " |
85 | 26 | "[{}], filter cost(us): {}.", |
86 | 26 | config_str, freed_memory_counter->value(), revocable_tasks_counter->value(), |
87 | 26 | this_time_revoked_tasks_counter->value(), join(this_time_revoked_tasks, " | "), |
88 | 26 | revoke_cost_time->value(), skip_cancelling_tasks_counter->value(), |
89 | 26 | join(skip_cancelling_tasks, " | "), keep_wait_cancelling_tasks_counter->value(), |
90 | 26 | join(keep_wait_cancelling_tasks, " | "), filter_cost_time->value()); |
91 | 26 | }}; |
92 | | |
93 | 26 | { |
94 | 26 | SCOPED_TIMER(filter_cost_time); |
95 | 68 | for (auto resource_ctx : resource_ctxs) { |
96 | 68 | bool is_filtered = false; |
97 | 68 | for (auto filter : filters) { |
98 | 61 | if (!FilterFuncImpl[filter](resource_ctx.get())) { |
99 | 6 | is_filtered = true; |
100 | 6 | break; |
101 | 6 | } |
102 | 61 | } |
103 | 68 | if (is_filtered) { |
104 | 6 | continue; |
105 | 6 | } |
106 | | |
107 | | // skip cancelling tasks |
108 | 62 | if (resource_ctx->task_controller()->is_cancelled()) { |
109 | | // for the query being canceled, |
110 | | // if (current time - cancel start time) < 3s (revoke_memory_max_tolerance_ms), the query memory is counted in `freed_memory`, |
111 | | // and the query memory is expected to be released soon. |
112 | | // if > 3s, the query memory will not be counted in `freed_memory`, |
113 | | // and the query may be blocked during the cancel process. skip this query and continue to cancel other queries. |
114 | 24 | if (MonotonicMillis() - resource_ctx->task_controller()->cancelled_time() > |
115 | 24 | config::revoke_memory_max_tolerance_ms) { |
116 | 9 | skip_cancelling_tasks.push_back( |
117 | 9 | resource_ctx->task_controller()->debug_string()); |
118 | 15 | } else { |
119 | 15 | keep_wait_cancelling_tasks.push_back( |
120 | 15 | resource_ctx->task_controller()->debug_string()); |
121 | 15 | COUNTER_UPDATE(freed_memory_counter, |
122 | 15 | resource_ctx->memory_context()->current_memory_bytes()); |
123 | 15 | } |
124 | 24 | is_filtered = true; |
125 | 24 | } |
126 | | |
127 | | // TODO, if ActionFunc::SPILL, should skip spilling tasks. |
128 | | |
129 | 62 | if (is_filtered) { |
130 | 24 | continue; |
131 | 24 | } |
132 | 38 | int64_t weight = PriorityCmpFuncImpl[priority_cmp](resource_ctx.get()); |
133 | 38 | if (weight != -1) { |
134 | 25 | revocable_resource_ctxs.emplace(weight, resource_ctx); |
135 | 25 | } |
136 | 38 | } |
137 | 26 | } |
138 | | |
139 | 26 | COUNTER_UPDATE(request_revoke_tasks_counter, resource_ctxs.size()); |
140 | 26 | COUNTER_UPDATE(revocable_tasks_counter, revocable_resource_ctxs.size()); |
141 | 26 | COUNTER_UPDATE(skip_cancelling_tasks_counter, skip_cancelling_tasks.size()); |
142 | 26 | COUNTER_UPDATE(keep_wait_cancelling_tasks_counter, keep_wait_cancelling_tasks.size()); |
143 | | |
144 | 26 | if (revocable_resource_ctxs.empty()) { |
145 | 11 | return freed_memory_counter->value(); |
146 | 11 | } |
147 | | |
148 | 15 | { |
149 | 15 | SCOPED_TIMER(revoke_cost_time); |
150 | 17 | while (!revocable_resource_ctxs.empty()) { |
151 | 15 | auto resource_ctx = revocable_resource_ctxs.top().second; |
152 | 15 | std::string task_revoke_reason = fmt::format( |
153 | 15 | "{} {} task: {}. because {}. in backend {}, {} execute again after enough " |
154 | 15 | "memory, details see be.INFO.", |
155 | 15 | action_func_string(action), priority_cmp_func_string(priority_cmp), |
156 | 15 | resource_ctx->memory_context()->debug_string(), revoke_reason, |
157 | 15 | BackendOptions::get_localhost(), process_mem_stat_to_client); |
158 | 15 | if (ActionFuncImpl[action](resource_ctx.get(), |
159 | 15 | Status::MemoryLimitExceeded(task_revoke_reason))) { |
160 | 15 | this_time_revoked_tasks.push_back(resource_ctx->task_controller()->debug_string()); |
161 | 15 | COUNTER_UPDATE(freed_memory_counter, |
162 | 15 | resource_ctx->memory_context()->current_memory_bytes()); |
163 | 15 | COUNTER_UPDATE(this_time_revoked_tasks_counter, 1); |
164 | 15 | if (freed_memory_counter->value() > need_free_mem) { |
165 | 13 | break; |
166 | 13 | } |
167 | 15 | } |
168 | 2 | revocable_resource_ctxs.pop(); |
169 | 2 | } |
170 | 15 | } |
171 | 15 | return freed_memory_counter->value(); |
172 | 26 | } |
173 | | |
174 | | // step1: free process top memory query |
175 | | // step2: free process top memory load, load retries are more expensive, so revoke at the end. |
176 | 0 | bool MemoryReclamation::revoke_process_memory(const std::string& revoke_reason) { |
177 | 0 | MonotonicStopWatch watch; |
178 | 0 | watch.start(); |
179 | 0 | int64_t freed_mem = 0; |
180 | 0 | std::unique_ptr<RuntimeProfile> profile = |
181 | 0 | std::make_unique<RuntimeProfile>("RevokeProcessMemory"); |
182 | |
|
183 | 0 | LOG(INFO) << fmt::format( |
184 | 0 | "[MemoryGC] start MemoryReclamation::revoke_process_memory, {}, need free size: {}.", |
185 | 0 | GlobalMemoryArbitrator::process_mem_log_str(), |
186 | 0 | PrettyPrinter::print_bytes(MemInfo::process_full_gc_size())); |
187 | 0 | Defer defer {[&]() { |
188 | 0 | std::stringstream ss; |
189 | 0 | profile->pretty_print(&ss); |
190 | 0 | LOG(INFO) << fmt::format( |
191 | 0 | "[MemoryGC] end MemoryReclamation::revoke_process_memory, {}, need free size: {}, " |
192 | 0 | "free Memory {}. cost(us): {}, details: {}", |
193 | 0 | GlobalMemoryArbitrator::process_mem_log_str(), |
194 | 0 | PrettyPrinter::print_bytes(MemInfo::process_full_gc_size()), |
195 | 0 | PrettyPrinter::print_bytes(freed_mem), watch.elapsed_time() / 1000, ss.str()); |
196 | 0 | }}; |
197 | | |
198 | | // step1: start canceling from the query with the largest memory usage until the memory of process_full_gc_size is freed. |
199 | 0 | VLOG_DEBUG << fmt::format( |
200 | 0 | "[MemoryGC] before free top memory query in revoke process memory, Type:{}, Memory " |
201 | 0 | "Tracker " |
202 | 0 | "Summary: {}", |
203 | 0 | MemTrackerLimiter::type_string(MemTrackerLimiter::Type::QUERY), |
204 | 0 | MemTrackerLimiter::make_type_trackers_profile_str(MemTrackerLimiter::Type::QUERY)); |
205 | 0 | RuntimeProfile* free_top_query_profile = |
206 | 0 | profile->create_child("FreeTopMemoryQuery", true, true); |
207 | 0 | std::vector<std::shared_ptr<ResourceContext>> resource_ctxs; |
208 | 0 | ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_tasks_resource_context( |
209 | 0 | resource_ctxs); |
210 | 0 | freed_mem += |
211 | 0 | revoke_tasks_memory(MemInfo::process_full_gc_size() - freed_mem, resource_ctxs, |
212 | 0 | revoke_reason, free_top_query_profile, PriorityCmpFunc::TOP_MEMORY, |
213 | 0 | {FilterFunc::IS_QUERY}, ActionFunc::CANCEL); |
214 | 0 | if (freed_mem > MemInfo::process_full_gc_size()) { |
215 | 0 | return true; |
216 | 0 | } |
217 | | |
218 | | // step2: start canceling from the load with the largest memory usage until the memory of process_full_gc_size is freed. |
219 | 0 | VLOG_DEBUG << fmt::format( |
220 | 0 | "[MemoryGC] before free top memory load in revoke process memory, Type:{}, Memory " |
221 | 0 | "Tracker " |
222 | 0 | "Summary: {}", |
223 | 0 | MemTrackerLimiter::type_string(MemTrackerLimiter::Type::LOAD), |
224 | 0 | MemTrackerLimiter::make_type_trackers_profile_str(MemTrackerLimiter::Type::LOAD)); |
225 | 0 | RuntimeProfile* free_top_load_profile = profile->create_child("FreeTopMemoryLoad", true, true); |
226 | 0 | freed_mem += |
227 | 0 | revoke_tasks_memory(MemInfo::process_full_gc_size() - freed_mem, resource_ctxs, |
228 | 0 | revoke_reason, free_top_load_profile, PriorityCmpFunc::TOP_MEMORY, |
229 | 0 | {FilterFunc::IS_LOAD}, ActionFunc::CANCEL); |
230 | 0 | return freed_mem > MemInfo::process_full_gc_size(); |
231 | 0 | } |
232 | | |
233 | 0 | void MemoryReclamation::je_purge_dirty_pages() { |
234 | | #ifdef USE_JEMALLOC |
235 | | if (config::disable_memory_gc || !config::enable_je_purge_dirty_pages) { |
236 | | return; |
237 | | } |
238 | | std::unique_lock<std::mutex> l(doris::JemallocControl::je_purge_dirty_pages_lock); |
239 | | |
240 | | // Allow `purge_all_arena_dirty_pages` again after the process memory changes by 256M, |
241 | | // otherwise execute `decay_all_arena_dirty_pages`, because `purge_all_arena_dirty_pages` is very expensive. |
242 | | if (doris::JemallocControl::je_purge_dirty_pages_notify.load(std::memory_order_relaxed)) { |
243 | | doris::JemallocControl::je_purge_all_arena_dirty_pages(); |
244 | | doris::JemallocControl::je_purge_dirty_pages_notify.store(false, std::memory_order_relaxed); |
245 | | } else { |
246 | | doris::JemallocControl::je_decay_all_arena_dirty_pages(); |
247 | | } |
248 | | #endif |
249 | 0 | } |
250 | | |
251 | | #include "common/compile_check_end.h" |
252 | | } // namespace doris |