Coverage Report

Created: 2026-03-19 15:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_storage_engine.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <chrono>
21
#include <memory>
22
#include <mutex>
23
24
//#include "cloud/cloud_cumulative_compaction.h"
25
//#include "cloud/cloud_base_compaction.h"
26
//#include "cloud/cloud_full_compaction.h"
27
#include "cloud/cloud_committed_rs_mgr.h"
28
#include "cloud/cloud_cumulative_compaction_policy.h"
29
#include "cloud/cloud_tablet.h"
30
#include "cloud/cloud_txn_delete_bitmap_cache.h"
31
#include "cloud/config.h"
32
#include "io/cache/block_file_cache_factory.h"
33
#include "storage/compaction/compaction.h"
34
#include "storage/storage_engine.h"
35
#include "storage/storage_policy.h"
36
#include "util/threadpool.h"
37
38
namespace doris {
39
namespace cloud {
40
class CloudMetaMgr;
41
}
42
namespace io {
43
class FileCacheBlockDownloader;
44
}
45
46
class CloudTabletMgr;
47
class CloudCumulativeCompaction;
48
class CloudBaseCompaction;
49
class CloudFullCompaction;
50
class TabletHotspot;
51
class CloudWarmUpManager;
52
class CloudCompactionStopToken;
53
class CloudSnapshotMgr;
54
class CloudIndexChangeCompaction;
55
56
class CloudStorageEngine final : public BaseStorageEngine {
57
public:
58
    CloudStorageEngine(const EngineOptions& options);
59
60
    ~CloudStorageEngine() override;
61
62
    Status open() override;
63
    void stop() override;
64
    bool stopped() override;
65
66
    /* Parameters:
67
     * - tablet_id: the id of tablet to get
68
     * - sync_stats: the stats of sync rowset
69
     * - force_use_only_cached: whether only use cached tablet meta
70
     * - cache_on_miss: whether cache the tablet meta when missing in cache
71
     */
72
    Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats = nullptr,
73
                                      bool force_use_only_cached = false,
74
                                      bool cache_on_miss = true) override;
75
76
    /*
77
     * Get the tablet meta for a specific tablet
78
     * Parameters:
79
     * - tablet_id: the id of tablet to get meta for
80
     * - tablet_meta: output TabletMeta shared pointer
81
     * - force_use_only_cached: whether only use cached tablet meta (return NotFound on miss)
82
     */
83
    Status get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
84
                           bool force_use_only_cached = false) override;
85
86
    Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;
87
88
    Status set_cluster_id(int32_t cluster_id) override;
89
90
43
    cloud::CloudMetaMgr& meta_mgr() const { return *_meta_mgr; }
91
92
10
    CloudTabletMgr& tablet_mgr() const { return *_tablet_mgr; }
93
94
0
    CloudSnapshotMgr& cloud_snapshot_mgr() { return *_cloud_snapshot_mgr; }
95
96
0
    CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() const { return *_txn_delete_bitmap_cache; }
97
98
0
    CloudCommittedRSMgr& committed_rs_mgr() const { return *_committed_rs_mgr; }
99
100
0
    ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const {
101
0
        return *_calc_tablet_delete_bitmap_task_thread_pool;
102
0
    }
103
0
    ThreadPool& sync_delete_bitmap_thread_pool() const { return *_sync_delete_bitmap_thread_pool; }
104
105
0
    std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
106
0
        VLOG_DEBUG << "Getting storage resource for vault_id: " << vault_id;
107
108
0
        bool synced = false;
109
0
        do {
110
0
            if (vault_id.empty() && latest_fs() != nullptr) {
111
0
                return StorageResource {latest_fs()};
112
0
            }
113
0
            if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
114
0
                return storage_resource->first;
115
0
            }
116
0
            if (synced) {
117
0
                break;
118
0
            }
119
0
            sync_storage_vault();
120
0
            synced = true;
121
0
        } while (true);
122
123
0
        return std::nullopt;
124
0
    }
125
126
2
    io::RemoteFileSystemSPtr latest_fs() const {
127
2
        std::lock_guard lock(_latest_fs_mtx);
128
2
        return _latest_fs;
129
2
    }
130
131
0
    void set_latest_fs(const io::RemoteFileSystemSPtr& fs) {
132
0
        std::lock_guard lock(_latest_fs_mtx);
133
0
        _latest_fs = fs;
134
0
    }
135
136
    void get_cumu_compaction(int64_t tablet_id,
137
                             std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res);
138
139
    Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type);
140
141
    Status get_compaction_status_json(std::string* result);
142
143
0
    bool has_base_compaction(int64_t tablet_id) const {
144
0
        std::lock_guard lock(_compaction_mtx);
145
0
        return _submitted_base_compactions.contains(tablet_id);
146
0
    }
147
148
0
    bool has_cumu_compaction(int64_t tablet_id) const {
149
0
        std::lock_guard lock(_compaction_mtx);
150
0
        return _submitted_cumu_compactions.contains(tablet_id);
151
0
    }
152
153
0
    bool has_full_compaction(int64_t tablet_id) const {
154
0
        std::lock_guard lock(_compaction_mtx);
155
0
        return _submitted_full_compactions.contains(tablet_id);
156
0
    }
157
158
    std::shared_ptr<CloudCumulativeCompactionPolicy> cumu_compaction_policy(
159
            std::string_view compaction_policy);
160
161
    void sync_storage_vault();
162
163
0
    io::FileCacheBlockDownloader& file_cache_block_downloader() const {
164
0
        return *_file_cache_block_downloader;
165
0
    }
166
167
0
    CloudWarmUpManager& cloud_warm_up_manager() const { return *_cloud_warm_up_manager; }
168
169
0
    TabletHotspot& tablet_hotspot() const { return *_tablet_hotspot; }
170
171
0
    ThreadPool& sync_load_for_tablets_thread_pool() const {
172
0
        return *_sync_load_for_tablets_thread_pool;
173
0
    }
174
175
0
    ThreadPool& warmup_cache_async_thread_pool() const { return *_warmup_cache_async_thread_pool; }
176
177
    Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t initiator);
178
179
    Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms);
180
181
    bool register_index_change_compaction(std::shared_ptr<CloudIndexChangeCompaction> compact,
182
                                          int64_t tablet_id, bool is_base_compact,
183
                                          std::string& err_reason);
184
185
    void unregister_index_change_compaction(int64_t tablet_id, bool is_base_compact);
186
187
230
    std::chrono::time_point<std::chrono::system_clock> startup_timepoint() const {
188
230
        return _startup_timepoint;
189
230
    }
190
191
#ifdef BE_TEST
192
37
    void set_startup_timepoint(const std::chrono::time_point<std::chrono::system_clock>& tp) {
193
37
        _startup_timepoint = tp;
194
37
    }
195
#endif
196
197
private:
198
    void _refresh_storage_vault_info_thread_callback();
199
    void _vacuum_stale_rowsets_thread_callback();
200
    void _sync_tablets_thread_callback();
201
    void _compaction_tasks_producer_callback();
202
    std::vector<CloudTabletSPtr> _generate_cloud_compaction_tasks(CompactionType compaction_type,
203
                                                                  bool check_score);
204
    Status _adjust_compaction_thread_num();
205
    Status _submit_base_compaction_task(const CloudTabletSPtr& tablet);
206
    Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet);
207
    Status _submit_full_compaction_task(const CloudTabletSPtr& tablet);
208
    Status _request_tablet_global_compaction_lock(ReaderType compaction_type,
209
                                                  const CloudTabletSPtr& tablet,
210
                                                  std::shared_ptr<CloudCompactionMixin> compaction);
211
    Status _check_all_root_path_cluster_id();
212
    void _lease_compaction_thread_callback();
213
    void _check_tablet_delete_bitmap_score_callback();
214
215
    std::atomic_bool _stopped {false};
216
217
    std::unique_ptr<cloud::CloudMetaMgr> _meta_mgr;
218
    std::unique_ptr<CloudTabletMgr> _tablet_mgr;
219
    std::unique_ptr<CloudTxnDeleteBitmapCache> _txn_delete_bitmap_cache;
220
    std::unique_ptr<CloudCommittedRSMgr> _committed_rs_mgr;
221
    std::unique_ptr<ThreadPool> _calc_tablet_delete_bitmap_task_thread_pool;
222
    std::unique_ptr<ThreadPool> _sync_delete_bitmap_thread_pool;
223
224
    // Components for cache warmup
225
    std::unique_ptr<io::FileCacheBlockDownloader> _file_cache_block_downloader;
226
    // Depended by `FileCacheBlockDownloader`
227
    std::unique_ptr<CloudWarmUpManager> _cloud_warm_up_manager;
228
    std::unique_ptr<TabletHotspot> _tablet_hotspot;
229
    std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool;
230
    std::unique_ptr<ThreadPool> _warmup_cache_async_thread_pool;
231
    std::unique_ptr<CloudSnapshotMgr> _cloud_snapshot_mgr;
232
233
    // FileSystem with latest shared storage info, new data will be written to this fs.
234
    mutable std::mutex _latest_fs_mtx;
235
    io::RemoteFileSystemSPtr _latest_fs;
236
237
    std::vector<std::shared_ptr<Thread>> _bg_threads;
238
239
    // ATTN: Compactions in maps depend on `CloudTabletMgr` and `CloudMetaMgr`
240
    mutable std::mutex _compaction_mtx;
241
    mutable std::mutex _cumu_compaction_delay_mtx;
242
    // tablet_id -> submitted base compaction, guarded by `_compaction_mtx`
243
    std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> _submitted_base_compactions;
244
    // tablet_id -> submitted full compaction, guarded by `_compaction_mtx`
245
    std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> _submitted_full_compactions;
246
    // Store tablets which are preparing cumu compaction, guarded by `_compaction_mtx`
247
    std::unordered_set<int64_t> _tablet_preparing_cumu_compaction;
248
    // tablet_id -> submitted cumu compactions, guarded by `_compaction_mtx`
249
    std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
250
            _submitted_cumu_compactions;
251
    // tablet_id -> active compaction stop tokens
252
    std::unordered_map<int64_t, std::shared_ptr<CloudCompactionStopToken>>
253
            _active_compaction_stop_tokens;
254
    // tablet_id -> executing cumu compactions, guarded by `_compaction_mtx`
255
    std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
256
            _executing_cumu_compactions;
257
    // tablet_id -> executing base compactions, guarded by `_compaction_mtx`
258
    std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> _executing_base_compactions;
259
    // tablet_id -> executing full compactions, guarded by `_compaction_mtx`
260
    std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> _executing_full_compactions;
261
262
    // for index change compaction
263
    std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>>
264
            _submitted_index_change_cumu_compaction;
265
    std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>>
266
            _submitted_index_change_base_compaction;
267
268
    using CumuPolices =
269
            std::unordered_map<std::string_view, std::shared_ptr<CloudCumulativeCompactionPolicy>>;
270
    CumuPolices _cumulative_compaction_policies;
271
272
    std::atomic_bool first_sync_storage_vault {true};
273
274
    EngineOptions _options;
275
    std::mutex _store_lock;
276
277
    std::chrono::time_point<std::chrono::system_clock> _startup_timepoint;
278
};
279
280
} // namespace doris