Coverage Report

Created: 2025-06-08 09:26

/root/doris/be/src/common/daemon.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "common/daemon.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <errno.h> // IWYU pragma: keep
22
#include <gflags/gflags.h>
23
#include <gperftools/malloc_extension.h> // IWYU pragma: keep
24
// IWYU pragma: no_include <bits/std_abs.h>
25
#include <butil/iobuf.h>
26
#include <math.h>
27
#include <signal.h>
28
#include <stdint.h>
29
#include <stdlib.h>
30
#include <string.h>
31
32
#include <algorithm>
33
// IWYU pragma: no_include <bits/chrono.h>
34
#include <chrono> // IWYU pragma: keep
35
#include <map>
36
#include <ostream>
37
#include <set>
38
#include <string>
39
40
#include "common/config.h"
41
#include "common/logging.h"
42
#include "common/status.h"
43
#include "olap/memtable_memory_limiter.h"
44
#include "olap/options.h"
45
#include "olap/storage_engine.h"
46
#include "olap/tablet_manager.h"
47
#include "runtime/client_cache.h"
48
#include "runtime/exec_env.h"
49
#include "runtime/fragment_mgr.h"
50
#include "runtime/memory/global_memory_arbitrator.h"
51
#include "runtime/memory/mem_tracker.h"
52
#include "runtime/memory/mem_tracker_limiter.h"
53
#include "runtime/memory/memory_reclamation.h"
54
#include "runtime/runtime_query_statistics_mgr.h"
55
#include "runtime/workload_group/workload_group_manager.h"
56
#include "util/cpu_info.h"
57
#include "util/debug_util.h"
58
#include "util/disk_info.h"
59
#include "util/doris_metrics.h"
60
#include "util/mem_info.h"
61
#include "util/metrics.h"
62
#include "util/network_util.h"
63
#include "util/perf_counters.h"
64
#include "util/system_metrics.h"
65
#include "util/thrift_util.h"
66
#include "util/time.h"
67
68
namespace doris {
69
70
3
void Daemon::tcmalloc_gc_thread() {
71
    // TODO All cache GC wish to be supported
72
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && \
73
        !defined(USE_JEMALLOC)
74
75
    // Limit size of tcmalloc cache via release_rate and max_cache_percent.
76
    // We adjust release_rate according to memory_pressure, which is usage percent of memory.
77
    int64_t max_cache_percent = 60;
78
    double release_rates[10] = {1.0, 1.0, 1.0, 5.0, 5.0, 20.0, 50.0, 100.0, 500.0, 2000.0};
79
    int64_t pressure_limit = 90;
80
    bool is_performance_mode = false;
81
    int64_t physical_limit_bytes =
82
            std::min(MemInfo::physical_mem() - MemInfo::sys_mem_available_low_water_mark(),
83
                     MemInfo::mem_limit());
84
85
    if (config::memory_mode == std::string("performance")) {
86
        max_cache_percent = 100;
87
        pressure_limit = 90;
88
        is_performance_mode = true;
89
        physical_limit_bytes = std::min(MemInfo::mem_limit(), MemInfo::physical_mem());
90
    } else if (config::memory_mode == std::string("compact")) {
91
        max_cache_percent = 20;
92
        pressure_limit = 80;
93
    }
94
95
    int last_ms = 0;
96
    const int kMaxLastMs = 30000;
97
    const int kIntervalMs = 10;
98
    size_t init_aggressive_decommit = 0;
99
    size_t current_aggressive_decommit = 0;
100
    size_t expected_aggressive_decommit = 0;
101
    int64_t last_memory_pressure = 0;
102
103
    MallocExtension::instance()->GetNumericProperty("tcmalloc.aggressive_memory_decommit",
104
                                                    &init_aggressive_decommit);
105
    current_aggressive_decommit = init_aggressive_decommit;
106
107
    while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(kIntervalMs))) {
108
        size_t tc_used_bytes = 0;
109
        size_t tc_alloc_bytes = 0;
110
        size_t rss = PerfCounters::get_vm_rss();
111
112
        MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes",
113
                                                        &tc_alloc_bytes);
114
        MallocExtension::instance()->GetNumericProperty("generic.current_allocated_bytes",
115
                                                        &tc_used_bytes);
116
        int64_t tc_cached_bytes = (int64_t)tc_alloc_bytes - (int64_t)tc_used_bytes;
117
        int64_t to_free_bytes =
118
                (int64_t)tc_cached_bytes - ((int64_t)tc_used_bytes * max_cache_percent / 100);
119
        to_free_bytes = std::max(to_free_bytes, (int64_t)0);
120
121
        int64_t memory_pressure = 0;
122
        int64_t rss_pressure = 0;
123
        int64_t alloc_bytes = std::max(rss, tc_alloc_bytes);
124
        memory_pressure = alloc_bytes * 100 / physical_limit_bytes;
125
        rss_pressure = rss * 100 / physical_limit_bytes;
126
127
        expected_aggressive_decommit = init_aggressive_decommit;
128
        if (memory_pressure > pressure_limit) {
129
            // We are reaching oom, so release cache aggressively.
130
            // Ideally, we should reuse cache and not allocate from system any more,
131
            // however, it is hard to set limit on cache of tcmalloc and doris
132
            // use mmap in vectorized mode.
133
            // Limit cache capactiy is enough.
134
            if (rss_pressure > pressure_limit) {
135
                int64_t min_free_bytes = alloc_bytes - physical_limit_bytes * 9 / 10;
136
                to_free_bytes = std::max(to_free_bytes, min_free_bytes);
137
                to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 30 / 100);
138
                // We assure that we have at least 500M bytes in cache.
139
                to_free_bytes = std::min(to_free_bytes, tc_cached_bytes - 500 * 1024 * 1024);
140
                expected_aggressive_decommit = 1;
141
            }
142
            last_ms = kMaxLastMs;
143
        } else if (memory_pressure > (pressure_limit - 10)) {
144
            // In most cases, adjusting release rate is enough, if memory are consumed quickly
145
            // we should release manually.
146
            if (last_memory_pressure <= (pressure_limit - 10)) {
147
                to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 10 / 100);
148
            }
149
        }
150
151
        int release_rate_index = memory_pressure / 10;
152
        double release_rate = 1.0;
153
        if (release_rate_index >= sizeof(release_rates) / sizeof(release_rates[0])) {
154
            release_rate = 2000.0;
155
        } else {
156
            release_rate = release_rates[release_rate_index];
157
        }
158
        MallocExtension::instance()->SetMemoryReleaseRate(release_rate);
159
160
        if ((current_aggressive_decommit != expected_aggressive_decommit) && !is_performance_mode) {
161
            MallocExtension::instance()->SetNumericProperty("tcmalloc.aggressive_memory_decommit",
162
                                                            expected_aggressive_decommit);
163
            current_aggressive_decommit = expected_aggressive_decommit;
164
        }
165
166
        last_memory_pressure = memory_pressure;
167
        // We release at least 2% bytes once, frequent releasing hurts performance.
168
        if (to_free_bytes > (physical_limit_bytes * 2 / 100)) {
169
            last_ms += kIntervalMs;
170
            if (last_ms >= kMaxLastMs) {
171
                LOG(INFO) << "generic.current_allocated_bytes " << tc_used_bytes
172
                          << ", generic.total_physical_bytes " << tc_alloc_bytes << ", rss " << rss
173
                          << ", max_cache_percent " << max_cache_percent << ", release_rate "
174
                          << release_rate << ", memory_pressure " << memory_pressure
175
                          << ", physical_limit_bytes " << physical_limit_bytes << ", to_free_bytes "
176
                          << to_free_bytes << ", current_aggressive_decommit "
177
                          << current_aggressive_decommit;
178
                MallocExtension::instance()->ReleaseToSystem(to_free_bytes);
179
                last_ms = 0;
180
            }
181
        } else {
182
            last_ms = 0;
183
        }
184
    }
185
#endif
186
3
}
187
188
3
void Daemon::memory_maintenance_thread() {
189
3
    int32_t interval_milliseconds = config::memory_maintenance_sleep_time_ms;
190
3
    int64_t last_print_proc_mem = PerfCounters::get_vm_rss();
191
64.5k
    while (!_stop_background_threads_latch.wait_for(
192
64.5k
            std::chrono::milliseconds(interval_milliseconds))) {
193
        // Refresh process memory metrics.
194
64.5k
        doris::PerfCounters::refresh_proc_status();
195
64.5k
        doris::MemInfo::refresh_proc_meminfo();
196
64.5k
        doris::GlobalMemoryArbitrator::reset_refresh_interval_memory_growth();
197
64.5k
        ExecEnv::GetInstance()->brpc_iobuf_block_memory_tracker()->set_consumption(
198
64.5k
                butil::IOBuf::block_memory());
199
        // Refresh allocator memory metrics.
200
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
201
        doris::MemInfo::refresh_allocator_mem();
202
#ifdef USE_JEMALLOC
203
        if (doris::MemInfo::je_dirty_pages_mem() > doris::MemInfo::je_dirty_pages_mem_limit() &&
204
            GlobalMemoryArbitrator::is_exceed_soft_mem_limit()) {
205
            doris::MemInfo::notify_je_purge_dirty_pages();
206
        }
207
#endif
208
        if (config::enable_system_metrics) {
209
            DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
210
        }
211
#endif
212
64.5k
        MemInfo::refresh_memory_bvar();
213
214
        // Update and print memory stat when the memory changes by 256M.
215
64.5k
        if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) {
216
218
            last_print_proc_mem = PerfCounters::get_vm_rss();
217
218
            doris::MemTrackerLimiter::clean_tracker_limiter_group();
218
218
            doris::MemTrackerLimiter::enable_print_log_process_usage();
219
            // Refresh mem tracker each type counter.
220
218
            doris::MemTrackerLimiter::refresh_global_counter();
221
218
            LOG(INFO) << doris::GlobalMemoryArbitrator::
222
218
                            process_mem_log_str(); // print mem log when memory state by 256M
223
218
        }
224
64.5k
    }
225
3
}
226
227
3
void Daemon::memory_gc_thread() {
228
3
    int32_t interval_milliseconds = config::memory_maintenance_sleep_time_ms;
229
3
    int32_t memory_minor_gc_sleep_time_ms = 0;
230
3
    int32_t memory_full_gc_sleep_time_ms = 0;
231
3
    int32_t memory_gc_sleep_time_ms = config::memory_gc_sleep_time_ms;
232
70.5k
    while (!_stop_background_threads_latch.wait_for(
233
70.5k
            std::chrono::milliseconds(interval_milliseconds))) {
234
70.5k
        if (config::disable_memory_gc) {
235
0
            continue;
236
0
        }
237
70.5k
        auto sys_mem_available = doris::GlobalMemoryArbitrator::sys_mem_available();
238
70.5k
        auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
239
240
        // GC excess memory for resource groups that not enable overcommit
241
70.5k
        auto tg_free_mem = doris::MemoryReclamation::tg_disable_overcommit_group_gc();
242
70.5k
        sys_mem_available += tg_free_mem;
243
70.5k
        process_memory_usage -= tg_free_mem;
244
245
70.5k
        if (memory_full_gc_sleep_time_ms <= 0 &&
246
70.5k
            (sys_mem_available < doris::MemInfo::sys_mem_available_low_water_mark() ||
247
70.5k
             process_memory_usage >= doris::MemInfo::mem_limit())) {
248
            // No longer full gc and minor gc during sleep.
249
0
            std::string mem_info =
250
0
                    doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str();
251
0
            memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms;
252
0
            memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
253
0
            LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.", mem_info);
254
0
            doris::MemTrackerLimiter::print_log_process_usage();
255
0
            if (doris::MemoryReclamation::process_full_gc(std::move(mem_info))) {
256
                // If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc.
257
0
                doris::MemTrackerLimiter::enable_print_log_process_usage();
258
0
            }
259
70.5k
        } else if (memory_minor_gc_sleep_time_ms <= 0 &&
260
70.5k
                   (sys_mem_available < doris::MemInfo::sys_mem_available_warning_water_mark() ||
261
70.5k
                    process_memory_usage >= doris::MemInfo::soft_mem_limit())) {
262
            // No minor gc during sleep, but full gc is possible.
263
0
            std::string mem_info =
264
0
                    doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str();
265
0
            memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
266
0
            LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.", mem_info);
267
0
            doris::MemTrackerLimiter::print_log_process_usage();
268
0
            if (doris::MemoryReclamation::process_minor_gc(std::move(mem_info))) {
269
0
                doris::MemTrackerLimiter::enable_print_log_process_usage();
270
0
            }
271
70.5k
        } else {
272
70.5k
            if (memory_full_gc_sleep_time_ms > 0) {
273
0
                memory_full_gc_sleep_time_ms -= interval_milliseconds;
274
0
            }
275
70.5k
            if (memory_minor_gc_sleep_time_ms > 0) {
276
0
                memory_minor_gc_sleep_time_ms -= interval_milliseconds;
277
0
            }
278
70.5k
        }
279
70.5k
    }
280
3
}
281
282
3
void Daemon::memtable_memory_limiter_tracker_refresh_thread() {
283
    // Refresh the memory statistics of the load channel tracker more frequently,
284
    // which helps to accurately control the memory of LoadChannelMgr.
285
1.34M
    while (!_stop_background_threads_latch.wait_for(
286
1.34M
            std::chrono::milliseconds(config::memtable_mem_tracker_refresh_interval_ms))) {
287
1.34M
        doris::ExecEnv::GetInstance()->memtable_memory_limiter()->refresh_mem_tracker();
288
1.34M
    }
289
3
}
290
291
/*
292
 * this thread will calculate some metrics at a fix interval(15 sec)
293
 * 1. push bytes per second
294
 * 2. scan bytes per second
295
 * 3. max io util of all disks
296
 * 4. max network send bytes rate
297
 * 5. max network receive bytes rate
298
 */
299
3
void Daemon::calculate_metrics_thread() {
300
3
    int64_t last_ts = -1L;
301
3
    int64_t lst_query_bytes = -1;
302
303
3
    std::map<std::string, int64_t> lst_disks_io_time;
304
3
    std::map<std::string, int64_t> lst_net_send_bytes;
305
3
    std::map<std::string, int64_t> lst_net_receive_bytes;
306
307
458
    do {
308
458
        DorisMetrics::instance()->metric_registry()->trigger_all_hooks(true);
309
310
458
        if (last_ts == -1L) {
311
3
            last_ts = GetMonoTimeMicros() / 1000;
312
3
            lst_query_bytes = DorisMetrics::instance()->query_scan_bytes->value();
313
3
            if (config::enable_system_metrics) {
314
2
                DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time);
315
2
                DorisMetrics::instance()->system_metrics()->get_network_traffic(
316
2
                        &lst_net_send_bytes, &lst_net_receive_bytes);
317
2
            }
318
455
        } else {
319
455
            int64_t current_ts = GetMonoTimeMicros() / 1000;
320
455
            long interval = (current_ts - last_ts) / 1000;
321
455
            last_ts = current_ts;
322
323
            // 1. query bytes per second
324
455
            int64_t current_query_bytes = DorisMetrics::instance()->query_scan_bytes->value();
325
455
            int64_t qps = (current_query_bytes - lst_query_bytes) / (interval + 1);
326
455
            DorisMetrics::instance()->query_scan_bytes_per_second->set_value(qps < 0 ? 0 : qps);
327
455
            lst_query_bytes = current_query_bytes;
328
329
455
            if (config::enable_system_metrics) {
330
                // 2. max disk io util
331
38
                DorisMetrics::instance()->system_metrics()->update_max_disk_io_util_percent(
332
38
                        lst_disks_io_time, 15);
333
334
                // update lst map
335
38
                DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time);
336
337
                // 3. max network traffic
338
38
                int64_t max_send = 0;
339
38
                int64_t max_receive = 0;
340
38
                DorisMetrics::instance()->system_metrics()->get_max_net_traffic(
341
38
                        lst_net_send_bytes, lst_net_receive_bytes, 15, &max_send, &max_receive);
342
38
                DorisMetrics::instance()->system_metrics()->update_max_network_send_bytes_rate(
343
38
                        max_send);
344
38
                DorisMetrics::instance()->system_metrics()->update_max_network_receive_bytes_rate(
345
38
                        max_receive);
346
                // update lst map
347
38
                DorisMetrics::instance()->system_metrics()->get_network_traffic(
348
38
                        &lst_net_send_bytes, &lst_net_receive_bytes);
349
350
38
                DorisMetrics::instance()->system_metrics()->update_be_avail_cpu_num();
351
38
            }
352
353
455
            DorisMetrics::instance()->all_rowsets_num->set_value(
354
455
                    StorageEngine::instance()->tablet_manager()->get_rowset_nums());
355
455
            DorisMetrics::instance()->all_segments_num->set_value(
356
455
                    StorageEngine::instance()->tablet_manager()->get_segment_nums());
357
455
        }
358
458
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(15)));
359
3
}
360
361
3
void Daemon::report_runtime_query_statistics_thread() {
362
2.34k
    while (!_stop_background_threads_latch.wait_for(
363
2.34k
            std::chrono::milliseconds(config::report_query_statistics_interval_ms))) {
364
2.34k
        ExecEnv::GetInstance()->runtime_query_statistics_mgr()->report_runtime_query_statistics();
365
2.34k
    }
366
3
}
367
368
3
void Daemon::je_purge_dirty_pages_thread() const {
369
3
    do {
370
3
        std::unique_lock<std::mutex> l(doris::MemInfo::je_purge_dirty_pages_lock);
371
70.5k
        while (_stop_background_threads_latch.count() != 0 &&
372
70.5k
               !doris::MemInfo::je_purge_dirty_pages_notify.load(std::memory_order_relaxed)) {
373
70.5k
            doris::MemInfo::je_purge_dirty_pages_cv.wait_for(l, std::chrono::milliseconds(100));
374
70.5k
        }
375
3
        if (_stop_background_threads_latch.count() == 0) {
376
1
            break;
377
1
        }
378
379
2
        Defer defer {[&]() {
380
0
            doris::MemInfo::je_purge_dirty_pages_notify.store(false, std::memory_order_relaxed);
381
0
        }};
382
2
        if (config::disable_memory_gc) {
383
0
            continue;
384
0
        }
385
2
        doris::MemInfo::je_purge_all_arena_dirty_pages();
386
2
    } while (true);
387
3
}
388
389
3
void Daemon::cache_prune_stale_thread() {
390
3
    int32_t interval = config::cache_periodic_prune_stale_sweep_sec;
391
24
    while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
392
21
        if (interval <= 0) {
393
0
            LOG(WARNING) << "config of cache clean interval is illegal: [" << interval
394
0
                         << "], force set to 3600 ";
395
0
            interval = 3600;
396
0
        }
397
21
        if (config::disable_memory_gc) {
398
0
            continue;
399
0
        }
400
21
        CacheManager::instance()->for_each_cache_prune_stale();
401
21
    }
402
3
}
403
404
3
void Daemon::wg_weighted_memory_ratio_refresh_thread() {
405
    // Refresh weighted memory ratio of workload groups
406
139k
    while (!_stop_background_threads_latch.wait_for(
407
139k
            std::chrono::milliseconds(config::wg_weighted_memory_ratio_refresh_interval_ms))) {
408
139k
        doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
409
139k
    }
410
3
}
411
412
3
void Daemon::calculate_workload_group_metrics_thread() {
413
1.41k
    while (!_stop_background_threads_latch.wait_for(
414
1.41k
            std::chrono::milliseconds(config::workload_group_metrics_interval_ms))) {
415
1.41k
        ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_metrics();
416
1.41k
    }
417
3
}
418
419
3
void Daemon::start() {
420
3
    Status st;
421
3
    st = Thread::create(
422
3
            "Daemon", "tcmalloc_gc_thread", [this]() { this->tcmalloc_gc_thread(); },
423
3
            &_threads.emplace_back());
424
3
    CHECK(st.ok()) << st;
425
3
    st = Thread::create(
426
3
            "Daemon", "memory_maintenance_thread", [this]() { this->memory_maintenance_thread(); },
427
3
            &_threads.emplace_back());
428
3
    CHECK(st.ok()) << st;
429
3
    st = Thread::create(
430
3
            "Daemon", "memory_gc_thread", [this]() { this->memory_gc_thread(); },
431
3
            &_threads.emplace_back());
432
3
    CHECK(st.ok()) << st;
433
3
    st = Thread::create(
434
3
            "Daemon", "memtable_memory_limiter_tracker_refresh_thread",
435
3
            [this]() { this->memtable_memory_limiter_tracker_refresh_thread(); },
436
3
            &_threads.emplace_back());
437
3
    CHECK(st.ok()) << st;
438
439
3
    if (config::enable_metric_calculator) {
440
3
        st = Thread::create(
441
3
                "Daemon", "calculate_metrics_thread",
442
3
                [this]() { this->calculate_metrics_thread(); }, &_threads.emplace_back());
443
3
        CHECK(st.ok()) << st;
444
3
    }
445
3
    st = Thread::create(
446
3
            "Daemon", "je_purge_dirty_pages_thread",
447
3
            [this]() { this->je_purge_dirty_pages_thread(); }, &_threads.emplace_back());
448
3
    CHECK(st.ok()) << st;
449
3
    st = Thread::create(
450
3
            "Daemon", "cache_prune_stale_thread", [this]() { this->cache_prune_stale_thread(); },
451
3
            &_threads.emplace_back());
452
3
    CHECK(st.ok()) << st;
453
3
    st = Thread::create(
454
3
            "Daemon", "query_runtime_statistics_thread",
455
3
            [this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back());
456
3
    CHECK(st.ok()) << st;
457
458
3
    st = Thread::create(
459
3
            "Daemon", "wg_weighted_memory_ratio_refresh_thread",
460
3
            [this]() { this->wg_weighted_memory_ratio_refresh_thread(); },
461
3
            &_threads.emplace_back());
462
3
    CHECK(st.ok()) << st;
463
464
3
    st = Thread::create(
465
3
            "Daemon", "workload_group_metrics",
466
3
            [this]() { this->calculate_workload_group_metrics_thread(); },
467
3
            &_threads.emplace_back());
468
3
    CHECK(st.ok()) << st;
469
3
}
470
471
1
void Daemon::stop() {
472
1
    LOG(INFO) << "Doris daemon is stopping.";
473
1
    if (_stop_background_threads_latch.count() == 0) {
474
0
        LOG(INFO) << "Doris daemon stop returned since no bg threads latch.";
475
0
        return;
476
0
    }
477
1
    _stop_background_threads_latch.count_down();
478
10
    for (auto&& t : _threads) {
479
10
        if (t) {
480
10
            t->join();
481
10
        }
482
10
    }
483
1
    LOG(INFO) << "Doris daemon stopped after background threads are joined.";
484
1
}
485
486
} // namespace doris