Coverage Report

Created: 2025-04-30 06:12

/root/doris/be/src/common/daemon.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "common/daemon.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <errno.h> // IWYU pragma: keep
22
#include <gflags/gflags.h>
23
24
#include "runtime/memory/jemalloc_control.h"
25
#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \
26
        !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC)
27
#include <gperftools/malloc_extension.h> // IWYU pragma: keep
28
#endif
29
// IWYU pragma: no_include <bits/std_abs.h>
30
#include <butil/iobuf.h>
31
#include <math.h>
32
#include <stdint.h>
33
#include <stdlib.h>
34
35
// IWYU pragma: no_include <bits/chrono.h>
36
#include <chrono> // IWYU pragma: keep
37
#include <map>
38
#include <ostream>
39
#include <string>
40
41
#include "cloud/config.h"
42
#include "common/config.h"
43
#include "common/logging.h"
44
#include "common/status.h"
45
#include "olap/memtable_memory_limiter.h"
46
#include "olap/storage_engine.h"
47
#include "olap/tablet_manager.h"
48
#include "runtime/be_proc_monitor.h"
49
#include "runtime/exec_env.h"
50
#include "runtime/fragment_mgr.h"
51
#include "runtime/memory/global_memory_arbitrator.h"
52
#include "runtime/memory/mem_tracker_limiter.h"
53
#include "runtime/memory/memory_reclamation.h"
54
#include "runtime/process_profile.h"
55
#include "runtime/runtime_query_statistics_mgr.h"
56
#include "runtime/workload_group/workload_group_manager.h"
57
#include "util/algorithm_util.h"
58
#include "util/doris_metrics.h"
59
#include "util/mem_info.h"
60
#include "util/metrics.h"
61
#include "util/perf_counters.h"
62
#include "util/system_metrics.h"
63
#include "util/time.h"
64
65
namespace doris {
66
namespace {
67
68
// Process RSS recorded the last time the memory state was refreshed and logged.
std::atomic<int64_t> last_print_proc_mem {0};
// Countdown (ms) until refresh_cache_capacity() re-evaluates the cache capacity weight.
std::atomic<int32_t> refresh_cache_capacity_sleep_time_ms {0};
// Countdown (ms) until memory_gc() runs its next check.
std::atomic<int32_t> memory_gc_sleep_time {0};
#ifdef USE_JEMALLOC
// Countdown (ms) until je_reset_dirty_decay() re-evaluates the dirty page switch.
std::atomic<int32_t> je_reset_dirty_decay_sleep_time_ms {0};
#endif
74
75
1.28k
void update_rowsets_and_segments_num_metrics() {
76
1.28k
    if (config::is_cloud_mode()) {
77
        // TODO(plat1ko): CloudStorageEngine
78
1.28k
    } else {
79
1.28k
        StorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_local();
80
1.28k
        auto* metrics = DorisMetrics::instance();
81
1.28k
        metrics->all_rowsets_num->set_value(engine.tablet_manager()->get_rowset_nums());
82
1.28k
        metrics->all_segments_num->set_value(engine.tablet_manager()->get_segment_nums());
83
1.28k
    }
84
1.28k
}
85
86
} // namespace
87
88
4
void Daemon::tcmalloc_gc_thread() {
89
    // TODO All cache GC wish to be supported
90
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && \
91
        !defined(USE_JEMALLOC)
92
93
    // Limit size of tcmalloc cache via release_rate and max_cache_percent.
94
    // We adjust release_rate according to memory_pressure, which is usage percent of memory.
95
    int64_t max_cache_percent = 60;
96
    double release_rates[10] = {1.0, 1.0, 1.0, 5.0, 5.0, 20.0, 50.0, 100.0, 500.0, 2000.0};
97
    int64_t pressure_limit = 90;
98
    bool is_performance_mode = false;
99
    int64_t physical_limit_bytes =
100
            std::min(MemInfo::physical_mem() - MemInfo::sys_mem_available_low_water_mark(),
101
                     MemInfo::mem_limit());
102
103
    if (config::memory_mode == std::string("performance")) {
104
        max_cache_percent = 100;
105
        pressure_limit = 90;
106
        is_performance_mode = true;
107
        physical_limit_bytes = std::min(MemInfo::mem_limit(), MemInfo::physical_mem());
108
    } else if (config::memory_mode == std::string("compact")) {
109
        max_cache_percent = 20;
110
        pressure_limit = 80;
111
    }
112
113
    int last_ms = 0;
114
    const int kMaxLastMs = 30000;
115
    const int kIntervalMs = 10;
116
    size_t init_aggressive_decommit = 0;
117
    size_t current_aggressive_decommit = 0;
118
    size_t expected_aggressive_decommit = 0;
119
    int64_t last_memory_pressure = 0;
120
121
    MallocExtension::instance()->GetNumericProperty("tcmalloc.aggressive_memory_decommit",
122
                                                    &init_aggressive_decommit);
123
    current_aggressive_decommit = init_aggressive_decommit;
124
125
    while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(kIntervalMs))) {
126
        size_t tc_used_bytes = 0;
127
        size_t tc_alloc_bytes = 0;
128
        size_t rss = PerfCounters::get_vm_rss();
129
130
        MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes",
131
                                                        &tc_alloc_bytes);
132
        MallocExtension::instance()->GetNumericProperty("generic.current_allocated_bytes",
133
                                                        &tc_used_bytes);
134
        int64_t tc_cached_bytes = (int64_t)tc_alloc_bytes - (int64_t)tc_used_bytes;
135
        int64_t to_free_bytes =
136
                (int64_t)tc_cached_bytes - ((int64_t)tc_used_bytes * max_cache_percent / 100);
137
        to_free_bytes = std::max(to_free_bytes, (int64_t)0);
138
139
        int64_t memory_pressure = 0;
140
        int64_t rss_pressure = 0;
141
        int64_t alloc_bytes = std::max(rss, tc_alloc_bytes);
142
        memory_pressure = alloc_bytes * 100 / physical_limit_bytes;
143
        rss_pressure = rss * 100 / physical_limit_bytes;
144
145
        expected_aggressive_decommit = init_aggressive_decommit;
146
        if (memory_pressure > pressure_limit) {
147
            // We are reaching oom, so release cache aggressively.
148
            // Ideally, we should reuse cache and not allocate from system any more,
149
            // however, it is hard to set limit on cache of tcmalloc and doris
150
            // use mmap in vectorized mode.
151
            // Limit cache capactiy is enough.
152
            if (rss_pressure > pressure_limit) {
153
                int64_t min_free_bytes = alloc_bytes - physical_limit_bytes * 9 / 10;
154
                to_free_bytes = std::max(to_free_bytes, min_free_bytes);
155
                to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 30 / 100);
156
                // We assure that we have at least 500M bytes in cache.
157
                to_free_bytes = std::min(to_free_bytes, tc_cached_bytes - 500 * 1024 * 1024);
158
                expected_aggressive_decommit = 1;
159
            }
160
            last_ms = kMaxLastMs;
161
        } else if (memory_pressure > (pressure_limit - 10)) {
162
            // In most cases, adjusting release rate is enough, if memory are consumed quickly
163
            // we should release manually.
164
            if (last_memory_pressure <= (pressure_limit - 10)) {
165
                to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 10 / 100);
166
            }
167
        }
168
169
        int release_rate_index = memory_pressure / 10;
170
        double release_rate = 1.0;
171
        if (release_rate_index >= sizeof(release_rates) / sizeof(release_rates[0])) {
172
            release_rate = 2000.0;
173
        } else {
174
            release_rate = release_rates[release_rate_index];
175
        }
176
        MallocExtension::instance()->SetMemoryReleaseRate(release_rate);
177
178
        if ((current_aggressive_decommit != expected_aggressive_decommit) && !is_performance_mode) {
179
            MallocExtension::instance()->SetNumericProperty("tcmalloc.aggressive_memory_decommit",
180
                                                            expected_aggressive_decommit);
181
            current_aggressive_decommit = expected_aggressive_decommit;
182
        }
183
184
        last_memory_pressure = memory_pressure;
185
        // We release at least 2% bytes once, frequent releasing hurts performance.
186
        if (to_free_bytes > (physical_limit_bytes * 2 / 100)) {
187
            last_ms += kIntervalMs;
188
            if (last_ms >= kMaxLastMs) {
189
                LOG(INFO) << "generic.current_allocated_bytes " << tc_used_bytes
190
                          << ", generic.total_physical_bytes " << tc_alloc_bytes << ", rss " << rss
191
                          << ", max_cache_percent " << max_cache_percent << ", release_rate "
192
                          << release_rate << ", memory_pressure " << memory_pressure
193
                          << ", physical_limit_bytes " << physical_limit_bytes << ", to_free_bytes "
194
                          << to_free_bytes << ", current_aggressive_decommit "
195
                          << current_aggressive_decommit;
196
                MallocExtension::instance()->ReleaseToSystem(to_free_bytes);
197
                last_ms = 0;
198
            }
199
        } else {
200
            last_ms = 0;
201
        }
202
    }
203
#endif
204
4
}
205
206
301k
void refresh_process_memory_metrics() {
207
301k
    doris::PerfCounters::refresh_proc_status();
208
301k
    doris::MemInfo::refresh_proc_meminfo();
209
301k
    doris::GlobalMemoryArbitrator::reset_refresh_interval_memory_growth();
210
301k
    ExecEnv::GetInstance()->brpc_iobuf_block_memory_tracker()->set_consumption(
211
301k
            butil::IOBuf::block_memory());
212
301k
}
213
214
301k
void refresh_common_allocator_metrics() {
215
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
216
    doris::JemallocControl::refresh_allocator_mem();
217
    if (config::enable_system_metrics) {
218
        DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
219
    }
220
#endif
221
301k
    doris::GlobalMemoryArbitrator::refresh_memory_bvar();
222
301k
}
223
224
301k
void refresh_memory_state_after_memory_change() {
225
301k
    if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) {
226
1.07k
        last_print_proc_mem = PerfCounters::get_vm_rss();
227
1.07k
        doris::MemTrackerLimiter::clean_tracker_limiter_group();
228
1.07k
        doris::ProcessProfile::instance()->memory_profile()->enable_print_log_process_usage();
229
1.07k
        doris::ProcessProfile::instance()->memory_profile()->refresh_memory_overview_profile();
230
1.07k
        doris::JemallocControl::notify_je_purge_dirty_pages();
231
1.07k
        LOG(INFO) << doris::GlobalMemoryArbitrator::
232
1.07k
                        process_mem_log_str(); // print mem log when memory state by 256M
233
1.07k
    }
234
301k
}
235
236
301k
void refresh_cache_capacity() {
237
301k
    if (doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.load(
238
301k
                std::memory_order_relaxed)) {
239
        // the last cache capacity adjustment has not been completed.
240
        // if not return, last_periodic_refreshed_cache_capacity_adjust_weighted may be modified, but notify is ignored.
241
430
        return;
242
430
    }
243
301k
    if (refresh_cache_capacity_sleep_time_ms <= 0) {
244
301k
        auto cache_capacity_reduce_mem_limit = int64_t(
245
301k
                doris::MemInfo::soft_mem_limit() * config::cache_capacity_reduce_mem_limit_frac);
246
301k
        int64_t process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
247
        // the rule is like this:
248
        // 1. if the process mem usage < soft memlimit * 0.6, then do not need adjust cache capacity.
249
        // 2. if the process mem usage > soft memlimit * 0.6 and process mem usage < soft memlimit, then it will be adjusted to a lower value.
250
        // 3. if the process mem usage > soft memlimit, then the capacity is adjusted to 0.
251
301k
        double new_cache_capacity_adjust_weighted =
252
301k
                AlgoUtil::descent_by_step(10, cache_capacity_reduce_mem_limit,
253
301k
                                          doris::MemInfo::soft_mem_limit(), process_memory_usage);
254
301k
        if (new_cache_capacity_adjust_weighted !=
255
301k
            doris::GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted) {
256
0
            doris::GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted =
257
0
                    new_cache_capacity_adjust_weighted;
258
0
            doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
259
0
            refresh_cache_capacity_sleep_time_ms = config::memory_gc_sleep_time_ms;
260
301k
        } else {
261
301k
            refresh_cache_capacity_sleep_time_ms = 0;
262
301k
        }
263
301k
    }
264
301k
    refresh_cache_capacity_sleep_time_ms -= config::memory_maintenance_sleep_time_ms;
265
301k
}
266
267
301k
// Step 9 of memory maintenance (jemalloc builds only): decides whether
// jemalloc dirty pages should be enabled and notifies the reset thread when
// that decision changes. A no-op without USE_JEMALLOC.
void je_reset_dirty_decay() {
#ifdef USE_JEMALLOC
    // A previous reset is still pending; if we continued, je_enable_dirty_page
    // may be modified while the notify is ignored.
    if (JemallocControl::je_reset_dirty_decay_notify.load(std::memory_order_relaxed)) {
        return;
    }

    if (je_reset_dirty_decay_sleep_time_ms <= 0) {
        // Currently enabled: disable once process memory exceeds the soft mem limit.
        // Currently disabled: only re-enable when at least 10% of the soft mem limit
        // is left before exceeding it. This avoids flapping when the process stays
        // under the limit with dirty pages off but would exceed it with them on.
        const bool want_dirty_page =
                JemallocControl::je_enable_dirty_page
                        ? !GlobalMemoryArbitrator::is_exceed_soft_mem_limit()
                        : !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(
                                  int64_t(MemInfo::soft_mem_limit() * 0.1));

        if (JemallocControl::je_enable_dirty_page == want_dirty_page) {
            je_reset_dirty_decay_sleep_time_ms = 0;
        } else {
            // `notify_je_reset_dirty_decay` only if `je_enable_dirty_page` changes.
            JemallocControl::je_enable_dirty_page = want_dirty_page;
            JemallocControl::notify_je_reset_dirty_decay();
            je_reset_dirty_decay_sleep_time_ms = config::memory_gc_sleep_time_ms;
        }
    }
    je_reset_dirty_decay_sleep_time_ms -= config::memory_maintenance_sleep_time_ms;
#endif
}
302
303
301k
void memory_gc() {
304
301k
    if (config::disable_memory_gc) {
305
0
        return;
306
0
    }
307
301k
    if (memory_gc_sleep_time <= 0) {
308
30.1k
        auto gc_func = [](const std::string& revoke_reason) {
309
0
            doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
310
0
            if (doris::MemoryReclamation::revoke_process_memory(revoke_reason)) {
311
                // If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc.
312
0
                doris::ProcessProfile::instance()
313
0
                        ->memory_profile()
314
0
                        ->enable_print_log_process_usage();
315
0
            }
316
0
        };
317
318
30.1k
        if (doris::GlobalMemoryArbitrator::sys_mem_available() <
319
30.1k
            doris::MemInfo::sys_mem_available_low_water_mark()) {
320
0
            gc_func("sys available memory less than low water mark");
321
30.1k
        } else if (doris::GlobalMemoryArbitrator::process_memory_usage() >
322
30.1k
                   doris::MemInfo::mem_limit()) {
323
0
            gc_func("process memory used exceed limit");
324
0
        }
325
30.1k
        memory_gc_sleep_time = config::memory_gc_sleep_time_ms;
326
30.1k
    }
327
301k
    memory_gc_sleep_time -= config::memory_maintenance_sleep_time_ms;
328
301k
}
329
330
4
void Daemon::memory_maintenance_thread() {
331
301k
    while (!_stop_background_threads_latch.wait_for(
332
301k
            std::chrono::milliseconds(config::memory_maintenance_sleep_time_ms))) {
333
        // step 1. Refresh process memory metrics.
334
301k
        refresh_process_memory_metrics();
335
336
        // step 2. Refresh jemalloc/tcmalloc metrics.
337
301k
        refresh_common_allocator_metrics();
338
339
        // step 3. Update and print memory stat when the memory changes by 256M.
340
301k
        refresh_memory_state_after_memory_change();
341
342
        // step 4. Asyn Refresh cache capacity
343
        // TODO adjust cache capacity based on smoothstep (smooth gradient).
344
301k
        refresh_cache_capacity();
345
346
        // step 5. Cancel top memory task when process memory exceed hard limit.
347
301k
        memory_gc();
348
349
        // step 6. Refresh weighted memory ratio of workload groups.
350
301k
        doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
351
301k
        doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
352
353
        // step 7: handle paused queries(caused by memory insufficient)
354
301k
        doris::ExecEnv::GetInstance()->workload_group_mgr()->handle_paused_queries();
355
356
        // step 8. Flush memtable
357
301k
        doris::GlobalMemoryArbitrator::notify_memtable_memory_refresh();
358
        // TODO notify flush memtable
359
360
        // step 9. Reset Jemalloc dirty page decay.
361
301k
        je_reset_dirty_decay();
362
301k
    }
363
4
}
364
365
4
void Daemon::memtable_memory_refresh_thread() {
366
    // Refresh the memory statistics of the load channel tracker more frequently,
367
    // which helps to accurately control the memory of LoadChannelMgr.
368
301k
    do {
369
301k
        std::unique_lock<std::mutex> l(doris::GlobalMemoryArbitrator::memtable_memory_refresh_lock);
370
624k
        while (_stop_background_threads_latch.count() != 0 &&
371
624k
               !doris::GlobalMemoryArbitrator::memtable_memory_refresh_notify.load(
372
624k
                       std::memory_order_relaxed)) {
373
322k
            doris::GlobalMemoryArbitrator::memtable_memory_refresh_cv.wait_for(
374
322k
                    l, std::chrono::milliseconds(100));
375
322k
        }
376
301k
        if (_stop_background_threads_latch.count() == 0) {
377
2
            break;
378
2
        }
379
380
301k
        Defer defer {[&]() {
381
301k
            doris::GlobalMemoryArbitrator::memtable_memory_refresh_notify.store(
382
301k
                    false, std::memory_order_relaxed);
383
301k
        }};
384
301k
        doris::ExecEnv::GetInstance()->memtable_memory_limiter()->refresh_mem_tracker();
385
301k
    } while (true);
386
4
}
387
388
/*
389
 * this thread will calculate some metrics at a fix interval(15 sec)
390
 * 1. push bytes per second
391
 * 2. scan bytes per second
392
 * 3. max io util of all disks
393
 * 4. max network send bytes rate
394
 * 5. max network receive bytes rate
395
 */
396
4
void Daemon::calculate_metrics_thread() {
397
4
    int64_t last_ts = -1L;
398
4
    int64_t lst_query_bytes = -1;
399
400
4
    std::map<std::string, int64_t> lst_disks_io_time;
401
4
    std::map<std::string, int64_t> lst_net_send_bytes;
402
4
    std::map<std::string, int64_t> lst_net_receive_bytes;
403
404
1.28k
    do {
405
1.28k
        DorisMetrics::instance()->metric_registry()->trigger_all_hooks(true);
406
407
1.28k
        if (last_ts == -1L) {
408
4
            last_ts = GetMonoTimeMicros() / 1000;
409
4
            lst_query_bytes = DorisMetrics::instance()->query_scan_bytes->value();
410
4
            if (config::enable_system_metrics) {
411
2
                DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time);
412
2
                DorisMetrics::instance()->system_metrics()->get_network_traffic(
413
2
                        &lst_net_send_bytes, &lst_net_receive_bytes);
414
2
            }
415
1.28k
        } else {
416
1.28k
            int64_t current_ts = GetMonoTimeMicros() / 1000;
417
1.28k
            long interval = (current_ts - last_ts) / 1000;
418
1.28k
            last_ts = current_ts;
419
420
            // 1. query bytes per second
421
1.28k
            int64_t current_query_bytes = DorisMetrics::instance()->query_scan_bytes->value();
422
1.28k
            int64_t qps = (current_query_bytes - lst_query_bytes) / (interval + 1);
423
1.28k
            DorisMetrics::instance()->query_scan_bytes_per_second->set_value(qps < 0 ? 0 : qps);
424
1.28k
            lst_query_bytes = current_query_bytes;
425
426
1.28k
            if (config::enable_system_metrics) {
427
                // 2. max disk io util
428
506
                DorisMetrics::instance()->system_metrics()->update_max_disk_io_util_percent(
429
506
                        lst_disks_io_time, 15);
430
431
                // update lst map
432
506
                DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time);
433
434
                // 3. max network traffic
435
506
                int64_t max_send = 0;
436
506
                int64_t max_receive = 0;
437
506
                DorisMetrics::instance()->system_metrics()->get_max_net_traffic(
438
506
                        lst_net_send_bytes, lst_net_receive_bytes, 15, &max_send, &max_receive);
439
506
                DorisMetrics::instance()->system_metrics()->update_max_network_send_bytes_rate(
440
506
                        max_send);
441
506
                DorisMetrics::instance()->system_metrics()->update_max_network_receive_bytes_rate(
442
506
                        max_receive);
443
                // update lst map
444
506
                DorisMetrics::instance()->system_metrics()->get_network_traffic(
445
506
                        &lst_net_send_bytes, &lst_net_receive_bytes);
446
447
506
                DorisMetrics::instance()->system_metrics()->update_be_avail_cpu_num();
448
506
            }
449
1.28k
            update_rowsets_and_segments_num_metrics();
450
1.28k
        }
451
1.28k
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(15)));
452
4
}
453
454
4
void Daemon::report_runtime_query_statistics_thread() {
455
6.60k
    while (!_stop_background_threads_latch.wait_for(
456
6.60k
            std::chrono::milliseconds(config::report_query_statistics_interval_ms))) {
457
6.60k
        ExecEnv::GetInstance()->runtime_query_statistics_mgr()->report_runtime_query_statistics();
458
6.60k
    }
459
4
}
460
461
4
void Daemon::report_delete_bitmap_metrics_thread() {
462
334
    while (!_stop_background_threads_latch.wait_for(
463
334
            std::chrono::seconds(config::report_delete_bitmap_metrics_interval_s))) {
464
330
        if (config::enable_report_delete_bitmap_metrics) {
465
0
            StorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_local();
466
0
            auto* metrics = DorisMetrics::instance();
467
0
            metrics->valid_delete_bitmap_key_count->set_value(
468
0
                    engine.tablet_manager()->get_valid_delete_bitmap_key_count());
469
0
            metrics->invalid_delete_bitmap_key_count->set_value(
470
0
                    engine.tablet_manager()->get_invalid_delete_bitmap_key_count());
471
0
        }
472
330
    }
473
4
}
474
475
4
void Daemon::je_reset_dirty_decay_thread() const {
476
4
    do {
477
4
        std::unique_lock<std::mutex> l(doris::JemallocControl::je_reset_dirty_decay_lock);
478
199k
        while (_stop_background_threads_latch.count() != 0 &&
479
199k
               !doris::JemallocControl::je_reset_dirty_decay_notify.load(
480
199k
                       std::memory_order_relaxed)) {
481
199k
            doris::JemallocControl::je_reset_dirty_decay_cv.wait_for(
482
199k
                    l, std::chrono::milliseconds(100));
483
199k
        }
484
4
        if (_stop_background_threads_latch.count() == 0) {
485
2
            break;
486
2
        }
487
488
2
        Defer defer {[&]() {
489
0
            doris::JemallocControl::je_reset_dirty_decay_notify.store(false,
490
0
                                                                      std::memory_order_relaxed);
491
0
        }};
492
#ifdef USE_JEMALLOC
493
        if (config::disable_memory_gc || !config::enable_je_purge_dirty_pages) {
494
            continue;
495
        }
496
497
        // There is a significant difference only when dirty_decay_ms is equal to 0 or not.
498
        //
499
        // 1. When dirty_decay_ms is not equal to 0, the free memory will be cached in the Jemalloc
500
        // dirty page first. even if dirty_decay_ms is equal to 1, the Jemalloc dirty page will not
501
        // be released to the system exactly after 1ms, it will be released according to the decay rule.
502
        // The Jemalloc document specifies that dirty_decay_ms is an approximate time.
503
        //
504
        // 2. It has been observed in an actual cluster that even if dirty_decay_ms is changed
505
        // from th default 5000 to 1, Jemalloc dirty page will still cache a large amount of memory, everything
506
        // seems to be the same as `dirty_decay_ms:5000`. only when dirty_decay_ms is changed to 0,
507
        // jemalloc dirty page will stop caching and free memory will be released to the system immediately.
508
        // of course, performance will be affected.
509
        //
510
        // 3. After reducing dirty_decay_ms, manually calling `decay_all_arena_dirty_pages` may release dirty pages
511
        // as soon as possible, but no relevant experimental data can be found, so it is simple and safe
512
        // to adjust dirty_decay_ms only between zero and non-zero.
513
514
        if (doris::JemallocControl::je_enable_dirty_page) {
515
            doris::JemallocControl::je_reset_all_arena_dirty_decay_ms(config::je_dirty_decay_ms);
516
        } else {
517
            doris::JemallocControl::je_reset_all_arena_dirty_decay_ms(0);
518
        }
519
#endif
520
2
    } while (true);
521
4
}
522
523
4
void Daemon::cache_adjust_capacity_thread() {
524
1.17k
    do {
525
1.17k
        std::unique_lock<std::mutex> l(doris::GlobalMemoryArbitrator::cache_adjust_capacity_lock);
526
200k
        while (_stop_background_threads_latch.count() != 0 &&
527
200k
               !doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.load(
528
200k
                       std::memory_order_relaxed)) {
529
199k
            doris::GlobalMemoryArbitrator::cache_adjust_capacity_cv.wait_for(
530
199k
                    l, std::chrono::milliseconds(100));
531
199k
        }
532
1.17k
        double adjust_weighted = std::min<double>(
533
1.17k
                GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted,
534
1.17k
                GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted);
535
1.17k
        if (_stop_background_threads_latch.count() == 0) {
536
2
            break;
537
2
        }
538
539
1.17k
        Defer defer {[&]() {
540
1.17k
            doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.store(
541
1.17k
                    false, std::memory_order_relaxed);
542
1.17k
        }};
543
1.17k
        if (config::disable_memory_gc) {
544
0
            continue;
545
0
        }
546
1.17k
        if (GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted ==
547
1.17k
            adjust_weighted) {
548
0
            LOG(INFO) << fmt::format(
549
0
                    "[MemoryGC] adjust cache capacity end, adjust_weighted {} has not been "
550
0
                    "modified.",
551
0
                    adjust_weighted);
552
0
            continue;
553
0
        }
554
1.17k
        std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("");
555
1.17k
        auto freed_mem = CacheManager::instance()->for_each_cache_refresh_capacity(adjust_weighted,
556
1.17k
                                                                                   profile.get());
557
1.17k
        std::stringstream ss;
558
1.17k
        profile->pretty_print(&ss);
559
1.17k
        LOG(INFO) << fmt::format(
560
1.17k
                "[MemoryGC] adjust cache capacity end, free memory {}, details: {}",
561
1.17k
                PrettyPrinter::print(freed_mem, TUnit::BYTES), ss.str());
562
1.17k
        GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted = adjust_weighted;
563
1.17k
    } while (true);
564
4
}
565
566
4
void Daemon::cache_prune_stale_thread() {
567
4
    int32_t interval = config::cache_periodic_prune_stale_sweep_sec;
568
334
    while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
569
330
        if (config::cache_periodic_prune_stale_sweep_sec <= 0) {
570
0
            LOG(WARNING) << "config of cache clean interval is: [" << interval
571
0
                         << "], it means the cache prune stale thread is disabled, will wait 3s "
572
0
                            "and check again.";
573
0
            interval = 3;
574
0
            continue;
575
0
        }
576
330
        if (config::disable_memory_gc) {
577
0
            continue;
578
0
        }
579
330
        CacheManager::instance()->for_each_cache_prune_stale();
580
330
        interval = config::cache_periodic_prune_stale_sweep_sec;
581
330
    }
582
4
}
583
584
2
void Daemon::be_proc_monitor_thread() {
585
404
    while (!_stop_background_threads_latch.wait_for(
586
404
            std::chrono::milliseconds(config::be_proc_monitor_interval_ms))) {
587
402
        LOG(INFO) << "log be thread num, " << BeProcMonitor::get_be_thread_info();
588
402
    }
589
2
}
590
591
4
void Daemon::calculate_workload_group_metrics_thread() {
592
3.99k
    while (!_stop_background_threads_latch.wait_for(
593
3.99k
            std::chrono::milliseconds(config::workload_group_metrics_interval_ms))) {
594
3.99k
        ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_metrics();
595
3.99k
    }
596
4
}
597
598
4
void Daemon::start() {
599
4
    Status st;
600
4
    st = Thread::create(
601
4
            "Daemon", "tcmalloc_gc_thread", [this]() { this->tcmalloc_gc_thread(); },
602
4
            &_threads.emplace_back());
603
4
    CHECK(st.ok()) << st;
604
4
    st = Thread::create(
605
4
            "Daemon", "memory_maintenance_thread", [this]() { this->memory_maintenance_thread(); },
606
4
            &_threads.emplace_back());
607
4
    CHECK(st.ok()) << st;
608
4
    st = Thread::create(
609
4
            "Daemon", "memtable_memory_refresh_thread",
610
4
            [this]() { this->memtable_memory_refresh_thread(); }, &_threads.emplace_back());
611
4
    CHECK(st.ok()) << st;
612
613
4
    if (config::enable_metric_calculator) {
614
4
        st = Thread::create(
615
4
                "Daemon", "calculate_metrics_thread",
616
4
                [this]() { this->calculate_metrics_thread(); }, &_threads.emplace_back());
617
4
        CHECK(st.ok()) << st;
618
4
    }
619
4
    st = Thread::create(
620
4
            "Daemon", "je_reset_dirty_decay_thread",
621
4
            [this]() { this->je_reset_dirty_decay_thread(); }, &_threads.emplace_back());
622
4
    CHECK(st.ok()) << st;
623
4
    st = Thread::create(
624
4
            "Daemon", "cache_adjust_capacity_thread",
625
4
            [this]() { this->cache_adjust_capacity_thread(); }, &_threads.emplace_back());
626
4
    CHECK(st.ok()) << st;
627
4
    st = Thread::create(
628
4
            "Daemon", "cache_prune_stale_thread", [this]() { this->cache_prune_stale_thread(); },
629
4
            &_threads.emplace_back());
630
4
    CHECK(st.ok()) << st;
631
4
    st = Thread::create(
632
4
            "Daemon", "query_runtime_statistics_thread",
633
4
            [this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back());
634
4
    CHECK(st.ok()) << st;
635
636
4
    if (!config::is_cloud_mode()) {
637
4
        st = Thread::create(
638
4
                "Daemon", "delete_bitmap_metrics_thread",
639
4
                [this]() { this->report_delete_bitmap_metrics_thread(); },
640
4
                &_threads.emplace_back());
641
4
        CHECK(st.ok()) << st;
642
4
    }
643
644
4
    if (config::enable_be_proc_monitor) {
645
2
        st = Thread::create(
646
2
                "Daemon", "be_proc_monitor_thread", [this]() { this->be_proc_monitor_thread(); },
647
2
                &_threads.emplace_back());
648
2
    }
649
4
    CHECK(st.ok()) << st;
650
651
4
    st = Thread::create(
652
4
            "Daemon", "workload_group_metrics",
653
4
            [this]() { this->calculate_workload_group_metrics_thread(); },
654
4
            &_threads.emplace_back());
655
4
    CHECK(st.ok()) << st;
656
4
}
657
658
2
void Daemon::stop() {
659
2
    LOG(INFO) << "Doris daemon is stopping.";
660
2
    if (_stop_background_threads_latch.count() == 0) {
661
0
        LOG(INFO) << "Doris daemon stop returned since no bg threads latch.";
662
0
        return;
663
0
    }
664
2
    _stop_background_threads_latch.count_down();
665
21
    for (auto&& t : _threads) {
666
21
        if (t) {
667
21
            t->join();
668
21
        }
669
21
    }
670
2
    LOG(INFO) << "Doris daemon stopped after background threads are joined.";
671
2
}
672
673
} // namespace doris