Coverage Report

Created: 2026-04-10 12:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_storage_engine.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <chrono>
21
#include <memory>
22
#include <mutex>
23
24
//#include "cloud/cloud_cumulative_compaction.h"
25
//#include "cloud/cloud_base_compaction.h"
26
//#include "cloud/cloud_full_compaction.h"
27
#include "cloud/cloud_committed_rs_mgr.h"
28
#include "cloud/cloud_cumulative_compaction_policy.h"
29
#include "cloud/cloud_tablet.h"
30
#include "cloud/cloud_txn_delete_bitmap_cache.h"
31
#include "cloud/config.h"
32
#include "io/cache/block_file_cache_factory.h"
33
#include "storage/compaction/compaction.h"
34
#include "storage/storage_engine.h"
35
#include "storage/storage_policy.h"
36
#include "util/threadpool.h"
37
38
namespace doris {
39
namespace cloud {
40
class CloudMetaMgr;
41
}
42
namespace io {
43
class FileCacheBlockDownloader;
44
}
45
46
class CloudTabletMgr;
47
class CloudCumulativeCompaction;
48
class CloudBaseCompaction;
49
class CloudFullCompaction;
50
class TabletHotspot;
51
class CloudWarmUpManager;
52
class CloudCompactionStopToken;
53
class CloudSnapshotMgr;
54
class CloudIndexChangeCompaction;
55
56
class CloudStorageEngine final : public BaseStorageEngine {
57
public:
58
    CloudStorageEngine(const EngineOptions& options);
59
60
    ~CloudStorageEngine() override;
61
62
    Status open() override;
63
    void stop() override;
64
    bool stopped() override;
65
66
    /* Parameters:
67
     * - tablet_id: the id of tablet to get
68
     * - sync_stats: the stats of sync rowset
69
     * - force_use_only_cached: whether to use only cached tablet meta
70
     * - cache_on_miss: whether to cache the tablet meta when it is missing from the cache
71
     */
72
    Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats = nullptr,
73
                                      bool force_use_only_cached = false,
74
                                      bool cache_on_miss = true) override;
75
76
    /*
77
     * Get the tablet meta for a specific tablet
78
     * Parameters:
79
     * - tablet_id: the id of tablet to get meta for
80
     * - tablet_meta: output TabletMeta shared pointer
81
     * - force_use_only_cached: whether to use only cached tablet meta (return NotFound on miss)
82
     */
83
    Status get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
84
                           bool force_use_only_cached = false) override;
85
86
    Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;
87
88
    Status set_cluster_id(int32_t cluster_id) override;
89
90
696k
    // Returns a reference to the object owned by `_meta_mgr`. The pointer is
    // dereferenced without a null check, so it must already be set.
    cloud::CloudMetaMgr& meta_mgr() const { return *_meta_mgr; }
91
92
103k
    // Returns a reference to the object owned by `_tablet_mgr`. The pointer is
    // dereferenced without a null check, so it must already be set.
    CloudTabletMgr& tablet_mgr() const { return *_tablet_mgr; }
93
94
0
    // Returns a reference to the object owned by `_cloud_snapshot_mgr`
    // (dereferenced without a null check).
    // NOTE(review): unlike the neighbouring accessors this one is not
    // `const`-qualified -- confirm whether that is intentional.
    CloudSnapshotMgr& cloud_snapshot_mgr() { return *_cloud_snapshot_mgr; }
95
96
310k
    // Returns a reference to the object owned by `_txn_delete_bitmap_cache`
    // (dereferenced without a null check).
    CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() const { return *_txn_delete_bitmap_cache; }
97
98
240k
    // Returns a reference to the object owned by `_committed_rs_mgr`
    // (dereferenced without a null check).
    CloudCommittedRSMgr& committed_rs_mgr() const { return *_committed_rs_mgr; }
99
100
8.64k
    // Returns a reference to the thread pool owned by
    // `_calc_tablet_delete_bitmap_task_thread_pool` (dereferenced without a
    // null check).
    ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const {
101
8.64k
        return *_calc_tablet_delete_bitmap_task_thread_pool;
102
8.64k
    }
103
0
    // Returns a reference to the thread pool owned by
    // `_sync_delete_bitmap_thread_pool` (dereferenced without a null check).
    ThreadPool& sync_delete_bitmap_thread_pool() const { return *_sync_delete_bitmap_thread_pool; }
104
105
183k
    // Looks up the storage resource for `vault_id`.
    //
    // - An empty `vault_id` resolves to a `StorageResource` built from
    //   `latest_fs()` when that file system is non-null.
    // - Otherwise the lookup is delegated to `doris::get_storage_resource`.
    //   On a miss, `sync_storage_vault()` is called once and the lookup is
    //   retried.
    //
    // Returns `std::nullopt` if the lookup still misses after the sync.
    std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
106
183k
        VLOG_DEBUG << "Getting storage resource for vault_id: " << vault_id;
107
108
183k
        // Tracks whether `sync_storage_vault()` has already run, so the loop
        // below executes at most two iterations.
        bool synced = false;
109
183k
        do {
110
184k
            // NOTE(review): `latest_fs()` is called twice here (check, then
            // return) and locks `_latest_fs_mtx` on each call, so the value
            // returned is a separate read from the one that was checked.
            if (vault_id.empty() && latest_fs() != nullptr) {
111
184k
                return StorageResource {latest_fs()};
112
184k
            }
113
18.4E
            // The free function's result is tested for truthiness; only its
            // `first` member is handed back to the caller.
            if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
114
1
                return storage_resource->first;
115
1
            }
116
18.4E
            // Second miss after a sync: stop retrying.
            if (synced) {
117
0
                break;
118
0
            }
119
18.4E
            // First miss: sync once, then loop to retry the lookup.
            sync_storage_vault();
120
18.4E
            synced = true;
121
18.4E
        } while (true);
122
123
18.4E
        return std::nullopt;
124
183k
    }
125
126
367k
    // Returns a copy of `_latest_fs`, read while holding `_latest_fs_mtx`.
    // The returned pointer may be null; `get_storage_resource` checks for that.
    io::RemoteFileSystemSPtr latest_fs() const {
127
367k
        std::lock_guard lock(_latest_fs_mtx);
128
367k
        return _latest_fs;
129
367k
    }
130
131
1
    // Replaces `_latest_fs` with `fs` while holding `_latest_fs_mtx`.
    // No validation is done on `fs`.
    void set_latest_fs(const io::RemoteFileSystemSPtr& fs) {
132
1
        std::lock_guard lock(_latest_fs_mtx);
133
1
        _latest_fs = fs;
134
1
    }
135
136
    void get_cumu_compaction(int64_t tablet_id,
137
                             std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res);
138
139
    Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type,
140
                                  int trigger_method = 0);
141
142
    Status get_compaction_status_json(std::string* result);
143
144
516
    // Reports whether `_submitted_base_compactions` has an entry keyed by
    // `tablet_id`. The lookup is done while holding `_compaction_mtx`.
    bool has_base_compaction(int64_t tablet_id) const {
145
516
        std::lock_guard lock(_compaction_mtx);
146
516
        return _submitted_base_compactions.contains(tablet_id);
147
516
    }
148
149
587
    // Reports whether `_submitted_cumu_compactions` has an entry keyed by
    // `tablet_id`. The lookup is done while holding `_compaction_mtx`.
    // Only key presence is tested; the mapped vector is not inspected.
    bool has_cumu_compaction(int64_t tablet_id) const {
150
587
        std::lock_guard lock(_compaction_mtx);
151
587
        return _submitted_cumu_compactions.contains(tablet_id);
152
587
    }
153
154
516
    // Reports whether `_submitted_full_compactions` has an entry keyed by
    // `tablet_id`. The lookup is done while holding `_compaction_mtx`.
    bool has_full_compaction(int64_t tablet_id) const {
155
516
        std::lock_guard lock(_compaction_mtx);
156
516
        return _submitted_full_compactions.contains(tablet_id);
157
516
    }
158
159
    std::shared_ptr<CloudCumulativeCompactionPolicy> cumu_compaction_policy(
160
            std::string_view compaction_policy);
161
162
    void sync_storage_vault();
163
164
55.5k
    // Returns a reference to the object owned by `_file_cache_block_downloader`
    // (dereferenced without a null check).
    io::FileCacheBlockDownloader& file_cache_block_downloader() const {
165
55.5k
        return *_file_cache_block_downloader;
166
55.5k
    }
167
168
118k
    // Returns a reference to the object owned by `_cloud_warm_up_manager`
    // (dereferenced without a null check).
    CloudWarmUpManager& cloud_warm_up_manager() const { return *_cloud_warm_up_manager; }
169
170
1.44M
    // Returns a reference to the object owned by `_tablet_hotspot`
    // (dereferenced without a null check).
    TabletHotspot& tablet_hotspot() const { return *_tablet_hotspot; }
171
172
0
    // Returns a reference to the thread pool owned by
    // `_sync_load_for_tablets_thread_pool` (dereferenced without a null check).
    ThreadPool& sync_load_for_tablets_thread_pool() const {
173
0
        return *_sync_load_for_tablets_thread_pool;
174
0
    }
175
176
0
    // Returns a reference to the thread pool owned by
    // `_warmup_cache_async_thread_pool` (dereferenced without a null check).
    ThreadPool& warmup_cache_async_thread_pool() const { return *_warmup_cache_async_thread_pool; }
177
178
    Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t initiator);
179
180
    Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms);
181
182
    bool register_index_change_compaction(std::shared_ptr<CloudIndexChangeCompaction> compact,
183
                                          int64_t tablet_id, bool is_base_compact,
184
                                          std::string& err_reason);
185
186
    void unregister_index_change_compaction(int64_t tablet_id, bool is_base_compact);
187
188
356
    // Returns a copy of `_startup_timepoint` (a `system_clock` time point).
    // The read is not synchronized.
    std::chrono::time_point<std::chrono::system_clock> startup_timepoint() const {
189
356
        return _startup_timepoint;
190
356
    }
191
192
#ifdef BE_TEST
193
    // Test-only setter (compiled only when `BE_TEST` is defined): overwrites
    // `_startup_timepoint` with `tp`. The write is not synchronized.
    void set_startup_timepoint(const std::chrono::time_point<std::chrono::system_clock>& tp) {
194
        _startup_timepoint = tp;
195
    }
196
#endif
197
198
private:
199
    void _refresh_storage_vault_info_thread_callback();
200
    void _vacuum_stale_rowsets_thread_callback();
201
    void _sync_tablets_thread_callback();
202
    void _compaction_tasks_producer_callback();
203
    std::vector<CloudTabletSPtr> _generate_cloud_compaction_tasks(CompactionType compaction_type,
204
                                                                  bool check_score);
205
    Status _adjust_compaction_thread_num();
206
    Status _submit_base_compaction_task(const CloudTabletSPtr& tablet, int trigger_method = 0);
207
    Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet,
208
                                              int trigger_method = 0);
209
    Status _submit_full_compaction_task(const CloudTabletSPtr& tablet, int trigger_method = 0);
210
    Status _request_tablet_global_compaction_lock(ReaderType compaction_type,
211
                                                  const CloudTabletSPtr& tablet,
212
                                                  std::shared_ptr<CloudCompactionMixin> compaction);
213
    Status _check_all_root_path_cluster_id();
214
    void _lease_compaction_thread_callback();
215
    void _check_tablet_delete_bitmap_score_callback();
216
217
    std::atomic_bool _stopped {false};
218
219
    std::unique_ptr<cloud::CloudMetaMgr> _meta_mgr;
220
    std::unique_ptr<CloudTabletMgr> _tablet_mgr;
221
    std::unique_ptr<CloudTxnDeleteBitmapCache> _txn_delete_bitmap_cache;
222
    std::unique_ptr<CloudCommittedRSMgr> _committed_rs_mgr;
223
    std::unique_ptr<ThreadPool> _calc_tablet_delete_bitmap_task_thread_pool;
224
    std::unique_ptr<ThreadPool> _sync_delete_bitmap_thread_pool;
225
226
    // Components for cache warmup
227
    std::unique_ptr<io::FileCacheBlockDownloader> _file_cache_block_downloader;
228
    // Depended on by `FileCacheBlockDownloader`
229
    std::unique_ptr<CloudWarmUpManager> _cloud_warm_up_manager;
230
    std::unique_ptr<TabletHotspot> _tablet_hotspot;
231
    std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool;
232
    std::unique_ptr<ThreadPool> _warmup_cache_async_thread_pool;
233
    std::unique_ptr<CloudSnapshotMgr> _cloud_snapshot_mgr;
234
235
    // FileSystem with latest shared storage info, new data will be written to this fs.
236
    mutable std::mutex _latest_fs_mtx;
237
    io::RemoteFileSystemSPtr _latest_fs;
238
239
    std::vector<std::shared_ptr<Thread>> _bg_threads;
240
241
    // ATTN: Compactions in maps depend on `CloudTabletMgr` and `CloudMetaMgr`
242
    mutable std::mutex _compaction_mtx;
243
    mutable std::mutex _cumu_compaction_delay_mtx;
244
    // tablet_id -> submitted base compaction, guarded by `_compaction_mtx`
245
    std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> _submitted_base_compactions;
246
    // tablet_id -> submitted full compaction, guarded by `_compaction_mtx`
247
    std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> _submitted_full_compactions;
248
    // Store tablets which are preparing cumu compaction, guarded by `_compaction_mtx`
249
    std::unordered_set<int64_t> _tablet_preparing_cumu_compaction;
250
    // tablet_id -> submitted cumu compactions, guarded by `_compaction_mtx`
251
    std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
252
            _submitted_cumu_compactions;
253
    // tablet_id -> active compaction stop tokens
254
    std::unordered_map<int64_t, std::shared_ptr<CloudCompactionStopToken>>
255
            _active_compaction_stop_tokens;
256
    // tablet_id -> executing cumu compactions, guarded by `_compaction_mtx`
257
    std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
258
            _executing_cumu_compactions;
259
    // tablet_id -> executing base compactions, guarded by `_compaction_mtx`
260
    std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> _executing_base_compactions;
261
    // tablet_id -> executing full compactions, guarded by `_compaction_mtx`
262
    std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> _executing_full_compactions;
263
264
    // for index change compaction
265
    std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>>
266
            _submitted_index_change_cumu_compaction;
267
    std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>>
268
            _submitted_index_change_base_compaction;
269
270
    using CumuPolices =
271
            std::unordered_map<std::string_view, std::shared_ptr<CloudCumulativeCompactionPolicy>>;
272
    CumuPolices _cumulative_compaction_policies;
273
274
    std::atomic_bool first_sync_storage_vault {true};
275
276
    EngineOptions _options;
277
    std::mutex _store_lock;
278
279
    std::chrono::time_point<std::chrono::system_clock> _startup_timepoint;
280
};
281
282
} // namespace doris