be/src/cloud/cloud_storage_engine.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

//#include "cloud/cloud_cumulative_compaction.h"
//#include "cloud/cloud_base_compaction.h"
//#include "cloud/cloud_full_compaction.h"
#include "cloud/cloud_committed_rs_mgr.h"
#include "cloud/cloud_cumulative_compaction_policy.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_txn_delete_bitmap_cache.h"
#include "cloud/config.h"
#include "io/cache/block_file_cache_factory.h"
#include "storage/compaction/compaction.h"
#include "storage/storage_engine.h"
#include "storage/storage_policy.h"
#include "util/threadpool.h"
37 | | |
38 | | namespace doris { |
// Forward declarations only: the engine below refers to these types through
// pointers, smart pointers and references in its declarations, so this header
// does not pull in their full definitions here.

// Types living in nested namespaces.
namespace cloud {
class CloudMetaMgr;
} // namespace cloud

namespace io {
class FileCacheBlockDownloader;
} // namespace io

// Compaction-related types.
class CloudBaseCompaction;
class CloudCompactionStopToken;
class CloudCumulativeCompaction;
class CloudFullCompaction;
class CloudIndexChangeCompaction;

// Managers and helper components owned by the engine.
class CloudSnapshotMgr;
class CloudTabletMgr;
class CloudWarmUpManager;
class TabletHotspot;
55 | | |
56 | | class CloudStorageEngine final : public BaseStorageEngine { |
57 | | public: |
58 | | CloudStorageEngine(const EngineOptions& options); |
59 | | |
60 | | ~CloudStorageEngine() override; |
61 | | |
62 | | Status open() override; |
63 | | void stop() override; |
64 | | bool stopped() override; |
65 | | |
66 | | /* Parameters: |
67 | | * - tablet_id: the id of tablet to get |
68 | | * - sync_stats: the stats of sync rowset |
69 | | * - force_use_only_cached: whether only use cached tablet meta |
70 | | * - cache_on_miss: whether cache the tablet meta when missing in cache |
71 | | */ |
72 | | Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats = nullptr, |
73 | | bool force_use_only_cached = false, |
74 | | bool cache_on_miss = true) override; |
75 | | |
76 | | /* |
77 | | * Get the tablet meta for a specific tablet |
78 | | * Parameters: |
79 | | * - tablet_id: the id of tablet to get meta for |
80 | | * - tablet_meta: output TabletMeta shared pointer |
81 | | * - force_use_only_cached: whether only use cached tablet meta (return NotFound on miss) |
82 | | */ |
83 | | Status get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta, |
84 | | bool force_use_only_cached = false) override; |
85 | | |
86 | | Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override; |
87 | | |
88 | | Status set_cluster_id(int32_t cluster_id) override; |
89 | | |
90 | 43 | cloud::CloudMetaMgr& meta_mgr() const { return *_meta_mgr; } |
91 | | |
92 | 10 | CloudTabletMgr& tablet_mgr() const { return *_tablet_mgr; } |
93 | | |
94 | 0 | CloudSnapshotMgr& cloud_snapshot_mgr() { return *_cloud_snapshot_mgr; } |
95 | | |
96 | 0 | CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() const { return *_txn_delete_bitmap_cache; } |
97 | | |
98 | 0 | CloudCommittedRSMgr& committed_rs_mgr() const { return *_committed_rs_mgr; } |
99 | | |
100 | 0 | ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const { |
101 | 0 | return *_calc_tablet_delete_bitmap_task_thread_pool; |
102 | 0 | } |
103 | 0 | ThreadPool& sync_delete_bitmap_thread_pool() const { return *_sync_delete_bitmap_thread_pool; } |
104 | | |
105 | 0 | std::optional<StorageResource> get_storage_resource(const std::string& vault_id) { |
106 | 0 | VLOG_DEBUG << "Getting storage resource for vault_id: " << vault_id; |
107 | |
|
108 | 0 | bool synced = false; |
109 | 0 | do { |
110 | 0 | if (vault_id.empty() && latest_fs() != nullptr) { |
111 | 0 | return StorageResource {latest_fs()}; |
112 | 0 | } |
113 | 0 | if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) { |
114 | 0 | return storage_resource->first; |
115 | 0 | } |
116 | 0 | if (synced) { |
117 | 0 | break; |
118 | 0 | } |
119 | 0 | sync_storage_vault(); |
120 | 0 | synced = true; |
121 | 0 | } while (true); |
122 | | |
123 | 0 | return std::nullopt; |
124 | 0 | } |
125 | | |
126 | 2 | io::RemoteFileSystemSPtr latest_fs() const { |
127 | 2 | std::lock_guard lock(_latest_fs_mtx); |
128 | 2 | return _latest_fs; |
129 | 2 | } |
130 | | |
131 | 0 | void set_latest_fs(const io::RemoteFileSystemSPtr& fs) { |
132 | 0 | std::lock_guard lock(_latest_fs_mtx); |
133 | 0 | _latest_fs = fs; |
134 | 0 | } |
135 | | |
136 | | void get_cumu_compaction(int64_t tablet_id, |
137 | | std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res); |
138 | | |
139 | | Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type); |
140 | | |
141 | | Status get_compaction_status_json(std::string* result); |
142 | | |
143 | 0 | bool has_base_compaction(int64_t tablet_id) const { |
144 | 0 | std::lock_guard lock(_compaction_mtx); |
145 | 0 | return _submitted_base_compactions.contains(tablet_id); |
146 | 0 | } |
147 | | |
148 | 0 | bool has_cumu_compaction(int64_t tablet_id) const { |
149 | 0 | std::lock_guard lock(_compaction_mtx); |
150 | 0 | return _submitted_cumu_compactions.contains(tablet_id); |
151 | 0 | } |
152 | | |
153 | 0 | bool has_full_compaction(int64_t tablet_id) const { |
154 | 0 | std::lock_guard lock(_compaction_mtx); |
155 | 0 | return _submitted_full_compactions.contains(tablet_id); |
156 | 0 | } |
157 | | |
158 | | std::shared_ptr<CloudCumulativeCompactionPolicy> cumu_compaction_policy( |
159 | | std::string_view compaction_policy); |
160 | | |
161 | | void sync_storage_vault(); |
162 | | |
163 | 0 | io::FileCacheBlockDownloader& file_cache_block_downloader() const { |
164 | 0 | return *_file_cache_block_downloader; |
165 | 0 | } |
166 | | |
167 | 0 | CloudWarmUpManager& cloud_warm_up_manager() const { return *_cloud_warm_up_manager; } |
168 | | |
169 | 0 | TabletHotspot& tablet_hotspot() const { return *_tablet_hotspot; } |
170 | | |
171 | 0 | ThreadPool& sync_load_for_tablets_thread_pool() const { |
172 | 0 | return *_sync_load_for_tablets_thread_pool; |
173 | 0 | } |
174 | | |
175 | 0 | ThreadPool& warmup_cache_async_thread_pool() const { return *_warmup_cache_async_thread_pool; } |
176 | | |
177 | | Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t initiator); |
178 | | |
179 | | Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms); |
180 | | |
181 | | bool register_index_change_compaction(std::shared_ptr<CloudIndexChangeCompaction> compact, |
182 | | int64_t tablet_id, bool is_base_compact, |
183 | | std::string& err_reason); |
184 | | |
185 | | void unregister_index_change_compaction(int64_t tablet_id, bool is_base_compact); |
186 | | |
187 | 230 | std::chrono::time_point<std::chrono::system_clock> startup_timepoint() const { |
188 | 230 | return _startup_timepoint; |
189 | 230 | } |
190 | | |
191 | | #ifdef BE_TEST |
192 | 37 | void set_startup_timepoint(const std::chrono::time_point<std::chrono::system_clock>& tp) { |
193 | 37 | _startup_timepoint = tp; |
194 | 37 | } |
195 | | #endif |
196 | | |
197 | | private: |
198 | | void _refresh_storage_vault_info_thread_callback(); |
199 | | void _vacuum_stale_rowsets_thread_callback(); |
200 | | void _sync_tablets_thread_callback(); |
201 | | void _compaction_tasks_producer_callback(); |
202 | | std::vector<CloudTabletSPtr> _generate_cloud_compaction_tasks(CompactionType compaction_type, |
203 | | bool check_score); |
204 | | Status _adjust_compaction_thread_num(); |
205 | | Status _submit_base_compaction_task(const CloudTabletSPtr& tablet); |
206 | | Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet); |
207 | | Status _submit_full_compaction_task(const CloudTabletSPtr& tablet); |
208 | | Status _request_tablet_global_compaction_lock(ReaderType compaction_type, |
209 | | const CloudTabletSPtr& tablet, |
210 | | std::shared_ptr<CloudCompactionMixin> compaction); |
211 | | Status _check_all_root_path_cluster_id(); |
212 | | void _lease_compaction_thread_callback(); |
213 | | void _check_tablet_delete_bitmap_score_callback(); |
214 | | |
215 | | std::atomic_bool _stopped {false}; |
216 | | |
217 | | std::unique_ptr<cloud::CloudMetaMgr> _meta_mgr; |
218 | | std::unique_ptr<CloudTabletMgr> _tablet_mgr; |
219 | | std::unique_ptr<CloudTxnDeleteBitmapCache> _txn_delete_bitmap_cache; |
220 | | std::unique_ptr<CloudCommittedRSMgr> _committed_rs_mgr; |
221 | | std::unique_ptr<ThreadPool> _calc_tablet_delete_bitmap_task_thread_pool; |
222 | | std::unique_ptr<ThreadPool> _sync_delete_bitmap_thread_pool; |
223 | | |
224 | | // Components for cache warmup |
225 | | std::unique_ptr<io::FileCacheBlockDownloader> _file_cache_block_downloader; |
226 | | // Depended by `FileCacheBlockDownloader` |
227 | | std::unique_ptr<CloudWarmUpManager> _cloud_warm_up_manager; |
228 | | std::unique_ptr<TabletHotspot> _tablet_hotspot; |
229 | | std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool; |
230 | | std::unique_ptr<ThreadPool> _warmup_cache_async_thread_pool; |
231 | | std::unique_ptr<CloudSnapshotMgr> _cloud_snapshot_mgr; |
232 | | |
233 | | // FileSystem with latest shared storage info, new data will be written to this fs. |
234 | | mutable std::mutex _latest_fs_mtx; |
235 | | io::RemoteFileSystemSPtr _latest_fs; |
236 | | |
237 | | std::vector<std::shared_ptr<Thread>> _bg_threads; |
238 | | |
239 | | // ATTN: Compactions in maps depend on `CloudTabletMgr` and `CloudMetaMgr` |
240 | | mutable std::mutex _compaction_mtx; |
241 | | mutable std::mutex _cumu_compaction_delay_mtx; |
242 | | // tablet_id -> submitted base compaction, guarded by `_compaction_mtx` |
243 | | std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> _submitted_base_compactions; |
244 | | // tablet_id -> submitted full compaction, guarded by `_compaction_mtx` |
245 | | std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> _submitted_full_compactions; |
246 | | // Store tablets which are preparing cumu compaction, guarded by `_compaction_mtx` |
247 | | std::unordered_set<int64_t> _tablet_preparing_cumu_compaction; |
248 | | // tablet_id -> submitted cumu compactions, guarded by `_compaction_mtx` |
249 | | std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>> |
250 | | _submitted_cumu_compactions; |
251 | | // tablet_id -> active compaction stop tokens |
252 | | std::unordered_map<int64_t, std::shared_ptr<CloudCompactionStopToken>> |
253 | | _active_compaction_stop_tokens; |
254 | | // tablet_id -> executing cumu compactions, guarded by `_compaction_mtx` |
255 | | std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>> |
256 | | _executing_cumu_compactions; |
257 | | // tablet_id -> executing base compactions, guarded by `_compaction_mtx` |
258 | | std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> _executing_base_compactions; |
259 | | // tablet_id -> executing full compactions, guarded by `_compaction_mtx` |
260 | | std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> _executing_full_compactions; |
261 | | |
262 | | // for index change compaction |
263 | | std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>> |
264 | | _submitted_index_change_cumu_compaction; |
265 | | std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>> |
266 | | _submitted_index_change_base_compaction; |
267 | | |
268 | | using CumuPolices = |
269 | | std::unordered_map<std::string_view, std::shared_ptr<CloudCumulativeCompactionPolicy>>; |
270 | | CumuPolices _cumulative_compaction_policies; |
271 | | |
272 | | std::atomic_bool first_sync_storage_vault {true}; |
273 | | |
274 | | EngineOptions _options; |
275 | | std::mutex _store_lock; |
276 | | |
277 | | std::chrono::time_point<std::chrono::system_clock> _startup_timepoint; |
278 | | }; |
279 | | |
280 | | } // namespace doris |