be/src/cloud/cloud_storage_engine.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <chrono> |
21 | | #include <memory> |
22 | | #include <mutex> |
23 | | |
24 | | //#include "cloud/cloud_cumulative_compaction.h" |
25 | | //#include "cloud/cloud_base_compaction.h" |
26 | | //#include "cloud/cloud_full_compaction.h" |
27 | | #include "cloud/cloud_cumulative_compaction_policy.h" |
28 | | #include "cloud/cloud_tablet.h" |
29 | | #include "cloud/cloud_txn_delete_bitmap_cache.h" |
30 | | #include "cloud/config.h" |
31 | | #include "io/cache/block_file_cache_factory.h" |
32 | | #include "storage/compaction/compaction.h" |
33 | | #include "storage/storage_engine.h" |
34 | | #include "storage/storage_policy.h" |
35 | | #include "util/threadpool.h" |
36 | | |
37 | | namespace doris { |
// Forward declarations: only pointers/references to these types appear in this header,
// so their full definitions are not pulled in here.
namespace cloud {
class CloudMetaMgr;
} // namespace cloud

namespace io {
class FileCacheBlockDownloader;
} // namespace io

// Compaction kinds tracked by the engine.
class CloudBaseCompaction;
class CloudCumulativeCompaction;
class CloudFullCompaction;
class CloudIndexChangeCompaction;
class CloudCompactionStopToken;

// Managers and helpers owned by the engine.
class CloudSnapshotMgr;
class CloudTabletMgr;
class CloudWarmUpManager;
class TabletHotspot;
54 | | |
55 | | class CloudStorageEngine final : public BaseStorageEngine { |
56 | | public: |
57 | | CloudStorageEngine(const EngineOptions& options); |
58 | | |
59 | | ~CloudStorageEngine() override; |
60 | | |
61 | | Status open() override; |
62 | | void stop() override; |
63 | | bool stopped() override; |
64 | | |
65 | | /* Parameters: |
66 | | * - tablet_id: the id of tablet to get |
67 | | * - sync_stats: the stats of sync rowset |
68 | | * - force_use_only_cached: whether only use cached tablet meta |
69 | | * - cache_on_miss: whether cache the tablet meta when missing in cache |
70 | | */ |
71 | | Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats = nullptr, |
72 | | bool force_use_only_cached = false, |
73 | | bool cache_on_miss = true) override; |
74 | | |
75 | | /* |
76 | | * Get the tablet meta for a specific tablet |
77 | | * Parameters: |
78 | | * - tablet_id: the id of tablet to get meta for |
79 | | * - tablet_meta: output TabletMeta shared pointer |
80 | | * - force_use_only_cached: whether only use cached tablet meta (return NotFound on miss) |
81 | | */ |
82 | | Status get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta, |
83 | | bool force_use_only_cached = false) override; |
84 | | |
85 | | Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override; |
86 | | |
87 | | Status set_cluster_id(int32_t cluster_id) override; |
88 | | |
89 | 1.08M | cloud::CloudMetaMgr& meta_mgr() const { return *_meta_mgr; } |
90 | | |
91 | 48.8k | CloudTabletMgr& tablet_mgr() const { return *_tablet_mgr; } |
92 | | |
93 | 0 | CloudSnapshotMgr& cloud_snapshot_mgr() { return *_cloud_snapshot_mgr; } |
94 | | |
95 | 458k | CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() const { return *_txn_delete_bitmap_cache; } |
96 | | |
97 | 9.11k | ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const { |
98 | 9.11k | return *_calc_tablet_delete_bitmap_task_thread_pool; |
99 | 9.11k | } |
100 | 0 | ThreadPool& sync_delete_bitmap_thread_pool() const { return *_sync_delete_bitmap_thread_pool; } |
101 | | |
102 | 193k | std::optional<StorageResource> get_storage_resource(const std::string& vault_id) { |
103 | 18.4E | VLOG_DEBUG << "Getting storage resource for vault_id: " << vault_id; |
104 | | |
105 | 193k | bool synced = false; |
106 | 193k | do { |
107 | 194k | if (vault_id.empty() && latest_fs() != nullptr) { |
108 | 194k | return StorageResource {latest_fs()}; |
109 | 194k | } |
110 | 18.4E | if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) { |
111 | 1 | return storage_resource->first; |
112 | 1 | } |
113 | 18.4E | if (synced) { |
114 | 0 | break; |
115 | 0 | } |
116 | 18.4E | sync_storage_vault(); |
117 | 18.4E | synced = true; |
118 | 18.4E | } while (true); |
119 | | |
120 | 18.4E | return std::nullopt; |
121 | 193k | } |
122 | | |
123 | 388k | io::RemoteFileSystemSPtr latest_fs() const { |
124 | 388k | std::lock_guard lock(_latest_fs_mtx); |
125 | 388k | return _latest_fs; |
126 | 388k | } |
127 | | |
128 | 1 | void set_latest_fs(const io::RemoteFileSystemSPtr& fs) { |
129 | 1 | std::lock_guard lock(_latest_fs_mtx); |
130 | 1 | _latest_fs = fs; |
131 | 1 | } |
132 | | |
133 | | void get_cumu_compaction(int64_t tablet_id, |
134 | | std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res); |
135 | | |
136 | | Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type); |
137 | | |
138 | | Status get_compaction_status_json(std::string* result); |
139 | | |
140 | 435 | bool has_base_compaction(int64_t tablet_id) const { |
141 | 435 | std::lock_guard lock(_compaction_mtx); |
142 | 435 | return _submitted_base_compactions.contains(tablet_id); |
143 | 435 | } |
144 | | |
145 | 472 | bool has_cumu_compaction(int64_t tablet_id) const { |
146 | 472 | std::lock_guard lock(_compaction_mtx); |
147 | 472 | return _submitted_cumu_compactions.contains(tablet_id); |
148 | 472 | } |
149 | | |
150 | 435 | bool has_full_compaction(int64_t tablet_id) const { |
151 | 435 | std::lock_guard lock(_compaction_mtx); |
152 | 435 | return _submitted_full_compactions.contains(tablet_id); |
153 | 435 | } |
154 | | |
155 | | std::shared_ptr<CloudCumulativeCompactionPolicy> cumu_compaction_policy( |
156 | | std::string_view compaction_policy); |
157 | | |
158 | | void sync_storage_vault(); |
159 | | |
160 | 0 | io::FileCacheBlockDownloader& file_cache_block_downloader() const { |
161 | 0 | return *_file_cache_block_downloader; |
162 | 0 | } |
163 | | |
164 | 252k | CloudWarmUpManager& cloud_warm_up_manager() const { return *_cloud_warm_up_manager; } |
165 | | |
166 | 1.80M | TabletHotspot& tablet_hotspot() const { return *_tablet_hotspot; } |
167 | | |
168 | 0 | ThreadPool& sync_load_for_tablets_thread_pool() const { |
169 | 0 | return *_sync_load_for_tablets_thread_pool; |
170 | 0 | } |
171 | | |
172 | 0 | ThreadPool& warmup_cache_async_thread_pool() const { return *_warmup_cache_async_thread_pool; } |
173 | | |
174 | | Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t initiator); |
175 | | |
176 | | Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms); |
177 | | |
178 | | bool register_index_change_compaction(std::shared_ptr<CloudIndexChangeCompaction> compact, |
179 | | int64_t tablet_id, bool is_base_compact, |
180 | | std::string& err_reason); |
181 | | |
182 | | void unregister_index_change_compaction(int64_t tablet_id, bool is_base_compact); |
183 | | |
184 | 260 | std::chrono::time_point<std::chrono::system_clock> startup_timepoint() const { |
185 | 260 | return _startup_timepoint; |
186 | 260 | } |
187 | | |
188 | | #ifdef BE_TEST |
189 | | void set_startup_timepoint(const std::chrono::time_point<std::chrono::system_clock>& tp) { |
190 | | _startup_timepoint = tp; |
191 | | } |
192 | | #endif |
193 | | |
194 | | private: |
195 | | void _refresh_storage_vault_info_thread_callback(); |
196 | | void _vacuum_stale_rowsets_thread_callback(); |
197 | | void _sync_tablets_thread_callback(); |
198 | | void _compaction_tasks_producer_callback(); |
199 | | std::vector<CloudTabletSPtr> _generate_cloud_compaction_tasks(CompactionType compaction_type, |
200 | | bool check_score); |
201 | | Status _adjust_compaction_thread_num(); |
202 | | Status _submit_base_compaction_task(const CloudTabletSPtr& tablet); |
203 | | Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet); |
204 | | Status _submit_full_compaction_task(const CloudTabletSPtr& tablet); |
205 | | Status _request_tablet_global_compaction_lock(ReaderType compaction_type, |
206 | | const CloudTabletSPtr& tablet, |
207 | | std::shared_ptr<CloudCompactionMixin> compaction); |
208 | | Status _check_all_root_path_cluster_id(); |
209 | | void _lease_compaction_thread_callback(); |
210 | | void _check_tablet_delete_bitmap_score_callback(); |
211 | | |
212 | | std::atomic_bool _stopped {false}; |
213 | | |
214 | | std::unique_ptr<cloud::CloudMetaMgr> _meta_mgr; |
215 | | std::unique_ptr<CloudTabletMgr> _tablet_mgr; |
216 | | std::unique_ptr<CloudTxnDeleteBitmapCache> _txn_delete_bitmap_cache; |
217 | | std::unique_ptr<ThreadPool> _calc_tablet_delete_bitmap_task_thread_pool; |
218 | | std::unique_ptr<ThreadPool> _sync_delete_bitmap_thread_pool; |
219 | | |
220 | | // Components for cache warmup |
221 | | std::unique_ptr<io::FileCacheBlockDownloader> _file_cache_block_downloader; |
222 | | // Depended by `FileCacheBlockDownloader` |
223 | | std::unique_ptr<CloudWarmUpManager> _cloud_warm_up_manager; |
224 | | std::unique_ptr<TabletHotspot> _tablet_hotspot; |
225 | | std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool; |
226 | | std::unique_ptr<ThreadPool> _warmup_cache_async_thread_pool; |
227 | | std::unique_ptr<CloudSnapshotMgr> _cloud_snapshot_mgr; |
228 | | |
229 | | // FileSystem with latest shared storage info, new data will be written to this fs. |
230 | | mutable std::mutex _latest_fs_mtx; |
231 | | io::RemoteFileSystemSPtr _latest_fs; |
232 | | |
233 | | std::vector<std::shared_ptr<Thread>> _bg_threads; |
234 | | |
235 | | // ATTN: Compactions in maps depend on `CloudTabletMgr` and `CloudMetaMgr` |
236 | | mutable std::mutex _compaction_mtx; |
237 | | mutable std::mutex _cumu_compaction_delay_mtx; |
238 | | // tablet_id -> submitted base compaction, guarded by `_compaction_mtx` |
239 | | std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> _submitted_base_compactions; |
240 | | // tablet_id -> submitted full compaction, guarded by `_compaction_mtx` |
241 | | std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> _submitted_full_compactions; |
242 | | // Store tablets which are preparing cumu compaction, guarded by `_compaction_mtx` |
243 | | std::unordered_set<int64_t> _tablet_preparing_cumu_compaction; |
244 | | // tablet_id -> submitted cumu compactions, guarded by `_compaction_mtx` |
245 | | std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>> |
246 | | _submitted_cumu_compactions; |
247 | | // tablet_id -> active compaction stop tokens |
248 | | std::unordered_map<int64_t, std::shared_ptr<CloudCompactionStopToken>> |
249 | | _active_compaction_stop_tokens; |
250 | | // tablet_id -> executing cumu compactions, guarded by `_compaction_mtx` |
251 | | std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>> |
252 | | _executing_cumu_compactions; |
253 | | // tablet_id -> executing base compactions, guarded by `_compaction_mtx` |
254 | | std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> _executing_base_compactions; |
255 | | // tablet_id -> executing full compactions, guarded by `_compaction_mtx` |
256 | | std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> _executing_full_compactions; |
257 | | |
258 | | // for index change compaction |
259 | | std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>> |
260 | | _submitted_index_change_cumu_compaction; |
261 | | std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>> |
262 | | _submitted_index_change_base_compaction; |
263 | | |
264 | | using CumuPolices = |
265 | | std::unordered_map<std::string_view, std::shared_ptr<CloudCumulativeCompactionPolicy>>; |
266 | | CumuPolices _cumulative_compaction_policies; |
267 | | |
268 | | std::atomic_bool first_sync_storage_vault {true}; |
269 | | |
270 | | EngineOptions _options; |
271 | | std::mutex _store_lock; |
272 | | |
273 | | std::chrono::time_point<std::chrono::system_clock> _startup_timepoint; |
274 | | }; |
275 | | |
276 | | } // namespace doris |