Coverage Report

Created: 2026-06-25 15:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_storage_engine.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_storage_engine.h"
19
20
#include <bvar/reducer.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <gen_cpp/cloud.pb.h>
23
#include <gen_cpp/olap_file.pb.h>
24
#include <rapidjson/document.h>
25
#include <rapidjson/encodings.h>
26
#include <rapidjson/prettywriter.h>
27
#include <rapidjson/stringbuffer.h>
28
29
#include <algorithm>
30
#include <memory>
31
#include <variant>
32
33
#include "cloud/cloud_base_compaction.h"
34
#include "cloud/cloud_compaction_stop_token.h"
35
#include "cloud/cloud_cumulative_compaction.h"
36
#include "cloud/cloud_cumulative_compaction_policy.h"
37
#include "cloud/cloud_full_compaction.h"
38
#include "cloud/cloud_index_change_compaction.h"
39
#include "cloud/cloud_meta_mgr.h"
40
#include "cloud/cloud_snapshot_mgr.h"
41
#include "cloud/cloud_tablet_hotspot.h"
42
#include "cloud/cloud_tablet_mgr.h"
43
#include "cloud/cloud_txn_delete_bitmap_cache.h"
44
#include "cloud/cloud_warm_up_manager.h"
45
#include "cloud/config.h"
46
#include "common/config.h"
47
#include "common/metrics/doris_metrics.h"
48
#include "common/signal_handler.h"
49
#include "common/status.h"
50
#include "core/assert_cast.h"
51
#include "io/cache/block_file_cache_downloader.h"
52
#include "io/cache/block_file_cache_factory.h"
53
#include "io/cache/file_cache_common.h"
54
#include "io/fs/file_system.h"
55
#include "io/fs/hdfs_file_system.h"
56
#include "io/fs/s3_file_system.h"
57
#include "io/hdfs_util.h"
58
#include "io/io_common.h"
59
#include "load/memtable/memtable_flush_executor.h"
60
#include "runtime/exec_env.h"
61
#include "runtime/memory/cache_manager.h"
62
#include "storage/compaction/cumulative_compaction_policy.h"
63
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
64
#include "storage/compaction_task_tracker.h"
65
#include "storage/storage_policy.h"
66
#include "util/parse_util.h"
67
#include "util/time.h"
68
69
namespace doris {
70
71
using namespace std::literals;
72
73
bvar::Adder<uint64_t> g_base_compaction_running_task_count("base_compaction_running_task_count");
74
bvar::Adder<uint64_t> g_full_compaction_running_task_count("full_compaction_running_task_count");
75
bvar::Adder<uint64_t> g_cumu_compaction_running_task_count(
76
        "cumulative_compaction_running_task_count");
77
78
24.5k
int get_cumu_thread_num() {
79
24.5k
    if (config::max_cumu_compaction_threads > 0) {
80
0
        return config::max_cumu_compaction_threads;
81
0
    }
82
83
24.5k
    int num_cores = doris::CpuInfo::num_cores();
84
24.5k
    return std::min(std::max(int(num_cores * config::cumu_compaction_thread_num_factor), 2), 20);
85
24.5k
}
86
87
24.5k
int get_base_thread_num() {
88
24.5k
    if (config::max_base_compaction_threads > 0) {
89
24.5k
        return config::max_base_compaction_threads;
90
24.5k
    }
91
92
0
    int num_cores = doris::CpuInfo::num_cores();
93
0
    return std::min(std::max(int(num_cores * config::base_compaction_thread_num_factor), 1), 10);
94
24.5k
}
95
96
CloudStorageEngine::CloudStorageEngine(const EngineOptions& options)
97
184
        : BaseStorageEngine(Type::CLOUD, options.backend_uid),
98
184
          _meta_mgr(std::make_unique<cloud::CloudMetaMgr>()),
99
184
          _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)),
100
184
          _options(options) {
101
184
    _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
102
184
            std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>();
103
184
    _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
104
184
            std::make_shared<CloudTimeSeriesCumulativeCompactionPolicy>();
105
184
    _startup_timepoint = std::chrono::system_clock::now();
106
184
}
107
108
183
CloudStorageEngine::~CloudStorageEngine() {
109
183
    stop();
110
183
}
111
112
static Status vault_process_error(std::string_view id,
113
0
                                  std::variant<S3Conf, cloud::HdfsVaultInfo>& vault, Status err) {
114
0
    std::stringstream ss;
115
0
    std::visit(
116
0
            [&]<typename T>(T& val) {
117
0
                if constexpr (std::is_same_v<T, S3Conf>) {
118
0
                    ss << val.to_string();
119
0
                } else if constexpr (std::is_same_v<T, cloud::HdfsVaultInfo>) {
120
0
                    val.SerializeToOstream(&ss);
121
0
                }
122
0
            },
Unexecuted instantiation: cloud_storage_engine.cpp:_ZZN5dorisL19vault_process_errorESt17basic_string_viewIcSt11char_traitsIcEERSt7variantIJNS_6S3ConfENS_5cloud13HdfsVaultInfoEEENS_6StatusEENK3$_0clIS5_EEDaRT_
Unexecuted instantiation: cloud_storage_engine.cpp:_ZZN5dorisL19vault_process_errorESt17basic_string_viewIcSt11char_traitsIcEERSt7variantIJNS_6S3ConfENS_5cloud13HdfsVaultInfoEEENS_6StatusEENK3$_0clIS7_EEDaRT_
123
0
            vault);
124
0
    return Status::IOError("Invalid vault, id {}, err {}, detail conf {}", id, err, ss.str());
125
0
}
126
127
struct VaultCreateFSVisitor {
128
    VaultCreateFSVisitor(const std::string& id, const cloud::StorageVaultPB_PathFormat& path_format,
129
                         bool check_fs)
130
1
            : id(id), path_format(path_format), check_fs(check_fs) {}
131
1
    Status operator()(const S3Conf& s3_conf) const {
132
1
        LOG(INFO) << "get new s3 info: " << s3_conf.to_string() << " resource_id=" << id
133
1
                  << " check_fs: " << check_fs;
134
135
1
        auto fs = DORIS_TRY(io::S3FileSystem::create(s3_conf, id));
136
1
        if (check_fs && !s3_conf.client_conf.role_arn.empty()) {
137
0
            bool res = false;
138
            // just check connectivity, not care object if exist
139
0
            auto st = fs->exists("not_exist_object", &res);
140
0
            if (!st.ok()) {
141
0
                LOG(FATAL) << "failed to check s3 fs, resource_id: " << id << " st: " << st
142
0
                           << "s3_conf: " << s3_conf.to_string()
143
0
                           << "add enable_check_storage_vault=false to be.conf to skip the check";
144
0
            }
145
0
        }
146
147
1
        put_storage_resource(id, {std::move(fs), path_format}, 0);
148
1
        LOG_INFO("successfully create s3 vault, vault id {}", id);
149
1
        return Status::OK();
150
1
    }
151
152
    // TODO(ByteYue): Make sure enable_java_support is on
153
0
    Status operator()(const cloud::HdfsVaultInfo& vault) const {
154
0
        auto hdfs_params = io::to_hdfs_params(vault);
155
0
        auto fs = DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id,
156
0
                                                       nullptr, vault.prefix()));
157
0
        put_storage_resource(id, {std::move(fs), path_format}, 0);
158
0
        LOG_INFO("successfully create hdfs vault, vault id {}", id);
159
0
        return Status::OK();
160
0
    }
161
162
    const std::string& id;
163
    const cloud::StorageVaultPB_PathFormat& path_format;
164
    bool check_fs;
165
};
166
167
struct RefreshFSVaultVisitor {
168
    RefreshFSVaultVisitor(const std::string& id, io::FileSystemSPtr fs,
169
                          const cloud::StorageVaultPB_PathFormat& path_format)
170
318
            : id(id), fs(std::move(fs)), path_format(path_format) {}
171
172
318
    Status operator()(const S3Conf& s3_conf) const {
173
318
        DCHECK_EQ(fs->type(), io::FileSystemType::S3) << id;
174
318
        auto s3_fs = std::static_pointer_cast<io::S3FileSystem>(fs);
175
318
        auto client_holder = s3_fs->client_holder();
176
318
        auto st = client_holder->reset(s3_conf.client_conf);
177
318
        if (!st.ok()) {
178
0
            LOG(WARNING) << "failed to update s3 fs, resource_id=" << id << ": " << st;
179
0
        }
180
318
        return st;
181
318
    }
182
183
0
    Status operator()(const cloud::HdfsVaultInfo& vault) const {
184
0
        auto hdfs_params = io::to_hdfs_params(vault);
185
0
        auto hdfs_fs =
186
0
                DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr,
187
0
                                                     vault.has_prefix() ? vault.prefix() : ""));
188
0
        auto hdfs = std::static_pointer_cast<io::HdfsFileSystem>(hdfs_fs);
189
0
        put_storage_resource(id, {std::move(hdfs), path_format}, 0);
190
0
        return Status::OK();
191
0
    }
192
193
    const std::string& id;
194
    io::FileSystemSPtr fs;
195
    const cloud::StorageVaultPB_PathFormat& path_format;
196
};
197
198
1
Status CloudStorageEngine::open() {
199
1
    sync_storage_vault();
200
201
    // TODO(plat1ko): DeleteBitmapTxnManager
202
203
1
    _memtable_flush_executor = std::make_unique<MemTableFlushExecutor>();
204
    // Use file cache disks number
205
1
    _memtable_flush_executor->init(
206
1
            cast_set<int32_t>(io::FileCacheFactory::instance()->get_cache_instance_size()));
207
208
1
    _calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>();
209
1
    _calc_delete_bitmap_executor->init(config::calc_delete_bitmap_max_thread);
210
211
1
    _calc_delete_bitmap_executor_for_load = std::make_unique<CalcDeleteBitmapExecutor>();
212
1
    _calc_delete_bitmap_executor_for_load->init(
213
1
            config::calc_delete_bitmap_for_load_max_thread > 0
214
1
                    ? config::calc_delete_bitmap_for_load_max_thread
215
1
                    : std::max(1, CpuInfo::num_cores() / 2));
216
217
    // The default cache is set to 100MB, use memory limit to dynamic adjustment
218
1
    bool is_percent = false;
219
1
    int64_t delete_bitmap_agg_cache_cache_limit =
220
1
            ParseUtil::parse_mem_spec(config::delete_bitmap_dynamic_agg_cache_limit,
221
1
                                      MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent);
222
1
    _txn_delete_bitmap_cache = std::make_unique<CloudTxnDeleteBitmapCache>(
223
1
            delete_bitmap_agg_cache_cache_limit > config::delete_bitmap_agg_cache_capacity
224
1
                    ? delete_bitmap_agg_cache_cache_limit
225
1
                    : config::delete_bitmap_agg_cache_capacity);
226
1
    RETURN_IF_ERROR(_txn_delete_bitmap_cache->init());
227
228
1
    _committed_rs_mgr = std::make_unique<CloudCommittedRSMgr>();
229
1
    RETURN_IF_ERROR(_committed_rs_mgr->init());
230
231
1
    _file_cache_block_downloader = std::make_unique<io::FileCacheBlockDownloader>(*this);
232
233
1
    _cloud_warm_up_manager = std::make_unique<CloudWarmUpManager>(*this);
234
235
1
    _tablet_hotspot = std::make_unique<TabletHotspot>();
236
237
1
    _cloud_snapshot_mgr = std::make_unique<CloudSnapshotMgr>(*this);
238
239
1
    RETURN_NOT_OK_STATUS_WITH_WARN(
240
1
            init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
241
1
            "init StreamLoadRecorder failed");
242
243
    // check cluster id
244
1
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id");
245
246
1
    RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
247
1
                                           .set_max_threads(config::sync_load_for_tablets_thread)
248
1
                                           .set_min_threads(config::sync_load_for_tablets_thread)
249
1
                                           .build(&_sync_load_for_tablets_thread_pool),
250
1
                                   "fail to build SyncLoadForTabletsThreadPool");
251
252
1
    RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("WarmupCacheAsyncThreadPool")
253
1
                                           .set_max_threads(config::warmup_cache_async_thread)
254
1
                                           .set_min_threads(config::warmup_cache_async_thread)
255
1
                                           .build(&_warmup_cache_async_thread_pool),
256
1
                                   "fail to build WarmupCacheAsyncThreadPool");
257
258
1
    return Status::OK();
259
1
}
260
261
183
void CloudStorageEngine::stop() {
262
183
    if (_stopped) {
263
0
        return;
264
0
    }
265
266
183
    _stopped = true;
267
183
    _stop_background_threads_latch.count_down();
268
269
183
    for (auto&& t : _bg_threads) {
270
0
        if (t) {
271
0
            t->join();
272
0
        }
273
0
    }
274
275
183
    if (_base_compaction_thread_pool) {
276
0
        _base_compaction_thread_pool->shutdown();
277
0
    }
278
183
    if (_cumu_compaction_thread_pool) {
279
0
        _cumu_compaction_thread_pool->shutdown();
280
0
    }
281
183
    _adaptive_thread_controller.stop();
282
183
    LOG(INFO) << "Cloud storage engine is stopped.";
283
284
183
    if (_calc_tablet_delete_bitmap_task_thread_pool) {
285
0
        _calc_tablet_delete_bitmap_task_thread_pool->shutdown();
286
0
    }
287
183
    if (_sync_delete_bitmap_thread_pool) {
288
0
        _sync_delete_bitmap_thread_pool->shutdown();
289
0
    }
290
183
}
291
292
57.4k
bool CloudStorageEngine::stopped() {
293
57.4k
    return _stopped;
294
57.4k
}
295
296
Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id,
297
                                                      SyncRowsetStats* sync_stats,
298
                                                      bool force_use_only_cached,
299
1.27M
                                                      bool cache_on_miss) {
300
1.27M
    return _tablet_mgr
301
1.27M
            ->get_tablet(tablet_id, false, true, sync_stats, force_use_only_cached, cache_on_miss)
302
1.27M
            .transform([](auto&& t) { return static_pointer_cast<BaseTablet>(std::move(t)); });
303
1.27M
}
304
305
Status CloudStorageEngine::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
306
286k
                                           bool force_use_only_cached) {
307
286k
    if (tablet_meta == nullptr) {
308
0
        return Status::InvalidArgument("tablet_meta output is null");
309
0
    }
310
311
#if 0
312
    if (_tablet_mgr && _tablet_mgr->peek_tablet_meta(tablet_id, tablet_meta)) {
313
        return Status::OK();
314
    }
315
316
    if (force_use_only_cached) {
317
        return Status::NotFound("tablet meta {} not found in cache", tablet_id);
318
    }
319
#endif
320
321
286k
    if (_meta_mgr == nullptr) {
322
0
        return Status::InternalError("cloud meta manager is not initialized");
323
0
    }
324
325
286k
    return _meta_mgr->get_tablet_meta(tablet_id, tablet_meta);
326
286k
}
327
328
1
Status CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
329
1
    RETURN_IF_ERROR(Thread::create(
330
1
            "CloudStorageEngine", "refresh_s3_info_thread",
331
1
            [this]() { this->_refresh_storage_vault_info_thread_callback(); },
332
1
            &_bg_threads.emplace_back()));
333
1
    LOG(INFO) << "refresh s3 info thread started";
334
335
1
    RETURN_IF_ERROR(Thread::create(
336
1
            "CloudStorageEngine", "vacuum_stale_rowsets_thread",
337
1
            [this]() { this->_vacuum_stale_rowsets_thread_callback(); },
338
1
            &_bg_threads.emplace_back()));
339
1
    LOG(INFO) << "vacuum stale rowsets thread started";
340
341
1
    RETURN_IF_ERROR(Thread::create(
342
1
            "CloudStorageEngine", "sync_tablets_thread",
343
1
            [this]() { this->_sync_tablets_thread_callback(); }, &_bg_threads.emplace_back()));
344
1
    LOG(INFO) << "sync tablets thread started";
345
346
1
    RETURN_IF_ERROR(Thread::create(
347
1
            "CloudStorageEngine", "evict_querying_rowset_thread",
348
1
            [this]() { this->_evict_quring_rowset_thread_callback(); },
349
1
            &_evict_quering_rowset_thread));
350
1
    LOG(INFO) << "evict quering thread started";
351
352
    // add calculate tablet delete bitmap task thread pool
353
1
    RETURN_IF_ERROR(ThreadPoolBuilder("TabletCalDeleteBitmapThreadPool")
354
1
                            .set_min_threads(config::calc_tablet_delete_bitmap_task_max_thread)
355
1
                            .set_max_threads(config::calc_tablet_delete_bitmap_task_max_thread)
356
1
                            .build(&_calc_tablet_delete_bitmap_task_thread_pool));
357
1
    RETURN_IF_ERROR(ThreadPoolBuilder("SyncDeleteBitmapThreadPool")
358
1
                            .set_min_threads(config::sync_delete_bitmap_task_max_thread)
359
1
                            .set_max_threads(config::sync_delete_bitmap_task_max_thread)
360
1
                            .build(&_sync_delete_bitmap_thread_pool));
361
362
    // TODO(plat1ko): check_bucket_enable_versioning_thread
363
364
    // compaction tasks producer thread
365
1
    int base_thread_num = get_base_thread_num();
366
1
    int cumu_thread_num = get_cumu_thread_num();
367
368
1
    RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
369
1
                            .set_min_threads(base_thread_num)
370
1
                            .set_max_threads(base_thread_num)
371
1
                            .build(&_base_compaction_thread_pool));
372
1
    RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
373
1
                            .set_min_threads(cumu_thread_num)
374
1
                            .set_max_threads(cumu_thread_num)
375
1
                            .build(&_cumu_compaction_thread_pool));
376
1
    RETURN_IF_ERROR(Thread::create(
377
1
            "StorageEngine", "compaction_tasks_producer_thread",
378
1
            [this]() { this->_compaction_tasks_producer_callback(); },
379
1
            &_bg_threads.emplace_back()));
380
1
    LOG(INFO) << "compaction tasks producer thread started,"
381
1
              << " base thread num " << base_thread_num << " cumu thread num " << cumu_thread_num;
382
383
1
    RETURN_IF_ERROR(Thread::create(
384
1
            "StorageEngine", "lease_compaction_thread",
385
1
            [this]() { this->_lease_compaction_thread_callback(); }, &_bg_threads.emplace_back()));
386
387
1
    LOG(INFO) << "lease compaction thread started";
388
389
1
    RETURN_IF_ERROR(Thread::create(
390
1
            "StorageEngine", "check_tablet_delete_bitmap_score_thread",
391
1
            [this]() { this->_check_tablet_delete_bitmap_score_callback(); },
392
1
            &_bg_threads.emplace_back()));
393
1
    LOG(INFO) << "check tablet delete bitmap score thread started";
394
395
1
    _start_adaptive_thread_controller();
396
397
1
    return Status::OK();
398
1
}
399
400
319
void CloudStorageEngine::sync_storage_vault() {
401
319
    cloud::StorageVaultInfos vault_infos;
402
319
    bool enable_storage_vault = false;
403
404
319
    auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault);
405
319
    if (!st.ok()) {
406
0
        LOG(WARNING) << "failed to get storage vault info. err=" << st;
407
0
        return;
408
0
    }
409
410
319
    if (vault_infos.empty()) {
411
0
        LOG(WARNING) << "empty storage vault info";
412
0
        return;
413
0
    }
414
415
319
    bool check_storage_vault = false;
416
319
    bool expected = false;
417
319
    if (first_sync_storage_vault.compare_exchange_strong(expected, true)) {
418
0
        check_storage_vault = config::enable_check_storage_vault;
419
0
        LOG(INFO) << "first sync storage vault info, BE try to check iam role connectivity, "
420
0
                     "check_storage_vault="
421
0
                  << check_storage_vault;
422
0
    }
423
424
319
    for (auto& [id, vault_info, path_format] : vault_infos) {
425
319
        auto fs = get_filesystem(id);
426
319
        auto status =
427
319
                (fs == nullptr)
428
319
                        ? std::visit(VaultCreateFSVisitor {id, path_format, check_storage_vault},
429
1
                                     vault_info)
430
319
                        : std::visit(RefreshFSVaultVisitor {id, std::move(fs), path_format},
431
318
                                     vault_info);
432
319
        if (!status.ok()) [[unlikely]] {
433
0
            LOG(WARNING) << vault_process_error(id, vault_info, std::move(st));
434
0
        }
435
319
    }
436
437
319
    if (auto& id = std::get<0>(vault_infos.back());
438
319
        (latest_fs() == nullptr || latest_fs()->id() != id) && !enable_storage_vault) {
439
1
        set_latest_fs(get_filesystem(id));
440
1
    }
441
319
}
442
443
// We should enable_java_support if we want to use hdfs vault
444
1
void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() {
445
87
    while (!_stop_background_threads_latch.wait_for(
446
87
            std::chrono::seconds(config::refresh_s3_info_interval_s))) {
447
86
        sync_storage_vault();
448
        // The other place that rebuilds the S3 rate limiter is S3ClientFactory::create(), which
449
        // is not called when an existing vault's conf is unchanged. Trigger the check here as well
450
        // so that dynamically modified s3_{get,put}_* rate limiter configs take effect within
451
        // refresh_s3_info_interval_s even when no vault is created or its conf does not change.
452
        // Gate it behind enable_s3_rate_limiter so that clusters with rate limiting disabled
453
        // (e.g. HDFS-only vaults) do not force-initialize S3ClientFactory / the AWS SDK here.
454
86
        if (config::enable_s3_rate_limiter) {
455
0
            check_s3_rate_limiter_config_changed();
456
0
        }
457
86
    }
458
1
}
459
460
1
void CloudStorageEngine::_vacuum_stale_rowsets_thread_callback() {
461
18
    while (!_stop_background_threads_latch.wait_for(
462
18
            std::chrono::seconds(config::vacuum_stale_rowsets_interval_s))) {
463
17
        _tablet_mgr->vacuum_stale_rowsets(_stop_background_threads_latch);
464
17
    }
465
1
}
466
467
1
void CloudStorageEngine::_sync_tablets_thread_callback() {
468
8
    while (!_stop_background_threads_latch.wait_for(
469
8
            std::chrono::seconds(config::schedule_sync_tablets_interval_s))) {
470
7
        _tablet_mgr->sync_tablets(_stop_background_threads_latch);
471
7
    }
472
1
}
473
474
void CloudStorageEngine::get_cumu_compaction(
475
144k
        int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) {
476
144k
    std::lock_guard lock(_compaction_mtx);
477
144k
    if (auto it = _submitted_cumu_compactions.find(tablet_id);
478
144k
        it != _submitted_cumu_compactions.end()) {
479
0
        res = it->second;
480
0
    }
481
144k
}
482
483
24.5k
Status CloudStorageEngine::_adjust_compaction_thread_num() {
484
24.5k
    int base_thread_num = get_base_thread_num();
485
486
24.5k
    if (!_base_compaction_thread_pool || !_cumu_compaction_thread_pool) {
487
0
        LOG(WARNING) << "base or cumu compaction thread pool is not created";
488
0
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>("");
489
0
    }
490
491
24.5k
    if (_base_compaction_thread_pool->max_threads() != base_thread_num) {
492
0
        int old_max_threads = _base_compaction_thread_pool->max_threads();
493
0
        Status status = _base_compaction_thread_pool->set_max_threads(base_thread_num);
494
0
        if (status.ok()) {
495
0
            VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads
496
0
                        << " to " << base_thread_num;
497
0
        }
498
0
    }
499
24.5k
    if (_base_compaction_thread_pool->min_threads() != base_thread_num) {
500
0
        int old_min_threads = _base_compaction_thread_pool->min_threads();
501
0
        Status status = _base_compaction_thread_pool->set_min_threads(base_thread_num);
502
0
        if (status.ok()) {
503
0
            VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads
504
0
                        << " to " << base_thread_num;
505
0
        }
506
0
    }
507
508
24.5k
    int cumu_thread_num = get_cumu_thread_num();
509
24.5k
    if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) {
510
0
        int old_max_threads = _cumu_compaction_thread_pool->max_threads();
511
0
        Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_thread_num);
512
0
        if (status.ok()) {
513
0
            VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads
514
0
                        << " to " << cumu_thread_num;
515
0
        }
516
0
    }
517
24.5k
    if (_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) {
518
0
        int old_min_threads = _cumu_compaction_thread_pool->min_threads();
519
0
        Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_thread_num);
520
0
        if (status.ok()) {
521
0
            VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads
522
0
                        << " to " << cumu_thread_num;
523
0
        }
524
0
    }
525
24.5k
    return Status::OK();
526
24.5k
}
527
528
1
void CloudStorageEngine::_compaction_tasks_producer_callback() {
529
1
    LOG(INFO) << "try to start compaction producer process!";
530
531
1
    int round = 0;
532
1
    CompactionType compaction_type;
533
534
    // Used to record the time when the score metric was last updated.
535
    // The update of the score metric is accompanied by the logic of selecting the tablet.
536
    // If there is no slot available, the logic of selecting the tablet will be terminated,
537
    // which causes the score metric update to be terminated.
538
    // In order to avoid this situation, we need to update the score regularly.
539
1
    int64_t last_cumulative_score_update_time = 0;
540
1
    int64_t last_base_score_update_time = 0;
541
1
    static const int64_t check_score_interval_ms = 5000; // 5 secs
542
543
1
    int64_t interval = config::generate_compaction_tasks_interval_ms;
544
24.5k
    do {
545
24.5k
        int64_t cur_time = UnixMillis();
546
24.5k
        if (!config::disable_auto_compaction) {
547
24.5k
            Status st = _adjust_compaction_thread_num();
548
24.5k
            if (!st.ok()) {
549
0
                break;
550
0
            }
551
552
24.5k
            bool check_score = false;
553
24.5k
            if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
554
22.1k
                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
555
22.1k
                round++;
556
22.1k
                if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) {
557
999
                    check_score = true;
558
999
                    last_cumulative_score_update_time = cur_time;
559
999
                }
560
22.1k
            } else {
561
2.45k
                compaction_type = CompactionType::BASE_COMPACTION;
562
2.45k
                round = 0;
563
2.45k
                if (cur_time - last_base_score_update_time >= check_score_interval_ms) {
564
845
                    check_score = true;
565
845
                    last_base_score_update_time = cur_time;
566
845
                }
567
2.45k
            }
568
24.5k
            std::unique_ptr<ThreadPool>& thread_pool =
569
24.5k
                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
570
24.5k
                            ? _cumu_compaction_thread_pool
571
24.5k
                            : _base_compaction_thread_pool;
572
24.5k
            VLOG_CRITICAL << "compaction thread pool. type: "
573
0
                          << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
574
0
                                                                                       : "BASE")
575
0
                          << ", num_threads: " << thread_pool->num_threads()
576
0
                          << ", num_threads_pending_start: "
577
0
                          << thread_pool->num_threads_pending_start()
578
0
                          << ", num_active_threads: " << thread_pool->num_active_threads()
579
0
                          << ", max_threads: " << thread_pool->max_threads()
580
0
                          << ", min_threads: " << thread_pool->min_threads()
581
0
                          << ", num_total_queued_tasks: " << thread_pool->get_queue_size();
582
24.5k
            std::vector<CloudTabletSPtr> tablets_compaction =
583
24.5k
                    _generate_cloud_compaction_tasks(compaction_type, check_score);
584
585
            /// Regardless of whether the tablet is submitted for compaction or not,
586
            /// we need to call 'reset_compaction' to clean up the base_compaction or cumulative_compaction objects
587
            /// in the tablet, because these two objects store the tablet's own shared_ptr.
588
            /// If it is not cleaned up, the reference count of the tablet will always be greater than 1,
589
            /// thus cannot be collected by the garbage collector. (TabletManager::start_trash_sweep)
590
148k
            for (const auto& tablet : tablets_compaction) {
591
148k
                Status status = submit_compaction_task(tablet, compaction_type);
592
148k
                if (status.ok()) continue;
593
140k
                if ((!status.is<ErrorCode::BE_NO_SUITABLE_VERSION>() &&
594
140k
                     !status.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) ||
595
140k
                    VLOG_DEBUG_IS_ON) {
596
50
                    LOG(WARNING) << "failed to submit compaction task for tablet: "
597
50
                                 << tablet->tablet_id() << ", err: " << status;
598
50
                }
599
140k
            }
600
24.5k
            interval = config::generate_compaction_tasks_interval_ms;
601
24.5k
        } else {
602
0
            interval = config::check_auto_compaction_interval_seconds * 1000;
603
0
        }
604
24.5k
        int64_t end_time = UnixMillis();
605
24.5k
        DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time -
606
24.5k
                                                                                       cur_time);
607
24.5k
    } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
608
1
}
609
610
void CloudStorageEngine::unregister_index_change_compaction(int64_t tablet_id,
611
848
                                                            bool is_base_compact) {
612
848
    std::lock_guard lock(_compaction_mtx);
613
848
    if (is_base_compact) {
614
0
        _submitted_index_change_base_compaction.erase(tablet_id);
615
848
    } else {
616
848
        _submitted_index_change_cumu_compaction.erase(tablet_id);
617
848
    }
618
848
}
619
620
bool CloudStorageEngine::register_index_change_compaction(
621
        std::shared_ptr<CloudIndexChangeCompaction> compact, int64_t tablet_id,
622
851
        bool is_base_compact, std::string& err_reason) {
623
851
    std::lock_guard lock(_compaction_mtx);
624
851
    if (is_base_compact) {
625
2
        if (_submitted_base_compactions.contains(tablet_id) ||
626
2
            _submitted_full_compactions.contains(tablet_id) ||
627
2
            _submitted_index_change_base_compaction.contains(tablet_id)) {
628
1
            std::stringstream ss;
629
1
            ss << "reason:" << ((int)_submitted_base_compactions.contains(tablet_id)) << ", "
630
1
               << ((int)_submitted_full_compactions.contains(tablet_id)) << ", "
631
1
               << ((int)_submitted_index_change_base_compaction.contains(tablet_id));
632
1
            err_reason = ss.str();
633
1
            return false;
634
1
        } else {
635
1
            _submitted_index_change_base_compaction[tablet_id] = compact;
636
1
            return true;
637
1
        }
638
849
    } else {
639
849
        if (_tablet_preparing_cumu_compaction.contains(tablet_id) ||
640
852
            _submitted_cumu_compactions.contains(tablet_id) ||
641
849
            _submitted_index_change_cumu_compaction.contains(tablet_id)) {
642
3
            std::stringstream ss;
643
3
            ss << "reason:" << ((int)_tablet_preparing_cumu_compaction.contains(tablet_id)) << ", "
644
3
               << ((int)_submitted_cumu_compactions.contains(tablet_id)) << ", "
645
3
               << ((int)_submitted_index_change_cumu_compaction.contains(tablet_id));
646
3
            err_reason = ss.str();
647
3
            return false;
648
846
        } else {
649
846
            _submitted_index_change_cumu_compaction[tablet_id] = compact;
650
846
        }
651
846
        return true;
652
849
    }
653
851
}
654
655
std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks(
656
24.5k
        CompactionType compaction_type, bool check_score) {
657
24.5k
    std::vector<std::shared_ptr<CloudTablet>> tablets_compaction;
658
659
24.5k
    int64_t max_compaction_score = 0;
660
24.5k
    std::unordered_set<int64_t> tablet_preparing_cumu_compaction;
661
24.5k
    std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
662
24.5k
            submitted_cumu_compactions;
663
24.5k
    std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> submitted_base_compactions;
664
24.5k
    std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> submitted_full_compactions;
665
24.5k
    std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>>
666
24.5k
            submitted_index_change_cumu_compactions;
667
24.5k
    std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>>
668
24.5k
            submitted_index_change_base_compactions;
669
24.5k
    {
670
24.5k
        std::lock_guard lock(_compaction_mtx);
671
24.5k
        tablet_preparing_cumu_compaction = _tablet_preparing_cumu_compaction;
672
24.5k
        submitted_cumu_compactions = _submitted_cumu_compactions;
673
24.5k
        submitted_base_compactions = _submitted_base_compactions;
674
24.5k
        submitted_full_compactions = _submitted_full_compactions;
675
24.5k
        submitted_index_change_cumu_compactions = _submitted_index_change_cumu_compaction;
676
24.5k
        submitted_index_change_base_compactions = _submitted_index_change_base_compaction;
677
24.5k
    }
678
679
24.5k
    bool need_pick_tablet = true;
680
24.5k
    int thread_per_disk =
681
24.5k
            config::compaction_task_num_per_fast_disk; // all disks are fast in cloud mode
682
24.5k
    int num_cumu =
683
24.5k
            std::accumulate(submitted_cumu_compactions.begin(), submitted_cumu_compactions.end(), 0,
684
24.5k
                            [](int a, auto& b) { return a + b.second.size(); });
685
24.5k
    int num_base =
686
24.5k
            cast_set<int>(submitted_base_compactions.size() + submitted_full_compactions.size());
687
24.5k
    int n = thread_per_disk - num_cumu - num_base;
688
24.5k
    if (compaction_type == CompactionType::BASE_COMPACTION) {
689
        // We need to reserve at least one thread for cumulative compaction,
690
        // because base compactions may take too long to complete, which may
691
        // leads to "too many rowsets" error.
692
2.45k
        int base_n = std::min(config::max_base_compaction_task_num_per_disk, thread_per_disk - 1) -
693
2.45k
                     num_base;
694
2.45k
        n = std::min(base_n, n);
695
2.45k
    }
696
24.5k
    if (n <= 0) { // No threads available
697
593
        if (!check_score) return tablets_compaction;
698
39
        need_pick_tablet = false;
699
39
        n = 0;
700
39
    }
701
702
    // Return true for skipping compaction
703
24.0k
    std::function<bool(CloudTablet*)> filter_out;
704
24.0k
    if (compaction_type == CompactionType::BASE_COMPACTION) {
705
2.40k
        filter_out = [&submitted_base_compactions, &submitted_full_compactions,
706
94.0M
                      &submitted_index_change_base_compactions](CloudTablet* t) {
707
94.0M
            return submitted_base_compactions.contains(t->tablet_id()) ||
708
94.0M
                   submitted_full_compactions.contains(t->tablet_id()) ||
709
94.0M
                   submitted_index_change_base_compactions.contains(t->tablet_id()) ||
710
94.0M
                   t->tablet_state() != TABLET_RUNNING;
711
94.0M
        };
712
21.6k
    } else if (config::enable_parallel_cumu_compaction) {
713
0
        filter_out = [&tablet_preparing_cumu_compaction,
714
0
                      &submitted_index_change_cumu_compactions](CloudTablet* t) {
715
0
            return tablet_preparing_cumu_compaction.contains(t->tablet_id()) ||
716
0
                   submitted_index_change_cumu_compactions.contains(t->tablet_id()) ||
717
0
                   (t->tablet_state() != TABLET_RUNNING &&
718
0
                    (!config::enable_new_tablet_do_compaction || t->alter_version() == -1));
719
0
        };
720
21.6k
    } else {
721
21.6k
        filter_out = [&tablet_preparing_cumu_compaction, &submitted_cumu_compactions,
722
408M
                      &submitted_index_change_cumu_compactions](CloudTablet* t) {
723
408M
            return tablet_preparing_cumu_compaction.contains(t->tablet_id()) ||
724
408M
                   submitted_index_change_cumu_compactions.contains(t->tablet_id()) ||
725
408M
                   submitted_cumu_compactions.contains(t->tablet_id()) ||
726
408M
                   (t->tablet_state() != TABLET_RUNNING &&
727
408M
                    (!config::enable_new_tablet_do_compaction || t->alter_version() == -1));
728
408M
        };
729
21.6k
    }
730
731
    // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(),
732
    // So that we can update the max_compaction_score metric.
733
24.0k
    do {
734
24.0k
        std::vector<CloudTabletSPtr> tablets;
735
24.0k
        auto st = tablet_mgr().get_topn_tablets_to_compact(n, compaction_type, filter_out, &tablets,
736
24.0k
                                                           &max_compaction_score);
737
24.0k
        if (!st.ok()) {
738
0
            LOG(WARNING) << "failed to get tablets to compact, err=" << st;
739
0
            break;
740
0
        }
741
24.0k
        if (!need_pick_tablet) break;
742
23.9k
        tablets_compaction = std::move(tablets);
743
23.9k
    } while (false);
744
745
24.0k
    if (max_compaction_score > 0) {
746
22.5k
        if (compaction_type == CompactionType::BASE_COMPACTION) {
747
2.35k
            DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(
748
2.35k
                    max_compaction_score);
749
20.1k
        } else {
750
20.1k
            DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(
751
20.1k
                    max_compaction_score);
752
20.1k
        }
753
22.5k
    }
754
755
24.0k
    return tablets_compaction;
756
24.5k
}
757
758
Status CloudStorageEngine::_request_tablet_global_compaction_lock(
759
        ReaderType compaction_type, const CloudTabletSPtr& tablet,
760
7.73k
        std::shared_ptr<CloudCompactionMixin> compaction) {
761
7.73k
    long now = duration_cast<std::chrono::milliseconds>(
762
7.73k
                       std::chrono::system_clock::now().time_since_epoch())
763
7.73k
                       .count();
764
7.73k
    if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) {
765
7.57k
        auto cumu_compaction = static_pointer_cast<CloudCumulativeCompaction>(compaction);
766
7.57k
        if (auto st = cumu_compaction->request_global_lock(); !st.ok()) {
767
535
            LOG_WARNING("failed to request cumu compactoin global lock")
768
535
                    .tag("tablet id", tablet->tablet_id())
769
535
                    .tag("msg", st.to_string());
770
535
            tablet->set_last_cumu_compaction_failure_time(now);
771
535
            return st;
772
535
        }
773
7.03k
        {
774
7.03k
            std::lock_guard lock(_compaction_mtx);
775
7.03k
            _executing_cumu_compactions[tablet->tablet_id()].push_back(cumu_compaction);
776
7.03k
        }
777
7.03k
        return Status::OK();
778
7.57k
    } else if (compaction_type == ReaderType::READER_BASE_COMPACTION) {
779
69
        auto base_compaction = static_pointer_cast<CloudBaseCompaction>(compaction);
780
69
        if (auto st = base_compaction->request_global_lock(); !st.ok()) {
781
8
            LOG_WARNING("failed to request base compactoin global lock")
782
8
                    .tag("tablet id", tablet->tablet_id())
783
8
                    .tag("msg", st.to_string());
784
8
            tablet->set_last_base_compaction_failure_time(now);
785
8
            return st;
786
8
        }
787
61
        {
788
61
            std::lock_guard lock(_compaction_mtx);
789
61
            _executing_base_compactions[tablet->tablet_id()] = base_compaction;
790
61
        }
791
61
        return Status::OK();
792
94
    } else if (compaction_type == ReaderType::READER_FULL_COMPACTION) {
793
94
        auto full_compaction = static_pointer_cast<CloudFullCompaction>(compaction);
794
94
        if (auto st = full_compaction->request_global_lock(); !st.ok()) {
795
0
            LOG_WARNING("failed to request full compactoin global lock")
796
0
                    .tag("tablet id", tablet->tablet_id())
797
0
                    .tag("msg", st.to_string());
798
0
            tablet->set_last_full_compaction_failure_time(now);
799
0
            return st;
800
0
        }
801
94
        {
802
94
            std::lock_guard lock(_compaction_mtx);
803
94
            _executing_full_compactions[tablet->tablet_id()] = full_compaction;
804
94
        }
805
94
        return Status::OK();
806
94
    } else {
807
0
        LOG(WARNING) << "unsupport compaction task for tablet: " << tablet->tablet_id()
808
0
                     << ", compaction name: " << compaction->compaction_name();
809
0
        return Status::NotFound("Unsupport compaction type {}", compaction->compaction_name());
810
0
    }
811
7.73k
}
812
813
Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet,
814
4.45k
                                                        int trigger_method) {
815
4.45k
    using namespace std::chrono;
816
4.45k
    {
817
4.45k
        std::lock_guard lock(_compaction_mtx);
818
        // Take a placeholder for base compaction
819
4.45k
        auto [_, success] = _submitted_base_compactions.emplace(tablet->tablet_id(), nullptr);
820
4.45k
        if (!success) {
821
0
            return Status::AlreadyExist(
822
0
                    "other base compaction or full compaction is submitted, tablet_id={}",
823
0
                    tablet->tablet_id());
824
0
        }
825
4.45k
    }
826
4.45k
    auto compaction = std::make_shared<CloudBaseCompaction>(*this, tablet);
827
4.45k
    auto st = compaction->prepare_compact();
828
4.45k
    if (!st.ok()) {
829
4.38k
        long now = duration_cast<std::chrono::milliseconds>(
830
4.38k
                           std::chrono::system_clock::now().time_since_epoch())
831
4.38k
                           .count();
832
4.38k
        tablet->set_last_base_compaction_failure_time(now);
833
4.38k
        std::lock_guard lock(_compaction_mtx);
834
4.38k
        _submitted_base_compactions.erase(tablet->tablet_id());
835
4.38k
        return st;
836
4.38k
    }
837
    // Register task with CompactionTaskTracker as PENDING
838
69
    auto* tracker = CompactionTaskTracker::instance();
839
69
    int64_t compaction_id = compaction->compaction_id();
840
69
    {
841
69
        CompactionTaskInfo info;
842
69
        info.compaction_id = compaction_id;
843
69
        info.tablet_id = tablet->tablet_id();
844
69
        info.table_id = tablet->table_id();
845
69
        info.partition_id = tablet->partition_id();
846
69
        info.compaction_type = CompactionProfileType::BASE;
847
69
        info.status = CompactionTaskStatus::PENDING;
848
69
        info.trigger_method = static_cast<TriggerMethod>(trigger_method);
849
69
        info.scheduled_time_ms =
850
69
                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
851
69
        info.backend_id = BackendOptions::get_backend_id();
852
69
        info.compaction_score = tablet->get_real_compaction_score();
853
69
        info.input_rowsets_count = compaction->input_rowsets_count();
854
69
        info.input_row_num = compaction->input_row_num_value();
855
69
        info.input_data_size = compaction->input_rowsets_data_size();
856
69
        info.input_index_size = compaction->input_rowsets_index_size();
857
69
        info.input_total_size = compaction->input_rowsets_total_size();
858
69
        info.input_segments_num = compaction->input_segments_num_value();
859
69
        info.input_version_range = compaction->input_version_range_str();
860
69
        info.is_vertical = compaction->is_vertical();
861
69
        tracker->register_task(std::move(info));
862
69
    }
863
69
    {
864
69
        std::lock_guard lock(_compaction_mtx);
865
69
        _submitted_base_compactions[tablet->tablet_id()] = compaction;
866
69
    }
867
69
    st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() {
868
69
        DorisMetrics::instance()->base_compaction_task_running_total->increment(1);
869
69
        DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
870
69
                _base_compaction_thread_pool->get_queue_size());
871
69
        g_base_compaction_running_task_count << 1;
872
69
        signal::tablet_id = tablet->tablet_id();
873
69
        Defer defer {[&]() {
874
            // Idempotent cleanup: remove task from tracker
875
69
            CompactionTaskTracker::instance()->remove_task(compaction_id);
876
69
            g_base_compaction_running_task_count << -1;
877
69
            std::lock_guard lock(_compaction_mtx);
878
69
            _submitted_base_compactions.erase(tablet->tablet_id());
879
69
            DorisMetrics::instance()->base_compaction_task_running_total->increment(-1);
880
69
            DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
881
69
                    _base_compaction_thread_pool->get_queue_size());
882
69
        }};
883
69
        auto st = _request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION, tablet,
884
69
                                                         compaction);
885
69
        if (!st.ok()) return;
886
        // Update tracker to RUNNING after acquiring global lock
887
61
        {
888
61
            RunningStats rs;
889
61
            rs.start_time_ms =
890
61
                    duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
891
61
            CompactionTaskTracker::instance()->update_to_running(compaction_id, rs);
892
61
        }
893
61
        st = compaction->execute_compact();
894
61
        if (!st.ok()) {
895
            // Error log has been output in `execute_compact`
896
0
            long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
897
0
            tablet->set_last_base_compaction_failure_time(now);
898
0
        }
899
61
        std::lock_guard lock(_compaction_mtx);
900
61
        _executing_base_compactions.erase(tablet->tablet_id());
901
61
    });
902
69
    DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
903
69
            _base_compaction_thread_pool->get_queue_size());
904
69
    if (!st.ok()) {
905
0
        tracker->remove_task(compaction_id);
906
0
        std::lock_guard lock(_compaction_mtx);
907
0
        _submitted_base_compactions.erase(tablet->tablet_id());
908
0
        return Status::InternalError("failed to submit base compaction, tablet_id={}",
909
0
                                     tablet->tablet_id());
910
0
    }
911
69
    return st;
912
69
}
913
914
Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet,
915
144k
                                                              int trigger_method) {
916
144k
    using namespace std::chrono;
917
144k
    {
918
144k
        std::lock_guard lock(_compaction_mtx);
919
144k
        if (!config::enable_parallel_cumu_compaction &&
920
144k
            _submitted_cumu_compactions.count(tablet->tablet_id())) {
921
2
            return Status::AlreadyExist("other cumu compaction is submitted, tablet_id={}",
922
2
                                        tablet->tablet_id());
923
2
        }
924
144k
        auto [_, success] = _tablet_preparing_cumu_compaction.insert(tablet->tablet_id());
925
144k
        if (!success) {
926
0
            return Status::AlreadyExist("other cumu compaction is preparing, tablet_id={}",
927
0
                                        tablet->tablet_id());
928
0
        }
929
144k
    }
930
144k
    auto compaction = std::make_shared<CloudCumulativeCompaction>(*this, tablet);
931
144k
    auto st = compaction->prepare_compact();
932
144k
    if (!st.ok()) {
933
136k
        long now = duration_cast<std::chrono::milliseconds>(
934
136k
                           std::chrono::system_clock::now().time_since_epoch())
935
136k
                           .count();
936
136k
        if (!st.is<ErrorCode::CUMULATIVE_MEET_DELETE_VERSION>()) {
937
136k
            if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) {
938
                // Backoff strategy if no suitable version
939
136k
                tablet->last_cumu_no_suitable_version_ms = now;
940
136k
            } else {
941
0
                tablet->set_last_cumu_compaction_failure_time(now);
942
0
            }
943
136k
        }
944
136k
        std::lock_guard lock(_compaction_mtx);
945
136k
        _tablet_preparing_cumu_compaction.erase(tablet->tablet_id());
946
136k
        return st;
947
136k
    }
948
    // Register task with CompactionTaskTracker as PENDING
949
    // IMPORTANT: use compaction->compaction_id(), NOT tracker->next_compaction_id(),
950
    // because the Compaction constructor already allocated an ID via the tracker.
951
7.57k
    auto* tracker = CompactionTaskTracker::instance();
952
7.57k
    int64_t compaction_id = compaction->compaction_id();
953
7.57k
    {
954
7.57k
        CompactionTaskInfo info;
955
7.57k
        info.compaction_id = compaction_id;
956
7.57k
        info.tablet_id = tablet->tablet_id();
957
7.57k
        info.table_id = tablet->table_id();
958
7.57k
        info.partition_id = tablet->partition_id();
959
7.57k
        info.compaction_type = CompactionProfileType::CUMULATIVE;
960
7.57k
        info.status = CompactionTaskStatus::PENDING;
961
7.57k
        info.trigger_method = static_cast<TriggerMethod>(trigger_method);
962
7.57k
        info.scheduled_time_ms =
963
7.57k
                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
964
7.57k
        info.backend_id = BackendOptions::get_backend_id();
965
7.57k
        info.compaction_score = tablet->get_real_compaction_score();
966
7.57k
        info.input_rowsets_count = compaction->input_rowsets_count();
967
7.57k
        info.input_row_num = compaction->input_row_num_value();
968
7.57k
        info.input_data_size = compaction->input_rowsets_data_size();
969
7.57k
        info.input_index_size = compaction->input_rowsets_index_size();
970
7.57k
        info.input_total_size = compaction->input_rowsets_total_size();
971
7.57k
        info.input_segments_num = compaction->input_segments_num_value();
972
7.57k
        info.input_version_range = compaction->input_version_range_str();
973
7.57k
        info.is_vertical = compaction->is_vertical();
974
7.57k
        tracker->register_task(std::move(info));
975
7.57k
    }
976
7.57k
    {
977
7.57k
        std::lock_guard lock(_compaction_mtx);
978
7.57k
        _tablet_preparing_cumu_compaction.erase(tablet->tablet_id());
979
7.57k
        _submitted_cumu_compactions[tablet->tablet_id()].push_back(compaction);
980
7.57k
    }
981
7.57k
    auto erase_submitted_cumu_compaction = [=, this]() {
982
7.57k
        std::lock_guard lock(_compaction_mtx);
983
7.57k
        auto it = _submitted_cumu_compactions.find(tablet->tablet_id());
984
7.57k
        DCHECK(it != _submitted_cumu_compactions.end());
985
7.57k
        auto& compactions = it->second;
986
7.57k
        auto it1 = std::find(compactions.begin(), compactions.end(), compaction);
987
7.57k
        DCHECK(it1 != compactions.end());
988
7.57k
        compactions.erase(it1);
989
7.57k
        if (compactions.empty()) { // No compactions on this tablet, erase key
990
7.57k
            _submitted_cumu_compactions.erase(it);
991
            // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to
992
            // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform
993
            // cumu compaction on tablet which has suitable versions for cumu compaction.
994
7.57k
            tablet->last_cumu_no_suitable_version_ms = 0;
995
7.57k
        }
996
7.57k
    };
997
7.57k
    auto erase_executing_cumu_compaction = [=, this]() {
998
7.00k
        std::lock_guard lock(_compaction_mtx);
999
7.00k
        auto it = _executing_cumu_compactions.find(tablet->tablet_id());
1000
7.00k
        DCHECK(it != _executing_cumu_compactions.end());
1001
7.00k
        auto& compactions = it->second;
1002
7.00k
        auto it1 = std::find(compactions.begin(), compactions.end(), compaction);
1003
7.00k
        DCHECK(it1 != compactions.end());
1004
7.00k
        compactions.erase(it1);
1005
7.03k
        if (compactions.empty()) { // No compactions on this tablet, erase key
1006
7.03k
            _executing_cumu_compactions.erase(it);
1007
            // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to
1008
            // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform
1009
            // cumu compaction on tablet which has suitable versions for cumu compaction.
1010
7.03k
            tablet->last_cumu_no_suitable_version_ms = 0;
1011
7.03k
        }
1012
7.00k
    };
1013
7.57k
    st = _cumu_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() {
1014
7.57k
        DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1);
1015
7.57k
        DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
1016
7.57k
                _cumu_compaction_thread_pool->get_queue_size());
1017
7.57k
        DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.wait_in_line",
1018
7.57k
                        { sleep(5); })
1019
7.57k
        signal::tablet_id = tablet->tablet_id();
1020
7.57k
        g_cumu_compaction_running_task_count << 1;
1021
7.57k
        bool is_large_task = true;
1022
7.57k
        Defer defer {[&]() {
1023
7.57k
            DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.sleep",
1024
7.57k
                            { sleep(5); })
1025
            // Idempotent cleanup: remove task from tracker
1026
7.57k
            CompactionTaskTracker::instance()->remove_task(compaction_id);
1027
7.57k
            std::lock_guard lock(_cumu_compaction_delay_mtx);
1028
7.57k
            _cumu_compaction_thread_pool_used_threads--;
1029
7.57k
            if (!is_large_task) {
1030
7.03k
                _cumu_compaction_thread_pool_small_tasks_running--;
1031
7.03k
            }
1032
7.57k
            g_cumu_compaction_running_task_count << -1;
1033
7.57k
            erase_submitted_cumu_compaction();
1034
7.57k
            DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(-1);
1035
7.57k
            DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
1036
7.57k
                    _cumu_compaction_thread_pool->get_queue_size());
1037
7.57k
        }};
1038
7.57k
        auto st = _request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION,
1039
7.57k
                                                         tablet, compaction);
1040
7.57k
        if (!st.ok()) return;
1041
        // Update tracker to RUNNING after acquiring global lock
1042
7.03k
        {
1043
7.03k
            RunningStats rs;
1044
7.03k
            rs.start_time_ms =
1045
7.03k
                    duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
1046
7.03k
            CompactionTaskTracker::instance()->update_to_running(compaction_id, rs);
1047
7.03k
        }
1048
7.03k
        do {
1049
7.03k
            std::lock_guard lock(_cumu_compaction_delay_mtx);
1050
7.03k
            _cumu_compaction_thread_pool_used_threads++;
1051
7.03k
            if (config::large_cumu_compaction_task_min_thread_num > 1 &&
1052
7.03k
                _cumu_compaction_thread_pool->max_threads() >=
1053
7.03k
                        config::large_cumu_compaction_task_min_thread_num) {
1054
                // Determine if this is a small task based on configured thresholds
1055
7.03k
                is_large_task = (compaction->get_input_rowsets_bytes() >
1056
7.03k
                                         config::large_cumu_compaction_task_bytes_threshold ||
1057
7.03k
                                 compaction->get_input_num_rows() >
1058
7.03k
                                         config::large_cumu_compaction_task_row_num_threshold);
1059
                // Small task. No delay needed
1060
7.03k
                if (!is_large_task) {
1061
7.03k
                    _cumu_compaction_thread_pool_small_tasks_running++;
1062
7.03k
                    break;
1063
7.03k
                }
1064
                // Deal with large task
1065
0
                if (_should_delay_large_task()) {
1066
0
                    long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch())
1067
0
                                       .count();
1068
                    // sleep 5s for this tablet
1069
0
                    tablet->set_last_cumu_compaction_failure_time(now);
1070
0
                    erase_executing_cumu_compaction();
1071
0
                    LOG_WARNING(
1072
0
                            "failed to do CloudCumulativeCompaction, cumu thread pool is "
1073
0
                            "intensive, delay large task.")
1074
0
                            .tag("tablet_id", tablet->tablet_id())
1075
0
                            .tag("input_rows", compaction->get_input_num_rows())
1076
0
                            .tag("input_rowsets_total_size", compaction->get_input_rowsets_bytes())
1077
0
                            .tag("config::large_cumu_compaction_task_bytes_threshold",
1078
0
                                 config::large_cumu_compaction_task_bytes_threshold)
1079
0
                            .tag("config::large_cumu_compaction_task_row_num_threshold",
1080
0
                                 config::large_cumu_compaction_task_row_num_threshold)
1081
0
                            .tag("remaining threads", _cumu_compaction_thread_pool_used_threads)
1082
0
                            .tag("small_tasks_running",
1083
0
                                 _cumu_compaction_thread_pool_small_tasks_running);
1084
0
                    return;
1085
0
                }
1086
0
            }
1087
7.03k
        } while (false);
1088
7.03k
        st = compaction->execute_compact();
1089
7.03k
        if (!st.ok()) {
1090
            // Error log has been output in `execute_compact`
1091
33
            long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
1092
33
            tablet->set_last_cumu_compaction_failure_time(now);
1093
33
        }
1094
7.03k
        erase_executing_cumu_compaction();
1095
7.03k
    });
1096
7.57k
    DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
1097
7.57k
            _cumu_compaction_thread_pool->get_queue_size());
1098
7.57k
    if (!st.ok()) {
1099
0
        tracker->remove_task(compaction_id);
1100
0
        erase_submitted_cumu_compaction();
1101
0
        return Status::InternalError("failed to submit cumu compaction, tablet_id={}",
1102
0
                                     tablet->tablet_id());
1103
0
    }
1104
7.57k
    return st;
1105
7.57k
}
1106
1107
Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet,
1108
95
                                                        int trigger_method) {
1109
95
    using namespace std::chrono;
1110
95
    {
1111
95
        std::lock_guard lock(_compaction_mtx);
1112
        // Take a placeholder for full compaction
1113
95
        auto [_, success] = _submitted_full_compactions.emplace(tablet->tablet_id(), nullptr);
1114
95
        if (!success) {
1115
0
            return Status::AlreadyExist(
1116
0
                    "other full compaction or base compaction is submitted, tablet_id={}",
1117
0
                    tablet->tablet_id());
1118
0
        }
1119
95
    }
1120
    //auto compaction = std::make_shared<CloudFullCompaction>(tablet);
1121
95
    auto compaction = std::make_shared<CloudFullCompaction>(*this, tablet);
1122
95
    auto st = compaction->prepare_compact();
1123
95
    if (!st.ok()) {
1124
1
        long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
1125
1
        tablet->set_last_full_compaction_failure_time(now);
1126
1
        std::lock_guard lock(_compaction_mtx);
1127
1
        _submitted_full_compactions.erase(tablet->tablet_id());
1128
1
        return st;
1129
1
    }
1130
    // Register task with CompactionTaskTracker as PENDING
1131
94
    auto* tracker = CompactionTaskTracker::instance();
1132
94
    int64_t compaction_id = compaction->compaction_id();
1133
94
    {
1134
94
        CompactionTaskInfo info;
1135
94
        info.compaction_id = compaction_id;
1136
94
        info.tablet_id = tablet->tablet_id();
1137
94
        info.table_id = tablet->table_id();
1138
94
        info.partition_id = tablet->partition_id();
1139
94
        info.compaction_type = CompactionProfileType::FULL;
1140
94
        info.status = CompactionTaskStatus::PENDING;
1141
94
        info.trigger_method = static_cast<TriggerMethod>(trigger_method);
1142
94
        info.scheduled_time_ms =
1143
94
                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
1144
94
        info.backend_id = BackendOptions::get_backend_id();
1145
94
        info.compaction_score = tablet->get_real_compaction_score();
1146
94
        info.input_rowsets_count = compaction->input_rowsets_count();
1147
94
        info.input_row_num = compaction->input_row_num_value();
1148
94
        info.input_data_size = compaction->input_rowsets_data_size();
1149
94
        info.input_index_size = compaction->input_rowsets_index_size();
1150
94
        info.input_total_size = compaction->input_rowsets_total_size();
1151
94
        info.input_segments_num = compaction->input_segments_num_value();
1152
94
        info.input_version_range = compaction->input_version_range_str();
1153
94
        info.is_vertical = compaction->is_vertical();
1154
94
        tracker->register_task(std::move(info));
1155
94
    }
1156
94
    {
1157
94
        std::lock_guard lock(_compaction_mtx);
1158
94
        _submitted_full_compactions[tablet->tablet_id()] = compaction;
1159
94
    }
1160
94
    st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() {
1161
94
        g_full_compaction_running_task_count << 1;
1162
94
        signal::tablet_id = tablet->tablet_id();
1163
94
        Defer defer {[&]() {
1164
            // Idempotent cleanup: remove task from tracker
1165
94
            CompactionTaskTracker::instance()->remove_task(compaction_id);
1166
94
            g_full_compaction_running_task_count << -1;
1167
94
            std::lock_guard lock(_compaction_mtx);
1168
94
            _submitted_full_compactions.erase(tablet->tablet_id());
1169
94
        }};
1170
94
        auto st = _request_tablet_global_compaction_lock(ReaderType::READER_FULL_COMPACTION, tablet,
1171
94
                                                         compaction);
1172
94
        if (!st.ok()) return;
1173
        // Update tracker to RUNNING after acquiring global lock
1174
94
        {
1175
94
            RunningStats rs;
1176
94
            rs.start_time_ms =
1177
94
                    duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
1178
94
            CompactionTaskTracker::instance()->update_to_running(compaction_id, rs);
1179
94
        }
1180
94
        st = compaction->execute_compact();
1181
94
        if (!st.ok()) {
1182
            // Error log has been output in `execute_compact`
1183
0
            long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
1184
0
            tablet->set_last_full_compaction_failure_time(now);
1185
0
        }
1186
94
        std::lock_guard lock(_compaction_mtx);
1187
94
        _executing_full_compactions.erase(tablet->tablet_id());
1188
94
    });
1189
94
    if (!st.ok()) {
1190
0
        tracker->remove_task(compaction_id);
1191
0
        std::lock_guard lock(_compaction_mtx);
1192
0
        _submitted_full_compactions.erase(tablet->tablet_id());
1193
0
        return Status::InternalError("failed to submit full compaction, tablet_id={}",
1194
0
                                     tablet->tablet_id());
1195
0
    }
1196
94
    return st;
1197
94
}
1198
1199
Status CloudStorageEngine::submit_compaction_task(const CloudTabletSPtr& tablet,
1200
                                                  CompactionType compaction_type,
1201
148k
                                                  int trigger_method) {
1202
148k
    DCHECK(compaction_type == CompactionType::CUMULATIVE_COMPACTION ||
1203
148k
           compaction_type == CompactionType::BASE_COMPACTION ||
1204
148k
           compaction_type == CompactionType::FULL_COMPACTION);
1205
148k
    switch (compaction_type) {
1206
4.45k
    case CompactionType::BASE_COMPACTION:
1207
4.45k
        RETURN_IF_ERROR(_submit_base_compaction_task(tablet, trigger_method));
1208
69
        return Status::OK();
1209
144k
    case CompactionType::CUMULATIVE_COMPACTION:
1210
144k
        RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet, trigger_method));
1211
7.57k
        return Status::OK();
1212
95
    case CompactionType::FULL_COMPACTION:
1213
95
        RETURN_IF_ERROR(_submit_full_compaction_task(tablet, trigger_method));
1214
94
        return Status::OK();
1215
0
    default:
1216
0
        return Status::InternalError("unknown compaction type!");
1217
148k
    }
1218
148k
}
1219
1220
1
void CloudStorageEngine::_lease_compaction_thread_callback() {
1221
260
    while (!_stop_background_threads_latch.wait_for(
1222
260
            std::chrono::seconds(config::lease_compaction_interval_seconds))) {
1223
259
        std::vector<std::shared_ptr<CloudFullCompaction>> full_compactions;
1224
259
        std::vector<std::shared_ptr<CloudBaseCompaction>> base_compactions;
1225
259
        std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions;
1226
259
        std::vector<std::shared_ptr<CloudCompactionStopToken>> compation_stop_tokens;
1227
259
        std::vector<std::shared_ptr<CloudIndexChangeCompaction>> index_change_compations;
1228
259
        {
1229
259
            std::lock_guard lock(_compaction_mtx);
1230
259
            for (auto& [_, base] : _executing_base_compactions) {
1231
1
                if (base) { // `base` might be a nullptr placeholder
1232
1
                    base_compactions.push_back(base);
1233
1
                }
1234
1
            }
1235
259
            for (auto& [_, cumus] : _executing_cumu_compactions) {
1236
208
                for (auto& cumu : cumus) {
1237
208
                    cumu_compactions.push_back(cumu);
1238
208
                }
1239
208
            }
1240
259
            for (auto& [_, full] : _executing_full_compactions) {
1241
6
                if (full) {
1242
6
                    full_compactions.push_back(full);
1243
6
                }
1244
6
            }
1245
259
            for (auto& [_, stop_token] : _active_compaction_stop_tokens) {
1246
7
                if (stop_token) {
1247
7
                    compation_stop_tokens.push_back(stop_token);
1248
7
                }
1249
7
            }
1250
259
            for (auto& [_, index_change] : _submitted_index_change_cumu_compaction) {
1251
3
                if (index_change) {
1252
3
                    index_change_compations.push_back(index_change);
1253
3
                }
1254
3
            }
1255
259
            for (auto& [_, index_change] : _submitted_index_change_base_compaction) {
1256
0
                if (index_change) {
1257
0
                    index_change_compations.push_back(index_change);
1258
0
                }
1259
0
            }
1260
259
        }
1261
        // TODO(plat1ko): Support batch lease rpc
1262
259
        for (auto& stop_token : compation_stop_tokens) {
1263
7
            stop_token->do_lease();
1264
7
        }
1265
259
        for (auto& comp : full_compactions) {
1266
6
            comp->do_lease();
1267
6
        }
1268
259
        for (auto& comp : cumu_compactions) {
1269
208
            comp->do_lease();
1270
208
        }
1271
259
        for (auto& comp : base_compactions) {
1272
1
            comp->do_lease();
1273
1
        }
1274
259
        for (auto& comp : index_change_compations) {
1275
3
            comp->do_lease();
1276
3
        }
1277
259
    }
1278
1
}
1279
1280
1
void CloudStorageEngine::_check_tablet_delete_bitmap_score_callback() {
1281
1
    LOG(INFO) << "try to start check tablet delete bitmap score!";
1282
18
    while (!_stop_background_threads_latch.wait_for(
1283
18
            std::chrono::seconds(config::check_tablet_delete_bitmap_interval_seconds))) {
1284
17
        if (!config::enable_check_tablet_delete_bitmap_score) {
1285
0
            return;
1286
0
        }
1287
17
        uint64_t max_delete_bitmap_score = 0;
1288
17
        uint64_t max_base_rowset_delete_bitmap_score = 0;
1289
17
        tablet_mgr().get_topn_tablet_delete_bitmap_score(&max_delete_bitmap_score,
1290
17
                                                         &max_base_rowset_delete_bitmap_score);
1291
17
        if (max_delete_bitmap_score > 0) {
1292
17
            _tablet_max_delete_bitmap_score_metrics->set_value(max_delete_bitmap_score);
1293
17
        }
1294
17
        if (max_base_rowset_delete_bitmap_score > 0) {
1295
17
            _tablet_max_base_rowset_delete_bitmap_score_metrics->set_value(
1296
17
                    max_base_rowset_delete_bitmap_score);
1297
17
        }
1298
17
    }
1299
1
}
1300
1301
0
Status CloudStorageEngine::get_compaction_status_json(std::string* result) {
1302
0
    rapidjson::Document root;
1303
0
    root.SetObject();
1304
1305
0
    std::lock_guard lock(_compaction_mtx);
1306
    // cumu
1307
0
    std::string_view cumu = "CumulativeCompaction";
1308
0
    rapidjson::Value cumu_key;
1309
0
    cumu_key.SetString(cumu.data(), cast_set<uint32_t>(cumu.length()), root.GetAllocator());
1310
0
    rapidjson::Document cumu_arr;
1311
0
    cumu_arr.SetArray();
1312
0
    for (auto& [tablet_id, v] : _submitted_cumu_compactions) {
1313
0
        for (int i = 0; i < v.size(); ++i) {
1314
0
            cumu_arr.PushBack(tablet_id, root.GetAllocator());
1315
0
        }
1316
0
    }
1317
0
    root.AddMember(cumu_key, cumu_arr, root.GetAllocator());
1318
    // base
1319
0
    std::string_view base = "BaseCompaction";
1320
0
    rapidjson::Value base_key;
1321
0
    base_key.SetString(base.data(), cast_set<uint32_t>(base.length()), root.GetAllocator());
1322
0
    rapidjson::Document base_arr;
1323
0
    base_arr.SetArray();
1324
0
    for (auto& [tablet_id, _] : _submitted_base_compactions) {
1325
0
        base_arr.PushBack(tablet_id, root.GetAllocator());
1326
0
    }
1327
0
    root.AddMember(base_key, base_arr, root.GetAllocator());
1328
1329
0
    rapidjson::StringBuffer strbuf;
1330
0
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
1331
0
    root.Accept(writer);
1332
0
    *result = std::string(strbuf.GetString());
1333
0
    return Status::OK();
1334
0
}
1335
1336
std::shared_ptr<CloudCumulativeCompactionPolicy> CloudStorageEngine::cumu_compaction_policy(
1337
158k
        std::string_view compaction_policy) {
1338
158k
    if (!_cumulative_compaction_policies.contains(compaction_policy)) {
1339
0
        return _cumulative_compaction_policies.at(CUMULATIVE_SIZE_BASED_POLICY);
1340
0
    }
1341
158k
    return _cumulative_compaction_policies.at(compaction_policy);
1342
158k
}
1343
1344
Status CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet,
1345
1.15k
                                                          int64_t initiator) {
1346
1.15k
    {
1347
1.15k
        std::lock_guard lock(_compaction_mtx);
1348
1.15k
        auto [_, success] = _active_compaction_stop_tokens.emplace(tablet->tablet_id(), nullptr);
1349
1.15k
        if (!success) {
1350
0
            return Status::AlreadyExist("stop token already exists for tablet_id={}",
1351
0
                                        tablet->tablet_id());
1352
0
        }
1353
1.15k
    }
1354
1355
1.15k
    auto stop_token = std::make_shared<CloudCompactionStopToken>(*this, tablet, initiator);
1356
1.15k
    auto st = stop_token->do_register();
1357
1358
1.15k
    if (!st.ok()) {
1359
0
        std::lock_guard lock(_compaction_mtx);
1360
0
        _active_compaction_stop_tokens.erase(tablet->tablet_id());
1361
0
        return st;
1362
0
    }
1363
1364
1.15k
    {
1365
1.15k
        std::lock_guard lock(_compaction_mtx);
1366
1.15k
        _active_compaction_stop_tokens[tablet->tablet_id()] = stop_token;
1367
1.15k
    }
1368
1.15k
    LOG_INFO(
1369
1.15k
            "successfully register compaction stop token for tablet_id={}, "
1370
1.15k
            "delete_bitmap_lock_initiator={}",
1371
1.15k
            tablet->tablet_id(), initiator);
1372
1.15k
    return st;
1373
1.15k
}
1374
1375
1.15k
Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms) {
1376
1.15k
    std::shared_ptr<CloudCompactionStopToken> stop_token;
1377
1.15k
    {
1378
1.15k
        std::lock_guard lock(_compaction_mtx);
1379
1.15k
        if (auto it = _active_compaction_stop_tokens.find(tablet->tablet_id());
1380
1.15k
            it != _active_compaction_stop_tokens.end()) {
1381
1.15k
            stop_token = it->second;
1382
1.15k
        } else {
1383
0
            return Status::NotFound("stop token not found for tablet_id={}", tablet->tablet_id());
1384
0
        }
1385
1.15k
        _active_compaction_stop_tokens.erase(tablet->tablet_id());
1386
1.15k
    }
1387
1.15k
    LOG_INFO("successfully unregister compaction stop token for tablet_id={}", tablet->tablet_id());
1388
1.15k
    if (stop_token && clear_ms) {
1389
0
        RETURN_IF_ERROR(stop_token->do_unregister());
1390
0
        LOG_INFO(
1391
0
                "successfully remove compaction stop token from MS for tablet_id={}, "
1392
0
                "delete_bitmap_lock_initiator={}",
1393
0
                tablet->tablet_id(), stop_token->initiator());
1394
0
    }
1395
1.15k
    return Status::OK();
1396
1.15k
}
1397
1398
1
Status CloudStorageEngine::_check_all_root_path_cluster_id() {
1399
    // Check if all root paths have the same cluster id
1400
1
    std::set<int32_t> cluster_ids;
1401
1
    for (const auto& path : _options.store_paths) {
1402
1
        auto cluster_id_path = fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX);
1403
1
        bool exists = false;
1404
1
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists));
1405
1
        if (exists) {
1406
0
            io::FileReaderSPtr reader;
1407
0
            RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cluster_id_path, &reader));
1408
0
            size_t fsize = reader->size();
1409
0
            if (fsize > 0) {
1410
0
                std::string content;
1411
0
                content.resize(fsize, '\0');
1412
0
                size_t bytes_read = 0;
1413
0
                RETURN_IF_ERROR(reader->read_at(0, {content.data(), fsize}, &bytes_read));
1414
0
                DCHECK_EQ(fsize, bytes_read);
1415
0
                int32_t tmp_cluster_id = std::stoi(content);
1416
0
                cluster_ids.insert(tmp_cluster_id);
1417
0
            }
1418
0
        }
1419
1
    }
1420
1
    _effective_cluster_id = config::cluster_id;
1421
    // first init
1422
1
    if (cluster_ids.empty()) {
1423
        // not set configured cluster id
1424
1
        if (_effective_cluster_id == -1) {
1425
1
            return Status::OK();
1426
1
        } else {
1427
            // If no cluster id file exists, use the configured cluster id
1428
0
            return set_cluster_id(_effective_cluster_id);
1429
0
        }
1430
1
    }
1431
0
    if (cluster_ids.size() > 1) {
1432
0
        return Status::InternalError(
1433
0
                "All root paths must have the same cluster id, but you have "
1434
0
                "different cluster ids: {}",
1435
0
                fmt::join(cluster_ids, ", "));
1436
0
    }
1437
0
    if (_effective_cluster_id != -1 && !cluster_ids.empty() &&
1438
0
        *cluster_ids.begin() != _effective_cluster_id) {
1439
0
        return Status::Corruption(
1440
0
                "multiple cluster ids is not equal. config::cluster_id={}, "
1441
0
                "storage path cluster_id={}",
1442
0
                _effective_cluster_id, *cluster_ids.begin());
1443
0
    }
1444
0
    return Status::OK();
1445
0
}
1446
1447
1
Status CloudStorageEngine::set_cluster_id(int32_t cluster_id) {
1448
1
    std::lock_guard<std::mutex> l(_store_lock);
1449
1
    for (auto& path : _options.store_paths) {
1450
1
        auto cluster_id_path = fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX);
1451
1
        bool exists = false;
1452
1
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists));
1453
1
        if (!exists) {
1454
1
            io::FileWriterPtr file_writer;
1455
1
            RETURN_IF_ERROR(
1456
1
                    io::global_local_filesystem()->create_file(cluster_id_path, &file_writer));
1457
1
            RETURN_IF_ERROR(file_writer->append(std::to_string(cluster_id)));
1458
1
            RETURN_IF_ERROR(file_writer->close());
1459
1
        }
1460
1
    }
1461
1
    _effective_cluster_id = cluster_id;
1462
1
    return Status::OK();
1463
1
}
1464
1465
} // namespace doris