Coverage Report

Created: 2026-03-19 16:23

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_storage_engine.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_storage_engine.h"
19
20
#include <bvar/reducer.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <gen_cpp/cloud.pb.h>
23
#include <gen_cpp/olap_file.pb.h>
24
#include <rapidjson/document.h>
25
#include <rapidjson/encodings.h>
26
#include <rapidjson/prettywriter.h>
27
#include <rapidjson/stringbuffer.h>
28
29
#include <algorithm>
30
#include <memory>
31
#include <variant>
32
33
#include "cloud/cloud_base_compaction.h"
34
#include "cloud/cloud_compaction_stop_token.h"
35
#include "cloud/cloud_cumulative_compaction.h"
36
#include "cloud/cloud_cumulative_compaction_policy.h"
37
#include "cloud/cloud_full_compaction.h"
38
#include "cloud/cloud_index_change_compaction.h"
39
#include "cloud/cloud_meta_mgr.h"
40
#include "cloud/cloud_snapshot_mgr.h"
41
#include "cloud/cloud_tablet_hotspot.h"
42
#include "cloud/cloud_tablet_mgr.h"
43
#include "cloud/cloud_txn_delete_bitmap_cache.h"
44
#include "cloud/cloud_warm_up_manager.h"
45
#include "cloud/config.h"
46
#include "common/config.h"
47
#include "common/signal_handler.h"
48
#include "common/status.h"
49
#include "core/assert_cast.h"
50
#include "io/cache/block_file_cache_downloader.h"
51
#include "io/cache/block_file_cache_factory.h"
52
#include "io/cache/file_cache_common.h"
53
#include "io/fs/file_system.h"
54
#include "io/fs/hdfs_file_system.h"
55
#include "io/fs/s3_file_system.h"
56
#include "io/hdfs_util.h"
57
#include "io/io_common.h"
58
#include "load/memtable/memtable_flush_executor.h"
59
#include "runtime/memory/cache_manager.h"
60
#include "storage/compaction/cumulative_compaction_policy.h"
61
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
62
#include "storage/storage_policy.h"
63
#include "util/parse_util.h"
64
#include "util/time.h"
65
66
namespace doris {
67
#include "common/compile_check_begin.h"
68
69
using namespace std::literals;
70
71
// Process-wide bvar adders exported under the names below, one per compaction
// flavor (base / full / cumulative). NOTE(review): presumably incremented and
// decremented around running compaction tasks elsewhere — not visible here.
bvar::Adder<uint64_t> g_base_compaction_running_task_count {"base_compaction_running_task_count"};
bvar::Adder<uint64_t> g_full_compaction_running_task_count {"full_compaction_running_task_count"};
bvar::Adder<uint64_t> g_cumu_compaction_running_task_count {
        "cumulative_compaction_running_task_count"};
75
76
0
int get_cumu_thread_num() {
77
0
    if (config::max_cumu_compaction_threads > 0) {
78
0
        return config::max_cumu_compaction_threads;
79
0
    }
80
81
0
    int num_cores = doris::CpuInfo::num_cores();
82
0
    return std::min(std::max(int(num_cores * config::cumu_compaction_thread_num_factor), 2), 20);
83
0
}
84
85
0
int get_base_thread_num() {
86
0
    if (config::max_base_compaction_threads > 0) {
87
0
        return config::max_base_compaction_threads;
88
0
    }
89
90
0
    int num_cores = doris::CpuInfo::num_cores();
91
0
    return std::min(std::max(int(num_cores * config::base_compaction_thread_num_factor), 1), 10);
92
0
}
93
94
// Constructs the cloud flavor of the storage engine. It owns a CloudMetaMgr
// (used in this file for storage-vault and tablet-meta lookups), a tablet
// manager bound back to this engine, and a copy of the startup options.
// Both cumulative compaction policies are registered up front, keyed by policy
// name, and the construction time is recorded in _startup_timepoint.
CloudStorageEngine::CloudStorageEngine(const EngineOptions& options)
        : BaseStorageEngine(Type::CLOUD, options.backend_uid),
          _meta_mgr(std::make_unique<cloud::CloudMetaMgr>()),
          _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)),
          _options(options) {
    auto& policies = _cumulative_compaction_policies;
    policies[CUMULATIVE_SIZE_BASED_POLICY] =
            std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>();
    policies[CUMULATIVE_TIME_SERIES_POLICY] =
            std::make_shared<CloudTimeSeriesCumulativeCompactionPolicy>();
    _startup_timepoint = std::chrono::system_clock::now();
}
105
106
153
// Destruction always goes through stop(); stop() returns immediately if the
// engine has already been stopped, so an explicit earlier stop() is harmless.
CloudStorageEngine::~CloudStorageEngine() {
    this->stop();
}
109
110
// Wraps `err` into an IOError for vault `id` and appends a dump of the vault
// configuration: S3Conf::to_string() for S3 vaults, or the text-format protobuf
// of HdfsVaultInfo for HDFS vaults.
// The vault is only read, so it is taken by const reference. The HDFS dump used
// SerializeToOstream before, which emits binary wire-format bytes into what is a
// human-readable error/log message; ShortDebugString() keeps it readable.
static Status vault_process_error(std::string_view id,
                                  const std::variant<S3Conf, cloud::HdfsVaultInfo>& vault,
                                  Status err) {
    std::stringstream ss;
    std::visit(
            [&ss]<typename T>(const T& conf) {
                if constexpr (std::is_same_v<T, S3Conf>) {
                    ss << conf.to_string();
                } else if constexpr (std::is_same_v<T, cloud::HdfsVaultInfo>) {
                    ss << conf.ShortDebugString();
                }
            },
            vault);
    return Status::IOError("Invalid vault, id {}, err {}, detail conf {}", id, err, ss.str());
}
124
125
struct VaultCreateFSVisitor {
126
    VaultCreateFSVisitor(const std::string& id, const cloud::StorageVaultPB_PathFormat& path_format,
127
                         bool check_fs)
128
0
            : id(id), path_format(path_format), check_fs(check_fs) {}
129
0
    Status operator()(const S3Conf& s3_conf) const {
130
0
        LOG(INFO) << "get new s3 info: " << s3_conf.to_string() << " resource_id=" << id
131
0
                  << " check_fs: " << check_fs;
132
133
0
        auto fs = DORIS_TRY(io::S3FileSystem::create(s3_conf, id));
134
0
        if (check_fs && !s3_conf.client_conf.role_arn.empty()) {
135
0
            bool res = false;
136
            // just check connectivity, not care object if exist
137
0
            auto st = fs->exists("not_exist_object", &res);
138
0
            if (!st.ok()) {
139
0
                LOG(FATAL) << "failed to check s3 fs, resource_id: " << id << " st: " << st
140
0
                           << "s3_conf: " << s3_conf.to_string()
141
0
                           << "add enable_check_storage_vault=false to be.conf to skip the check";
142
0
            }
143
0
        }
144
145
0
        put_storage_resource(id, {std::move(fs), path_format}, 0);
146
0
        LOG_INFO("successfully create s3 vault, vault id {}", id);
147
0
        return Status::OK();
148
0
    }
149
150
    // TODO(ByteYue): Make sure enable_java_support is on
151
0
    Status operator()(const cloud::HdfsVaultInfo& vault) const {
152
0
        auto hdfs_params = io::to_hdfs_params(vault);
153
0
        auto fs = DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id,
154
0
                                                       nullptr, vault.prefix()));
155
0
        put_storage_resource(id, {std::move(fs), path_format}, 0);
156
0
        LOG_INFO("successfully create hdfs vault, vault id {}", id);
157
0
        return Status::OK();
158
0
    }
159
160
    const std::string& id;
161
    const cloud::StorageVaultPB_PathFormat& path_format;
162
    bool check_fs;
163
};
164
165
struct RefreshFSVaultVisitor {
166
    RefreshFSVaultVisitor(const std::string& id, io::FileSystemSPtr fs,
167
                          const cloud::StorageVaultPB_PathFormat& path_format)
168
0
            : id(id), fs(std::move(fs)), path_format(path_format) {}
169
170
0
    Status operator()(const S3Conf& s3_conf) const {
171
0
        DCHECK_EQ(fs->type(), io::FileSystemType::S3) << id;
172
0
        auto s3_fs = std::static_pointer_cast<io::S3FileSystem>(fs);
173
0
        auto client_holder = s3_fs->client_holder();
174
0
        auto st = client_holder->reset(s3_conf.client_conf);
175
0
        if (!st.ok()) {
176
0
            LOG(WARNING) << "failed to update s3 fs, resource_id=" << id << ": " << st;
177
0
        }
178
0
        return st;
179
0
    }
180
181
0
    Status operator()(const cloud::HdfsVaultInfo& vault) const {
182
0
        auto hdfs_params = io::to_hdfs_params(vault);
183
0
        auto hdfs_fs =
184
0
                DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr,
185
0
                                                     vault.has_prefix() ? vault.prefix() : ""));
186
0
        auto hdfs = std::static_pointer_cast<io::HdfsFileSystem>(hdfs_fs);
187
0
        put_storage_resource(id, {std::move(hdfs), path_format}, 0);
188
0
        return Status::OK();
189
0
    }
190
191
    const std::string& id;
192
    io::FileSystemSPtr fs;
193
    const cloud::StorageVaultPB_PathFormat& path_format;
194
};
195
196
0
// Brings the engine up, in this order:
//   - sync storage vaults;
//   - create the memtable-flush and delete-bitmap executors, the txn
//     delete-bitmap cache and the committed-rowset manager;
//   - create the file-cache downloader, warm-up, hotspot and snapshot helpers;
//   - initialize the stream load recorder and check the cluster id;
//   - build the SyncLoadForTablets and WarmupCacheAsync thread pools.
// Returns the status of the first failing step.
// sync_storage_vault() returns nothing, so a vault sync failure is only logged there.
Status CloudStorageEngine::open() {
    sync_storage_vault();

    // TODO(plat1ko): DeleteBitmapTxnManager

    _memtable_flush_executor = std::make_unique<MemTableFlushExecutor>();
    // Use file cache disks number
    _memtable_flush_executor->init(
            cast_set<int32_t>(io::FileCacheFactory::instance()->get_cache_instance_size()));

    _calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>();
    _calc_delete_bitmap_executor->init(config::calc_delete_bitmap_max_thread);

    _calc_delete_bitmap_executor_for_load = std::make_unique<CalcDeleteBitmapExecutor>();
    _calc_delete_bitmap_executor_for_load->init(
            config::calc_delete_bitmap_for_load_max_thread > 0
                    ? config::calc_delete_bitmap_for_load_max_thread
                    : std::max(1, CpuInfo::num_cores() / 2));

    // Cache capacity is the larger of config::delete_bitmap_agg_cache_capacity
    // (noted as 100MB by default) and config::delete_bitmap_dynamic_agg_cache_limit,
    // which is resolved against the process memory limit.
    bool is_percent = false;
    int64_t delete_bitmap_agg_cache_cache_limit =
            ParseUtil::parse_mem_spec(config::delete_bitmap_dynamic_agg_cache_limit,
                                      MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent);
    _txn_delete_bitmap_cache = std::make_unique<CloudTxnDeleteBitmapCache>(
            delete_bitmap_agg_cache_cache_limit > config::delete_bitmap_agg_cache_capacity
                    ? delete_bitmap_agg_cache_cache_limit
                    : config::delete_bitmap_agg_cache_capacity);
    RETURN_IF_ERROR(_txn_delete_bitmap_cache->init());

    _committed_rs_mgr = std::make_unique<CloudCommittedRSMgr>();
    RETURN_IF_ERROR(_committed_rs_mgr->init());

    _file_cache_block_downloader = std::make_unique<io::FileCacheBlockDownloader>(*this);

    _cloud_warm_up_manager = std::make_unique<CloudWarmUpManager>(*this);

    _tablet_hotspot = std::make_unique<TabletHotspot>();

    _cloud_snapshot_mgr = std::make_unique<CloudSnapshotMgr>(*this);

    // The stream load recorder is placed under the first store path. Indexing an
    // empty list would be undefined behavior, so fail cleanly on that misconfiguration.
    const auto& store_paths = ExecEnv::GetInstance()->store_paths();
    if (store_paths.empty()) {
        return Status::InternalError("no store path is configured, cannot init StreamLoadRecorder");
    }
    RETURN_NOT_OK_STATUS_WITH_WARN(init_stream_load_recorder(store_paths[0].path),
                                   "init StreamLoadRecorder failed");

    // check cluster id
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id");

    RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
                                           .set_max_threads(config::sync_load_for_tablets_thread)
                                           .set_min_threads(config::sync_load_for_tablets_thread)
                                           .build(&_sync_load_for_tablets_thread_pool),
                                   "fail to build SyncLoadForTabletsThreadPool");

    RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("WarmupCacheAsyncThreadPool")
                                           .set_max_threads(config::warmup_cache_async_thread)
                                           .set_min_threads(config::warmup_cache_async_thread)
                                           .build(&_warmup_cache_async_thread_pool),
                                   "fail to build WarmupCacheAsyncThreadPool");

    return Status::OK();
}
258
259
153
void CloudStorageEngine::stop() {
260
153
    if (_stopped) {
261
0
        return;
262
0
    }
263
264
153
    _stopped = true;
265
153
    _stop_background_threads_latch.count_down();
266
267
153
    for (auto&& t : _bg_threads) {
268
0
        if (t) {
269
0
            t->join();
270
0
        }
271
0
    }
272
273
153
    if (_base_compaction_thread_pool) {
274
0
        _base_compaction_thread_pool->shutdown();
275
0
    }
276
153
    if (_cumu_compaction_thread_pool) {
277
0
        _cumu_compaction_thread_pool->shutdown();
278
0
    }
279
153
    LOG(INFO) << "Cloud storage engine is stopped.";
280
281
153
    if (_calc_tablet_delete_bitmap_task_thread_pool) {
282
0
        _calc_tablet_delete_bitmap_task_thread_pool->shutdown();
283
0
    }
284
153
    if (_sync_delete_bitmap_thread_pool) {
285
0
        _sync_delete_bitmap_thread_pool->shutdown();
286
0
    }
287
153
}
288
289
0
bool CloudStorageEngine::stopped() {
290
0
    return _stopped;
291
0
}
292
293
// Looks up `tablet_id` through the cloud tablet manager and, on success, upcasts
// the returned tablet pointer to BaseTablet. The fixed `false, true` arguments are
// forwarded unchanged. NOTE(review): their meaning is defined by
// CloudTabletMgr::get_tablet, which is not visible here.
Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id,
                                                      SyncRowsetStats* sync_stats,
                                                      bool force_use_only_cached,
                                                      bool cache_on_miss) {
    auto lookup = _tablet_mgr->get_tablet(tablet_id, false, true, sync_stats,
                                          force_use_only_cached, cache_on_miss);
    return std::move(lookup).transform(
            [](auto&& t) { return static_pointer_cast<BaseTablet>(std::move(t)); });
}
301
302
// Fetches the tablet meta of `tablet_id` into `*tablet_meta` from the cloud meta
// manager. A local-cache lookup honouring `force_use_only_cached` is sketched in
// the `#if 0` section but compiled out, so that parameter is currently unused.
Status CloudStorageEngine::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
                                           bool force_use_only_cached) {
    if (!tablet_meta) {
        return Status::InvalidArgument("tablet_meta output is null");
    }

#if 0
    if (_tablet_mgr && _tablet_mgr->peek_tablet_meta(tablet_id, tablet_meta)) {
        return Status::OK();
    }

    if (force_use_only_cached) {
        return Status::NotFound("tablet meta {} not found in cache", tablet_id);
    }
#endif

    if (!_meta_mgr) {
        return Status::InternalError("cloud meta manager is not initialized");
    }
    return _meta_mgr->get_tablet_meta(tablet_id, tablet_meta);
}
324
325
0
// Launches the cloud engine's background activity and returns the first failure.
// - Periodic workers have their handles appended to _bg_threads (joined by stop()).
// - The querying-rowset eviction thread is held in _evict_quering_rowset_thread.
// - Fixed-size pools (min == max threads) are built for tablet delete-bitmap
//   calculation, delete-bitmap sync, and base/cumulative compaction.
// `wg_sptr` is not used by this implementation.
Status CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
    // Starts a thread and records its handle in _bg_threads.
    auto spawn_bg_thread = [this](const std::string& category, const std::string& name,
                                  const auto& body) {
        return Thread::create(category, name, body, &_bg_threads.emplace_back());
    };
    // Builds a pool whose min and max thread counts are both `num_threads`.
    auto build_fixed_pool = [](const std::string& pool_name, auto num_threads, auto* pool) {
        return ThreadPoolBuilder(pool_name)
                .set_min_threads(num_threads)
                .set_max_threads(num_threads)
                .build(pool);
    };

    RETURN_IF_ERROR(spawn_bg_thread("CloudStorageEngine", "refresh_s3_info_thread", [this]() {
        this->_refresh_storage_vault_info_thread_callback();
    }));
    LOG(INFO) << "refresh s3 info thread started";

    RETURN_IF_ERROR(spawn_bg_thread("CloudStorageEngine", "vacuum_stale_rowsets_thread", [this]() {
        this->_vacuum_stale_rowsets_thread_callback();
    }));
    LOG(INFO) << "vacuum stale rowsets thread started";

    RETURN_IF_ERROR(spawn_bg_thread("CloudStorageEngine", "sync_tablets_thread",
                                    [this]() { this->_sync_tablets_thread_callback(); }));
    LOG(INFO) << "sync tablets thread started";

    // Not part of _bg_threads: this handle lives in its own member.
    RETURN_IF_ERROR(Thread::create(
            "CloudStorageEngine", "evict_querying_rowset_thread",
            [this]() { this->_evict_quring_rowset_thread_callback(); },
            &_evict_quering_rowset_thread));
    LOG(INFO) << "evict quering thread started";

    // add calculate tablet delete bitmap task thread pool
    RETURN_IF_ERROR(build_fixed_pool("TabletCalDeleteBitmapThreadPool",
                                     config::calc_tablet_delete_bitmap_task_max_thread,
                                     &_calc_tablet_delete_bitmap_task_thread_pool));
    RETURN_IF_ERROR(build_fixed_pool("SyncDeleteBitmapThreadPool",
                                     config::sync_delete_bitmap_task_max_thread,
                                     &_sync_delete_bitmap_thread_pool));

    // TODO(plat1ko): check_bucket_enable_versioning_thread

    // compaction tasks producer thread
    int base_thread_num = get_base_thread_num();
    int cumu_thread_num = get_cumu_thread_num();

    RETURN_IF_ERROR(build_fixed_pool("BaseCompactionTaskThreadPool", base_thread_num,
                                     &_base_compaction_thread_pool));
    RETURN_IF_ERROR(build_fixed_pool("CumuCompactionTaskThreadPool", cumu_thread_num,
                                     &_cumu_compaction_thread_pool));
    RETURN_IF_ERROR(spawn_bg_thread("StorageEngine", "compaction_tasks_producer_thread",
                                    [this]() { this->_compaction_tasks_producer_callback(); }));
    LOG(INFO) << "compaction tasks producer thread started,"
              << " base thread num " << base_thread_num << " cumu thread num " << cumu_thread_num;

    RETURN_IF_ERROR(spawn_bg_thread("StorageEngine", "lease_compaction_thread",
                                    [this]() { this->_lease_compaction_thread_callback(); }));

    LOG(INFO) << "lease compaction thread started";

    RETURN_IF_ERROR(spawn_bg_thread("StorageEngine", "check_tablet_delete_bitmap_score_thread",
                                    [this]() {
                                        this->_check_tablet_delete_bitmap_score_callback();
                                    }));
    LOG(INFO) << "check tablet delete bitmap score thread started";

    return Status::OK();
}
394
395
0
void CloudStorageEngine::sync_storage_vault() {
396
0
    cloud::StorageVaultInfos vault_infos;
397
0
    bool enable_storage_vault = false;
398
399
0
    auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault);
400
0
    if (!st.ok()) {
401
0
        LOG(WARNING) << "failed to get storage vault info. err=" << st;
402
0
        return;
403
0
    }
404
405
0
    if (vault_infos.empty()) {
406
0
        LOG(WARNING) << "empty storage vault info";
407
0
        return;
408
0
    }
409
410
0
    bool check_storage_vault = false;
411
0
    bool expected = false;
412
0
    if (first_sync_storage_vault.compare_exchange_strong(expected, true)) {
413
0
        check_storage_vault = config::enable_check_storage_vault;
414
0
        LOG(INFO) << "first sync storage vault info, BE try to check iam role connectivity, "
415
0
                     "check_storage_vault="
416
0
                  << check_storage_vault;
417
0
    }
418
419
0
    for (auto& [id, vault_info, path_format] : vault_infos) {
420
0
        auto fs = get_filesystem(id);
421
0
        auto status =
422
0
                (fs == nullptr)
423
0
                        ? std::visit(VaultCreateFSVisitor {id, path_format, check_storage_vault},
424
0
                                     vault_info)
425
0
                        : std::visit(RefreshFSVaultVisitor {id, std::move(fs), path_format},
426
0
                                     vault_info);
427
0
        if (!status.ok()) [[unlikely]] {
428
0
            LOG(WARNING) << vault_process_error(id, vault_info, std::move(st));
429
0
        }
430
0
    }
431
432
0
    if (auto& id = std::get<0>(vault_infos.back());
433
0
        (latest_fs() == nullptr || latest_fs()->id() != id) && !enable_storage_vault) {
434
0
        set_latest_fs(get_filesystem(id));
435
0
    }
436
0
}
437
438
// We should enable_java_support if we want to use hdfs vault
439
0
void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() {
440
0
    while (!_stop_background_threads_latch.wait_for(
441
0
            std::chrono::seconds(config::refresh_s3_info_interval_s))) {
442
0
        sync_storage_vault();
443
0
    }
444
0
}
445
446
0
void CloudStorageEngine::_vacuum_stale_rowsets_thread_callback() {
447
0
    while (!_stop_background_threads_latch.wait_for(
448
0
            std::chrono::seconds(config::vacuum_stale_rowsets_interval_s))) {
449
0
        _tablet_mgr->vacuum_stale_rowsets(_stop_background_threads_latch);
450
0
    }
451
0
}
452
453
0
void CloudStorageEngine::_sync_tablets_thread_callback() {
454
0
    while (!_stop_background_threads_latch.wait_for(
455
0
            std::chrono::seconds(config::schedule_sync_tablets_interval_s))) {
456
0
        _tablet_mgr->sync_tablets(_stop_background_threads_latch);
457
0
    }
458
0
}
459
460
void CloudStorageEngine::get_cumu_compaction(
461
0
        int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) {
462
0
    std::lock_guard lock(_compaction_mtx);
463
0
    if (auto it = _submitted_cumu_compactions.find(tablet_id);
464
0
        it != _submitted_cumu_compactions.end()) {
465
0
        res = it->second;
466
0
    }
467
0
}
468
469
0
Status CloudStorageEngine::_adjust_compaction_thread_num() {
470
0
    int base_thread_num = get_base_thread_num();
471
472
0
    if (!_base_compaction_thread_pool || !_cumu_compaction_thread_pool) {
473
0
        LOG(WARNING) << "base or cumu compaction thread pool is not created";
474
0
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>("");
475
0
    }
476
477
0
    if (_base_compaction_thread_pool->max_threads() != base_thread_num) {
478
0
        int old_max_threads = _base_compaction_thread_pool->max_threads();
479
0
        Status status = _base_compaction_thread_pool->set_max_threads(base_thread_num);
480
0
        if (status.ok()) {
481
0
            VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads
482
0
                        << " to " << base_thread_num;
483
0
        }
484
0
    }
485
0
    if (_base_compaction_thread_pool->min_threads() != base_thread_num) {
486
0
        int old_min_threads = _base_compaction_thread_pool->min_threads();
487
0
        Status status = _base_compaction_thread_pool->set_min_threads(base_thread_num);
488
0
        if (status.ok()) {
489
0
            VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads
490
0
                        << " to " << base_thread_num;
491
0
        }
492
0
    }
493
494
0
    int cumu_thread_num = get_cumu_thread_num();
495
0
    if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) {
496
0
        int old_max_threads = _cumu_compaction_thread_pool->max_threads();
497
0
        Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_thread_num);
498
0
        if (status.ok()) {
499
0
            VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads
500
0
                        << " to " << cumu_thread_num;
501
0
        }
502
0
    }
503
0
    if (_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) {
504
0
        int old_min_threads = _cumu_compaction_thread_pool->min_threads();
505
0
        Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_thread_num);
506
0
        if (status.ok()) {
507
0
            VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads
508
0
                        << " to " << cumu_thread_num;
509
0
        }
510
0
    }
511
0
    return Status::OK();
512
0
}
513
514
0
void CloudStorageEngine::_compaction_tasks_producer_callback() {
515
0
    LOG(INFO) << "try to start compaction producer process!";
516
517
0
    int round = 0;
518
0
    CompactionType compaction_type;
519
520
    // Used to record the time when the score metric was last updated.
521
    // The update of the score metric is accompanied by the logic of selecting the tablet.
522
    // If there is no slot available, the logic of selecting the tablet will be terminated,
523
    // which causes the score metric update to be terminated.
524
    // In order to avoid this situation, we need to update the score regularly.
525
0
    int64_t last_cumulative_score_update_time = 0;
526
0
    int64_t last_base_score_update_time = 0;
527
0
    static const int64_t check_score_interval_ms = 5000; // 5 secs
528
529
0
    int64_t interval = config::generate_compaction_tasks_interval_ms;
530
0
    do {
531
0
        int64_t cur_time = UnixMillis();
532
0
        if (!config::disable_auto_compaction) {
533
0
            Status st = _adjust_compaction_thread_num();
534
0
            if (!st.ok()) {
535
0
                break;
536
0
            }
537
538
0
            bool check_score = false;
539
0
            if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
540
0
                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
541
0
                round++;
542
0
                if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) {
543
0
                    check_score = true;
544
0
                    last_cumulative_score_update_time = cur_time;
545
0
                }
546
0
            } else {
547
0
                compaction_type = CompactionType::BASE_COMPACTION;
548
0
                round = 0;
549
0
                if (cur_time - last_base_score_update_time >= check_score_interval_ms) {
550
0
                    check_score = true;
551
0
                    last_base_score_update_time = cur_time;
552
0
                }
553
0
            }
554
0
            std::unique_ptr<ThreadPool>& thread_pool =
555
0
                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
556
0
                            ? _cumu_compaction_thread_pool
557
0
                            : _base_compaction_thread_pool;
558
0
            VLOG_CRITICAL << "compaction thread pool. type: "
559
0
                          << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
560
0
                                                                                       : "BASE")
561
0
                          << ", num_threads: " << thread_pool->num_threads()
562
0
                          << ", num_threads_pending_start: "
563
0
                          << thread_pool->num_threads_pending_start()
564
0
                          << ", num_active_threads: " << thread_pool->num_active_threads()
565
0
                          << ", max_threads: " << thread_pool->max_threads()
566
0
                          << ", min_threads: " << thread_pool->min_threads()
567
0
                          << ", num_total_queued_tasks: " << thread_pool->get_queue_size();
568
0
            std::vector<CloudTabletSPtr> tablets_compaction =
569
0
                    _generate_cloud_compaction_tasks(compaction_type, check_score);
570
571
            /// Regardless of whether the tablet is submitted for compaction or not,
572
            /// we need to call 'reset_compaction' to clean up the base_compaction or cumulative_compaction objects
573
            /// in the tablet, because these two objects store the tablet's own shared_ptr.
574
            /// If it is not cleaned up, the reference count of the tablet will always be greater than 1,
575
            /// thus cannot be collected by the garbage collector. (TabletManager::start_trash_sweep)
576
0
            for (const auto& tablet : tablets_compaction) {
577
0
                Status status = submit_compaction_task(tablet, compaction_type);
578
0
                if (status.ok()) continue;
579
0
                if ((!status.is<ErrorCode::BE_NO_SUITABLE_VERSION>() &&
580
0
                     !status.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) ||
581
0
                    VLOG_DEBUG_IS_ON) {
582
0
                    LOG(WARNING) << "failed to submit compaction task for tablet: "
583
0
                                 << tablet->tablet_id() << ", err: " << status;
584
0
                }
585
0
            }
586
0
            interval = config::generate_compaction_tasks_interval_ms;
587
0
        } else {
588
0
            interval = config::check_auto_compaction_interval_seconds * 1000;
589
0
        }
590
0
        int64_t end_time = UnixMillis();
591
0
        DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time -
592
0
                                                                                       cur_time);
593
0
    } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
594
0
}
595
596
void CloudStorageEngine::unregister_index_change_compaction(int64_t tablet_id,
597
4
                                                            bool is_base_compact) {
598
4
    std::lock_guard lock(_compaction_mtx);
599
4
    if (is_base_compact) {
600
0
        _submitted_index_change_base_compaction.erase(tablet_id);
601
4
    } else {
602
4
        _submitted_index_change_cumu_compaction.erase(tablet_id);
603
4
    }
604
4
}
605
606
bool CloudStorageEngine::register_index_change_compaction(
607
        std::shared_ptr<CloudIndexChangeCompaction> compact, int64_t tablet_id,
608
8
        bool is_base_compact, std::string& err_reason) {
609
8
    std::lock_guard lock(_compaction_mtx);
610
8
    if (is_base_compact) {
611
2
        if (_submitted_base_compactions.contains(tablet_id) ||
612
2
            _submitted_full_compactions.contains(tablet_id) ||
613
2
            _submitted_index_change_base_compaction.contains(tablet_id)) {
614
1
            std::stringstream ss;
615
1
            ss << "reason:" << ((int)_submitted_base_compactions.contains(tablet_id)) << ", "
616
1
               << ((int)_submitted_full_compactions.contains(tablet_id)) << ", "
617
1
               << ((int)_submitted_index_change_base_compaction.contains(tablet_id));
618
1
            err_reason = ss.str();
619
1
            return false;
620
1
        } else {
621
1
            _submitted_index_change_base_compaction[tablet_id] = compact;
622
1
            return true;
623
1
        }
624
6
    } else {
625
6
        if (_tablet_preparing_cumu_compaction.contains(tablet_id) ||
626
6
            _submitted_cumu_compactions.contains(tablet_id) ||
627
6
            _submitted_index_change_cumu_compaction.contains(tablet_id)) {
628
1
            std::stringstream ss;
629
1
            ss << "reason:" << ((int)_tablet_preparing_cumu_compaction.contains(tablet_id)) << ", "
630
1
               << ((int)_submitted_cumu_compactions.contains(tablet_id)) << ", "
631
1
               << ((int)_submitted_index_change_cumu_compaction.contains(tablet_id));
632
1
            err_reason = ss.str();
633
1
            return false;
634
5
        } else {
635
5
            _submitted_index_change_cumu_compaction[tablet_id] = compact;
636
5
        }
637
5
        return true;
638
6
    }
639
8
}
640
641
std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks(
642
0
        CompactionType compaction_type, bool check_score) {
643
0
    std::vector<std::shared_ptr<CloudTablet>> tablets_compaction;
644
645
0
    int64_t max_compaction_score = 0;
646
0
    std::unordered_set<int64_t> tablet_preparing_cumu_compaction;
647
0
    std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
648
0
            submitted_cumu_compactions;
649
0
    std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> submitted_base_compactions;
650
0
    std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> submitted_full_compactions;
651
0
    std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>>
652
0
            submitted_index_change_cumu_compactions;
653
0
    std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>>
654
0
            submitted_index_change_base_compactions;
655
0
    {
656
0
        std::lock_guard lock(_compaction_mtx);
657
0
        tablet_preparing_cumu_compaction = _tablet_preparing_cumu_compaction;
658
0
        submitted_cumu_compactions = _submitted_cumu_compactions;
659
0
        submitted_base_compactions = _submitted_base_compactions;
660
0
        submitted_full_compactions = _submitted_full_compactions;
661
0
        submitted_index_change_cumu_compactions = _submitted_index_change_cumu_compaction;
662
0
        submitted_index_change_base_compactions = _submitted_index_change_base_compaction;
663
0
    }
664
665
0
    bool need_pick_tablet = true;
666
0
    int thread_per_disk =
667
0
            config::compaction_task_num_per_fast_disk; // all disks are fast in cloud mode
668
0
    int num_cumu =
669
0
            std::accumulate(submitted_cumu_compactions.begin(), submitted_cumu_compactions.end(), 0,
670
0
                            [](int a, auto& b) { return a + b.second.size(); });
671
0
    int num_base =
672
0
            cast_set<int>(submitted_base_compactions.size() + submitted_full_compactions.size());
673
0
    int n = thread_per_disk - num_cumu - num_base;
674
0
    if (compaction_type == CompactionType::BASE_COMPACTION) {
675
        // We need to reserve at least one thread for cumulative compaction,
676
        // because base compactions may take too long to complete, which may
677
        // leads to "too many rowsets" error.
678
0
        int base_n = std::min(config::max_base_compaction_task_num_per_disk, thread_per_disk - 1) -
679
0
                     num_base;
680
0
        n = std::min(base_n, n);
681
0
    }
682
0
    if (n <= 0) { // No threads available
683
0
        if (!check_score) return tablets_compaction;
684
0
        need_pick_tablet = false;
685
0
        n = 0;
686
0
    }
687
688
    // Return true for skipping compaction
689
0
    std::function<bool(CloudTablet*)> filter_out;
690
0
    if (compaction_type == CompactionType::BASE_COMPACTION) {
691
0
        filter_out = [&submitted_base_compactions, &submitted_full_compactions,
692
0
                      &submitted_index_change_base_compactions](CloudTablet* t) {
693
0
            return submitted_base_compactions.contains(t->tablet_id()) ||
694
0
                   submitted_full_compactions.contains(t->tablet_id()) ||
695
0
                   submitted_index_change_base_compactions.contains(t->tablet_id()) ||
696
0
                   t->tablet_state() != TABLET_RUNNING;
697
0
        };
698
0
    } else if (config::enable_parallel_cumu_compaction) {
699
0
        filter_out = [&tablet_preparing_cumu_compaction,
700
0
                      &submitted_index_change_cumu_compactions](CloudTablet* t) {
701
0
            return tablet_preparing_cumu_compaction.contains(t->tablet_id()) ||
702
0
                   submitted_index_change_cumu_compactions.contains(t->tablet_id()) ||
703
0
                   (t->tablet_state() != TABLET_RUNNING &&
704
0
                    (!config::enable_new_tablet_do_compaction || t->alter_version() == -1));
705
0
        };
706
0
    } else {
707
0
        filter_out = [&tablet_preparing_cumu_compaction, &submitted_cumu_compactions,
708
0
                      &submitted_index_change_cumu_compactions](CloudTablet* t) {
709
0
            return tablet_preparing_cumu_compaction.contains(t->tablet_id()) ||
710
0
                   submitted_index_change_cumu_compactions.contains(t->tablet_id()) ||
711
0
                   submitted_cumu_compactions.contains(t->tablet_id()) ||
712
0
                   (t->tablet_state() != TABLET_RUNNING &&
713
0
                    (!config::enable_new_tablet_do_compaction || t->alter_version() == -1));
714
0
        };
715
0
    }
716
717
    // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(),
718
    // So that we can update the max_compaction_score metric.
719
0
    do {
720
0
        std::vector<CloudTabletSPtr> tablets;
721
0
        auto st = tablet_mgr().get_topn_tablets_to_compact(n, compaction_type, filter_out, &tablets,
722
0
                                                           &max_compaction_score);
723
0
        if (!st.ok()) {
724
0
            LOG(WARNING) << "failed to get tablets to compact, err=" << st;
725
0
            break;
726
0
        }
727
0
        if (!need_pick_tablet) break;
728
0
        tablets_compaction = std::move(tablets);
729
0
    } while (false);
730
731
0
    if (max_compaction_score > 0) {
732
0
        if (compaction_type == CompactionType::BASE_COMPACTION) {
733
0
            DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(
734
0
                    max_compaction_score);
735
0
        } else {
736
0
            DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(
737
0
                    max_compaction_score);
738
0
        }
739
0
    }
740
741
0
    return tablets_compaction;
742
0
}
743
744
/// Acquires the cluster-wide compaction lock for `compaction` and, on success, records it in
/// the matching `_executing_*_compactions` map of this engine.
///
/// @param compaction_type  cumulative, base or full compaction reader type.
/// @param tablet           tablet being compacted; its per-type failure time is set to "now"
///                         when the lock request fails.
/// @param compaction       compaction object; statically cast to the concrete type.
/// @return the lock-request error, NotFound for an unsupported type, or OK.
Status CloudStorageEngine::_request_tablet_global_compaction_lock(
        ReaderType compaction_type, const CloudTabletSPtr& tablet,
        std::shared_ptr<CloudCompactionMixin> compaction) {
    // Milliseconds since the epoch, taken before the lock request.
    const long now = duration_cast<std::chrono::milliseconds>(
                             std::chrono::system_clock::now().time_since_epoch())
                             .count();
    switch (compaction_type) {
    case ReaderType::READER_CUMULATIVE_COMPACTION: {
        auto cumu = static_pointer_cast<CloudCumulativeCompaction>(compaction);
        auto lock_st = cumu->request_global_lock();
        if (!lock_st.ok()) {
            LOG_WARNING("failed to request cumu compactoin global lock")
                    .tag("tablet id", tablet->tablet_id())
                    .tag("msg", lock_st.to_string());
            tablet->set_last_cumu_compaction_failure_time(now);
            return lock_st;
        }
        std::lock_guard guard(_compaction_mtx);
        // Several cumulative compactions may run on one tablet, hence a list per tablet.
        _executing_cumu_compactions[tablet->tablet_id()].push_back(cumu);
        return Status::OK();
    }
    case ReaderType::READER_BASE_COMPACTION: {
        auto base = static_pointer_cast<CloudBaseCompaction>(compaction);
        auto lock_st = base->request_global_lock();
        if (!lock_st.ok()) {
            LOG_WARNING("failed to request base compactoin global lock")
                    .tag("tablet id", tablet->tablet_id())
                    .tag("msg", lock_st.to_string());
            tablet->set_last_base_compaction_failure_time(now);
            return lock_st;
        }
        std::lock_guard guard(_compaction_mtx);
        _executing_base_compactions[tablet->tablet_id()] = base;
        return Status::OK();
    }
    case ReaderType::READER_FULL_COMPACTION: {
        auto full = static_pointer_cast<CloudFullCompaction>(compaction);
        auto lock_st = full->request_global_lock();
        if (!lock_st.ok()) {
            LOG_WARNING("failed to request full compactoin global lock")
                    .tag("tablet id", tablet->tablet_id())
                    .tag("msg", lock_st.to_string());
            tablet->set_last_full_compaction_failure_time(now);
            return lock_st;
        }
        std::lock_guard guard(_compaction_mtx);
        _executing_full_compactions[tablet->tablet_id()] = full;
        return Status::OK();
    }
    default:
        LOG(WARNING) << "unsupport compaction task for tablet: " << tablet->tablet_id()
                     << ", compaction name: " << compaction->compaction_name();
        return Status::NotFound("Unsupport compaction type {}", compaction->compaction_name());
    }
}
798
799
0
/// Prepares a base compaction for `tablet` and hands it to the base compaction thread pool.
///
/// A null placeholder is inserted into `_submitted_base_compactions` first so concurrent
/// submitters are rejected; it is replaced by the real compaction after prepare_compact()
/// succeeds and removed when the pool task finishes or any step fails.
///
/// @return AlreadyExist if a base compaction is already registered for the tablet, the
///         prepare_compact() error, InternalError if the pool rejects the task, otherwise OK.
Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet) {
    using namespace std::chrono;
    // Wall-clock milliseconds, used to stamp base compaction failures on the tablet.
    auto now_ms = []() -> long {
        return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
    };
    // Drops this tablet's entry from `_submitted_base_compactions` (outer failure paths only).
    auto release_slot = [this, &tablet]() {
        std::lock_guard guard(_compaction_mtx);
        _submitted_base_compactions.erase(tablet->tablet_id());
    };

    {
        std::lock_guard guard(_compaction_mtx);
        // Take a placeholder for base compaction
        if (!_submitted_base_compactions.emplace(tablet->tablet_id(), nullptr).second) {
            return Status::AlreadyExist(
                    "other base compaction or full compaction is submitted, tablet_id={}",
                    tablet->tablet_id());
        }
    }

    auto compaction = std::make_shared<CloudBaseCompaction>(*this, tablet);
    if (auto prepare_st = compaction->prepare_compact(); !prepare_st.ok()) {
        tablet->set_last_base_compaction_failure_time(now_ms());
        release_slot();
        return prepare_st;
    }

    {
        std::lock_guard guard(_compaction_mtx);
        _submitted_base_compactions[tablet->tablet_id()] = compaction;
    }

    auto task = [=, this, compaction = std::move(compaction)]() {
        auto* metrics = DorisMetrics::instance();
        metrics->base_compaction_task_running_total->increment(1);
        metrics->base_compaction_task_pending_total->set_value(
                _base_compaction_thread_pool->get_queue_size());
        g_base_compaction_running_task_count << 1;
        signal::tablet_id = tablet->tablet_id();
        // Runs last (after the trailing lock_guard below is released): frees the submitted
        // slot and rolls back the running metrics.
        Defer on_exit {[&]() {
            g_base_compaction_running_task_count << -1;
            std::lock_guard guard(_compaction_mtx);
            _submitted_base_compactions.erase(tablet->tablet_id());
            metrics->base_compaction_task_running_total->increment(-1);
            metrics->base_compaction_task_pending_total->set_value(
                    _base_compaction_thread_pool->get_queue_size());
        }};
        if (!_request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION, tablet,
                                                    compaction)
                     .ok()) {
            return;
        }
        if (!compaction->execute_compact().ok()) {
            // Error log has been output in `execute_compact`
            tablet->set_last_base_compaction_failure_time(now_ms());
        }
        std::lock_guard guard(_compaction_mtx);
        _executing_base_compactions.erase(tablet->tablet_id());
    };

    Status st = _base_compaction_thread_pool->submit_func(std::move(task));
    DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
            _base_compaction_thread_pool->get_queue_size());
    if (!st.ok()) {
        release_slot();
        return Status::InternalError("failed to submit base compaction, tablet_id={}",
                                     tablet->tablet_id());
    }
    return st;
}
862
863
0
/// Prepares a cumulative compaction for `tablet` and hands it to the cumulative thread pool.
///
/// The tablet is first marked in `_tablet_preparing_cumu_compaction` so that concurrent
/// submitters are rejected. After a successful prepare_compact() the compaction is moved to
/// `_submitted_cumu_compactions`. The pool task also registers it in
/// `_executing_cumu_compactions` once the global compaction lock has been obtained.
///
/// @return AlreadyExist if another cumu compaction is preparing (or, without parallel cumu
///         compaction, already submitted), the prepare_compact() error, InternalError if the
///         pool rejects the task, otherwise OK.
Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet) {
    using namespace std::chrono;
    {
        std::lock_guard lock(_compaction_mtx);
        if (!config::enable_parallel_cumu_compaction &&
            _submitted_cumu_compactions.contains(tablet->tablet_id())) {
            return Status::AlreadyExist("other cumu compaction is submitted, tablet_id={}",
                                        tablet->tablet_id());
        }
        auto [_, success] = _tablet_preparing_cumu_compaction.insert(tablet->tablet_id());
        if (!success) {
            return Status::AlreadyExist("other cumu compaction is preparing, tablet_id={}",
                                        tablet->tablet_id());
        }
    }
    auto compaction = std::make_shared<CloudCumulativeCompaction>(*this, tablet);
    auto st = compaction->prepare_compact();
    if (!st.ok()) {
        long now = duration_cast<std::chrono::milliseconds>(
                           std::chrono::system_clock::now().time_since_epoch())
                           .count();
        if (!st.is<ErrorCode::CUMULATIVE_MEET_DELETE_VERSION>()) {
            if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) {
                // Backoff strategy if no suitable version
                tablet->last_cumu_no_suitable_version_ms = now;
            } else {
                tablet->set_last_cumu_compaction_failure_time(now);
            }
        }
        std::lock_guard lock(_compaction_mtx);
        _tablet_preparing_cumu_compaction.erase(tablet->tablet_id());
        return st;
    }
    {
        std::lock_guard lock(_compaction_mtx);
        _tablet_preparing_cumu_compaction.erase(tablet->tablet_id());
        _submitted_cumu_compactions[tablet->tablet_id()].push_back(compaction);
    }
    // Removes this compaction from `_submitted_cumu_compactions`.
    auto erase_submitted_cumu_compaction = [=, this]() {
        std::lock_guard lock(_compaction_mtx);
        auto it = _submitted_cumu_compactions.find(tablet->tablet_id());
        DCHECK(it != _submitted_cumu_compactions.end());
        auto& compactions = it->second;
        auto it1 = std::find(compactions.begin(), compactions.end(), compaction);
        DCHECK(it1 != compactions.end());
        compactions.erase(it1);
        if (compactions.empty()) { // No compactions on this tablet, erase key
            _submitted_cumu_compactions.erase(it);
            // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to
            // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform
            // cumu compaction on tablet which has suitable versions for cumu compaction.
            tablet->last_cumu_no_suitable_version_ms = 0;
        }
    };
    // Removes this compaction from `_executing_cumu_compactions`; only valid after the global
    // lock request succeeded (which is what inserted it).
    auto erase_executing_cumu_compaction = [=, this]() {
        std::lock_guard lock(_compaction_mtx);
        auto it = _executing_cumu_compactions.find(tablet->tablet_id());
        DCHECK(it != _executing_cumu_compactions.end());
        auto& compactions = it->second;
        auto it1 = std::find(compactions.begin(), compactions.end(), compaction);
        DCHECK(it1 != compactions.end());
        compactions.erase(it1);
        if (compactions.empty()) { // No compactions on this tablet, erase key
            _executing_cumu_compactions.erase(it);
            // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to
            // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform
            // cumu compaction on tablet which has suitable versions for cumu compaction.
            tablet->last_cumu_no_suitable_version_ms = 0;
        }
    };
    st = _cumu_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() {
        DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1);
        DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
                _cumu_compaction_thread_pool->get_queue_size());
        DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.wait_in_line",
                        { sleep(5); })
        signal::tablet_id = tablet->tablet_id();
        g_cumu_compaction_running_task_count << 1;
        bool is_large_task = true;
        // True once this task has been counted in `_cumu_compaction_thread_pool_used_threads`.
        // That increment only happens after the global compaction lock is obtained, so when
        // the lock request fails the deferred cleanup must not decrement the counter;
        // otherwise every failed lock request would drift the counter downwards and break
        // the large-task delay heuristic.
        bool counted_as_used_thread = false;
        Defer defer {[&]() {
            DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.sleep",
                            { sleep(5); })
            std::lock_guard lock(_cumu_compaction_delay_mtx);
            if (counted_as_used_thread) {
                _cumu_compaction_thread_pool_used_threads--;
            }
            if (!is_large_task) {
                _cumu_compaction_thread_pool_small_tasks_running--;
            }
            g_cumu_compaction_running_task_count << -1;
            erase_submitted_cumu_compaction();
            DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(-1);
            DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
                    _cumu_compaction_thread_pool->get_queue_size());
        }};
        auto st = _request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION,
                                                         tablet, compaction);
        if (!st.ok()) return;
        do {
            std::lock_guard lock(_cumu_compaction_delay_mtx);
            _cumu_compaction_thread_pool_used_threads++;
            counted_as_used_thread = true;
            if (config::large_cumu_compaction_task_min_thread_num > 1 &&
                _cumu_compaction_thread_pool->max_threads() >=
                        config::large_cumu_compaction_task_min_thread_num) {
                // Determine if this is a small task based on configured thresholds
                is_large_task = (compaction->get_input_rowsets_bytes() >
                                         config::large_cumu_compaction_task_bytes_threshold ||
                                 compaction->get_input_num_rows() >
                                         config::large_cumu_compaction_task_row_num_threshold);
                // Small task. No delay needed
                if (!is_large_task) {
                    _cumu_compaction_thread_pool_small_tasks_running++;
                    break;
                }
                // Deal with large task
                if (_should_delay_large_task()) {
                    long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch())
                                       .count();
                    // sleep 5s for this tablet
                    tablet->set_last_cumu_compaction_failure_time(now);
                    erase_executing_cumu_compaction();
                    // NOTE: the "remaining threads" tag reports the number of used threads.
                    LOG_WARNING(
                            "failed to do CloudCumulativeCompaction, cumu thread pool is "
                            "intensive, delay large task.")
                            .tag("tablet_id", tablet->tablet_id())
                            .tag("input_rows", compaction->get_input_num_rows())
                            .tag("input_rowsets_total_size", compaction->get_input_rowsets_bytes())
                            .tag("config::large_cumu_compaction_task_bytes_threshold",
                                 config::large_cumu_compaction_task_bytes_threshold)
                            .tag("config::large_cumu_compaction_task_row_num_threshold",
                                 config::large_cumu_compaction_task_row_num_threshold)
                            .tag("remaining threads", _cumu_compaction_thread_pool_used_threads)
                            .tag("small_tasks_running",
                                 _cumu_compaction_thread_pool_small_tasks_running);
                    return;
                }
            }
        } while (false);
        st = compaction->execute_compact();
        if (!st.ok()) {
            // Error log has been output in `execute_compact`
            long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
            tablet->set_last_cumu_compaction_failure_time(now);
        }
        erase_executing_cumu_compaction();
    });
    DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
            _cumu_compaction_thread_pool->get_queue_size());
    if (!st.ok()) {
        erase_submitted_cumu_compaction();
        return Status::InternalError("failed to submit cumu compaction, tablet_id={}",
                                     tablet->tablet_id());
    }
    return st;
}
1016
1017
0
/// Prepares a full compaction for `tablet` and runs it on the base compaction thread pool.
///
/// A null placeholder is inserted into `_submitted_full_compactions` first; it is replaced by
/// the real compaction once prepare_compact() succeeds and removed when the pool task ends or
/// any step fails.
///
/// @return AlreadyExist if a full compaction is already registered for the tablet, the
///         prepare_compact() error, InternalError if the pool rejects the task, otherwise OK.
Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet) {
    using namespace std::chrono;
    // Wall-clock milliseconds, used to stamp full compaction failures on the tablet.
    auto now_ms = []() -> long {
        return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
    };
    // Drops this tablet's entry from `_submitted_full_compactions` (outer failure paths only).
    auto release_slot = [this, &tablet]() {
        std::lock_guard guard(_compaction_mtx);
        _submitted_full_compactions.erase(tablet->tablet_id());
    };

    {
        std::lock_guard guard(_compaction_mtx);
        // Take a placeholder for full compaction
        if (!_submitted_full_compactions.emplace(tablet->tablet_id(), nullptr).second) {
            return Status::AlreadyExist(
                    "other full compaction or base compaction is submitted, tablet_id={}",
                    tablet->tablet_id());
        }
    }

    auto compaction = std::make_shared<CloudFullCompaction>(*this, tablet);
    if (auto prepare_st = compaction->prepare_compact(); !prepare_st.ok()) {
        tablet->set_last_full_compaction_failure_time(now_ms());
        release_slot();
        return prepare_st;
    }

    {
        std::lock_guard guard(_compaction_mtx);
        _submitted_full_compactions[tablet->tablet_id()] = compaction;
    }

    auto task = [=, this, compaction = std::move(compaction)]() {
        g_full_compaction_running_task_count << 1;
        signal::tablet_id = tablet->tablet_id();
        // Runs last (after the trailing lock_guard below is released): frees the submitted slot.
        Defer on_exit {[&]() {
            g_full_compaction_running_task_count << -1;
            std::lock_guard guard(_compaction_mtx);
            _submitted_full_compactions.erase(tablet->tablet_id());
        }};
        if (!_request_tablet_global_compaction_lock(ReaderType::READER_FULL_COMPACTION, tablet,
                                                    compaction)
                     .ok()) {
            return;
        }
        if (!compaction->execute_compact().ok()) {
            // Error log has been output in `execute_compact`
            tablet->set_last_full_compaction_failure_time(now_ms());
        }
        std::lock_guard guard(_compaction_mtx);
        _executing_full_compactions.erase(tablet->tablet_id());
    };

    // Full compactions share the base compaction thread pool.
    Status st = _base_compaction_thread_pool->submit_func(std::move(task));
    if (!st.ok()) {
        release_slot();
        return Status::InternalError("failed to submit full compaction, tablet_id={}",
                                     tablet->tablet_id());
    }
    return st;
}
1071
1072
/// Dispatches `tablet` to the submitter for `compaction_type`.
///
/// @return the submitter's status (OK on success), or InternalError for an unknown type.
Status CloudStorageEngine::submit_compaction_task(const CloudTabletSPtr& tablet,
                                                  CompactionType compaction_type) {
    DCHECK(compaction_type == CompactionType::CUMULATIVE_COMPACTION ||
           compaction_type == CompactionType::BASE_COMPACTION ||
           compaction_type == CompactionType::FULL_COMPACTION);
    switch (compaction_type) {
    case CompactionType::BASE_COMPACTION:
        return _submit_base_compaction_task(tablet);
    case CompactionType::CUMULATIVE_COMPACTION:
        return _submit_cumulative_compaction_task(tablet);
    case CompactionType::FULL_COMPACTION:
        return _submit_full_compaction_task(tablet);
    default:
        return Status::InternalError("unknown compaction type!");
    }
}
1091
1092
0
void CloudStorageEngine::_lease_compaction_thread_callback() {
1093
0
    while (!_stop_background_threads_latch.wait_for(
1094
0
            std::chrono::seconds(config::lease_compaction_interval_seconds))) {
1095
0
        std::vector<std::shared_ptr<CloudFullCompaction>> full_compactions;
1096
0
        std::vector<std::shared_ptr<CloudBaseCompaction>> base_compactions;
1097
0
        std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions;
1098
0
        std::vector<std::shared_ptr<CloudCompactionStopToken>> compation_stop_tokens;
1099
0
        std::vector<std::shared_ptr<CloudIndexChangeCompaction>> index_change_compations;
1100
0
        {
1101
0
            std::lock_guard lock(_compaction_mtx);
1102
0
            for (auto& [_, base] : _executing_base_compactions) {
1103
0
                if (base) { // `base` might be a nullptr placeholder
1104
0
                    base_compactions.push_back(base);
1105
0
                }
1106
0
            }
1107
0
            for (auto& [_, cumus] : _executing_cumu_compactions) {
1108
0
                for (auto& cumu : cumus) {
1109
0
                    cumu_compactions.push_back(cumu);
1110
0
                }
1111
0
            }
1112
0
            for (auto& [_, full] : _executing_full_compactions) {
1113
0
                if (full) {
1114
0
                    full_compactions.push_back(full);
1115
0
                }
1116
0
            }
1117
0
            for (auto& [_, stop_token] : _active_compaction_stop_tokens) {
1118
0
                if (stop_token) {
1119
0
                    compation_stop_tokens.push_back(stop_token);
1120
0
                }
1121
0
            }
1122
0
            for (auto& [_, index_change] : _submitted_index_change_cumu_compaction) {
1123
0
                if (index_change) {
1124
0
                    index_change_compations.push_back(index_change);
1125
0
                }
1126
0
            }
1127
0
            for (auto& [_, index_change] : _submitted_index_change_base_compaction) {
1128
0
                if (index_change) {
1129
0
                    index_change_compations.push_back(index_change);
1130
0
                }
1131
0
            }
1132
0
        }
1133
        // TODO(plat1ko): Support batch lease rpc
1134
0
        for (auto& stop_token : compation_stop_tokens) {
1135
0
            stop_token->do_lease();
1136
0
        }
1137
0
        for (auto& comp : full_compactions) {
1138
0
            comp->do_lease();
1139
0
        }
1140
0
        for (auto& comp : cumu_compactions) {
1141
0
            comp->do_lease();
1142
0
        }
1143
0
        for (auto& comp : base_compactions) {
1144
0
            comp->do_lease();
1145
0
        }
1146
0
        for (auto& comp : index_change_compations) {
1147
0
            comp->do_lease();
1148
0
        }
1149
0
    }
1150
0
}
1151
1152
0
void CloudStorageEngine::_check_tablet_delete_bitmap_score_callback() {
1153
0
    LOG(INFO) << "try to start check tablet delete bitmap score!";
1154
0
    while (!_stop_background_threads_latch.wait_for(
1155
0
            std::chrono::seconds(config::check_tablet_delete_bitmap_interval_seconds))) {
1156
0
        if (!config::enable_check_tablet_delete_bitmap_score) {
1157
0
            return;
1158
0
        }
1159
0
        uint64_t max_delete_bitmap_score = 0;
1160
0
        uint64_t max_base_rowset_delete_bitmap_score = 0;
1161
0
        tablet_mgr().get_topn_tablet_delete_bitmap_score(&max_delete_bitmap_score,
1162
0
                                                         &max_base_rowset_delete_bitmap_score);
1163
0
        if (max_delete_bitmap_score > 0) {
1164
0
            _tablet_max_delete_bitmap_score_metrics->set_value(max_delete_bitmap_score);
1165
0
        }
1166
0
        if (max_base_rowset_delete_bitmap_score > 0) {
1167
0
            _tablet_max_base_rowset_delete_bitmap_score_metrics->set_value(
1168
0
                    max_base_rowset_delete_bitmap_score);
1169
0
        }
1170
0
    }
1171
0
}
1172
1173
0
// Serializes the tablets that currently have submitted compactions into pretty-printed JSON:
//
//   { "CumulativeCompaction": [tablet_id, ...], "BaseCompaction": [tablet_id, ...] }
//
// A tablet id appears in "CumulativeCompaction" once per entry in its submitted cumulative
// compaction list, and once in "BaseCompaction" per submitted base compaction entry.
// Writes the JSON text into `*result`; always returns Status::OK().
Status CloudStorageEngine::get_compaction_status_json(std::string* result) {
    rapidjson::Document root;
    root.SetObject();
    auto& allocator = root.GetAllocator();

    // Child arrays are plain Values backed by root's allocator. A nested Document would carry
    // its own (unused) allocator and parse stack.
    rapidjson::Value cumu_arr(rapidjson::kArrayType);
    rapidjson::Value base_arr(rapidjson::kArrayType);
    {
        // Hold the compaction lock only while snapshotting the maps, not while serializing.
        std::lock_guard lock(_compaction_mtx);
        for (const auto& [tablet_id, compactions] : _submitted_cumu_compactions) {
            for (size_t i = 0; i < compactions.size(); ++i) {
                cumu_arr.PushBack(tablet_id, allocator);
            }
        }
        for (const auto& [tablet_id, _] : _submitted_base_compactions) {
            base_arr.PushBack(tablet_id, allocator);
        }
    }

    // cumu
    std::string_view cumu = "CumulativeCompaction";
    rapidjson::Value cumu_key;
    cumu_key.SetString(cumu.data(), cast_set<uint32_t>(cumu.length()), allocator);
    root.AddMember(cumu_key, cumu_arr, allocator);

    // base
    std::string_view base = "BaseCompaction";
    rapidjson::Value base_key;
    base_key.SetString(base.data(), cast_set<uint32_t>(base.length()), allocator);
    root.AddMember(base_key, base_arr, allocator);

    rapidjson::StringBuffer strbuf;
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
    root.Accept(writer);
    // GetSize() gives the length directly, avoiding a strlen over the buffer.
    result->assign(strbuf.GetString(), strbuf.GetSize());
    return Status::OK();
}
1207
1208
std::shared_ptr<CloudCumulativeCompactionPolicy> CloudStorageEngine::cumu_compaction_policy(
1209
0
        std::string_view compaction_policy) {
1210
0
    if (!_cumulative_compaction_policies.contains(compaction_policy)) {
1211
0
        return _cumulative_compaction_policies.at(CUMULATIVE_SIZE_BASED_POLICY);
1212
0
    }
1213
0
    return _cumulative_compaction_policies.at(compaction_policy);
1214
0
}
1215
1216
// Registers a compaction stop token for `tablet`, tagged with `initiator`.
//
// Steps:
//   1. Under `_compaction_mtx`, reserve a nullptr placeholder keyed by the tablet id. If an
//      entry already exists, return ALREADY_EXIST.
//   2. Outside the lock, create the token and call its do_register().
//   3. On failure, remove the placeholder and return the error. On success, replace the
//      placeholder with the real token, log it, and return the (OK) status.
Status CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet,
                                                          int64_t initiator) {
    const auto tablet_id = tablet->tablet_id();
    {
        std::lock_guard lock(_compaction_mtx);
        const bool reserved = _active_compaction_stop_tokens.emplace(tablet_id, nullptr).second;
        if (!reserved) {
            return Status::AlreadyExist("stop token already exists for tablet_id={}", tablet_id);
        }
    }

    auto stop_token = std::make_shared<CloudCompactionStopToken>(*this, tablet, initiator);
    auto st = stop_token->do_register();
    if (st.ok()) {
        std::lock_guard lock(_compaction_mtx);
        _active_compaction_stop_tokens[tablet_id] = stop_token;
    } else {
        // Drop the placeholder so a later attempt for this tablet can register again.
        std::lock_guard lock(_compaction_mtx);
        _active_compaction_stop_tokens.erase(tablet_id);
        return st;
    }

    LOG_INFO(
            "successfully register compaction stop token for tablet_id={}, "
            "delete_bitmap_lock_initiator={}",
            tablet_id, initiator);
    return st;
}
1246
1247
0
// Removes the compaction stop token registered for `tablet` from the local table.
//
// Returns NOT_FOUND if no entry exists. When `clear_ms` is true and the entry holds a real
// token (not the nullptr placeholder reserved during registration), the token's
// do_unregister() is also invoked and any error from it is returned.
Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms) {
    std::shared_ptr<CloudCompactionStopToken> stop_token;
    {
        std::lock_guard lock(_compaction_mtx);
        auto entry = _active_compaction_stop_tokens.find(tablet->tablet_id());
        if (entry == _active_compaction_stop_tokens.end()) {
            return Status::NotFound("stop token not found for tablet_id={}", tablet->tablet_id());
        }
        stop_token = std::move(entry->second);
        _active_compaction_stop_tokens.erase(entry);
    }
    LOG_INFO("successfully unregister compaction stop token for tablet_id={}", tablet->tablet_id());

    // A nullptr entry is the placeholder inserted before do_register() completed; there is no
    // token object to unregister in that case.
    if (!clear_ms || stop_token == nullptr) {
        return Status::OK();
    }
    RETURN_IF_ERROR(stop_token->do_unregister());
    LOG_INFO(
            "successfully remove compaction stop token from MS for tablet_id={}, "
            "delete_bitmap_lock_initiator={}",
            tablet->tablet_id(), stop_token->initiator());
    return Status::OK();
}
1269
1270
0
// Validates the cluster id persisted under every store path against config::cluster_id.
//
// Each store path may contain a CLUSTER_ID_PREFIX file holding a decimal cluster id. Missing
// or empty files are ignored. `_effective_cluster_id` is set to config::cluster_id, then:
//   - no path has an id: returns OK if config::cluster_id is -1, otherwise writes the
//     configured id to the store paths via set_cluster_id();
//   - paths hold different ids: returns InternalError;
//   - paths agree but differ from a configured (!= -1) config::cluster_id: returns Corruption;
//   - a cluster id file cannot be parsed as an integer: returns Corruption.
// Filesystem errors from exists/open/read are propagated.
Status CloudStorageEngine::_check_all_root_path_cluster_id() {
    // Check if all root paths have the same cluster id
    std::set<int32_t> cluster_ids;
    for (const auto& path : _options.store_paths) {
        auto cluster_id_path = fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX);
        bool exists = false;
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists));
        if (!exists) {
            continue;
        }
        io::FileReaderSPtr reader;
        RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cluster_id_path, &reader));
        size_t fsize = reader->size();
        if (fsize == 0) {
            continue;
        }
        std::string content;
        content.resize(fsize, '\0');
        size_t bytes_read = 0;
        RETURN_IF_ERROR(reader->read_at(0, {content.data(), fsize}, &bytes_read));
        DCHECK_EQ(fsize, bytes_read);
        // DCHECK is compiled out in release builds; never parse bytes that were not read.
        content.resize(bytes_read);
        int32_t tmp_cluster_id = 0;
        try {
            tmp_cluster_id = std::stoi(content);
        } catch (const std::exception& e) {
            // std::stoi throws on non-numeric or out-of-range content; report it as a status
            // instead of letting the exception escape this Status-returning function.
            return Status::Corruption("invalid cluster id in {}: '{}', error: {}",
                                      cluster_id_path, content, e.what());
        }
        cluster_ids.insert(tmp_cluster_id);
    }
    _effective_cluster_id = config::cluster_id;
    // first init
    if (cluster_ids.empty()) {
        // not set configured cluster id
        if (_effective_cluster_id == -1) {
            return Status::OK();
        }
        // If no cluster id file exists, use the configured cluster id
        return set_cluster_id(_effective_cluster_id);
    }
    if (cluster_ids.size() > 1) {
        return Status::InternalError(
                "All root paths must have the same cluster id, but you have "
                "different cluster ids: {}",
                fmt::join(cluster_ids, ", "));
    }
    // Exactly one persisted id exists here; it must match a configured id, if any.
    if (_effective_cluster_id != -1 && *cluster_ids.begin() != _effective_cluster_id) {
        return Status::Corruption(
                "multiple cluster ids is not equal. config::cluster_id={}, "
                "storage path cluster_id={}",
                _effective_cluster_id, *cluster_ids.begin());
    }
    return Status::OK();
}
1318
1319
0
// Writes `cluster_id` into a CLUSTER_ID_PREFIX file under every store path that does not
// already have one; existing files are left untouched. Then records `cluster_id` as
// `_effective_cluster_id`.
//
// Runs entirely under `_store_lock`. On the first filesystem error the error is returned and
// `_effective_cluster_id` is left unchanged.
Status CloudStorageEngine::set_cluster_id(int32_t cluster_id) {
    std::lock_guard<std::mutex> guard(_store_lock);
    const std::string id_text = std::to_string(cluster_id);
    for (const auto& store_path : _options.store_paths) {
        const auto id_file = fmt::format("{}/{}", store_path.path, CLUSTER_ID_PREFIX);
        bool already_present = false;
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(id_file, &already_present));
        if (already_present) {
            continue;
        }
        io::FileWriterPtr writer;
        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(id_file, &writer));
        RETURN_IF_ERROR(writer->append(id_text));
        RETURN_IF_ERROR(writer->close());
    }
    _effective_cluster_id = cluster_id;
    return Status::OK();
}
1336
1337
#include "common/compile_check_end.h"
1338
} // namespace doris