Coverage Report

Created: 2026-06-03 15:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/olap_server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <gen_cpp/Types_types.h>
19
#include <gen_cpp/olap_file.pb.h>
20
#include <glog/logging.h>
21
#include <rapidjson/prettywriter.h>
22
#include <rapidjson/stringbuffer.h>
23
#include <stdint.h>
24
#include <sys/types.h>
25
26
#include <algorithm>
27
#include <atomic>
28
// IWYU pragma: no_include <bits/chrono.h>
29
#include <gen_cpp/internal_service.pb.h>
30
31
#include <chrono> // IWYU pragma: keep
32
#include <cmath>
33
#include <condition_variable>
34
#include <cstdint>
35
#include <ctime>
36
#include <functional>
37
#include <map>
38
#include <memory>
39
#include <mutex>
40
#include <ostream>
41
#include <random>
42
#include <shared_mutex>
43
#include <string>
44
#include <thread>
45
#include <unordered_set>
46
#include <utility>
47
#include <vector>
48
49
#include "agent/utils.h"
50
#include "common/config.h"
51
#include "common/logging.h"
52
#include "common/metrics/doris_metrics.h"
53
#include "common/metrics/metrics.h"
54
#include "common/status.h"
55
#include "cpp/sync_point.h"
56
#include "io/fs/file_writer.h" // IWYU pragma: keep
57
#include "io/fs/path.h"
58
#include "load/memtable/memtable_flush_executor.h"
59
#include "runtime/memory/cache_manager.h"
60
#include "runtime/memory/global_memory_arbitrator.h"
61
#include "storage/compaction/cold_data_compaction.h"
62
#include "storage/compaction/compaction_permit_limiter.h"
63
#include "storage/compaction/cumulative_compaction.h"
64
#include "storage/compaction/cumulative_compaction_policy.h"
65
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
66
#include "storage/compaction_task_tracker.h"
67
#include "storage/data_dir.h"
68
#include "storage/olap_common.h"
69
#include "storage/olap_define.h"
70
#include "storage/rowset/segcompaction.h"
71
#include "storage/schema_change/schema_change.h"
72
#include "storage/storage_engine.h"
73
#include "storage/storage_policy.h"
74
#include "storage/tablet/base_tablet.h"
75
#include "storage/tablet/tablet.h"
76
#include "storage/tablet/tablet_manager.h"
77
#include "storage/tablet/tablet_meta.h"
78
#include "storage/tablet/tablet_meta_manager.h"
79
#include "storage/tablet/tablet_schema.h"
80
#include "storage/task/engine_publish_version_task.h"
81
#include "storage/task/index_builder.h"
82
#include "util/client_cache.h"
83
#include "util/countdown_latch.h"
84
#include "util/debug_points.h"
85
#include "util/mem_info.h"
86
#include "util/thread.h"
87
#include "util/threadpool.h"
88
#include "util/time.h"
89
#include "util/uid_util.h"
90
#include "util/work_thread_pool.hpp"
91
92
using std::string;
93
94
namespace doris {
95
using io::Path;
96
97
// number of running SCHEMA-CHANGE threads
// NOTE(review): `volatile` provides neither atomicity nor memory ordering. If this counter is
// modified from more than one thread, std::atomic<uint32_t> is the correct type (any extern
// declaration of it elsewhere would have to change as well).
volatile uint32_t g_schema_change_active_threads = 0;
// bvar::Status variables exported under the names "cumu_compaction_task_num_per_round" and
// "base_compaction_task_num_per_round", both initialised to 0. Presumably they hold the number
// of cumulative / base compaction tasks submitted in the latest producer round; they are
// written outside this view — confirm at the writer.
bvar::Status<int64_t> g_cumu_compaction_task_num_per_round("cumu_compaction_task_num_per_round", 0);
bvar::Status<int64_t> g_base_compaction_task_num_per_round("base_compaction_task_num_per_round", 0);
101
102
0
// Move constructor. Every per-DataDir tablet registry (cumulative, base, full, binlog) is
// exchanged with this object's default-constructed (presumably empty) member, so `r` ends up
// holding what this object started with. No lock is taken on either object.
CompactionSubmitRegistry::CompactionSubmitRegistry(CompactionSubmitRegistry&& r) {
    using std::swap;
    swap(_tablet_submitted_cumu_compaction, r._tablet_submitted_cumu_compaction);
    swap(_tablet_submitted_base_compaction, r._tablet_submitted_base_compaction);
    swap(_tablet_submitted_full_compaction, r._tablet_submitted_full_compaction);
    swap(_tablet_submitted_binlog_compaction, r._tablet_submitted_binlog_compaction);
}
108
109
7.04k
// Returns a copy of the base, cumulative and binlog registries, taken while holding
// _tablet_submitted_compaction_mutex so the copy is consistent. The lock-free query helpers
// (count_*/has_*/pick_*) are meant to be run on such a snapshot.
// Full-compaction entries are not copied.
CompactionSubmitRegistry CompactionSubmitRegistry::create_snapshot() {
    std::lock_guard<std::mutex> guard(_tablet_submitted_compaction_mutex);
    CompactionSubmitRegistry snapshot;
    snapshot._tablet_submitted_cumu_compaction = _tablet_submitted_cumu_compaction;
    snapshot._tablet_submitted_base_compaction = _tablet_submitted_base_compaction;
    snapshot._tablet_submitted_binlog_compaction = _tablet_submitted_binlog_compaction;
    return snapshot;
}
118
119
3
// Re-initialises the registry under the mutex: for every dir in `stores`, the cumulative,
// base and binlog tablet sets are created if missing and emptied.
// Full-compaction entries are left untouched.
void CompactionSubmitRegistry::reset(const std::vector<DataDir*>& stores) {
    std::lock_guard<std::mutex> guard(_tablet_submitted_compaction_mutex);
    for (DataDir* dir : stores) {
        _tablet_submitted_cumu_compaction[dir].clear();
        _tablet_submitted_base_compaction[dir].clear();
        _tablet_submitted_binlog_compaction[dir].clear();
    }
}
128
129
uint32_t CompactionSubmitRegistry::count_executing_compaction(DataDir* dir,
130
12.7k
                                                              CompactionType compaction_type) {
131
    // non-lock, used in snapshot
132
12.7k
    const auto& compaction_tasks = _get_tablet_set(dir, compaction_type);
133
12.7k
    return cast_set<uint32_t>(std::count_if(
134
12.7k
            compaction_tasks.begin(), compaction_tasks.end(),
135
12.7k
            [](const auto& task) { return task->compaction_stage == CompactionStage::EXECUTING; }));
136
12.7k
}
137
138
1.77k
// Total number of EXECUTING base plus cumulative compactions on `dir`.
// Lock-free; intended for use on a snapshot object.
uint32_t CompactionSubmitRegistry::count_executing_cumu_and_base(DataDir* dir) {
    const uint32_t executing_base =
            count_executing_compaction(dir, CompactionType::BASE_COMPACTION);
    const uint32_t executing_cumu =
            count_executing_compaction(dir, CompactionType::CUMULATIVE_COMPACTION);
    return executing_base + executing_cumu;
}
143
144
21.8k
// True when at least one tablet is registered for (dir, compaction_type).
// Lock-free; intended for use on a snapshot object.
bool CompactionSubmitRegistry::has_compaction_task(DataDir* dir, CompactionType compaction_type) {
    const auto& submitted = _get_tablet_set(dir, compaction_type);
    const bool any_submitted = !submitted.empty();
    return any_submitted;
}
148
149
std::vector<TabletCompactionContext> CompactionSubmitRegistry::pick_topn_tablets_for_compaction(
150
        TabletManager* tablet_mgr, DataDir* data_dir, CompactionType compaction_type,
151
10.9k
        const CumuCompactionPolicyTable& cumu_compaction_policies, uint32_t* disk_max_score) {
152
    // non-lock, used in snapshot
153
10.9k
    return tablet_mgr->find_best_tablets_to_compaction(compaction_type, data_dir,
154
10.9k
                                                       _get_tablet_set(data_dir, compaction_type),
155
10.9k
                                                       disk_max_score, cumu_compaction_policies);
156
10.9k
}
157
158
39.5k
// Registers `tablet` as submitted for `compaction_type` on the tablet's own data dir, holding
// the registry mutex. Returns true if the tablet was ALREADY registered (nothing inserted),
// and false if it has just been added.
bool CompactionSubmitRegistry::insert(TabletSharedPtr tablet, CompactionType compaction_type) {
    std::lock_guard<std::mutex> guard(_tablet_submitted_compaction_mutex);
    const bool inserted =
            _get_tablet_set(tablet->data_dir(), compaction_type).insert(tablet).second;
    return !inserted;
}
164
165
void CompactionSubmitRegistry::remove(TabletSharedPtr tablet, CompactionType compaction_type,
166
39.5k
                                      std::function<void()> wakeup_cb) {
167
39.5k
    std::unique_lock<std::mutex> l(_tablet_submitted_compaction_mutex);
168
39.5k
    auto& tablet_set = _get_tablet_set(tablet->data_dir(), compaction_type);
169
39.5k
    size_t removed = tablet_set.erase(tablet);
170
39.5k
    if (removed == 1) {
171
39.5k
        wakeup_cb();
172
39.5k
    }
173
39.5k
}
174
175
// Returns the mutable tablet set for (dir, compaction_type).
// - Lookup uses operator[], so an empty set is created the first time a dir is seen.
// - The function takes no lock itself: insert()/remove() call it while holding the mutex,
//   while the count_/has_/pick_ helpers call it lock-free on snapshots.
// - An unknown compaction type aborts through CHECK.
CompactionSubmitRegistry::TabletSet& CompactionSubmitRegistry::_get_tablet_set(
        DataDir* dir, CompactionType compaction_type) {
    if (compaction_type == CompactionType::BASE_COMPACTION) {
        return _tablet_submitted_base_compaction[dir];
    }
    if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
        return _tablet_submitted_cumu_compaction[dir];
    }
    if (compaction_type == CompactionType::FULL_COMPACTION) {
        return _tablet_submitted_full_compaction[dir];
    }
    if (compaction_type == CompactionType::BINLOG_COMPACTION) {
        return _tablet_submitted_binlog_compaction[dir];
    }
    CHECK(false) << "invalid compaction type";
}
190
191
7.04k
static int32_t get_cumu_compaction_threads_num(size_t data_dirs_num) {
192
7.04k
    int32_t threads_num = config::max_cumu_compaction_threads;
193
7.04k
    if (threads_num == -1) {
194
7.04k
        int32_t num_cores = doris::CpuInfo::num_cores();
195
7.04k
        threads_num = std::max(cast_set<int32_t>(data_dirs_num), num_cores / 6);
196
7.04k
    }
197
7.04k
    threads_num = threads_num <= 0 ? 1 : threads_num;
198
7.04k
    return threads_num;
199
7.04k
}
200
201
7.04k
static int32_t get_base_compaction_threads_num(size_t data_dirs_num) {
202
7.04k
    int32_t threads_num = config::max_base_compaction_threads;
203
7.04k
    if (threads_num == -1) {
204
0
        threads_num = cast_set<int32_t>(data_dirs_num);
205
0
    }
206
7.04k
    threads_num = threads_num <= 0 ? 1 : threads_num;
207
7.04k
    return threads_num;
208
7.04k
}
209
210
7.04k
static int32_t get_binlog_compaction_threads_num(size_t data_dirs_num) {
211
7.04k
    int32_t threads_num = config::max_binlog_compaction_threads;
212
7.04k
    if (threads_num == -1) {
213
7.04k
        threads_num = cast_set<int32_t>(data_dirs_num);
214
7.04k
    }
215
7.04k
    threads_num = threads_num <= 0 ? 1 : threads_num;
216
7.04k
    return threads_num;
217
7.04k
}
218
219
3
// Starts every background thread and thread pool owned by the storage engine.
//
// Each step is wrapped in RETURN_IF_ERROR, so the first failure is returned immediately. This
// function does not stop threads or pools that earlier steps already started.
// NOTE(review): `wg_sptr` is not referenced anywhere in this body.
Status StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "unused_rowset_monitor_thread",
            [this]() { this->_unused_rowset_monitor_thread_callback(); },
            &_unused_rowset_monitor_thread));
    LOG(INFO) << "unused rowset monitor thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "evict_querying_rowset_thread",
            [this]() { this->_evict_quring_rowset_thread_callback(); },
            &_evict_quering_rowset_thread));
    LOG(INFO) << "evict quering thread started";

    // start thread for monitoring the snapshot and trash folder
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "garbage_sweeper_thread",
            [this]() { this->_garbage_sweeper_thread_callback(); }, &_garbage_sweeper_thread));
    LOG(INFO) << "garbage sweeper thread started";

    // start thread for monitoring the tablet with io error
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "disk_stat_monitor_thread",
            [this]() { this->_disk_stat_monitor_thread_callback(); }, &_disk_stat_monitor_thread));
    LOG(INFO) << "disk stat monitor thread started";

    // convert store map to vector
    std::vector<DataDir*> data_dirs = get_stores();

    auto base_compaction_threads = get_base_compaction_threads_num(data_dirs.size());
    auto cumu_compaction_threads = get_cumu_compaction_threads_num(data_dirs.size());
    auto binlog_compaction_threads = get_binlog_compaction_threads_num(data_dirs.size());

    // The base/cumu/binlog compaction pools are built fixed-size (min == max);
    // _adjust_compaction_thread_num() later re-applies the computed sizes to both bounds.
    RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
                            .set_min_threads(base_compaction_threads)
                            .set_max_threads(base_compaction_threads)
                            .build(&_base_compaction_thread_pool));
    RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
                            .set_min_threads(cumu_compaction_threads)
                            .set_max_threads(cumu_compaction_threads)
                            .build(&_cumu_compaction_thread_pool));
    RETURN_IF_ERROR(ThreadPoolBuilder("BinlogCompactionTaskThreadPool")
                            .set_min_threads(binlog_compaction_threads)
                            .set_max_threads(binlog_compaction_threads)
                            .build(&_binlog_compaction_thread_pool));

    // The segment-compaction pool is only created when segcompaction is enabled.
    if (config::enable_segcompaction) {
        RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
                                .set_min_threads(config::segcompaction_num_threads)
                                .set_max_threads(config::segcompaction_num_threads)
                                .build(&_seg_compaction_thread_pool));
    }
    RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
                            .set_min_threads(config::cold_data_compaction_thread_num)
                            .set_max_threads(config::cold_data_compaction_thread_num)
                            .build(&_cold_data_compaction_thread_pool));

    // Give every data dir an empty entry in the submit registry before any compaction
    // producer thread is started below.
    _compaction_submit_registry.reset(data_dirs);

    // compaction tasks producer thread
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "compaction_tasks_producer_thread",
            [this]() { this->_compaction_tasks_producer_callback(); },
            &_compaction_tasks_producer_thread));
    LOG(INFO) << "compaction tasks producer thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "binlog_compaction_tasks_producer_thread",
            [this]() { this->_binlog_compaction_tasks_producer_callback(); },
            &_binlog_compaction_tasks_producer_thread));
    LOG(INFO) << "binlog compaction tasks producer thread started";

    // A negative config means "one checkpoint thread per data dir". Only max_threads is
    // configured for this pool.
    int32_t max_checkpoint_thread_num = config::max_meta_checkpoint_threads;
    if (max_checkpoint_thread_num < 0) {
        max_checkpoint_thread_num = cast_set<int32_t>(data_dirs.size());
    }
    RETURN_IF_ERROR(ThreadPoolBuilder("TabletMetaCheckpointTaskThreadPool")
                            .set_max_threads(max_checkpoint_thread_num)
                            .build(&_tablet_meta_checkpoint_thread_pool));

    // data_dirs is captured by value, so the producer thread keeps its own copy of the list.
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "tablet_checkpoint_tasks_producer_thread",
            [this, data_dirs]() { this->_tablet_checkpoint_callback(data_dirs); },
            &_tablet_checkpoint_tasks_producer_thread));
    LOG(INFO) << "tablet checkpoint tasks producer thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "tablet_path_check_thread",
            [this]() { this->_tablet_path_check_callback(); }, &_tablet_path_check_thread));
    LOG(INFO) << "tablet path check thread started";

    // path scan and gc thread
    // One path-GC thread per data dir; the thread handles are kept in _path_gc_threads.
    if (config::path_gc_check) {
        for (auto data_dir : get_stores()) {
            std::shared_ptr<Thread> path_gc_thread;
            RETURN_IF_ERROR(Thread::create(
                    "StorageEngine", "path_gc_thread",
                    [this, data_dir]() { this->_path_gc_thread_callback(data_dir); },
                    &path_gc_thread));
            _path_gc_threads.emplace_back(path_gc_thread);
        }
        LOG(INFO) << "path gc threads started. number:" << get_stores().size();
    }

    RETURN_IF_ERROR(ThreadPoolBuilder("CooldownTaskThreadPool")
                            .set_min_threads(config::cooldown_thread_num)
                            .set_max_threads(config::cooldown_thread_num)
                            .build(&_cooldown_thread_pool));
    LOG(INFO) << "cooldown thread pool started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "cooldown_tasks_producer_thread",
            [this]() { this->_cooldown_tasks_producer_callback(); },
            &_cooldown_tasks_producer_thread));
    LOG(INFO) << "cooldown tasks producer thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "remove_unused_remote_files_thread",
            [this]() { this->_remove_unused_remote_files_callback(); },
            &_remove_unused_remote_files_thread));
    LOG(INFO) << "remove unused remote files thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "cold_data_compaction_producer_thread",
            [this]() { this->_cold_data_compaction_producer_callback(); },
            &_cold_data_compaction_producer_thread));
    LOG(INFO) << "cold data compaction producer thread started";

    // add tablet publish version thread pool
    RETURN_IF_ERROR(ThreadPoolBuilder("TabletPublishTxnThreadPool")
                            .set_min_threads(config::tablet_publish_txn_max_thread)
                            .set_max_threads(config::tablet_publish_txn_max_thread)
                            .build(&_tablet_publish_txn_thread_pool));

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "async_publish_version_thread",
            [this]() { this->_async_publish_callback(); }, &_async_publish_thread));
    LOG(INFO) << "async publish thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "check_tablet_delete_bitmap_score_thread",
            [this]() { this->_check_tablet_delete_bitmap_score_callback(); },
            &_check_delete_bitmap_score_thread));
    LOG(INFO) << "check tablet delete bitmap score thread started";

    _start_adaptive_thread_controller();

    LOG(INFO) << "all storage engine's background threads are started.";
    return Status::OK();
}
368
369
3
// Background loop that runs start_trash_sweep() (per the thread-start comment, this cleans the
// snapshot and trash folders). Each pass refreshes `usage`. When the pass succeeds and
// `_need_clean_trash` was set, a second sweep is run with the extra `true` argument.
//
// Interval handling:
// - The bounds come from config::min_garbage_sweep_interval and
//   config::max_garbage_sweep_interval.
// - An invalid pair (max < min, or min == 0; the `<= 0` test on an unsigned value can only
//   mean `== 0`) is reset to min = 1 and max = max(max, 1).
// NOTE(review): `curr_interval` is never reassigned, so the loop always waits `min_interval`
// seconds between passes. The usage-adaptive interval computed each pass
// (`curr_interval_not_work`) is currently discarded; see the TODO.
void StorageEngine::_garbage_sweeper_thread_callback() {
    uint32_t max_interval = config::max_garbage_sweep_interval;
    uint32_t min_interval = config::min_garbage_sweep_interval;

    if (max_interval < min_interval || min_interval <= 0) {
        LOG(WARNING) << "garbage sweep interval config is illegal: [max=" << max_interval
                     << " min=" << min_interval << "].";
        min_interval = 1;
        max_interval = max_interval >= min_interval ? max_interval : min_interval;
        LOG(INFO) << "force reset garbage sweep interval. "
                  << "max_interval=" << max_interval << ", min_interval=" << min_interval;
    }

    const double pi = M_PI;
    // Start at 1.0 (full) until the first sweep reports the real usage.
    double usage = 1.0;
    // After the program starts, the first round of cleaning starts after min_interval.
    uint32_t curr_interval = min_interval;
    do {
        // Function properties:
        // when usage < 0.6,          ratio close to 1.(interval close to max_interval)
        // when usage at [0.6, 0.75], ratio is rapidly decreasing from 0.87 to 0.27.
        // when usage > 0.75,         ratio is slowly decreasing.
        // when usage > 0.8,          ratio close to min_interval.
        // when usage = 0.88,         ratio is approximately 0.0057.
        double ratio = (1.1 * (pi / 2 - std::atan(usage * 100 / 5 - 14)) - 0.28) / pi;
        ratio = ratio > 0 ? ratio : 0;
        // TODO(dx): fix it
        // The clamped adaptive interval below is computed but never used; the wait at the
        // bottom of the loop still uses `curr_interval` (== min_interval).
        auto curr_interval_not_work = uint32_t(max_interval * ratio);
        curr_interval_not_work = std::max(curr_interval_not_work, min_interval);
        curr_interval_not_work = std::min(curr_interval_not_work, max_interval);

        // start clean trash and update usage.
        Status res = start_trash_sweep(&usage);
        if (res.ok() && _need_clean_trash.exchange(false, std::memory_order_relaxed)) {
            res = start_trash_sweep(&usage, true);
        }

        if (!res.ok()) {
            LOG(WARNING) << "one or more errors occur when sweep trash."
                         << "see previous message for detail. err code=" << res;
            // do nothing. continue next loop.
        }
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(curr_interval)));
}
413
414
3
void StorageEngine::_disk_stat_monitor_thread_callback() {
415
3
    int32_t interval = config::disk_stat_monitor_interval;
416
161
    do {
417
161
        _start_disk_stat_monitor();
418
419
161
        interval = config::disk_stat_monitor_interval;
420
161
        if (interval <= 0) {
421
0
            LOG(WARNING) << "disk_stat_monitor_interval config is illegal: " << interval
422
0
                         << ", force set to 1";
423
0
            interval = 1;
424
0
        }
425
161
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
426
3
}
427
428
3
void StorageEngine::_unused_rowset_monitor_thread_callback() {
429
3
    int32_t interval = config::unused_rowset_monitor_interval;
430
28
    do {
431
28
        start_delete_unused_rowset();
432
433
28
        interval = config::unused_rowset_monitor_interval;
434
28
        if (interval <= 0) {
435
0
            LOG(WARNING) << "unused_rowset_monitor_interval config is illegal: " << interval
436
0
                         << ", force set to 1";
437
0
            interval = 1;
438
0
        }
439
28
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
440
3
}
441
442
253
// Path-GC interval (seconds) for `data_dir`, scaled down as the disk fills up.
// The result is config::path_gc_check_interval_second divided by:
//   1 when free space > 90%, 2 when > 70%, 4 when > 50%, 6 when > 30%, otherwise 8.
// For example, a 24h base interval gives 24h / 12h / 6h / 4h / 3h.
int32_t StorageEngine::_auto_get_interval_by_disk_capacity(DataDir* data_dir) {
    const double remain_used = 1 - data_dir->get_usage(0);
    DCHECK(remain_used >= 0 && remain_used <= 1);
    DCHECK(config::path_gc_check_interval_second >= 0);

    struct GcTier {
        double min_remain; // free-space fraction that must be exceeded
        int32_t divisor;   // applied to the configured base interval
    };
    static constexpr GcTier kTiers[] = {{0.9, 1}, {0.7, 2}, {0.5, 4}, {0.3, 6}};

    int32_t divisor = 8; // fullest disks: check most often
    for (const GcTier& tier : kTiers) {
        if (remain_used > tier.min_remain) {
            divisor = tier.divisor;
            break;
        }
    }
    return config::path_gc_check_interval_second / divisor;
}
466
467
5
// Per-data-dir path GC loop. It wakes every 5 seconds and recomputes the GC interval from the
// dir's free space (a non-positive interval is forced to 1800 s). It runs
// DataDir::perform_path_gc() once at least that many seconds have elapsed since the previous
// run. The first pass always qualifies, because the previous-run timestamp starts at 0.
// The debug point "_path_gc_thread_callback.interval.eq.1ms" forces interval = 1. While
// "_path_gc_thread_callback.always.do" is also enabled, it spins on perform_path_gc() every 10ms.
void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) {
    LOG(INFO) << "try to start path gc thread!";
    time_t last_gc_time = 0;
    for (;;) {
        const time_t now = time(nullptr);

        int32_t interval = _auto_get_interval_by_disk_capacity(data_dir);
        DBUG_EXECUTE_IF("_path_gc_thread_callback.interval.eq.1ms", {
            LOG(INFO) << "debug point change interval eq 1ms";
            interval = 1;
            while (DebugPoints::instance()->is_enable("_path_gc_thread_callback.always.do")) {
                data_dir->perform_path_gc();
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
        });
        if (interval <= 0) {
            LOG(WARNING) << "path gc thread check interval config is illegal:" << interval
                         << " will be forced set to half hour";
            interval = 1800; // 0.5 hour
        }

        const bool gc_due = now - last_gc_time >= interval;
        if (gc_due) {
            LOG(INFO) << "try to perform path gc! disk remain [" << 1 - data_dir->get_usage(0)
                      << "] internal [" << interval << "]";
            data_dir->perform_path_gc();
            last_gc_time = time(nullptr);
        }

        if (_stop_background_threads_latch.wait_for(std::chrono::seconds(5))) {
            break;
        }
    }
    LOG(INFO) << "stop path gc thread!";
}
496
497
3
// Producer loop for tablet-meta checkpoint work. Each round submits one
// TabletManager::do_tablet_meta_checkpoint() task per data dir to
// _tablet_meta_checkpoint_thread_pool. It then waits
// config::generate_tablet_meta_checkpoint_tasks_interval_secs (re-read after every round)
// until the stop latch fires. A non-positive interval is clamped to 1 second, matching the
// other monitor loops in this file. Without the clamp, the latch wait would not block and the
// loop would spin, flooding the pool with checkpoint tasks.
void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs) {
    int64_t interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
    do {
        for (auto data_dir : data_dirs) {
            LOG(INFO) << "begin to produce tablet meta checkpoint tasks, data_dir="
                      << data_dir->path();
            auto st = _tablet_meta_checkpoint_thread_pool->submit_func(
                    [data_dir, this]() { _tablet_manager->do_tablet_meta_checkpoint(data_dir); });
            if (!st.ok()) {
                // Best effort: the next round retries. Keep the status for diagnosis.
                LOG(WARNING) << "submit tablet checkpoint tasks failed. data_dir="
                             << data_dir->path() << ", st=" << st;
            }
        }
        interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
        if (interval <= 0) {
            // A zero/negative timeout makes the latch wait return immediately (standard
            // condition-variable wait_for semantics, assumed for CountDownLatch).
            LOG(WARNING) << "generate_tablet_meta_checkpoint_tasks_interval_secs config is "
                            "illegal: "
                         << interval << ", force set to 1";
            interval = 1;
        }
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
}
512
513
3
void StorageEngine::_tablet_path_check_callback() {
514
3
    struct TabletIdComparator {
515
3
        bool operator()(Tablet* a, Tablet* b) { return a->tablet_id() < b->tablet_id(); }
516
3
    };
517
518
3
    using TabletQueue = std::priority_queue<Tablet*, std::vector<Tablet*>, TabletIdComparator>;
519
520
3
    int64_t interval = config::tablet_path_check_interval_seconds;
521
3
    if (interval <= 0) {
522
3
        return;
523
3
    }
524
525
0
    int64_t last_tablet_id = 0;
526
0
    do {
527
0
        int32_t batch_size = config::tablet_path_check_batch_size;
528
0
        if (batch_size <= 0) {
529
0
            if (_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
530
0
                break;
531
0
            }
532
0
            continue;
533
0
        }
534
535
0
        LOG(INFO) << "start to check tablet path";
536
537
0
        auto all_tablets = _tablet_manager->get_all_tablet(
538
0
                [](Tablet* t) { return t->is_used() && t->tablet_state() == TABLET_RUNNING; });
539
540
0
        TabletQueue big_id_tablets;
541
0
        TabletQueue small_id_tablets;
542
0
        for (auto tablet : all_tablets) {
543
0
            auto tablet_id = tablet->tablet_id();
544
0
            TabletQueue* belong_tablets = nullptr;
545
0
            if (tablet_id > last_tablet_id) {
546
0
                if (big_id_tablets.size() < batch_size ||
547
0
                    big_id_tablets.top()->tablet_id() > tablet_id) {
548
0
                    belong_tablets = &big_id_tablets;
549
0
                }
550
0
            } else if (big_id_tablets.size() < batch_size) {
551
0
                if (small_id_tablets.size() < batch_size ||
552
0
                    small_id_tablets.top()->tablet_id() > tablet_id) {
553
0
                    belong_tablets = &small_id_tablets;
554
0
                }
555
0
            }
556
0
            if (belong_tablets != nullptr) {
557
0
                belong_tablets->push(tablet.get());
558
0
                if (belong_tablets->size() > batch_size) {
559
0
                    belong_tablets->pop();
560
0
                }
561
0
            }
562
0
        }
563
564
0
        int32_t need_small_id_tablet_size =
565
0
                batch_size - static_cast<int32_t>(big_id_tablets.size());
566
567
0
        if (!big_id_tablets.empty()) {
568
0
            last_tablet_id = big_id_tablets.top()->tablet_id();
569
0
        }
570
0
        while (!big_id_tablets.empty()) {
571
0
            big_id_tablets.top()->check_tablet_path_exists();
572
0
            big_id_tablets.pop();
573
0
        }
574
575
0
        if (!small_id_tablets.empty() && need_small_id_tablet_size > 0) {
576
0
            while (static_cast<int32_t>(small_id_tablets.size()) > need_small_id_tablet_size) {
577
0
                small_id_tablets.pop();
578
0
            }
579
580
0
            last_tablet_id = small_id_tablets.top()->tablet_id();
581
0
            while (!small_id_tablets.empty()) {
582
0
                small_id_tablets.top()->check_tablet_path_exists();
583
0
                small_id_tablets.pop();
584
0
            }
585
0
        }
586
587
0
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
588
0
}
589
590
7.04k
void StorageEngine::_adjust_compaction_thread_num() {
591
7.04k
    TEST_SYNC_POINT_RETURN_WITH_VOID("StorageEngine::_adjust_compaction_thread_num.return_void");
592
7.04k
    auto base_compaction_threads_num = get_base_compaction_threads_num(_store_map.size());
593
7.04k
    if (_base_compaction_thread_pool->max_threads() != base_compaction_threads_num) {
594
0
        int old_max_threads = _base_compaction_thread_pool->max_threads();
595
0
        Status status = _base_compaction_thread_pool->set_max_threads(base_compaction_threads_num);
596
0
        if (status.ok()) {
597
0
            VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads
598
0
                        << " to " << base_compaction_threads_num;
599
0
        }
600
0
    }
601
7.04k
    if (_base_compaction_thread_pool->min_threads() != base_compaction_threads_num) {
602
0
        int old_min_threads = _base_compaction_thread_pool->min_threads();
603
0
        Status status = _base_compaction_thread_pool->set_min_threads(base_compaction_threads_num);
604
0
        if (status.ok()) {
605
0
            VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads
606
0
                        << " to " << base_compaction_threads_num;
607
0
        }
608
0
    }
609
610
7.04k
    auto cumu_compaction_threads_num = get_cumu_compaction_threads_num(_store_map.size());
611
7.04k
    if (_cumu_compaction_thread_pool->max_threads() != cumu_compaction_threads_num) {
612
0
        int old_max_threads = _cumu_compaction_thread_pool->max_threads();
613
0
        Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_compaction_threads_num);
614
0
        if (status.ok()) {
615
0
            VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads
616
0
                        << " to " << cumu_compaction_threads_num;
617
0
        }
618
0
    }
619
7.04k
    if (_cumu_compaction_thread_pool->min_threads() != cumu_compaction_threads_num) {
620
0
        int old_min_threads = _cumu_compaction_thread_pool->min_threads();
621
0
        Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_compaction_threads_num);
622
0
        if (status.ok()) {
623
0
            VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads
624
0
                        << " to " << cumu_compaction_threads_num;
625
0
        }
626
0
    }
627
628
7.04k
    auto binlog_compaction_threads_num = get_binlog_compaction_threads_num(_store_map.size());
629
7.04k
    if (_binlog_compaction_thread_pool->max_threads() != binlog_compaction_threads_num) {
630
0
        int old_max_threads = _binlog_compaction_thread_pool->max_threads();
631
0
        Status status =
632
0
                _binlog_compaction_thread_pool->set_max_threads(binlog_compaction_threads_num);
633
0
        if (status.ok()) {
634
0
            VLOG_NOTICE << "update binlog compaction thread pool max_threads from "
635
0
                        << old_max_threads << " to " << binlog_compaction_threads_num;
636
0
        }
637
0
    }
638
7.04k
    if (_binlog_compaction_thread_pool->min_threads() != binlog_compaction_threads_num) {
639
0
        int old_min_threads = _binlog_compaction_thread_pool->min_threads();
640
0
        Status status =
641
0
                _binlog_compaction_thread_pool->set_min_threads(binlog_compaction_threads_num);
642
0
        if (status.ok()) {
643
0
            VLOG_NOTICE << "update binlog compaction thread pool min_threads from "
644
0
                        << old_min_threads << " to " << binlog_compaction_threads_num;
645
0
        }
646
0
    }
647
7.04k
}
648
649
10
void StorageEngine::_compaction_tasks_producer_callback() {
650
10
    LOG(INFO) << "try to start compaction producer process!";
651
652
10
    std::vector<DataDir*> data_dirs = get_stores();
653
654
10
    int round = 0;
655
10
    CompactionType compaction_type;
656
657
    // Used to record the time when the score metric was last updated.
658
    // The update of the score metric is accompanied by the logic of selecting the tablet.
659
    // If there is no slot available, the logic of selecting the tablet will be terminated,
660
    // which causes the score metric update to be terminated.
661
    // In order to avoid this situation, we need to update the score regularly.
662
10
    int64_t last_cumulative_score_update_time = 0;
663
10
    int64_t last_base_score_update_time = 0;
664
10
    static const int64_t check_score_interval_ms = 5000; // 5 secs
665
666
10
    int64_t interval = config::generate_compaction_tasks_interval_ms;
667
1.04k
    do {
668
1.04k
        int64_t cur_time = UnixMillis();
669
1.04k
        if (!config::disable_auto_compaction &&
670
1.04k
            (!config::enable_compaction_pause_on_high_memory ||
671
1.04k
             !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) {
672
1.04k
            _adjust_compaction_thread_num();
673
674
1.04k
            bool check_score = false;
675
1.04k
            if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
676
942
                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
677
942
                round++;
678
942
                if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) {
679
139
                    check_score = true;
680
139
                    last_cumulative_score_update_time = cur_time;
681
139
                }
682
942
            } else {
683
103
                compaction_type = CompactionType::BASE_COMPACTION;
684
103
                round = 0;
685
103
                if (cur_time - last_base_score_update_time >= check_score_interval_ms) {
686
81
                    check_score = true;
687
81
                    last_base_score_update_time = cur_time;
688
81
                }
689
103
            }
690
1.04k
            std::unique_ptr<ThreadPool>& thread_pool =
691
1.04k
                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
692
1.04k
                            ? _cumu_compaction_thread_pool
693
1.04k
                            : _base_compaction_thread_pool;
694
1.04k
            bvar::Status<int64_t>& g_compaction_task_num_per_round =
695
1.04k
                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
696
1.04k
                            ? g_cumu_compaction_task_num_per_round
697
1.04k
                            : g_base_compaction_task_num_per_round;
698
1.04k
            if (config::compaction_num_per_round != -1) {
699
0
                _compaction_num_per_round = config::compaction_num_per_round;
700
1.04k
            } else if (thread_pool->get_queue_size() == 0) {
701
                // If all tasks in the thread pool queue are executed,
702
                // double the number of tasks generated each time,
703
                // with a maximum of config::max_automatic_compaction_num_per_round tasks per generation.
704
1.03k
                if (_compaction_num_per_round < config::max_automatic_compaction_num_per_round) {
705
19
                    _compaction_num_per_round *= 2;
706
19
                    g_compaction_task_num_per_round.set_value(_compaction_num_per_round);
707
19
                }
708
1.03k
            } else if (thread_pool->get_queue_size() > _compaction_num_per_round / 2) {
709
                // If all tasks in the thread pool is greater than
710
                // half of the tasks submitted in the previous round,
711
                // reduce the number of tasks generated each time by half, with a minimum of 1.
712
3
                if (_compaction_num_per_round > 1) {
713
1
                    _compaction_num_per_round /= 2;
714
1
                    g_compaction_task_num_per_round.set_value(_compaction_num_per_round);
715
1
                }
716
3
            }
717
1.04k
            _update_cumulative_compaction_policy();
718
1.04k
            std::vector<TabletCompactionContext> tablet_compaction_contexts =
719
1.04k
                    _generate_compaction_tasks(compaction_type, data_dirs, check_score);
720
1.04k
            if (tablet_compaction_contexts.empty()) {
721
287
                std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
722
287
                _wakeup_producer_flag = 0;
723
                // It is necessary to wake up the thread on timeout to prevent deadlock
724
                // in case of no running compaction task.
725
287
                _compaction_producer_sleep_cv.wait_for(
726
287
                        lock, std::chrono::milliseconds(2000),
727
572
                        [this] { return _wakeup_producer_flag == 1; });
728
287
                continue;
729
287
            }
730
731
39.3k
            for (const auto& tablet_compaction_context : tablet_compaction_contexts) {
732
39.3k
                const auto& tablet = tablet_compaction_context.tablet;
733
39.3k
                if (compaction_type == CompactionType::BASE_COMPACTION) {
734
6.31k
                    tablet->set_last_base_compaction_schedule_time(UnixMillis());
735
33.0k
                } else if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
736
33.0k
                    tablet->set_last_cumu_compaction_schedule_time(UnixMillis());
737
33.0k
                } else if (compaction_type == CompactionType::FULL_COMPACTION) {
738
0
                    tablet->set_last_full_compaction_schedule_time(UnixMillis());
739
0
                }
740
39.3k
                Status st = _submit_compaction_task(tablet, compaction_type, false);
741
39.3k
                if (!st.ok()) {
742
0
                    LOG(WARNING) << "failed to submit compaction task for tablet: "
743
0
                                 << tablet->tablet_id() << ", err: " << st;
744
0
                }
745
39.3k
            }
746
758
            interval = config::generate_compaction_tasks_interval_ms;
747
758
        } else {
748
1
            interval = 5000; // 5s to check disable_auto_compaction
749
1
        }
750
751
        // wait some seconds for ut test
752
759
        {
753
759
            std ::vector<std ::any> args {};
754
759
            args.emplace_back(1);
755
759
            doris ::SyncPoint ::get_instance()->process(
756
759
                    "StorageEngine::_compaction_tasks_producer_callback", std ::move(args));
757
759
        }
758
759
        int64_t end_time = UnixMillis();
759
759
        DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time -
760
759
                                                                                       cur_time);
761
1.04k
    } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
762
10
}
763
764
3
void StorageEngine::_binlog_compaction_tasks_producer_callback() {
765
3
    LOG(INFO) << "try to start binlog compaction producer process!";
766
767
3
    std::vector<DataDir*> data_dirs = get_stores();
768
769
3
    int64_t last_binlog_score_update_time = 0;
770
3
    static const int64_t check_score_interval_ms = 5000;
771
772
3
    int64_t interval = config::generate_compaction_tasks_interval_ms;
773
6.00k
    do {
774
6.00k
        int64_t cur_time = UnixMillis();
775
6.00k
        if (config::enable_feature_binlog && !config::disable_auto_compaction &&
776
6.00k
            (!config::enable_compaction_pause_on_high_memory ||
777
6.00k
             !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) {
778
6.00k
            _adjust_compaction_thread_num();
779
780
6.00k
            bool check_score = false;
781
6.00k
            if (cur_time - last_binlog_score_update_time >= check_score_interval_ms) {
782
157
                check_score = true;
783
157
                last_binlog_score_update_time = cur_time;
784
157
            }
785
786
6.00k
            std::vector<TabletCompactionContext> tablet_compaction_contexts =
787
6.00k
                    _generate_compaction_tasks(CompactionType::BINLOG_COMPACTION, data_dirs,
788
6.00k
                                               check_score);
789
6.00k
            for (const auto& tablet_compaction_context : tablet_compaction_contexts) {
790
233
                const auto& tablet = tablet_compaction_context.tablet;
791
233
                Status st =
792
233
                        _submit_compaction_task(tablet, CompactionType::BINLOG_COMPACTION, false, 0,
793
233
                                                tablet_compaction_context.prefer_compaction_level);
794
233
                if (!st.ok()) {
795
0
                    LOG(WARNING) << "failed to submit binlog compaction task for tablet: "
796
0
                                 << tablet->tablet_id() << ", err: " << st;
797
0
                }
798
233
            }
799
6.00k
            interval = config::generate_compaction_tasks_interval_ms;
800
6.00k
        } else {
801
0
            interval = 5000;
802
0
        }
803
6.00k
    } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
804
3
}
805
806
void StorageEngine::get_tablet_rowset_versions(const PGetTabletVersionsRequest* request,
807
0
                                               PGetTabletVersionsResponse* response) {
808
0
    TabletSharedPtr tablet = _tablet_manager->get_tablet(request->tablet_id());
809
0
    if (tablet == nullptr) {
810
0
        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
811
0
        return;
812
0
    }
813
0
    std::vector<Version> local_versions = tablet->get_all_local_versions();
814
0
    for (const auto& local_version : local_versions) {
815
0
        auto version = response->add_versions();
816
0
        version->set_first(local_version.first);
817
0
        version->set_second(local_version.second);
818
0
    }
819
0
    response->mutable_status()->set_status_code(0);
820
0
}
821
822
bool need_generate_compaction_tasks(int task_cnt_per_disk, int thread_per_disk,
823
21.8k
                                    CompactionType compaction_type, bool all_base) {
824
21.8k
    if (compaction_type == CompactionType::BINLOG_COMPACTION) {
825
18.3k
        return task_cnt_per_disk < thread_per_disk;
826
18.3k
    }
827
828
    // We need to reserve at least one Slot for cumulative compaction.
829
    // So when there is only one Slot, we have to judge whether there is a cumulative compaction
830
    // in the current submitted tasks.
831
    // If so, the last Slot can be assigned to Base compaction,
832
    // otherwise, this Slot needs to be reserved for cumulative compaction.
833
3.52k
    if (task_cnt_per_disk >= thread_per_disk) {
834
        // Return if no available slot
835
4
        return false;
836
3.52k
    } else if (task_cnt_per_disk >= thread_per_disk - 1) {
837
        // Only one slot left, check if it can be assigned to base compaction task.
838
0
        if (compaction_type == CompactionType::BASE_COMPACTION) {
839
0
            if (all_base) {
840
0
                return false;
841
0
            }
842
0
        }
843
0
    }
844
3.52k
    return true;
845
3.52k
}
846
847
10.9k
int get_concurrent_per_disk(int max_score, int thread_per_disk) {
848
10.9k
    if (!config::enable_compaction_priority_scheduling) {
849
0
        return thread_per_disk;
850
0
    }
851
852
10.9k
    double load_average = 0;
853
10.9k
    if (DorisMetrics::instance()->system_metrics() != nullptr) {
854
7.80k
        load_average = DorisMetrics::instance()->system_metrics()->get_load_average_1_min();
855
7.80k
    }
856
10.9k
    int num_cores = doris::CpuInfo::num_cores();
857
10.9k
    bool cpu_usage_high = load_average > num_cores * 0.8;
858
859
10.9k
    auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
860
10.9k
    bool memory_usage_high = static_cast<double>(process_memory_usage) >
861
10.9k
                             static_cast<double>(MemInfo::soft_mem_limit()) * 0.8;
862
863
10.9k
    if (max_score <= config::low_priority_compaction_score_threshold &&
864
10.9k
        (cpu_usage_high || memory_usage_high)) {
865
6.07k
        return config::low_priority_compaction_task_num_per_disk;
866
6.07k
    }
867
868
4.87k
    return thread_per_disk;
869
10.9k
}
870
871
21.8k
int32_t disk_compaction_slot_num(const DataDir& data_dir, CompactionType compaction_type) {
872
21.8k
    if (compaction_type == CompactionType::BINLOG_COMPACTION) {
873
18.3k
        return config::binlog_compaction_task_num_per_disk;
874
18.3k
    }
875
3.53k
    return data_dir.is_ssd_disk() ? config::compaction_task_num_per_fast_disk
876
3.53k
                                  : config::compaction_task_num_per_disk;
877
21.8k
}
878
879
bool has_free_compaction_slot(CompactionSubmitRegistry* registry, DataDir* dir,
880
10.9k
                              CompactionType compaction_type, uint32_t executing_cnt) {
881
10.9k
    int32_t thread_per_disk = disk_compaction_slot_num(*dir, compaction_type);
882
10.9k
    return need_generate_compaction_tasks(
883
10.9k
            executing_cnt, thread_per_disk, compaction_type,
884
10.9k
            !registry->has_compaction_task(dir, CompactionType::CUMULATIVE_COMPACTION));
885
10.9k
}
886
887
std::vector<TabletCompactionContext> StorageEngine::_generate_compaction_tasks(
888
7.04k
        CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
889
7.04k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("olap_server::_generate_compaction_tasks.return_empty",
890
7.04k
                                      std::vector<TabletCompactionContext> {});
891
7.04k
    _update_cumulative_compaction_policy();
892
7.04k
    auto cumulative_compaction_policies = _snapshot_cumulative_compaction_policy();
893
7.04k
    std::vector<TabletCompactionContext> tablet_compaction_contexts;
894
7.04k
    uint32_t max_compaction_score = 0;
895
896
7.04k
    std::random_device rd;
897
7.04k
    std::mt19937 g(rd());
898
7.04k
    std::shuffle(data_dirs.begin(), data_dirs.end(), g);
899
900
    // Copy _tablet_submitted_xxx_compaction map so that we don't need to hold _tablet_submitted_compaction_mutex
901
    // when traversing the data dir
902
7.04k
    auto compaction_registry_snapshot = _compaction_submit_registry.create_snapshot();
903
10.9k
    for (auto* data_dir : data_dirs) {
904
10.9k
        bool need_pick_tablet = true;
905
10.9k
        uint32_t executing_task_num =
906
10.9k
                compaction_type == CompactionType::BINLOG_COMPACTION
907
10.9k
                        ? compaction_registry_snapshot.count_executing_compaction(
908
9.17k
                                  data_dir, CompactionType::BINLOG_COMPACTION)
909
10.9k
                        : compaction_registry_snapshot.count_executing_cumu_and_base(data_dir);
910
10.9k
        need_pick_tablet = has_free_compaction_slot(&compaction_registry_snapshot, data_dir,
911
10.9k
                                                    compaction_type, executing_task_num);
912
10.9k
        if (!need_pick_tablet && !check_score) {
913
0
            continue;
914
0
        }
915
916
        // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(),
917
        // So that we can update the max_compaction_score metric.
918
10.9k
        if (!data_dir->reach_capacity_limit(0)) {
919
10.9k
            uint32_t disk_max_score = 0;
920
10.9k
            auto tablet_contexts = compaction_registry_snapshot.pick_topn_tablets_for_compaction(
921
10.9k
                    _tablet_manager.get(), data_dir, compaction_type,
922
10.9k
                    cumulative_compaction_policies, &disk_max_score);
923
10.9k
            int concurrent_num = get_concurrent_per_disk(
924
10.9k
                    disk_max_score, disk_compaction_slot_num(*data_dir, compaction_type));
925
10.9k
            need_pick_tablet = need_generate_compaction_tasks(
926
10.9k
                    executing_task_num, concurrent_num, compaction_type,
927
10.9k
                    !compaction_registry_snapshot.has_compaction_task(
928
10.9k
                            data_dir, CompactionType::CUMULATIVE_COMPACTION));
929
39.6k
            for (const auto& context : tablet_contexts) {
930
39.6k
                if (context.tablet != nullptr) {
931
39.6k
                    if (need_pick_tablet) {
932
39.5k
                        tablet_compaction_contexts.emplace_back(context);
933
39.5k
                    }
934
39.6k
                    max_compaction_score = std::max(max_compaction_score, disk_max_score);
935
39.6k
                }
936
39.6k
            }
937
10.9k
        }
938
10.9k
    }
939
940
7.04k
    if (max_compaction_score > 0) {
941
790
        if (compaction_type == CompactionType::BASE_COMPACTION) {
942
103
            DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(
943
103
                    max_compaction_score);
944
687
        } else if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
945
655
            DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(
946
655
                    max_compaction_score);
947
655
        } else if (compaction_type == CompactionType::BINLOG_COMPACTION) {
948
32
            DorisMetrics::instance()->tablet_binlog_max_compaction_score->set_value(
949
32
                    max_compaction_score);
950
32
        }
951
790
    }
952
7.04k
    return tablet_compaction_contexts;
953
7.04k
}
954
955
8.08k
void StorageEngine::_update_cumulative_compaction_policy() {
956
8.08k
    std::lock_guard<std::mutex> lock(_cumulative_compaction_policy_mtx);
957
8.08k
    if (_cumulative_compaction_policies.empty()) {
958
5
        _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
959
5
                CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
960
5
                        CUMULATIVE_SIZE_BASED_POLICY);
961
5
        _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
962
5
                CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
963
5
                        CUMULATIVE_TIME_SERIES_POLICY);
964
5
    }
965
8.08k
}
966
967
7.04k
CumuCompactionPolicyTable StorageEngine::_snapshot_cumulative_compaction_policy() {
968
7.04k
    std::lock_guard<std::mutex> lock(_cumulative_compaction_policy_mtx);
969
7.04k
    return _cumulative_compaction_policies;
970
7.04k
}
971
972
std::shared_ptr<CumulativeCompactionPolicy> StorageEngine::_get_cumulative_compaction_policy(
973
0
        std::string_view compaction_policy) {
974
0
    std::lock_guard<std::mutex> lock(_cumulative_compaction_policy_mtx);
975
0
    return _cumulative_compaction_policies.at(compaction_policy);
976
0
}
977
978
void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet,
979
39.5k
                                                          CompactionType compaction_type) {
980
39.5k
    _compaction_submit_registry.remove(tablet, compaction_type, [this]() {
981
39.5k
        std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
982
39.5k
        _wakeup_producer_flag = 1;
983
39.5k
        _compaction_producer_sleep_cv.notify_one();
984
39.5k
    });
985
39.5k
}
986
987
Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
988
                                              CompactionType compaction_type, bool force,
989
39.5k
                                              int trigger_method, int8_t prefer_compaction_level) {
990
39.5k
    bool already_exist = _compaction_submit_registry.insert(tablet, compaction_type);
991
39.5k
    if (already_exist) {
992
0
        return Status::AlreadyExist<false>(
993
0
                "compaction task has already been submitted, tablet_id={}, compaction_type={}.",
994
0
                tablet->tablet_id(), compaction_type);
995
0
    }
996
39.5k
    tablet->compaction_stage = CompactionStage::PENDING;
997
39.5k
    std::shared_ptr<CompactionMixin> compaction;
998
39.5k
    int64_t permits = 0;
999
39.5k
    Status st = Tablet::prepare_compaction_and_calculate_permits(
1000
39.5k
            compaction_type, tablet, compaction, permits, prefer_compaction_level);
1001
39.5k
    if (st.ok() && permits > 0) {
1002
45
        if (!force) {
1003
45
            if (compaction_type == CompactionType::BINLOG_COMPACTION) {
1004
0
                if (!_permit_limiter.try_request(permits, compaction_type)) {
1005
0
                    _pop_tablet_from_submitted_compaction(tablet, compaction_type);
1006
0
                    tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
1007
0
                    return Status::OK();
1008
0
                }
1009
45
            } else {
1010
45
                _permit_limiter.request(permits);
1011
45
            }
1012
45
        }
1013
        // Register task with CompactionTaskTracker as PENDING
1014
45
        auto* tracker = CompactionTaskTracker::instance();
1015
45
        int64_t compaction_id = compaction->compaction_id();
1016
45
        {
1017
45
            CompactionTaskInfo info;
1018
45
            info.compaction_id = compaction_id;
1019
45
            info.tablet_id = tablet->tablet_id();
1020
45
            info.table_id = tablet->get_table_id();
1021
45
            info.partition_id = tablet->partition_id();
1022
45
            switch (compaction_type) {
1023
0
            case CompactionType::BASE_COMPACTION:
1024
0
                info.compaction_type = CompactionProfileType::BASE;
1025
0
                break;
1026
45
            case CompactionType::CUMULATIVE_COMPACTION:
1027
45
                info.compaction_type = CompactionProfileType::CUMULATIVE;
1028
45
                break;
1029
0
            case CompactionType::FULL_COMPACTION:
1030
0
                info.compaction_type = CompactionProfileType::FULL;
1031
0
                break;
1032
0
            case CompactionType::BINLOG_COMPACTION:
1033
0
                info.compaction_type = CompactionProfileType::BINLOG;
1034
0
                break;
1035
0
            default:
1036
0
                DCHECK(false) << "invalid compaction type: " << compaction_type;
1037
45
            }
1038
45
            info.status = CompactionTaskStatus::PENDING;
1039
45
            info.trigger_method = static_cast<TriggerMethod>(trigger_method);
1040
45
            info.scheduled_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
1041
45
                                             std::chrono::system_clock::now().time_since_epoch())
1042
45
                                             .count();
1043
45
            info.permits = permits;
1044
45
            info.backend_id = BackendOptions::get_backend_id();
1045
45
            info.compaction_score = tablet->get_real_compaction_score();
1046
45
            info.input_rowsets_count = compaction->input_rowsets_count();
1047
45
            info.input_row_num = compaction->input_row_num_value();
1048
45
            info.input_data_size = compaction->input_rowsets_data_size();
1049
45
            info.input_index_size = compaction->input_rowsets_index_size();
1050
45
            info.input_total_size = compaction->input_rowsets_total_size();
1051
45
            info.input_segments_num = compaction->input_segments_num_value();
1052
45
            info.input_version_range = compaction->input_version_range_str();
1053
45
            info.is_vertical = compaction->is_vertical();
1054
45
            tracker->register_task(std::move(info));
1055
45
        }
1056
0
        std::unique_ptr<ThreadPool>* thread_pool = nullptr;
1057
45
        const char* compaction_type_name = "UNKNOWN";
1058
45
        switch (compaction_type) {
1059
45
        case CompactionType::CUMULATIVE_COMPACTION:
1060
45
            thread_pool = &_cumu_compaction_thread_pool;
1061
45
            compaction_type_name = "CUMU";
1062
45
            break;
1063
0
        case CompactionType::BINLOG_COMPACTION:
1064
0
            thread_pool = &_binlog_compaction_thread_pool;
1065
0
            compaction_type_name = "BINLOG";
1066
0
            break;
1067
0
        case CompactionType::BASE_COMPACTION:
1068
0
        case CompactionType::FULL_COMPACTION:
1069
0
            thread_pool = &_base_compaction_thread_pool;
1070
0
            compaction_type_name = "BASE";
1071
0
            break;
1072
0
        default:
1073
0
            DCHECK(false) << "invalid compaction type: " << compaction_type;
1074
45
        }
1075
45
        VLOG_CRITICAL << "compaction thread pool. type: " << compaction_type_name
1076
0
                      << ", num_threads: " << thread_pool->get()->num_threads()
1077
0
                      << ", num_threads_pending_start: "
1078
0
                      << thread_pool->get()->num_threads_pending_start()
1079
0
                      << ", num_active_threads: " << thread_pool->get()->num_active_threads()
1080
0
                      << ", max_threads: " << thread_pool->get()->max_threads()
1081
0
                      << ", min_threads: " << thread_pool->get()->min_threads()
1082
0
                      << ", num_total_queued_tasks: " << thread_pool->get()->get_queue_size();
1083
45
        auto status =
1084
45
                thread_pool->get()->submit_func([=, compaction = std::move(compaction), this]() {
1085
31
                    _handle_compaction(std::move(tablet), std::move(compaction), compaction_type,
1086
31
                                       permits, force, compaction_id);
1087
31
                });
1088
45
        if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) [[likely]] {
1089
45
            DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
1090
45
                    _cumu_compaction_thread_pool->get_queue_size());
1091
45
        } else if (compaction_type == CompactionType::BASE_COMPACTION) {
1092
0
            DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
1093
0
                    _base_compaction_thread_pool->get_queue_size());
1094
0
        }
1095
45
        if (!status.ok()) {
1096
            // Cleanup tracker on submit failure
1097
0
            tracker->remove_task(compaction_id);
1098
0
            if (!force) {
1099
0
                _permit_limiter.release(permits, compaction_type);
1100
0
            }
1101
0
            _pop_tablet_from_submitted_compaction(tablet, compaction_type);
1102
0
            tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
1103
0
            return Status::InternalError(
1104
0
                    "failed to submit compaction task to thread pool, "
1105
0
                    "tablet_id={}, compaction_type={}.",
1106
0
                    tablet->tablet_id(), compaction_type);
1107
0
        }
1108
45
        return Status::OK();
1109
39.5k
    } else {
1110
39.5k
        _pop_tablet_from_submitted_compaction(tablet, compaction_type);
1111
39.5k
        tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
1112
39.5k
        if (!st.ok()) {
1113
0
            return Status::InternalError(
1114
0
                    "failed to prepare compaction task and calculate permits, "
1115
0
                    "tablet_id={}, compaction_type={}, "
1116
0
                    "permit={}, current_permit={}, status={}",
1117
0
                    tablet->tablet_id(), compaction_type, permits, _permit_limiter.usage(),
1118
0
                    st.to_string());
1119
0
        }
1120
39.5k
        return st;
1121
39.5k
    }
1122
39.5k
}
1123
1124
void StorageEngine::_handle_compaction(TabletSharedPtr tablet,
1125
                                       std::shared_ptr<CompactionMixin> compaction,
1126
                                       CompactionType compaction_type, int64_t permits, bool force,
1127
24
                                       int64_t compaction_id) {
1128
24
    if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) [[likely]] {
1129
24
        DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1);
1130
24
        DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
1131
24
                _cumu_compaction_thread_pool->get_queue_size());
1132
24
    } else if (compaction_type == CompactionType::BASE_COMPACTION) {
1133
0
        DorisMetrics::instance()->base_compaction_task_running_total->increment(1);
1134
0
        DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
1135
0
                _base_compaction_thread_pool->get_queue_size());
1136
0
    } else if (compaction_type == CompactionType::BINLOG_COMPACTION) {
1137
0
        DorisMetrics::instance()->binlog_compaction_task_running_total->increment(1);
1138
0
        DorisMetrics::instance()->binlog_compaction_task_pending_total->set_value(
1139
0
                _binlog_compaction_thread_pool->get_queue_size());
1140
0
    }
1141
24
    bool is_large_task = true;
1142
31
    Defer defer {[&]() {
1143
31
        DBUG_EXECUTE_IF("StorageEngine._submit_compaction_task.sleep", { sleep(5); })
1144
        // Idempotent cleanup: remove task from tracker
1145
31
        CompactionTaskTracker::instance()->remove_task(compaction_id);
1146
31
        if (!force) {
1147
31
            _permit_limiter.release(permits, compaction_type);
1148
31
        }
1149
31
        _pop_tablet_from_submitted_compaction(tablet, compaction_type);
1150
31
        tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
1151
31
        if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
1152
31
            std::lock_guard<std::mutex> lock(_cumu_compaction_delay_mtx);
1153
31
            _cumu_compaction_thread_pool_used_threads--;
1154
31
            if (!is_large_task) {
1155
1
                _cumu_compaction_thread_pool_small_tasks_running--;
1156
1
            }
1157
31
            DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(-1);
1158
31
            DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
1159
31
                    _cumu_compaction_thread_pool->get_queue_size());
1160
31
        } else if (compaction_type == CompactionType::BASE_COMPACTION) {
1161
0
            DorisMetrics::instance()->base_compaction_task_running_total->increment(-1);
1162
0
            DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
1163
0
                    _base_compaction_thread_pool->get_queue_size());
1164
0
        } else if (compaction_type == CompactionType::BINLOG_COMPACTION) {
1165
0
            DorisMetrics::instance()->binlog_compaction_task_running_total->increment(-1);
1166
0
            DorisMetrics::instance()->binlog_compaction_task_pending_total->set_value(
1167
0
                    _binlog_compaction_thread_pool->get_queue_size());
1168
0
        }
1169
31
    }};
1170
24
    do {
1171
24
        if (compaction->compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
1172
24
            std::lock_guard<std::mutex> lock(_cumu_compaction_delay_mtx);
1173
24
            _cumu_compaction_thread_pool_used_threads++;
1174
24
            if (config::large_cumu_compaction_task_min_thread_num > 1 &&
1175
24
                _cumu_compaction_thread_pool->max_threads() >=
1176
24
                        config::large_cumu_compaction_task_min_thread_num) {
1177
                // Determine if this is a large task based on configured thresholds
1178
1
                is_large_task = (compaction->calc_input_rowsets_total_size() >
1179
1
                                         config::large_cumu_compaction_task_bytes_threshold ||
1180
1
                                 compaction->calc_input_rowsets_row_num() >
1181
1
                                         config::large_cumu_compaction_task_row_num_threshold);
1182
1183
                // Small task. No delay needed
1184
1
                if (!is_large_task) {
1185
1
                    _cumu_compaction_thread_pool_small_tasks_running++;
1186
1
                    break;
1187
1
                }
1188
                // Deal with large task
1189
0
                if (_should_delay_large_task()) {
1190
0
                    LOG_WARNING(
1191
0
                            "failed to do CumulativeCompaction, cumu thread pool is "
1192
0
                            "intensive, delay large task.")
1193
0
                            .tag("tablet_id", tablet->tablet_id())
1194
0
                            .tag("input_rows", compaction->calc_input_rowsets_row_num())
1195
0
                            .tag("input_rowsets_total_size",
1196
0
                                 compaction->calc_input_rowsets_total_size())
1197
0
                            .tag("config::large_cumu_compaction_task_bytes_threshold",
1198
0
                                 config::large_cumu_compaction_task_bytes_threshold)
1199
0
                            .tag("config::large_cumu_compaction_task_row_num_threshold",
1200
0
                                 config::large_cumu_compaction_task_row_num_threshold)
1201
0
                            .tag("remaining threads", _cumu_compaction_thread_pool_used_threads)
1202
0
                            .tag("small_tasks_running",
1203
0
                                 _cumu_compaction_thread_pool_small_tasks_running);
1204
                    // Delay this task and sleep 5s for this tablet
1205
0
                    long now = duration_cast<std::chrono::milliseconds>(
1206
0
                                       std::chrono::system_clock::now().time_since_epoch())
1207
0
                                       .count();
1208
0
                    tablet->set_last_cumu_compaction_failure_time(now);
1209
0
                    return;
1210
0
                }
1211
0
            }
1212
24
        }
1213
24
    } while (false);
1214
24
    if (!tablet->can_do_compaction(tablet->data_dir()->path_hash(), compaction_type)) {
1215
0
        LOG(INFO) << "Tablet state has been changed, no need to begin this compaction "
1216
0
                     "task, tablet_id="
1217
0
                  << tablet->tablet_id() << ", tablet_state=" << tablet->tablet_state();
1218
0
        return;
1219
0
    }
1220
24
    tablet->compaction_stage = CompactionStage::EXECUTING;
1221
    // Update tracker to RUNNING
1222
24
    {
1223
24
        RunningStats rs;
1224
24
        rs.start_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
1225
24
                                   std::chrono::system_clock::now().time_since_epoch())
1226
24
                                   .count();
1227
24
        rs.permits = permits;
1228
24
        CompactionTaskTracker::instance()->update_to_running(compaction_id, rs);
1229
24
    }
1230
24
    TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction");
1231
24
    tablet->execute_compaction(*compaction);
1232
24
}
1233
1234
Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
1235
0
                                             bool force, bool eager, int trigger_method) {
1236
0
    if (!eager) {
1237
0
        DCHECK(compaction_type == CompactionType::BASE_COMPACTION ||
1238
0
               compaction_type == CompactionType::CUMULATIVE_COMPACTION);
1239
0
        auto compaction_registry_snapshot = _compaction_submit_registry.create_snapshot();
1240
0
        auto stores = get_stores();
1241
1242
0
        bool is_busy = std::none_of(
1243
0
                stores.begin(), stores.end(),
1244
0
                [&compaction_registry_snapshot, compaction_type](auto* data_dir) {
1245
0
                    return has_free_compaction_slot(
1246
0
                            &compaction_registry_snapshot, data_dir, compaction_type,
1247
0
                            compaction_registry_snapshot.count_executing_cumu_and_base(data_dir));
1248
0
                });
1249
0
        if (is_busy) {
1250
0
            LOG_EVERY_N(WARNING, 100)
1251
0
                    << "Too busy to submit a compaction task, tablet=" << tablet->get_table_id();
1252
0
            return Status::OK();
1253
0
        }
1254
0
    }
1255
0
    _update_cumulative_compaction_policy();
1256
    // alter table tableName set ("compaction_policy"="time_series")
1257
    // if atler table's compaction  policy, we need to modify tablet compaction policy shared ptr
1258
0
    if (tablet->get_cumulative_compaction_policy() == nullptr ||
1259
0
        tablet->get_cumulative_compaction_policy()->name() !=
1260
0
                tablet->tablet_meta()->compaction_policy()) {
1261
0
        tablet->set_cumulative_compaction_policy(
1262
0
                _get_cumulative_compaction_policy(tablet->tablet_meta()->compaction_policy()));
1263
0
    }
1264
0
    tablet->set_skip_compaction(false);
1265
0
    int8_t prefer_compaction_level = -1;
1266
0
    if (compaction_type == CompactionType::BINLOG_COMPACTION) {
1267
0
        tablet->calc_compaction_score(compaction_type, &prefer_compaction_level);
1268
0
        if (prefer_compaction_level < 0) {
1269
0
            return Status::Error<ErrorCode::BINLOG_COMPACTION_NO_SUITABLE_VERSION>(
1270
0
                    "failed to init binlog compaction due to no suitable version");
1271
0
        }
1272
0
    }
1273
0
    return _submit_compaction_task(tablet, compaction_type, force, trigger_method,
1274
0
                                   prefer_compaction_level);
1275
0
}
1276
1277
Status StorageEngine::_handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker,
1278
                                             SegCompactionCandidatesSharedPtr segments,
1279
11
                                             uint64_t submission_time) {
1280
    // note: be aware that worker->_writer maybe released when the task is cancelled
1281
11
    uint64_t exec_queue_time = GetCurrentTimeMicros() - submission_time;
1282
11
    LOG(INFO) << "segcompaction thread pool queue time(ms): " << exec_queue_time / 1000;
1283
11
    worker->compact_segments(segments);
1284
    // return OK here. error will be reported via BetaRowsetWriter::_segcompaction_status
1285
11
    return Status::OK();
1286
11
}
1287
1288
Status StorageEngine::submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker,
1289
11
                                                 SegCompactionCandidatesSharedPtr segments) {
1290
11
    uint64_t submission_time = GetCurrentTimeMicros();
1291
11
    return _seg_compaction_thread_pool->submit_func([this, worker, segments, submission_time] {
1292
11
        static_cast<void>(_handle_seg_compaction(worker, segments, submission_time));
1293
11
    });
1294
11
}
1295
1296
0
Status StorageEngine::process_index_change_task(const TAlterInvertedIndexReq& request) {
1297
0
    auto tablet_id = request.tablet_id;
1298
0
    TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
1299
0
    DBUG_EXECUTE_IF("StorageEngine::process_index_change_task_tablet_nullptr",
1300
0
                    { tablet = nullptr; })
1301
0
    if (tablet == nullptr) {
1302
0
        LOG(WARNING) << "tablet: " << tablet_id << " not exist";
1303
0
        return Status::InternalError("tablet not exist, tablet_id={}.", tablet_id);
1304
0
    }
1305
1306
0
    IndexBuilderSharedPtr index_builder = std::make_shared<IndexBuilder>(
1307
0
            *this, tablet, request.columns, request.alter_inverted_indexes, request.is_drop_op);
1308
0
    RETURN_IF_ERROR(_handle_index_change(index_builder));
1309
0
    return Status::OK();
1310
0
}
1311
1312
0
Status StorageEngine::_handle_index_change(IndexBuilderSharedPtr index_builder) {
1313
0
    RETURN_IF_ERROR(index_builder->init());
1314
0
    RETURN_IF_ERROR(index_builder->do_build_inverted_index());
1315
0
    return Status::OK();
1316
0
}
1317
1318
3
void StorageEngine::_cooldown_tasks_producer_callback() {
1319
3
    int64_t interval = config::generate_cooldown_task_interval_sec;
1320
    // the cooldown replica may be slow to upload it's meta file, so we should wait
1321
    // until it has done uploaded
1322
3
    int64_t skip_failed_interval = interval * 10;
1323
41
    do {
1324
        // these tables are ordered by priority desc
1325
41
        std::vector<TabletSharedPtr> tablets;
1326
41
        std::vector<RowsetSharedPtr> rowsets;
1327
        // TODO(luwei) : a more efficient way to get cooldown tablets
1328
41
        auto cur_time = time(nullptr);
1329
        // we should skip all the tablets which are not running and those pending to do cooldown
1330
        // also tablets once failed to do follow cooldown
1331
41
        auto skip_tablet = [this, skip_failed_interval,
1332
1.58M
                            cur_time](const TabletSharedPtr& tablet) -> bool {
1333
1.58M
            bool is_skip =
1334
1.58M
                    cur_time - tablet->last_failed_follow_cooldown_time() < skip_failed_interval ||
1335
1.58M
                    TABLET_RUNNING != tablet->tablet_state();
1336
1.58M
            if (is_skip) {
1337
0
                return is_skip;
1338
0
            }
1339
1.58M
            std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
1340
1.58M
            return _running_cooldown_tablets.find(tablet->tablet_id()) !=
1341
1.58M
                   _running_cooldown_tablets.end();
1342
1.58M
        };
1343
41
        _tablet_manager->get_cooldown_tablets(&tablets, &rowsets, std::move(skip_tablet));
1344
41
        LOG(INFO) << "cooldown producer get tablet num: " << tablets.size();
1345
41
        int max_priority = cast_set<int>(tablets.size());
1346
41
        int index = 0;
1347
41
        for (const auto& tablet : tablets) {
1348
0
            {
1349
0
                std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
1350
0
                _running_cooldown_tablets.insert(tablet->tablet_id());
1351
0
            }
1352
0
            PriorityThreadPool::Task task;
1353
0
            RowsetSharedPtr rowset = std::move(rowsets[index++]);
1354
0
            task.work_function = [tablet, rowset, task_size = tablets.size(), this]() {
1355
0
                Status st = tablet->cooldown(rowset);
1356
0
                {
1357
0
                    std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
1358
0
                    _running_cooldown_tablets.erase(tablet->tablet_id());
1359
0
                }
1360
0
                if (!st.ok()) {
1361
0
                    LOG(WARNING) << "failed to cooldown, tablet: " << tablet->tablet_id()
1362
0
                                 << " err: " << st;
1363
0
                } else {
1364
0
                    LOG(INFO) << "succeed to cooldown, tablet: " << tablet->tablet_id()
1365
0
                              << " cooldown progress ("
1366
0
                              << task_size - _cooldown_thread_pool->get_queue_size() << "/"
1367
0
                              << task_size << ")";
1368
0
                }
1369
0
            };
1370
0
            task.priority = max_priority--;
1371
0
            bool submited = _cooldown_thread_pool->offer(std::move(task));
1372
1373
0
            if (!submited) {
1374
0
                LOG(INFO) << "failed to submit cooldown task";
1375
0
            }
1376
0
        }
1377
41
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
1378
3
}
1379
1380
3
void StorageEngine::_remove_unused_remote_files_callback() {
1381
10
    while (!_stop_background_threads_latch.wait_for(
1382
10
            std::chrono::seconds(config::remove_unused_remote_files_interval_sec))) {
1383
7
        LOG(INFO) << "begin to remove unused remote files";
1384
7
        do_remove_unused_remote_files();
1385
7
    }
1386
3
}
1387
1388
static void collect_tablet_unused_remote_files(
1389
        Tablet* t, TConfirmUnusedRemoteFilesRequest& req,
1390
        std::unordered_map<int64_t, std::pair<StorageResource, std::vector<io::FileInfo>>>& buffer,
1391
3
        int64_t& num_files_in_buffer, PendingRowsetSet& pending_remote_rowsets) {
1392
3
    auto storage_resource = get_resource_by_storage_policy_id(t->storage_policy_id());
1393
3
    if (!storage_resource) {
1394
0
        LOG(WARNING) << "encounter error when remove unused remote files, tablet_id="
1395
0
                     << t->tablet_id() << " : " << storage_resource.error();
1396
0
        return;
1397
0
    }
1398
1399
    // TODO(plat1ko): Support path v1
1400
3
    if (storage_resource->path_version > 0) {
1401
0
        return;
1402
0
    }
1403
1404
3
    std::vector<io::FileInfo> files;
1405
    // FIXME(plat1ko): What if user reset resource in storage policy to another resource?
1406
    //  Maybe we should also list files in previously uploaded resources.
1407
3
    bool exists = true;
1408
3
    auto st = storage_resource->fs->list(storage_resource->remote_tablet_path(t->tablet_id()), true,
1409
3
                                         &files, &exists);
1410
3
    if (!st.ok()) {
1411
0
        LOG(WARNING) << "encounter error when remove unused remote files, tablet_id="
1412
0
                     << t->tablet_id() << " : " << st;
1413
0
        return;
1414
0
    }
1415
3
    if (!exists || files.empty()) {
1416
0
        return;
1417
0
    }
1418
    // get all cooldowned rowsets
1419
3
    RowsetIdUnorderedSet cooldowned_rowsets;
1420
3
    UniqueId cooldown_meta_id;
1421
3
    {
1422
3
        std::shared_lock rlock(t->get_header_lock());
1423
12
        for (const auto& [_, rs_meta] : t->tablet_meta()->all_rs_metas()) {
1424
12
            if (!rs_meta->is_local()) {
1425
12
                cooldowned_rowsets.insert(rs_meta->rowset_id());
1426
12
            }
1427
12
        }
1428
3
        if (cooldowned_rowsets.empty()) {
1429
0
            return;
1430
0
        }
1431
3
        cooldown_meta_id = t->tablet_meta()->cooldown_meta_id();
1432
3
    }
1433
0
    auto [cooldown_term, cooldown_replica_id] = t->cooldown_conf();
1434
3
    if (cooldown_replica_id != t->replica_id()) {
1435
0
        return;
1436
0
    }
1437
    // {cooldown_replica_id}.{cooldown_term}.meta
1438
3
    std::string remote_meta_path =
1439
3
            cooldown_tablet_meta_filename(cooldown_replica_id, cooldown_term);
1440
    // filter out the paths that should be reserved
1441
12
    auto filter = [&](io::FileInfo& info) {
1442
12
        std::string_view filename = info.file_name;
1443
12
        if (filename.ends_with(".meta")) {
1444
3
            return filename == remote_meta_path;
1445
3
        }
1446
9
        auto rowset_id = extract_rowset_id(filename);
1447
9
        if (rowset_id.hi == 0) {
1448
0
            return false;
1449
0
        }
1450
9
        return cooldowned_rowsets.contains(rowset_id) || pending_remote_rowsets.contains(rowset_id);
1451
9
    };
1452
3
    files.erase(std::remove_if(files.begin(), files.end(), std::move(filter)), files.end());
1453
3
    if (files.empty()) {
1454
3
        return;
1455
3
    }
1456
0
    files.shrink_to_fit();
1457
0
    num_files_in_buffer += files.size();
1458
0
    buffer.insert({t->tablet_id(), {*storage_resource, std::move(files)}});
1459
0
    auto& info = req.confirm_list.emplace_back();
1460
0
    info.__set_tablet_id(t->tablet_id());
1461
0
    info.__set_cooldown_replica_id(cooldown_replica_id);
1462
0
    info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift());
1463
0
}
1464
1465
static void confirm_and_remove_unused_remote_files(
1466
        const TConfirmUnusedRemoteFilesRequest& req,
1467
        std::unordered_map<int64_t, std::pair<StorageResource, std::vector<io::FileInfo>>>& buffer,
1468
0
        const int64_t num_files_in_buffer) {
1469
0
    TConfirmUnusedRemoteFilesResult result;
1470
0
    LOG(INFO) << "begin to confirm unused remote files. num_tablets=" << buffer.size()
1471
0
              << " num_files=" << num_files_in_buffer;
1472
0
    auto st = MasterServerClient::instance()->confirm_unused_remote_files(req, &result);
1473
0
    if (!st.ok()) {
1474
0
        LOG(WARNING) << st;
1475
0
        return;
1476
0
    }
1477
0
    for (auto id : result.confirmed_tablets) {
1478
0
        if (auto it = buffer.find(id); LIKELY(it != buffer.end())) {
1479
0
            auto& storage_resource = it->second.first;
1480
0
            auto& files = it->second.second;
1481
0
            std::vector<io::Path> paths;
1482
0
            paths.reserve(files.size());
1483
            // delete unused files
1484
0
            LOG(INFO) << "delete unused files. root_path=" << storage_resource.fs->root_path()
1485
0
                      << " tablet_id=" << id;
1486
0
            io::Path dir = storage_resource.remote_tablet_path(id);
1487
0
            for (auto& file : files) {
1488
0
                auto file_path = dir / file.file_name;
1489
0
                LOG(INFO) << "delete unused file: " << file_path.native();
1490
0
                paths.push_back(std::move(file_path));
1491
0
            }
1492
0
            st = storage_resource.fs->batch_delete(paths);
1493
0
            if (!st.ok()) {
1494
0
                LOG(WARNING) << "failed to delete unused files, tablet_id=" << id << " : " << st;
1495
0
            }
1496
0
            buffer.erase(it);
1497
0
        }
1498
0
    }
1499
0
}
1500
1501
7
void StorageEngine::do_remove_unused_remote_files() {
1502
426k
    auto tablets = tablet_manager()->get_all_tablet([](Tablet* t) {
1503
426k
        return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() &&
1504
426k
               t->tablet_state() == TABLET_RUNNING &&
1505
426k
               t->cooldown_conf_unlocked().cooldown_replica_id == t->replica_id();
1506
426k
    });
1507
7
    TConfirmUnusedRemoteFilesRequest req;
1508
7
    req.__isset.confirm_list = true;
1509
    // tablet_id -> [storage_resource, unused_remote_files]
1510
7
    using unused_remote_files_buffer_t =
1511
7
            std::unordered_map<int64_t, std::pair<StorageResource, std::vector<io::FileInfo>>>;
1512
7
    unused_remote_files_buffer_t buffer;
1513
7
    int64_t num_files_in_buffer = 0;
1514
    // assume a filename is 0.1KB, buffer size should not larger than 100MB
1515
7
    constexpr int64_t max_files_in_buffer = 1000000;
1516
1517
    // batch confirm to reduce FE's overhead
1518
7
    auto next_confirm_time = std::chrono::steady_clock::now() +
1519
7
                             std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
1520
7
    for (auto& t : tablets) {
1521
3
        if (t.use_count() <= 1 // this means tablet has been dropped
1522
3
            || t->cooldown_conf_unlocked().cooldown_replica_id != t->replica_id() ||
1523
3
            t->tablet_state() != TABLET_RUNNING) {
1524
0
            continue;
1525
0
        }
1526
3
        collect_tablet_unused_remote_files(t.get(), req, buffer, num_files_in_buffer,
1527
3
                                           _pending_remote_rowsets);
1528
3
        if (num_files_in_buffer > 0 && (num_files_in_buffer > max_files_in_buffer ||
1529
0
                                        std::chrono::steady_clock::now() > next_confirm_time)) {
1530
0
            confirm_and_remove_unused_remote_files(req, buffer, num_files_in_buffer);
1531
0
            buffer.clear();
1532
0
            req.confirm_list.clear();
1533
0
            num_files_in_buffer = 0;
1534
0
            next_confirm_time =
1535
0
                    std::chrono::steady_clock::now() +
1536
0
                    std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
1537
0
        }
1538
3
    }
1539
7
    if (num_files_in_buffer > 0) {
1540
0
        confirm_and_remove_unused_remote_files(req, buffer, num_files_in_buffer);
1541
0
    }
1542
7
}
1543
1544
3
void StorageEngine::_cold_data_compaction_producer_callback() {
1545
10
    while (!_stop_background_threads_latch.wait_for(
1546
10
            std::chrono::seconds(config::cold_data_compaction_interval_sec))) {
1547
7
        if (config::disable_auto_compaction ||
1548
7
            GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
1549
0
            continue;
1550
0
        }
1551
1552
7
        std::unordered_set<int64_t> copied_tablet_submitted;
1553
7
        {
1554
7
            std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
1555
7
            copied_tablet_submitted = _cold_compaction_tablet_submitted;
1556
7
        }
1557
7
        int64_t n = config::cold_data_compaction_thread_num - copied_tablet_submitted.size();
1558
7
        if (n <= 0) {
1559
0
            continue;
1560
0
        }
1561
426k
        auto tablets = _tablet_manager->get_all_tablet([&copied_tablet_submitted](Tablet* t) {
1562
426k
            return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() &&
1563
426k
                   t->tablet_state() == TABLET_RUNNING &&
1564
426k
                   !copied_tablet_submitted.contains(t->tablet_id()) &&
1565
426k
                   !t->tablet_meta()->tablet_schema()->disable_auto_compaction();
1566
426k
        });
1567
7
        std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_compact;
1568
7
        tablet_to_compact.reserve(n + 1);
1569
7
        std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_follow;
1570
7
        tablet_to_follow.reserve(n + 1);
1571
1572
7
        for (auto& t : tablets) {
1573
3
            if (t->replica_id() == t->cooldown_conf_unlocked().cooldown_replica_id) {
1574
3
                auto score = t->calc_cold_data_compaction_score();
1575
3
                if (score < config::cold_data_compaction_score_threshold) {
1576
3
                    continue;
1577
3
                }
1578
0
                tablet_to_compact.emplace_back(t, score);
1579
0
                if (tablet_to_compact.size() > n) {
1580
0
                    std::sort(tablet_to_compact.begin(), tablet_to_compact.end(),
1581
0
                              [](auto& a, auto& b) { return a.second > b.second; });
1582
0
                    tablet_to_compact.pop_back();
1583
0
                }
1584
0
                continue;
1585
3
            }
1586
            // else, need to follow
1587
0
            {
1588
0
                std::lock_guard lock(_running_cooldown_mutex);
1589
0
                if (_running_cooldown_tablets.contains(t->table_id())) {
1590
                    // already in cooldown queue
1591
0
                    continue;
1592
0
                }
1593
0
            }
1594
            // TODO(plat1ko): some avoidance strategy if failed to follow
1595
0
            auto score = t->calc_cold_data_compaction_score();
1596
0
            tablet_to_follow.emplace_back(t, score);
1597
1598
0
            if (tablet_to_follow.size() > n) {
1599
0
                std::sort(tablet_to_follow.begin(), tablet_to_follow.end(),
1600
0
                          [](auto& a, auto& b) { return a.second > b.second; });
1601
0
                tablet_to_follow.pop_back();
1602
0
            }
1603
0
        }
1604
1605
7
        for (auto& [tablet, score] : tablet_to_compact) {
1606
0
            LOG(INFO) << "submit cold data compaction. tablet_id=" << tablet->tablet_id()
1607
0
                      << " score=" << score;
1608
0
            static_cast<void>(
1609
0
                    _cold_data_compaction_thread_pool->submit_func([t = std::move(tablet), this]() {
1610
0
                        _handle_cold_data_compaction(std::move(t));
1611
0
                    }));
1612
0
        }
1613
1614
7
        for (auto& [tablet, score] : tablet_to_follow) {
1615
0
            LOG(INFO) << "submit to follow cooldown meta. tablet_id=" << tablet->tablet_id()
1616
0
                      << " score=" << score;
1617
0
            static_cast<void>(_cold_data_compaction_thread_pool->submit_func(
1618
0
                    [t = std::move(tablet), this]() { _follow_cooldown_meta(std::move(t)); }));
1619
0
        }
1620
7
    }
1621
3
}
1622
1623
0
void StorageEngine::_handle_cold_data_compaction(TabletSharedPtr t) {
1624
0
    auto compaction = std::make_shared<ColdDataCompaction>(*this, t);
1625
0
    {
1626
0
        std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
1627
0
        _cold_compaction_tablet_submitted.insert(t->tablet_id());
1628
0
    }
1629
0
    Defer defer {[&] {
1630
0
        std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
1631
0
        _cold_compaction_tablet_submitted.erase(t->tablet_id());
1632
0
    }};
1633
0
    std::unique_lock cold_compaction_lock(t->get_cold_compaction_lock(), std::try_to_lock);
1634
0
    if (!cold_compaction_lock.owns_lock()) {
1635
0
        LOG(WARNING) << "try cold_compaction_lock failed, tablet_id=" << t->tablet_id();
1636
0
        return;
1637
0
    }
1638
0
    _update_cumulative_compaction_policy();
1639
0
    if (t->get_cumulative_compaction_policy() == nullptr ||
1640
0
        t->get_cumulative_compaction_policy()->name() != t->tablet_meta()->compaction_policy()) {
1641
0
        t->set_cumulative_compaction_policy(
1642
0
                _get_cumulative_compaction_policy(t->tablet_meta()->compaction_policy()));
1643
0
    }
1644
1645
0
    auto st = compaction->prepare_compact();
1646
0
    if (!st.ok()) {
1647
0
        LOG(WARNING) << "failed to prepare cold data compaction. tablet_id=" << t->tablet_id()
1648
0
                     << " err=" << st;
1649
0
        return;
1650
0
    }
1651
1652
0
    st = compaction->execute_compact();
1653
0
    if (!st.ok()) {
1654
0
        LOG(WARNING) << "failed to execute cold data compaction. tablet_id=" << t->tablet_id()
1655
0
                     << " err=" << st;
1656
0
        return;
1657
0
    }
1658
0
}
1659
1660
0
void StorageEngine::_follow_cooldown_meta(TabletSharedPtr t) {
1661
0
    {
1662
0
        std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
1663
0
        _cold_compaction_tablet_submitted.insert(t->tablet_id());
1664
0
    }
1665
0
    auto st = t->cooldown();
1666
0
    {
1667
0
        std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
1668
0
        _cold_compaction_tablet_submitted.erase(t->tablet_id());
1669
0
    }
1670
0
    if (!st.ok()) {
1671
        // The cooldown of the replica may be relatively slow
1672
        // resulting in a short period of time where following cannot be successful
1673
0
        LOG_EVERY_N(WARNING, 5) << "failed to cooldown. tablet_id=" << t->tablet_id()
1674
0
                                << " err=" << st;
1675
0
    }
1676
0
}
1677
1678
void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_id,
1679
                                           int64_t publish_version, int64_t transaction_id,
1680
2.05k
                                           bool is_recovery, int64_t commit_tso) {
1681
2.05k
    if (!is_recovery) {
1682
2.05k
        bool exists = false;
1683
2.05k
        {
1684
2.05k
            std::shared_lock<std::shared_mutex> rlock(_async_publish_lock);
1685
2.05k
            if (auto tablet_iter = _async_publish_tasks.find(tablet_id);
1686
2.05k
                tablet_iter != _async_publish_tasks.end()) {
1687
2.05k
                if (auto iter = tablet_iter->second.find(publish_version);
1688
2.05k
                    iter != tablet_iter->second.end()) {
1689
20
                    exists = true;
1690
20
                }
1691
2.05k
            }
1692
2.05k
        }
1693
2.05k
        if (exists) {
1694
20
            return;
1695
20
        }
1696
2.03k
        TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
1697
2.03k
        if (tablet == nullptr) {
1698
0
            LOG(INFO) << "tablet may be dropped when add async publish task, tablet_id: "
1699
0
                      << tablet_id;
1700
0
            return;
1701
0
        }
1702
2.03k
        PendingPublishInfoPB pending_publish_info_pb;
1703
2.03k
        pending_publish_info_pb.set_partition_id(partition_id);
1704
2.03k
        pending_publish_info_pb.set_transaction_id(transaction_id);
1705
2.03k
        pending_publish_info_pb.set_commit_tso(commit_tso);
1706
2.03k
        static_cast<void>(TabletMetaManager::save_pending_publish_info(
1707
2.03k
                tablet->data_dir(), tablet->tablet_id(), publish_version,
1708
2.03k
                pending_publish_info_pb.SerializeAsString()));
1709
2.03k
    }
1710
2.05k
    LOG(INFO) << "add pending publish task, tablet_id: " << tablet_id
1711
2.03k
              << " version: " << publish_version << " txn_id:" << transaction_id
1712
2.03k
              << " is_recovery: " << is_recovery;
1713
2.03k
    std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
1714
2.03k
    _async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id, commit_tso};
1715
2.03k
}
1716
1717
3
int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
1718
3
    std::shared_lock<std::shared_mutex> rlock(_async_publish_lock);
1719
3
    auto iter = _async_publish_tasks.find(tablet_id);
1720
3
    if (iter == _async_publish_tasks.end()) {
1721
0
        return INT64_MAX;
1722
0
    }
1723
3
    if (iter->second.empty()) {
1724
0
        return INT64_MAX;
1725
0
    }
1726
3
    return iter->second.begin()->first;
1727
3
}
1728
1729
26.4k
void StorageEngine::_process_async_publish() {
1730
    // tablet, publish_version
1731
26.4k
    std::vector<std::pair<TabletSharedPtr, int64_t>> need_removed_tasks;
1732
26.4k
    {
1733
26.4k
        std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
1734
26.4k
        for (auto tablet_iter = _async_publish_tasks.begin();
1735
26.4k
             tablet_iter != _async_publish_tasks.end();) {
1736
10
            if (tablet_iter->second.empty()) {
1737
1
                tablet_iter = _async_publish_tasks.erase(tablet_iter);
1738
1
                continue;
1739
1
            }
1740
9
            int64_t tablet_id = tablet_iter->first;
1741
9
            TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
1742
9
            if (!tablet) {
1743
1
                LOG(WARNING) << "tablet does not exist when async publush, tablet_id: "
1744
1
                             << tablet_id;
1745
1
                tablet_iter = _async_publish_tasks.erase(tablet_iter);
1746
1
                continue;
1747
1
            }
1748
1749
8
            auto task_iter = tablet_iter->second.begin();
1750
8
            int64_t version = task_iter->first;
1751
8
            int64_t transaction_id = std::get<0>(task_iter->second);
1752
8
            int64_t partition_id = std::get<1>(task_iter->second);
1753
8
            int64_t commit_tso = std::get<2>(task_iter->second);
1754
8
            int64_t max_version = tablet->max_version().second;
1755
1756
8
            if (version <= max_version) {
1757
6
                need_removed_tasks.emplace_back(tablet, version);
1758
6
                tablet_iter->second.erase(task_iter);
1759
6
                tablet_iter++;
1760
6
                continue;
1761
6
            }
1762
2
            if (version != max_version + 1) {
1763
1
                int32_t max_version_config = tablet->max_version_config();
1764
                // Keep only the most recent versions
1765
31
                while (tablet_iter->second.size() > max_version_config) {
1766
30
                    need_removed_tasks.emplace_back(tablet, version);
1767
30
                    task_iter = tablet_iter->second.erase(task_iter);
1768
30
                    version = task_iter->first;
1769
30
                }
1770
1
                tablet_iter++;
1771
1
                continue;
1772
1
            }
1773
1774
1
            auto async_publish_task = std::make_shared<AsyncTabletPublishTask>(
1775
1
                    *this, tablet, partition_id, transaction_id, version, commit_tso);
1776
1
            static_cast<void>(_tablet_publish_txn_thread_pool->submit_func(
1777
1
                    [=]() { async_publish_task->handle(); }));
1778
1
            tablet_iter->second.erase(task_iter);
1779
1
            need_removed_tasks.emplace_back(tablet, version);
1780
1
            tablet_iter++;
1781
1
        }
1782
26.4k
    }
1783
26.4k
    for (auto& [tablet, publish_version] : need_removed_tasks) {
1784
37
        static_cast<void>(TabletMetaManager::remove_pending_publish_info(
1785
37
                tablet->data_dir(), tablet->tablet_id(), publish_version));
1786
37
    }
1787
26.4k
}
1788
1789
3
void StorageEngine::_async_publish_callback() {
1790
26.4k
    while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) {
1791
26.4k
        _process_async_publish();
1792
26.4k
    }
1793
3
}
1794
1795
3
void StorageEngine::_check_tablet_delete_bitmap_score_callback() {
1796
3
    LOG(INFO) << "try to start check tablet delete bitmap score!";
1797
3
    while (!_stop_background_threads_latch.wait_for(
1798
3
            std::chrono::seconds(config::check_tablet_delete_bitmap_interval_seconds))) {
1799
0
        if (!config::enable_check_tablet_delete_bitmap_score) {
1800
0
            return;
1801
0
        }
1802
0
        uint64_t max_delete_bitmap_score = 0;
1803
0
        uint64_t max_base_rowset_delete_bitmap_score = 0;
1804
0
        _tablet_manager->get_topn_tablet_delete_bitmap_score(&max_delete_bitmap_score,
1805
0
                                                             &max_base_rowset_delete_bitmap_score);
1806
0
        if (max_delete_bitmap_score > 0) {
1807
0
            _tablet_max_delete_bitmap_score_metrics->set_value(max_delete_bitmap_score);
1808
0
        }
1809
0
        if (max_base_rowset_delete_bitmap_score > 0) {
1810
0
            _tablet_max_base_rowset_delete_bitmap_score_metrics->set_value(
1811
0
                    max_base_rowset_delete_bitmap_score);
1812
0
        }
1813
0
    }
1814
3
}
1815
} // namespace doris