Coverage Report

Created: 2026-04-07 11:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_storage_engine.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_storage_engine.h"
19
20
#include <bvar/reducer.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <gen_cpp/cloud.pb.h>
23
#include <gen_cpp/olap_file.pb.h>
24
#include <rapidjson/document.h>
25
#include <rapidjson/encodings.h>
26
#include <rapidjson/prettywriter.h>
27
#include <rapidjson/stringbuffer.h>
28
29
#include <algorithm>
30
#include <memory>
31
#include <variant>
32
33
#include "cloud/cloud_base_compaction.h"
34
#include "cloud/cloud_compaction_stop_token.h"
35
#include "cloud/cloud_cumulative_compaction.h"
36
#include "cloud/cloud_cumulative_compaction_policy.h"
37
#include "cloud/cloud_full_compaction.h"
38
#include "cloud/cloud_index_change_compaction.h"
39
#include "cloud/cloud_meta_mgr.h"
40
#include "cloud/cloud_snapshot_mgr.h"
41
#include "cloud/cloud_tablet_hotspot.h"
42
#include "cloud/cloud_tablet_mgr.h"
43
#include "cloud/cloud_txn_delete_bitmap_cache.h"
44
#include "cloud/cloud_warm_up_manager.h"
45
#include "cloud/config.h"
46
#include "common/config.h"
47
#include "common/metrics/doris_metrics.h"
48
#include "common/signal_handler.h"
49
#include "common/status.h"
50
#include "core/assert_cast.h"
51
#include "io/cache/block_file_cache_downloader.h"
52
#include "io/cache/block_file_cache_factory.h"
53
#include "io/cache/file_cache_common.h"
54
#include "io/fs/file_system.h"
55
#include "io/fs/hdfs_file_system.h"
56
#include "io/fs/s3_file_system.h"
57
#include "io/hdfs_util.h"
58
#include "io/io_common.h"
59
#include "load/memtable/memtable_flush_executor.h"
60
#include "runtime/exec_env.h"
61
#include "runtime/memory/cache_manager.h"
62
#include "storage/compaction/cumulative_compaction_policy.h"
63
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
64
#include "storage/compaction_task_tracker.h"
65
#include "storage/storage_policy.h"
66
#include "util/parse_util.h"
67
#include "util/time.h"
68
69
namespace doris {
70
#include "common/compile_check_begin.h"
71
72
using namespace std::literals;
73
74
// Process-wide bvar adders, one per compaction kind, registered under the quoted names.
// NOTE(review): the names suggest they count currently running tasks; the sites that
// increment/decrement them are not in this part of the file — confirm before relying on it.
bvar::Adder<uint64_t> g_base_compaction_running_task_count("base_compaction_running_task_count");
75
bvar::Adder<uint64_t> g_full_compaction_running_task_count("full_compaction_running_task_count");
76
bvar::Adder<uint64_t> g_cumu_compaction_running_task_count(
77
        "cumulative_compaction_running_task_count");
78
79
18.7k
int get_cumu_thread_num() {
80
18.7k
    if (config::max_cumu_compaction_threads > 0) {
81
0
        return config::max_cumu_compaction_threads;
82
0
    }
83
84
18.7k
    int num_cores = doris::CpuInfo::num_cores();
85
18.7k
    return std::min(std::max(int(num_cores * config::cumu_compaction_thread_num_factor), 2), 20);
86
18.7k
}
87
88
18.7k
int get_base_thread_num() {
89
18.7k
    if (config::max_base_compaction_threads > 0) {
90
18.7k
        return config::max_base_compaction_threads;
91
18.7k
    }
92
93
0
    int num_cores = doris::CpuInfo::num_cores();
94
0
    return std::min(std::max(int(num_cores * config::base_compaction_thread_num_factor), 1), 10);
95
18.7k
}
96
97
// Builds the cloud engine: creates the meta manager and tablet manager, keeps a copy of the
// engine options, registers the two cumulative compaction policies and records the startup
// time. Nothing is started here; see open() and start_bg_threads().
CloudStorageEngine::CloudStorageEngine(const EngineOptions& options)
        : BaseStorageEngine(Type::CLOUD, options.backend_uid),
          _meta_mgr(std::make_unique<cloud::CloudMetaMgr>()),
          _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)),
          _options(options) {
    auto size_based_policy = std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>();
    _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] = std::move(size_based_policy);

    auto time_series_policy = std::make_shared<CloudTimeSeriesCumulativeCompactionPolicy>();
    _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] = std::move(time_series_policy);

    _startup_timepoint = std::chrono::system_clock::now();
}
108
109
153
// Makes sure the engine is stopped before its members are destroyed.
// stop() returns immediately if the engine was already stopped.
CloudStorageEngine::~CloudStorageEngine() {
    this->stop();
}
112
113
// Builds an IOError status describing a vault that could not be created or refreshed.
//
// id    - vault/resource id, embedded in the message.
// vault - the vault configuration; its textual (S3Conf::to_string) or serialized
//         (HdfsVaultInfo::SerializeToOstream) form is appended as "detail conf".
// err   - the status that caused the failure, embedded in the message.
static Status vault_process_error(std::string_view id,
                                  std::variant<S3Conf, cloud::HdfsVaultInfo>& vault, Status err) {
    std::stringstream detail;
    // Generic visitor: pick the dump method that matches the active alternative.
    auto dump_conf = [&detail]<typename Conf>(Conf& conf) {
        if constexpr (std::is_same_v<Conf, S3Conf>) {
            detail << conf.to_string();
        } else if constexpr (std::is_same_v<Conf, cloud::HdfsVaultInfo>) {
            conf.SerializeToOstream(&detail);
        }
    };
    std::visit(dump_conf, vault);
    return Status::IOError("Invalid vault, id {}, err {}, detail conf {}", id, err, detail.str());
}
127
128
struct VaultCreateFSVisitor {
129
    VaultCreateFSVisitor(const std::string& id, const cloud::StorageVaultPB_PathFormat& path_format,
130
                         bool check_fs)
131
1
            : id(id), path_format(path_format), check_fs(check_fs) {}
132
1
    Status operator()(const S3Conf& s3_conf) const {
133
1
        LOG(INFO) << "get new s3 info: " << s3_conf.to_string() << " resource_id=" << id
134
1
                  << " check_fs: " << check_fs;
135
136
1
        auto fs = DORIS_TRY(io::S3FileSystem::create(s3_conf, id));
137
1
        if (check_fs && !s3_conf.client_conf.role_arn.empty()) {
138
0
            bool res = false;
139
            // just check connectivity, not care object if exist
140
0
            auto st = fs->exists("not_exist_object", &res);
141
0
            if (!st.ok()) {
142
0
                LOG(FATAL) << "failed to check s3 fs, resource_id: " << id << " st: " << st
143
0
                           << "s3_conf: " << s3_conf.to_string()
144
0
                           << "add enable_check_storage_vault=false to be.conf to skip the check";
145
0
            }
146
0
        }
147
148
1
        put_storage_resource(id, {std::move(fs), path_format}, 0);
149
1
        LOG_INFO("successfully create s3 vault, vault id {}", id);
150
1
        return Status::OK();
151
1
    }
152
153
    // TODO(ByteYue): Make sure enable_java_support is on
154
0
    Status operator()(const cloud::HdfsVaultInfo& vault) const {
155
0
        auto hdfs_params = io::to_hdfs_params(vault);
156
0
        auto fs = DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id,
157
0
                                                       nullptr, vault.prefix()));
158
0
        put_storage_resource(id, {std::move(fs), path_format}, 0);
159
0
        LOG_INFO("successfully create hdfs vault, vault id {}", id);
160
0
        return Status::OK();
161
0
    }
162
163
    const std::string& id;
164
    const cloud::StorageVaultPB_PathFormat& path_format;
165
    bool check_fs;
166
};
167
168
struct RefreshFSVaultVisitor {
169
    RefreshFSVaultVisitor(const std::string& id, io::FileSystemSPtr fs,
170
                          const cloud::StorageVaultPB_PathFormat& path_format)
171
110
            : id(id), fs(std::move(fs)), path_format(path_format) {}
172
173
110
    Status operator()(const S3Conf& s3_conf) const {
174
110
        DCHECK_EQ(fs->type(), io::FileSystemType::S3) << id;
175
110
        auto s3_fs = std::static_pointer_cast<io::S3FileSystem>(fs);
176
110
        auto client_holder = s3_fs->client_holder();
177
110
        auto st = client_holder->reset(s3_conf.client_conf);
178
110
        if (!st.ok()) {
179
0
            LOG(WARNING) << "failed to update s3 fs, resource_id=" << id << ": " << st;
180
0
        }
181
110
        return st;
182
110
    }
183
184
0
    Status operator()(const cloud::HdfsVaultInfo& vault) const {
185
0
        auto hdfs_params = io::to_hdfs_params(vault);
186
0
        auto hdfs_fs =
187
0
                DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr,
188
0
                                                     vault.has_prefix() ? vault.prefix() : ""));
189
0
        auto hdfs = std::static_pointer_cast<io::HdfsFileSystem>(hdfs_fs);
190
0
        put_storage_resource(id, {std::move(hdfs), path_format}, 0);
191
0
        return Status::OK();
192
0
    }
193
194
    const std::string& id;
195
    io::FileSystemSPtr fs;
196
    const cloud::StorageVaultPB_PathFormat& path_format;
197
};
198
199
1
// Initializes the engine's long-lived components: syncs storage vaults once, creates the
// flush / delete-bitmap executors and caches, the warm-up, hotspot and snapshot managers,
// the stream load recorder, and two thread pools.
// Returns the first error reported by any of the fallible steps, otherwise OK.
Status CloudStorageEngine::open() {
    sync_storage_vault();

    // TODO(plat1ko): DeleteBitmapTxnManager

    _memtable_flush_executor = std::make_unique<MemTableFlushExecutor>();
    // Sized by the number of file cache instances.
    const auto file_cache_instances =
            cast_set<int32_t>(io::FileCacheFactory::instance()->get_cache_instance_size());
    _memtable_flush_executor->init(file_cache_instances);

    _calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>();
    _calc_delete_bitmap_executor->init(config::calc_delete_bitmap_max_thread);

    // The load-path executor falls back to half of the cores (at least one thread) when
    // no positive thread count is configured.
    _calc_delete_bitmap_executor_for_load = std::make_unique<CalcDeleteBitmapExecutor>();
    const auto load_calc_threads = config::calc_delete_bitmap_for_load_max_thread > 0
                                           ? config::calc_delete_bitmap_for_load_max_thread
                                           : std::max(1, CpuInfo::num_cores() / 2);
    _calc_delete_bitmap_executor_for_load->init(load_calc_threads);

    // The cache capacity is the larger of the configured capacity and the limit parsed
    // from config::delete_bitmap_dynamic_agg_cache_limit against the memory limits.
    bool limit_is_percent = false;
    const int64_t dynamic_cache_limit =
            ParseUtil::parse_mem_spec(config::delete_bitmap_dynamic_agg_cache_limit,
                                      MemInfo::mem_limit(), MemInfo::physical_mem(),
                                      &limit_is_percent);
    const auto cache_capacity = dynamic_cache_limit > config::delete_bitmap_agg_cache_capacity
                                        ? dynamic_cache_limit
                                        : config::delete_bitmap_agg_cache_capacity;
    _txn_delete_bitmap_cache = std::make_unique<CloudTxnDeleteBitmapCache>(cache_capacity);
    RETURN_IF_ERROR(_txn_delete_bitmap_cache->init());

    _committed_rs_mgr = std::make_unique<CloudCommittedRSMgr>();
    RETURN_IF_ERROR(_committed_rs_mgr->init());

    _file_cache_block_downloader = std::make_unique<io::FileCacheBlockDownloader>(*this);
    _cloud_warm_up_manager = std::make_unique<CloudWarmUpManager>(*this);
    _tablet_hotspot = std::make_unique<TabletHotspot>();
    _cloud_snapshot_mgr = std::make_unique<CloudSnapshotMgr>(*this);

    // The stream load recorder lives under the first configured store path.
    RETURN_NOT_OK_STATUS_WITH_WARN(
            init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
            "init StreamLoadRecorder failed");

    // check cluster id
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id");

    // Both pools are fixed-size: min and max use the same configured value.
    RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
                                           .set_max_threads(config::sync_load_for_tablets_thread)
                                           .set_min_threads(config::sync_load_for_tablets_thread)
                                           .build(&_sync_load_for_tablets_thread_pool),
                                   "fail to build SyncLoadForTabletsThreadPool");

    RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("WarmupCacheAsyncThreadPool")
                                           .set_max_threads(config::warmup_cache_async_thread)
                                           .set_min_threads(config::warmup_cache_async_thread)
                                           .build(&_warmup_cache_async_thread_pool),
                                   "fail to build WarmupCacheAsyncThreadPool");

    return Status::OK();
}
261
262
153
void CloudStorageEngine::stop() {
263
153
    if (_stopped) {
264
0
        return;
265
0
    }
266
267
153
    _stopped = true;
268
153
    _stop_background_threads_latch.count_down();
269
270
153
    for (auto&& t : _bg_threads) {
271
0
        if (t) {
272
0
            t->join();
273
0
        }
274
0
    }
275
276
153
    if (_base_compaction_thread_pool) {
277
0
        _base_compaction_thread_pool->shutdown();
278
0
    }
279
153
    if (_cumu_compaction_thread_pool) {
280
0
        _cumu_compaction_thread_pool->shutdown();
281
0
    }
282
153
    _adaptive_thread_controller.stop();
283
153
    LOG(INFO) << "Cloud storage engine is stopped.";
284
285
153
    if (_calc_tablet_delete_bitmap_task_thread_pool) {
286
0
        _calc_tablet_delete_bitmap_task_thread_pool->shutdown();
287
0
    }
288
153
    if (_sync_delete_bitmap_thread_pool) {
289
0
        _sync_delete_bitmap_thread_pool->shutdown();
290
0
    }
291
153
}
292
293
61.1k
bool CloudStorageEngine::stopped() {
294
61.1k
    return _stopped;
295
61.1k
}
296
297
// Looks up a tablet through the tablet manager and returns it as a BaseTablet pointer.
//
// tablet_id, sync_stats, force_use_only_cached and cache_on_miss are forwarded unchanged.
// The second and third arguments of CloudTabletMgr::get_tablet are fixed to false / true.
// NOTE(review): their meaning is not visible in this file — confirm against CloudTabletMgr.
// An error from the tablet manager is propagated as-is.
Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id,
                                                      SyncRowsetStats* sync_stats,
                                                      bool force_use_only_cached,
                                                      bool cache_on_miss) {
    auto lookup = _tablet_mgr->get_tablet(tablet_id, false, true, sync_stats,
                                          force_use_only_cached, cache_on_miss);
    // Upcast the concrete tablet pointer to the base type on success.
    return std::move(lookup).transform(
            [](auto&& tablet) { return static_pointer_cast<BaseTablet>(std::move(tablet)); });
}
305
306
// Fetches the meta of `tablet_id` from the cloud meta manager into `*tablet_meta`.
//
// Returns InvalidArgument when `tablet_meta` is null and InternalError when the meta
// manager has not been created; otherwise returns whatever the meta manager reports.
// `force_use_only_cached` is currently only referenced by the disabled (#if 0) cache
// path below, so it has no effect.
Status CloudStorageEngine::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
                                           bool force_use_only_cached) {
    if (!tablet_meta) {
        return Status::InvalidArgument("tablet_meta output is null");
    }

#if 0
    if (_tablet_mgr && _tablet_mgr->peek_tablet_meta(tablet_id, tablet_meta)) {
        return Status::OK();
    }

    if (force_use_only_cached) {
        return Status::NotFound("tablet meta {} not found in cache", tablet_id);
    }
#endif

    if (!_meta_mgr) {
        return Status::InternalError("cloud meta manager is not initialized");
    }
    return _meta_mgr->get_tablet_meta(tablet_id, tablet_meta);
}
328
329
1
// Starts the engine's background threads and builds the thread pools they feed.
//
// Threads appended to _bg_threads are joined by stop(). The first failing step aborts the
// start-up and its status is returned; steps that already succeeded are left running.
// `wg_sptr` is not used by this implementation.
Status CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
    // Periodic storage vault refresh.
    RETURN_IF_ERROR(Thread::create(
            "CloudStorageEngine", "refresh_s3_info_thread",
            [this] { _refresh_storage_vault_info_thread_callback(); },
            &_bg_threads.emplace_back()));
    LOG(INFO) << "refresh s3 info thread started";

    // Periodic stale rowset vacuum.
    RETURN_IF_ERROR(Thread::create(
            "CloudStorageEngine", "vacuum_stale_rowsets_thread",
            [this] { _vacuum_stale_rowsets_thread_callback(); },
            &_bg_threads.emplace_back()));
    LOG(INFO) << "vacuum stale rowsets thread started";

    // Periodic tablet sync.
    RETURN_IF_ERROR(Thread::create(
            "CloudStorageEngine", "sync_tablets_thread",
            [this] { _sync_tablets_thread_callback(); },
            &_bg_threads.emplace_back()));
    LOG(INFO) << "sync tablets thread started";

    // This thread is kept in its own member rather than in _bg_threads.
    RETURN_IF_ERROR(Thread::create(
            "CloudStorageEngine", "evict_querying_rowset_thread",
            [this] { _evict_quring_rowset_thread_callback(); },
            &_evict_quering_rowset_thread));
    LOG(INFO) << "evict quering thread started";

    // Fixed-size pools for calculating and syncing tablet delete bitmaps.
    RETURN_IF_ERROR(ThreadPoolBuilder("TabletCalDeleteBitmapThreadPool")
                            .set_min_threads(config::calc_tablet_delete_bitmap_task_max_thread)
                            .set_max_threads(config::calc_tablet_delete_bitmap_task_max_thread)
                            .build(&_calc_tablet_delete_bitmap_task_thread_pool));
    RETURN_IF_ERROR(ThreadPoolBuilder("SyncDeleteBitmapThreadPool")
                            .set_min_threads(config::sync_delete_bitmap_task_max_thread)
                            .set_max_threads(config::sync_delete_bitmap_task_max_thread)
                            .build(&_sync_delete_bitmap_thread_pool));

    // TODO(plat1ko): check_bucket_enable_versioning_thread

    // Compaction pools are created with min == max; the producer thread re-checks the
    // sizes every round through _adjust_compaction_thread_num().
    const int base_thread_num = get_base_thread_num();
    const int cumu_thread_num = get_cumu_thread_num();

    RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
                            .set_min_threads(base_thread_num)
                            .set_max_threads(base_thread_num)
                            .build(&_base_compaction_thread_pool));
    RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
                            .set_min_threads(cumu_thread_num)
                            .set_max_threads(cumu_thread_num)
                            .build(&_cumu_compaction_thread_pool));

    // Compaction task producer.
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "compaction_tasks_producer_thread",
            [this] { _compaction_tasks_producer_callback(); },
            &_bg_threads.emplace_back()));
    LOG(INFO) << "compaction tasks producer thread started,"
              << " base thread num " << base_thread_num << " cumu thread num " << cumu_thread_num;

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "lease_compaction_thread",
            [this] { _lease_compaction_thread_callback(); },
            &_bg_threads.emplace_back()));
    LOG(INFO) << "lease compaction thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "check_tablet_delete_bitmap_score_thread",
            [this] { _check_tablet_delete_bitmap_score_callback(); },
            &_bg_threads.emplace_back()));
    LOG(INFO) << "check tablet delete bitmap score thread started";

    _start_adaptive_thread_controller();

    return Status::OK();
}
400
401
111
void CloudStorageEngine::sync_storage_vault() {
402
111
    cloud::StorageVaultInfos vault_infos;
403
111
    bool enable_storage_vault = false;
404
405
111
    auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault);
406
111
    if (!st.ok()) {
407
0
        LOG(WARNING) << "failed to get storage vault info. err=" << st;
408
0
        return;
409
0
    }
410
411
111
    if (vault_infos.empty()) {
412
0
        LOG(WARNING) << "empty storage vault info";
413
0
        return;
414
0
    }
415
416
111
    bool check_storage_vault = false;
417
111
    bool expected = false;
418
111
    if (first_sync_storage_vault.compare_exchange_strong(expected, true)) {
419
0
        check_storage_vault = config::enable_check_storage_vault;
420
0
        LOG(INFO) << "first sync storage vault info, BE try to check iam role connectivity, "
421
0
                     "check_storage_vault="
422
0
                  << check_storage_vault;
423
0
    }
424
425
111
    for (auto& [id, vault_info, path_format] : vault_infos) {
426
111
        auto fs = get_filesystem(id);
427
111
        auto status =
428
111
                (fs == nullptr)
429
111
                        ? std::visit(VaultCreateFSVisitor {id, path_format, check_storage_vault},
430
1
                                     vault_info)
431
111
                        : std::visit(RefreshFSVaultVisitor {id, std::move(fs), path_format},
432
110
                                     vault_info);
433
111
        if (!status.ok()) [[unlikely]] {
434
0
            LOG(WARNING) << vault_process_error(id, vault_info, std::move(st));
435
0
        }
436
111
    }
437
438
111
    if (auto& id = std::get<0>(vault_infos.back());
439
111
        (latest_fs() == nullptr || latest_fs()->id() != id) && !enable_storage_vault) {
440
1
        set_latest_fs(get_filesystem(id));
441
1
    }
442
111
}
443
444
// We should enable_java_support if we want to use hdfs vault
445
1
void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() {
446
61
    while (!_stop_background_threads_latch.wait_for(
447
61
            std::chrono::seconds(config::refresh_s3_info_interval_s))) {
448
60
        sync_storage_vault();
449
60
    }
450
1
}
451
452
1
void CloudStorageEngine::_vacuum_stale_rowsets_thread_callback() {
453
13
    while (!_stop_background_threads_latch.wait_for(
454
13
            std::chrono::seconds(config::vacuum_stale_rowsets_interval_s))) {
455
12
        _tablet_mgr->vacuum_stale_rowsets(_stop_background_threads_latch);
456
12
    }
457
1
}
458
459
1
void CloudStorageEngine::_sync_tablets_thread_callback() {
460
6
    while (!_stop_background_threads_latch.wait_for(
461
6
            std::chrono::seconds(config::schedule_sync_tablets_interval_s))) {
462
5
        _tablet_mgr->sync_tablets(_stop_background_threads_latch);
463
5
    }
464
1
}
465
466
void CloudStorageEngine::get_cumu_compaction(
467
105k
        int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) {
468
105k
    std::lock_guard lock(_compaction_mtx);
469
105k
    if (auto it = _submitted_cumu_compactions.find(tablet_id);
470
105k
        it != _submitted_cumu_compactions.end()) {
471
0
        res = it->second;
472
0
    }
473
105k
}
474
475
18.7k
// Re-aligns the min/max thread counts of the base and cumulative compaction pools with
// the values currently returned by get_base_thread_num() / get_cumu_thread_num().
//
// Returns an INTERNAL_ERROR status when either pool has not been created. A failed
// resize is not reported: the function still returns OK and only successful changes are
// logged (at VLOG_NOTICE level).
Status CloudStorageEngine::_adjust_compaction_thread_num() {
    const int base_target = get_base_thread_num();

    if (_base_compaction_thread_pool == nullptr || _cumu_compaction_thread_pool == nullptr) {
        LOG(WARNING) << "base or cumu compaction thread pool is not created";
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>("");
    }

    // Base pool: max first, then min.
    if (const int prev_max = _base_compaction_thread_pool->max_threads();
        prev_max != base_target) {
        if (_base_compaction_thread_pool->set_max_threads(base_target).ok()) {
            VLOG_NOTICE << "update base compaction thread pool max_threads from " << prev_max
                        << " to " << base_target;
        }
    }
    if (const int prev_min = _base_compaction_thread_pool->min_threads();
        prev_min != base_target) {
        if (_base_compaction_thread_pool->set_min_threads(base_target).ok()) {
            VLOG_NOTICE << "update base compaction thread pool min_threads from " << prev_min
                        << " to " << base_target;
        }
    }

    // Cumulative pool: same procedure.
    const int cumu_target = get_cumu_thread_num();
    if (const int prev_max = _cumu_compaction_thread_pool->max_threads();
        prev_max != cumu_target) {
        if (_cumu_compaction_thread_pool->set_max_threads(cumu_target).ok()) {
            VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << prev_max
                        << " to " << cumu_target;
        }
    }
    if (const int prev_min = _cumu_compaction_thread_pool->min_threads();
        prev_min != cumu_target) {
        if (_cumu_compaction_thread_pool->set_min_threads(cumu_target).ok()) {
            VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << prev_min
                        << " to " << cumu_target;
        }
    }
    return Status::OK();
}
519
520
1
void CloudStorageEngine::_compaction_tasks_producer_callback() {
521
1
    LOG(INFO) << "try to start compaction producer process!";
522
523
1
    int round = 0;
524
1
    CompactionType compaction_type;
525
526
    // Used to record the time when the score metric was last updated.
527
    // The update of the score metric is accompanied by the logic of selecting the tablet.
528
    // If there is no slot available, the logic of selecting the tablet will be terminated,
529
    // which causes the score metric update to be terminated.
530
    // In order to avoid this situation, we need to update the score regularly.
531
1
    int64_t last_cumulative_score_update_time = 0;
532
1
    int64_t last_base_score_update_time = 0;
533
1
    static const int64_t check_score_interval_ms = 5000; // 5 secs
534
535
1
    int64_t interval = config::generate_compaction_tasks_interval_ms;
536
18.7k
    do {
537
18.7k
        int64_t cur_time = UnixMillis();
538
18.7k
        if (!config::disable_auto_compaction) {
539
18.7k
            Status st = _adjust_compaction_thread_num();
540
18.7k
            if (!st.ok()) {
541
0
                break;
542
0
            }
543
544
18.7k
            bool check_score = false;
545
18.7k
            if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
546
16.9k
                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
547
16.9k
                round++;
548
16.9k
                if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) {
549
704
                    check_score = true;
550
704
                    last_cumulative_score_update_time = cur_time;
551
704
                }
552
16.9k
            } else {
553
1.87k
                compaction_type = CompactionType::BASE_COMPACTION;
554
1.87k
                round = 0;
555
1.87k
                if (cur_time - last_base_score_update_time >= check_score_interval_ms) {
556
591
                    check_score = true;
557
591
                    last_base_score_update_time = cur_time;
558
591
                }
559
1.87k
            }
560
18.7k
            std::unique_ptr<ThreadPool>& thread_pool =
561
18.7k
                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
562
18.7k
                            ? _cumu_compaction_thread_pool
563
18.7k
                            : _base_compaction_thread_pool;
564
18.7k
            VLOG_CRITICAL << "compaction thread pool. type: "
565
0
                          << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
566
0
                                                                                       : "BASE")
567
0
                          << ", num_threads: " << thread_pool->num_threads()
568
0
                          << ", num_threads_pending_start: "
569
0
                          << thread_pool->num_threads_pending_start()
570
0
                          << ", num_active_threads: " << thread_pool->num_active_threads()
571
0
                          << ", max_threads: " << thread_pool->max_threads()
572
0
                          << ", min_threads: " << thread_pool->min_threads()
573
0
                          << ", num_total_queued_tasks: " << thread_pool->get_queue_size();
574
18.7k
            std::vector<CloudTabletSPtr> tablets_compaction =
575
18.7k
                    _generate_cloud_compaction_tasks(compaction_type, check_score);
576
577
            /// Regardless of whether the tablet is submitted for compaction or not,
578
            /// we need to call 'reset_compaction' to clean up the base_compaction or cumulative_compaction objects
579
            /// in the tablet, because these two objects store the tablet's own shared_ptr.
580
            /// If it is not cleaned up, the reference count of the tablet will always be greater than 1,
581
            /// thus cannot be collected by the garbage collector. (TabletManager::start_trash_sweep)
582
108k
            for (const auto& tablet : tablets_compaction) {
583
108k
                Status status = submit_compaction_task(tablet, compaction_type);
584
108k
                if (status.ok()) continue;
585
101k
                if ((!status.is<ErrorCode::BE_NO_SUITABLE_VERSION>() &&
586
101k
                     !status.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) ||
587
101k
                    VLOG_DEBUG_IS_ON) {
588
24
                    LOG(WARNING) << "failed to submit compaction task for tablet: "
589
24
                                 << tablet->tablet_id() << ", err: " << status;
590
24
                }
591
101k
            }
592
18.7k
            interval = config::generate_compaction_tasks_interval_ms;
593
18.7k
        } else {
594
1
            interval = config::check_auto_compaction_interval_seconds * 1000;
595
1
        }
596
18.7k
        int64_t end_time = UnixMillis();
597
18.7k
        DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time -
598
18.7k
                                                                                       cur_time);
599
18.7k
    } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
600
1
}
601
602
void CloudStorageEngine::unregister_index_change_compaction(int64_t tablet_id,
603
885
                                                            bool is_base_compact) {
604
885
    std::lock_guard lock(_compaction_mtx);
605
885
    if (is_base_compact) {
606
0
        _submitted_index_change_base_compaction.erase(tablet_id);
607
885
    } else {
608
885
        _submitted_index_change_cumu_compaction.erase(tablet_id);
609
885
    }
610
885
}
611
612
bool CloudStorageEngine::register_index_change_compaction(
613
        std::shared_ptr<CloudIndexChangeCompaction> compact, int64_t tablet_id,
614
888
        bool is_base_compact, std::string& err_reason) {
615
888
    std::lock_guard lock(_compaction_mtx);
616
888
    if (is_base_compact) {
617
2
        if (_submitted_base_compactions.contains(tablet_id) ||
618
2
            _submitted_full_compactions.contains(tablet_id) ||
619
2
            _submitted_index_change_base_compaction.contains(tablet_id)) {
620
1
            std::stringstream ss;
621
1
            ss << "reason:" << ((int)_submitted_base_compactions.contains(tablet_id)) << ", "
622
1
               << ((int)_submitted_full_compactions.contains(tablet_id)) << ", "
623
1
               << ((int)_submitted_index_change_base_compaction.contains(tablet_id));
624
1
            err_reason = ss.str();
625
1
            return false;
626
1
        } else {
627
1
            _submitted_index_change_base_compaction[tablet_id] = compact;
628
1
            return true;
629
1
        }
630
886
    } else {
631
886
        if (_tablet_preparing_cumu_compaction.contains(tablet_id) ||
632
889
            _submitted_cumu_compactions.contains(tablet_id) ||
633
888
            _submitted_index_change_cumu_compaction.contains(tablet_id)) {
634
1
            std::stringstream ss;
635
1
            ss << "reason:" << ((int)_tablet_preparing_cumu_compaction.contains(tablet_id)) << ", "
636
1
               << ((int)_submitted_cumu_compactions.contains(tablet_id)) << ", "
637
1
               << ((int)_submitted_index_change_cumu_compaction.contains(tablet_id));
638
1
            err_reason = ss.str();
639
1
            return false;
640
885
        } else {
641
885
            _submitted_index_change_cumu_compaction[tablet_id] = compact;
642
885
        }
643
885
        return true;
644
886
    }
645
888
}
646
647
std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks(
648
18.7k
        CompactionType compaction_type, bool check_score) {
649
18.7k
    std::vector<std::shared_ptr<CloudTablet>> tablets_compaction;
650
651
18.7k
    int64_t max_compaction_score = 0;
652
18.7k
    std::unordered_set<int64_t> tablet_preparing_cumu_compaction;
653
18.7k
    std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
654
18.7k
            submitted_cumu_compactions;
655
18.7k
    std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> submitted_base_compactions;
656
18.7k
    std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> submitted_full_compactions;
657
18.7k
    std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>>
658
18.7k
            submitted_index_change_cumu_compactions;
659
18.7k
    std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>>
660
18.7k
            submitted_index_change_base_compactions;
661
18.7k
    {
662
18.7k
        std::lock_guard lock(_compaction_mtx);
663
18.7k
        tablet_preparing_cumu_compaction = _tablet_preparing_cumu_compaction;
664
18.7k
        submitted_cumu_compactions = _submitted_cumu_compactions;
665
18.7k
        submitted_base_compactions = _submitted_base_compactions;
666
18.7k
        submitted_full_compactions = _submitted_full_compactions;
667
18.7k
        submitted_index_change_cumu_compactions = _submitted_index_change_cumu_compaction;
668
18.7k
        submitted_index_change_base_compactions = _submitted_index_change_base_compaction;
669
18.7k
    }
670
671
18.7k
    bool need_pick_tablet = true;
672
18.7k
    int thread_per_disk =
673
18.7k
            config::compaction_task_num_per_fast_disk; // all disks are fast in cloud mode
674
18.7k
    int num_cumu =
675
18.7k
            std::accumulate(submitted_cumu_compactions.begin(), submitted_cumu_compactions.end(), 0,
676
18.7k
                            [](int a, auto& b) { return a + b.second.size(); });
677
18.7k
    int num_base =
678
18.7k
            cast_set<int>(submitted_base_compactions.size() + submitted_full_compactions.size());
679
18.7k
    int n = thread_per_disk - num_cumu - num_base;
680
18.7k
    if (compaction_type == CompactionType::BASE_COMPACTION) {
681
        // We need to reserve at least one thread for cumulative compaction,
682
        // because base compactions may take too long to complete, which may
683
        // leads to "too many rowsets" error.
684
1.87k
        int base_n = std::min(config::max_base_compaction_task_num_per_disk, thread_per_disk - 1) -
685
1.87k
                     num_base;
686
1.87k
        n = std::min(base_n, n);
687
1.87k
    }
688
18.7k
    if (n <= 0) { // No threads available
689
361
        if (!check_score) return tablets_compaction;
690
17
        need_pick_tablet = false;
691
17
        n = 0;
692
17
    }
693
694
    // Return true for skipping compaction
695
18.4k
    std::function<bool(CloudTablet*)> filter_out;
696
18.4k
    if (compaction_type == CompactionType::BASE_COMPACTION) {
697
1.84k
        filter_out = [&submitted_base_compactions, &submitted_full_compactions,
698
58.1M
                      &submitted_index_change_base_compactions](CloudTablet* t) {
699
58.1M
            return submitted_base_compactions.contains(t->tablet_id()) ||
700
58.1M
                   submitted_full_compactions.contains(t->tablet_id()) ||
701
58.1M
                   submitted_index_change_base_compactions.contains(t->tablet_id()) ||
702
58.1M
                   t->tablet_state() != TABLET_RUNNING;
703
58.1M
        };
704
16.6k
    } else if (config::enable_parallel_cumu_compaction) {
705
0
        filter_out = [&tablet_preparing_cumu_compaction,
706
0
                      &submitted_index_change_cumu_compactions](CloudTablet* t) {
707
0
            return tablet_preparing_cumu_compaction.contains(t->tablet_id()) ||
708
0
                   submitted_index_change_cumu_compactions.contains(t->tablet_id()) ||
709
0
                   (t->tablet_state() != TABLET_RUNNING &&
710
0
                    (!config::enable_new_tablet_do_compaction || t->alter_version() == -1));
711
0
        };
712
16.6k
    } else {
713
16.6k
        filter_out = [&tablet_preparing_cumu_compaction, &submitted_cumu_compactions,
714
217M
                      &submitted_index_change_cumu_compactions](CloudTablet* t) {
715
217M
            return tablet_preparing_cumu_compaction.contains(t->tablet_id()) ||
716
217M
                   submitted_index_change_cumu_compactions.contains(t->tablet_id()) ||
717
217M
                   submitted_cumu_compactions.contains(t->tablet_id()) ||
718
217M
                   (t->tablet_state() != TABLET_RUNNING &&
719
217M
                    (!config::enable_new_tablet_do_compaction || t->alter_version() == -1));
720
217M
        };
721
16.6k
    }
722
723
    // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(),
724
    // So that we can update the max_compaction_score metric.
725
18.4k
    do {
726
18.4k
        std::vector<CloudTabletSPtr> tablets;
727
18.4k
        auto st = tablet_mgr().get_topn_tablets_to_compact(n, compaction_type, filter_out, &tablets,
728
18.4k
                                                           &max_compaction_score);
729
18.4k
        if (!st.ok()) {
730
0
            LOG(WARNING) << "failed to get tablets to compact, err=" << st;
731
0
            break;
732
0
        }
733
18.4k
        if (!need_pick_tablet) break;
734
18.4k
        tablets_compaction = std::move(tablets);
735
18.4k
    } while (false);
736
737
18.4k
    if (max_compaction_score > 0) {
738
17.9k
        if (compaction_type == CompactionType::BASE_COMPACTION) {
739
1.80k
            DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(
740
1.80k
                    max_compaction_score);
741
16.1k
        } else {
742
16.1k
            DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(
743
16.1k
                    max_compaction_score);
744
16.1k
        }
745
17.9k
    }
746
747
18.4k
    return tablets_compaction;
748
18.7k
}
749
750
/// Requests the global compaction lock for `compaction` and, on success, records
/// the compaction in the matching "executing" registry.
///
/// @param compaction_type  selects how `compaction` is downcast and which registry
///                         is updated; only READER_CUMULATIVE_COMPACTION,
///                         READER_BASE_COMPACTION and READER_FULL_COMPACTION are
///                         supported
/// @param tablet           tablet being compacted; on lock failure its
///                         last-failure timestamp for that compaction kind is set
/// @param compaction       must point to the concrete class matching
///                         `compaction_type` (it is downcast without a check)
/// @return OK once the lock is held and the compaction is registered; the status
///         from `request_global_lock()` if the lock could not be obtained;
///         NotFound for any other `compaction_type`
Status CloudStorageEngine::_request_tablet_global_compaction_lock(
        ReaderType compaction_type, const CloudTabletSPtr& tablet,
        std::shared_ptr<CloudCompactionMixin> compaction) {
    // Taken before the lock request, so a failure is stamped with the time the
    // attempt started.
    const int64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
                                std::chrono::system_clock::now().time_since_epoch())
                                .count();

    if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) {
        auto cumu_compaction = std::static_pointer_cast<CloudCumulativeCompaction>(compaction);
        if (auto st = cumu_compaction->request_global_lock(); !st.ok()) {
            LOG_WARNING("failed to request cumu compaction global lock")
                    .tag("tablet id", tablet->tablet_id())
                    .tag("msg", st.to_string());
            tablet->set_last_cumu_compaction_failure_time(now);
            return st;
        }
        std::lock_guard lock(_compaction_mtx);
        _executing_cumu_compactions[tablet->tablet_id()].push_back(cumu_compaction);
        return Status::OK();
    }

    if (compaction_type == ReaderType::READER_BASE_COMPACTION) {
        auto base_compaction = std::static_pointer_cast<CloudBaseCompaction>(compaction);
        if (auto st = base_compaction->request_global_lock(); !st.ok()) {
            LOG_WARNING("failed to request base compaction global lock")
                    .tag("tablet id", tablet->tablet_id())
                    .tag("msg", st.to_string());
            tablet->set_last_base_compaction_failure_time(now);
            return st;
        }
        std::lock_guard lock(_compaction_mtx);
        _executing_base_compactions[tablet->tablet_id()] = base_compaction;
        return Status::OK();
    }

    if (compaction_type == ReaderType::READER_FULL_COMPACTION) {
        auto full_compaction = std::static_pointer_cast<CloudFullCompaction>(compaction);
        if (auto st = full_compaction->request_global_lock(); !st.ok()) {
            LOG_WARNING("failed to request full compaction global lock")
                    .tag("tablet id", tablet->tablet_id())
                    .tag("msg", st.to_string());
            tablet->set_last_full_compaction_failure_time(now);
            return st;
        }
        std::lock_guard lock(_compaction_mtx);
        _executing_full_compactions[tablet->tablet_id()] = full_compaction;
        return Status::OK();
    }

    LOG(WARNING) << "unsupport compaction task for tablet: " << tablet->tablet_id()
                 << ", compaction name: " << compaction->compaction_name();
    return Status::NotFound("Unsupport compaction type {}", compaction->compaction_name());
}
804
805
/// Prepares a base compaction for `tablet` and queues it on the base compaction
/// thread pool.
///
/// Steps: reserve the tablet's slot in `_submitted_base_compactions`, run
/// `prepare_compact()`, register the task with CompactionTaskTracker as PENDING,
/// then submit the worker. The worker requests the global compaction lock, marks
/// the task RUNNING, executes the compaction and always cleans up its tracker
/// entry and submitted slot.
///
/// @param tablet          tablet to compact
/// @param trigger_method  stored in the tracker entry after a cast to TriggerMethod
/// @return AlreadyExist if the tablet already has a submitted-base entry; the
///         `prepare_compact()` status if preparation fails; InternalError if the
///         thread pool rejects the task; otherwise the (OK) submit status
Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet,
                                                        int trigger_method) {
    using namespace std::chrono;
    {
        // A null placeholder marks the tablet as taken while we prepare.
        std::lock_guard guard(_compaction_mtx);
        const bool reserved =
                _submitted_base_compactions.emplace(tablet->tablet_id(), nullptr).second;
        if (!reserved) {
            return Status::AlreadyExist(
                    "other base compaction or full compaction is submitted, tablet_id={}",
                    tablet->tablet_id());
        }
    }

    auto compaction = std::make_shared<CloudBaseCompaction>(*this, tablet);
    auto status = compaction->prepare_compact();
    if (!status.ok()) {
        long failed_at_ms =
                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
        tablet->set_last_base_compaction_failure_time(failed_at_ms);
        // Give the placeholder back.
        std::lock_guard guard(_compaction_mtx);
        _submitted_base_compactions.erase(tablet->tablet_id());
        return status;
    }

    // Publish the task to the tracker in PENDING state.
    auto* task_tracker = CompactionTaskTracker::instance();
    int64_t compaction_id = compaction->compaction_id();
    {
        CompactionTaskInfo task_info;
        task_info.compaction_id = compaction_id;
        task_info.tablet_id = tablet->tablet_id();
        task_info.table_id = tablet->table_id();
        task_info.partition_id = tablet->partition_id();
        task_info.compaction_type = CompactionProfileType::BASE;
        task_info.status = CompactionTaskStatus::PENDING;
        task_info.trigger_method = static_cast<TriggerMethod>(trigger_method);
        task_info.scheduled_time_ms =
                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
        task_info.backend_id = BackendOptions::get_backend_id();
        task_info.compaction_score = tablet->get_real_compaction_score();
        task_info.input_rowsets_count = compaction->input_rowsets_count();
        task_info.input_row_num = compaction->input_row_num_value();
        task_info.input_data_size = compaction->input_rowsets_data_size();
        task_info.input_index_size = compaction->input_rowsets_index_size();
        task_info.input_total_size = compaction->input_rowsets_total_size();
        task_info.input_segments_num = compaction->input_segments_num_value();
        task_info.input_version_range = compaction->input_version_range_str();
        task_info.is_vertical = compaction->is_vertical();
        task_tracker->register_task(std::move(task_info));
    }

    {
        // Replace the placeholder with the real compaction object.
        std::lock_guard guard(_compaction_mtx);
        _submitted_base_compactions[tablet->tablet_id()] = compaction;
    }

    status = _base_compaction_thread_pool->submit_func([=, this,
                                                        compaction = std::move(compaction)]() {
        DorisMetrics::instance()->base_compaction_task_running_total->increment(1);
        DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
                _base_compaction_thread_pool->get_queue_size());
        g_base_compaction_running_task_count << 1;
        signal::tablet_id = tablet->tablet_id();

        // Runs on every exit path of the worker.
        Defer cleanup {[&]() {
            // Idempotent cleanup: remove task from tracker
            CompactionTaskTracker::instance()->remove_task(compaction_id);
            g_base_compaction_running_task_count << -1;
            std::lock_guard guard(_compaction_mtx);
            _submitted_base_compactions.erase(tablet->tablet_id());
            DorisMetrics::instance()->base_compaction_task_running_total->increment(-1);
            DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
                    _base_compaction_thread_pool->get_queue_size());
        }};

        auto run_status = _request_tablet_global_compaction_lock(
                ReaderType::READER_BASE_COMPACTION, tablet, compaction);
        if (!run_status.ok()) {
            return;
        }

        // The global lock is held: flip the tracker entry to RUNNING.
        {
            RunningStats running;
            running.start_time_ms =
                    duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
            CompactionTaskTracker::instance()->update_to_running(compaction_id, running);
        }

        run_status = compaction->execute_compact();
        if (!run_status.ok()) {
            // Error log has been output in `execute_compact`
            long failed_at_ms =
                    duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
            tablet->set_last_base_compaction_failure_time(failed_at_ms);
        }
        std::lock_guard guard(_compaction_mtx);
        _executing_base_compactions.erase(tablet->tablet_id());
    });

    DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
            _base_compaction_thread_pool->get_queue_size());
    if (status.ok()) {
        return status;
    }

    // The pool rejected the task: undo the tracker entry and the submitted slot.
    task_tracker->remove_task(compaction_id);
    std::lock_guard guard(_compaction_mtx);
    _submitted_base_compactions.erase(tablet->tablet_id());
    return Status::InternalError("failed to submit base compaction, tablet_id={}",
                                 tablet->tablet_id());
}
905
906
/// Prepares a cumulative compaction for `tablet` and queues it on the cumulative
/// compaction thread pool.
///
/// Steps: mark the tablet as "preparing", run `prepare_compact()`, register the
/// task with CompactionTaskTracker as PENDING, move the tablet from "preparing"
/// to "submitted", then submit the worker. The worker requests the global
/// compaction lock, marks the task RUNNING, applies the large-task delay policy,
/// executes the compaction and cleans up its bookkeeping.
///
/// @param tablet          tablet to compact
/// @param trigger_method  stored in the tracker entry after a cast to TriggerMethod
/// @return AlreadyExist if another cumulative compaction is submitted (only when
///         parallel cumulative compaction is disabled) or being prepared for the
///         tablet; the `prepare_compact()` status if preparation fails;
///         InternalError if the thread pool rejects the task; otherwise the (OK)
///         submit status
Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet,
                                                              int trigger_method) {
    using namespace std::chrono;
    {
        std::lock_guard lock(_compaction_mtx);
        if (!config::enable_parallel_cumu_compaction &&
            _submitted_cumu_compactions.count(tablet->tablet_id())) {
            return Status::AlreadyExist("other cumu compaction is submitted, tablet_id={}",
                                        tablet->tablet_id());
        }
        auto [_, success] = _tablet_preparing_cumu_compaction.insert(tablet->tablet_id());
        if (!success) {
            return Status::AlreadyExist("other cumu compaction is preparing, tablet_id={}",
                                        tablet->tablet_id());
        }
    }
    auto compaction = std::make_shared<CloudCumulativeCompaction>(*this, tablet);
    auto st = compaction->prepare_compact();
    if (!st.ok()) {
        long now = duration_cast<std::chrono::milliseconds>(
                           std::chrono::system_clock::now().time_since_epoch())
                           .count();
        // CUMULATIVE_MEET_DELETE_VERSION records no timestamp at all.
        if (!st.is<ErrorCode::CUMULATIVE_MEET_DELETE_VERSION>()) {
            if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) {
                // Backoff strategy if no suitable version
                tablet->last_cumu_no_suitable_version_ms = now;
            } else {
                tablet->set_last_cumu_compaction_failure_time(now);
            }
        }
        std::lock_guard lock(_compaction_mtx);
        _tablet_preparing_cumu_compaction.erase(tablet->tablet_id());
        return st;
    }
    // Register task with CompactionTaskTracker as PENDING
    // IMPORTANT: use compaction->compaction_id(), NOT tracker->next_compaction_id(),
    // because the Compaction constructor already allocated an ID via the tracker.
    auto* tracker = CompactionTaskTracker::instance();
    int64_t compaction_id = compaction->compaction_id();
    {
        CompactionTaskInfo info;
        info.compaction_id = compaction_id;
        info.tablet_id = tablet->tablet_id();
        info.table_id = tablet->table_id();
        info.partition_id = tablet->partition_id();
        info.compaction_type = CompactionProfileType::CUMULATIVE;
        info.status = CompactionTaskStatus::PENDING;
        info.trigger_method = static_cast<TriggerMethod>(trigger_method);
        info.scheduled_time_ms =
                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
        info.backend_id = BackendOptions::get_backend_id();
        info.compaction_score = tablet->get_real_compaction_score();
        info.input_rowsets_count = compaction->input_rowsets_count();
        info.input_row_num = compaction->input_row_num_value();
        info.input_data_size = compaction->input_rowsets_data_size();
        info.input_index_size = compaction->input_rowsets_index_size();
        info.input_total_size = compaction->input_rowsets_total_size();
        info.input_segments_num = compaction->input_segments_num_value();
        info.input_version_range = compaction->input_version_range_str();
        info.is_vertical = compaction->is_vertical();
        tracker->register_task(std::move(info));
    }
    {
        // Preparation is done: move the tablet from "preparing" to "submitted".
        std::lock_guard lock(_compaction_mtx);
        _tablet_preparing_cumu_compaction.erase(tablet->tablet_id());
        _submitted_cumu_compactions[tablet->tablet_id()].push_back(compaction);
    }
    // Both helpers below copy `tablet` and `compaction`, so they must be created
    // before `compaction` is moved into the worker lambda.
    auto erase_submitted_cumu_compaction = [=, this]() {
        std::lock_guard lock(_compaction_mtx);
        auto it = _submitted_cumu_compactions.find(tablet->tablet_id());
        DCHECK(it != _submitted_cumu_compactions.end());
        auto& compactions = it->second;
        auto it1 = std::find(compactions.begin(), compactions.end(), compaction);
        DCHECK(it1 != compactions.end());
        compactions.erase(it1);
        if (compactions.empty()) { // No compactions on this tablet, erase key
            _submitted_cumu_compactions.erase(it);
            // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to
            // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform
            // cumu compaction on tablet which has suitable versions for cumu compaction.
            tablet->last_cumu_no_suitable_version_ms = 0;
        }
    };
    auto erase_executing_cumu_compaction = [=, this]() {
        std::lock_guard lock(_compaction_mtx);
        auto it = _executing_cumu_compactions.find(tablet->tablet_id());
        DCHECK(it != _executing_cumu_compactions.end());
        auto& compactions = it->second;
        auto it1 = std::find(compactions.begin(), compactions.end(), compaction);
        DCHECK(it1 != compactions.end());
        compactions.erase(it1);
        if (compactions.empty()) { // No compactions on this tablet, erase key
            _executing_cumu_compactions.erase(it);
            // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to
            // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform
            // cumu compaction on tablet which has suitable versions for cumu compaction.
            tablet->last_cumu_no_suitable_version_ms = 0;
        }
    };
    st = _cumu_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() {
        DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1);
        DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
                _cumu_compaction_thread_pool->get_queue_size());
        DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.wait_in_line",
                        { sleep(5); })
        signal::tablet_id = tablet->tablet_id();
        g_cumu_compaction_running_task_count << 1;
        bool is_large_task = true;
        // Set once this task has incremented `_cumu_compaction_thread_pool_used_threads`.
        // The increment only happens after the global lock is acquired, so the
        // cleanup must not decrement when the worker bails out earlier; otherwise
        // every lock failure would push the counter below its true value.
        bool counted_as_used_thread = false;
        Defer defer {[&]() {
            DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.sleep",
                            { sleep(5); })
            // Idempotent cleanup: remove task from tracker
            CompactionTaskTracker::instance()->remove_task(compaction_id);
            std::lock_guard lock(_cumu_compaction_delay_mtx);
            if (counted_as_used_thread) {
                _cumu_compaction_thread_pool_used_threads--;
            }
            if (!is_large_task) {
                _cumu_compaction_thread_pool_small_tasks_running--;
            }
            g_cumu_compaction_running_task_count << -1;
            erase_submitted_cumu_compaction();
            DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(-1);
            DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
                    _cumu_compaction_thread_pool->get_queue_size());
        }};
        auto st = _request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION,
                                                         tablet, compaction);
        if (!st.ok()) return;
        // Update tracker to RUNNING after acquiring global lock
        {
            RunningStats rs;
            rs.start_time_ms =
                    duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
            CompactionTaskTracker::instance()->update_to_running(compaction_id, rs);
        }
        do {
            std::lock_guard lock(_cumu_compaction_delay_mtx);
            _cumu_compaction_thread_pool_used_threads++;
            counted_as_used_thread = true;
            if (config::large_cumu_compaction_task_min_thread_num > 1 &&
                _cumu_compaction_thread_pool->max_threads() >=
                        config::large_cumu_compaction_task_min_thread_num) {
                // Determine if this is a small task based on configured thresholds
                is_large_task = (compaction->get_input_rowsets_bytes() >
                                         config::large_cumu_compaction_task_bytes_threshold ||
                                 compaction->get_input_num_rows() >
                                         config::large_cumu_compaction_task_row_num_threshold);
                // Small task. No delay needed
                if (!is_large_task) {
                    _cumu_compaction_thread_pool_small_tasks_running++;
                    break;
                }
                // Deal with large task
                if (_should_delay_large_task()) {
                    long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch())
                                       .count();
                    // Record a failure time so the tablet is not picked again right away.
                    tablet->set_last_cumu_compaction_failure_time(now);
                    erase_executing_cumu_compaction();
                    LOG_WARNING(
                            "failed to do CloudCumulativeCompaction, cumu thread pool is "
                            "intensive, delay large task.")
                            .tag("tablet_id", tablet->tablet_id())
                            .tag("input_rows", compaction->get_input_num_rows())
                            .tag("input_rowsets_total_size", compaction->get_input_rowsets_bytes())
                            .tag("config::large_cumu_compaction_task_bytes_threshold",
                                 config::large_cumu_compaction_task_bytes_threshold)
                            .tag("config::large_cumu_compaction_task_row_num_threshold",
                                 config::large_cumu_compaction_task_row_num_threshold)
                            .tag("remaining threads", _cumu_compaction_thread_pool_used_threads)
                            .tag("small_tasks_running",
                                 _cumu_compaction_thread_pool_small_tasks_running);
                    return;
                }
            }
        } while (false);
        st = compaction->execute_compact();
        if (!st.ok()) {
            // Error log has been output in `execute_compact`
            long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
            tablet->set_last_cumu_compaction_failure_time(now);
        }
        erase_executing_cumu_compaction();
    });
    DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
            _cumu_compaction_thread_pool->get_queue_size());
    if (!st.ok()) {
        // The pool rejected the task: undo the tracker entry and the submitted slot.
        tracker->remove_task(compaction_id);
        erase_submitted_cumu_compaction();
        return Status::InternalError("failed to submit cumu compaction, tablet_id={}",
                                     tablet->tablet_id());
    }
    return st;
}
1098
1099
Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet,
1100
105
                                                        int trigger_method) {
1101
105
    using namespace std::chrono;
1102
105
    {
1103
105
        std::lock_guard lock(_compaction_mtx);
1104
        // Take a placeholder for full compaction
1105
105
        auto [_, success] = _submitted_full_compactions.emplace(tablet->tablet_id(), nullptr);
1106
105
        if (!success) {
1107
0
            return Status::AlreadyExist(
1108
0
                    "other full compaction or base compaction is submitted, tablet_id={}",
1109
0
                    tablet->tablet_id());
1110
0
        }
1111
105
    }
1112
    //auto compaction = std::make_shared<CloudFullCompaction>(tablet);
1113
105
    auto compaction = std::make_shared<CloudFullCompaction>(*this, tablet);
1114
105
    auto st = compaction->prepare_compact();
1115
105
    if (!st.ok()) {
1116
1
        long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
1117
1
        tablet->set_last_full_compaction_failure_time(now);
1118
1
        std::lock_guard lock(_compaction_mtx);
1119
1
        _submitted_full_compactions.erase(tablet->tablet_id());
1120
1
        return st;
1121
1
    }
1122
    // Register task with CompactionTaskTracker as PENDING
1123
104
    auto* tracker = CompactionTaskTracker::instance();
1124
104
    int64_t compaction_id = compaction->compaction_id();
1125
104
    {
1126
104
        CompactionTaskInfo info;
1127
104
        info.compaction_id = compaction_id;
1128
104
        info.tablet_id = tablet->tablet_id();
1129
104
        info.table_id = tablet->table_id();
1130
104
        info.partition_id = tablet->partition_id();
1131
104
        info.compaction_type = CompactionProfileType::FULL;
1132
104
        info.status = CompactionTaskStatus::PENDING;
1133
104
        info.trigger_method = static_cast<TriggerMethod>(trigger_method);
1134
104
        info.scheduled_time_ms =
1135
104
                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
1136
104
        info.backend_id = BackendOptions::get_backend_id();
1137
104
        info.compaction_score = tablet->get_real_compaction_score();
1138
104
        info.input_rowsets_count = compaction->input_rowsets_count();
1139
104
        info.input_row_num = compaction->input_row_num_value();
1140
104
        info.input_data_size = compaction->input_rowsets_data_size();
1141
104
        info.input_index_size = compaction->input_rowsets_index_size();
1142
104
        info.input_total_size = compaction->input_rowsets_total_size();
1143
104
        info.input_segments_num = compaction->input_segments_num_value();
1144
104
        info.input_version_range = compaction->input_version_range_str();
1145
104
        info.is_vertical = compaction->is_vertical();
1146
104
        tracker->register_task(std::move(info));
1147
104
    }
1148
104
    {
1149
104
        std::lock_guard lock(_compaction_mtx);
1150
104
        _submitted_full_compactions[tablet->tablet_id()] = compaction;
1151
104
    }
1152
104
    st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() {
1153
104
        g_full_compaction_running_task_count << 1;
1154
104
        signal::tablet_id = tablet->tablet_id();
1155
104
        Defer defer {[&]() {
1156
            // Idempotent cleanup: remove task from tracker
1157
104
            CompactionTaskTracker::instance()->remove_task(compaction_id);
1158
104
            g_full_compaction_running_task_count << -1;
1159
104
            std::lock_guard lock(_compaction_mtx);
1160
104
            _submitted_full_compactions.erase(tablet->tablet_id());
1161
104
        }};
1162
104
        auto st = _request_tablet_global_compaction_lock(ReaderType::READER_FULL_COMPACTION, tablet,
1163
104
                                                         compaction);
1164
104
        if (!st.ok()) return;
1165
        // Update tracker to RUNNING after acquiring global lock
1166
104
        {
1167
104
            RunningStats rs;
1168
104
            rs.start_time_ms =
1169
104
                    duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
1170
104
            CompactionTaskTracker::instance()->update_to_running(compaction_id, rs);
1171
104
        }
1172
104
        st = compaction->execute_compact();
1173
104
        if (!st.ok()) {
1174
            // Error log has been output in `execute_compact`
1175
0
            long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
1176
0
            tablet->set_last_full_compaction_failure_time(now);
1177
0
        }
1178
104
        std::lock_guard lock(_compaction_mtx);
1179
104
        _executing_full_compactions.erase(tablet->tablet_id());
1180
104
    });
1181
104
    if (!st.ok()) {
1182
0
        tracker->remove_task(compaction_id);
1183
0
        std::lock_guard lock(_compaction_mtx);
1184
0
        _submitted_full_compactions.erase(tablet->tablet_id());
1185
0
        return Status::InternalError("failed to submit full compaction, tablet_id={}",
1186
0
                                     tablet->tablet_id());
1187
0
    }
1188
104
    return st;
1189
104
}
1190
1191
// Dispatches a compaction request for `tablet` to the submit routine that
// matches `compaction_type`.
//
// Returns the error of the underlying submit routine if it fails, Status::OK()
// when the task was submitted, and InternalError for a compaction type other
// than base / cumulative / full.
Status CloudStorageEngine::submit_compaction_task(const CloudTabletSPtr& tablet,
                                                  CompactionType compaction_type,
                                                  int trigger_method) {
    const bool wants_base = compaction_type == CompactionType::BASE_COMPACTION;
    const bool wants_cumu = compaction_type == CompactionType::CUMULATIVE_COMPACTION;
    const bool wants_full = compaction_type == CompactionType::FULL_COMPACTION;
    DCHECK(wants_cumu || wants_base || wants_full);

    if (wants_base) {
        RETURN_IF_ERROR(_submit_base_compaction_task(tablet, trigger_method));
    } else if (wants_cumu) {
        RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet, trigger_method));
    } else if (wants_full) {
        RETURN_IF_ERROR(_submit_full_compaction_task(tablet, trigger_method));
    } else {
        // Only reachable when the DCHECK above is compiled out.
        return Status::InternalError("unknown compaction type!");
    }
    return Status::OK();
}
1211
1212
1
void CloudStorageEngine::_lease_compaction_thread_callback() {
1213
183
    while (!_stop_background_threads_latch.wait_for(
1214
183
            std::chrono::seconds(config::lease_compaction_interval_seconds))) {
1215
182
        std::vector<std::shared_ptr<CloudFullCompaction>> full_compactions;
1216
182
        std::vector<std::shared_ptr<CloudBaseCompaction>> base_compactions;
1217
182
        std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions;
1218
182
        std::vector<std::shared_ptr<CloudCompactionStopToken>> compation_stop_tokens;
1219
182
        std::vector<std::shared_ptr<CloudIndexChangeCompaction>> index_change_compations;
1220
182
        {
1221
182
            std::lock_guard lock(_compaction_mtx);
1222
182
            for (auto& [_, base] : _executing_base_compactions) {
1223
1
                if (base) { // `base` might be a nullptr placeholder
1224
1
                    base_compactions.push_back(base);
1225
1
                }
1226
1
            }
1227
182
            for (auto& [_, cumus] : _executing_cumu_compactions) {
1228
175
                for (auto& cumu : cumus) {
1229
175
                    cumu_compactions.push_back(cumu);
1230
175
                }
1231
175
            }
1232
182
            for (auto& [_, full] : _executing_full_compactions) {
1233
7
                if (full) {
1234
7
                    full_compactions.push_back(full);
1235
7
                }
1236
7
            }
1237
182
            for (auto& [_, stop_token] : _active_compaction_stop_tokens) {
1238
8
                if (stop_token) {
1239
8
                    compation_stop_tokens.push_back(stop_token);
1240
8
                }
1241
8
            }
1242
182
            for (auto& [_, index_change] : _submitted_index_change_cumu_compaction) {
1243
4
                if (index_change) {
1244
4
                    index_change_compations.push_back(index_change);
1245
4
                }
1246
4
            }
1247
182
            for (auto& [_, index_change] : _submitted_index_change_base_compaction) {
1248
0
                if (index_change) {
1249
0
                    index_change_compations.push_back(index_change);
1250
0
                }
1251
0
            }
1252
182
        }
1253
        // TODO(plat1ko): Support batch lease rpc
1254
182
        for (auto& stop_token : compation_stop_tokens) {
1255
8
            stop_token->do_lease();
1256
8
        }
1257
182
        for (auto& comp : full_compactions) {
1258
7
            comp->do_lease();
1259
7
        }
1260
182
        for (auto& comp : cumu_compactions) {
1261
175
            comp->do_lease();
1262
175
        }
1263
182
        for (auto& comp : base_compactions) {
1264
1
            comp->do_lease();
1265
1
        }
1266
182
        for (auto& comp : index_change_compations) {
1267
4
            comp->do_lease();
1268
4
        }
1269
182
    }
1270
1
}
1271
1272
1
void CloudStorageEngine::_check_tablet_delete_bitmap_score_callback() {
1273
1
    LOG(INFO) << "try to start check tablet delete bitmap score!";
1274
13
    while (!_stop_background_threads_latch.wait_for(
1275
13
            std::chrono::seconds(config::check_tablet_delete_bitmap_interval_seconds))) {
1276
12
        if (!config::enable_check_tablet_delete_bitmap_score) {
1277
0
            return;
1278
0
        }
1279
12
        uint64_t max_delete_bitmap_score = 0;
1280
12
        uint64_t max_base_rowset_delete_bitmap_score = 0;
1281
12
        tablet_mgr().get_topn_tablet_delete_bitmap_score(&max_delete_bitmap_score,
1282
12
                                                         &max_base_rowset_delete_bitmap_score);
1283
12
        if (max_delete_bitmap_score > 0) {
1284
12
            _tablet_max_delete_bitmap_score_metrics->set_value(max_delete_bitmap_score);
1285
12
        }
1286
12
        if (max_base_rowset_delete_bitmap_score > 0) {
1287
11
            _tablet_max_base_rowset_delete_bitmap_score_metrics->set_value(
1288
11
                    max_base_rowset_delete_bitmap_score);
1289
11
        }
1290
12
    }
1291
1
}
1292
1293
0
// Serializes the currently submitted compactions into pretty-printed JSON of
// the form
//   { "CumulativeCompaction": [tablet_id, ...], "BaseCompaction": [tablet_id, ...] }
// and stores it in `*result`.
//
// A tablet id appears in "CumulativeCompaction" once per cumulative compaction
// submitted for that tablet, and in "BaseCompaction" once per entry of
// `_submitted_base_compactions`.
//
// @param result  output string; overwritten with the JSON document.
// @return always Status::OK().
Status CloudStorageEngine::get_compaction_status_json(std::string* result) {
    rapidjson::Document root;
    root.SetObject();

    {
        // Only the walk over the submitted-compaction maps needs the lock;
        // serialization below works on the already built document.
        std::lock_guard lock(_compaction_mtx);
        // cumu
        std::string_view cumu = "CumulativeCompaction";
        rapidjson::Value cumu_key;
        cumu_key.SetString(cumu.data(), cast_set<uint32_t>(cumu.length()), root.GetAllocator());
        rapidjson::Document cumu_arr;
        cumu_arr.SetArray();
        for (auto& [tablet_id, v] : _submitted_cumu_compactions) {
            // One array element per submitted task of this tablet.
            for (size_t i = 0; i < v.size(); ++i) {
                cumu_arr.PushBack(tablet_id, root.GetAllocator());
            }
        }
        root.AddMember(cumu_key, cumu_arr, root.GetAllocator());
        // base
        std::string_view base = "BaseCompaction";
        rapidjson::Value base_key;
        base_key.SetString(base.data(), cast_set<uint32_t>(base.length()), root.GetAllocator());
        rapidjson::Document base_arr;
        base_arr.SetArray();
        for (auto& [tablet_id, _] : _submitted_base_compactions) {
            base_arr.PushBack(tablet_id, root.GetAllocator());
        }
        root.AddMember(base_key, base_arr, root.GetAllocator());
    }

    rapidjson::StringBuffer strbuf;
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
    root.Accept(writer);
    // The buffer knows its own length, so no strlen pass is needed.
    *result = std::string(strbuf.GetString(), strbuf.GetSize());
    return Status::OK();
}
1327
1328
std::shared_ptr<CloudCumulativeCompactionPolicy> CloudStorageEngine::cumu_compaction_policy(
1329
116k
        std::string_view compaction_policy) {
1330
116k
    if (!_cumulative_compaction_policies.contains(compaction_policy)) {
1331
0
        return _cumulative_compaction_policies.at(CUMULATIVE_SIZE_BASED_POLICY);
1332
0
    }
1333
116k
    return _cumulative_compaction_policies.at(compaction_policy);
1334
116k
}
1335
1336
// Creates a compaction stop token for `tablet`, registers it through
// `CloudCompactionStopToken::do_register()` and records it in
// `_active_compaction_stop_tokens`.
//
// @param tablet     tablet the stop token is created for.
// @param initiator  forwarded to the stop token; logged as
//                   delete_bitmap_lock_initiator.
// @return AlreadyExist if the tablet already has an entry in
//         `_active_compaction_stop_tokens`; the `do_register()` error if
//         registration fails (the placeholder entry is removed again);
//         otherwise the OK status returned by `do_register()`.
Status CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet,
                                                          int64_t initiator) {
    const auto tablet_id = tablet->tablet_id();

    {
        // Reserve the slot with a nullptr placeholder so a concurrent
        // registration for the same tablet is rejected.
        std::lock_guard guard(_compaction_mtx);
        const bool inserted = _active_compaction_stop_tokens.emplace(tablet_id, nullptr).second;
        if (!inserted) {
            return Status::AlreadyExist("stop token already exists for tablet_id={}",
                                        tablet_id);
        }
    }

    // do_register() is called without holding _compaction_mtx.
    auto token = std::make_shared<CloudCompactionStopToken>(*this, tablet, initiator);
    auto register_status = token->do_register();

    if (!register_status.ok()) {
        // Roll back the placeholder.
        std::lock_guard guard(_compaction_mtx);
        _active_compaction_stop_tokens.erase(tablet_id);
        return register_status;
    }

    {
        std::lock_guard guard(_compaction_mtx);
        _active_compaction_stop_tokens[tablet_id] = token;
    }
    LOG_INFO(
            "successfully register compaction stop token for tablet_id={}, "
            "delete_bitmap_lock_initiator={}",
            tablet_id, initiator);
    return register_status;
}
1366
1367
1.17k
// Removes the stop token entry of `tablet` from
// `_active_compaction_stop_tokens` and, when `clear_ms` is true and the entry
// held a non-null token, additionally calls `do_unregister()` on it.
//
// @param tablet    tablet whose stop token entry is removed.
// @param clear_ms  whether to also call `do_unregister()` on the removed token.
// @return NotFound if the tablet has no entry; the `do_unregister()` error if
//         that call fails; Status::OK() otherwise.
Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms) {
    const auto tablet_id = tablet->tablet_id();

    std::shared_ptr<CloudCompactionStopToken> removed_token;
    {
        std::lock_guard guard(_compaction_mtx);
        auto pos = _active_compaction_stop_tokens.find(tablet_id);
        if (pos == _active_compaction_stop_tokens.end()) {
            return Status::NotFound("stop token not found for tablet_id={}", tablet_id);
        }
        // Keep the token alive past the erase; it may be a nullptr placeholder.
        removed_token = pos->second;
        _active_compaction_stop_tokens.erase(pos);
    }
    LOG_INFO("successfully unregister compaction stop token for tablet_id={}", tablet_id);

    if (!removed_token || !clear_ms) {
        return Status::OK();
    }

    // do_unregister() is called without holding _compaction_mtx.
    RETURN_IF_ERROR(removed_token->do_unregister());
    LOG_INFO(
            "successfully remove compaction stop token from MS for tablet_id={}, "
            "delete_bitmap_lock_initiator={}",
            tablet_id, removed_token->initiator());
    return Status::OK();
}
1389
1390
1
// Validates the cluster id files stored under the configured store paths
// against `config::cluster_id` and sets `_effective_cluster_id` to the
// configured value.
//
// Behavior:
//  - Reads `<store path>/<CLUSTER_ID_PREFIX>` for every store path where the
//    file exists and is non-empty, and collects the parsed ids.
//  - No id found on disk: returns OK when the configured id is -1, otherwise
//    persists the configured id through `set_cluster_id()`.
//  - More than one distinct id on disk: InternalError.
//  - Configured id is not -1 and differs from the id on disk: Corruption.
//  - A cluster id file whose content cannot be parsed as an integer:
//    Corruption (instead of letting the std::stoi exception escape).
Status CloudStorageEngine::_check_all_root_path_cluster_id() {
    // Check if all root paths have the same cluster id
    std::set<int32_t> cluster_ids;
    for (const auto& path : _options.store_paths) {
        auto cluster_id_path = fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX);
        bool exists = false;
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists));
        if (!exists) {
            continue;
        }
        io::FileReaderSPtr reader;
        RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cluster_id_path, &reader));
        size_t fsize = reader->size();
        if (fsize == 0) {
            // An empty file carries no cluster id.
            continue;
        }
        std::string content;
        content.resize(fsize, '\0');
        size_t bytes_read = 0;
        RETURN_IF_ERROR(reader->read_at(0, {content.data(), fsize}, &bytes_read));
        DCHECK_EQ(fsize, bytes_read);
        try {
            // std::stoi throws std::invalid_argument / std::out_of_range on
            // malformed content; report that as a Status.
            int32_t tmp_cluster_id = std::stoi(content);
            cluster_ids.insert(tmp_cluster_id);
        } catch (const std::exception& e) {
            return Status::Corruption("failed to parse cluster id file {}: {}", cluster_id_path,
                                      e.what());
        }
    }
    _effective_cluster_id = config::cluster_id;
    // first init
    if (cluster_ids.empty()) {
        // not set configured cluster id
        if (_effective_cluster_id == -1) {
            return Status::OK();
        }
        // If no cluster id file exists, use the configured cluster id
        return set_cluster_id(_effective_cluster_id);
    }
    if (cluster_ids.size() > 1) {
        return Status::InternalError(
                "All root paths must have the same cluster id, but you have "
                "different cluster ids: {}",
                fmt::join(cluster_ids, ", "));
    }
    // Exactly one id is on disk here; it must match the configured id if set.
    if (_effective_cluster_id != -1 && *cluster_ids.begin() != _effective_cluster_id) {
        return Status::Corruption(
                "multiple cluster ids is not equal. config::cluster_id={}, "
                "storage path cluster_id={}",
                _effective_cluster_id, *cluster_ids.begin());
    }
    return Status::OK();
}
1438
1439
1
// Writes `cluster_id` to `<store path>/<CLUSTER_ID_PREFIX>` for every
// configured store path that does not have that file yet, then records it as
// `_effective_cluster_id`. Existing files are left untouched.
//
// Runs under `_store_lock`. Returns the first filesystem error encountered, in
// which case `_effective_cluster_id` is not updated.
Status CloudStorageEngine::set_cluster_id(int32_t cluster_id) {
    std::lock_guard<std::mutex> store_guard(_store_lock);
    for (auto& store : _options.store_paths) {
        auto id_file = fmt::format("{}/{}", store.path, CLUSTER_ID_PREFIX);
        bool already_present = false;
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(id_file, &already_present));
        if (already_present) {
            continue;
        }
        io::FileWriterPtr id_writer;
        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(id_file, &id_writer));
        RETURN_IF_ERROR(id_writer->append(std::to_string(cluster_id)));
        RETURN_IF_ERROR(id_writer->close());
    }
    _effective_cluster_id = cluster_id;
    return Status::OK();
}
1456
1457
#include "common/compile_check_end.h"
1458
} // namespace doris