Coverage Report

Created: 2026-06-02 10:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/olap_server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <gen_cpp/Types_types.h>
19
#include <gen_cpp/olap_file.pb.h>
20
#include <glog/logging.h>
21
#include <rapidjson/prettywriter.h>
22
#include <rapidjson/stringbuffer.h>
23
#include <stdint.h>
24
#include <sys/types.h>
25
26
#include <algorithm>
27
#include <atomic>
28
// IWYU pragma: no_include <bits/chrono.h>
29
#include <gen_cpp/FrontendService.h>
30
#include <gen_cpp/internal_service.pb.h>
31
32
#include <chrono> // IWYU pragma: keep
33
#include <cmath>
34
#include <condition_variable>
35
#include <cstdint>
36
#include <ctime>
37
#include <functional>
38
#include <map>
39
#include <memory>
40
#include <mutex>
41
#include <ostream>
42
#include <random>
43
#include <shared_mutex>
44
#include <string>
45
#include <thread>
46
#include <unordered_set>
47
#include <utility>
48
#include <vector>
49
50
#include "agent/utils.h"
51
#include "common/config.h"
52
#include "common/logging.h"
53
#include "common/metrics/doris_metrics.h"
54
#include "common/metrics/metrics.h"
55
#include "common/status.h"
56
#include "cpp/sync_point.h"
57
#include "io/fs/file_writer.h" // IWYU pragma: keep
58
#include "io/fs/path.h"
59
#include "load/memtable/memtable_flush_executor.h"
60
#include "runtime/memory/cache_manager.h"
61
#include "runtime/memory/global_memory_arbitrator.h"
62
#include "storage/compaction/cold_data_compaction.h"
63
#include "storage/compaction/compaction_permit_limiter.h"
64
#include "storage/compaction/cumulative_compaction.h"
65
#include "storage/compaction/cumulative_compaction_policy.h"
66
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
67
#include "storage/compaction/single_replica_compaction.h"
68
#include "storage/compaction_task_tracker.h"
69
#include "storage/data_dir.h"
70
#include "storage/olap_common.h"
71
#include "storage/olap_define.h"
72
#include "storage/rowset/segcompaction.h"
73
#include "storage/schema_change/schema_change.h"
74
#include "storage/storage_engine.h"
75
#include "storage/storage_policy.h"
76
#include "storage/tablet/base_tablet.h"
77
#include "storage/tablet/tablet.h"
78
#include "storage/tablet/tablet_manager.h"
79
#include "storage/tablet/tablet_meta.h"
80
#include "storage/tablet/tablet_meta_manager.h"
81
#include "storage/tablet/tablet_schema.h"
82
#include "storage/task/engine_publish_version_task.h"
83
#include "storage/task/index_builder.h"
84
#include "util/client_cache.h"
85
#include "util/countdown_latch.h"
86
#include "util/debug_points.h"
87
#include "util/mem_info.h"
88
#include "util/thread.h"
89
#include "util/threadpool.h"
90
#include "util/thrift_rpc_helper.h"
91
#include "util/time.h"
92
#include "util/uid_util.h"
93
#include "util/work_thread_pool.hpp"
94
95
using std::string;
96
97
namespace doris {
98
using io::Path;
99
100
// number of running SCHEMA-CHANGE threads
// NOTE(review): `volatile` gives neither atomicity nor inter-thread ordering; if this
// counter is updated from several threads, confirm the accesses are synchronized elsewhere.
101
volatile uint32_t g_schema_change_active_threads = 0;
102
// Status variables registered under the given names, both starting at 0.
// Presumably updated once per round by the compaction producer — TODO confirm against
// the code that writes them (not visible in this part of the file).
bvar::Status<int64_t> g_cumu_compaction_task_num_per_round("cumu_compaction_task_num_per_round", 0);
103
bvar::Status<int64_t> g_base_compaction_task_num_per_round("base_compaction_task_num_per_round", 0);
104
105
// File-local constants; not referenced in the visible portion of this file.
// NOTE(review): the names suggest a seed and a modulus for some hash/random step — verify at the use site.
static const uint64_t DEFAULT_SEED = 104729;
106
static const uint64_t MOD_PRIME = 7652413;
107
108
0
// Move constructor.
// The members of the new object are first default-initialized (no member initializer
// list is used) and then each per-compaction-type registry is swapped with the one in `r`.
// The mutex is not involved: it is neither locked nor transferred here.
CompactionSubmitRegistry::CompactionSubmitRegistry(CompactionSubmitRegistry&& r) {
    std::swap(_tablet_submitted_base_compaction, r._tablet_submitted_base_compaction);
    std::swap(_tablet_submitted_cumu_compaction, r._tablet_submitted_cumu_compaction);
    std::swap(_tablet_submitted_binlog_compaction, r._tablet_submitted_binlog_compaction);
    std::swap(_tablet_submitted_full_compaction, r._tablet_submitted_full_compaction);
}
114
115
98.4k
// Returns a copy of the base, cumulative and binlog registries taken under the
// registry mutex. The full-compaction registry is deliberately not copied, so it
// stays in its default state in the returned object.
CompactionSubmitRegistry CompactionSubmitRegistry::create_snapshot() {
    std::lock_guard<std::mutex> guard(_tablet_submitted_compaction_mutex);

    CompactionSubmitRegistry snapshot;
    snapshot._tablet_submitted_cumu_compaction = _tablet_submitted_cumu_compaction;
    snapshot._tablet_submitted_base_compaction = _tablet_submitted_base_compaction;
    snapshot._tablet_submitted_binlog_compaction = _tablet_submitted_binlog_compaction;
    return snapshot;
}
124
125
13
// Resets the cumulative, base and binlog entries of every directory in `stores`
// to an empty set, under the registry mutex.
// Entries for directories not listed in `stores` are left untouched, and the
// full-compaction registry is not modified at all.
void CompactionSubmitRegistry::reset(const std::vector<DataDir*>& stores) {
    std::lock_guard<std::mutex> guard(_tablet_submitted_compaction_mutex);
    for (DataDir* data_dir : stores) {
        _tablet_submitted_base_compaction[data_dir] = {};
        _tablet_submitted_cumu_compaction[data_dir] = {};
        _tablet_submitted_binlog_compaction[data_dir] = {};
    }
}
134
135
uint32_t CompactionSubmitRegistry::count_executing_compaction(DataDir* dir,
136
117k
                                                              CompactionType compaction_type) {
137
    // non-lock, used in snapshot
138
117k
    const auto& compaction_tasks = _get_tablet_set(dir, compaction_type);
139
117k
    return cast_set<uint32_t>(std::count_if(
140
117k
            compaction_tasks.begin(), compaction_tasks.end(),
141
117k
            [](const auto& task) { return task->compaction_stage == CompactionStage::EXECUTING; }));
142
117k
}
143
144
11.0k
// Sum of the executing base and executing cumulative compactions on `dir`.
// Takes no lock: intended for use on a snapshot.
uint32_t CompactionSubmitRegistry::count_executing_cumu_and_base(DataDir* dir) {
    const uint32_t executing_base =
            count_executing_compaction(dir, CompactionType::BASE_COMPACTION);
    const uint32_t executing_cumu =
            count_executing_compaction(dir, CompactionType::CUMULATIVE_COMPACTION);
    return executing_base + executing_cumu;
}
149
150
213k
// True when at least one entry is registered for (`dir`, `compaction_type`).
// Takes no lock: intended for use on a snapshot.
bool CompactionSubmitRegistry::has_compaction_task(DataDir* dir, CompactionType compaction_type) {
    const auto& submitted = _get_tablet_set(dir, compaction_type);
    return !submitted.empty();
}
154
155
std::vector<TabletCompactionContext> CompactionSubmitRegistry::pick_topn_tablets_for_compaction(
156
        TabletManager* tablet_mgr, DataDir* data_dir, CompactionType compaction_type,
157
106k
        const CumuCompactionPolicyTable& cumu_compaction_policies, uint32_t* disk_max_score) {
158
    // non-lock, used in snapshot
159
106k
    return tablet_mgr->find_best_tablets_to_compaction(compaction_type, data_dir,
160
106k
                                                       _get_tablet_set(data_dir, compaction_type),
161
106k
                                                       disk_max_score, cumu_compaction_policies);
162
106k
}
163
164
237k
// Registers `tablet` for `compaction_type` on the tablet's own data dir, under the
// registry mutex.
// Returns true when the tablet was ALREADY registered (nothing was inserted) and
// false when it was newly added.
bool CompactionSubmitRegistry::insert(TabletSharedPtr tablet, CompactionType compaction_type) {
    std::lock_guard<std::mutex> guard(_tablet_submitted_compaction_mutex);
    auto& submitted = _get_tablet_set(tablet->data_dir(), compaction_type);
    const bool newly_inserted = submitted.insert(tablet).second;
    return !newly_inserted;
}
170
171
void CompactionSubmitRegistry::remove(TabletSharedPtr tablet, CompactionType compaction_type,
172
237k
                                      std::function<void()> wakeup_cb) {
173
237k
    std::unique_lock<std::mutex> l(_tablet_submitted_compaction_mutex);
174
237k
    auto& tablet_set = _get_tablet_set(tablet->data_dir(), compaction_type);
175
237k
    size_t removed = tablet_set.erase(tablet);
176
237k
    if (removed == 1) {
177
237k
        wakeup_cb();
178
237k
    }
179
237k
}
180
181
// Returns the tablet set stored for `dir` in the registry that matches
// `compaction_type`. The lookup uses operator[] on the registry.
// NOTE(review): for standard map types operator[] inserts an empty set when `dir`
// has no entry yet — confirm the registry type in the header.
// An unknown compaction type aborts through CHECK.
CompactionSubmitRegistry::TabletSet& CompactionSubmitRegistry::_get_tablet_set(
        DataDir* dir, CompactionType compaction_type) {
    if (compaction_type == CompactionType::BASE_COMPACTION) {
        return _tablet_submitted_base_compaction[dir];
    }
    if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
        return _tablet_submitted_cumu_compaction[dir];
    }
    if (compaction_type == CompactionType::FULL_COMPACTION) {
        return _tablet_submitted_full_compaction[dir];
    }
    if (compaction_type == CompactionType::BINLOG_COMPACTION) {
        return _tablet_submitted_binlog_compaction[dir];
    }
    CHECK(false) << "invalid compaction type";
}
196
197
98.4k
static int32_t get_cumu_compaction_threads_num(size_t data_dirs_num) {
198
98.4k
    int32_t threads_num = config::max_cumu_compaction_threads;
199
98.4k
    if (threads_num == -1) {
200
98.4k
        int32_t num_cores = doris::CpuInfo::num_cores();
201
98.4k
        threads_num = std::max(cast_set<int32_t>(data_dirs_num), num_cores / 6);
202
98.4k
    }
203
98.4k
    threads_num = threads_num <= 0 ? 1 : threads_num;
204
98.4k
    return threads_num;
205
98.4k
}
206
207
98.4k
static int32_t get_base_compaction_threads_num(size_t data_dirs_num) {
208
98.4k
    int32_t threads_num = config::max_base_compaction_threads;
209
98.4k
    if (threads_num == -1) {
210
0
        threads_num = cast_set<int32_t>(data_dirs_num);
211
0
    }
212
98.4k
    threads_num = threads_num <= 0 ? 1 : threads_num;
213
98.4k
    return threads_num;
214
98.4k
}
215
216
98.4k
static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) {
217
98.4k
    int32_t threads_num = config::max_single_replica_compaction_threads;
218
98.4k
    if (threads_num == -1) {
219
98.4k
        threads_num = cast_set<int32_t>(data_dirs_num);
220
98.4k
    }
221
98.4k
    threads_num = threads_num <= 0 ? 1 : threads_num;
222
98.4k
    return threads_num;
223
98.4k
}
224
225
98.4k
static int32_t get_binlog_compaction_threads_num(size_t data_dirs_num) {
226
98.4k
    int32_t threads_num = config::max_binlog_compaction_threads;
227
98.4k
    if (threads_num == -1) {
228
98.4k
        threads_num = cast_set<int32_t>(data_dirs_num);
229
98.4k
    }
230
98.4k
    threads_num = threads_num <= 0 ? 1 : threads_num;
231
98.4k
    return threads_num;
232
98.4k
}
233
234
6
// Starts every background thread and thread pool owned by the storage engine, in a
// fixed order, and returns the first error encountered (via RETURN_IF_ERROR).
// Threads and pools that were already created before a failure are not rolled back here.
// `wg_sptr` is not used anywhere in this function body.
Status StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
    // ---- housekeeping threads ----
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "unused_rowset_monitor_thread",
            [this] { _unused_rowset_monitor_thread_callback(); },
            &_unused_rowset_monitor_thread));
    LOG(INFO) << "unused rowset monitor thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "evict_querying_rowset_thread",
            [this] { _evict_quring_rowset_thread_callback(); }, &_evict_quering_rowset_thread));
    LOG(INFO) << "evict quering thread started";

    // monitors the snapshot and trash folders
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "garbage_sweeper_thread",
            [this] { _garbage_sweeper_thread_callback(); }, &_garbage_sweeper_thread));
    LOG(INFO) << "garbage sweeper thread started";

    // monitors tablets with io errors
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "disk_stat_monitor_thread",
            [this] { _disk_stat_monitor_thread_callback(); }, &_disk_stat_monitor_thread));
    LOG(INFO) << "disk stat monitor thread started";

    // ---- compaction thread pools, sized from config and the number of data dirs ----
    std::vector<DataDir*> data_dirs = get_stores();
    const size_t data_dir_count = data_dirs.size();

    const int32_t base_threads = get_base_compaction_threads_num(data_dir_count);
    const int32_t cumu_threads = get_cumu_compaction_threads_num(data_dir_count);
    const int32_t single_replica_threads =
            get_single_replica_compaction_threads_num(data_dir_count);
    const int32_t binlog_threads = get_binlog_compaction_threads_num(data_dir_count);

    RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
                            .set_min_threads(base_threads)
                            .set_max_threads(base_threads)
                            .build(&_base_compaction_thread_pool));
    RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
                            .set_min_threads(cumu_threads)
                            .set_max_threads(cumu_threads)
                            .build(&_cumu_compaction_thread_pool));
    RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
                            .set_min_threads(single_replica_threads)
                            .set_max_threads(single_replica_threads)
                            .build(&_single_replica_compaction_thread_pool));
    RETURN_IF_ERROR(ThreadPoolBuilder("BinlogCompactionTaskThreadPool")
                            .set_min_threads(binlog_threads)
                            .set_max_threads(binlog_threads)
                            .build(&_binlog_compaction_thread_pool));

    // the segcompaction pool only exists when the feature is enabled
    if (config::enable_segcompaction) {
        RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
                                .set_min_threads(config::segcompaction_num_threads)
                                .set_max_threads(config::segcompaction_num_threads)
                                .build(&_seg_compaction_thread_pool));
    }
    RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
                            .set_min_threads(config::cold_data_compaction_thread_num)
                            .set_max_threads(config::cold_data_compaction_thread_num)
                            .build(&_cold_data_compaction_thread_pool));

    // give every data dir an empty entry before the producers start
    _compaction_submit_registry.reset(data_dirs);

    // ---- compaction task producers ----
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "compaction_tasks_producer_thread",
            [this] { _compaction_tasks_producer_callback(); },
            &_compaction_tasks_producer_thread));
    LOG(INFO) << "compaction tasks producer thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "binlog_compaction_tasks_producer_thread",
            [this] { _binlog_compaction_tasks_producer_callback(); },
            &_binlog_compaction_tasks_producer_thread));
    LOG(INFO) << "binlog compaction tasks producer thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "_update_replica_infos_thread",
            [this] { _update_replica_infos_callback(); }, &_update_replica_infos_thread));
    LOG(INFO) << "tablet replicas info update thread started";

    // ---- tablet meta checkpoint ----
    // a negative config value means "one thread per data dir"
    int32_t checkpoint_threads = config::max_meta_checkpoint_threads;
    if (checkpoint_threads < 0) {
        checkpoint_threads = cast_set<int32_t>(data_dirs.size());
    }
    RETURN_IF_ERROR(ThreadPoolBuilder("TabletMetaCheckpointTaskThreadPool")
                            .set_max_threads(checkpoint_threads)
                            .build(&_tablet_meta_checkpoint_thread_pool));

    // the producer gets its own copy of the data dir list (captured by value)
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "tablet_checkpoint_tasks_producer_thread",
            [this, data_dirs] { _tablet_checkpoint_callback(data_dirs); },
            &_tablet_checkpoint_tasks_producer_thread));
    LOG(INFO) << "tablet checkpoint tasks producer thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "tablet_path_check_thread",
            [this] { _tablet_path_check_callback(); }, &_tablet_path_check_thread));
    LOG(INFO) << "tablet path check thread started";

    // ---- path scan and gc: one thread per store ----
    if (config::path_gc_check) {
        for (DataDir* data_dir : get_stores()) {
            std::shared_ptr<Thread> path_gc_thread;
            RETURN_IF_ERROR(Thread::create(
                    "StorageEngine", "path_gc_thread",
                    [this, data_dir] { _path_gc_thread_callback(data_dir); }, &path_gc_thread));
            _path_gc_threads.emplace_back(path_gc_thread);
        }
        LOG(INFO) << "path gc threads started. number:" << get_stores().size();
    }

    // ---- cooldown / remote storage ----
    RETURN_IF_ERROR(ThreadPoolBuilder("CooldownTaskThreadPool")
                            .set_min_threads(config::cooldown_thread_num)
                            .set_max_threads(config::cooldown_thread_num)
                            .build(&_cooldown_thread_pool));
    LOG(INFO) << "cooldown thread pool started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "cooldown_tasks_producer_thread",
            [this] { _cooldown_tasks_producer_callback(); }, &_cooldown_tasks_producer_thread));
    LOG(INFO) << "cooldown tasks producer thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "remove_unused_remote_files_thread",
            [this] { _remove_unused_remote_files_callback(); },
            &_remove_unused_remote_files_thread));
    LOG(INFO) << "remove unused remote files thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "cold_data_compaction_producer_thread",
            [this] { _cold_data_compaction_producer_callback(); },
            &_cold_data_compaction_producer_thread));
    LOG(INFO) << "cold data compaction producer thread started";

    // ---- publish version ----
    RETURN_IF_ERROR(ThreadPoolBuilder("TabletPublishTxnThreadPool")
                            .set_min_threads(config::tablet_publish_txn_max_thread)
                            .set_max_threads(config::tablet_publish_txn_max_thread)
                            .build(&_tablet_publish_txn_thread_pool));

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "async_publish_version_thread",
            [this] { _async_publish_callback(); }, &_async_publish_thread));
    LOG(INFO) << "async publish thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "check_tablet_delete_bitmap_score_thread",
            [this] { _check_tablet_delete_bitmap_score_callback(); },
            &_check_delete_bitmap_score_thread));
    LOG(INFO) << "check tablet delete bitmap score thread started";

    _start_adaptive_thread_controller();

    LOG(INFO) << "all storage engine's background threads are started.";
    return Status::OK();
}
394
395
6
void StorageEngine::_garbage_sweeper_thread_callback() {
396
6
    uint32_t max_interval = config::max_garbage_sweep_interval;
397
6
    uint32_t min_interval = config::min_garbage_sweep_interval;
398
399
6
    if (max_interval < min_interval || min_interval <= 0) {
400
0
        LOG(WARNING) << "garbage sweep interval config is illegal: [max=" << max_interval
401
0
                     << " min=" << min_interval << "].";
402
0
        min_interval = 1;
403
0
        max_interval = max_interval >= min_interval ? max_interval : min_interval;
404
0
        LOG(INFO) << "force reset garbage sweep interval. "
405
0
                  << "max_interval=" << max_interval << ", min_interval=" << min_interval;
406
0
    }
407
408
6
    const double pi = M_PI;
409
6
    double usage = 1.0;
410
    // After the program starts, the first round of cleaning starts after min_interval.
411
6
    uint32_t curr_interval = min_interval;
412
56
    do {
413
        // Function properties:
414
        // when usage < 0.6,          ratio close to 1.(interval close to max_interval)
415
        // when usage at [0.6, 0.75], ratio is rapidly decreasing from 0.87 to 0.27.
416
        // when usage > 0.75,         ratio is slowly decreasing.
417
        // when usage > 0.8,          ratio close to min_interval.
418
        // when usage = 0.88,         ratio is approximately 0.0057.
419
56
        double ratio = (1.1 * (pi / 2 - std::atan(usage * 100 / 5 - 14)) - 0.28) / pi;
420
56
        ratio = ratio > 0 ? ratio : 0;
421
        // TODO(dx): fix it
422
56
        auto curr_interval_not_work = uint32_t(max_interval * ratio);
423
56
        curr_interval_not_work = std::max(curr_interval_not_work, min_interval);
424
56
        curr_interval_not_work = std::min(curr_interval_not_work, max_interval);
425
426
        // start clean trash and update usage.
427
56
        Status res = start_trash_sweep(&usage);
428
56
        if (res.ok() && _need_clean_trash.exchange(false, std::memory_order_relaxed)) {
429
0
            res = start_trash_sweep(&usage, true);
430
0
        }
431
432
56
        if (!res.ok()) {
433
0
            LOG(WARNING) << "one or more errors occur when sweep trash."
434
0
                         << "see previous message for detail. err code=" << res;
435
            // do nothing. continue next loop.
436
0
        }
437
56
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(curr_interval)));
438
6
}
439
440
6
void StorageEngine::_disk_stat_monitor_thread_callback() {
441
6
    int32_t interval = config::disk_stat_monitor_interval;
442
1.86k
    do {
443
1.86k
        _start_disk_stat_monitor();
444
445
1.86k
        interval = config::disk_stat_monitor_interval;
446
1.86k
        if (interval <= 0) {
447
0
            LOG(WARNING) << "disk_stat_monitor_interval config is illegal: " << interval
448
0
                         << ", force set to 1";
449
0
            interval = 1;
450
0
        }
451
1.86k
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
452
6
}
453
454
6
void StorageEngine::_unused_rowset_monitor_thread_callback() {
455
6
    int32_t interval = config::unused_rowset_monitor_interval;
456
317
    do {
457
317
        start_delete_unused_rowset();
458
459
317
        interval = config::unused_rowset_monitor_interval;
460
317
        if (interval <= 0) {
461
0
            LOG(WARNING) << "unused_rowset_monitor_interval config is illegal: " << interval
462
0
                         << ", force set to 1";
463
0
            interval = 1;
464
0
        }
465
317
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
466
6
}
467
468
2.04k
// Derives the path-gc check interval (seconds) from how much of the disk is still free:
// the fuller the disk, the larger the divisor applied to
// config::path_gc_check_interval_second.
//
//   free fraction > 0.9 -> interval / 1   (24h if the config is 24h)
//   free fraction > 0.7 -> interval / 2   (12h)
//   free fraction > 0.5 -> interval / 4   (6h)
//   free fraction > 0.3 -> interval / 6   (4h)
//   otherwise           -> interval / 8   (3h)
int32_t StorageEngine::_auto_get_interval_by_disk_capacity(DataDir* data_dir) {
    const double used_fraction = data_dir->get_usage(0);
    const double free_fraction = 1 - used_fraction;
    DCHECK(free_fraction >= 0 && free_fraction <= 1);
    DCHECK(config::path_gc_check_interval_second >= 0);

    int32_t divisor = 8;
    if (free_fraction > 0.9) {
        divisor = 1;
    } else if (free_fraction > 0.7) {
        divisor = 2;
    } else if (free_fraction > 0.5) {
        divisor = 4;
    } else if (free_fraction > 0.3) {
        divisor = 6;
    }

    const int32_t interval = config::path_gc_check_interval_second / divisor;
    return interval;
}
492
493
10
// Body of a per-data-dir path gc thread.
// Every 5 seconds (until the stop latch fires) it recomputes the desired gc interval
// from the disk's free space and runs perform_path_gc() once that many seconds have
// passed since the previous run. Because last_exec_time starts at 0, the first round
// normally triggers a gc right away.
void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) {
    LOG(INFO) << "try to start path gc thread!";
    time_t last_exec_time = 0;
    while (true) {
        const time_t now = time(nullptr);

        int32_t interval = _auto_get_interval_by_disk_capacity(data_dir);
        // Debug hook: forces the interval to 1 and, while the second debug point stays
        // enabled, keeps running path gc in a 10ms loop.
        DBUG_EXECUTE_IF("_path_gc_thread_callback.interval.eq.1ms", {
            LOG(INFO) << "debug point change interval eq 1ms";
            interval = 1;
            while (DebugPoints::instance()->is_enable("_path_gc_thread_callback.always.do")) {
                data_dir->perform_path_gc();
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
        });
        if (interval <= 0) {
            LOG(WARNING) << "path gc thread check interval config is illegal:" << interval
                         << " will be forced set to half hour";
            interval = 1800; // 0.5 hour
        }

        if (now - last_exec_time >= interval) {
            LOG(INFO) << "try to perform path gc! disk remain [" << 1 - data_dir->get_usage(0)
                      << "] internal [" << interval << "]";
            data_dir->perform_path_gc();
            // measured after the gc, so the gc's own duration does not count towards the interval
            last_exec_time = time(nullptr);
        }

        if (_stop_background_threads_latch.wait_for(std::chrono::seconds(5))) {
            break;
        }
    }
    LOG(INFO) << "stop path gc thread!";
}
522
523
6
// Body of the tablet meta checkpoint producer thread.
// Each round submits one do_tablet_meta_checkpoint task per data dir to the checkpoint
// thread pool, then waits config::generate_tablet_meta_checkpoint_tasks_interval_secs
// seconds, until the stop latch fires. The interval is re-read on every round.
//
// A non-positive interval is clamped to 1 second, like the other monitor callbacks in
// this file do; without the clamp the wait is given a zero or negative duration.
// A failed submit is logged together with the data dir and the returned status, and the
// round continues with the next data dir.
void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs) {
    int64_t interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
    do {
        for (auto* data_dir : data_dirs) {
            LOG(INFO) << "begin to produce tablet meta checkpoint tasks, data_dir="
                      << data_dir->path();
            auto st = _tablet_meta_checkpoint_thread_pool->submit_func(
                    [data_dir, this]() { _tablet_manager->do_tablet_meta_checkpoint(data_dir); });
            if (!st.ok()) {
                LOG(WARNING) << "submit tablet checkpoint tasks failed."
                             << " data_dir=" << data_dir->path() << ", status=" << st;
            }
        }
        interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
        if (interval <= 0) {
            LOG(WARNING) << "generate_tablet_meta_checkpoint_tasks_interval_secs config is "
                         << "illegal: " << interval << ", force set to 1";
            interval = 1;
        }
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
}
538
539
6
void StorageEngine::_tablet_path_check_callback() {
540
6
    struct TabletIdComparator {
541
6
        bool operator()(Tablet* a, Tablet* b) { return a->tablet_id() < b->tablet_id(); }
542
6
    };
543
544
6
    using TabletQueue = std::priority_queue<Tablet*, std::vector<Tablet*>, TabletIdComparator>;
545
546
6
    int64_t interval = config::tablet_path_check_interval_seconds;
547
6
    if (interval <= 0) {
548
6
        return;
549
6
    }
550
551
0
    int64_t last_tablet_id = 0;
552
0
    do {
553
0
        int32_t batch_size = config::tablet_path_check_batch_size;
554
0
        if (batch_size <= 0) {
555
0
            if (_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
556
0
                break;
557
0
            }
558
0
            continue;
559
0
        }
560
561
0
        LOG(INFO) << "start to check tablet path";
562
563
0
        auto all_tablets = _tablet_manager->get_all_tablet(
564
0
                [](Tablet* t) { return t->is_used() && t->tablet_state() == TABLET_RUNNING; });
565
566
0
        TabletQueue big_id_tablets;
567
0
        TabletQueue small_id_tablets;
568
0
        for (auto tablet : all_tablets) {
569
0
            auto tablet_id = tablet->tablet_id();
570
0
            TabletQueue* belong_tablets = nullptr;
571
0
            if (tablet_id > last_tablet_id) {
572
0
                if (big_id_tablets.size() < batch_size ||
573
0
                    big_id_tablets.top()->tablet_id() > tablet_id) {
574
0
                    belong_tablets = &big_id_tablets;
575
0
                }
576
0
            } else if (big_id_tablets.size() < batch_size) {
577
0
                if (small_id_tablets.size() < batch_size ||
578
0
                    small_id_tablets.top()->tablet_id() > tablet_id) {
579
0
                    belong_tablets = &small_id_tablets;
580
0
                }
581
0
            }
582
0
            if (belong_tablets != nullptr) {
583
0
                belong_tablets->push(tablet.get());
584
0
                if (belong_tablets->size() > batch_size) {
585
0
                    belong_tablets->pop();
586
0
                }
587
0
            }
588
0
        }
589
590
0
        int32_t need_small_id_tablet_size =
591
0
                batch_size - static_cast<int32_t>(big_id_tablets.size());
592
593
0
        if (!big_id_tablets.empty()) {
594
0
            last_tablet_id = big_id_tablets.top()->tablet_id();
595
0
        }
596
0
        while (!big_id_tablets.empty()) {
597
0
            big_id_tablets.top()->check_tablet_path_exists();
598
0
            big_id_tablets.pop();
599
0
        }
600
601
0
        if (!small_id_tablets.empty() && need_small_id_tablet_size > 0) {
602
0
            while (static_cast<int32_t>(small_id_tablets.size()) > need_small_id_tablet_size) {
603
0
                small_id_tablets.pop();
604
0
            }
605
606
0
            last_tablet_id = small_id_tablets.top()->tablet_id();
607
0
            while (!small_id_tablets.empty()) {
608
0
                small_id_tablets.top()->check_tablet_path_exists();
609
0
                small_id_tablets.pop();
610
0
            }
611
0
        }
612
613
0
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
614
0
}
615
616
98.4k
void StorageEngine::_adjust_compaction_thread_num() {
617
98.4k
    TEST_SYNC_POINT_RETURN_WITH_VOID("StorageEngine::_adjust_compaction_thread_num.return_void");
618
98.4k
    auto base_compaction_threads_num = get_base_compaction_threads_num(_store_map.size());
619
98.4k
    if (_base_compaction_thread_pool->max_threads() != base_compaction_threads_num) {
620
0
        int old_max_threads = _base_compaction_thread_pool->max_threads();
621
0
        Status status = _base_compaction_thread_pool->set_max_threads(base_compaction_threads_num);
622
0
        if (status.ok()) {
623
0
            VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads
624
0
                        << " to " << base_compaction_threads_num;
625
0
        }
626
0
    }
627
98.4k
    if (_base_compaction_thread_pool->min_threads() != base_compaction_threads_num) {
628
0
        int old_min_threads = _base_compaction_thread_pool->min_threads();
629
0
        Status status = _base_compaction_thread_pool->set_min_threads(base_compaction_threads_num);
630
0
        if (status.ok()) {
631
0
            VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads
632
0
                        << " to " << base_compaction_threads_num;
633
0
        }
634
0
    }
635
636
98.4k
    auto cumu_compaction_threads_num = get_cumu_compaction_threads_num(_store_map.size());
637
98.4k
    if (_cumu_compaction_thread_pool->max_threads() != cumu_compaction_threads_num) {
638
0
        int old_max_threads = _cumu_compaction_thread_pool->max_threads();
639
0
        Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_compaction_threads_num);
640
0
        if (status.ok()) {
641
0
            VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads
642
0
                        << " to " << cumu_compaction_threads_num;
643
0
        }
644
0
    }
645
98.4k
    if (_cumu_compaction_thread_pool->min_threads() != cumu_compaction_threads_num) {
646
0
        int old_min_threads = _cumu_compaction_thread_pool->min_threads();
647
0
        Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_compaction_threads_num);
648
0
        if (status.ok()) {
649
0
            VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads
650
0
                        << " to " << cumu_compaction_threads_num;
651
0
        }
652
0
    }
653
654
98.4k
    auto binlog_compaction_threads_num = get_binlog_compaction_threads_num(_store_map.size());
655
98.4k
    if (_binlog_compaction_thread_pool->max_threads() != binlog_compaction_threads_num) {
656
0
        int old_max_threads = _binlog_compaction_thread_pool->max_threads();
657
0
        Status status =
658
0
                _binlog_compaction_thread_pool->set_max_threads(binlog_compaction_threads_num);
659
0
        if (status.ok()) {
660
0
            VLOG_NOTICE << "update binlog compaction thread pool max_threads from "
661
0
                        << old_max_threads << " to " << binlog_compaction_threads_num;
662
0
        }
663
0
    }
664
98.4k
    if (_binlog_compaction_thread_pool->min_threads() != binlog_compaction_threads_num) {
665
0
        int old_min_threads = _binlog_compaction_thread_pool->min_threads();
666
0
        Status status =
667
0
                _binlog_compaction_thread_pool->set_min_threads(binlog_compaction_threads_num);
668
0
        if (status.ok()) {
669
0
            VLOG_NOTICE << "update binlog compaction thread pool min_threads from "
670
0
                        << old_min_threads << " to " << binlog_compaction_threads_num;
671
0
        }
672
0
    }
673
674
98.4k
    auto single_replica_compaction_threads_num =
675
98.4k
            get_single_replica_compaction_threads_num(_store_map.size());
676
98.4k
    if (_single_replica_compaction_thread_pool->max_threads() !=
677
98.4k
        single_replica_compaction_threads_num) {
678
0
        int old_max_threads = _single_replica_compaction_thread_pool->max_threads();
679
0
        Status status = _single_replica_compaction_thread_pool->set_max_threads(
680
0
                single_replica_compaction_threads_num);
681
0
        if (status.ok()) {
682
0
            VLOG_NOTICE << "update single replica compaction thread pool max_threads from "
683
0
                        << old_max_threads << " to " << single_replica_compaction_threads_num;
684
0
        }
685
0
    }
686
98.4k
    if (_single_replica_compaction_thread_pool->min_threads() !=
687
98.4k
        single_replica_compaction_threads_num) {
688
0
        int old_min_threads = _single_replica_compaction_thread_pool->min_threads();
689
0
        Status status = _single_replica_compaction_thread_pool->set_min_threads(
690
0
                single_replica_compaction_threads_num);
691
0
        if (status.ok()) {
692
0
            VLOG_NOTICE << "update single replica compaction thread pool min_threads from "
693
0
                        << old_min_threads << " to " << single_replica_compaction_threads_num;
694
0
        }
695
0
    }
696
98.4k
}
697
698
6
void StorageEngine::_compaction_tasks_producer_callback() {
699
6
    LOG(INFO) << "try to start compaction producer process!";
700
701
6
    std::vector<DataDir*> data_dirs = get_stores();
702
703
6
    int round = 0;
704
6
    CompactionType compaction_type;
705
706
    // Used to record the time when the score metric was last updated.
707
    // The update of the score metric is accompanied by the logic of selecting the tablet.
708
    // If there is no slot available, the logic of selecting the tablet will be terminated,
709
    // which causes the score metric update to be terminated.
710
    // In order to avoid this situation, we need to update the score regularly.
711
6
    int64_t last_cumulative_score_update_time = 0;
712
6
    int64_t last_base_score_update_time = 0;
713
6
    static const int64_t check_score_interval_ms = 5000; // 5 secs
714
715
6
    int64_t interval = config::generate_compaction_tasks_interval_ms;
716
9.48k
    do {
717
9.48k
        int64_t cur_time = UnixMillis();
718
9.48k
        if (!config::disable_auto_compaction &&
719
9.48k
            (!config::enable_compaction_pause_on_high_memory ||
720
9.48k
             !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) {
721
9.48k
            _adjust_compaction_thread_num();
722
723
9.48k
            bool check_score = false;
724
9.48k
            if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
725
8.54k
                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
726
8.54k
                round++;
727
8.54k
                if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) {
728
1.48k
                    check_score = true;
729
1.48k
                    last_cumulative_score_update_time = cur_time;
730
1.48k
                }
731
8.54k
            } else {
732
946
                compaction_type = CompactionType::BASE_COMPACTION;
733
946
                round = 0;
734
946
                if (cur_time - last_base_score_update_time >= check_score_interval_ms) {
735
889
                    check_score = true;
736
889
                    last_base_score_update_time = cur_time;
737
889
                }
738
946
            }
739
9.48k
            std::unique_ptr<ThreadPool>& thread_pool =
740
9.48k
                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
741
9.48k
                            ? _cumu_compaction_thread_pool
742
9.48k
                            : _base_compaction_thread_pool;
743
9.48k
            bvar::Status<int64_t>& g_compaction_task_num_per_round =
744
9.48k
                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
745
9.48k
                            ? g_cumu_compaction_task_num_per_round
746
9.48k
                            : g_base_compaction_task_num_per_round;
747
9.48k
            if (config::compaction_num_per_round != -1) {
748
0
                _compaction_num_per_round = config::compaction_num_per_round;
749
9.48k
            } else if (thread_pool->get_queue_size() == 0) {
750
                // If all tasks in the thread pool queue are executed,
751
                // double the number of tasks generated each time,
752
                // with a maximum of config::max_automatic_compaction_num_per_round tasks per generation.
753
9.18k
                if (_compaction_num_per_round < config::max_automatic_compaction_num_per_round) {
754
36
                    _compaction_num_per_round *= 2;
755
36
                    g_compaction_task_num_per_round.set_value(_compaction_num_per_round);
756
36
                }
757
9.18k
            } else if (thread_pool->get_queue_size() > _compaction_num_per_round / 2) {
758
                // If all tasks in the thread pool is greater than
759
                // half of the tasks submitted in the previous round,
760
                // reduce the number of tasks generated each time by half, with a minimum of 1.
761
0
                if (_compaction_num_per_round > 1) {
762
0
                    _compaction_num_per_round /= 2;
763
0
                    g_compaction_task_num_per_round.set_value(_compaction_num_per_round);
764
0
                }
765
0
            }
766
9.48k
            _update_cumulative_compaction_policy();
767
9.48k
            std::vector<TabletCompactionContext> tablet_compaction_contexts =
768
9.48k
                    _generate_compaction_tasks(compaction_type, data_dirs, check_score);
769
9.48k
            if (tablet_compaction_contexts.empty()) {
770
4.38k
                std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
771
4.38k
                _wakeup_producer_flag = 0;
772
                // It is necessary to wake up the thread on timeout to prevent deadlock
773
                // in case of no running compaction task.
774
4.38k
                _compaction_producer_sleep_cv.wait_for(
775
4.38k
                        lock, std::chrono::milliseconds(2000),
776
8.78k
                        [this] { return _wakeup_producer_flag == 1; });
777
4.38k
                continue;
778
4.38k
            }
779
780
236k
            for (const auto& tablet_compaction_context : tablet_compaction_contexts) {
781
236k
                const auto& tablet = tablet_compaction_context.tablet;
782
236k
                if (compaction_type == CompactionType::BASE_COMPACTION) {
783
51.3k
                    tablet->set_last_base_compaction_schedule_time(UnixMillis());
784
185k
                } else if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
785
185k
                    tablet->set_last_cumu_compaction_schedule_time(UnixMillis());
786
185k
                } else if (compaction_type == CompactionType::FULL_COMPACTION) {
787
0
                    tablet->set_last_full_compaction_schedule_time(UnixMillis());
788
0
                }
789
236k
                Status st = _submit_compaction_task(tablet, compaction_type, false);
790
236k
                if (!st.ok()) {
791
0
                    LOG(WARNING) << "failed to submit compaction task for tablet: "
792
0
                                 << tablet->tablet_id() << ", err: " << st;
793
0
                }
794
236k
            }
795
5.10k
            interval = config::generate_compaction_tasks_interval_ms;
796
5.10k
        } else {
797
0
            interval = 5000; // 5s to check disable_auto_compaction
798
0
        }
799
800
        // wait some seconds for ut test
801
5.10k
        {
802
5.10k
            std ::vector<std ::any> args {};
803
5.10k
            args.emplace_back(1);
804
5.10k
            doris ::SyncPoint ::get_instance()->process(
805
5.10k
                    "StorageEngine::_compaction_tasks_producer_callback", std ::move(args));
806
5.10k
        }
807
5.10k
        int64_t end_time = UnixMillis();
808
5.10k
        DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time -
809
5.10k
                                                                                       cur_time);
810
9.48k
    } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
811
6
}
812
813
6
void StorageEngine::_binlog_compaction_tasks_producer_callback() {
814
6
    LOG(INFO) << "try to start binlog compaction producer process!";
815
816
6
    std::vector<DataDir*> data_dirs = get_stores();
817
818
6
    int64_t last_binlog_score_update_time = 0;
819
6
    static const int64_t check_score_interval_ms = 5000;
820
821
6
    int64_t interval = config::generate_compaction_tasks_interval_ms;
822
88.9k
    do {
823
88.9k
        int64_t cur_time = UnixMillis();
824
88.9k
        if (config::enable_feature_binlog && !config::disable_auto_compaction &&
825
88.9k
            (!config::enable_compaction_pause_on_high_memory ||
826
88.9k
             !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) {
827
88.9k
            _adjust_compaction_thread_num();
828
829
88.9k
            bool check_score = false;
830
88.9k
            if (cur_time - last_binlog_score_update_time >= check_score_interval_ms) {
831
1.87k
                check_score = true;
832
1.87k
                last_binlog_score_update_time = cur_time;
833
1.87k
            }
834
835
88.9k
            std::vector<TabletCompactionContext> tablet_compaction_contexts =
836
88.9k
                    _generate_compaction_tasks(CompactionType::BINLOG_COMPACTION, data_dirs,
837
88.9k
                                               check_score);
838
88.9k
            for (const auto& tablet_compaction_context : tablet_compaction_contexts) {
839
492
                const auto& tablet = tablet_compaction_context.tablet;
840
492
                Status st =
841
492
                        _submit_compaction_task(tablet, CompactionType::BINLOG_COMPACTION, false, 0,
842
492
                                                tablet_compaction_context.prefer_compaction_level);
843
492
                if (!st.ok()) {
844
0
                    LOG(WARNING) << "failed to submit binlog compaction task for tablet: "
845
0
                                 << tablet->tablet_id() << ", err: " << st;
846
0
                }
847
492
            }
848
88.9k
            interval = config::generate_compaction_tasks_interval_ms;
849
88.9k
        } else {
850
0
            interval = 5000;
851
0
        }
852
88.9k
    } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
853
6
}
854
855
6
void StorageEngine::_update_replica_infos_callback() {
856
#ifdef GOOGLE_PROFILER
857
    ProfilerRegisterThread();
858
#endif
859
6
    LOG(INFO) << "start to update replica infos!";
860
861
6
    int64_t interval = config::update_replica_infos_interval_seconds;
862
160
    do {
863
1.39M
        auto all_tablets = _tablet_manager->get_all_tablet([](Tablet* t) {
864
1.39M
            return t->is_used() && t->tablet_state() == TABLET_RUNNING &&
865
1.39M
                   !t->tablet_meta()->tablet_schema()->disable_auto_compaction() &&
866
1.39M
                   t->tablet_meta()->tablet_schema()->enable_single_replica_compaction();
867
1.39M
        });
868
160
        ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info();
869
160
        if (cluster_info == nullptr) {
870
0
            LOG(WARNING) << "Have not get FE Master heartbeat yet";
871
0
            std::this_thread::sleep_for(std::chrono::seconds(2));
872
0
            continue;
873
0
        }
874
160
        TNetworkAddress master_addr = cluster_info->master_fe_addr;
875
160
        if (master_addr.hostname == "" || master_addr.port == 0) {
876
6
            LOG(WARNING) << "Have not get FE Master heartbeat yet";
877
6
            std::this_thread::sleep_for(std::chrono::seconds(2));
878
6
            continue;
879
6
        }
880
881
154
        int start = 0;
882
154
        int tablet_size = cast_set<int>(all_tablets.size());
883
        // The while loop may take a long time, we should skip it when stop
884
154
        while (start < tablet_size && _stop_background_threads_latch.count() > 0) {
885
0
            int batch_size = std::min(100, tablet_size - start);
886
0
            int end = start + batch_size;
887
0
            TGetTabletReplicaInfosRequest request;
888
0
            TGetTabletReplicaInfosResult result;
889
0
            for (int i = start; i < end; i++) {
890
0
                request.tablet_ids.emplace_back(all_tablets[i]->tablet_id());
891
0
            }
892
0
            Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
893
0
                    master_addr.hostname, master_addr.port,
894
0
                    [&request, &result](FrontendServiceConnection& client) {
895
0
                        client->getTabletReplicaInfos(result, request);
896
0
                    });
897
898
0
            if (!rpc_st.ok()) {
899
0
                LOG(WARNING) << "Failed to get tablet replica infos, encounter rpc failure, "
900
0
                                "tablet start: "
901
0
                             << start << " end: " << end;
902
0
                continue;
903
0
            }
904
905
0
            std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex);
906
0
            for (const auto& it : result.tablet_replica_infos) {
907
0
                auto tablet_id = it.first;
908
0
                auto tablet = _tablet_manager->get_tablet(tablet_id);
909
0
                if (tablet == nullptr) {
910
0
                    VLOG_CRITICAL << "tablet ptr is nullptr";
911
0
                    continue;
912
0
                }
913
914
0
                VLOG_NOTICE << tablet_id << " tablet has " << it.second.size() << " replicas";
915
0
                uint64_t min_modulo = MOD_PRIME;
916
0
                TReplicaInfo peer_replica;
917
0
                for (const auto& replica : it.second) {
918
0
                    int64_t peer_replica_id = replica.replica_id;
919
0
                    uint64_t modulo = HashUtil::hash64(&peer_replica_id, sizeof(peer_replica_id),
920
0
                                                       DEFAULT_SEED) %
921
0
                                      MOD_PRIME;
922
0
                    if (modulo < min_modulo) {
923
0
                        peer_replica = replica;
924
0
                        min_modulo = modulo;
925
0
                    }
926
0
                }
927
0
                VLOG_NOTICE << "tablet " << tablet_id << ", peer replica host is "
928
0
                            << peer_replica.host;
929
0
                _peer_replica_infos[tablet_id] = peer_replica;
930
0
            }
931
0
            _token = result.token;
932
0
            VLOG_NOTICE << "get tablet replica infos from fe, size is " << end - start
933
0
                        << " token = " << result.token;
934
0
            start = end;
935
0
        }
936
154
        interval = config::update_replica_infos_interval_seconds;
937
160
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
938
6
}
939
940
/// Registers `tablet` for single replica compaction and queues the work on
/// `_single_replica_compaction_thread_pool`.
///
/// The local versions to merge depend on what is fetched from the peer replica,
/// so it is not known up front whether the job is a base or a cumulative
/// compaction; the tablet is therefore registered under both types, and both
/// registrations are released together.
///
/// @return AlreadyExist when the tablet is already registered under either
///         type; the prepare_compact() error when preparation fails (except
///         CUMULATIVE_NO_SUITABLE_VERSION, which is reported as OK);
///         InternalError when the thread pool rejects the task; OK otherwise.
Status StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr tablet,
                                                             CompactionType compaction_type) {
    if (_compaction_submit_registry.insert(tablet, CompactionType::CUMULATIVE_COMPACTION)) {
        return Status::AlreadyExist<false>(
                "compaction task has already been submitted, tablet_id={}", tablet->tablet_id());
    }

    if (_compaction_submit_registry.insert(tablet, CompactionType::BASE_COMPACTION)) {
        // Undo the cumulative registration made just above.
        _pop_tablet_from_submitted_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION);
        return Status::AlreadyExist<false>(
                "compaction task has already been submitted, tablet_id={}", tablet->tablet_id());
    }

    auto compaction = std::make_shared<SingleReplicaCompaction>(*this, tablet, compaction_type);
    DorisMetrics::instance()->single_compaction_request_total->increment(1);
    auto prepare_st = compaction->prepare_compact();

    // Releases both registrations taken above.
    auto release_registrations = [tablet, this]() {
        _pop_tablet_from_submitted_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION);
        _pop_tablet_from_submitted_compaction(tablet, CompactionType::BASE_COMPACTION);
    };

    if (!prepare_st.ok()) {
        release_registrations();
        if (prepare_st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) {
            return Status::OK(); // No suitable version, regard as OK
        }
        LOG(WARNING) << "failed to prepare single replica compaction, tablet_id="
                     << tablet->tablet_id() << " : " << prepare_st;
        return prepare_st;
    }

    auto submit_st = _single_replica_compaction_thread_pool->submit_func(
            [tablet, compaction = std::move(compaction), release_registrations]() mutable {
                tablet->execute_single_replica_compaction(*compaction);
                release_registrations();
            });
    if (submit_st.ok()) {
        return Status::OK();
    }

    release_registrations();
    return Status::InternalError(
            "failed to submit single replica compaction task to thread pool, "
            "tablet_id={}",
            tablet->tablet_id());
}
993
994
void StorageEngine::get_tablet_rowset_versions(const PGetTabletVersionsRequest* request,
995
0
                                               PGetTabletVersionsResponse* response) {
996
0
    TabletSharedPtr tablet = _tablet_manager->get_tablet(request->tablet_id());
997
0
    if (tablet == nullptr) {
998
0
        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
999
0
        return;
1000
0
    }
1001
0
    std::vector<Version> local_versions = tablet->get_all_local_versions();
1002
0
    for (const auto& local_version : local_versions) {
1003
0
        auto version = response->add_versions();
1004
0
        version->set_first(local_version.first);
1005
0
        version->set_second(local_version.second);
1006
0
    }
1007
0
    response->mutable_status()->set_status_code(0);
1008
0
}
1009
1010
bool need_generate_compaction_tasks(int task_cnt_per_disk, int thread_per_disk,
1011
213k
                                    CompactionType compaction_type, bool all_base) {
1012
213k
    if (compaction_type == CompactionType::BINLOG_COMPACTION) {
1013
191k
        return task_cnt_per_disk < thread_per_disk;
1014
191k
    }
1015
1016
    // We need to reserve at least one Slot for cumulative compaction.
1017
    // So when there is only one Slot, we have to judge whether there is a cumulative compaction
1018
    // in the current submitted tasks.
1019
    // If so, the last Slot can be assigned to Base compaction,
1020
    // otherwise, this Slot needs to be reserved for cumulative compaction.
1021
22.1k
    if (task_cnt_per_disk >= thread_per_disk) {
1022
        // Return if no available slot
1023
0
        return false;
1024
22.1k
    } else if (task_cnt_per_disk >= thread_per_disk - 1) {
1025
        // Only one slot left, check if it can be assigned to base compaction task.
1026
0
        if (compaction_type == CompactionType::BASE_COMPACTION) {
1027
0
            if (all_base) {
1028
0
                return false;
1029
0
            }
1030
0
        }
1031
0
    }
1032
22.1k
    return true;
1033
22.1k
}
1034
1035
106k
int get_concurrent_per_disk(int max_score, int thread_per_disk) {
1036
106k
    if (!config::enable_compaction_priority_scheduling) {
1037
0
        return thread_per_disk;
1038
0
    }
1039
1040
106k
    double load_average = 0;
1041
106k
    if (DorisMetrics::instance()->system_metrics() != nullptr) {
1042
0
        load_average = DorisMetrics::instance()->system_metrics()->get_load_average_1_min();
1043
0
    }
1044
106k
    int num_cores = doris::CpuInfo::num_cores();
1045
106k
    bool cpu_usage_high = load_average > num_cores * 0.8;
1046
1047
106k
    auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
1048
106k
    bool memory_usage_high = static_cast<double>(process_memory_usage) >
1049
106k
                             static_cast<double>(MemInfo::soft_mem_limit()) * 0.8;
1050
1051
106k
    if (max_score <= config::low_priority_compaction_score_threshold &&
1052
106k
        (cpu_usage_high || memory_usage_high)) {
1053
0
        return config::low_priority_compaction_task_num_per_disk;
1054
0
    }
1055
1056
106k
    return thread_per_disk;
1057
106k
}
1058
1059
213k
int32_t disk_compaction_slot_num(const DataDir& data_dir, CompactionType compaction_type) {
1060
213k
    if (compaction_type == CompactionType::BINLOG_COMPACTION) {
1061
191k
        return config::binlog_compaction_task_num_per_disk;
1062
191k
    }
1063
22.1k
    return data_dir.is_ssd_disk() ? config::compaction_task_num_per_fast_disk
1064
22.1k
                                  : config::compaction_task_num_per_disk;
1065
213k
}
1066
1067
bool has_free_compaction_slot(CompactionSubmitRegistry* registry, DataDir* dir,
1068
106k
                              CompactionType compaction_type, uint32_t executing_cnt) {
1069
106k
    int32_t thread_per_disk = disk_compaction_slot_num(*dir, compaction_type);
1070
106k
    return need_generate_compaction_tasks(
1071
106k
            executing_cnt, thread_per_disk, compaction_type,
1072
106k
            !registry->has_compaction_task(dir, CompactionType::CUMULATIVE_COMPACTION));
1073
106k
}
1074
1075
std::vector<TabletCompactionContext> StorageEngine::_generate_compaction_tasks(
1076
98.4k
        CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
1077
98.4k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("olap_server::_generate_compaction_tasks.return_empty",
1078
98.4k
                                      std::vector<TabletCompactionContext> {});
1079
98.4k
    _update_cumulative_compaction_policy();
1080
98.4k
    auto cumulative_compaction_policies = _snapshot_cumulative_compaction_policy();
1081
98.4k
    std::vector<TabletCompactionContext> tablet_compaction_contexts;
1082
98.4k
    uint32_t max_compaction_score = 0;
1083
1084
98.4k
    std::random_device rd;
1085
98.4k
    std::mt19937 g(rd());
1086
98.4k
    std::shuffle(data_dirs.begin(), data_dirs.end(), g);
1087
1088
    // Copy _tablet_submitted_xxx_compaction map so that we don't need to hold _tablet_submitted_compaction_mutex
1089
    // when traversing the data dir
1090
98.4k
    auto compaction_registry_snapshot = _compaction_submit_registry.create_snapshot();
1091
106k
    for (auto* data_dir : data_dirs) {
1092
106k
        bool need_pick_tablet = true;
1093
106k
        uint32_t executing_task_num =
1094
106k
                compaction_type == CompactionType::BINLOG_COMPACTION
1095
106k
                        ? compaction_registry_snapshot.count_executing_compaction(
1096
95.7k
                                  data_dir, CompactionType::BINLOG_COMPACTION)
1097
106k
                        : compaction_registry_snapshot.count_executing_cumu_and_base(data_dir);
1098
106k
        need_pick_tablet = has_free_compaction_slot(&compaction_registry_snapshot, data_dir,
1099
106k
                                                    compaction_type, executing_task_num);
1100
106k
        if (!need_pick_tablet && !check_score) {
1101
0
            continue;
1102
0
        }
1103
1104
        // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(),
1105
        // So that we can update the max_compaction_score metric.
1106
106k
        if (!data_dir->reach_capacity_limit(0)) {
1107
106k
            uint32_t disk_max_score = 0;
1108
106k
            auto tablet_contexts = compaction_registry_snapshot.pick_topn_tablets_for_compaction(
1109
106k
                    _tablet_manager.get(), data_dir, compaction_type,
1110
106k
                    cumulative_compaction_policies, &disk_max_score);
1111
106k
            int concurrent_num = get_concurrent_per_disk(
1112
106k
                    disk_max_score, disk_compaction_slot_num(*data_dir, compaction_type));
1113
106k
            need_pick_tablet = need_generate_compaction_tasks(
1114
106k
                    executing_task_num, concurrent_num, compaction_type,
1115
106k
                    !compaction_registry_snapshot.has_compaction_task(
1116
106k
                            data_dir, CompactionType::CUMULATIVE_COMPACTION));
1117
237k
            for (const auto& context : tablet_contexts) {
1118
237k
                if (context.tablet != nullptr) {
1119
237k
                    if (need_pick_tablet) {
1120
237k
                        tablet_compaction_contexts.emplace_back(context);
1121
237k
                    }
1122
237k
                    max_compaction_score = std::max(max_compaction_score, disk_max_score);
1123
237k
                }
1124
237k
            }
1125
106k
        }
1126
106k
    }
1127
1128
98.4k
    if (max_compaction_score > 0) {
1129
5.17k
        if (compaction_type == CompactionType::BASE_COMPACTION) {
1130
942
            DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(
1131
942
                    max_compaction_score);
1132
4.22k
        } else if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
1133
4.16k
            DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(
1134
4.16k
                    max_compaction_score);
1135
4.16k
        } else if (compaction_type == CompactionType::BINLOG_COMPACTION) {
1136
69
            DorisMetrics::instance()->tablet_binlog_max_compaction_score->set_value(
1137
69
                    max_compaction_score);
1138
69
        }
1139
5.17k
    }
1140
98.4k
    return tablet_compaction_contexts;
1141
98.4k
}
1142
1143
107k
void StorageEngine::_update_cumulative_compaction_policy() {
1144
107k
    std::lock_guard<std::mutex> lock(_cumulative_compaction_policy_mtx);
1145
107k
    if (_cumulative_compaction_policies.empty()) {
1146
6
        _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
1147
6
                CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
1148
6
                        CUMULATIVE_SIZE_BASED_POLICY);
1149
6
        _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
1150
6
                CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
1151
6
                        CUMULATIVE_TIME_SERIES_POLICY);
1152
6
    }
1153
107k
}
1154
1155
98.4k
/// Returns a copy of `_cumulative_compaction_policies`, taken while holding
/// `_cumulative_compaction_policy_mtx`.
CumuCompactionPolicyTable StorageEngine::_snapshot_cumulative_compaction_policy() {
    std::lock_guard<std::mutex> guard(_cumulative_compaction_policy_mtx);
    CumuCompactionPolicyTable snapshot = _cumulative_compaction_policies;
    return snapshot;
}
1159
1160
std::shared_ptr<CumulativeCompactionPolicy> StorageEngine::_get_cumulative_compaction_policy(
1161
0
        std::string_view compaction_policy) {
1162
0
    std::lock_guard<std::mutex> lock(_cumulative_compaction_policy_mtx);
1163
0
    return _cumulative_compaction_policies.at(compaction_policy);
1164
0
}
1165
1166
void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet,
1167
237k
                                                          CompactionType compaction_type) {
1168
237k
    _compaction_submit_registry.remove(tablet, compaction_type, [this]() {
1169
237k
        std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
1170
237k
        _wakeup_producer_flag = 1;
1171
237k
        _compaction_producer_sleep_cv.notify_one();
1172
237k
    });
1173
237k
}
1174
1175
Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
1176
                                              CompactionType compaction_type, bool force,
1177
237k
                                              int trigger_method, int8_t prefer_compaction_level) {
1178
237k
    if (tablet->tablet_meta()->tablet_schema()->enable_single_replica_compaction() &&
1179
237k
        should_fetch_from_peer(tablet->tablet_id()) &&
1180
237k
        compaction_type != CompactionType::BINLOG_COMPACTION) {
1181
0
        VLOG_CRITICAL << "start to submit single replica compaction task for tablet: "
1182
0
                      << tablet->tablet_id();
1183
0
        Status st = _submit_single_replica_compaction_task(tablet, compaction_type);
1184
0
        if (!st.ok()) {
1185
0
            LOG(WARNING) << "failed to submit single replica compaction task for tablet: "
1186
0
                         << tablet->tablet_id() << ", err: " << st;
1187
0
        }
1188
1189
0
        return Status::OK();
1190
0
    }
1191
237k
    bool already_exist = _compaction_submit_registry.insert(tablet, compaction_type);
1192
237k
    if (already_exist) {
1193
0
        return Status::AlreadyExist<false>(
1194
0
                "compaction task has already been submitted, tablet_id={}, compaction_type={}.",
1195
0
                tablet->tablet_id(), compaction_type);
1196
0
    }
1197
237k
    tablet->compaction_stage = CompactionStage::PENDING;
1198
237k
    std::shared_ptr<CompactionMixin> compaction;
1199
237k
    int64_t permits = 0;
1200
237k
    Status st = Tablet::prepare_compaction_and_calculate_permits(
1201
237k
            compaction_type, tablet, compaction, permits, prefer_compaction_level);
1202
237k
    if (st.ok() && permits > 0) {
1203
3.57k
        if (!force) {
1204
3.57k
            if (compaction_type == CompactionType::BINLOG_COMPACTION) {
1205
0
                if (!_permit_limiter.try_request(permits, compaction_type)) {
1206
0
                    _pop_tablet_from_submitted_compaction(tablet, compaction_type);
1207
0
                    tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
1208
0
                    return Status::OK();
1209
0
                }
1210
3.57k
            } else {
1211
3.57k
                _permit_limiter.request(permits);
1212
3.57k
            }
1213
3.57k
        }
1214
        // Register task with CompactionTaskTracker as PENDING
1215
3.57k
        auto* tracker = CompactionTaskTracker::instance();
1216
3.57k
        int64_t compaction_id = compaction->compaction_id();
1217
3.57k
        {
1218
3.57k
            CompactionTaskInfo info;
1219
3.57k
            info.compaction_id = compaction_id;
1220
3.57k
            info.tablet_id = tablet->tablet_id();
1221
3.57k
            info.table_id = tablet->get_table_id();
1222
3.57k
            info.partition_id = tablet->partition_id();
1223
3.57k
            switch (compaction_type) {
1224
2
            case CompactionType::BASE_COMPACTION:
1225
2
                info.compaction_type = CompactionProfileType::BASE;
1226
2
                break;
1227
3.56k
            case CompactionType::CUMULATIVE_COMPACTION:
1228
3.56k
                info.compaction_type = CompactionProfileType::CUMULATIVE;
1229
3.56k
                break;
1230
0
            case CompactionType::FULL_COMPACTION:
1231
0
                info.compaction_type = CompactionProfileType::FULL;
1232
0
                break;
1233
0
            case CompactionType::BINLOG_COMPACTION:
1234
0
                info.compaction_type = CompactionProfileType::BINLOG;
1235
0
                break;
1236
0
            default:
1237
0
                DCHECK(false) << "invalid compaction type: " << compaction_type;
1238
3.57k
            }
1239
3.57k
            info.status = CompactionTaskStatus::PENDING;
1240
3.57k
            info.trigger_method = static_cast<TriggerMethod>(trigger_method);
1241
3.57k
            info.scheduled_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
1242
3.57k
                                             std::chrono::system_clock::now().time_since_epoch())
1243
3.57k
                                             .count();
1244
3.57k
            info.permits = permits;
1245
3.57k
            info.backend_id = BackendOptions::get_backend_id();
1246
3.57k
            info.compaction_score = tablet->get_real_compaction_score();
1247
3.57k
            info.input_rowsets_count = compaction->input_rowsets_count();
1248
3.57k
            info.input_row_num = compaction->input_row_num_value();
1249
3.57k
            info.input_data_size = compaction->input_rowsets_data_size();
1250
3.57k
            info.input_index_size = compaction->input_rowsets_index_size();
1251
3.57k
            info.input_total_size = compaction->input_rowsets_total_size();
1252
3.57k
            info.input_segments_num = compaction->input_segments_num_value();
1253
3.57k
            info.input_version_range = compaction->input_version_range_str();
1254
3.57k
            info.is_vertical = compaction->is_vertical();
1255
3.57k
            tracker->register_task(std::move(info));
1256
3.57k
        }
1257
0
        std::unique_ptr<ThreadPool>* thread_pool = nullptr;
1258
3.57k
        const char* compaction_type_name = "UNKNOWN";
1259
3.57k
        switch (compaction_type) {
1260
3.56k
        case CompactionType::CUMULATIVE_COMPACTION:
1261
3.56k
            thread_pool = &_cumu_compaction_thread_pool;
1262
3.56k
            compaction_type_name = "CUMU";
1263
3.56k
            break;
1264
0
        case CompactionType::BINLOG_COMPACTION:
1265
0
            thread_pool = &_binlog_compaction_thread_pool;
1266
0
            compaction_type_name = "BINLOG";
1267
0
            break;
1268
2
        case CompactionType::BASE_COMPACTION:
1269
2
        case CompactionType::FULL_COMPACTION:
1270
2
            thread_pool = &_base_compaction_thread_pool;
1271
2
            compaction_type_name = "BASE";
1272
2
            break;
1273
0
        default:
1274
0
            DCHECK(false) << "invalid compaction type: " << compaction_type;
1275
3.57k
        }
1276
3.57k
        VLOG_CRITICAL << "compaction thread pool. type: " << compaction_type_name
1277
0
                      << ", num_threads: " << thread_pool->get()->num_threads()
1278
0
                      << ", num_threads_pending_start: "
1279
0
                      << thread_pool->get()->num_threads_pending_start()
1280
0
                      << ", num_active_threads: " << thread_pool->get()->num_active_threads()
1281
0
                      << ", max_threads: " << thread_pool->get()->max_threads()
1282
0
                      << ", min_threads: " << thread_pool->get()->min_threads()
1283
0
                      << ", num_total_queued_tasks: " << thread_pool->get()->get_queue_size();
1284
3.57k
        auto status =
1285
3.57k
                thread_pool->get()->submit_func([=, compaction = std::move(compaction), this]() {
1286
3.57k
                    _handle_compaction(std::move(tablet), std::move(compaction), compaction_type,
1287
3.57k
                                       permits, force, compaction_id);
1288
3.57k
                });
1289
3.57k
        if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) [[likely]] {
1290
3.56k
            DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
1291
3.56k
                    _cumu_compaction_thread_pool->get_queue_size());
1292
3.56k
        } else if (compaction_type == CompactionType::BASE_COMPACTION) {
1293
2
            DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
1294
2
                    _base_compaction_thread_pool->get_queue_size());
1295
2
        }
1296
3.57k
        if (!status.ok()) {
1297
            // Cleanup tracker on submit failure
1298
0
            tracker->remove_task(compaction_id);
1299
0
            if (!force) {
1300
0
                _permit_limiter.release(permits, compaction_type);
1301
0
            }
1302
0
            _pop_tablet_from_submitted_compaction(tablet, compaction_type);
1303
0
            tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
1304
0
            return Status::InternalError(
1305
0
                    "failed to submit compaction task to thread pool, "
1306
0
                    "tablet_id={}, compaction_type={}.",
1307
0
                    tablet->tablet_id(), compaction_type);
1308
0
        }
1309
3.57k
        return Status::OK();
1310
233k
    } else {
1311
233k
        _pop_tablet_from_submitted_compaction(tablet, compaction_type);
1312
233k
        tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
1313
233k
        if (!st.ok()) {
1314
0
            return Status::InternalError(
1315
0
                    "failed to prepare compaction task and calculate permits, "
1316
0
                    "tablet_id={}, compaction_type={}, "
1317
0
                    "permit={}, current_permit={}, status={}",
1318
0
                    tablet->tablet_id(), compaction_type, permits, _permit_limiter.usage(),
1319
0
                    st.to_string());
1320
0
        }
1321
233k
        return st;
1322
233k
    }
1323
237k
}
1324
1325
void StorageEngine::_handle_compaction(TabletSharedPtr tablet,
1326
                                       std::shared_ptr<CompactionMixin> compaction,
1327
                                       CompactionType compaction_type, int64_t permits, bool force,
1328
3.57k
                                       int64_t compaction_id) {
1329
3.57k
    if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) [[likely]] {
1330
3.56k
        DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1);
1331
3.56k
        DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
1332
3.56k
                _cumu_compaction_thread_pool->get_queue_size());
1333
3.56k
    } else if (compaction_type == CompactionType::BASE_COMPACTION) {
1334
2
        DorisMetrics::instance()->base_compaction_task_running_total->increment(1);
1335
2
        DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
1336
2
                _base_compaction_thread_pool->get_queue_size());
1337
2
    } else if (compaction_type == CompactionType::BINLOG_COMPACTION) {
1338
0
        DorisMetrics::instance()->binlog_compaction_task_running_total->increment(1);
1339
0
        DorisMetrics::instance()->binlog_compaction_task_pending_total->set_value(
1340
0
                _binlog_compaction_thread_pool->get_queue_size());
1341
0
    }
1342
3.57k
    bool is_large_task = true;
1343
3.57k
    Defer defer {[&]() {
1344
3.56k
        DBUG_EXECUTE_IF("StorageEngine._submit_compaction_task.sleep", { sleep(5); })
1345
        // Idempotent cleanup: remove task from tracker
1346
3.56k
        CompactionTaskTracker::instance()->remove_task(compaction_id);
1347
3.57k
        if (!force) {
1348
3.57k
            _permit_limiter.release(permits, compaction_type);
1349
3.57k
        }
1350
3.56k
        _pop_tablet_from_submitted_compaction(tablet, compaction_type);
1351
3.56k
        tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
1352
3.56k
        if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
1353
3.56k
            std::lock_guard<std::mutex> lock(_cumu_compaction_delay_mtx);
1354
3.56k
            _cumu_compaction_thread_pool_used_threads--;
1355
3.56k
            if (!is_large_task) {
1356
2
                _cumu_compaction_thread_pool_small_tasks_running--;
1357
2
            }
1358
3.56k
            DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(-1);
1359
3.56k
            DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
1360
3.56k
                    _cumu_compaction_thread_pool->get_queue_size());
1361
3.56k
        } else if (compaction_type == CompactionType::BASE_COMPACTION) {
1362
2
            DorisMetrics::instance()->base_compaction_task_running_total->increment(-1);
1363
2
            DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
1364
2
                    _base_compaction_thread_pool->get_queue_size());
1365
18.4E
        } else if (compaction_type == CompactionType::BINLOG_COMPACTION) {
1366
0
            DorisMetrics::instance()->binlog_compaction_task_running_total->increment(-1);
1367
0
            DorisMetrics::instance()->binlog_compaction_task_pending_total->set_value(
1368
0
                    _binlog_compaction_thread_pool->get_queue_size());
1369
0
        }
1370
3.56k
    }};
1371
3.57k
    do {
1372
3.57k
        if (compaction->compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
1373
3.56k
            std::lock_guard<std::mutex> lock(_cumu_compaction_delay_mtx);
1374
3.56k
            _cumu_compaction_thread_pool_used_threads++;
1375
3.56k
            if (config::large_cumu_compaction_task_min_thread_num > 1 &&
1376
3.56k
                _cumu_compaction_thread_pool->max_threads() >=
1377
44
                        config::large_cumu_compaction_task_min_thread_num) {
1378
                // Determine if this is a large task based on configured thresholds
1379
2
                is_large_task = (compaction->calc_input_rowsets_total_size() >
1380
2
                                         config::large_cumu_compaction_task_bytes_threshold ||
1381
2
                                 compaction->calc_input_rowsets_row_num() >
1382
2
                                         config::large_cumu_compaction_task_row_num_threshold);
1383
1384
                // Small task. No delay needed
1385
2
                if (!is_large_task) {
1386
2
                    _cumu_compaction_thread_pool_small_tasks_running++;
1387
2
                    break;
1388
2
                }
1389
                // Deal with large task
1390
0
                if (_should_delay_large_task()) {
1391
0
                    LOG_WARNING(
1392
0
                            "failed to do CumulativeCompaction, cumu thread pool is "
1393
0
                            "intensive, delay large task.")
1394
0
                            .tag("tablet_id", tablet->tablet_id())
1395
0
                            .tag("input_rows", compaction->calc_input_rowsets_row_num())
1396
0
                            .tag("input_rowsets_total_size",
1397
0
                                 compaction->calc_input_rowsets_total_size())
1398
0
                            .tag("config::large_cumu_compaction_task_bytes_threshold",
1399
0
                                 config::large_cumu_compaction_task_bytes_threshold)
1400
0
                            .tag("config::large_cumu_compaction_task_row_num_threshold",
1401
0
                                 config::large_cumu_compaction_task_row_num_threshold)
1402
0
                            .tag("remaining threads", _cumu_compaction_thread_pool_used_threads)
1403
0
                            .tag("small_tasks_running",
1404
0
                                 _cumu_compaction_thread_pool_small_tasks_running);
1405
                    // Delay this task and sleep 5s for this tablet
1406
0
                    long now = duration_cast<std::chrono::milliseconds>(
1407
0
                                       std::chrono::system_clock::now().time_since_epoch())
1408
0
                                       .count();
1409
0
                    tablet->set_last_cumu_compaction_failure_time(now);
1410
0
                    return;
1411
0
                }
1412
0
            }
1413
3.56k
        }
1414
3.57k
    } while (false);
1415
3.57k
    if (!tablet->can_do_compaction(tablet->data_dir()->path_hash(), compaction_type)) {
1416
0
        LOG(INFO) << "Tablet state has been changed, no need to begin this compaction "
1417
0
                     "task, tablet_id="
1418
0
                  << tablet->tablet_id() << ", tablet_state=" << tablet->tablet_state();
1419
0
        return;
1420
0
    }
1421
3.57k
    tablet->compaction_stage = CompactionStage::EXECUTING;
1422
    // Update tracker to RUNNING
1423
3.57k
    {
1424
3.57k
        RunningStats rs;
1425
3.57k
        rs.start_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
1426
3.57k
                                   std::chrono::system_clock::now().time_since_epoch())
1427
3.57k
                                   .count();
1428
3.57k
        rs.permits = permits;
1429
3.57k
        CompactionTaskTracker::instance()->update_to_running(compaction_id, rs);
1430
3.57k
    }
1431
3.57k
    TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction");
1432
3.57k
    tablet->execute_compaction(*compaction);
1433
3.57k
}
1434
1435
Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
1436
0
                                             bool force, bool eager, int trigger_method) {
1437
0
    if (!eager) {
1438
0
        DCHECK(compaction_type == CompactionType::BASE_COMPACTION ||
1439
0
               compaction_type == CompactionType::CUMULATIVE_COMPACTION);
1440
0
        auto compaction_registry_snapshot = _compaction_submit_registry.create_snapshot();
1441
0
        auto stores = get_stores();
1442
1443
0
        bool is_busy = std::none_of(
1444
0
                stores.begin(), stores.end(),
1445
0
                [&compaction_registry_snapshot, compaction_type](auto* data_dir) {
1446
0
                    return has_free_compaction_slot(
1447
0
                            &compaction_registry_snapshot, data_dir, compaction_type,
1448
0
                            compaction_registry_snapshot.count_executing_cumu_and_base(data_dir));
1449
0
                });
1450
0
        if (is_busy) {
1451
0
            LOG_EVERY_N(WARNING, 100)
1452
0
                    << "Too busy to submit a compaction task, tablet=" << tablet->get_table_id();
1453
0
            return Status::OK();
1454
0
        }
1455
0
    }
1456
0
    _update_cumulative_compaction_policy();
1457
    // alter table tableName set ("compaction_policy"="time_series")
1458
    // if atler table's compaction  policy, we need to modify tablet compaction policy shared ptr
1459
0
    if (tablet->get_cumulative_compaction_policy() == nullptr ||
1460
0
        tablet->get_cumulative_compaction_policy()->name() !=
1461
0
                tablet->tablet_meta()->compaction_policy()) {
1462
0
        tablet->set_cumulative_compaction_policy(
1463
0
                _get_cumulative_compaction_policy(tablet->tablet_meta()->compaction_policy()));
1464
0
    }
1465
0
    tablet->set_skip_compaction(false);
1466
0
    int8_t prefer_compaction_level = -1;
1467
0
    if (compaction_type == CompactionType::BINLOG_COMPACTION) {
1468
0
        tablet->calc_compaction_score(compaction_type, &prefer_compaction_level);
1469
0
        if (prefer_compaction_level < 0) {
1470
0
            return Status::Error<ErrorCode::BINLOG_COMPACTION_NO_SUITABLE_VERSION>(
1471
0
                    "failed to init binlog compaction due to no suitable version");
1472
0
        }
1473
0
    }
1474
0
    return _submit_compaction_task(tablet, compaction_type, force, trigger_method,
1475
0
                                   prefer_compaction_level);
1476
0
}
1477
1478
Status StorageEngine::_handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker,
1479
                                             SegCompactionCandidatesSharedPtr segments,
1480
11
                                             uint64_t submission_time) {
1481
    // note: be aware that worker->_writer maybe released when the task is cancelled
1482
11
    uint64_t exec_queue_time = GetCurrentTimeMicros() - submission_time;
1483
11
    LOG(INFO) << "segcompaction thread pool queue time(ms): " << exec_queue_time / 1000;
1484
11
    worker->compact_segments(segments);
1485
    // return OK here. error will be reported via BetaRowsetWriter::_segcompaction_status
1486
11
    return Status::OK();
1487
11
}
1488
1489
Status StorageEngine::submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker,
1490
11
                                                 SegCompactionCandidatesSharedPtr segments) {
1491
11
    uint64_t submission_time = GetCurrentTimeMicros();
1492
11
    return _seg_compaction_thread_pool->submit_func([this, worker, segments, submission_time] {
1493
11
        static_cast<void>(_handle_seg_compaction(worker, segments, submission_time));
1494
11
    });
1495
11
}
1496
1497
0
Status StorageEngine::process_index_change_task(const TAlterInvertedIndexReq& request) {
1498
0
    auto tablet_id = request.tablet_id;
1499
0
    TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
1500
0
    DBUG_EXECUTE_IF("StorageEngine::process_index_change_task_tablet_nullptr",
1501
0
                    { tablet = nullptr; })
1502
0
    if (tablet == nullptr) {
1503
0
        LOG(WARNING) << "tablet: " << tablet_id << " not exist";
1504
0
        return Status::InternalError("tablet not exist, tablet_id={}.", tablet_id);
1505
0
    }
1506
1507
0
    IndexBuilderSharedPtr index_builder = std::make_shared<IndexBuilder>(
1508
0
            *this, tablet, request.columns, request.alter_inverted_indexes, request.is_drop_op);
1509
0
    RETURN_IF_ERROR(_handle_index_change(index_builder));
1510
0
    return Status::OK();
1511
0
}
1512
1513
0
Status StorageEngine::_handle_index_change(IndexBuilderSharedPtr index_builder) {
1514
0
    RETURN_IF_ERROR(index_builder->init());
1515
0
    RETURN_IF_ERROR(index_builder->do_build_inverted_index());
1516
0
    return Status::OK();
1517
0
}
1518
1519
6
void StorageEngine::_cooldown_tasks_producer_callback() {
1520
6
    int64_t interval = config::generate_cooldown_task_interval_sec;
1521
    // the cooldown replica may be slow to upload it's meta file, so we should wait
1522
    // until it has done uploaded
1523
6
    int64_t skip_failed_interval = interval * 10;
1524
476
    do {
1525
        // these tables are ordered by priority desc
1526
476
        std::vector<TabletSharedPtr> tablets;
1527
476
        std::vector<RowsetSharedPtr> rowsets;
1528
        // TODO(luwei) : a more efficient way to get cooldown tablets
1529
476
        auto cur_time = time(nullptr);
1530
        // we should skip all the tablets which are not running and those pending to do cooldown
1531
        // also tablets once failed to do follow cooldown
1532
476
        auto skip_tablet = [this, skip_failed_interval,
1533
3.89M
                            cur_time](const TabletSharedPtr& tablet) -> bool {
1534
3.89M
            bool is_skip =
1535
3.89M
                    cur_time - tablet->last_failed_follow_cooldown_time() < skip_failed_interval ||
1536
3.89M
                    TABLET_RUNNING != tablet->tablet_state();
1537
3.89M
            if (is_skip) {
1538
0
                return is_skip;
1539
0
            }
1540
3.89M
            std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
1541
3.89M
            return _running_cooldown_tablets.find(tablet->tablet_id()) !=
1542
3.89M
                   _running_cooldown_tablets.end();
1543
3.89M
        };
1544
476
        _tablet_manager->get_cooldown_tablets(&tablets, &rowsets, std::move(skip_tablet));
1545
476
        LOG(INFO) << "cooldown producer get tablet num: " << tablets.size();
1546
476
        int max_priority = cast_set<int>(tablets.size());
1547
476
        int index = 0;
1548
476
        for (const auto& tablet : tablets) {
1549
0
            {
1550
0
                std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
1551
0
                _running_cooldown_tablets.insert(tablet->tablet_id());
1552
0
            }
1553
0
            PriorityThreadPool::Task task;
1554
0
            RowsetSharedPtr rowset = std::move(rowsets[index++]);
1555
0
            task.work_function = [tablet, rowset, task_size = tablets.size(), this]() {
1556
0
                Status st = tablet->cooldown(rowset);
1557
0
                {
1558
0
                    std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
1559
0
                    _running_cooldown_tablets.erase(tablet->tablet_id());
1560
0
                }
1561
0
                if (!st.ok()) {
1562
0
                    LOG(WARNING) << "failed to cooldown, tablet: " << tablet->tablet_id()
1563
0
                                 << " err: " << st;
1564
0
                } else {
1565
0
                    LOG(INFO) << "succeed to cooldown, tablet: " << tablet->tablet_id()
1566
0
                              << " cooldown progress ("
1567
0
                              << task_size - _cooldown_thread_pool->get_queue_size() << "/"
1568
0
                              << task_size << ")";
1569
0
                }
1570
0
            };
1571
0
            task.priority = max_priority--;
1572
0
            bool submited = _cooldown_thread_pool->offer(std::move(task));
1573
1574
0
            if (!submited) {
1575
0
                LOG(INFO) << "failed to submit cooldown task";
1576
0
            }
1577
0
        }
1578
476
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
1579
6
}
1580
1581
6
void StorageEngine::_remove_unused_remote_files_callback() {
1582
22
    while (!_stop_background_threads_latch.wait_for(
1583
22
            std::chrono::seconds(config::remove_unused_remote_files_interval_sec))) {
1584
16
        LOG(INFO) << "begin to remove unused remote files";
1585
16
        do_remove_unused_remote_files();
1586
16
    }
1587
6
}
1588
1589
static void collect_tablet_unused_remote_files(
1590
        Tablet* t, TConfirmUnusedRemoteFilesRequest& req,
1591
        std::unordered_map<int64_t, std::pair<StorageResource, std::vector<io::FileInfo>>>& buffer,
1592
7
        int64_t& num_files_in_buffer, PendingRowsetSet& pending_remote_rowsets) {
1593
7
    auto storage_resource = get_resource_by_storage_policy_id(t->storage_policy_id());
1594
7
    if (!storage_resource) {
1595
0
        LOG(WARNING) << "encounter error when remove unused remote files, tablet_id="
1596
0
                     << t->tablet_id() << " : " << storage_resource.error();
1597
0
        return;
1598
0
    }
1599
1600
    // TODO(plat1ko): Support path v1
1601
7
    if (storage_resource->path_version > 0) {
1602
0
        return;
1603
0
    }
1604
1605
7
    std::vector<io::FileInfo> files;
1606
    // FIXME(plat1ko): What if user reset resource in storage policy to another resource?
1607
    //  Maybe we should also list files in previously uploaded resources.
1608
7
    bool exists = true;
1609
7
    auto st = storage_resource->fs->list(storage_resource->remote_tablet_path(t->tablet_id()), true,
1610
7
                                         &files, &exists);
1611
7
    if (!st.ok()) {
1612
0
        LOG(WARNING) << "encounter error when remove unused remote files, tablet_id="
1613
0
                     << t->tablet_id() << " : " << st;
1614
0
        return;
1615
0
    }
1616
7
    if (!exists || files.empty()) {
1617
0
        return;
1618
0
    }
1619
    // get all cooldowned rowsets
1620
7
    RowsetIdUnorderedSet cooldowned_rowsets;
1621
7
    UniqueId cooldown_meta_id;
1622
7
    {
1623
7
        std::shared_lock rlock(t->get_header_lock());
1624
28
        for (const auto& [_, rs_meta] : t->tablet_meta()->all_rs_metas()) {
1625
28
            if (!rs_meta->is_local()) {
1626
28
                cooldowned_rowsets.insert(rs_meta->rowset_id());
1627
28
            }
1628
28
        }
1629
7
        if (cooldowned_rowsets.empty()) {
1630
0
            return;
1631
0
        }
1632
7
        cooldown_meta_id = t->tablet_meta()->cooldown_meta_id();
1633
7
    }
1634
0
    auto [cooldown_term, cooldown_replica_id] = t->cooldown_conf();
1635
7
    if (cooldown_replica_id != t->replica_id()) {
1636
0
        return;
1637
0
    }
1638
    // {cooldown_replica_id}.{cooldown_term}.meta
1639
7
    std::string remote_meta_path =
1640
7
            cooldown_tablet_meta_filename(cooldown_replica_id, cooldown_term);
1641
    // filter out the paths that should be reserved
1642
28
    auto filter = [&](io::FileInfo& info) {
1643
28
        std::string_view filename = info.file_name;
1644
28
        if (filename.ends_with(".meta")) {
1645
7
            return filename == remote_meta_path;
1646
7
        }
1647
21
        auto rowset_id = extract_rowset_id(filename);
1648
21
        if (rowset_id.hi == 0) {
1649
0
            return false;
1650
0
        }
1651
21
        return cooldowned_rowsets.contains(rowset_id) || pending_remote_rowsets.contains(rowset_id);
1652
21
    };
1653
7
    files.erase(std::remove_if(files.begin(), files.end(), std::move(filter)), files.end());
1654
7
    if (files.empty()) {
1655
7
        return;
1656
7
    }
1657
0
    files.shrink_to_fit();
1658
0
    num_files_in_buffer += files.size();
1659
0
    buffer.insert({t->tablet_id(), {*storage_resource, std::move(files)}});
1660
0
    auto& info = req.confirm_list.emplace_back();
1661
0
    info.__set_tablet_id(t->tablet_id());
1662
0
    info.__set_cooldown_replica_id(cooldown_replica_id);
1663
0
    info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift());
1664
0
}
1665
1666
static void confirm_and_remove_unused_remote_files(
1667
        const TConfirmUnusedRemoteFilesRequest& req,
1668
        std::unordered_map<int64_t, std::pair<StorageResource, std::vector<io::FileInfo>>>& buffer,
1669
0
        const int64_t num_files_in_buffer) {
1670
0
    TConfirmUnusedRemoteFilesResult result;
1671
0
    LOG(INFO) << "begin to confirm unused remote files. num_tablets=" << buffer.size()
1672
0
              << " num_files=" << num_files_in_buffer;
1673
0
    auto st = MasterServerClient::instance()->confirm_unused_remote_files(req, &result);
1674
0
    if (!st.ok()) {
1675
0
        LOG(WARNING) << st;
1676
0
        return;
1677
0
    }
1678
0
    for (auto id : result.confirmed_tablets) {
1679
0
        if (auto it = buffer.find(id); LIKELY(it != buffer.end())) {
1680
0
            auto& storage_resource = it->second.first;
1681
0
            auto& files = it->second.second;
1682
0
            std::vector<io::Path> paths;
1683
0
            paths.reserve(files.size());
1684
            // delete unused files
1685
0
            LOG(INFO) << "delete unused files. root_path=" << storage_resource.fs->root_path()
1686
0
                      << " tablet_id=" << id;
1687
0
            io::Path dir = storage_resource.remote_tablet_path(id);
1688
0
            for (auto& file : files) {
1689
0
                auto file_path = dir / file.file_name;
1690
0
                LOG(INFO) << "delete unused file: " << file_path.native();
1691
0
                paths.push_back(std::move(file_path));
1692
0
            }
1693
0
            st = storage_resource.fs->batch_delete(paths);
1694
0
            if (!st.ok()) {
1695
0
                LOG(WARNING) << "failed to delete unused files, tablet_id=" << id << " : " << st;
1696
0
            }
1697
0
            buffer.erase(it);
1698
0
        }
1699
0
    }
1700
0
}
1701
1702
16
void StorageEngine::do_remove_unused_remote_files() {
1703
980k
    auto tablets = tablet_manager()->get_all_tablet([](Tablet* t) {
1704
980k
        return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() &&
1705
980k
               t->tablet_state() == TABLET_RUNNING &&
1706
980k
               t->cooldown_conf_unlocked().cooldown_replica_id == t->replica_id();
1707
980k
    });
1708
16
    TConfirmUnusedRemoteFilesRequest req;
1709
16
    req.__isset.confirm_list = true;
1710
    // tablet_id -> [storage_resource, unused_remote_files]
1711
16
    using unused_remote_files_buffer_t =
1712
16
            std::unordered_map<int64_t, std::pair<StorageResource, std::vector<io::FileInfo>>>;
1713
16
    unused_remote_files_buffer_t buffer;
1714
16
    int64_t num_files_in_buffer = 0;
1715
    // assume a filename is 0.1KB, buffer size should not larger than 100MB
1716
16
    constexpr int64_t max_files_in_buffer = 1000000;
1717
1718
    // batch confirm to reduce FE's overhead
1719
16
    auto next_confirm_time = std::chrono::steady_clock::now() +
1720
16
                             std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
1721
16
    for (auto& t : tablets) {
1722
7
        if (t.use_count() <= 1 // this means tablet has been dropped
1723
7
            || t->cooldown_conf_unlocked().cooldown_replica_id != t->replica_id() ||
1724
7
            t->tablet_state() != TABLET_RUNNING) {
1725
0
            continue;
1726
0
        }
1727
7
        collect_tablet_unused_remote_files(t.get(), req, buffer, num_files_in_buffer,
1728
7
                                           _pending_remote_rowsets);
1729
7
        if (num_files_in_buffer > 0 && (num_files_in_buffer > max_files_in_buffer ||
1730
0
                                        std::chrono::steady_clock::now() > next_confirm_time)) {
1731
0
            confirm_and_remove_unused_remote_files(req, buffer, num_files_in_buffer);
1732
0
            buffer.clear();
1733
0
            req.confirm_list.clear();
1734
0
            num_files_in_buffer = 0;
1735
0
            next_confirm_time =
1736
0
                    std::chrono::steady_clock::now() +
1737
0
                    std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
1738
0
        }
1739
7
    }
1740
16
    if (num_files_in_buffer > 0) {
1741
0
        confirm_and_remove_unused_remote_files(req, buffer, num_files_in_buffer);
1742
0
    }
1743
16
}
1744
1745
6
void StorageEngine::_cold_data_compaction_producer_callback() {
1746
26
    while (!_stop_background_threads_latch.wait_for(
1747
26
            std::chrono::seconds(config::cold_data_compaction_interval_sec))) {
1748
20
        if (config::disable_auto_compaction ||
1749
20
            GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
1750
0
            continue;
1751
0
        }
1752
1753
20
        std::unordered_set<int64_t> copied_tablet_submitted;
1754
20
        {
1755
20
            std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
1756
20
            copied_tablet_submitted = _cold_compaction_tablet_submitted;
1757
20
        }
1758
20
        int64_t n = config::cold_data_compaction_thread_num - copied_tablet_submitted.size();
1759
20
        if (n <= 0) {
1760
0
            continue;
1761
0
        }
1762
985k
        auto tablets = _tablet_manager->get_all_tablet([&copied_tablet_submitted](Tablet* t) {
1763
985k
            return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() &&
1764
985k
                   t->tablet_state() == TABLET_RUNNING &&
1765
985k
                   !copied_tablet_submitted.contains(t->tablet_id()) &&
1766
985k
                   !t->tablet_meta()->tablet_schema()->disable_auto_compaction();
1767
985k
        });
1768
20
        std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_compact;
1769
20
        tablet_to_compact.reserve(n + 1);
1770
20
        std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_follow;
1771
20
        tablet_to_follow.reserve(n + 1);
1772
1773
20
        for (auto& t : tablets) {
1774
7
            if (t->replica_id() == t->cooldown_conf_unlocked().cooldown_replica_id) {
1775
7
                auto score = t->calc_cold_data_compaction_score();
1776
7
                if (score < config::cold_data_compaction_score_threshold) {
1777
7
                    continue;
1778
7
                }
1779
0
                tablet_to_compact.emplace_back(t, score);
1780
0
                if (tablet_to_compact.size() > n) {
1781
0
                    std::sort(tablet_to_compact.begin(), tablet_to_compact.end(),
1782
0
                              [](auto& a, auto& b) { return a.second > b.second; });
1783
0
                    tablet_to_compact.pop_back();
1784
0
                }
1785
0
                continue;
1786
7
            }
1787
            // else, need to follow
1788
0
            {
1789
0
                std::lock_guard lock(_running_cooldown_mutex);
1790
0
                if (_running_cooldown_tablets.contains(t->table_id())) {
1791
                    // already in cooldown queue
1792
0
                    continue;
1793
0
                }
1794
0
            }
1795
            // TODO(plat1ko): some avoidance strategy if failed to follow
1796
0
            auto score = t->calc_cold_data_compaction_score();
1797
0
            tablet_to_follow.emplace_back(t, score);
1798
1799
0
            if (tablet_to_follow.size() > n) {
1800
0
                std::sort(tablet_to_follow.begin(), tablet_to_follow.end(),
1801
0
                          [](auto& a, auto& b) { return a.second > b.second; });
1802
0
                tablet_to_follow.pop_back();
1803
0
            }
1804
0
        }
1805
1806
20
        for (auto& [tablet, score] : tablet_to_compact) {
1807
0
            LOG(INFO) << "submit cold data compaction. tablet_id=" << tablet->tablet_id()
1808
0
                      << " score=" << score;
1809
0
            static_cast<void>(
1810
0
                    _cold_data_compaction_thread_pool->submit_func([t = std::move(tablet), this]() {
1811
0
                        _handle_cold_data_compaction(std::move(t));
1812
0
                    }));
1813
0
        }
1814
1815
20
        for (auto& [tablet, score] : tablet_to_follow) {
1816
0
            LOG(INFO) << "submit to follow cooldown meta. tablet_id=" << tablet->tablet_id()
1817
0
                      << " score=" << score;
1818
0
            static_cast<void>(_cold_data_compaction_thread_pool->submit_func(
1819
0
                    [t = std::move(tablet), this]() { _follow_cooldown_meta(std::move(t)); }));
1820
0
        }
1821
20
    }
1822
6
}
1823
1824
0
// Runs one cold data compaction on tablet `t`.
//
// The tablet id is registered in `_cold_compaction_tablet_submitted` for the duration of
// the call (removed again by the Defer on every exit path). The compaction is skipped
// when the tablet's cold compaction lock cannot be taken immediately. Failures of
// prepare_compact()/execute_compact() are logged, not propagated.
void StorageEngine::_handle_cold_data_compaction(TabletSharedPtr t) {
    auto compaction = std::make_shared<ColdDataCompaction>(*this, t);

    // Mark the tablet as in flight.
    {
        std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
        _cold_compaction_tablet_submitted.insert(t->tablet_id());
    }
    Defer defer {[&] {
        std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
        _cold_compaction_tablet_submitted.erase(t->tablet_id());
    }};

    std::unique_lock cold_compaction_lock(t->get_cold_compaction_lock(), std::try_to_lock);
    if (!cold_compaction_lock.owns_lock()) {
        LOG(WARNING) << "try cold_compaction_lock failed, tablet_id=" << t->tablet_id();
        return;
    }

    _update_cumulative_compaction_policy();
    // (Re)install the cumulative policy when it is missing or does not match the one
    // named in the tablet meta.
    const bool policy_outdated =
            t->get_cumulative_compaction_policy() == nullptr ||
            t->get_cumulative_compaction_policy()->name() != t->tablet_meta()->compaction_policy();
    if (policy_outdated) {
        t->set_cumulative_compaction_policy(
                _get_cumulative_compaction_policy(t->tablet_meta()->compaction_policy()));
    }

    if (auto prepare_st = compaction->prepare_compact(); !prepare_st.ok()) {
        LOG(WARNING) << "failed to prepare cold data compaction. tablet_id=" << t->tablet_id()
                     << " err=" << prepare_st;
        return;
    }

    if (auto execute_st = compaction->execute_compact(); !execute_st.ok()) {
        LOG(WARNING) << "failed to execute cold data compaction. tablet_id=" << t->tablet_id()
                     << " err=" << execute_st;
    }
}
1860
1861
0
// Calls t->cooldown() for tablet `t`, keeping the tablet id registered in
// `_cold_compaction_tablet_submitted` while the call is running. A failed cooldown is
// only logged (rate limited to every 5th occurrence).
void StorageEngine::_follow_cooldown_meta(TabletSharedPtr t) {
    const auto tablet_id = t->tablet_id();

    {
        std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
        _cold_compaction_tablet_submitted.insert(tablet_id);
    }

    const auto st = t->cooldown();

    {
        std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
        _cold_compaction_tablet_submitted.erase(tablet_id);
    }

    if (st.ok()) {
        return;
    }
    // The cooldown of the replica may be relatively slow
    // resulting in a short period of time where following cannot be successful
    LOG_EVERY_N(WARNING, 5) << "failed to cooldown. tablet_id=" << tablet_id << " err=" << st;
}
1878
1879
void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_id,
1880
                                           int64_t publish_version, int64_t transaction_id,
1881
2.05k
                                           bool is_recovery, int64_t commit_tso) {
1882
2.05k
    if (!is_recovery) {
1883
2.05k
        bool exists = false;
1884
2.05k
        {
1885
2.05k
            std::shared_lock<std::shared_mutex> rlock(_async_publish_lock);
1886
2.05k
            if (auto tablet_iter = _async_publish_tasks.find(tablet_id);
1887
2.05k
                tablet_iter != _async_publish_tasks.end()) {
1888
2.05k
                if (auto iter = tablet_iter->second.find(publish_version);
1889
2.05k
                    iter != tablet_iter->second.end()) {
1890
20
                    exists = true;
1891
20
                }
1892
2.05k
            }
1893
2.05k
        }
1894
2.05k
        if (exists) {
1895
20
            return;
1896
20
        }
1897
2.03k
        TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
1898
2.03k
        if (tablet == nullptr) {
1899
0
            LOG(INFO) << "tablet may be dropped when add async publish task, tablet_id: "
1900
0
                      << tablet_id;
1901
0
            return;
1902
0
        }
1903
2.03k
        PendingPublishInfoPB pending_publish_info_pb;
1904
2.03k
        pending_publish_info_pb.set_partition_id(partition_id);
1905
2.03k
        pending_publish_info_pb.set_transaction_id(transaction_id);
1906
2.03k
        pending_publish_info_pb.set_commit_tso(commit_tso);
1907
2.03k
        static_cast<void>(TabletMetaManager::save_pending_publish_info(
1908
2.03k
                tablet->data_dir(), tablet->tablet_id(), publish_version,
1909
2.03k
                pending_publish_info_pb.SerializeAsString()));
1910
2.03k
    }
1911
2.05k
    LOG(INFO) << "add pending publish task, tablet_id: " << tablet_id
1912
2.03k
              << " version: " << publish_version << " txn_id:" << transaction_id
1913
2.03k
              << " is_recovery: " << is_recovery;
1914
2.03k
    std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
1915
2.03k
    _async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id, commit_tso};
1916
2.03k
}
1917
1918
3
int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
1919
3
    std::shared_lock<std::shared_mutex> rlock(_async_publish_lock);
1920
3
    auto iter = _async_publish_tasks.find(tablet_id);
1921
3
    if (iter == _async_publish_tasks.end()) {
1922
0
        return INT64_MAX;
1923
0
    }
1924
3
    if (iter->second.empty()) {
1925
0
        return INT64_MAX;
1926
0
    }
1927
3
    return iter->second.begin()->first;
1928
3
}
1929
1930
314k
void StorageEngine::_process_async_publish() {
1931
    // tablet, publish_version
1932
314k
    std::vector<std::pair<TabletSharedPtr, int64_t>> need_removed_tasks;
1933
314k
    {
1934
314k
        std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
1935
314k
        for (auto tablet_iter = _async_publish_tasks.begin();
1936
314k
             tablet_iter != _async_publish_tasks.end();) {
1937
10
            if (tablet_iter->second.empty()) {
1938
1
                tablet_iter = _async_publish_tasks.erase(tablet_iter);
1939
1
                continue;
1940
1
            }
1941
9
            int64_t tablet_id = tablet_iter->first;
1942
9
            TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
1943
9
            if (!tablet) {
1944
1
                LOG(WARNING) << "tablet does not exist when async publush, tablet_id: "
1945
1
                             << tablet_id;
1946
1
                tablet_iter = _async_publish_tasks.erase(tablet_iter);
1947
1
                continue;
1948
1
            }
1949
1950
8
            auto task_iter = tablet_iter->second.begin();
1951
8
            int64_t version = task_iter->first;
1952
8
            int64_t transaction_id = std::get<0>(task_iter->second);
1953
8
            int64_t partition_id = std::get<1>(task_iter->second);
1954
8
            int64_t commit_tso = std::get<2>(task_iter->second);
1955
8
            int64_t max_version = tablet->max_version().second;
1956
1957
8
            if (version <= max_version) {
1958
6
                need_removed_tasks.emplace_back(tablet, version);
1959
6
                tablet_iter->second.erase(task_iter);
1960
6
                tablet_iter++;
1961
6
                continue;
1962
6
            }
1963
2
            if (version != max_version + 1) {
1964
1
                int32_t max_version_config = tablet->max_version_config();
1965
                // Keep only the most recent versions
1966
31
                while (tablet_iter->second.size() > max_version_config) {
1967
30
                    need_removed_tasks.emplace_back(tablet, version);
1968
30
                    task_iter = tablet_iter->second.erase(task_iter);
1969
30
                    version = task_iter->first;
1970
30
                }
1971
1
                tablet_iter++;
1972
1
                continue;
1973
1
            }
1974
1975
1
            auto async_publish_task = std::make_shared<AsyncTabletPublishTask>(
1976
1
                    *this, tablet, partition_id, transaction_id, version, commit_tso);
1977
1
            static_cast<void>(_tablet_publish_txn_thread_pool->submit_func(
1978
1
                    [=]() { async_publish_task->handle(); }));
1979
1
            tablet_iter->second.erase(task_iter);
1980
1
            need_removed_tasks.emplace_back(tablet, version);
1981
1
            tablet_iter++;
1982
1
        }
1983
314k
    }
1984
314k
    for (auto& [tablet, publish_version] : need_removed_tasks) {
1985
37
        static_cast<void>(TabletMetaManager::remove_pending_publish_info(
1986
37
                tablet->data_dir(), tablet->tablet_id(), publish_version));
1987
37
    }
1988
314k
}
1989
1990
6
void StorageEngine::_async_publish_callback() {
1991
314k
    while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) {
1992
314k
        _process_async_publish();
1993
314k
    }
1994
6
}
1995
1996
6
void StorageEngine::_check_tablet_delete_bitmap_score_callback() {
1997
6
    LOG(INFO) << "try to start check tablet delete bitmap score!";
1998
34
    while (!_stop_background_threads_latch.wait_for(
1999
34
            std::chrono::seconds(config::check_tablet_delete_bitmap_interval_seconds))) {
2000
28
        if (!config::enable_check_tablet_delete_bitmap_score) {
2001
0
            return;
2002
0
        }
2003
28
        uint64_t max_delete_bitmap_score = 0;
2004
28
        uint64_t max_base_rowset_delete_bitmap_score = 0;
2005
28
        _tablet_manager->get_topn_tablet_delete_bitmap_score(&max_delete_bitmap_score,
2006
28
                                                             &max_base_rowset_delete_bitmap_score);
2007
28
        if (max_delete_bitmap_score > 0) {
2008
18
            _tablet_max_delete_bitmap_score_metrics->set_value(max_delete_bitmap_score);
2009
18
        }
2010
28
        if (max_base_rowset_delete_bitmap_score > 0) {
2011
16
            _tablet_max_base_rowset_delete_bitmap_score_metrics->set_value(
2012
16
                    max_base_rowset_delete_bitmap_score);
2013
16
        }
2014
28
    }
2015
6
}
2016
} // namespace doris