/root/doris/be/src/common/daemon.cpp
| Line | Count | Source | 
| 1 |  | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 |  | // or more contributor license agreements.  See the NOTICE file | 
| 3 |  | // distributed with this work for additional information | 
| 4 |  | // regarding copyright ownership.  The ASF licenses this file | 
| 5 |  | // to you under the Apache License, Version 2.0 (the | 
| 6 |  | // "License"); you may not use this file except in compliance | 
| 7 |  | // with the License.  You may obtain a copy of the License at | 
| 8 |  | // | 
| 9 |  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 |  | // | 
| 11 |  | // Unless required by applicable law or agreed to in writing, | 
| 12 |  | // software distributed under the License is distributed on an | 
| 13 |  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 |  | // KIND, either express or implied.  See the License for the | 
| 15 |  | // specific language governing permissions and limitations | 
| 16 |  | // under the License. | 
| 17 |  |  | 
| 18 |  | #include "common/daemon.h" | 
| 19 |  |  | 
| 20 |  | // IWYU pragma: no_include <bthread/errno.h> | 
| 21 |  | #include <errno.h> // IWYU pragma: keep | 
| 22 |  | #include <gflags/gflags.h> | 
| 23 |  |  | 
| 24 |  | #include "runtime/memory/jemalloc_control.h" | 
| 25 |  | #if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \ | 
| 26 |  |         !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) | 
| 27 |  | #include <gperftools/malloc_extension.h> // IWYU pragma: keep | 
| 28 |  | #endif | 
| 29 |  | // IWYU pragma: no_include <bits/std_abs.h> | 
| 30 |  | #include <butil/iobuf.h> | 
| 31 |  | #include <math.h> | 
| 32 |  | #include <stdint.h> | 
| 33 |  | #include <stdlib.h> | 
| 34 |  |  | 
| 35 |  | // IWYU pragma: no_include <bits/chrono.h> | 
| 36 |  | #include <chrono> // IWYU pragma: keep | 
| 37 |  | #include <map> | 
| 38 |  | #include <ostream> | 
| 39 |  | #include <string> | 
| 40 |  |  | 
| 41 |  | #include "cloud/config.h" | 
| 42 |  | #include "common/config.h" | 
| 43 |  | #include "common/logging.h" | 
| 44 |  | #include "common/status.h" | 
| 45 |  | #include "olap/memtable_memory_limiter.h" | 
| 46 |  | #include "olap/storage_engine.h" | 
| 47 |  | #include "olap/tablet_manager.h" | 
| 48 |  | #include "runtime/be_proc_monitor.h" | 
| 49 |  | #include "runtime/exec_env.h" | 
| 50 |  | #include "runtime/fragment_mgr.h" | 
| 51 |  | #include "runtime/memory/global_memory_arbitrator.h" | 
| 52 |  | #include "runtime/memory/mem_tracker_limiter.h" | 
| 53 |  | #include "runtime/memory/memory_reclamation.h" | 
| 54 |  | #include "runtime/process_profile.h" | 
| 55 |  | #include "runtime/runtime_query_statistics_mgr.h" | 
| 56 |  | #include "runtime/workload_group/workload_group_manager.h" | 
| 57 |  | #include "util/algorithm_util.h" | 
| 58 |  | #include "util/doris_metrics.h" | 
| 59 |  | #include "util/mem_info.h" | 
| 60 |  | #include "util/metrics.h" | 
| 61 |  | #include "util/perf_counters.h" | 
| 62 |  | #include "util/system_metrics.h" | 
| 63 |  | #include "util/time.h" | 
| 64 |  |  | 
| 65 |  | namespace doris { | 
| 66 |  | namespace { | 
| 67 |  |  | 
| 68 |  | std::atomic<int64_t> last_print_proc_mem = 0; | 
| 69 |  | std::atomic<int32_t> refresh_cache_capacity_sleep_time_ms = 0; | 
| 70 |  | std::atomic<int32_t> memory_gc_sleep_time = 0; | 
| 71 |  | #ifdef USE_JEMALLOC | 
| 72 |  | std::atomic<int32_t> je_reset_dirty_decay_sleep_time_ms = 0; | 
| 73 |  | #endif | 
| 74 |  |  | 
| 75 | 498 | void update_rowsets_and_segments_num_metrics() { | 
| 76 | 498 |     if (config::is_cloud_mode()) { | 
| 77 |  |         // TODO(plat1ko): CloudStorageEngine | 
| 78 | 398 |     } else { | 
| 79 | 100 |         StorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_local(); | 
| 80 | 100 |         auto* metrics = DorisMetrics::instance(); | 
| 81 | 100 |         metrics->all_rowsets_num->set_value(engine.tablet_manager()->get_rowset_nums()); | 
| 82 | 100 |         metrics->all_segments_num->set_value(engine.tablet_manager()->get_segment_nums()); | 
| 83 | 100 |     } | 
| 84 | 498 | } | 
| 85 |  |  | 
| 86 |  | } // namespace | 
| 87 |  |  | 
| 88 | 7 | void Daemon::tcmalloc_gc_thread() { | 
| 89 |  |     // TODO: GC for all caches should eventually be supported. | 
| 90 |  | #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && \ | 
| 91 |  |         !defined(USE_JEMALLOC) | 
| 92 |  |  | 
| 93 |  |     // Limit the size of the tcmalloc cache via release_rate and max_cache_percent. | 
| 94 |  |     // We adjust release_rate according to memory_pressure, the percentage of memory in use. | 
| 95 |  |     int64_t max_cache_percent = 60; | 
| 96 |  |     double release_rates[10] = {1.0, 1.0, 1.0, 5.0, 5.0, 20.0, 50.0, 100.0, 500.0, 2000.0}; | 
| 97 |  |     int64_t pressure_limit = 90; | 
| 98 |  |     bool is_performance_mode = false; | 
| 99 |  |     int64_t physical_limit_bytes = | 
| 100 |  |             std::min(MemInfo::physical_mem() - MemInfo::sys_mem_available_low_water_mark(), | 
| 101 |  |                      MemInfo::mem_limit()); | 
| 102 |  |  | 
| 103 |  |     if (config::memory_mode == std::string("performance")) { | 
| 104 |  |         max_cache_percent = 100; | 
| 105 |  |         pressure_limit = 90; | 
| 106 |  |         is_performance_mode = true; | 
| 107 |  |         physical_limit_bytes = std::min(MemInfo::mem_limit(), MemInfo::physical_mem()); | 
| 108 |  |     } else if (config::memory_mode == std::string("compact")) { | 
| 109 |  |         max_cache_percent = 20; | 
| 110 |  |         pressure_limit = 80; | 
| 111 |  |     } | 
| 112 |  |  | 
| 113 |  |     int last_ms = 0; | 
| 114 |  |     const int kMaxLastMs = 30000; | 
| 115 |  |     const int kIntervalMs = 10; | 
| 116 |  |     size_t init_aggressive_decommit = 0; | 
| 117 |  |     size_t current_aggressive_decommit = 0; | 
| 118 |  |     size_t expected_aggressive_decommit = 0; | 
| 119 |  |     int64_t last_memory_pressure = 0; | 
| 120 |  |  | 
| 121 |  |     MallocExtension::instance()->GetNumericProperty("tcmalloc.aggressive_memory_decommit", | 
| 122 |  |                                                     &init_aggressive_decommit); | 
| 123 |  |     current_aggressive_decommit = init_aggressive_decommit; | 
| 124 |  |  | 
| 125 |  |     while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(kIntervalMs))) { | 
| 126 |  |         size_t tc_used_bytes = 0; | 
| 127 |  |         size_t tc_alloc_bytes = 0; | 
| 128 |  |         size_t rss = PerfCounters::get_vm_rss(); | 
| 129 |  |  | 
| 130 |  |         MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes", | 
| 131 |  |                                                         &tc_alloc_bytes); | 
| 132 |  |         MallocExtension::instance()->GetNumericProperty("generic.current_allocated_bytes", | 
| 133 |  |                                                         &tc_used_bytes); | 
| 134 |  |         int64_t tc_cached_bytes = (int64_t)tc_alloc_bytes - (int64_t)tc_used_bytes; | 
| 135 |  |         int64_t to_free_bytes = | 
| 136 |  |                 (int64_t)tc_cached_bytes - ((int64_t)tc_used_bytes * max_cache_percent / 100); | 
| 137 |  |         to_free_bytes = std::max(to_free_bytes, (int64_t)0); | 
| 138 |  |  | 
| 139 |  |         int64_t memory_pressure = 0; | 
| 140 |  |         int64_t rss_pressure = 0; | 
| 141 |  |         int64_t alloc_bytes = std::max(rss, tc_alloc_bytes); | 
| 142 |  |         memory_pressure = alloc_bytes * 100 / physical_limit_bytes; | 
| 143 |  |         rss_pressure = rss * 100 / physical_limit_bytes; | 
| 144 |  |  | 
| 145 |  |         expected_aggressive_decommit = init_aggressive_decommit; | 
| 146 |  |         if (memory_pressure > pressure_limit) { | 
| 147 |  |             // We are approaching OOM, so release the cache aggressively. | 
| 148 |  |             // Ideally, we would reuse the cache and stop allocating from the system, | 
| 149 |  |             // however, it is hard to set a limit on tcmalloc's cache and Doris | 
| 150 |  |             // uses mmap in vectorized mode. | 
| 151 |  |             // Limiting the cache capacity is enough. | 
| 152 |  |             if (rss_pressure > pressure_limit) { | 
| 153 |  |                 int64_t min_free_bytes = alloc_bytes - physical_limit_bytes * 9 / 10; | 
| 154 |  |                 to_free_bytes = std::max(to_free_bytes, min_free_bytes); | 
| 155 |  |                 to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 30 / 100); | 
| 156 |  |                 // We ensure that at least 500MB remains in the cache. | 
| 157 |  |                 to_free_bytes = std::min(to_free_bytes, tc_cached_bytes - 500 * 1024 * 1024); | 
| 158 |  |                 expected_aggressive_decommit = 1; | 
| 159 |  |             } | 
| 160 |  |             last_ms = kMaxLastMs; | 
| 161 |  |         } else if (memory_pressure > (pressure_limit - 10)) { | 
| 162 |  |             // In most cases, adjusting the release rate is enough; if memory is consumed quickly, | 
| 163 |  |             // we should release it manually. | 
| 164 |  |             if (last_memory_pressure <= (pressure_limit - 10)) { | 
| 165 |  |                 to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 10 / 100); | 
| 166 |  |             } | 
| 167 |  |         } | 
| 168 |  |  | 
| 169 |  |         int release_rate_index = int(memory_pressure) / 10; | 
| 170 |  |         double release_rate = 1.0; | 
| 171 |  |         if (release_rate_index >= sizeof(release_rates) / sizeof(release_rates[0])) { | 
| 172 |  |             release_rate = 2000.0; | 
| 173 |  |         } else { | 
| 174 |  |             release_rate = release_rates[release_rate_index]; | 
| 175 |  |         } | 
| 176 |  |         MallocExtension::instance()->SetMemoryReleaseRate(release_rate); | 
| 177 |  |  | 
| 178 |  |         if ((current_aggressive_decommit != expected_aggressive_decommit) && !is_performance_mode) { | 
| 179 |  |             MallocExtension::instance()->SetNumericProperty("tcmalloc.aggressive_memory_decommit", | 
| 180 |  |                                                             expected_aggressive_decommit); | 
| 181 |  |             current_aggressive_decommit = expected_aggressive_decommit; | 
| 182 |  |         } | 
| 183 |  |  | 
| 184 |  |         last_memory_pressure = memory_pressure; | 
| 185 |  |         // We release at least 2% of memory at once; frequent releasing hurts performance. | 
| 186 |  |         if (to_free_bytes > (physical_limit_bytes * 2 / 100)) { | 
| 187 |  |             last_ms += kIntervalMs; | 
| 188 |  |             if (last_ms >= kMaxLastMs) { | 
| 189 |  |                 LOG(INFO) << "generic.current_allocated_bytes " << tc_used_bytes | 
| 190 |  |                           << ", generic.total_physical_bytes " << tc_alloc_bytes << ", rss " << rss | 
| 191 |  |                           << ", max_cache_percent " << max_cache_percent << ", release_rate " | 
| 192 |  |                           << release_rate << ", memory_pressure " << memory_pressure | 
| 193 |  |                           << ", physical_limit_bytes " << physical_limit_bytes << ", to_free_bytes " | 
| 194 |  |                           << to_free_bytes << ", current_aggressive_decommit " | 
| 195 |  |                           << current_aggressive_decommit; | 
| 196 |  |                 MallocExtension::instance()->ReleaseToSystem(to_free_bytes); | 
| 197 |  |                 last_ms = 0; | 
| 198 |  |             } | 
| 199 |  |         } else { | 
| 200 |  |             last_ms = 0; | 
| 201 |  |         } | 
| 202 |  |     } | 
| 203 |  | #endif | 
| 204 | 7 | } | 
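
For illustration only (not part of the source above): with the `release_rates` table in `tcmalloc_gc_thread`, a `memory_pressure` of 73 maps to index 7 and a release rate of 100.0, a pressure of 95 maps to index 9 and 2000.0, and anything at or above 100 is clamped to 2000.0. A higher rate passed to `MallocExtension::SetMemoryReleaseRate` makes tcmalloc return freed pages to the OS more eagerly, so the cache shrinks faster as memory pressure grows.
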
| 205 |  |  | 
| 206 | 130k | void refresh_process_memory_metrics() { | 
| 207 | 130k |     doris::PerfCounters::refresh_proc_status(); | 
| 208 | 130k |     doris::MemInfo::refresh_proc_meminfo(); | 
| 209 | 130k |     doris::GlobalMemoryArbitrator::reset_refresh_interval_memory_growth(); | 
| 210 | 130k |     ExecEnv::GetInstance()->brpc_iobuf_block_memory_tracker()->set_consumption( | 
| 211 | 130k |             butil::IOBuf::block_memory()); | 
| 212 | 130k | } | 
| 213 |  |  | 
| 214 | 130k | void refresh_common_allocator_metrics() { | 
| 215 |  | #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) | 
| 216 |  |     doris::JemallocControl::refresh_allocator_mem(); | 
| 217 |  |     if (config::enable_system_metrics) { | 
| 218 |  |         DorisMetrics::instance()->system_metrics()->update_allocator_metrics(); | 
| 219 |  |     } | 
| 220 |  | #endif | 
| 221 | 130k |     doris::GlobalMemoryArbitrator::refresh_memory_bvar(); | 
| 222 | 130k | } | 
| 223 |  |  | 
| 224 | 130k | void refresh_memory_state_after_memory_change() { | 
| 225 | 130k |     if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) { | 
| 226 | 251 |         last_print_proc_mem = PerfCounters::get_vm_rss(); | 
| 227 | 251 |         doris::MemTrackerLimiter::clean_tracker_limiter_group(); | 
| 228 | 251 |         doris::ProcessProfile::instance()->memory_profile()->enable_print_log_process_usage(); | 
| 229 | 251 |         doris::ProcessProfile::instance()->memory_profile()->refresh_memory_overview_profile(); | 
| 230 | 251 |         doris::JemallocControl::notify_je_purge_dirty_pages(); | 
| 231 | 251 |         LOG(INFO) << doris::GlobalMemoryArbitrator:: | 
| 232 | 251 |                         process_mem_log_str(); // print mem log when process memory changes by more than 256M | 
| 233 | 251 |     } | 
| 234 | 130k | } | 
| 235 |  |  | 
| 236 | 130k | void refresh_cache_capacity() { | 
| 237 | 130k |     if (doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.load( | 
| 238 | 130k |                 std::memory_order_relaxed)) { | 
| 239 |  |         // The last cache capacity adjustment has not been completed yet. | 
| 240 |  |         // If we did not return here, last_periodic_refreshed_cache_capacity_adjust_weighted could be modified while the notify is ignored. | 
| 241 | 0 |         return; | 
| 242 | 0 |     } | 
| 243 | 130k |     if (refresh_cache_capacity_sleep_time_ms <= 0) { | 
| 244 | 130k |         auto cache_capacity_reduce_mem_limit = int64_t( | 
| 245 | 130k |                 doris::MemInfo::soft_mem_limit() * config::cache_capacity_reduce_mem_limit_frac); | 
| 246 | 130k |         int64_t process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage(); | 
| 247 |  |         // The rule is as follows: | 
| 248 |  |         // 1. if process mem usage < soft mem limit * 0.6, there is no need to adjust the cache capacity. | 
| 249 |  |         // 2. if process mem usage is between soft mem limit * 0.6 and the soft mem limit, the capacity is adjusted to a lower value. | 
| 250 |  |         // 3. if process mem usage > soft mem limit, the capacity is adjusted to 0. | 
| 251 | 130k |         double new_cache_capacity_adjust_weighted = | 
| 252 | 130k |                 AlgoUtil::descent_by_step(10, cache_capacity_reduce_mem_limit, | 
| 253 | 130k |                                           doris::MemInfo::soft_mem_limit(), process_memory_usage); | 
| 254 | 130k |         if (new_cache_capacity_adjust_weighted != | 
| 255 | 130k |             doris::GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted) { | 
| 256 | 0 |             doris::GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted = | 
| 257 | 0 |                     new_cache_capacity_adjust_weighted; | 
| 258 | 0 |             doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity(); | 
| 259 | 0 |             refresh_cache_capacity_sleep_time_ms = config::memory_gc_sleep_time_ms; | 
| 260 | 130k |         } else { | 
| 261 | 130k |             refresh_cache_capacity_sleep_time_ms = 0; | 
| 262 | 130k |         } | 
| 263 | 130k |     } | 
| 264 | 130k |     refresh_cache_capacity_sleep_time_ms -= config::memory_maintenance_sleep_time_ms; | 
| 265 | 130k | } | 
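
The body of `AlgoUtil::descent_by_step` is not part of this file; the following is a minimal sketch of the stepped descent described in the comments above, with step boundaries and rounding chosen only for illustration:

```cpp
#include <cstdint>

// Hypothetical sketch: descend from weight 1.0 to 0.0 in `steps` discrete steps.
// `low` plays the role of cache_capacity_reduce_mem_limit and `high` the soft mem limit.
double descent_by_step_sketch(int steps, int64_t low, int64_t high, int64_t current) {
    if (current <= low) return 1.0;  // usage below the reduce threshold: keep full capacity
    if (current >= high) return 0.0; // usage above the soft mem limit: shrink capacity to 0
    // Discrete steps keep small fluctuations in memory usage from changing
    // the weight on every refresh.
    double frac = double(current - low) / double(high - low);
    int step = int(frac * steps) + 1;          // 1 .. steps
    return 1.0 - double(step) / double(steps); // 0.9, 0.8, ..., 0.0 for steps == 10
}
```

Under this sketch, with the 0.6 threshold from the comment and steps == 10, process usage at 75% of the soft limit maps to a weight of 0.6.
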
| 266 |  |  | 
| 267 | 130k | void je_reset_dirty_decay() { | 
| 268 |  | #ifdef USE_JEMALLOC | 
| 269 |  |     if (doris::JemallocControl::je_reset_dirty_decay_notify.load(std::memory_order_relaxed)) { | 
| 270 |  |         // If we did not return here, je_enable_dirty_page could be modified while the notify is ignored. | 
| 271 |  |         return; | 
| 272 |  |     } | 
| 273 |  |  | 
| 274 |  |     if (je_reset_dirty_decay_sleep_time_ms <= 0) { | 
| 275 |  |         bool new_je_enable_dirty_page = true; | 
| 276 |  |         if (doris::JemallocControl::je_enable_dirty_page) { | 
| 277 |  |             // If Jemalloc dirty pages are enabled and process memory exceeds the soft mem limit, | 
| 278 |  |             // disable Jemalloc dirty pages. | 
| 279 |  |             new_je_enable_dirty_page = !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(); | 
| 280 |  |         } else { | 
| 281 |  |             // If Jemalloc dirty pages are disabled and there is still at least 10% headroom | 
| 282 |  |             // before the soft mem limit would be exceeded, re-enable Jemalloc dirty pages. | 
| 283 |  |             // This avoids frequent toggling between enabling and disabling dirty pages | 
| 284 |  |             // when process memory stays below the soft mem limit with dirty pages disabled | 
| 285 |  |             // but would exceed it once they are enabled again. | 
| 286 |  |             new_je_enable_dirty_page = !GlobalMemoryArbitrator::is_exceed_soft_mem_limit( | 
| 287 |  |                     int64_t(doris::MemInfo::soft_mem_limit() * 0.1)); | 
| 288 |  |         } | 
| 289 |  |  | 
| 290 |  |         if (doris::JemallocControl::je_enable_dirty_page != new_je_enable_dirty_page) { | 
| 291 |  |             // Only call `notify_je_reset_dirty_decay` if `je_enable_dirty_page` changes. | 
| 292 |  |             doris::JemallocControl::je_enable_dirty_page = new_je_enable_dirty_page; | 
| 293 |  |             doris::JemallocControl::notify_je_reset_dirty_decay(); | 
| 294 |  |             je_reset_dirty_decay_sleep_time_ms = config::memory_gc_sleep_time_ms; | 
| 295 |  |         } else { | 
| 296 |  |             je_reset_dirty_decay_sleep_time_ms = 0; | 
| 297 |  |         } | 
| 298 |  |     } | 
| 299 |  |     je_reset_dirty_decay_sleep_time_ms -= config::memory_maintenance_sleep_time_ms; | 
| 300 |  | #endif | 
| 301 | 130k | } | 
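
A minimal sketch of the hysteresis described in the comments above; the function and its parameters are hypothetical stand-ins for the checks done through `GlobalMemoryArbitrator::is_exceed_soft_mem_limit`:

```cpp
#include <cstdint>

// Dirty pages are disabled as soon as process memory exceeds the soft limit,
// but only re-enabled once there is at least 10% headroom below it, so the
// flag does not flap when memory hovers around the limit.
bool next_dirty_page_state(bool currently_enabled, int64_t process_mem, int64_t soft_limit) {
    if (currently_enabled) {
        return process_mem <= soft_limit;
    }
    return process_mem <= soft_limit - soft_limit / 10;
}
```
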
| 302 |  |  | 
| 303 | 130k | void memory_gc() { | 
| 304 | 130k |     if (config::disable_memory_gc) { | 
| 305 | 0 |         return; | 
| 306 | 0 |     } | 
| 307 | 130k |     if (memory_gc_sleep_time <= 0) { | 
| 308 | 13.0k |         auto gc_func = [](const std::string& revoke_reason) { | 
| 309 | 0 |             doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage(); | 
| 310 | 0 |             if (doris::MemoryReclamation::revoke_process_memory(revoke_reason)) { | 
| 311 |  |                 // If not enough memory can be reclaimed by GC, process memory usage will not be printed during the next consecutive GC. | 
| 312 | 0 |                 doris::ProcessProfile::instance() | 
| 313 | 0 |                         ->memory_profile() | 
| 314 | 0 |                         ->enable_print_log_process_usage(); | 
| 315 | 0 |             } | 
| 316 | 0 |         }; | 
| 317 |  |  | 
| 318 | 13.0k |         if (doris::GlobalMemoryArbitrator::sys_mem_available() < | 
| 319 | 13.0k |             doris::MemInfo::sys_mem_available_low_water_mark()) { | 
| 320 | 0 |             gc_func("sys available memory less than low water mark"); | 
| 321 | 13.0k |         } else if (doris::GlobalMemoryArbitrator::process_memory_usage() > | 
| 322 | 13.0k |                    doris::MemInfo::mem_limit()) { | 
| 323 | 0 |             gc_func("process memory used exceed limit"); | 
| 324 | 0 |         } | 
| 325 | 13.0k |         memory_gc_sleep_time = config::memory_gc_sleep_time_ms; | 
| 326 | 13.0k |     } | 
| 327 | 130k |     memory_gc_sleep_time -= config::memory_maintenance_sleep_time_ms; | 
| 328 | 130k | } | 
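
`memory_gc()`, `refresh_cache_capacity()`, and `je_reset_dirty_decay()` all rate-limit their work with the same countdown pattern relative to the maintenance loop; a minimal sketch with illustrative names only:

```cpp
#include <cstdint>
#include <functional>

// The maintenance loop wakes up every `loop_interval_ms`, but the expensive
// check only runs once the countdown has drained, i.e. roughly once per
// `action_interval_ms`.
void countdown_gate(int32_t& remaining_ms, int32_t loop_interval_ms, int32_t action_interval_ms,
                    const std::function<void()>& expensive_check) {
    if (remaining_ms <= 0) {
        expensive_check();
        remaining_ms = action_interval_ms; // re-arm the countdown
    }
    remaining_ms -= loop_interval_ms; // drained a little on every loop iteration
}
```

Note that `refresh_cache_capacity()` only re-arms the longer interval when it actually notifies a capacity adjustment; otherwise it checks again on the next iteration.
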
| 329 |  |  | 
| 330 | 7 | void Daemon::memory_maintenance_thread() { | 
| 331 | 130k |     while (!_stop_background_threads_latch.wait_for( | 
| 332 | 130k |             std::chrono::milliseconds(config::memory_maintenance_sleep_time_ms))) { | 
| 333 |  |         // step 1. Refresh process memory metrics. | 
| 334 | 130k |         refresh_process_memory_metrics(); | 
| 335 |  |  | 
| 336 |  |         // step 2. Refresh jemalloc/tcmalloc metrics. | 
| 337 | 130k |         refresh_common_allocator_metrics(); | 
| 338 |  |  | 
| 339 |  |         // step 3. Update and print memory stat when the memory changes by 256M. | 
| 340 | 130k |         refresh_memory_state_after_memory_change(); | 
| 341 |  |  | 
| 342 |  |         // step 4. Asynchronously refresh cache capacity. | 
| 343 |  |         // TODO adjust cache capacity based on smoothstep (smooth gradient). | 
| 344 | 130k |         refresh_cache_capacity(); | 
| 345 |  |  | 
| 346 |  |         // step 5. Cancel the top memory-consuming tasks when process memory exceeds the hard limit. | 
| 347 | 130k |         memory_gc(); | 
| 348 |  |  | 
| 349 |  |         // step 6. Refresh weighted memory ratio of workload groups. | 
| 350 | 130k |         doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep(); | 
| 351 | 130k |         doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_memory_state(); | 
| 352 |  |  | 
| 353 |  |         // step 7. Handle paused queries (caused by insufficient memory). | 
| 354 | 130k |         doris::ExecEnv::GetInstance()->workload_group_mgr()->handle_paused_queries(); | 
| 355 |  |  | 
| 356 |  |         // step 8. Flush memtable | 
| 357 | 130k |         doris::GlobalMemoryArbitrator::notify_memtable_memory_refresh(); | 
| 358 |  |         // TODO notify flush memtable | 
| 359 |  |  | 
| 360 |  |         // step 9. Reset Jemalloc dirty page decay. | 
| 361 | 130k |         je_reset_dirty_decay(); | 
| 362 | 130k |     } | 
| 363 | 7 | } | 
| 364 |  |  | 
| 365 | 7 | void Daemon::memtable_memory_refresh_thread() { | 
| 366 |  |     // Refresh the memory statistics of the load channel tracker more frequently, | 
| 367 |  |     // which helps to accurately control the memory of LoadChannelMgr. | 
| 368 | 130k |     do { | 
| 369 | 130k |         std::unique_lock<std::mutex> l(doris::GlobalMemoryArbitrator::memtable_memory_refresh_lock); | 
| 370 | 264k |         while (_stop_background_threads_latch.count() != 0 && | 
| 371 | 264k |                !doris::GlobalMemoryArbitrator::memtable_memory_refresh_notify.load( | 
| 372 | 264k |                        std::memory_order_relaxed)) { | 
| 373 | 134k |             doris::GlobalMemoryArbitrator::memtable_memory_refresh_cv.wait_for( | 
| 374 | 134k |                     l, std::chrono::milliseconds(100)); | 
| 375 | 134k |         } | 
| 376 | 130k |         if (_stop_background_threads_latch.count() == 0) { | 
| 377 | 3 |             break; | 
| 378 | 3 |         } | 
| 379 |  |  | 
| 380 | 130k |         Defer defer {[&]() { | 
| 381 | 130k |             doris::GlobalMemoryArbitrator::memtable_memory_refresh_notify.store( | 
| 382 | 130k |                     false, std::memory_order_relaxed); | 
| 383 | 130k |         }}; | 
| 384 | 130k |         doris::ExecEnv::GetInstance()->memtable_memory_limiter()->refresh_mem_tracker(); | 
| 385 | 130k |     } while (true); | 
| 386 | 7 | } | 
| 387 |  |  | 
| 388 |  | /* | 
| 389 |  |  * This thread calculates some metrics at a fixed interval (15 sec): | 
| 390 |  |  * 1. push bytes per second | 
| 391 |  |  * 2. scan bytes per second | 
| 392 |  |  * 3. max io util of all disks | 
| 393 |  |  * 4. max network send bytes rate | 
| 394 |  |  * 5. max network receive bytes rate | 
| 395 |  |  */ | 
| 396 | 7 | void Daemon::calculate_metrics_thread() { | 
| 397 | 7 |     int64_t last_ts = -1L; | 
| 398 | 7 |     int64_t lst_query_bytes = -1; | 
| 399 |  |  | 
| 400 | 7 |     std::map<std::string, int64_t> lst_disks_io_time; | 
| 401 | 7 |     std::map<std::string, int64_t> lst_net_send_bytes; | 
| 402 | 7 |     std::map<std::string, int64_t> lst_net_receive_bytes; | 
| 403 |  |  | 
| 404 | 505 |     do { | 
| 405 | 505 |         DorisMetrics::instance()->metric_registry()->trigger_all_hooks(true); | 
| 406 |  |  | 
| 407 | 505 |         if (last_ts == -1L) { | 
| 408 | 7 |             last_ts = GetMonoTimeMicros() / 1000; | 
| 409 | 7 |             lst_query_bytes = DorisMetrics::instance()->query_scan_bytes->value(); | 
| 410 | 7 |             if (config::enable_system_metrics) { | 
| 411 | 2 |                 DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time); | 
| 412 | 2 |                 DorisMetrics::instance()->system_metrics()->get_network_traffic( | 
| 413 | 2 |                         &lst_net_send_bytes, &lst_net_receive_bytes); | 
| 414 | 2 |             } | 
| 415 | 498 |         } else { | 
| 416 | 498 |             int64_t current_ts = GetMonoTimeMicros() / 1000; | 
| 417 | 498 |             long interval = (current_ts - last_ts) / 1000; | 
| 418 | 498 |             last_ts = current_ts; | 
| 419 |  |  | 
| 420 |  |             // 1. query bytes per second | 
| 421 | 498 |             int64_t current_query_bytes = DorisMetrics::instance()->query_scan_bytes->value(); | 
| 422 | 498 |             int64_t qps = (current_query_bytes - lst_query_bytes) / (interval + 1); | 
| 423 | 498 |             DorisMetrics::instance()->query_scan_bytes_per_second->set_value(qps < 0 ? 0 : qps); | 
| 424 | 498 |             lst_query_bytes = current_query_bytes; | 
| 425 |  |  | 
| 426 | 498 |             if (config::enable_system_metrics) { | 
| 427 |  |                 // 2. max disk io util | 
| 428 | 38 |                 DorisMetrics::instance()->system_metrics()->update_max_disk_io_util_percent( | 
| 429 | 38 |                         lst_disks_io_time, 15); | 
| 430 |  |  | 
| 431 |  |                 // update lst map | 
| 432 | 38 |                 DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time); | 
| 433 |  |  | 
| 434 |  |                 // 3. max network traffic | 
| 435 | 38 |                 int64_t max_send = 0; | 
| 436 | 38 |                 int64_t max_receive = 0; | 
| 437 | 38 |                 DorisMetrics::instance()->system_metrics()->get_max_net_traffic( | 
| 438 | 38 |                         lst_net_send_bytes, lst_net_receive_bytes, 15, &max_send, &max_receive); | 
| 439 | 38 |                 DorisMetrics::instance()->system_metrics()->update_max_network_send_bytes_rate( | 
| 440 | 38 |                         max_send); | 
| 441 | 38 |                 DorisMetrics::instance()->system_metrics()->update_max_network_receive_bytes_rate( | 
| 442 | 38 |                         max_receive); | 
| 443 |  |                 // update lst map | 
| 444 | 38 |                 DorisMetrics::instance()->system_metrics()->get_network_traffic( | 
| 445 | 38 |                         &lst_net_send_bytes, &lst_net_receive_bytes); | 
| 446 |  |  | 
| 447 | 38 |                 DorisMetrics::instance()->system_metrics()->update_be_avail_cpu_num(); | 
| 448 | 38 |             } | 
| 449 | 498 |             update_rowsets_and_segments_num_metrics(); | 
| 450 | 498 |         } | 
| 451 | 505 |     } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(15))); | 
| 452 | 7 | } | 
| 453 |  |  | 
| 454 | 7 | void Daemon::report_runtime_query_statistics_thread() { | 
| 455 | 2.50k |     while (!_stop_background_threads_latch.wait_for( | 
| 456 | 2.50k |             std::chrono::milliseconds(config::report_query_statistics_interval_ms))) { | 
| 457 | 2.49k |         ExecEnv::GetInstance()->runtime_query_statistics_mgr()->report_runtime_query_statistics(); | 
| 458 | 2.49k |     } | 
| 459 | 7 | } | 
| 460 |  |  | 
| 461 | 7 | void Daemon::je_reset_dirty_decay_thread() const { | 
| 462 | 7 |     do { | 
| 463 | 7 |         std::unique_lock<std::mutex> l(doris::JemallocControl::je_reset_dirty_decay_lock); | 
| 464 | 75.7k |         while (_stop_background_threads_latch.count() != 0 && | 
| 465 | 75.7k |                !doris::JemallocControl::je_reset_dirty_decay_notify.load( | 
| 466 | 75.7k |                        std::memory_order_relaxed)) { | 
| 467 | 75.7k |             doris::JemallocControl::je_reset_dirty_decay_cv.wait_for( | 
| 468 | 75.7k |                     l, std::chrono::milliseconds(100)); | 
| 469 | 75.7k |         } | 
| 470 | 7 |         if (_stop_background_threads_latch.count() == 0) { | 
| 471 | 3 |             break; | 
| 472 | 3 |         } | 
| 473 |  |  | 
| 474 | 4 |         Defer defer {[&]() { | 
| 475 | 0 |             doris::JemallocControl::je_reset_dirty_decay_notify.store(false, | 
| 476 | 0 |                                                                       std::memory_order_relaxed); | 
| 477 | 0 |         }}; | 
| 478 |  | #ifdef USE_JEMALLOC | 
| 479 |  |         if (config::disable_memory_gc || !config::enable_je_purge_dirty_pages) { | 
| 480 |  |             continue; | 
| 481 |  |         } | 
| 482 |  |  | 
| 483 |  |         // There is a significant difference only between dirty_decay_ms being 0 and non-zero. | 
| 484 |  |         // | 
| 485 |  |         // 1. When dirty_decay_ms is not 0, freed memory is first cached in Jemalloc | 
| 486 |  |         // dirty pages. Even if dirty_decay_ms is 1, the dirty pages will not | 
| 487 |  |         // be released to the system exactly after 1ms; they are released according to the decay rule. | 
| 488 |  |         // The Jemalloc documentation specifies that dirty_decay_ms is an approximate time. | 
| 489 |  |         // | 
| 490 |  |         // 2. It has been observed in a real cluster that even if dirty_decay_ms is changed | 
| 491 |  |         // from the default 5000 to 1, Jemalloc dirty pages still cache a large amount of memory; everything | 
| 492 |  |         // behaves the same as `dirty_decay_ms:5000`. Only when dirty_decay_ms is changed to 0 do | 
| 493 |  |         // Jemalloc dirty pages stop caching, and free memory is released to the system immediately. | 
| 494 |  |         // Of course, performance will be affected. | 
| 495 |  |         // | 
| 496 |  |         // 3. After reducing dirty_decay_ms, manually calling `decay_all_arena_dirty_pages` may release dirty pages | 
| 497 |  |         // sooner, but no relevant experimental data is available, so it is simpler and safer | 
| 498 |  |         // to only toggle dirty_decay_ms between zero and non-zero. | 
| 499 |  |  | 
| 500 |  |         if (doris::JemallocControl::je_enable_dirty_page) { | 
| 501 |  |             doris::JemallocControl::je_reset_all_arena_dirty_decay_ms(config::je_dirty_decay_ms); | 
| 502 |  |         } else { | 
| 503 |  |             doris::JemallocControl::je_reset_all_arena_dirty_decay_ms(0); | 
| 504 |  |         } | 
| 505 |  | #endif | 
| 506 | 4 |     } while (true); | 
| 507 | 7 | } | 
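
The body of `JemallocControl::je_reset_all_arena_dirty_decay_ms` is not shown in this file. As a rough sketch, resetting `dirty_decay_ms` across arenas can be done through jemalloc's public `mallctl` interface (the symbol may carry a prefix such as `je_mallctl` depending on how jemalloc was built, and the real implementation may differ):

```cpp
#include <jemalloc/jemalloc.h>

#include <cstdio>
#include <string>
#include <sys/types.h>

// Write a new dirty_decay_ms to every arena at once via the
// "arena.<i>.dirty_decay_ms" mallctl, using MALLCTL_ARENAS_ALL for <i>.
// Passing 0 makes jemalloc purge dirty pages immediately instead of caching them.
static void set_all_arenas_dirty_decay_ms(ssize_t decay_ms) {
    std::string name = "arena." + std::to_string(MALLCTL_ARENAS_ALL) + ".dirty_decay_ms";
    if (mallctl(name.c_str(), nullptr, nullptr, &decay_ms, sizeof(decay_ms)) != 0) {
        std::fprintf(stderr, "failed to set %s\n", name.c_str());
    }
}
```
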
| 508 |  |  | 
| 509 | 7 | void Daemon::cache_adjust_capacity_thread() { | 
| 510 | 7 |     do { | 
| 511 | 7 |         std::unique_lock<std::mutex> l(doris::GlobalMemoryArbitrator::cache_adjust_capacity_lock); | 
| 512 | 75.7k |         while (_stop_background_threads_latch.count() != 0 && | 
| 513 | 75.7k |                !doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.load( | 
| 514 | 75.7k |                        std::memory_order_relaxed)) { | 
| 515 | 75.7k |             doris::GlobalMemoryArbitrator::cache_adjust_capacity_cv.wait_for( | 
| 516 | 75.7k |                     l, std::chrono::milliseconds(100)); | 
| 517 | 75.7k |         } | 
| 518 | 7 |         double adjust_weighted = std::min<double>( | 
| 519 | 7 |                 GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted, | 
| 520 | 7 |                 GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted); | 
| 521 | 7 |         if (_stop_background_threads_latch.count() == 0) { | 
| 522 | 3 |             break; | 
| 523 | 3 |         } | 
| 524 |  |  | 
| 525 | 4 |         Defer defer {[&]() { | 
| 526 | 0 |             doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.store( | 
| 527 | 0 |                     false, std::memory_order_relaxed); | 
| 528 | 0 |         }}; | 
| 529 | 4 |         if (config::disable_memory_gc) { | 
| 530 | 0 |             continue; | 
| 531 | 0 |         } | 
| 532 | 4 |         if (GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted == | 
| 533 | 4 |             adjust_weighted) { | 
| 534 | 0 |             LOG(INFO) << fmt::format( | 
| 535 | 0 |                     "[MemoryGC] adjust cache capacity end, adjust_weighted {} has not been " | 
| 536 | 0 |                     "modified.", | 
| 537 | 0 |                     adjust_weighted); | 
| 538 | 0 |             continue; | 
| 539 | 0 |         } | 
| 540 | 4 |         std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>(""); | 
| 541 | 4 |         auto freed_mem = CacheManager::instance()->for_each_cache_refresh_capacity(adjust_weighted, | 
| 542 | 4 |                                                                                    profile.get()); | 
| 543 | 4 |         std::stringstream ss; | 
| 544 | 4 |         profile->pretty_print(&ss); | 
| 545 | 4 |         LOG(INFO) << fmt::format( | 
| 546 | 4 |                 "[MemoryGC] adjust cache capacity end, free memory {}, details: {}", | 
| 547 | 4 |                 PrettyPrinter::print(freed_mem, TUnit::BYTES), ss.str()); | 
| 548 | 4 |         GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted = adjust_weighted; | 
| 549 | 4 |     } while (true); | 
| 550 | 7 | } | 
| 551 |  |  | 
| 552 | 7 | void Daemon::cache_prune_stale_thread() { | 
| 553 | 7 |     int32_t interval = config::cache_periodic_prune_stale_sweep_sec; | 
| 554 | 129 |     while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) { | 
| 555 | 122 |         if (config::cache_periodic_prune_stale_sweep_sec <= 0) { | 
| 556 | 0 |             LOG(WARNING) << "cache clean interval config is: [" << interval | 
| 557 | 0 |                          << "], which means the cache prune stale thread is disabled; will wait 3s " | 
| 558 | 0 |                             "and check again."; | 
| 559 | 0 |             interval = 3; | 
| 560 | 0 |             continue; | 
| 561 | 0 |         } | 
| 562 | 122 |         if (config::disable_memory_gc) { | 
| 563 | 0 |             continue; | 
| 564 | 0 |         } | 
| 565 | 122 |         CacheManager::instance()->for_each_cache_prune_stale(); | 
| 566 | 122 |         interval = config::cache_periodic_prune_stale_sweep_sec; | 
| 567 | 122 |     } | 
| 568 | 7 | } | 
| 569 |  |  | 
| 570 | 4 | void Daemon::be_proc_monitor_thread() { | 
| 571 | 34 |     while (!_stop_background_threads_latch.wait_for( | 
| 572 | 34 |             std::chrono::milliseconds(config::be_proc_monitor_interval_ms))) { | 
| 573 | 30 |         LOG(INFO) << "log be thread num, " << BeProcMonitor::get_be_thread_info(); | 
| 574 | 30 |     } | 
| 575 | 4 | } | 
| 576 |  |  | 
| 577 | 7 | void Daemon::calculate_workload_group_metrics_thread() { | 
| 578 | 1.52k |     while (!_stop_background_threads_latch.wait_for( | 
| 579 | 1.52k |             std::chrono::milliseconds(config::workload_group_metrics_interval_ms))) { | 
| 580 | 1.51k |         ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_metrics(); | 
| 581 | 1.51k |     } | 
| 582 | 7 | } | 
| 583 |  |  | 
| 584 | 7 | void Daemon::start() { | 
| 585 | 7 |     Status st; | 
| 586 | 7 |     st = Thread::create( | 
| 587 | 7 |             "Daemon", "tcmalloc_gc_thread", [this]() { this->tcmalloc_gc_thread(); }, | 
| 588 | 7 |             &_threads.emplace_back()); | 
| 589 | 7 |     CHECK(st.ok()) << st; | 
| 590 | 7 |     st = Thread::create( | 
| 591 | 7 |             "Daemon", "memory_maintenance_thread", [this]() { this->memory_maintenance_thread(); }, | 
| 592 | 7 |             &_threads.emplace_back()); | 
| 593 | 7 |     CHECK(st.ok()) << st; | 
| 594 | 7 |     st = Thread::create( | 
| 595 | 7 |             "Daemon", "memtable_memory_refresh_thread", | 
| 596 | 7 |             [this]() { this->memtable_memory_refresh_thread(); }, &_threads.emplace_back()); | 
| 597 | 7 |     CHECK(st.ok()) << st; | 
| 598 |  |  | 
| 599 | 7 |     if (config::enable_metric_calculator) { | 
| 600 | 7 |         st = Thread::create( | 
| 601 | 7 |                 "Daemon", "calculate_metrics_thread", | 
| 602 | 7 |                 [this]() { this->calculate_metrics_thread(); }, &_threads.emplace_back()); | 
| 603 | 7 |         CHECK(st.ok()) << st; | 
| 604 | 7 |     } | 
| 605 | 7 |     st = Thread::create( | 
| 606 | 7 |             "Daemon", "je_reset_dirty_decay_thread", | 
| 607 | 7 |             [this]() { this->je_reset_dirty_decay_thread(); }, &_threads.emplace_back()); | 
| 608 | 7 |     CHECK(st.ok()) << st; | 
| 609 | 7 |     st = Thread::create( | 
| 610 | 7 |             "Daemon", "cache_adjust_capacity_thread", | 
| 611 | 7 |             [this]() { this->cache_adjust_capacity_thread(); }, &_threads.emplace_back()); | 
| 612 | 7 |     CHECK(st.ok()) << st; | 
| 613 | 7 |     st = Thread::create( | 
| 614 | 7 |             "Daemon", "cache_prune_stale_thread", [this]() { this->cache_prune_stale_thread(); }, | 
| 615 | 7 |             &_threads.emplace_back()); | 
| 616 | 7 |     CHECK(st.ok()) << st; | 
| 617 | 7 |     st = Thread::create( | 
| 618 | 7 |             "Daemon", "query_runtime_statistics_thread", | 
| 619 | 7 |             [this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back()); | 
| 620 | 7 |     CHECK(st.ok()) << st; | 
| 621 |  |  | 
| 622 | 7 |     if (config::enable_be_proc_monitor) { | 
| 623 | 4 |         st = Thread::create( | 
| 624 | 4 |                 "Daemon", "be_proc_monitor_thread", [this]() { this->be_proc_monitor_thread(); }, | 
| 625 | 4 |                 &_threads.emplace_back()); | 
| 626 | 4 |     } | 
| 627 | 7 |     CHECK(st.ok()) << st; | 
| 628 |  |  | 
| 629 | 7 |     st = Thread::create( | 
| 630 | 7 |             "Daemon", "workload_group_metrics", | 
| 631 | 7 |             [this]() { this->calculate_workload_group_metrics_thread(); }, | 
| 632 | 7 |             &_threads.emplace_back()); | 
| 633 | 7 |     CHECK(st.ok()) << st; | 
| 634 | 7 | } | 
| 635 |  |  | 
| 636 | 3 | void Daemon::stop() { | 
| 637 | 3 |     LOG(INFO) << "Doris daemon is stopping."; | 
| 638 | 3 |     if (_stop_background_threads_latch.count() == 0) { | 
| 639 | 0 |         LOG(INFO) << "Doris daemon stop returned since no bg threads latch."; | 
| 640 | 0 |         return; | 
| 641 | 0 |     } | 
| 642 | 3 |     _stop_background_threads_latch.count_down(); | 
| 643 | 29 |     for (auto&& t : _threads) { | 
| 644 | 29 |         if (t) { | 
| 645 | 29 |             t->join(); | 
| 646 | 29 |         } | 
| 647 | 29 |     } | 
| 648 |  |     LOG(INFO) << "Doris daemon stopped after background threads are joined."; | 
| 649 | 3 | } | 
| 650 |  |  | 
| 651 |  | } // namespace doris |