/root/doris/be/src/common/daemon.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "common/daemon.h" |
19 | | |
20 | | // IWYU pragma: no_include <bthread/errno.h> |
21 | | #include <errno.h> // IWYU pragma: keep |
22 | | #include <gflags/gflags.h> |
23 | | |
24 | | #include "runtime/memory/jemalloc_control.h" |
25 | | #if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \ |
26 | | !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) |
27 | | #include <gperftools/malloc_extension.h> // IWYU pragma: keep |
28 | | #endif |
29 | | // IWYU pragma: no_include <bits/std_abs.h> |
30 | | #include <butil/iobuf.h> |
31 | | #include <math.h> |
32 | | #include <stdint.h> |
33 | | #include <stdlib.h> |
34 | | |
35 | | // IWYU pragma: no_include <bits/chrono.h> |
36 | | #include <chrono> // IWYU pragma: keep |
37 | | #include <map> |
38 | | #include <ostream> |
39 | | #include <string> |
40 | | |
41 | | #include "cloud/config.h" |
42 | | #include "common/config.h" |
43 | | #include "common/logging.h" |
44 | | #include "common/status.h" |
45 | | #include "olap/memtable_memory_limiter.h" |
46 | | #include "olap/storage_engine.h" |
47 | | #include "olap/tablet_manager.h" |
48 | | #include "runtime/be_proc_monitor.h" |
49 | | #include "runtime/exec_env.h" |
50 | | #include "runtime/fragment_mgr.h" |
51 | | #include "runtime/memory/global_memory_arbitrator.h" |
52 | | #include "runtime/memory/mem_tracker_limiter.h" |
53 | | #include "runtime/memory/memory_reclamation.h" |
54 | | #include "runtime/process_profile.h" |
55 | | #include "runtime/runtime_query_statistics_mgr.h" |
56 | | #include "runtime/workload_group/workload_group_manager.h" |
57 | | #include "util/algorithm_util.h" |
58 | | #include "util/doris_metrics.h" |
59 | | #include "util/mem_info.h" |
60 | | #include "util/metrics.h" |
61 | | #include "util/perf_counters.h" |
62 | | #include "util/system_metrics.h" |
63 | | #include "util/time.h" |
64 | | |
65 | | namespace doris { |
66 | | namespace { |
67 | | |
68 | | std::atomic<int64_t> last_print_proc_mem = 0; |
69 | | std::atomic<int32_t> refresh_cache_capacity_sleep_time_ms = 0; |
70 | | std::atomic<int32_t> memory_gc_sleep_time = 0; |
71 | | #ifdef USE_JEMALLOC |
72 | | std::atomic<int32_t> je_reset_dirty_decay_sleep_time_ms = 0; |
73 | | #endif |
74 | | |
75 | 16 | void update_rowsets_and_segments_num_metrics() { |
76 | 16 | if (config::is_cloud_mode()) { |
77 | | // TODO(plat1ko): CloudStorageEngine |
78 | 16 | } else { |
79 | 16 | StorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_local(); |
80 | 16 | auto* metrics = DorisMetrics::instance(); |
81 | 16 | metrics->all_rowsets_num->set_value(engine.tablet_manager()->get_rowset_nums()); |
82 | 16 | metrics->all_segments_num->set_value(engine.tablet_manager()->get_segment_nums()); |
83 | 16 | } |
84 | 16 | } |
85 | | |
86 | | } // namespace |
87 | | |
// Background loop (tcmalloc builds only) that bounds the size of the tcmalloc
// page cache. It tunes the release rate with memory pressure and, when a large
// amount of cache has accumulated for long enough, explicitly returns memory
// to the OS via ReleaseToSystem. Compiled out under sanitizers and jemalloc.
void Daemon::tcmalloc_gc_thread() {
    // TODO All cache GC wish to be supported
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && \
        !defined(USE_JEMALLOC)

    // Limit size of tcmalloc cache via release_rate and max_cache_percent.
    // We adjust release_rate according to memory_pressure, which is usage percent of memory.
    int64_t max_cache_percent = 60;
    // Indexed by memory_pressure / 10; higher pressure -> more aggressive release.
    double release_rates[10] = {1.0, 1.0, 1.0, 5.0, 5.0, 20.0, 50.0, 100.0, 500.0, 2000.0};
    int64_t pressure_limit = 90;
    bool is_performance_mode = false;
    // Effective physical budget: physical memory minus the low-water reserve,
    // further capped by the configured process mem_limit.
    int64_t physical_limit_bytes =
            std::min(MemInfo::physical_mem() - MemInfo::sys_mem_available_low_water_mark(),
                     MemInfo::mem_limit());

    // "performance" keeps the full cache; "compact" trims it hard and starts
    // reacting at a lower pressure threshold.
    if (config::memory_mode == std::string("performance")) {
        max_cache_percent = 100;
        pressure_limit = 90;
        is_performance_mode = true;
        physical_limit_bytes = std::min(MemInfo::mem_limit(), MemInfo::physical_mem());
    } else if (config::memory_mode == std::string("compact")) {
        max_cache_percent = 20;
        pressure_limit = 80;
    }

    // last_ms accumulates how long a large releasable cache has persisted; a
    // release is only issued after kMaxLastMs to avoid thrashing.
    int last_ms = 0;
    const int kMaxLastMs = 30000;
    const int kIntervalMs = 10;
    size_t init_aggressive_decommit = 0;
    size_t current_aggressive_decommit = 0;
    size_t expected_aggressive_decommit = 0;
    int64_t last_memory_pressure = 0;

    // Remember the initial decommit setting so it can be restored once pressure drops.
    MallocExtension::instance()->GetNumericProperty("tcmalloc.aggressive_memory_decommit",
                                                    &init_aggressive_decommit);
    current_aggressive_decommit = init_aggressive_decommit;

    while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(kIntervalMs))) {
        size_t tc_used_bytes = 0;
        size_t tc_alloc_bytes = 0;
        size_t rss = PerfCounters::get_vm_rss();

        MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes",
                                                        &tc_alloc_bytes);
        MallocExtension::instance()->GetNumericProperty("generic.current_allocated_bytes",
                                                        &tc_used_bytes);
        // Cache = physical bytes tcmalloc holds minus bytes handed to the app.
        int64_t tc_cached_bytes = (int64_t)tc_alloc_bytes - (int64_t)tc_used_bytes;
        // Release whatever exceeds max_cache_percent of the in-use bytes.
        int64_t to_free_bytes =
                (int64_t)tc_cached_bytes - ((int64_t)tc_used_bytes * max_cache_percent / 100);
        to_free_bytes = std::max(to_free_bytes, (int64_t)0);

        int64_t memory_pressure = 0;
        int64_t rss_pressure = 0;
        // Use the larger of RSS and tcmalloc's own accounting as the footprint.
        int64_t alloc_bytes = std::max(rss, tc_alloc_bytes);
        memory_pressure = alloc_bytes * 100 / physical_limit_bytes;
        rss_pressure = rss * 100 / physical_limit_bytes;

        expected_aggressive_decommit = init_aggressive_decommit;
        if (memory_pressure > pressure_limit) {
            // We are reaching oom, so release cache aggressively.
            // Ideally, we should reuse cache and not allocate from system any more,
            // however, it is hard to set limit on cache of tcmalloc and doris
            // use mmap in vectorized mode.
            // Limit cache capactiy is enough.
            if (rss_pressure > pressure_limit) {
                // Free enough to drop back to 90% of the budget, at least 30% of
                // the cache, but always keep >= 500MB of cache for reuse.
                int64_t min_free_bytes = alloc_bytes - physical_limit_bytes * 9 / 10;
                to_free_bytes = std::max(to_free_bytes, min_free_bytes);
                to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 30 / 100);
                // We assure that we have at least 500M bytes in cache.
                to_free_bytes = std::min(to_free_bytes, tc_cached_bytes - 500 * 1024 * 1024);
                expected_aggressive_decommit = 1;
            }
            // Force an immediate release on the next size check.
            last_ms = kMaxLastMs;
        } else if (memory_pressure > (pressure_limit - 10)) {
            // In most cases, adjusting release rate is enough, if memory are consumed quickly
            // we should release manually.
            if (last_memory_pressure <= (pressure_limit - 10)) {
                to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 10 / 100);
            }
        }

        int release_rate_index = memory_pressure / 10;
        double release_rate = 1.0;
        if (release_rate_index >= sizeof(release_rates) / sizeof(release_rates[0])) {
            // Pressure above 100%: clamp to the most aggressive rate.
            release_rate = 2000.0;
        } else {
            release_rate = release_rates[release_rate_index];
        }
        MallocExtension::instance()->SetMemoryReleaseRate(release_rate);

        // Only toggle aggressive decommit outside performance mode, and only on change.
        if ((current_aggressive_decommit != expected_aggressive_decommit) && !is_performance_mode) {
            MallocExtension::instance()->SetNumericProperty("tcmalloc.aggressive_memory_decommit",
                                                            expected_aggressive_decommit);
            current_aggressive_decommit = expected_aggressive_decommit;
        }

        last_memory_pressure = memory_pressure;
        // We release at least 2% bytes once, frequent releasing hurts performance.
        if (to_free_bytes > (physical_limit_bytes * 2 / 100)) {
            last_ms += kIntervalMs;
            if (last_ms >= kMaxLastMs) {
                LOG(INFO) << "generic.current_allocated_bytes " << tc_used_bytes
                          << ", generic.total_physical_bytes " << tc_alloc_bytes << ", rss " << rss
                          << ", max_cache_percent " << max_cache_percent << ", release_rate "
                          << release_rate << ", memory_pressure " << memory_pressure
                          << ", physical_limit_bytes " << physical_limit_bytes << ", to_free_bytes "
                          << to_free_bytes << ", current_aggressive_decommit "
                          << current_aggressive_decommit;
                MallocExtension::instance()->ReleaseToSystem(to_free_bytes);
                last_ms = 0;
            }
        } else {
            last_ms = 0;
        }
    }
#endif
}
205 | | |
// Refresh process-level memory statistics from the OS (/proc) and reset the
// per-interval memory growth counter; also syncs the brpc IOBuf block memory
// into its dedicated tracker. Called once per maintenance-loop iteration.
void refresh_process_memory_metrics() {
    doris::PerfCounters::refresh_proc_status();
    doris::MemInfo::refresh_proc_meminfo();
    doris::GlobalMemoryArbitrator::reset_refresh_interval_memory_growth();
    // brpc IOBuf allocates outside the tracker framework; record its footprint here.
    ExecEnv::GetInstance()->brpc_iobuf_block_memory_tracker()->set_consumption(
            butil::IOBuf::block_memory());
}
213 | | |
// Refresh allocator-level (jemalloc/tcmalloc) memory statistics and the shared
// memory bvars. Allocator introspection is skipped under sanitizer builds,
// which replace the allocator.
void refresh_common_allocator_metrics() {
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
    doris::JemallocControl::refresh_allocator_mem();
    if (config::enable_system_metrics) {
        DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
    }
#endif
    doris::GlobalMemoryArbitrator::refresh_memory_bvar();
}
223 | | |
224 | 4.79k | void refresh_memory_state_after_memory_change() { |
225 | 4.79k | if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) { |
226 | 10 | last_print_proc_mem = PerfCounters::get_vm_rss(); |
227 | 10 | doris::MemTrackerLimiter::clean_tracker_limiter_group(); |
228 | 10 | doris::ProcessProfile::instance()->memory_profile()->enable_print_log_process_usage(); |
229 | 10 | doris::ProcessProfile::instance()->memory_profile()->refresh_memory_overview_profile(); |
230 | 10 | doris::JemallocControl::notify_je_purge_dirty_pages(); |
231 | 10 | LOG(INFO) << doris::GlobalMemoryArbitrator:: |
232 | 10 | process_mem_log_str(); // print mem log when memory state by 256M |
233 | 10 | } |
234 | 4.79k | } |
235 | | |
// Periodically recompute the target cache-capacity weight from current process
// memory usage and notify the cache_adjust_capacity_thread when it changes.
// A countdown timer (refresh_cache_capacity_sleep_time_ms) rate-limits
// successive adjustments after a notify.
void refresh_cache_capacity() {
    if (doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.load(
                std::memory_order_relaxed)) {
        // the last cache capacity adjustment has not been completed.
        // if not return, last_periodic_refreshed_cache_capacity_adjust_weighted may be modified, but notify is ignored.
        return;
    }
    if (refresh_cache_capacity_sleep_time_ms <= 0) {
        // Memory level below which no cache reduction is needed
        // (soft limit scaled by the configured fraction).
        auto cache_capacity_reduce_mem_limit = int64_t(
                doris::MemInfo::soft_mem_limit() * config::cache_capacity_reduce_mem_limit_frac);
        int64_t process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
        // the rule is like this:
        // 1. if the process mem usage < soft memlimit * 0.6, then do not need adjust cache capacity.
        // 2. if the process mem usage > soft memlimit * 0.6 and process mem usage < soft memlimit, then it will be adjusted to a lower value.
        // 3. if the process mem usage > soft memlimit, then the capacity is adjusted to 0.
        double new_cache_capacity_adjust_weighted =
                AlgoUtil::descent_by_step(10, cache_capacity_reduce_mem_limit,
                                          doris::MemInfo::soft_mem_limit(), process_memory_usage);
        if (new_cache_capacity_adjust_weighted !=
            doris::GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted) {
            // Weight changed: publish it, wake the adjuster thread, and back off
            // for memory_gc_sleep_time_ms before the next adjustment.
            doris::GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted =
                    new_cache_capacity_adjust_weighted;
            doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
            refresh_cache_capacity_sleep_time_ms = config::memory_gc_sleep_time_ms;
        } else {
            refresh_cache_capacity_sleep_time_ms = 0;
        }
    }
    // Decrement the countdown by one maintenance-loop interval.
    refresh_cache_capacity_sleep_time_ms -= config::memory_maintenance_sleep_time_ms;
}
266 | | |
// Decide whether jemalloc dirty-page caching should be on or off based on the
// soft memory limit, and notify je_reset_dirty_decay_thread when the decision
// flips. No-op unless built with jemalloc. Rate-limited by a countdown timer,
// same pattern as refresh_cache_capacity().
void je_reset_dirty_decay() {
#ifdef USE_JEMALLOC
    if (doris::JemallocControl::je_reset_dirty_decay_notify.load(std::memory_order_relaxed)) {
        // if not return, je_enable_dirty_page may be modified, but notify is ignored.
        return;
    }

    if (je_reset_dirty_decay_sleep_time_ms <= 0) {
        bool new_je_enable_dirty_page = true;
        if (doris::JemallocControl::je_enable_dirty_page) {
            // if Jemalloc dirty page is enabled and process memory exceed soft mem limit,
            // disable Jemalloc dirty page.
            new_je_enable_dirty_page = !GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
        } else {
            // if Jemalloc dirty page is disabled and 10% free memory left to exceed soft mem limit,
            // enable Jemalloc dirty page, this is to avoid frequent changes
            // between enabling and disabling Jemalloc dirty pages, if the process memory does
            // not exceed the soft mem limit after turning off Jemalloc dirty pages,
            // but it will exceed soft mem limit after turning it on.
            new_je_enable_dirty_page = !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(
                    int64_t(doris::MemInfo::soft_mem_limit() * 0.1));
        }

        if (doris::JemallocControl::je_enable_dirty_page != new_je_enable_dirty_page) {
            // `notify_je_reset_dirty_decay` only if `je_enable_dirty_page` changes.
            doris::JemallocControl::je_enable_dirty_page = new_je_enable_dirty_page;
            doris::JemallocControl::notify_je_reset_dirty_decay();
            je_reset_dirty_decay_sleep_time_ms = config::memory_gc_sleep_time_ms;
        } else {
            je_reset_dirty_decay_sleep_time_ms = 0;
        }
    }
    // Decrement the countdown by one maintenance-loop interval.
    je_reset_dirty_decay_sleep_time_ms -= config::memory_maintenance_sleep_time_ms;
#endif
}
302 | | |
303 | 4.79k | void memory_gc() { |
304 | 4.79k | if (config::disable_memory_gc) { |
305 | 0 | return; |
306 | 0 | } |
307 | 4.79k | if (memory_gc_sleep_time <= 0) { |
308 | 480 | auto gc_func = [](const std::string& revoke_reason) { |
309 | 0 | doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage(); |
310 | 0 | if (doris::MemoryReclamation::revoke_process_memory(revoke_reason)) { |
311 | | // If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc. |
312 | 0 | doris::ProcessProfile::instance() |
313 | 0 | ->memory_profile() |
314 | 0 | ->enable_print_log_process_usage(); |
315 | 0 | } |
316 | 0 | }; |
317 | | |
318 | 480 | if (doris::GlobalMemoryArbitrator::sys_mem_available() < |
319 | 480 | doris::MemInfo::sys_mem_available_low_water_mark()) { |
320 | 0 | gc_func("sys available memory less than low water mark"); |
321 | 480 | } else if (doris::GlobalMemoryArbitrator::process_memory_usage() > |
322 | 480 | doris::MemInfo::mem_limit()) { |
323 | 0 | gc_func("process memory used exceed limit"); |
324 | 0 | } |
325 | 480 | memory_gc_sleep_time = config::memory_gc_sleep_time_ms; |
326 | 480 | } |
327 | 4.79k | memory_gc_sleep_time -= config::memory_maintenance_sleep_time_ms; |
328 | 4.79k | } |
329 | | |
// Central memory-maintenance loop, woken every memory_maintenance_sleep_time_ms.
// Each step below delegates to a helper defined above in this file; the step
// order matters (metrics must be refreshed before decisions are made on them).
void Daemon::memory_maintenance_thread() {
    while (!_stop_background_threads_latch.wait_for(
            std::chrono::milliseconds(config::memory_maintenance_sleep_time_ms))) {
        // step 1. Refresh process memory metrics.
        refresh_process_memory_metrics();

        // step 2. Refresh jemalloc/tcmalloc metrics.
        refresh_common_allocator_metrics();

        // step 3. Update and print memory stat when the memory changes by 256M.
        refresh_memory_state_after_memory_change();

        // step 4. Asyn Refresh cache capacity
        // TODO adjust cache capacity based on smoothstep (smooth gradient).
        refresh_cache_capacity();

        // step 5. Cancel top memory task when process memory exceed hard limit.
        memory_gc();

        // step 6. Refresh weighted memory ratio of workload groups.
        doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
        doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();

        // step 7: handle paused queries(caused by memory insufficient)
        doris::ExecEnv::GetInstance()->workload_group_mgr()->handle_paused_queries();

        // step 8. Flush memtable
        doris::GlobalMemoryArbitrator::notify_memtable_memory_refresh();
        // TODO notify flush memtable

        // step 9. Reset Jemalloc dirty page decay.
        je_reset_dirty_decay();
    }
}
364 | | |
// Worker that refreshes the memtable memory limiter's tracker. It sleeps on a
// condition variable and wakes either on an explicit notify (from the
// maintenance loop, see step 8 above) or every 100ms; it exits once the stop
// latch has been counted down.
void Daemon::memtable_memory_refresh_thread() {
    // Refresh the memory statistics of the load channel tracker more frequently,
    // which helps to accurately control the memory of LoadChannelMgr.
    do {
        std::unique_lock<std::mutex> l(doris::GlobalMemoryArbitrator::memtable_memory_refresh_lock);
        // Wait until notified or stopping; the 100ms timeout bounds shutdown latency.
        while (_stop_background_threads_latch.count() != 0 &&
               !doris::GlobalMemoryArbitrator::memtable_memory_refresh_notify.load(
                       std::memory_order_relaxed)) {
            doris::GlobalMemoryArbitrator::memtable_memory_refresh_cv.wait_for(
                    l, std::chrono::milliseconds(100));
        }
        if (_stop_background_threads_latch.count() == 0) {
            break;
        }

        // Clear the notify flag after the refresh completes, even on exception.
        Defer defer {[&]() {
            doris::GlobalMemoryArbitrator::memtable_memory_refresh_notify.store(
                    false, std::memory_order_relaxed);
        }};
        doris::ExecEnv::GetInstance()->memtable_memory_limiter()->refresh_mem_tracker();
    } while (true);
}
387 | | |
/*
 * this thread will calculate some metrics at a fix interval(15 sec)
 * 1. push bytes per second
 * 2. scan bytes per second
 * 3. max io util of all disks
 * 4. max network send bytes rate
 * 5. max network receive bytes rate
 */
void Daemon::calculate_metrics_thread() {
    // last_ts == -1 marks the first iteration: it only seeds the baselines
    // (timestamps, byte counters, per-disk/per-NIC snapshots); rates are
    // computed from the second iteration on.
    int64_t last_ts = -1L;
    int64_t lst_query_bytes = -1;

    std::map<std::string, int64_t> lst_disks_io_time;
    std::map<std::string, int64_t> lst_net_send_bytes;
    std::map<std::string, int64_t> lst_net_receive_bytes;

    do {
        DorisMetrics::instance()->metric_registry()->trigger_all_hooks(true);

        if (last_ts == -1L) {
            // First pass: record baselines only.
            last_ts = GetMonoTimeMicros() / 1000;
            lst_query_bytes = DorisMetrics::instance()->query_scan_bytes->value();
            if (config::enable_system_metrics) {
                DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time);
                DorisMetrics::instance()->system_metrics()->get_network_traffic(
                        &lst_net_send_bytes, &lst_net_receive_bytes);
            }
        } else {
            int64_t current_ts = GetMonoTimeMicros() / 1000;
            // Elapsed seconds since the previous iteration (~15s in steady state).
            long interval = (current_ts - last_ts) / 1000;
            last_ts = current_ts;

            // 1. query bytes per second
            int64_t current_query_bytes = DorisMetrics::instance()->query_scan_bytes->value();
            // NOTE: despite the name, `qps` is scan BYTES per second; the `+ 1`
            // guards against division by zero when interval rounds down to 0.
            int64_t qps = (current_query_bytes - lst_query_bytes) / (interval + 1);
            DorisMetrics::instance()->query_scan_bytes_per_second->set_value(qps < 0 ? 0 : qps);
            lst_query_bytes = current_query_bytes;

            if (config::enable_system_metrics) {
                // 2. max disk io util (computed against the 15s snapshot window)
                DorisMetrics::instance()->system_metrics()->update_max_disk_io_util_percent(
                        lst_disks_io_time, 15);

                // update lst map
                DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time);

                // 3. max network traffic
                int64_t max_send = 0;
                int64_t max_receive = 0;
                DorisMetrics::instance()->system_metrics()->get_max_net_traffic(
                        lst_net_send_bytes, lst_net_receive_bytes, 15, &max_send, &max_receive);
                DorisMetrics::instance()->system_metrics()->update_max_network_send_bytes_rate(
                        max_send);
                DorisMetrics::instance()->system_metrics()->update_max_network_receive_bytes_rate(
                        max_receive);
                // update lst map
                DorisMetrics::instance()->system_metrics()->get_network_traffic(
                        &lst_net_send_bytes, &lst_net_receive_bytes);

                DorisMetrics::instance()->system_metrics()->update_be_avail_cpu_num();
            }
            update_rowsets_and_segments_num_metrics();
        }
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(15)));
}
453 | | |
454 | 1 | void Daemon::report_runtime_query_statistics_thread() { |
455 | 83 | while (!_stop_background_threads_latch.wait_for( |
456 | 83 | std::chrono::milliseconds(config::report_query_statistics_interval_ms))) { |
457 | 82 | ExecEnv::GetInstance()->runtime_query_statistics_mgr()->report_runtime_query_statistics(); |
458 | 82 | } |
459 | 1 | } |
460 | | |
// Worker that applies the dirty-page decision made by je_reset_dirty_decay():
// it toggles jemalloc's per-arena dirty_decay_ms between the configured value
// (caching on) and 0 (immediate release). Sleeps on a condition variable with
// a 100ms timeout; exits when the stop latch is counted down.
void Daemon::je_reset_dirty_decay_thread() const {
    do {
        std::unique_lock<std::mutex> l(doris::JemallocControl::je_reset_dirty_decay_lock);
        while (_stop_background_threads_latch.count() != 0 &&
               !doris::JemallocControl::je_reset_dirty_decay_notify.load(
                       std::memory_order_relaxed)) {
            doris::JemallocControl::je_reset_dirty_decay_cv.wait_for(
                    l, std::chrono::milliseconds(100));
        }
        if (_stop_background_threads_latch.count() == 0) {
            break;
        }

        // Clear the notify flag when this iteration ends (including `continue`).
        Defer defer {[&]() {
            doris::JemallocControl::je_reset_dirty_decay_notify.store(false,
                                                                      std::memory_order_relaxed);
        }};
#ifdef USE_JEMALLOC
        if (config::disable_memory_gc || !config::enable_je_purge_dirty_pages) {
            continue;
        }

        // There is a significant difference only when dirty_decay_ms is equal to 0 or not.
        //
        // 1. When dirty_decay_ms is not equal to 0, the free memory will be cached in the Jemalloc
        // dirty page first. even if dirty_decay_ms is equal to 1, the Jemalloc dirty page will not
        // be released to the system exactly after 1ms, it will be released according to the decay rule.
        // The Jemalloc document specifies that dirty_decay_ms is an approximate time.
        //
        // 2. It has been observed in an actual cluster that even if dirty_decay_ms is changed
        // from th default 5000 to 1, Jemalloc dirty page will still cache a large amount of memory, everything
        // seems to be the same as `dirty_decay_ms:5000`. only when dirty_decay_ms is changed to 0,
        // jemalloc dirty page will stop caching and free memory will be released to the system immediately.
        // of course, performance will be affected.
        //
        // 3. After reducing dirty_decay_ms, manually calling `decay_all_arena_dirty_pages` may release dirty pages
        // as soon as possible, but no relevant experimental data can be found, so it is simple and safe
        // to adjust dirty_decay_ms only between zero and non-zero.

        if (doris::JemallocControl::je_enable_dirty_page) {
            doris::JemallocControl::je_reset_all_arena_dirty_decay_ms(config::je_dirty_decay_ms);
        } else {
            doris::JemallocControl::je_reset_all_arena_dirty_decay_ms(0);
        }
#endif
    } while (true);
}
508 | | |
// Worker that applies cache-capacity adjustments requested via
// notify_cache_adjust_capacity(). The effective weight is the minimum of the
// periodically-refreshed value and the memory-exceeded value, so the more
// aggressive reduction always wins. Exits when the stop latch is counted down.
void Daemon::cache_adjust_capacity_thread() {
    do {
        std::unique_lock<std::mutex> l(doris::GlobalMemoryArbitrator::cache_adjust_capacity_lock);
        while (_stop_background_threads_latch.count() != 0 &&
               !doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.load(
                       std::memory_order_relaxed)) {
            doris::GlobalMemoryArbitrator::cache_adjust_capacity_cv.wait_for(
                    l, std::chrono::milliseconds(100));
        }
        // Snapshot the target weight while still holding the lock.
        double adjust_weighted = std::min<double>(
                GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted,
                GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted);
        if (_stop_background_threads_latch.count() == 0) {
            break;
        }

        // Clear the notify flag when this iteration ends (including `continue`).
        Defer defer {[&]() {
            doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.store(
                    false, std::memory_order_relaxed);
        }};
        if (config::disable_memory_gc) {
            continue;
        }
        // Skip if this weight has already been applied to the caches.
        if (GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted ==
            adjust_weighted) {
            LOG(INFO) << fmt::format(
                    "[MemoryGC] adjust cache capacity end, adjust_weighted {} has not been "
                    "modified.",
                    adjust_weighted);
            continue;
        }
        std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("");
        auto freed_mem = CacheManager::instance()->for_each_cache_refresh_capacity(adjust_weighted,
                                                                                   profile.get());
        std::stringstream ss;
        profile->pretty_print(&ss);
        LOG(INFO) << fmt::format(
                "[MemoryGC] adjust cache capacity end, free memory {}, details: {}",
                PrettyPrinter::print(freed_mem, TUnit::BYTES), ss.str());
        GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted = adjust_weighted;
    } while (true);
}
551 | | |
552 | 1 | void Daemon::cache_prune_stale_thread() { |
553 | 1 | int32_t interval = config::cache_periodic_prune_stale_sweep_sec; |
554 | 5 | while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) { |
555 | 4 | if (config::cache_periodic_prune_stale_sweep_sec <= 0) { |
556 | 0 | LOG(WARNING) << "config of cache clean interval is: [" << interval |
557 | 0 | << "], it means the cache prune stale thread is disabled, will wait 3s " |
558 | 0 | "and check again."; |
559 | 0 | interval = 3; |
560 | 0 | continue; |
561 | 0 | } |
562 | 4 | if (config::disable_memory_gc) { |
563 | 0 | continue; |
564 | 0 | } |
565 | 4 | CacheManager::instance()->for_each_cache_prune_stale(); |
566 | 4 | interval = config::cache_periodic_prune_stale_sweep_sec; |
567 | 4 | } |
568 | 1 | } |
569 | | |
570 | 1 | void Daemon::be_proc_monitor_thread() { |
571 | 9 | while (!_stop_background_threads_latch.wait_for( |
572 | 9 | std::chrono::milliseconds(config::be_proc_monitor_interval_ms))) { |
573 | 8 | LOG(INFO) << "log be thread num, " << BeProcMonitor::get_be_thread_info(); |
574 | 8 | } |
575 | 1 | } |
576 | | |
577 | 1 | void Daemon::calculate_workload_group_metrics_thread() { |
578 | 50 | while (!_stop_background_threads_latch.wait_for( |
579 | 50 | std::chrono::milliseconds(config::workload_group_metrics_interval_ms))) { |
580 | 49 | ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_metrics(); |
581 | 49 | } |
582 | 1 | } |
583 | | |
584 | 1 | void Daemon::start() { |
585 | 1 | Status st; |
586 | 1 | st = Thread::create( |
587 | 1 | "Daemon", "tcmalloc_gc_thread", [this]() { this->tcmalloc_gc_thread(); }, |
588 | 1 | &_threads.emplace_back()); |
589 | 1 | CHECK(st.ok()) << st; |
590 | 1 | st = Thread::create( |
591 | 1 | "Daemon", "memory_maintenance_thread", [this]() { this->memory_maintenance_thread(); }, |
592 | 1 | &_threads.emplace_back()); |
593 | 1 | CHECK(st.ok()) << st; |
594 | 1 | st = Thread::create( |
595 | 1 | "Daemon", "memtable_memory_refresh_thread", |
596 | 1 | [this]() { this->memtable_memory_refresh_thread(); }, &_threads.emplace_back()); |
597 | 1 | CHECK(st.ok()) << st; |
598 | | |
599 | 1 | if (config::enable_metric_calculator) { |
600 | 1 | st = Thread::create( |
601 | 1 | "Daemon", "calculate_metrics_thread", |
602 | 1 | [this]() { this->calculate_metrics_thread(); }, &_threads.emplace_back()); |
603 | 1 | CHECK(st.ok()) << st; |
604 | 1 | } |
605 | 1 | st = Thread::create( |
606 | 1 | "Daemon", "je_reset_dirty_decay_thread", |
607 | 1 | [this]() { this->je_reset_dirty_decay_thread(); }, &_threads.emplace_back()); |
608 | 1 | CHECK(st.ok()) << st; |
609 | 1 | st = Thread::create( |
610 | 1 | "Daemon", "cache_adjust_capacity_thread", |
611 | 1 | [this]() { this->cache_adjust_capacity_thread(); }, &_threads.emplace_back()); |
612 | 1 | CHECK(st.ok()) << st; |
613 | 1 | st = Thread::create( |
614 | 1 | "Daemon", "cache_prune_stale_thread", [this]() { this->cache_prune_stale_thread(); }, |
615 | 1 | &_threads.emplace_back()); |
616 | 1 | CHECK(st.ok()) << st; |
617 | 1 | st = Thread::create( |
618 | 1 | "Daemon", "query_runtime_statistics_thread", |
619 | 1 | [this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back()); |
620 | 1 | CHECK(st.ok()) << st; |
621 | | |
622 | 1 | if (config::enable_be_proc_monitor) { |
623 | 1 | st = Thread::create( |
624 | 1 | "Daemon", "be_proc_monitor_thread", [this]() { this->be_proc_monitor_thread(); }, |
625 | 1 | &_threads.emplace_back()); |
626 | 1 | } |
627 | 1 | CHECK(st.ok()) << st; |
628 | | |
629 | 1 | st = Thread::create( |
630 | 1 | "Daemon", "workload_group_metrics", |
631 | 1 | [this]() { this->calculate_workload_group_metrics_thread(); }, |
632 | 1 | &_threads.emplace_back()); |
633 | 1 | CHECK(st.ok()) << st; |
634 | 1 | } |
635 | | |
636 | 0 | void Daemon::stop() { |
637 | 0 | LOG(INFO) << "Doris daemon is stopping."; |
638 | 0 | if (_stop_background_threads_latch.count() == 0) { |
639 | 0 | LOG(INFO) << "Doris daemon stop returned since no bg threads latch."; |
640 | 0 | return; |
641 | 0 | } |
642 | 0 | _stop_background_threads_latch.count_down(); |
643 | 0 | for (auto&& t : _threads) { |
644 | 0 | if (t) { |
645 | 0 | t->join(); |
646 | 0 | } |
647 | 0 | } |
648 | 0 | LOG(INFO) << "Doris daemon stopped after background threads are joined."; |
649 | 0 | } |
650 | | |
651 | | } // namespace doris |