Coverage Report

Created: 2024-11-21 14:31

/root/doris/be/src/olap/olap_server.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <gen_cpp/Types_types.h>
19
#include <gen_cpp/olap_file.pb.h>
20
#include <stdint.h>
21
22
#include <algorithm>
23
#include <atomic>
24
// IWYU pragma: no_include <bits/chrono.h>
25
#include <chrono> // IWYU pragma: keep
26
#include <cmath>
27
#include <condition_variable>
28
#include <ctime>
29
#include <functional>
30
#include <map>
31
#include <memory>
32
#include <mutex>
33
#include <ostream>
34
#include <random>
35
#include <shared_mutex>
36
#include <string>
37
#include <type_traits>
38
#include <unordered_set>
39
#include <utility>
40
#include <vector>
41
42
#include "agent/utils.h"
43
#include "common/config.h"
44
#include "common/logging.h"
45
#include "common/status.h"
46
#include "common/sync_point.h"
47
#include "gen_cpp/BackendService.h"
48
#include "gen_cpp/FrontendService.h"
49
#include "gen_cpp/Types_constants.h"
50
#include "gen_cpp/internal_service.pb.h"
51
#include "gutil/ref_counted.h"
52
#include "io/fs/file_writer.h" // IWYU pragma: keep
53
#include "io/fs/path.h"
54
#include "olap/base_tablet.h"
55
#include "olap/cold_data_compaction.h"
56
#include "olap/compaction_permit_limiter.h"
57
#include "olap/cumulative_compaction_policy.h"
58
#include "olap/cumulative_compaction_time_series_policy.h"
59
#include "olap/data_dir.h"
60
#include "olap/olap_common.h"
61
#include "olap/rowset/segcompaction.h"
62
#include "olap/schema_change.h"
63
#include "olap/single_replica_compaction.h"
64
#include "olap/storage_engine.h"
65
#include "olap/storage_policy.h"
66
#include "olap/tablet.h"
67
#include "olap/tablet_manager.h"
68
#include "olap/tablet_meta.h"
69
#include "olap/tablet_meta_manager.h"
70
#include "olap/tablet_schema.h"
71
#include "olap/task/engine_publish_version_task.h"
72
#include "olap/task/index_builder.h"
73
#include "runtime/client_cache.h"
74
#include "runtime/memory/cache_manager.h"
75
#include "runtime/memory/global_memory_arbitrator.h"
76
#include "service/brpc.h"
77
#include "service/point_query_executor.h"
78
#include "util/brpc_client_cache.h"
79
#include "util/countdown_latch.h"
80
#include "util/doris_metrics.h"
81
#include "util/mem_info.h"
82
#include "util/thread.h"
83
#include "util/threadpool.h"
84
#include "util/thrift_rpc_helper.h"
85
#include "util/time.h"
86
#include "util/uid_util.h"
87
#include "util/work_thread_pool.hpp"
88
89
using std::string;
90
91
namespace doris {
92
93
using io::Path;
94
95
// number of running SCHEMA-CHANGE threads
96
volatile uint32_t g_schema_change_active_threads = 0;
97
98
static const uint64_t DEFAULT_SEED = 104729;
99
static const uint64_t MOD_PRIME = 7652413;
100
101
55
static int32_t get_cumu_compaction_threads_num(size_t data_dirs_num) {
102
55
    int32_t threads_num = config::max_cumu_compaction_threads;
103
55
    if (threads_num == -1) {
104
55
        threads_num = data_dirs_num;
105
55
    }
106
55
    threads_num = threads_num <= 0 ? 1 : threads_num;
107
55
    return threads_num;
108
55
}
109
110
55
static int32_t get_base_compaction_threads_num(size_t data_dirs_num) {
111
55
    int32_t threads_num = config::max_base_compaction_threads;
112
55
    if (threads_num == -1) {
113
0
        threads_num = data_dirs_num;
114
0
    }
115
55
    threads_num = threads_num <= 0 ? 1 : threads_num;
116
55
    return threads_num;
117
55
}
118
119
55
static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) {
120
55
    int32_t threads_num = config::max_single_replica_compaction_threads;
121
55
    if (threads_num == -1) {
122
55
        threads_num = data_dirs_num;
123
55
    }
124
55
    threads_num = threads_num <= 0 ? 1 : threads_num;
125
55
    return threads_num;
126
55
}
127
128
17
Status StorageEngine::start_bg_threads() {
129
17
    RETURN_IF_ERROR(Thread::create(
130
17
            "StorageEngine", "unused_rowset_monitor_thread",
131
17
            [this]() { this->_unused_rowset_monitor_thread_callback(); },
132
17
            &_unused_rowset_monitor_thread));
133
17
    LOG(INFO) << "unused rowset monitor thread started";
134
135
    // start thread for monitoring the snapshot and trash folder
136
17
    RETURN_IF_ERROR(Thread::create(
137
17
            "StorageEngine", "garbage_sweeper_thread",
138
17
            [this]() { this->_garbage_sweeper_thread_callback(); }, &_garbage_sweeper_thread));
139
17
    LOG(INFO) << "garbage sweeper thread started";
140
141
    // start thread for monitoring the tablet with io error
142
17
    RETURN_IF_ERROR(Thread::create(
143
17
            "StorageEngine", "disk_stat_monitor_thread",
144
17
            [this]() { this->_disk_stat_monitor_thread_callback(); }, &_disk_stat_monitor_thread));
145
17
    LOG(INFO) << "disk stat monitor thread started";
146
147
    // convert store map to vector
148
17
    std::vector<DataDir*> data_dirs;
149
18
    for (auto& tmp_store : _store_map) {
150
18
        data_dirs.push_back(tmp_store.second);
151
18
    }
152
153
17
    auto base_compaction_threads = get_base_compaction_threads_num(data_dirs.size());
154
17
    auto cumu_compaction_threads = get_cumu_compaction_threads_num(data_dirs.size());
155
17
    auto single_replica_compaction_threads =
156
17
            get_single_replica_compaction_threads_num(data_dirs.size());
157
158
17
    RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
159
17
                            .set_min_threads(base_compaction_threads)
160
17
                            .set_max_threads(base_compaction_threads)
161
17
                            .build(&_base_compaction_thread_pool));
162
17
    RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
163
17
                            .set_min_threads(cumu_compaction_threads)
164
17
                            .set_max_threads(cumu_compaction_threads)
165
17
                            .build(&_cumu_compaction_thread_pool));
166
17
    RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
167
17
                            .set_min_threads(single_replica_compaction_threads)
168
17
                            .set_max_threads(single_replica_compaction_threads)
169
17
                            .build(&_single_replica_compaction_thread_pool));
170
171
17
    if (config::enable_segcompaction) {
172
3
        RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
173
3
                                .set_min_threads(config::segcompaction_num_threads)
174
3
                                .set_max_threads(config::segcompaction_num_threads)
175
3
                                .build(&_seg_compaction_thread_pool));
176
3
    }
177
17
    RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
178
17
                            .set_min_threads(config::cold_data_compaction_thread_num)
179
17
                            .set_max_threads(config::cold_data_compaction_thread_num)
180
17
                            .build(&_cold_data_compaction_thread_pool));
181
182
    // compaction tasks producer thread
183
17
    RETURN_IF_ERROR(Thread::create(
184
17
            "StorageEngine", "compaction_tasks_producer_thread",
185
17
            [this]() { this->_compaction_tasks_producer_callback(); },
186
17
            &_compaction_tasks_producer_thread));
187
17
    LOG(INFO) << "compaction tasks producer thread started";
188
189
17
    RETURN_IF_ERROR(Thread::create(
190
17
            "StorageEngine", "_update_replica_infos_thread",
191
17
            [this]() { this->_update_replica_infos_callback(); }, &_update_replica_infos_thread));
192
17
    LOG(INFO) << "tablet replicas info update thread started";
193
194
17
    int32_t max_checkpoint_thread_num = config::max_meta_checkpoint_threads;
195
17
    if (max_checkpoint_thread_num < 0) {
196
17
        max_checkpoint_thread_num = data_dirs.size();
197
17
    }
198
17
    RETURN_IF_ERROR(ThreadPoolBuilder("TabletMetaCheckpointTaskThreadPool")
199
17
                            .set_max_threads(max_checkpoint_thread_num)
200
17
                            .build(&_tablet_meta_checkpoint_thread_pool));
201
202
17
    RETURN_IF_ERROR(ThreadPoolBuilder("MultiGetTaskThreadPool")
203
17
                            .set_min_threads(config::multi_get_max_threads)
204
17
                            .set_max_threads(config::multi_get_max_threads)
205
17
                            .build(&_bg_multi_get_thread_pool));
206
17
    RETURN_IF_ERROR(Thread::create(
207
17
            "StorageEngine", "tablet_checkpoint_tasks_producer_thread",
208
17
            [this, data_dirs]() { this->_tablet_checkpoint_callback(data_dirs); },
209
17
            &_tablet_checkpoint_tasks_producer_thread));
210
17
    LOG(INFO) << "tablet checkpoint tasks producer thread started";
211
212
17
    RETURN_IF_ERROR(Thread::create(
213
17
            "StorageEngine", "tablet_path_check_thread",
214
17
            [this]() { this->_tablet_path_check_callback(); }, &_tablet_path_check_thread));
215
17
    LOG(INFO) << "tablet path check thread started";
216
217
    // path scan and gc thread
218
17
    if (config::path_gc_check) {
219
18
        for (auto data_dir : get_stores()) {
220
18
            scoped_refptr<Thread> path_gc_thread;
221
18
            RETURN_IF_ERROR(Thread::create(
222
18
                    "StorageEngine", "path_gc_thread",
223
18
                    [this, data_dir]() { this->_path_gc_thread_callback(data_dir); },
224
18
                    &path_gc_thread));
225
18
            _path_gc_threads.emplace_back(path_gc_thread);
226
18
        }
227
17
        LOG(INFO) << "path gc threads started. number:" << get_stores().size();
228
17
    }
229
230
17
    RETURN_IF_ERROR(ThreadPoolBuilder("CooldownTaskThreadPool")
231
17
                            .set_min_threads(config::cooldown_thread_num)
232
17
                            .set_max_threads(config::cooldown_thread_num)
233
17
                            .build(&_cooldown_thread_pool));
234
17
    LOG(INFO) << "cooldown thread pool started";
235
236
17
    RETURN_IF_ERROR(Thread::create(
237
17
            "StorageEngine", "cooldown_tasks_producer_thread",
238
17
            [this]() { this->_cooldown_tasks_producer_callback(); },
239
17
            &_cooldown_tasks_producer_thread));
240
17
    LOG(INFO) << "cooldown tasks producer thread started";
241
242
17
    RETURN_IF_ERROR(Thread::create(
243
17
            "StorageEngine", "remove_unused_remote_files_thread",
244
17
            [this]() { this->_remove_unused_remote_files_callback(); },
245
17
            &_remove_unused_remote_files_thread));
246
17
    LOG(INFO) << "remove unused remote files thread started";
247
248
17
    RETURN_IF_ERROR(Thread::create(
249
17
            "StorageEngine", "cold_data_compaction_producer_thread",
250
17
            [this]() { this->_cold_data_compaction_producer_callback(); },
251
17
            &_cold_data_compaction_producer_thread));
252
17
    LOG(INFO) << "cold data compaction producer thread started";
253
254
    // add tablet publish version thread pool
255
17
    RETURN_IF_ERROR(ThreadPoolBuilder("TabletPublishTxnThreadPool")
256
17
                            .set_min_threads(config::tablet_publish_txn_max_thread)
257
17
                            .set_max_threads(config::tablet_publish_txn_max_thread)
258
17
                            .build(&_tablet_publish_txn_thread_pool));
259
260
17
    RETURN_IF_ERROR(Thread::create(
261
17
            "StorageEngine", "async_publish_version_thread",
262
17
            [this]() { this->_async_publish_callback(); }, &_async_publish_thread));
263
17
    LOG(INFO) << "async publish thread started";
264
265
17
    LOG(INFO) << "all storage engine's background threads are started.";
266
17
    return Status::OK();
267
17
}
268
269
17
void StorageEngine::_garbage_sweeper_thread_callback() {
270
17
    uint32_t max_interval = config::max_garbage_sweep_interval;
271
17
    uint32_t min_interval = config::min_garbage_sweep_interval;
272
273
17
    if (!(max_interval >= min_interval && min_interval > 0)) {
274
0
        LOG(WARNING) << "garbage sweep interval config is illegal: [max=" << max_interval
275
0
                     << " min=" << min_interval << "].";
276
0
        min_interval = 1;
277
0
        max_interval = max_interval >= min_interval ? max_interval : min_interval;
278
0
        LOG(INFO) << "force reset garbage sweep interval. "
279
0
                  << "max_interval=" << max_interval << ", min_interval=" << min_interval;
280
0
    }
281
282
17
    const double pi = M_PI;
283
17
    double usage = 1.0;
284
    // After the program starts, the first round of cleaning starts after min_interval.
285
17
    uint32_t curr_interval = min_interval;
286
17
    do {
287
        // Function properties:
288
        // when usage < 0.6,          ratio close to 1.(interval close to max_interval)
289
        // when usage at [0.6, 0.75], ratio is rapidly decreasing from 0.87 to 0.27.
290
        // when usage > 0.75,         ratio is slowly decreasing.
291
        // when usage > 0.8,          ratio close to min_interval.
292
        // when usage = 0.88,         ratio is approximately 0.0057.
293
17
        double ratio = (1.1 * (pi / 2 - std::atan(usage * 100 / 5 - 14)) - 0.28) / pi;
294
17
        ratio = ratio > 0 ? ratio : 0;
295
17
        auto curr_interval = uint32_t(max_interval * ratio);
296
17
        curr_interval = std::max(curr_interval, min_interval);
297
17
        curr_interval = std::min(curr_interval, max_interval);
298
299
        // start clean trash and update usage.
300
17
        Status res = start_trash_sweep(&usage);
301
17
        if (res.ok() && _need_clean_trash.exchange(false, std::memory_order_relaxed)) {
302
0
            res = start_trash_sweep(&usage, true);
303
0
        }
304
305
17
        if (!res.ok()) {
306
0
            LOG(WARNING) << "one or more errors occur when sweep trash."
307
0
                         << "see previous message for detail. err code=" << res;
308
            // do nothing. continue next loop.
309
0
        }
310
17
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(curr_interval)));
311
17
}
312
313
17
void StorageEngine::_disk_stat_monitor_thread_callback() {
314
17
    int32_t interval = config::disk_stat_monitor_interval;
315
25
    do {
316
25
        _start_disk_stat_monitor();
317
318
25
        interval = config::disk_stat_monitor_interval;
319
25
        if (interval <= 0) {
320
0
            LOG(WARNING) << "disk_stat_monitor_interval config is illegal: " << interval
321
0
                         << ", force set to 1";
322
0
            interval = 1;
323
0
        }
324
25
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
325
17
}
326
327
0
void StorageEngine::check_cumulative_compaction_config() {
328
0
    int64_t promotion_size = config::compaction_promotion_size_mbytes;
329
0
    int64_t promotion_min_size = config::compaction_promotion_min_size_mbytes;
330
0
    int64_t compaction_min_size = config::compaction_min_size_mbytes;
331
332
    // check size_based_promotion_size must be greater than size_based_promotion_min_size and 2 * size_based_compaction_lower_bound_size
333
0
    int64_t should_min_promotion_size = std::max(promotion_min_size, 2 * compaction_min_size);
334
335
0
    if (promotion_size < should_min_promotion_size) {
336
0
        promotion_size = should_min_promotion_size;
337
0
        LOG(WARNING) << "the config promotion_size is adjusted to "
338
0
                        "promotion_min_size or  2 * "
339
0
                        "compaction_min_size "
340
0
                     << should_min_promotion_size << ", because size_based_promotion_size is small";
341
0
    }
342
0
}
343
344
17
void StorageEngine::_unused_rowset_monitor_thread_callback() {
345
17
    int32_t interval = config::unused_rowset_monitor_interval;
346
18
    do {
347
18
        start_delete_unused_rowset();
348
349
18
        interval = config::unused_rowset_monitor_interval;
350
18
        if (interval <= 0) {
351
0
            LOG(WARNING) << "unused_rowset_monitor_interval config is illegal: " << interval
352
0
                         << ", force set to 1";
353
0
            interval = 1;
354
0
        }
355
18
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
356
17
}
357
358
26
int32_t StorageEngine::_auto_get_interval_by_disk_capacity(DataDir* data_dir) {
359
26
    double disk_used = data_dir->get_usage(0);
360
26
    double remain_used = 1 - disk_used;
361
26
    DCHECK(remain_used >= 0 && remain_used <= 1);
362
26
    DCHECK(config::path_gc_check_interval_second >= 0);
363
26
    int32_t ret = 0;
364
26
    if (remain_used > 0.9) {
365
        // if config::path_gc_check_interval_second == 24h
366
0
        ret = config::path_gc_check_interval_second;
367
26
    } else if (remain_used > 0.7) {
368
        // 12h
369
0
        ret = config::path_gc_check_interval_second / 2;
370
26
    } else if (remain_used > 0.5) {
371
        // 6h
372
0
        ret = config::path_gc_check_interval_second / 4;
373
26
    } else if (remain_used > 0.3) {
374
        // 4h
375
26
        ret = config::path_gc_check_interval_second / 6;
376
26
    } else {
377
        // 3h
378
0
        ret = config::path_gc_check_interval_second / 8;
379
0
    }
380
26
    return ret;
381
26
}
382
383
18
void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) {
384
18
    LOG(INFO) << "try to start path gc thread!";
385
18
    int32_t last_exec_time = 0;
386
26
    do {
387
26
        int32_t current_time = time(nullptr);
388
389
26
        int32_t interval = _auto_get_interval_by_disk_capacity(data_dir);
390
26
        if (interval <= 0) {
391
0
            LOG(WARNING) << "path gc thread check interval config is illegal:" << interval
392
0
                         << "will be forced set to half hour";
393
0
            interval = 1800; // 0.5 hour
394
0
        }
395
26
        if (current_time - last_exec_time >= interval) {
396
18
            LOG(INFO) << "try to perform path gc! disk remain [" << 1 - data_dir->get_usage(0)
397
18
                      << "] internal [" << interval << "]";
398
18
            data_dir->perform_path_gc();
399
18
            last_exec_time = time(nullptr);
400
18
        }
401
26
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(5)));
402
18
    LOG(INFO) << "stop path gc thread!";
403
18
}
404
405
17
void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs) {
406
17
    int64_t interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
407
17
    do {
408
17
        LOG(INFO) << "begin to produce tablet meta checkpoint tasks.";
409
18
        for (auto data_dir : data_dirs) {
410
18
            auto st = _tablet_meta_checkpoint_thread_pool->submit_func(
411
18
                    [data_dir, this]() { _tablet_manager->do_tablet_meta_checkpoint(data_dir); });
412
18
            if (!st.ok()) {
413
0
                LOG(WARNING) << "submit tablet checkpoint tasks failed.";
414
0
            }
415
18
        }
416
17
        interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
417
17
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
418
17
}
419
420
17
void StorageEngine::_tablet_path_check_callback() {
421
17
    struct TabletIdComparator {
422
17
        bool operator()(Tablet* a, Tablet* b) { return a->tablet_id() < b->tablet_id(); }
423
17
    };
424
425
17
    using TabletQueue = std::priority_queue<Tablet*, std::vector<Tablet*>, TabletIdComparator>;
426
427
17
    int64_t interval = config::tablet_path_check_interval_seconds;
428
17
    if (interval <= 0) {
429
17
        return;
430
17
    }
431
432
0
    int64_t last_tablet_id = 0;
433
0
    do {
434
0
        int32_t batch_size = config::tablet_path_check_batch_size;
435
0
        if (batch_size <= 0) {
436
0
            if (_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
437
0
                break;
438
0
            }
439
0
            continue;
440
0
        }
441
442
0
        LOG(INFO) << "start to check tablet path";
443
444
0
        auto all_tablets = _tablet_manager->get_all_tablet(
445
0
                [](Tablet* t) { return t->is_used() && t->tablet_state() == TABLET_RUNNING; });
446
447
0
        TabletQueue big_id_tablets;
448
0
        TabletQueue small_id_tablets;
449
0
        for (auto tablet : all_tablets) {
450
0
            auto tablet_id = tablet->tablet_id();
451
0
            TabletQueue* belong_tablets = nullptr;
452
0
            if (tablet_id > last_tablet_id) {
453
0
                if (big_id_tablets.size() < batch_size ||
454
0
                    big_id_tablets.top()->tablet_id() > tablet_id) {
455
0
                    belong_tablets = &big_id_tablets;
456
0
                }
457
0
            } else if (big_id_tablets.size() < batch_size) {
458
0
                if (small_id_tablets.size() < batch_size ||
459
0
                    small_id_tablets.top()->tablet_id() > tablet_id) {
460
0
                    belong_tablets = &small_id_tablets;
461
0
                }
462
0
            }
463
0
            if (belong_tablets != nullptr) {
464
0
                belong_tablets->push(tablet.get());
465
0
                if (belong_tablets->size() > batch_size) {
466
0
                    belong_tablets->pop();
467
0
                }
468
0
            }
469
0
        }
470
471
0
        int32_t need_small_id_tablet_size =
472
0
                batch_size - static_cast<int32_t>(big_id_tablets.size());
473
474
0
        if (!big_id_tablets.empty()) {
475
0
            last_tablet_id = big_id_tablets.top()->tablet_id();
476
0
        }
477
0
        while (!big_id_tablets.empty()) {
478
0
            big_id_tablets.top()->check_tablet_path_exists();
479
0
            big_id_tablets.pop();
480
0
        }
481
482
0
        if (!small_id_tablets.empty() && need_small_id_tablet_size > 0) {
483
0
            while (static_cast<int32_t>(small_id_tablets.size()) > need_small_id_tablet_size) {
484
0
                small_id_tablets.pop();
485
0
            }
486
487
0
            last_tablet_id = small_id_tablets.top()->tablet_id();
488
0
            while (!small_id_tablets.empty()) {
489
0
                small_id_tablets.top()->check_tablet_path_exists();
490
0
                small_id_tablets.pop();
491
0
            }
492
0
        }
493
494
0
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
495
0
}
496
497
38
void StorageEngine::_adjust_compaction_thread_num() {
498
38
    auto base_compaction_threads_num = get_base_compaction_threads_num(_store_map.size());
499
38
    if (_base_compaction_thread_pool->max_threads() != base_compaction_threads_num) {
500
0
        int old_max_threads = _base_compaction_thread_pool->max_threads();
501
0
        Status status = _base_compaction_thread_pool->set_max_threads(base_compaction_threads_num);
502
0
        if (status.ok()) {
503
0
            VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads
504
0
                        << " to " << base_compaction_threads_num;
505
0
        }
506
0
    }
507
38
    if (_base_compaction_thread_pool->min_threads() != base_compaction_threads_num) {
508
0
        int old_min_threads = _base_compaction_thread_pool->min_threads();
509
0
        Status status = _base_compaction_thread_pool->set_min_threads(base_compaction_threads_num);
510
0
        if (status.ok()) {
511
0
            VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads
512
0
                        << " to " << base_compaction_threads_num;
513
0
        }
514
0
    }
515
516
38
    auto cumu_compaction_threads_num = get_cumu_compaction_threads_num(_store_map.size());
517
38
    if (_cumu_compaction_thread_pool->max_threads() != cumu_compaction_threads_num) {
518
0
        int old_max_threads = _cumu_compaction_thread_pool->max_threads();
519
0
        Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_compaction_threads_num);
520
0
        if (status.ok()) {
521
0
            VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads
522
0
                        << " to " << cumu_compaction_threads_num;
523
0
        }
524
0
    }
525
38
    if (_cumu_compaction_thread_pool->min_threads() != cumu_compaction_threads_num) {
526
0
        int old_min_threads = _cumu_compaction_thread_pool->min_threads();
527
0
        Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_compaction_threads_num);
528
0
        if (status.ok()) {
529
0
            VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads
530
0
                        << " to " << cumu_compaction_threads_num;
531
0
        }
532
0
    }
533
534
38
    auto single_replica_compaction_threads_num =
535
38
            get_single_replica_compaction_threads_num(_store_map.size());
536
38
    if (_single_replica_compaction_thread_pool->max_threads() !=
537
38
        single_replica_compaction_threads_num) {
538
0
        int old_max_threads = _single_replica_compaction_thread_pool->max_threads();
539
0
        Status status = _single_replica_compaction_thread_pool->set_max_threads(
540
0
                single_replica_compaction_threads_num);
541
0
        if (status.ok()) {
542
0
            VLOG_NOTICE << "update single replica compaction thread pool max_threads from "
543
0
                        << old_max_threads << " to " << single_replica_compaction_threads_num;
544
0
        }
545
0
    }
546
38
    if (_single_replica_compaction_thread_pool->min_threads() !=
547
38
        single_replica_compaction_threads_num) {
548
0
        int old_min_threads = _single_replica_compaction_thread_pool->min_threads();
549
0
        Status status = _single_replica_compaction_thread_pool->set_min_threads(
550
0
                single_replica_compaction_threads_num);
551
0
        if (status.ok()) {
552
0
            VLOG_NOTICE << "update single replica compaction thread pool min_threads from "
553
0
                        << old_min_threads << " to " << single_replica_compaction_threads_num;
554
0
        }
555
0
    }
556
38
}
557
558
17
void StorageEngine::_compaction_tasks_producer_callback() {
559
17
    LOG(INFO) << "try to start compaction producer process!";
560
561
17
    std::unordered_set<TabletSharedPtr> tablet_submitted_cumu;
562
17
    std::unordered_set<TabletSharedPtr> tablet_submitted_base;
563
17
    std::vector<DataDir*> data_dirs;
564
18
    for (auto& tmp_store : _store_map) {
565
18
        data_dirs.push_back(tmp_store.second);
566
18
        _tablet_submitted_cumu_compaction[tmp_store.second] = tablet_submitted_cumu;
567
18
        _tablet_submitted_base_compaction[tmp_store.second] = tablet_submitted_base;
568
18
    }
569
570
17
    int round = 0;
571
17
    CompactionType compaction_type;
572
573
    // Used to record the time when the score metric was last updated.
574
    // The update of the score metric is accompanied by the logic of selecting the tablet.
575
    // If there is no slot available, the logic of selecting the tablet will be terminated,
576
    // which causes the score metric update to be terminated.
577
    // In order to avoid this situation, we need to update the score regularly.
578
17
    int64_t last_cumulative_score_update_time = 0;
579
17
    int64_t last_base_score_update_time = 0;
580
17
    static const int64_t check_score_interval_ms = 5000; // 5 secs
581
582
17
    int64_t interval = config::generate_compaction_tasks_interval_ms;
583
38
    do {
584
38
        if (!config::disable_auto_compaction &&
585
38
            !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
586
38
            _adjust_compaction_thread_num();
587
588
38
            bool check_score = false;
589
38
            int64_t cur_time = UnixMillis();
590
38
            if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
591
36
                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
592
36
                round++;
593
36
                if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) {
594
22
                    check_score = true;
595
22
                    last_cumulative_score_update_time = cur_time;
596
22
                }
597
36
            } else {
598
2
                compaction_type = CompactionType::BASE_COMPACTION;
599
2
                round = 0;
600
2
                if (cur_time - last_base_score_update_time >= check_score_interval_ms) {
601
2
                    check_score = true;
602
2
                    last_base_score_update_time = cur_time;
603
2
                }
604
2
            }
605
38
            std::unique_ptr<ThreadPool>& thread_pool =
606
38
                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
607
38
                            ? _cumu_compaction_thread_pool
608
38
                            : _base_compaction_thread_pool;
609
38
            VLOG_CRITICAL << "compaction thread pool. type: "
610
0
                          << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
611
0
                                                                                       : "BASE")
612
0
                          << ", num_threads: " << thread_pool->num_threads()
613
0
                          << ", num_threads_pending_start: "
614
0
                          << thread_pool->num_threads_pending_start()
615
0
                          << ", num_active_threads: " << thread_pool->num_active_threads()
616
0
                          << ", max_threads: " << thread_pool->max_threads()
617
0
                          << ", min_threads: " << thread_pool->min_threads()
618
0
                          << ", num_total_queued_tasks: " << thread_pool->get_queue_size();
619
38
            std::vector<TabletSharedPtr> tablets_compaction =
620
38
                    _generate_compaction_tasks(compaction_type, data_dirs, check_score);
621
38
            if (tablets_compaction.size() == 0) {
622
37
                std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
623
37
                _wakeup_producer_flag = 0;
624
                // It is necessary to wake up the thread on timeout to prevent deadlock
625
                // in case of no running compaction task.
626
37
                _compaction_producer_sleep_cv.wait_for(
627
37
                        lock, std::chrono::milliseconds(2000),
628
74
                        [this] { return _wakeup_producer_flag == 1; });
629
37
                continue;
630
37
            }
631
632
1
            for (const auto& tablet : tablets_compaction) {
633
1
                if (compaction_type == CompactionType::BASE_COMPACTION) {
634
0
                    tablet->set_last_base_compaction_schedule_time(UnixMillis());
635
0
                }
636
1
                Status st = _submit_compaction_task(tablet, compaction_type, false);
637
1
                if (!st.ok()) {
638
0
                    LOG(WARNING) << "failed to submit compaction task for tablet: "
639
0
                                 << tablet->tablet_id() << ", err: " << st;
640
0
                }
641
1
            }
642
1
            interval = config::generate_compaction_tasks_interval_ms;
643
1
        } else {
644
0
            interval = 5000; // 5s to check disable_auto_compaction
645
0
        }
646
38
    } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
647
17
}
648
649
17
void StorageEngine::_update_replica_infos_callback() {
650
#ifdef GOOGLE_PROFILER
651
    ProfilerRegisterThread();
652
#endif
653
17
    LOG(INFO) << "start to update replica infos!";
654
655
17
    int64_t interval = config::update_replica_infos_interval_seconds;
656
17
    do {
657
17
        auto all_tablets = _tablet_manager->get_all_tablet([](Tablet* t) {
658
0
            return t->is_used() && t->tablet_state() == TABLET_RUNNING &&
659
0
                   !t->tablet_meta()->tablet_schema()->disable_auto_compaction() &&
660
0
                   t->tablet_meta()->tablet_schema()->enable_single_replica_compaction();
661
0
        });
662
17
        TMasterInfo* master_info = ExecEnv::GetInstance()->master_info();
663
17
        if (master_info == nullptr) {
664
17
            LOG(WARNING) << "Have not get FE Master heartbeat yet";
665
17
            std::this_thread::sleep_for(std::chrono::seconds(2));
666
17
            continue;
667
17
        }
668
0
        TNetworkAddress master_addr = master_info->network_address;
669
0
        if (master_addr.hostname == "" || master_addr.port == 0) {
670
0
            LOG(WARNING) << "Have not get FE Master heartbeat yet";
671
0
            std::this_thread::sleep_for(std::chrono::seconds(2));
672
0
            continue;
673
0
        }
674
675
0
        int start = 0;
676
0
        int tablet_size = all_tablets.size();
677
        // The while loop may take a long time, we should skip it when stop
678
0
        while (start < tablet_size && _stop_background_threads_latch.count() > 0) {
679
0
            int batch_size = std::min(100, tablet_size - start);
680
0
            int end = start + batch_size;
681
0
            TGetTabletReplicaInfosRequest request;
682
0
            TGetTabletReplicaInfosResult result;
683
0
            for (int i = start; i < end; i++) {
684
0
                request.tablet_ids.emplace_back(all_tablets[i]->tablet_id());
685
0
            }
686
0
            Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
687
0
                    master_addr.hostname, master_addr.port,
688
0
                    [&request, &result](FrontendServiceConnection& client) {
689
0
                        client->getTabletReplicaInfos(result, request);
690
0
                    });
691
692
0
            if (!rpc_st.ok()) {
693
0
                LOG(WARNING) << "Failed to get tablet replica infos, encounter rpc failure, "
694
0
                                "tablet start: "
695
0
                             << start << " end: " << end;
696
0
                continue;
697
0
            }
698
699
0
            std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex);
700
0
            for (const auto& it : result.tablet_replica_infos) {
701
0
                auto tablet_id = it.first;
702
0
                auto tablet = _tablet_manager->get_tablet(tablet_id);
703
0
                if (tablet == nullptr) {
704
0
                    VLOG_CRITICAL << "tablet ptr is nullptr";
705
0
                    continue;
706
0
                }
707
708
0
                VLOG_NOTICE << tablet_id << " tablet has " << it.second.size() << " replicas";
709
0
                uint64_t min_modulo = MOD_PRIME;
710
0
                TReplicaInfo peer_replica;
711
0
                for (const auto& replica : it.second) {
712
0
                    int64_t peer_replica_id = replica.replica_id;
713
0
                    uint64_t modulo = HashUtil::hash64(&peer_replica_id, sizeof(peer_replica_id),
714
0
                                                       DEFAULT_SEED) %
715
0
                                      MOD_PRIME;
716
0
                    if (modulo < min_modulo) {
717
0
                        peer_replica = replica;
718
0
                        min_modulo = modulo;
719
0
                    }
720
0
                }
721
0
                VLOG_NOTICE << "tablet " << tablet_id << ", peer replica host is "
722
0
                            << peer_replica.host;
723
0
                _peer_replica_infos[tablet_id] = peer_replica;
724
0
            }
725
0
            _token = result.token;
726
0
            VLOG_NOTICE << "get tablet replica infos from fe, size is " << end - start
727
0
                        << " token = " << result.token;
728
0
            start = end;
729
0
        }
730
0
        interval = config::update_replica_infos_interval_seconds;
731
17
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
732
17
}
733
734
Status StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr tablet,
                                                             CompactionType compaction_type) {
    // Submits a compaction task that fetches compacted rowsets from a peer
    // replica instead of merging locally. Returns AlreadyExist if the tablet is
    // already registered for a compaction, OK otherwise (including the
    // "no suitable version" case, which is treated as success).
    //
    // For single replica compaction, the local version to be merged is determined based on the version fetched from the peer replica.
    // Therefore, it is currently not possible to determine whether it should be a base compaction or cumulative compaction.
    // As a result, the tablet needs to be pushed to both the _tablet_submitted_cumu_compaction and the _tablet_submitted_base_compaction simultaneously.
    bool already_exist =
            _push_tablet_into_submitted_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION);
    if (already_exist) {
        return Status::AlreadyExist<false>(
                "compaction task has already been submitted, tablet_id={}", tablet->tablet_id());
    }

    already_exist = _push_tablet_into_submitted_compaction(tablet, CompactionType::BASE_COMPACTION);
    if (already_exist) {
        // Roll back the cumulative registration so the tablet is not left
        // half-registered.
        _pop_tablet_from_submitted_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION);
        return Status::AlreadyExist<false>(
                "compaction task has already been submitted, tablet_id={}", tablet->tablet_id());
    }

    auto compaction = std::make_shared<SingleReplicaCompaction>(tablet, compaction_type);
    DorisMetrics::instance()->single_compaction_request_total->increment(1);
    auto st = compaction->prepare_compact();

    // Undo both registrations made above; used on every exit path after this point.
    auto clean_single_replica_compaction = [tablet, this]() {
        _pop_tablet_from_submitted_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION);
        _pop_tablet_from_submitted_compaction(tablet, CompactionType::BASE_COMPACTION);
    };

    if (!st.ok()) {
        clean_single_replica_compaction();
        if (!st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) {
            LOG(WARNING) << "failed to prepare single replica compaction, tablet_id="
                         << tablet->tablet_id() << " : " << st;
            return st;
        }
        return Status::OK(); // No suitable version, regard as OK
    }

    // The lambda owns the compaction object (moved in) so it stays alive until
    // the pool task finishes.
    auto submit_st = _single_replica_compaction_thread_pool->submit_func(
            [tablet, compaction = std::move(compaction),
             clean_single_replica_compaction]() mutable {
                tablet->execute_single_replica_compaction(*compaction);
                clean_single_replica_compaction();
            });
    if (!submit_st.ok()) {
        clean_single_replica_compaction();
        return Status::InternalError(
                "failed to submit single replica compaction task to thread pool, "
                "tablet_id={}",
                tablet->tablet_id());
    }
    return Status::OK();
}
787
788
void StorageEngine::get_tablet_rowset_versions(const PGetTabletVersionsRequest* request,
789
0
                                               PGetTabletVersionsResponse* response) {
790
0
    TabletSharedPtr tablet = _tablet_manager->get_tablet(request->tablet_id());
791
0
    if (tablet == nullptr) {
792
0
        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
793
0
        return;
794
0
    }
795
0
    std::vector<Version> local_versions = tablet->get_all_local_versions();
796
0
    for (const auto& local_version : local_versions) {
797
0
        auto version = response->add_versions();
798
0
        version->set_first(local_version.first);
799
0
        version->set_second(local_version.second);
800
0
    }
801
0
    response->mutable_status()->set_status_code(0);
802
0
}
803
804
int StorageEngine::_get_executing_compaction_num(
805
83
        std::unordered_set<TabletSharedPtr>& compaction_tasks) {
806
83
    int num = 0;
807
83
    for (const auto& task : compaction_tasks) {
808
10
        if (task->compaction_stage == CompactionStage::EXECUTING) {
809
2
            num++;
810
2
        }
811
10
    }
812
83
    return num;
813
83
}
814
815
bool need_generate_compaction_tasks(int task_cnt_per_disk, int thread_per_disk,
816
82
                                    CompactionType compaction_type, bool all_base) {
817
82
    if (task_cnt_per_disk >= thread_per_disk) {
818
        // Return if no available slot
819
0
        return false;
820
82
    } else if (task_cnt_per_disk >= thread_per_disk - 1) {
821
        // Only one slot left, check if it can be assigned to base compaction task.
822
0
        if (compaction_type == CompactionType::BASE_COMPACTION) {
823
0
            if (all_base) {
824
0
                return false;
825
0
            }
826
0
        }
827
0
    }
828
82
    return true;
829
82
}
830
831
41
int get_concurrent_per_disk(int max_score, int thread_per_disk) {
832
41
    if (!config::enable_compaction_priority_scheduling) {
833
0
        return thread_per_disk;
834
0
    }
835
836
41
    double load_average = 0;
837
41
    if (DorisMetrics::instance()->system_metrics() != nullptr) {
838
0
        load_average = DorisMetrics::instance()->system_metrics()->get_load_average_1_min();
839
0
    }
840
41
    int num_cores = doris::CpuInfo::num_cores();
841
41
    bool cpu_usage_high = load_average > num_cores * 0.8;
842
843
41
    auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
844
41
    bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
845
846
41
    if (max_score <= config::low_priority_compaction_score_threshold &&
847
41
        (cpu_usage_high || memory_usage_high)) {
848
0
        return config::low_priority_compaction_task_num_per_disk;
849
0
    }
850
851
41
    return thread_per_disk;
852
41
}
853
854
std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
        CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
    // Picks at most one candidate tablet per data dir for the given compaction
    // type, honoring per-disk concurrency limits. Also refreshes the
    // max-compaction-score metrics as a side effect. `data_dirs` is shuffled
    // in place so no disk is systematically favored.
    _update_cumulative_compaction_policy();
    std::vector<TabletSharedPtr> tablets_compaction;
    uint32_t max_compaction_score = 0;

    std::random_device rd;
    std::mt19937 g(rd());
    std::shuffle(data_dirs.begin(), data_dirs.end(), g);

    // Copy _tablet_submitted_xxx_compaction map so that we don't need to hold _tablet_submitted_compaction_mutex
    // when traversing the data dir
    std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_cumu_map;
    std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_base_map;
    {
        std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
        copied_cumu_map = _tablet_submitted_cumu_compaction;
        copied_base_map = _tablet_submitted_base_compaction;
    }
    for (auto* data_dir : data_dirs) {
        bool need_pick_tablet = true;
        // We need to reserve at least one Slot for cumulative compaction.
        // So when there is only one Slot, we have to judge whether there is a cumulative compaction
        // in the current submitted tasks.
        // If so, the last Slot can be assigned to Base compaction,
        // otherwise, this Slot needs to be reserved for cumulative compaction.
        int count = _get_executing_compaction_num(copied_cumu_map[data_dir]) +
                    _get_executing_compaction_num(copied_base_map[data_dir]);
        int thread_per_disk = data_dir->is_ssd_disk() ? config::compaction_task_num_per_fast_disk
                                                      : config::compaction_task_num_per_disk;

        need_pick_tablet = need_generate_compaction_tasks(count, thread_per_disk, compaction_type,
                                                          copied_cumu_map[data_dir].empty());
        if (!need_pick_tablet && !check_score) {
            continue;
        }

        // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(),
        // So that we can update the max_compaction_score metric.
        if (!data_dir->reach_capacity_limit(0)) {
            uint32_t disk_max_score = 0;
            TabletSharedPtr tablet = _tablet_manager->find_best_tablet_to_compaction(
                    compaction_type, data_dir,
                    compaction_type == CompactionType::CUMULATIVE_COMPACTION
                            ? copied_cumu_map[data_dir]
                            : copied_base_map[data_dir],
                    &disk_max_score, _cumulative_compaction_policies);
            // Re-evaluate with the (possibly reduced) priority-aware concurrency
            // now that the disk's max score is known.
            int concurrent_num = get_concurrent_per_disk(disk_max_score, thread_per_disk);
            need_pick_tablet = need_generate_compaction_tasks(
                    count, concurrent_num, compaction_type, copied_cumu_map[data_dir].empty());
            if (tablet != nullptr) {
                if (need_pick_tablet) {
                    tablets_compaction.emplace_back(tablet);
                }
                // Track the score even when the tablet is not picked, so the
                // metric below reflects reality.
                max_compaction_score = std::max(max_compaction_score, disk_max_score);
            }
        }
    }

    if (max_compaction_score > 0) {
        if (compaction_type == CompactionType::BASE_COMPACTION) {
            DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(
                    max_compaction_score);
        } else {
            DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(
                    max_compaction_score);
        }
    }
    return tablets_compaction;
}
924
925
38
void StorageEngine::_update_cumulative_compaction_policy() {
926
38
    if (_cumulative_compaction_policies.empty()) {
927
17
        _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
928
17
                CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
929
17
                        CUMULATIVE_SIZE_BASED_POLICY);
930
17
        _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
931
17
                CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
932
17
                        CUMULATIVE_TIME_SERIES_POLICY);
933
17
    }
934
38
}
935
936
bool StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr tablet,
937
11
                                                           CompactionType compaction_type) {
938
11
    std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
939
11
    bool already_existed = false;
940
11
    switch (compaction_type) {
941
11
    case CompactionType::CUMULATIVE_COMPACTION:
942
11
        already_existed =
943
11
                !(_tablet_submitted_cumu_compaction[tablet->data_dir()].insert(tablet).second);
944
11
        break;
945
0
    case CompactionType::BASE_COMPACTION:
946
0
        already_existed =
947
0
                !(_tablet_submitted_base_compaction[tablet->data_dir()].insert(tablet).second);
948
0
        break;
949
0
    case CompactionType::FULL_COMPACTION:
950
0
        already_existed =
951
0
                !(_tablet_submitted_full_compaction[tablet->data_dir()].insert(tablet).second);
952
0
        break;
953
11
    }
954
11
    return already_existed;
955
11
}
956
957
void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet,
958
1
                                                          CompactionType compaction_type) {
959
1
    std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
960
1
    int removed = 0;
961
1
    switch (compaction_type) {
962
1
    case CompactionType::CUMULATIVE_COMPACTION:
963
1
        removed = _tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet);
964
1
        break;
965
0
    case CompactionType::BASE_COMPACTION:
966
0
        removed = _tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet);
967
0
        break;
968
0
    case CompactionType::FULL_COMPACTION:
969
0
        removed = _tablet_submitted_full_compaction[tablet->data_dir()].erase(tablet);
970
0
        break;
971
1
    }
972
973
1
    if (removed == 1) {
974
1
        std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
975
1
        _wakeup_producer_flag = 1;
976
1
        _compaction_producer_sleep_cv.notify_one();
977
1
    }
978
1
}
979
980
Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
                                              CompactionType compaction_type, bool force) {
    // Route to single replica compaction when the tablet prefers fetching
    // already-compacted data from a peer replica instead of merging locally.
    if (tablet->tablet_meta()->tablet_schema()->enable_single_replica_compaction() &&
        should_fetch_from_peer(tablet->tablet_id())) {
        VLOG_CRITICAL << "start to submit single replica compaction task for tablet: "
                      << tablet->tablet_id();
        Status st = _submit_single_replica_compaction_task(tablet, compaction_type);
        if (!st.ok()) {
            LOG(WARNING) << "failed to submit single replica compaction task for tablet: "
                         << tablet->tablet_id() << ", err: " << st;
        }
        // Single-replica submission failures are logged but not propagated.
        return Status::OK();
    }
    bool already_exist = _push_tablet_into_submitted_compaction(tablet, compaction_type);
    if (already_exist) {
        return Status::AlreadyExist<false>(
                "compaction task has already been submitted, tablet_id={}, compaction_type={}.",
                tablet->tablet_id(), compaction_type);
    }
    std::shared_ptr<Compaction> compaction;
    tablet->compaction_stage = CompactionStage::PENDING;
    int64_t permits = 0;
    Status st = Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet,
                                                                 compaction, permits);
    if (st.ok() && permits > 0) {
        if (!force) {
            _permit_limiter.request(permits);
        }
        std::unique_ptr<ThreadPool>& thread_pool =
                (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
                        ? _cumu_compaction_thread_pool
                        : _base_compaction_thread_pool;
        // NOTE: renamed from `st` to stop shadowing the prepare status above.
        auto submit_st = thread_pool->submit_func([tablet, compaction = std::move(compaction),
                                                   compaction_type, permits, force, this]() {
            if (!tablet->can_do_compaction(tablet->data_dir()->path_hash(), compaction_type)) {
                LOG(INFO) << "Tablet state has been changed, no need to begin this compaction "
                             "task, tablet_id="
                          << tablet->tablet_id() << ", tablet_state=" << tablet->tablet_state();
                // BUGFIX: this early return previously skipped the cleanup below,
                // permanently leaking the requested permits and leaving the tablet
                // registered as "submitted" — blocking all future compactions of
                // this tablet and shrinking the global permit budget.
                if (!force) {
                    _permit_limiter.release(permits);
                }
                _pop_tablet_from_submitted_compaction(tablet, compaction_type);
                tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
                return;
            }
            tablet->compaction_stage = CompactionStage::EXECUTING;
            TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction");
            tablet->execute_compaction(*compaction);
            if (!force) {
                _permit_limiter.release(permits);
            }
            _pop_tablet_from_submitted_compaction(tablet, compaction_type);
            tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
        });
        if (!submit_st.ok()) {
            // Submission failed: undo the permit request and registration.
            if (!force) {
                _permit_limiter.release(permits);
            }
            _pop_tablet_from_submitted_compaction(tablet, compaction_type);
            tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
            return Status::InternalError(
                    "failed to submit compaction task to thread pool, "
                    "tablet_id={}, compaction_type={}.",
                    tablet->tablet_id(), compaction_type);
        }
        return Status::OK();
    } else {
        // Either prepare failed or there is nothing to merge (permits == 0);
        // undo the submission bookkeeping in both cases.
        _pop_tablet_from_submitted_compaction(tablet, compaction_type);
        tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
        if (!st.ok()) {
            return Status::InternalError(
                    "failed to prepare compaction task and calculate permits, "
                    "tablet_id={}, compaction_type={}, "
                    "permit={}, current_permit={}, status={}",
                    tablet->tablet_id(), compaction_type, permits, _permit_limiter.usage(),
                    st.to_string());
        }
        return st;
    }
}
1056
1057
Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
                                             bool force, bool eager) {
    // Public entry point for manually/externally triggered compactions.
    // When `eager` is false, the request is silently dropped (Status::OK) if
    // every data dir is already saturated with executing compaction tasks.
    if (!eager) {
        DCHECK(compaction_type == CompactionType::BASE_COMPACTION ||
               compaction_type == CompactionType::CUMULATIVE_COMPACTION);
        // Snapshot the submitted-task maps so the busy check below does not hold
        // _tablet_submitted_compaction_mutex while iterating the stores.
        std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_cumu_map;
        std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_base_map;
        {
            std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
            copied_cumu_map = _tablet_submitted_cumu_compaction;
            copied_base_map = _tablet_submitted_base_compaction;
        }
        auto stores = get_stores();

        // Returns true when `data_dir` still has a free compaction slot.
        auto busy_pred = [&copied_cumu_map, &copied_base_map, compaction_type,
                          this](auto* data_dir) {
            int count = _get_executing_compaction_num(copied_base_map[data_dir]) +
                        _get_executing_compaction_num(copied_cumu_map[data_dir]);
            int paral = data_dir->is_ssd_disk() ? config::compaction_task_num_per_fast_disk
                                                : config::compaction_task_num_per_disk;
            bool all_base = copied_cumu_map[data_dir].empty();
            return need_generate_compaction_tasks(count, paral, compaction_type, all_base);
        };

        // Busy means: no store has a free slot.
        bool is_busy = std::none_of(stores.begin(), stores.end(), busy_pred);
        if (is_busy) {
            LOG_EVERY_N(WARNING, 100)
                    << "Too busy to submit a compaction task, tablet=" << tablet->get_table_id();
            return Status::OK();
        }
    }
    _update_cumulative_compaction_policy();
    // alter table tableName set ("compaction_policy"="time_series")
    // If the table's compaction policy was altered, the tablet's cached policy
    // shared_ptr must be re-pointed at the matching policy object.
    // NOTE(review): _cumulative_compaction_policies.at() throws std::out_of_range
    // if the tablet meta names a policy outside the two registered ones — presumably
    // FE validates the policy name; verify before relying on it.
    if (tablet->get_cumulative_compaction_policy() == nullptr ||
        tablet->get_cumulative_compaction_policy()->name() !=
                tablet->tablet_meta()->compaction_policy()) {
        tablet->set_cumulative_compaction_policy(
                _cumulative_compaction_policies.at(tablet->tablet_meta()->compaction_policy()));
    }
    tablet->set_skip_compaction(false);
    return _submit_compaction_task(tablet, compaction_type, force);
}
1100
1101
Status StorageEngine::_handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker,
                                             SegCompactionCandidatesSharedPtr segments,
                                             uint64_t submission_time) {
    // note: be aware that worker->_writer maybe released when the task is cancelled
    const uint64_t queued_micros = GetCurrentTimeMicros() - submission_time;
    LOG(INFO) << "segcompaction thread pool queue time(ms): " << queued_micros / 1000;
    worker->compact_segments(segments);
    // return OK here. error will be reported via BetaRowsetWriter::_segcompaction_status
    return Status::OK();
}
1111
1112
Status StorageEngine::submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker,
                                                 SegCompactionCandidatesSharedPtr segments) {
    // Record the submission timestamp so the worker can report queueing delay.
    const uint64_t submission_time = GetCurrentTimeMicros();
    // Lambda instead of std::bind<void>; the handler's Status is intentionally
    // discarded (errors surface via the worker's own status channel).
    return _seg_compaction_thread_pool->submit_func(
            [this, worker, segments, submission_time]() {
                static_cast<void>(_handle_seg_compaction(worker, segments, submission_time));
            });
}
1118
1119
0
Status StorageEngine::process_index_change_task(const TAlterInvertedIndexReq& request) {
    // Look up the target tablet; a missing tablet is a hard error.
    const auto target_tablet_id = request.tablet_id;
    TabletSharedPtr target_tablet = _tablet_manager->get_tablet(target_tablet_id);
    if (target_tablet == nullptr) {
        LOG(WARNING) << "tablet: " << target_tablet_id << " not exist";
        return Status::InternalError("tablet not exist, tablet_id={}.", target_tablet_id);
    }

    // Build (or drop) the requested inverted indexes on this tablet.
    IndexBuilderSharedPtr builder =
            std::make_shared<IndexBuilder>(target_tablet, request.columns,
                                           request.alter_inverted_indexes, request.is_drop_op);
    RETURN_IF_ERROR(_handle_index_change(builder));
    return Status::OK();
}
1132
1133
0
Status StorageEngine::_handle_index_change(IndexBuilderSharedPtr index_builder) {
    // Runs the inverted-index change synchronously: initialize the builder,
    // then build (or drop) the index data. The first failing step's Status is
    // returned to the caller via RETURN_IF_ERROR.
    RETURN_IF_ERROR(index_builder->init());
    RETURN_IF_ERROR(index_builder->do_build_inverted_index());
    return Status::OK();
}
1138
1139
17
void StorageEngine::_cooldown_tasks_producer_callback() {
    // Background producer: periodically collects tablets eligible for cooldown
    // (moving cold data to remote storage) and enqueues one task per tablet on
    // _cooldown_thread_pool, highest priority first.
    int64_t interval = config::generate_cooldown_task_interval_sec;
    // the cooldown replica may be slow to upload its meta file, so we should wait
    // until it has finished uploading
    int64_t skip_failed_interval = interval * 10;
    do {
        // these tables are ordered by priority desc
        std::vector<TabletSharedPtr> tablets;
        std::vector<RowsetSharedPtr> rowsets;
        // TODO(luwei) : a more efficient way to get cooldown tablets
        auto cur_time = time(nullptr);
        // we should skip all the tablets which are not running and those pending to do cooldown
        // also tablets once failed to do follow cooldown
        auto skip_tablet = [this, skip_failed_interval,
                            cur_time](const TabletSharedPtr& tablet) -> bool {
            bool is_skip =
                    cur_time - tablet->last_failed_follow_cooldown_time() < skip_failed_interval ||
                    TABLET_RUNNING != tablet->tablet_state();
            if (is_skip) {
                return is_skip;
            }
            // Also skip tablets whose cooldown task is still in flight.
            std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
            return _running_cooldown_tablets.find(tablet->tablet_id()) !=
                   _running_cooldown_tablets.end();
        };
        _tablet_manager->get_cooldown_tablets(&tablets, &rowsets, std::move(skip_tablet));
        LOG(INFO) << "cooldown producer get tablet num: " << tablets.size();
        // Earlier (higher priority) tablets get larger priority values.
        int max_priority = tablets.size();
        int index = 0;
        for (const auto& tablet : tablets) {
            {
                // Mark as running BEFORE submitting so a subsequent producer round
                // cannot enqueue the same tablet twice.
                std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
                _running_cooldown_tablets.insert(tablet->tablet_id());
            }
            PriorityThreadPool::Task task;
            // rowsets[index] pairs with `tablet` by position.
            RowsetSharedPtr rowset = std::move(rowsets[index++]);
            // The lambda copies tablet/rowset shared_ptrs, keeping both alive
            // until the pool task completes.
            task.work_function = [tablet, rowset, task_size = tablets.size(), this]() {
                Status st = tablet->cooldown(rowset);
                {
                    std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
                    _running_cooldown_tablets.erase(tablet->tablet_id());
                }
                if (!st.ok()) {
                    LOG(WARNING) << "failed to cooldown, tablet: " << tablet->tablet_id()
                                 << " err: " << st;
                } else {
                    LOG(INFO) << "succeed to cooldown, tablet: " << tablet->tablet_id()
                              << " cooldown progress ("
                              << task_size - _cooldown_thread_pool->get_queue_size() << "/"
                              << task_size << ")";
                }
            };
            task.priority = max_priority--;
            bool submited = _cooldown_thread_pool->offer(std::move(task));

            if (!submited) {
                // NOTE(review): the tablet stays in _running_cooldown_tablets here
                // until the skip window passes — presumably acceptable since offer()
                // rarely fails; verify if cooldown appears stuck after pool rejection.
                LOG(INFO) << "failed to submit cooldown task";
            }
        }
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
}
1200
1201
17
void StorageEngine::_remove_unused_remote_files_callback() {
1202
17
    while (!_stop_background_threads_latch.wait_for(
1203
17
            std::chrono::seconds(config::remove_unused_remote_files_interval_sec))) {
1204
0
        LOG(INFO) << "begin to remove unused remote files";
1205
0
        do_remove_unused_remote_files();
1206
0
    }
1207
17
}
1208
1209
0
// Garbage-collects remote files of cooldowned tablets that are no longer
// referenced by any rowset. For each candidate tablet (this replica is the
// cooldown replica, tablet is running and has a cooldown meta id), it lists
// the tablet's remote directory, filters out files still referenced
// (cooldowned rowsets, pending rowsets, the current cooldown meta file), then
// asks the FE master to confirm before batch-deleting the remainder.
// Confirmations are batched (by file count and by time) to reduce FE load.
void StorageEngine::do_remove_unused_remote_files() {
    auto tablets = tablet_manager()->get_all_tablet([](Tablet* t) {
        return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() &&
               t->tablet_state() == TABLET_RUNNING &&
               t->cooldown_conf_unlocked().cooldown_replica_id == t->replica_id();
    });
    TConfirmUnusedRemoteFilesRequest req;
    req.__isset.confirm_list = true;
    // tablet_id -> [fs, unused_remote_files]
    using unused_remote_files_buffer_t = std::unordered_map<
            int64_t, std::pair<std::shared_ptr<io::RemoteFileSystem>, std::vector<io::FileInfo>>>;
    unused_remote_files_buffer_t buffer;
    int64_t num_files_in_buffer = 0;
    // assume a filename is 0.1KB, buffer size should not larger than 100MB
    constexpr int64_t max_files_in_buffer = 1000000;

    // Computes the set of deletable remote files for one tablet and appends
    // them to `buffer`/`req`. Returns early (skipping the tablet) on any
    // policy/fs lookup failure or when nothing is deletable.
    auto calc_unused_remote_files = [&req, &buffer, &num_files_in_buffer, this](Tablet* t) {
        auto storage_policy = get_storage_policy(t->storage_policy_id());
        if (storage_policy == nullptr) {
            LOG(WARNING) << "could not find storage_policy, storage_policy_id="
                         << t->storage_policy_id();
            return;
        }
        auto resource = get_storage_resource(storage_policy->resource_id);
        auto dest_fs = std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
        if (dest_fs == nullptr) {
            LOG(WARNING) << "could not find resource, resouce_id=" << storage_policy->resource_id;
            return;
        }
        DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
        DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);

        // NOTE(review): `fs` is only fetched to validate that the remote fs is
        // resolvable; the listing/deletion below uses `dest_fs` — confirm this
        // is intentional.
        std::shared_ptr<io::RemoteFileSystem> fs;
        auto st = get_remote_file_system(t->storage_policy_id(), &fs);
        if (!st.ok()) {
            LOG(WARNING) << "encounter error when remove unused remote files, tablet_id="
                         << t->tablet_id() << " : " << st;
            return;
        }

        std::vector<io::FileInfo> files;
        // FIXME(plat1ko): What if user reset resource in storage policy to another resource?
        //  Maybe we should also list files in previously uploaded resources.
        bool exists = true;
        st = dest_fs->list(io::Path(remote_tablet_path(t->tablet_id())), true, &files, &exists);
        if (!st.ok()) {
            LOG(WARNING) << "encounter error when remove unused remote files, tablet_id="
                         << t->tablet_id() << " : " << st;
            return;
        }
        if (!exists || files.empty()) {
            return;
        }
        // get all cooldowned rowsets
        RowsetIdUnorderedSet cooldowned_rowsets;
        UniqueId cooldown_meta_id;
        {
            // Hold the header lock so the rowset list and cooldown meta id are
            // read consistently.
            std::shared_lock rlock(t->get_header_lock());
            for (auto&& rs_meta : t->tablet_meta()->all_rs_metas()) {
                if (!rs_meta->is_local()) {
                    cooldowned_rowsets.insert(rs_meta->rowset_id());
                }
            }
            if (cooldowned_rowsets.empty()) {
                return;
            }
            cooldown_meta_id = t->tablet_meta()->cooldown_meta_id();
        }
        // Re-check under lock: only the current cooldown replica may delete.
        auto [cooldown_replica_id, cooldown_term] = t->cooldown_conf();
        if (cooldown_replica_id != t->replica_id()) {
            return;
        }
        // {cooldown_replica_id}.{cooldown_term}.meta
        std::string remote_meta_path =
                fmt::format("{}.{}.meta", cooldown_replica_id, cooldown_term);
        // filter out the paths that should be reserved
        // (predicate returns true for files that must be KEPT; remove_if then
        // drops the kept files from `files`, leaving only deletable ones)
        auto filter = [&, this](io::FileInfo& info) {
            std::string_view filename = info.file_name;
            if (filename.ends_with(".meta")) {
                return filename == remote_meta_path;
            }
            auto rowset_id = extract_rowset_id(filename);
            if (rowset_id.hi == 0) {
                // not a rowset file; keep deletable status off — do not delete
                return false;
            }
            return cooldowned_rowsets.contains(rowset_id) ||
                   pending_remote_rowsets().contains(rowset_id);
        };
        files.erase(std::remove_if(files.begin(), files.end(), std::move(filter)), files.end());
        if (files.empty()) {
            return;
        }
        files.shrink_to_fit();
        num_files_in_buffer += files.size();
        buffer.insert({t->tablet_id(), {std::move(dest_fs), std::move(files)}});
        auto& info = req.confirm_list.emplace_back();
        info.__set_tablet_id(t->tablet_id());
        info.__set_cooldown_replica_id(cooldown_replica_id);
        info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift());
    };

    // Sends the buffered confirm request to the FE master and deletes the
    // files of every tablet the FE confirmed. Unconfirmed tablets stay in
    // `buffer` (caller clears it afterwards).
    auto confirm_and_remove_files = [&buffer, &req, &num_files_in_buffer]() {
        TConfirmUnusedRemoteFilesResult result;
        LOG(INFO) << "begin to confirm unused remote files. num_tablets=" << buffer.size()
                  << " num_files=" << num_files_in_buffer;
        auto st = MasterServerClient::instance()->confirm_unused_remote_files(req, &result);
        if (!st.ok()) {
            LOG(WARNING) << st;
            return;
        }
        for (auto id : result.confirmed_tablets) {
            if (auto it = buffer.find(id); LIKELY(it != buffer.end())) {
                auto& fs = it->second.first;
                auto& files = it->second.second;
                std::vector<io::Path> paths;
                paths.reserve(files.size());
                // delete unused files
                LOG(INFO) << "delete unused files. root_path=" << fs->root_path()
                          << " tablet_id=" << id;
                io::Path dir = remote_tablet_path(id);
                for (auto& file : files) {
                    auto file_path = dir / file.file_name;
                    LOG(INFO) << "delete unused file: " << file_path.native();
                    paths.push_back(std::move(file_path));
                }
                st = fs->batch_delete(paths);
                if (!st.ok()) {
                    LOG(WARNING) << "failed to delete unused files, tablet_id=" << id << " : "
                                 << st;
                }
                buffer.erase(it);
            }
        }
    };

    // batch confirm to reduce FE's overhead
    auto next_confirm_time = std::chrono::steady_clock::now() +
                             std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
    for (auto& t : tablets) {
        if (t.use_count() <= 1 // this means tablet has been dropped
            || t->cooldown_conf_unlocked().cooldown_replica_id != t->replica_id() ||
            t->tablet_state() != TABLET_RUNNING) {
            continue;
        }
        calc_unused_remote_files(t.get());
        // Flush the batch when it grows too large or the confirm interval
        // elapsed, so a long tablet list does not delay deletion indefinitely.
        if (num_files_in_buffer > 0 && (num_files_in_buffer > max_files_in_buffer ||
                                        std::chrono::steady_clock::now() > next_confirm_time)) {
            confirm_and_remove_files();
            buffer.clear();
            req.confirm_list.clear();
            num_files_in_buffer = 0;
            next_confirm_time =
                    std::chrono::steady_clock::now() +
                    std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
        }
    }
    // Flush whatever remains after the last tablet.
    if (num_files_in_buffer > 0) {
        confirm_and_remove_files();
    }
}
1369
1370
17
void StorageEngine::_cold_data_compaction_producer_callback() {
1371
17
    std::unordered_set<int64_t> tablet_submitted;
1372
17
    std::mutex tablet_submitted_mtx;
1373
1374
17
    while (!_stop_background_threads_latch.wait_for(
1375
17
            std::chrono::seconds(config::cold_data_compaction_interval_sec))) {
1376
0
        if (config::disable_auto_compaction ||
1377
0
            GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
1378
0
            continue;
1379
0
        }
1380
1381
0
        std::unordered_set<int64_t> copied_tablet_submitted;
1382
0
        {
1383
0
            std::lock_guard lock(tablet_submitted_mtx);
1384
0
            copied_tablet_submitted = tablet_submitted;
1385
0
        }
1386
0
        int n = config::cold_data_compaction_thread_num - copied_tablet_submitted.size();
1387
0
        if (n <= 0) {
1388
0
            continue;
1389
0
        }
1390
0
        auto tablets = _tablet_manager->get_all_tablet([&copied_tablet_submitted](Tablet* t) {
1391
0
            return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() &&
1392
0
                   t->tablet_state() == TABLET_RUNNING &&
1393
0
                   !copied_tablet_submitted.count(t->tablet_id()) &&
1394
0
                   !t->tablet_meta()->tablet_schema()->disable_auto_compaction();
1395
0
        });
1396
0
        std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_compact;
1397
0
        tablet_to_compact.reserve(n + 1);
1398
0
        std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_follow;
1399
0
        tablet_to_follow.reserve(n + 1);
1400
1401
0
        for (auto& t : tablets) {
1402
0
            if (t->replica_id() == t->cooldown_conf_unlocked().cooldown_replica_id) {
1403
0
                auto score = t->calc_cold_data_compaction_score();
1404
0
                if (score < 4) {
1405
0
                    continue;
1406
0
                }
1407
0
                tablet_to_compact.emplace_back(t, score);
1408
0
                if (tablet_to_compact.size() > n) {
1409
0
                    std::sort(tablet_to_compact.begin(), tablet_to_compact.end(),
1410
0
                              [](auto& a, auto& b) { return a.second > b.second; });
1411
0
                    tablet_to_compact.pop_back();
1412
0
                }
1413
0
                continue;
1414
0
            }
1415
            // else, need to follow
1416
0
            {
1417
0
                std::lock_guard lock(_running_cooldown_mutex);
1418
0
                if (_running_cooldown_tablets.count(t->table_id())) {
1419
                    // already in cooldown queue
1420
0
                    continue;
1421
0
                }
1422
0
            }
1423
            // TODO(plat1ko): some avoidance strategy if failed to follow
1424
0
            auto score = t->calc_cold_data_compaction_score();
1425
0
            tablet_to_follow.emplace_back(t, score);
1426
1427
0
            if (tablet_to_follow.size() > n) {
1428
0
                std::sort(tablet_to_follow.begin(), tablet_to_follow.end(),
1429
0
                          [](auto& a, auto& b) { return a.second > b.second; });
1430
0
                tablet_to_follow.pop_back();
1431
0
            }
1432
0
        }
1433
1434
0
        for (auto& [tablet, score] : tablet_to_compact) {
1435
0
            LOG(INFO) << "submit cold data compaction. tablet_id=" << tablet->tablet_id()
1436
0
                      << " score=" << score;
1437
0
            static_cast<void>(
1438
0
                    _cold_data_compaction_thread_pool->submit_func([&, t = std::move(tablet)]() {
1439
0
                        auto compaction = std::make_shared<ColdDataCompaction>(t);
1440
0
                        {
1441
0
                            std::lock_guard lock(tablet_submitted_mtx);
1442
0
                            tablet_submitted.insert(t->tablet_id());
1443
0
                        }
1444
0
                        std::unique_lock cold_compaction_lock(t->get_cold_compaction_lock(),
1445
0
                                                              std::try_to_lock);
1446
0
                        if (!cold_compaction_lock.owns_lock()) {
1447
0
                            LOG(WARNING) << "try cold_compaction_lock failed, tablet_id="
1448
0
                                         << t->tablet_id();
1449
0
                        }
1450
0
                        auto st = compaction->compact();
1451
0
                        {
1452
0
                            std::lock_guard lock(tablet_submitted_mtx);
1453
0
                            tablet_submitted.erase(t->tablet_id());
1454
0
                        }
1455
0
                        if (!st.ok()) {
1456
0
                            LOG(WARNING) << "failed to do cold data compaction. tablet_id="
1457
0
                                         << t->tablet_id() << " err=" << st;
1458
0
                        }
1459
0
                    }));
1460
0
        }
1461
1462
0
        for (auto& [tablet, score] : tablet_to_follow) {
1463
0
            LOG(INFO) << "submit to follow cooldown meta. tablet_id=" << tablet->tablet_id()
1464
0
                      << " score=" << score;
1465
0
            static_cast<void>(
1466
0
                    _cold_data_compaction_thread_pool->submit_func([&, t = std::move(tablet)]() {
1467
0
                        {
1468
0
                            std::lock_guard lock(tablet_submitted_mtx);
1469
0
                            tablet_submitted.insert(t->tablet_id());
1470
0
                        }
1471
0
                        auto st = t->cooldown();
1472
0
                        {
1473
0
                            std::lock_guard lock(tablet_submitted_mtx);
1474
0
                            tablet_submitted.erase(t->tablet_id());
1475
0
                        }
1476
0
                        if (!st.ok()) {
1477
0
                            LOG(WARNING) << "failed to cooldown. tablet_id=" << t->tablet_id()
1478
0
                                         << " err=" << st;
1479
0
                        }
1480
0
                    }));
1481
0
        }
1482
0
    }
1483
17
}
1484
1485
void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_id,
1486
                                           int64_t publish_version, int64_t transaction_id,
1487
2.05k
                                           bool is_recovery) {
1488
2.05k
    if (!is_recovery) {
1489
2.05k
        bool exists = false;
1490
2.05k
        {
1491
2.05k
            std::shared_lock<std::shared_mutex> rlock(_async_publish_lock);
1492
2.05k
            if (auto tablet_iter = _async_publish_tasks.find(tablet_id);
1493
2.05k
                tablet_iter != _async_publish_tasks.end()) {
1494
2.05k
                if (auto iter = tablet_iter->second.find(publish_version);
1495
2.05k
                    iter != tablet_iter->second.end()) {
1496
20
                    exists = true;
1497
20
                }
1498
2.05k
            }
1499
2.05k
        }
1500
2.05k
        if (exists) {
1501
20
            return;
1502
20
        }
1503
2.03k
        TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
1504
2.03k
        if (tablet == nullptr) {
1505
0
            LOG(INFO) << "tablet may be dropped when add async publish task, tablet_id: "
1506
0
                      << tablet_id;
1507
0
            return;
1508
0
        }
1509
2.03k
        PendingPublishInfoPB pending_publish_info_pb;
1510
2.03k
        pending_publish_info_pb.set_partition_id(partition_id);
1511
2.03k
        pending_publish_info_pb.set_transaction_id(transaction_id);
1512
2.03k
        static_cast<void>(TabletMetaManager::save_pending_publish_info(
1513
2.03k
                tablet->data_dir(), tablet->tablet_id(), publish_version,
1514
2.03k
                pending_publish_info_pb.SerializeAsString()));
1515
2.03k
    }
1516
2.03k
    LOG(INFO) << "add pending publish task, tablet_id: " << tablet_id
1517
2.03k
              << " version: " << publish_version << " txn_id:" << transaction_id
1518
2.03k
              << " is_recovery: " << is_recovery;
1519
2.03k
    std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
1520
2.03k
    _async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id};
1521
2.03k
}
1522
1523
3
int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
1524
3
    std::shared_lock<std::shared_mutex> rlock(_async_publish_lock);
1525
3
    auto iter = _async_publish_tasks.find(tablet_id);
1526
3
    if (iter == _async_publish_tasks.end()) {
1527
0
        return INT64_MAX;
1528
0
    }
1529
3
    if (iter->second.empty()) {
1530
0
        return INT64_MAX;
1531
0
    }
1532
3
    return iter->second.begin()->first;
1533
3
}
1534
1535
1.49k
void StorageEngine::_process_async_publish() {
1536
    // tablet, publish_version
1537
1.49k
    std::vector<std::pair<TabletSharedPtr, int64_t>> need_removed_tasks;
1538
1.49k
    {
1539
1.49k
        std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
1540
1.49k
        for (auto tablet_iter = _async_publish_tasks.begin();
1541
1.50k
             tablet_iter != _async_publish_tasks.end();) {
1542
10
            if (tablet_iter->second.empty()) {
1543
1
                tablet_iter = _async_publish_tasks.erase(tablet_iter);
1544
1
                continue;
1545
1
            }
1546
9
            int64_t tablet_id = tablet_iter->first;
1547
9
            TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
1548
9
            if (!tablet) {
1549
1
                LOG(WARNING) << "tablet does not exist when async publush, tablet_id: "
1550
1
                             << tablet_id;
1551
1
                tablet_iter = _async_publish_tasks.erase(tablet_iter);
1552
1
                continue;
1553
1
            }
1554
1555
8
            auto task_iter = tablet_iter->second.begin();
1556
8
            int64_t version = task_iter->first;
1557
8
            int64_t transaction_id = task_iter->second.first;
1558
8
            int64_t partition_id = task_iter->second.second;
1559
8
            int64_t max_version = tablet->max_version().second;
1560
1561
8
            if (version <= max_version) {
1562
6
                need_removed_tasks.emplace_back(tablet, version);
1563
6
                tablet_iter->second.erase(task_iter);
1564
6
                tablet_iter++;
1565
6
                continue;
1566
6
            }
1567
2
            if (version != max_version + 1) {
1568
                // Keep only the most recent versions
1569
31
                while (tablet_iter->second.size() > config::max_tablet_version_num) {
1570
30
                    need_removed_tasks.emplace_back(tablet, version);
1571
30
                    task_iter = tablet_iter->second.erase(task_iter);
1572
30
                    version = task_iter->first;
1573
30
                }
1574
1
                tablet_iter++;
1575
1
                continue;
1576
1
            }
1577
1578
1
            auto async_publish_task = std::make_shared<AsyncTabletPublishTask>(
1579
1
                    tablet, partition_id, transaction_id, version);
1580
1
            static_cast<void>(_tablet_publish_txn_thread_pool->submit_func(
1581
1
                    [=]() { async_publish_task->handle(); }));
1582
1
            tablet_iter->second.erase(task_iter);
1583
1
            need_removed_tasks.emplace_back(tablet, version);
1584
1
            tablet_iter++;
1585
1
        }
1586
1.49k
    }
1587
1.49k
    for (auto& [tablet, publish_version] : need_removed_tasks) {
1588
37
        static_cast<void>(TabletMetaManager::remove_pending_publish_info(
1589
37
                tablet->data_dir(), tablet->tablet_id(), publish_version));
1590
37
    }
1591
1.49k
}
1592
1593
17
void StorageEngine::_async_publish_callback() {
1594
1.50k
    while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) {
1595
1.48k
        _process_async_publish();
1596
1.48k
    }
1597
17
}
1598
1599
} // namespace doris