Coverage Report

Created: 2026-03-12 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_storage_engine.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <chrono>
21
#include <memory>
22
#include <mutex>
23
24
//#include "cloud/cloud_cumulative_compaction.h"
25
//#include "cloud/cloud_base_compaction.h"
26
//#include "cloud/cloud_full_compaction.h"
27
#include "cloud/cloud_cumulative_compaction_policy.h"
28
#include "cloud/cloud_tablet.h"
29
#include "cloud/cloud_txn_delete_bitmap_cache.h"
30
#include "cloud/config.h"
31
#include "io/cache/block_file_cache_factory.h"
32
#include "storage/compaction/compaction.h"
33
#include "storage/storage_engine.h"
34
#include "storage/storage_policy.h"
35
#include "util/threadpool.h"
36
37
namespace doris {
38
namespace cloud {
39
class CloudMetaMgr;
40
}
41
namespace io {
42
class FileCacheBlockDownloader;
43
}
44
45
class CloudTabletMgr;
46
class CloudCumulativeCompaction;
47
class CloudBaseCompaction;
48
class CloudFullCompaction;
49
class TabletHotspot;
50
class CloudWarmUpManager;
51
class CloudCompactionStopToken;
52
class CloudSnapshotMgr;
53
class CloudIndexChangeCompaction;
54
55
class CloudStorageEngine final : public BaseStorageEngine {
56
public:
57
    CloudStorageEngine(const EngineOptions& options);
58
59
    ~CloudStorageEngine() override;
60
61
    Status open() override;
62
    void stop() override;
63
    bool stopped() override;
64
65
    /* Parameters:
66
     * - tablet_id: the id of tablet to get
67
     * - sync_stats: the stats of sync rowset
68
     * - force_use_only_cached: whether only use cached tablet meta
69
     * - cache_on_miss: whether cache the tablet meta when missing in cache
70
     */
71
    Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats = nullptr,
72
                                      bool force_use_only_cached = false,
73
                                      bool cache_on_miss = true) override;
74
75
    /*
76
     * Get the tablet meta for a specific tablet
77
     * Parameters:
78
     * - tablet_id: the id of tablet to get meta for
79
     * - tablet_meta: output TabletMeta shared pointer
80
     * - force_use_only_cached: whether only use cached tablet meta (return NotFound on miss)
81
     */
82
    Status get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
83
                           bool force_use_only_cached = false) override;
84
85
    Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;
86
87
    Status set_cluster_id(int32_t cluster_id) override;
88
89
1.08M
    cloud::CloudMetaMgr& meta_mgr() const { return *_meta_mgr; }
90
91
48.8k
    CloudTabletMgr& tablet_mgr() const { return *_tablet_mgr; }
92
93
0
    CloudSnapshotMgr& cloud_snapshot_mgr() { return *_cloud_snapshot_mgr; }
94
95
458k
    CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() const { return *_txn_delete_bitmap_cache; }
96
97
9.11k
    ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const {
98
9.11k
        return *_calc_tablet_delete_bitmap_task_thread_pool;
99
9.11k
    }
100
0
    ThreadPool& sync_delete_bitmap_thread_pool() const { return *_sync_delete_bitmap_thread_pool; }
101
102
193k
    std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
103
18.4E
        VLOG_DEBUG << "Getting storage resource for vault_id: " << vault_id;
104
105
193k
        bool synced = false;
106
193k
        do {
107
194k
            if (vault_id.empty() && latest_fs() != nullptr) {
108
194k
                return StorageResource {latest_fs()};
109
194k
            }
110
18.4E
            if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
111
1
                return storage_resource->first;
112
1
            }
113
18.4E
            if (synced) {
114
0
                break;
115
0
            }
116
18.4E
            sync_storage_vault();
117
18.4E
            synced = true;
118
18.4E
        } while (true);
119
120
18.4E
        return std::nullopt;
121
193k
    }
122
123
388k
    io::RemoteFileSystemSPtr latest_fs() const {
124
388k
        std::lock_guard lock(_latest_fs_mtx);
125
388k
        return _latest_fs;
126
388k
    }
127
128
1
    void set_latest_fs(const io::RemoteFileSystemSPtr& fs) {
129
1
        std::lock_guard lock(_latest_fs_mtx);
130
1
        _latest_fs = fs;
131
1
    }
132
133
    void get_cumu_compaction(int64_t tablet_id,
134
                             std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res);
135
136
    Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type);
137
138
    Status get_compaction_status_json(std::string* result);
139
140
435
    bool has_base_compaction(int64_t tablet_id) const {
141
435
        std::lock_guard lock(_compaction_mtx);
142
435
        return _submitted_base_compactions.contains(tablet_id);
143
435
    }
144
145
472
    bool has_cumu_compaction(int64_t tablet_id) const {
146
472
        std::lock_guard lock(_compaction_mtx);
147
472
        return _submitted_cumu_compactions.contains(tablet_id);
148
472
    }
149
150
435
    bool has_full_compaction(int64_t tablet_id) const {
151
435
        std::lock_guard lock(_compaction_mtx);
152
435
        return _submitted_full_compactions.contains(tablet_id);
153
435
    }
154
155
    std::shared_ptr<CloudCumulativeCompactionPolicy> cumu_compaction_policy(
156
            std::string_view compaction_policy);
157
158
    void sync_storage_vault();
159
160
0
    io::FileCacheBlockDownloader& file_cache_block_downloader() const {
161
0
        return *_file_cache_block_downloader;
162
0
    }
163
164
252k
    CloudWarmUpManager& cloud_warm_up_manager() const { return *_cloud_warm_up_manager; }
165
166
1.80M
    TabletHotspot& tablet_hotspot() const { return *_tablet_hotspot; }
167
168
0
    ThreadPool& sync_load_for_tablets_thread_pool() const {
169
0
        return *_sync_load_for_tablets_thread_pool;
170
0
    }
171
172
0
    ThreadPool& warmup_cache_async_thread_pool() const { return *_warmup_cache_async_thread_pool; }
173
174
    Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t initiator);
175
176
    Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms);
177
178
    bool register_index_change_compaction(std::shared_ptr<CloudIndexChangeCompaction> compact,
179
                                          int64_t tablet_id, bool is_base_compact,
180
                                          std::string& err_reason);
181
182
    void unregister_index_change_compaction(int64_t tablet_id, bool is_base_compact);
183
184
260
    std::chrono::time_point<std::chrono::system_clock> startup_timepoint() const {
185
260
        return _startup_timepoint;
186
260
    }
187
188
#ifdef BE_TEST
189
    void set_startup_timepoint(const std::chrono::time_point<std::chrono::system_clock>& tp) {
190
        _startup_timepoint = tp;
191
    }
192
#endif
193
194
private:
195
    void _refresh_storage_vault_info_thread_callback();
196
    void _vacuum_stale_rowsets_thread_callback();
197
    void _sync_tablets_thread_callback();
198
    void _compaction_tasks_producer_callback();
199
    std::vector<CloudTabletSPtr> _generate_cloud_compaction_tasks(CompactionType compaction_type,
200
                                                                  bool check_score);
201
    Status _adjust_compaction_thread_num();
202
    Status _submit_base_compaction_task(const CloudTabletSPtr& tablet);
203
    Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet);
204
    Status _submit_full_compaction_task(const CloudTabletSPtr& tablet);
205
    Status _request_tablet_global_compaction_lock(ReaderType compaction_type,
206
                                                  const CloudTabletSPtr& tablet,
207
                                                  std::shared_ptr<CloudCompactionMixin> compaction);
208
    Status _check_all_root_path_cluster_id();
209
    void _lease_compaction_thread_callback();
210
    void _check_tablet_delete_bitmap_score_callback();
211
212
    std::atomic_bool _stopped {false};
213
214
    std::unique_ptr<cloud::CloudMetaMgr> _meta_mgr;
215
    std::unique_ptr<CloudTabletMgr> _tablet_mgr;
216
    std::unique_ptr<CloudTxnDeleteBitmapCache> _txn_delete_bitmap_cache;
217
    std::unique_ptr<ThreadPool> _calc_tablet_delete_bitmap_task_thread_pool;
218
    std::unique_ptr<ThreadPool> _sync_delete_bitmap_thread_pool;
219
220
    // Components for cache warmup
221
    std::unique_ptr<io::FileCacheBlockDownloader> _file_cache_block_downloader;
222
    // Depended by `FileCacheBlockDownloader`
223
    std::unique_ptr<CloudWarmUpManager> _cloud_warm_up_manager;
224
    std::unique_ptr<TabletHotspot> _tablet_hotspot;
225
    std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool;
226
    std::unique_ptr<ThreadPool> _warmup_cache_async_thread_pool;
227
    std::unique_ptr<CloudSnapshotMgr> _cloud_snapshot_mgr;
228
229
    // FileSystem with latest shared storage info, new data will be written to this fs.
230
    mutable std::mutex _latest_fs_mtx;
231
    io::RemoteFileSystemSPtr _latest_fs;
232
233
    std::vector<std::shared_ptr<Thread>> _bg_threads;
234
235
    // ATTN: Compactions in maps depend on `CloudTabletMgr` and `CloudMetaMgr`
236
    mutable std::mutex _compaction_mtx;
237
    mutable std::mutex _cumu_compaction_delay_mtx;
238
    // tablet_id -> submitted base compaction, guarded by `_compaction_mtx`
239
    std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> _submitted_base_compactions;
240
    // tablet_id -> submitted full compaction, guarded by `_compaction_mtx`
241
    std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> _submitted_full_compactions;
242
    // Store tablets which are preparing cumu compaction, guarded by `_compaction_mtx`
243
    std::unordered_set<int64_t> _tablet_preparing_cumu_compaction;
244
    // tablet_id -> submitted cumu compactions, guarded by `_compaction_mtx`
245
    std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
246
            _submitted_cumu_compactions;
247
    // tablet_id -> active compaction stop tokens
248
    std::unordered_map<int64_t, std::shared_ptr<CloudCompactionStopToken>>
249
            _active_compaction_stop_tokens;
250
    // tablet_id -> executing cumu compactions, guarded by `_compaction_mtx`
251
    std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
252
            _executing_cumu_compactions;
253
    // tablet_id -> executing base compactions, guarded by `_compaction_mtx`
254
    std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> _executing_base_compactions;
255
    // tablet_id -> executing full compactions, guarded by `_compaction_mtx`
256
    std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> _executing_full_compactions;
257
258
    // for index change compaction
259
    std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>>
260
            _submitted_index_change_cumu_compaction;
261
    std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>>
262
            _submitted_index_change_base_compaction;
263
264
    using CumuPolices =
265
            std::unordered_map<std::string_view, std::shared_ptr<CloudCumulativeCompactionPolicy>>;
266
    CumuPolices _cumulative_compaction_policies;
267
268
    std::atomic_bool first_sync_storage_vault {true};
269
270
    EngineOptions _options;
271
    std::mutex _store_lock;
272
273
    std::chrono::time_point<std::chrono::system_clock> _startup_timepoint;
274
};
275
276
} // namespace doris