be/src/cloud/cloud_storage_engine.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_storage_engine.h" |
19 | | |
20 | | #include <bvar/reducer.h> |
21 | | #include <gen_cpp/PlanNodes_types.h> |
22 | | #include <gen_cpp/cloud.pb.h> |
23 | | #include <gen_cpp/olap_file.pb.h> |
24 | | #include <rapidjson/document.h> |
25 | | #include <rapidjson/encodings.h> |
26 | | #include <rapidjson/prettywriter.h> |
27 | | #include <rapidjson/stringbuffer.h> |
28 | | |
29 | | #include <algorithm> |
30 | | #include <memory> |
31 | | #include <variant> |
32 | | |
33 | | #include "cloud/cloud_base_compaction.h" |
34 | | #include "cloud/cloud_compaction_stop_token.h" |
35 | | #include "cloud/cloud_cumulative_compaction.h" |
36 | | #include "cloud/cloud_cumulative_compaction_policy.h" |
37 | | #include "cloud/cloud_full_compaction.h" |
38 | | #include "cloud/cloud_index_change_compaction.h" |
39 | | #include "cloud/cloud_meta_mgr.h" |
40 | | #include "cloud/cloud_snapshot_mgr.h" |
41 | | #include "cloud/cloud_tablet_hotspot.h" |
42 | | #include "cloud/cloud_tablet_mgr.h" |
43 | | #include "cloud/cloud_txn_delete_bitmap_cache.h" |
44 | | #include "cloud/cloud_warm_up_manager.h" |
45 | | #include "cloud/config.h" |
46 | | #include "common/config.h" |
47 | | #include "common/signal_handler.h" |
48 | | #include "common/status.h" |
49 | | #include "core/assert_cast.h" |
50 | | #include "io/cache/block_file_cache_downloader.h" |
51 | | #include "io/cache/block_file_cache_factory.h" |
52 | | #include "io/cache/file_cache_common.h" |
53 | | #include "io/fs/file_system.h" |
54 | | #include "io/fs/hdfs_file_system.h" |
55 | | #include "io/fs/s3_file_system.h" |
56 | | #include "io/hdfs_util.h" |
57 | | #include "io/io_common.h" |
58 | | #include "load/memtable/memtable_flush_executor.h" |
59 | | #include "runtime/memory/cache_manager.h" |
60 | | #include "storage/compaction/cumulative_compaction_policy.h" |
61 | | #include "storage/compaction/cumulative_compaction_time_series_policy.h" |
62 | | #include "storage/storage_policy.h" |
63 | | #include "util/parse_util.h" |
64 | | #include "util/time.h" |
65 | | |
66 | | namespace doris { |
67 | | #include "common/compile_check_begin.h" |
68 | | |
69 | | using namespace std::literals; |
70 | | |
71 | | bvar::Adder<uint64_t> g_base_compaction_running_task_count("base_compaction_running_task_count"); |
72 | | bvar::Adder<uint64_t> g_full_compaction_running_task_count("full_compaction_running_task_count"); |
73 | | bvar::Adder<uint64_t> g_cumu_compaction_running_task_count( |
74 | | "cumulative_compaction_running_task_count"); |
75 | | |
76 | 0 | int get_cumu_thread_num() { |
77 | 0 | if (config::max_cumu_compaction_threads > 0) { |
78 | 0 | return config::max_cumu_compaction_threads; |
79 | 0 | } |
80 | | |
81 | 0 | int num_cores = doris::CpuInfo::num_cores(); |
82 | 0 | return std::min(std::max(int(num_cores * config::cumu_compaction_thread_num_factor), 2), 20); |
83 | 0 | } |
84 | | |
85 | 0 | int get_base_thread_num() { |
86 | 0 | if (config::max_base_compaction_threads > 0) { |
87 | 0 | return config::max_base_compaction_threads; |
88 | 0 | } |
89 | | |
90 | 0 | int num_cores = doris::CpuInfo::num_cores(); |
91 | 0 | return std::min(std::max(int(num_cores * config::base_compaction_thread_num_factor), 1), 10); |
92 | 0 | } |
93 | | |
94 | | CloudStorageEngine::CloudStorageEngine(const EngineOptions& options) |
95 | 131 | : BaseStorageEngine(Type::CLOUD, options.backend_uid), |
96 | 131 | _meta_mgr(std::make_unique<cloud::CloudMetaMgr>()), |
97 | 131 | _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)), |
98 | 131 | _options(options) { |
99 | 131 | _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] = |
100 | 131 | std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>(); |
101 | 131 | _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] = |
102 | 131 | std::make_shared<CloudTimeSeriesCumulativeCompactionPolicy>(); |
103 | 131 | _startup_timepoint = std::chrono::system_clock::now(); |
104 | 131 | } |
105 | | |
106 | 131 | CloudStorageEngine::~CloudStorageEngine() { |
107 | 131 | stop(); |
108 | 131 | } |
109 | | |
110 | | static Status vault_process_error(std::string_view id, |
111 | 0 | std::variant<S3Conf, cloud::HdfsVaultInfo>& vault, Status err) { |
112 | 0 | std::stringstream ss; |
113 | 0 | std::visit( |
114 | 0 | [&]<typename T>(T& val) { |
115 | 0 | if constexpr (std::is_same_v<T, S3Conf>) { |
116 | 0 | ss << val.to_string(); |
117 | 0 | } else if constexpr (std::is_same_v<T, cloud::HdfsVaultInfo>) { |
118 | 0 | val.SerializeToOstream(&ss); |
119 | 0 | } |
120 | 0 | }, Unexecuted instantiation: cloud_storage_engine.cpp:_ZZN5dorisL19vault_process_errorESt17basic_string_viewIcSt11char_traitsIcEERSt7variantIJNS_6S3ConfENS_5cloud13HdfsVaultInfoEEENS_6StatusEENK3$_0clIS5_EEDaRT_ Unexecuted instantiation: cloud_storage_engine.cpp:_ZZN5dorisL19vault_process_errorESt17basic_string_viewIcSt11char_traitsIcEERSt7variantIJNS_6S3ConfENS_5cloud13HdfsVaultInfoEEENS_6StatusEENK3$_0clIS7_EEDaRT_ |
121 | 0 | vault); |
122 | 0 | return Status::IOError("Invalid vault, id {}, err {}, detail conf {}", id, err, ss.str()); |
123 | 0 | } |
124 | | |
125 | | struct VaultCreateFSVisitor { |
126 | | VaultCreateFSVisitor(const std::string& id, const cloud::StorageVaultPB_PathFormat& path_format, |
127 | | bool check_fs) |
128 | 0 | : id(id), path_format(path_format), check_fs(check_fs) {} |
129 | 0 | Status operator()(const S3Conf& s3_conf) const { |
130 | 0 | LOG(INFO) << "get new s3 info: " << s3_conf.to_string() << " resource_id=" << id |
131 | 0 | << " check_fs: " << check_fs; |
132 | |
|
133 | 0 | auto fs = DORIS_TRY(io::S3FileSystem::create(s3_conf, id)); |
134 | 0 | if (check_fs && !s3_conf.client_conf.role_arn.empty()) { |
135 | 0 | bool res = false; |
136 | | // just check connectivity, not care object if exist |
137 | 0 | auto st = fs->exists("not_exist_object", &res); |
138 | 0 | if (!st.ok()) { |
139 | 0 | LOG(FATAL) << "failed to check s3 fs, resource_id: " << id << " st: " << st |
140 | 0 | << "s3_conf: " << s3_conf.to_string() |
141 | 0 | << "add enable_check_storage_vault=false to be.conf to skip the check"; |
142 | 0 | } |
143 | 0 | } |
144 | |
|
145 | 0 | put_storage_resource(id, {std::move(fs), path_format}, 0); |
146 | 0 | LOG_INFO("successfully create s3 vault, vault id {}", id); |
147 | 0 | return Status::OK(); |
148 | 0 | } |
149 | | |
150 | | // TODO(ByteYue): Make sure enable_java_support is on |
151 | 0 | Status operator()(const cloud::HdfsVaultInfo& vault) const { |
152 | 0 | auto hdfs_params = io::to_hdfs_params(vault); |
153 | 0 | auto fs = DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, |
154 | 0 | nullptr, vault.prefix())); |
155 | 0 | put_storage_resource(id, {std::move(fs), path_format}, 0); |
156 | 0 | LOG_INFO("successfully create hdfs vault, vault id {}", id); |
157 | 0 | return Status::OK(); |
158 | 0 | } |
159 | | |
160 | | const std::string& id; |
161 | | const cloud::StorageVaultPB_PathFormat& path_format; |
162 | | bool check_fs; |
163 | | }; |
164 | | |
165 | | struct RefreshFSVaultVisitor { |
166 | | RefreshFSVaultVisitor(const std::string& id, io::FileSystemSPtr fs, |
167 | | const cloud::StorageVaultPB_PathFormat& path_format) |
168 | 0 | : id(id), fs(std::move(fs)), path_format(path_format) {} |
169 | | |
170 | 0 | Status operator()(const S3Conf& s3_conf) const { |
171 | 0 | DCHECK_EQ(fs->type(), io::FileSystemType::S3) << id; |
172 | 0 | auto s3_fs = std::static_pointer_cast<io::S3FileSystem>(fs); |
173 | 0 | auto client_holder = s3_fs->client_holder(); |
174 | 0 | auto st = client_holder->reset(s3_conf.client_conf); |
175 | 0 | if (!st.ok()) { |
176 | 0 | LOG(WARNING) << "failed to update s3 fs, resource_id=" << id << ": " << st; |
177 | 0 | } |
178 | 0 | return st; |
179 | 0 | } |
180 | | |
181 | 0 | Status operator()(const cloud::HdfsVaultInfo& vault) const { |
182 | 0 | auto hdfs_params = io::to_hdfs_params(vault); |
183 | 0 | auto hdfs_fs = |
184 | 0 | DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr, |
185 | 0 | vault.has_prefix() ? vault.prefix() : "")); |
186 | 0 | auto hdfs = std::static_pointer_cast<io::HdfsFileSystem>(hdfs_fs); |
187 | 0 | put_storage_resource(id, {std::move(hdfs), path_format}, 0); |
188 | 0 | return Status::OK(); |
189 | 0 | } |
190 | | |
191 | | const std::string& id; |
192 | | io::FileSystemSPtr fs; |
193 | | const cloud::StorageVaultPB_PathFormat& path_format; |
194 | | }; |
195 | | |
196 | 0 | Status CloudStorageEngine::open() { |
197 | 0 | sync_storage_vault(); |
198 | | |
199 | | // TODO(plat1ko): DeleteBitmapTxnManager |
200 | |
|
201 | 0 | _memtable_flush_executor = std::make_unique<MemTableFlushExecutor>(); |
202 | | // Use file cache disks number |
203 | 0 | _memtable_flush_executor->init( |
204 | 0 | cast_set<int32_t>(io::FileCacheFactory::instance()->get_cache_instance_size())); |
205 | |
|
206 | 0 | _calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>(); |
207 | 0 | _calc_delete_bitmap_executor->init(config::calc_delete_bitmap_max_thread); |
208 | |
|
209 | 0 | _calc_delete_bitmap_executor_for_load = std::make_unique<CalcDeleteBitmapExecutor>(); |
210 | 0 | _calc_delete_bitmap_executor_for_load->init( |
211 | 0 | config::calc_delete_bitmap_for_load_max_thread > 0 |
212 | 0 | ? config::calc_delete_bitmap_for_load_max_thread |
213 | 0 | : std::max(1, CpuInfo::num_cores() / 2)); |
214 | | |
215 | | // The default cache is set to 100MB, use memory limit to dynamic adjustment |
216 | 0 | bool is_percent = false; |
217 | 0 | int64_t delete_bitmap_agg_cache_cache_limit = |
218 | 0 | ParseUtil::parse_mem_spec(config::delete_bitmap_dynamic_agg_cache_limit, |
219 | 0 | MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent); |
220 | 0 | _txn_delete_bitmap_cache = std::make_unique<CloudTxnDeleteBitmapCache>( |
221 | 0 | delete_bitmap_agg_cache_cache_limit > config::delete_bitmap_agg_cache_capacity |
222 | 0 | ? delete_bitmap_agg_cache_cache_limit |
223 | 0 | : config::delete_bitmap_agg_cache_capacity); |
224 | 0 | RETURN_IF_ERROR(_txn_delete_bitmap_cache->init()); |
225 | | |
226 | 0 | _file_cache_block_downloader = std::make_unique<io::FileCacheBlockDownloader>(*this); |
227 | |
|
228 | 0 | _cloud_warm_up_manager = std::make_unique<CloudWarmUpManager>(*this); |
229 | |
|
230 | 0 | _tablet_hotspot = std::make_unique<TabletHotspot>(); |
231 | |
|
232 | 0 | _cloud_snapshot_mgr = std::make_unique<CloudSnapshotMgr>(*this); |
233 | |
|
234 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN( |
235 | 0 | init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path), |
236 | 0 | "init StreamLoadRecorder failed"); |
237 | | |
238 | | // check cluster id |
239 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id"); |
240 | |
|
241 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("SyncLoadForTabletsThreadPool") |
242 | 0 | .set_max_threads(config::sync_load_for_tablets_thread) |
243 | 0 | .set_min_threads(config::sync_load_for_tablets_thread) |
244 | 0 | .build(&_sync_load_for_tablets_thread_pool), |
245 | 0 | "fail to build SyncLoadForTabletsThreadPool"); |
246 | |
|
247 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("WarmupCacheAsyncThreadPool") |
248 | 0 | .set_max_threads(config::warmup_cache_async_thread) |
249 | 0 | .set_min_threads(config::warmup_cache_async_thread) |
250 | 0 | .build(&_warmup_cache_async_thread_pool), |
251 | 0 | "fail to build WarmupCacheAsyncThreadPool"); |
252 | |
|
253 | 0 | return Status::OK(); |
254 | 0 | } |
255 | | |
256 | 131 | void CloudStorageEngine::stop() { |
257 | 131 | if (_stopped) { |
258 | 0 | return; |
259 | 0 | } |
260 | | |
261 | 131 | _stopped = true; |
262 | 131 | _stop_background_threads_latch.count_down(); |
263 | | |
264 | 131 | for (auto&& t : _bg_threads) { |
265 | 0 | if (t) { |
266 | 0 | t->join(); |
267 | 0 | } |
268 | 0 | } |
269 | | |
270 | 131 | if (_base_compaction_thread_pool) { |
271 | 0 | _base_compaction_thread_pool->shutdown(); |
272 | 0 | } |
273 | 131 | if (_cumu_compaction_thread_pool) { |
274 | 0 | _cumu_compaction_thread_pool->shutdown(); |
275 | 0 | } |
276 | 131 | LOG(INFO) << "Cloud storage engine is stopped."; |
277 | | |
278 | 131 | if (_calc_tablet_delete_bitmap_task_thread_pool) { |
279 | 0 | _calc_tablet_delete_bitmap_task_thread_pool->shutdown(); |
280 | 0 | } |
281 | 131 | if (_sync_delete_bitmap_thread_pool) { |
282 | 0 | _sync_delete_bitmap_thread_pool->shutdown(); |
283 | 0 | } |
284 | 131 | } |
285 | | |
286 | 0 | bool CloudStorageEngine::stopped() { |
287 | 0 | return _stopped; |
288 | 0 | } |
289 | | |
290 | | Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id, |
291 | | SyncRowsetStats* sync_stats, |
292 | | bool force_use_only_cached, |
293 | 0 | bool cache_on_miss) { |
294 | 0 | return _tablet_mgr |
295 | 0 | ->get_tablet(tablet_id, false, true, sync_stats, force_use_only_cached, cache_on_miss) |
296 | 0 | .transform([](auto&& t) { return static_pointer_cast<BaseTablet>(std::move(t)); }); |
297 | 0 | } |
298 | | |
299 | | Status CloudStorageEngine::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta, |
300 | 0 | bool force_use_only_cached) { |
301 | 0 | if (tablet_meta == nullptr) { |
302 | 0 | return Status::InvalidArgument("tablet_meta output is null"); |
303 | 0 | } |
304 | | |
305 | | #if 0 |
306 | | if (_tablet_mgr && _tablet_mgr->peek_tablet_meta(tablet_id, tablet_meta)) { |
307 | | return Status::OK(); |
308 | | } |
309 | | |
310 | | if (force_use_only_cached) { |
311 | | return Status::NotFound("tablet meta {} not found in cache", tablet_id); |
312 | | } |
313 | | #endif |
314 | | |
315 | 0 | if (_meta_mgr == nullptr) { |
316 | 0 | return Status::InternalError("cloud meta manager is not initialized"); |
317 | 0 | } |
318 | | |
319 | 0 | return _meta_mgr->get_tablet_meta(tablet_id, tablet_meta); |
320 | 0 | } |
321 | | |
322 | 0 | Status CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) { |
323 | 0 | RETURN_IF_ERROR(Thread::create( |
324 | 0 | "CloudStorageEngine", "refresh_s3_info_thread", |
325 | 0 | [this]() { this->_refresh_storage_vault_info_thread_callback(); }, |
326 | 0 | &_bg_threads.emplace_back())); |
327 | 0 | LOG(INFO) << "refresh s3 info thread started"; |
328 | |
|
329 | 0 | RETURN_IF_ERROR(Thread::create( |
330 | 0 | "CloudStorageEngine", "vacuum_stale_rowsets_thread", |
331 | 0 | [this]() { this->_vacuum_stale_rowsets_thread_callback(); }, |
332 | 0 | &_bg_threads.emplace_back())); |
333 | 0 | LOG(INFO) << "vacuum stale rowsets thread started"; |
334 | |
|
335 | 0 | RETURN_IF_ERROR(Thread::create( |
336 | 0 | "CloudStorageEngine", "sync_tablets_thread", |
337 | 0 | [this]() { this->_sync_tablets_thread_callback(); }, &_bg_threads.emplace_back())); |
338 | 0 | LOG(INFO) << "sync tablets thread started"; |
339 | |
|
340 | 0 | RETURN_IF_ERROR(Thread::create( |
341 | 0 | "CloudStorageEngine", "evict_querying_rowset_thread", |
342 | 0 | [this]() { this->_evict_quring_rowset_thread_callback(); }, |
343 | 0 | &_evict_quering_rowset_thread)); |
344 | 0 | LOG(INFO) << "evict quering thread started"; |
345 | | |
346 | | // add calculate tablet delete bitmap task thread pool |
347 | 0 | RETURN_IF_ERROR(ThreadPoolBuilder("TabletCalDeleteBitmapThreadPool") |
348 | 0 | .set_min_threads(config::calc_tablet_delete_bitmap_task_max_thread) |
349 | 0 | .set_max_threads(config::calc_tablet_delete_bitmap_task_max_thread) |
350 | 0 | .build(&_calc_tablet_delete_bitmap_task_thread_pool)); |
351 | 0 | RETURN_IF_ERROR(ThreadPoolBuilder("SyncDeleteBitmapThreadPool") |
352 | 0 | .set_min_threads(config::sync_delete_bitmap_task_max_thread) |
353 | 0 | .set_max_threads(config::sync_delete_bitmap_task_max_thread) |
354 | 0 | .build(&_sync_delete_bitmap_thread_pool)); |
355 | | |
356 | | // TODO(plat1ko): check_bucket_enable_versioning_thread |
357 | | |
358 | | // compaction tasks producer thread |
359 | 0 | int base_thread_num = get_base_thread_num(); |
360 | 0 | int cumu_thread_num = get_cumu_thread_num(); |
361 | |
|
362 | 0 | RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool") |
363 | 0 | .set_min_threads(base_thread_num) |
364 | 0 | .set_max_threads(base_thread_num) |
365 | 0 | .build(&_base_compaction_thread_pool)); |
366 | 0 | RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool") |
367 | 0 | .set_min_threads(cumu_thread_num) |
368 | 0 | .set_max_threads(cumu_thread_num) |
369 | 0 | .build(&_cumu_compaction_thread_pool)); |
370 | 0 | RETURN_IF_ERROR(Thread::create( |
371 | 0 | "StorageEngine", "compaction_tasks_producer_thread", |
372 | 0 | [this]() { this->_compaction_tasks_producer_callback(); }, |
373 | 0 | &_bg_threads.emplace_back())); |
374 | 0 | LOG(INFO) << "compaction tasks producer thread started," |
375 | 0 | << " base thread num " << base_thread_num << " cumu thread num " << cumu_thread_num; |
376 | |
|
377 | 0 | RETURN_IF_ERROR(Thread::create( |
378 | 0 | "StorageEngine", "lease_compaction_thread", |
379 | 0 | [this]() { this->_lease_compaction_thread_callback(); }, &_bg_threads.emplace_back())); |
380 | | |
381 | 0 | LOG(INFO) << "lease compaction thread started"; |
382 | |
|
383 | 0 | RETURN_IF_ERROR(Thread::create( |
384 | 0 | "StorageEngine", "check_tablet_delete_bitmap_score_thread", |
385 | 0 | [this]() { this->_check_tablet_delete_bitmap_score_callback(); }, |
386 | 0 | &_bg_threads.emplace_back())); |
387 | 0 | LOG(INFO) << "check tablet delete bitmap score thread started"; |
388 | |
|
389 | 0 | return Status::OK(); |
390 | 0 | } |
391 | | |
392 | 0 | void CloudStorageEngine::sync_storage_vault() { |
393 | 0 | cloud::StorageVaultInfos vault_infos; |
394 | 0 | bool enable_storage_vault = false; |
395 | |
|
396 | 0 | auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault); |
397 | 0 | if (!st.ok()) { |
398 | 0 | LOG(WARNING) << "failed to get storage vault info. err=" << st; |
399 | 0 | return; |
400 | 0 | } |
401 | | |
402 | 0 | if (vault_infos.empty()) { |
403 | 0 | LOG(WARNING) << "empty storage vault info"; |
404 | 0 | return; |
405 | 0 | } |
406 | | |
407 | 0 | bool check_storage_vault = false; |
408 | 0 | bool expected = false; |
409 | 0 | if (first_sync_storage_vault.compare_exchange_strong(expected, true)) { |
410 | 0 | check_storage_vault = config::enable_check_storage_vault; |
411 | 0 | LOG(INFO) << "first sync storage vault info, BE try to check iam role connectivity, " |
412 | 0 | "check_storage_vault=" |
413 | 0 | << check_storage_vault; |
414 | 0 | } |
415 | |
|
416 | 0 | for (auto& [id, vault_info, path_format] : vault_infos) { |
417 | 0 | auto fs = get_filesystem(id); |
418 | 0 | auto status = |
419 | 0 | (fs == nullptr) |
420 | 0 | ? std::visit(VaultCreateFSVisitor {id, path_format, check_storage_vault}, |
421 | 0 | vault_info) |
422 | 0 | : std::visit(RefreshFSVaultVisitor {id, std::move(fs), path_format}, |
423 | 0 | vault_info); |
424 | 0 | if (!status.ok()) [[unlikely]] { |
425 | 0 | LOG(WARNING) << vault_process_error(id, vault_info, std::move(st)); |
426 | 0 | } |
427 | 0 | } |
428 | |
|
429 | 0 | if (auto& id = std::get<0>(vault_infos.back()); |
430 | 0 | (latest_fs() == nullptr || latest_fs()->id() != id) && !enable_storage_vault) { |
431 | 0 | set_latest_fs(get_filesystem(id)); |
432 | 0 | } |
433 | 0 | } |
434 | | |
435 | | // We should enable_java_support if we want to use hdfs vault |
436 | 0 | void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() { |
437 | 0 | while (!_stop_background_threads_latch.wait_for( |
438 | 0 | std::chrono::seconds(config::refresh_s3_info_interval_s))) { |
439 | 0 | sync_storage_vault(); |
440 | 0 | } |
441 | 0 | } |
442 | | |
443 | 0 | void CloudStorageEngine::_vacuum_stale_rowsets_thread_callback() { |
444 | 0 | while (!_stop_background_threads_latch.wait_for( |
445 | 0 | std::chrono::seconds(config::vacuum_stale_rowsets_interval_s))) { |
446 | 0 | _tablet_mgr->vacuum_stale_rowsets(_stop_background_threads_latch); |
447 | 0 | } |
448 | 0 | } |
449 | | |
450 | 0 | void CloudStorageEngine::_sync_tablets_thread_callback() { |
451 | 0 | while (!_stop_background_threads_latch.wait_for( |
452 | 0 | std::chrono::seconds(config::schedule_sync_tablets_interval_s))) { |
453 | 0 | _tablet_mgr->sync_tablets(_stop_background_threads_latch); |
454 | 0 | } |
455 | 0 | } |
456 | | |
457 | | void CloudStorageEngine::get_cumu_compaction( |
458 | 0 | int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) { |
459 | 0 | std::lock_guard lock(_compaction_mtx); |
460 | 0 | if (auto it = _submitted_cumu_compactions.find(tablet_id); |
461 | 0 | it != _submitted_cumu_compactions.end()) { |
462 | 0 | res = it->second; |
463 | 0 | } |
464 | 0 | } |
465 | | |
466 | 0 | Status CloudStorageEngine::_adjust_compaction_thread_num() { |
467 | 0 | int base_thread_num = get_base_thread_num(); |
468 | |
|
469 | 0 | if (!_base_compaction_thread_pool || !_cumu_compaction_thread_pool) { |
470 | 0 | LOG(WARNING) << "base or cumu compaction thread pool is not created"; |
471 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>(""); |
472 | 0 | } |
473 | | |
474 | 0 | if (_base_compaction_thread_pool->max_threads() != base_thread_num) { |
475 | 0 | int old_max_threads = _base_compaction_thread_pool->max_threads(); |
476 | 0 | Status status = _base_compaction_thread_pool->set_max_threads(base_thread_num); |
477 | 0 | if (status.ok()) { |
478 | 0 | VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads |
479 | 0 | << " to " << base_thread_num; |
480 | 0 | } |
481 | 0 | } |
482 | 0 | if (_base_compaction_thread_pool->min_threads() != base_thread_num) { |
483 | 0 | int old_min_threads = _base_compaction_thread_pool->min_threads(); |
484 | 0 | Status status = _base_compaction_thread_pool->set_min_threads(base_thread_num); |
485 | 0 | if (status.ok()) { |
486 | 0 | VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads |
487 | 0 | << " to " << base_thread_num; |
488 | 0 | } |
489 | 0 | } |
490 | |
|
491 | 0 | int cumu_thread_num = get_cumu_thread_num(); |
492 | 0 | if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) { |
493 | 0 | int old_max_threads = _cumu_compaction_thread_pool->max_threads(); |
494 | 0 | Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_thread_num); |
495 | 0 | if (status.ok()) { |
496 | 0 | VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads |
497 | 0 | << " to " << cumu_thread_num; |
498 | 0 | } |
499 | 0 | } |
500 | 0 | if (_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) { |
501 | 0 | int old_min_threads = _cumu_compaction_thread_pool->min_threads(); |
502 | 0 | Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_thread_num); |
503 | 0 | if (status.ok()) { |
504 | 0 | VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads |
505 | 0 | << " to " << cumu_thread_num; |
506 | 0 | } |
507 | 0 | } |
508 | 0 | return Status::OK(); |
509 | 0 | } |
510 | | |
511 | 0 | void CloudStorageEngine::_compaction_tasks_producer_callback() { |
512 | 0 | LOG(INFO) << "try to start compaction producer process!"; |
513 | |
|
514 | 0 | int round = 0; |
515 | 0 | CompactionType compaction_type; |
516 | | |
517 | | // Used to record the time when the score metric was last updated. |
518 | | // The update of the score metric is accompanied by the logic of selecting the tablet. |
519 | | // If there is no slot available, the logic of selecting the tablet will be terminated, |
520 | | // which causes the score metric update to be terminated. |
521 | | // In order to avoid this situation, we need to update the score regularly. |
522 | 0 | int64_t last_cumulative_score_update_time = 0; |
523 | 0 | int64_t last_base_score_update_time = 0; |
524 | 0 | static const int64_t check_score_interval_ms = 5000; // 5 secs |
525 | |
|
526 | 0 | int64_t interval = config::generate_compaction_tasks_interval_ms; |
527 | 0 | do { |
528 | 0 | int64_t cur_time = UnixMillis(); |
529 | 0 | if (!config::disable_auto_compaction) { |
530 | 0 | Status st = _adjust_compaction_thread_num(); |
531 | 0 | if (!st.ok()) { |
532 | 0 | break; |
533 | 0 | } |
534 | | |
535 | 0 | bool check_score = false; |
536 | 0 | if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { |
537 | 0 | compaction_type = CompactionType::CUMULATIVE_COMPACTION; |
538 | 0 | round++; |
539 | 0 | if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) { |
540 | 0 | check_score = true; |
541 | 0 | last_cumulative_score_update_time = cur_time; |
542 | 0 | } |
543 | 0 | } else { |
544 | 0 | compaction_type = CompactionType::BASE_COMPACTION; |
545 | 0 | round = 0; |
546 | 0 | if (cur_time - last_base_score_update_time >= check_score_interval_ms) { |
547 | 0 | check_score = true; |
548 | 0 | last_base_score_update_time = cur_time; |
549 | 0 | } |
550 | 0 | } |
551 | 0 | std::unique_ptr<ThreadPool>& thread_pool = |
552 | 0 | (compaction_type == CompactionType::CUMULATIVE_COMPACTION) |
553 | 0 | ? _cumu_compaction_thread_pool |
554 | 0 | : _base_compaction_thread_pool; |
555 | 0 | VLOG_CRITICAL << "compaction thread pool. type: " |
556 | 0 | << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU" |
557 | 0 | : "BASE") |
558 | 0 | << ", num_threads: " << thread_pool->num_threads() |
559 | 0 | << ", num_threads_pending_start: " |
560 | 0 | << thread_pool->num_threads_pending_start() |
561 | 0 | << ", num_active_threads: " << thread_pool->num_active_threads() |
562 | 0 | << ", max_threads: " << thread_pool->max_threads() |
563 | 0 | << ", min_threads: " << thread_pool->min_threads() |
564 | 0 | << ", num_total_queued_tasks: " << thread_pool->get_queue_size(); |
565 | 0 | std::vector<CloudTabletSPtr> tablets_compaction = |
566 | 0 | _generate_cloud_compaction_tasks(compaction_type, check_score); |
567 | | |
568 | | /// Regardless of whether the tablet is submitted for compaction or not, |
569 | | /// we need to call 'reset_compaction' to clean up the base_compaction or cumulative_compaction objects |
570 | | /// in the tablet, because these two objects store the tablet's own shared_ptr. |
571 | | /// If it is not cleaned up, the reference count of the tablet will always be greater than 1, |
572 | | /// thus cannot be collected by the garbage collector. (TabletManager::start_trash_sweep) |
573 | 0 | for (const auto& tablet : tablets_compaction) { |
574 | 0 | Status status = submit_compaction_task(tablet, compaction_type); |
575 | 0 | if (status.ok()) continue; |
576 | 0 | if ((!status.is<ErrorCode::BE_NO_SUITABLE_VERSION>() && |
577 | 0 | !status.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) || |
578 | 0 | VLOG_DEBUG_IS_ON) { |
579 | 0 | LOG(WARNING) << "failed to submit compaction task for tablet: " |
580 | 0 | << tablet->tablet_id() << ", err: " << status; |
581 | 0 | } |
582 | 0 | } |
583 | 0 | interval = config::generate_compaction_tasks_interval_ms; |
584 | 0 | } else { |
585 | 0 | interval = config::check_auto_compaction_interval_seconds * 1000; |
586 | 0 | } |
587 | 0 | int64_t end_time = UnixMillis(); |
588 | 0 | DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time - |
589 | 0 | cur_time); |
590 | 0 | } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval))); |
591 | 0 | } |
592 | | |
593 | | void CloudStorageEngine::unregister_index_change_compaction(int64_t tablet_id, |
594 | 4 | bool is_base_compact) { |
595 | 4 | std::lock_guard lock(_compaction_mtx); |
596 | 4 | if (is_base_compact) { |
597 | 0 | _submitted_index_change_base_compaction.erase(tablet_id); |
598 | 4 | } else { |
599 | 4 | _submitted_index_change_cumu_compaction.erase(tablet_id); |
600 | 4 | } |
601 | 4 | } |
602 | | |
603 | | bool CloudStorageEngine::register_index_change_compaction( |
604 | | std::shared_ptr<CloudIndexChangeCompaction> compact, int64_t tablet_id, |
605 | 8 | bool is_base_compact, std::string& err_reason) { |
606 | 8 | std::lock_guard lock(_compaction_mtx); |
607 | 8 | if (is_base_compact) { |
608 | 2 | if (_submitted_base_compactions.contains(tablet_id) || |
609 | 2 | _submitted_full_compactions.contains(tablet_id) || |
610 | 2 | _submitted_index_change_base_compaction.contains(tablet_id)) { |
611 | 1 | std::stringstream ss; |
612 | 1 | ss << "reason:" << ((int)_submitted_base_compactions.contains(tablet_id)) << ", " |
613 | 1 | << ((int)_submitted_full_compactions.contains(tablet_id)) << ", " |
614 | 1 | << ((int)_submitted_index_change_base_compaction.contains(tablet_id)); |
615 | 1 | err_reason = ss.str(); |
616 | 1 | return false; |
617 | 1 | } else { |
618 | 1 | _submitted_index_change_base_compaction[tablet_id] = compact; |
619 | 1 | return true; |
620 | 1 | } |
621 | 6 | } else { |
622 | 6 | if (_tablet_preparing_cumu_compaction.contains(tablet_id) || |
623 | 6 | _submitted_cumu_compactions.contains(tablet_id) || |
624 | 6 | _submitted_index_change_cumu_compaction.contains(tablet_id)) { |
625 | 1 | std::stringstream ss; |
626 | 1 | ss << "reason:" << ((int)_tablet_preparing_cumu_compaction.contains(tablet_id)) << ", " |
627 | 1 | << ((int)_submitted_cumu_compactions.contains(tablet_id)) << ", " |
628 | 1 | << ((int)_submitted_index_change_cumu_compaction.contains(tablet_id)); |
629 | 1 | err_reason = ss.str(); |
630 | 1 | return false; |
631 | 5 | } else { |
632 | 5 | _submitted_index_change_cumu_compaction[tablet_id] = compact; |
633 | 5 | } |
634 | 5 | return true; |
635 | 6 | } |
636 | 8 | } |
637 | | |
638 | | std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks( |
639 | 0 | CompactionType compaction_type, bool check_score) { |
640 | 0 | std::vector<std::shared_ptr<CloudTablet>> tablets_compaction; |
641 | |
|
642 | 0 | int64_t max_compaction_score = 0; |
643 | 0 | std::unordered_set<int64_t> tablet_preparing_cumu_compaction; |
644 | 0 | std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>> |
645 | 0 | submitted_cumu_compactions; |
646 | 0 | std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> submitted_base_compactions; |
647 | 0 | std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> submitted_full_compactions; |
648 | 0 | std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>> |
649 | 0 | submitted_index_change_cumu_compactions; |
650 | 0 | std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>> |
651 | 0 | submitted_index_change_base_compactions; |
652 | 0 | { |
653 | 0 | std::lock_guard lock(_compaction_mtx); |
654 | 0 | tablet_preparing_cumu_compaction = _tablet_preparing_cumu_compaction; |
655 | 0 | submitted_cumu_compactions = _submitted_cumu_compactions; |
656 | 0 | submitted_base_compactions = _submitted_base_compactions; |
657 | 0 | submitted_full_compactions = _submitted_full_compactions; |
658 | 0 | submitted_index_change_cumu_compactions = _submitted_index_change_cumu_compaction; |
659 | 0 | submitted_index_change_base_compactions = _submitted_index_change_base_compaction; |
660 | 0 | } |
661 | |
|
662 | 0 | bool need_pick_tablet = true; |
663 | 0 | int thread_per_disk = |
664 | 0 | config::compaction_task_num_per_fast_disk; // all disks are fast in cloud mode |
665 | 0 | int num_cumu = |
666 | 0 | std::accumulate(submitted_cumu_compactions.begin(), submitted_cumu_compactions.end(), 0, |
667 | 0 | [](int a, auto& b) { return a + b.second.size(); }); |
668 | 0 | int num_base = |
669 | 0 | cast_set<int>(submitted_base_compactions.size() + submitted_full_compactions.size()); |
670 | 0 | int n = thread_per_disk - num_cumu - num_base; |
671 | 0 | if (compaction_type == CompactionType::BASE_COMPACTION) { |
672 | | // We need to reserve at least one thread for cumulative compaction, |
673 | | // because base compactions may take too long to complete, which may |
674 | | // leads to "too many rowsets" error. |
675 | 0 | int base_n = std::min(config::max_base_compaction_task_num_per_disk, thread_per_disk - 1) - |
676 | 0 | num_base; |
677 | 0 | n = std::min(base_n, n); |
678 | 0 | } |
679 | 0 | if (n <= 0) { // No threads available |
680 | 0 | if (!check_score) return tablets_compaction; |
681 | 0 | need_pick_tablet = false; |
682 | 0 | n = 0; |
683 | 0 | } |
684 | | |
685 | | // Return true for skipping compaction |
686 | 0 | std::function<bool(CloudTablet*)> filter_out; |
687 | 0 | if (compaction_type == CompactionType::BASE_COMPACTION) { |
688 | 0 | filter_out = [&submitted_base_compactions, &submitted_full_compactions, |
689 | 0 | &submitted_index_change_base_compactions](CloudTablet* t) { |
690 | 0 | return submitted_base_compactions.contains(t->tablet_id()) || |
691 | 0 | submitted_full_compactions.contains(t->tablet_id()) || |
692 | 0 | submitted_index_change_base_compactions.contains(t->tablet_id()) || |
693 | 0 | t->tablet_state() != TABLET_RUNNING; |
694 | 0 | }; |
695 | 0 | } else if (config::enable_parallel_cumu_compaction) { |
696 | 0 | filter_out = [&tablet_preparing_cumu_compaction, |
697 | 0 | &submitted_index_change_cumu_compactions](CloudTablet* t) { |
698 | 0 | return tablet_preparing_cumu_compaction.contains(t->tablet_id()) || |
699 | 0 | submitted_index_change_cumu_compactions.contains(t->tablet_id()) || |
700 | 0 | (t->tablet_state() != TABLET_RUNNING && |
701 | 0 | (!config::enable_new_tablet_do_compaction || t->alter_version() == -1)); |
702 | 0 | }; |
703 | 0 | } else { |
704 | 0 | filter_out = [&tablet_preparing_cumu_compaction, &submitted_cumu_compactions, |
705 | 0 | &submitted_index_change_cumu_compactions](CloudTablet* t) { |
706 | 0 | return tablet_preparing_cumu_compaction.contains(t->tablet_id()) || |
707 | 0 | submitted_index_change_cumu_compactions.contains(t->tablet_id()) || |
708 | 0 | submitted_cumu_compactions.contains(t->tablet_id()) || |
709 | 0 | (t->tablet_state() != TABLET_RUNNING && |
710 | 0 | (!config::enable_new_tablet_do_compaction || t->alter_version() == -1)); |
711 | 0 | }; |
712 | 0 | } |
713 | | |
714 | | // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(), |
715 | | // So that we can update the max_compaction_score metric. |
716 | 0 | do { |
717 | 0 | std::vector<CloudTabletSPtr> tablets; |
718 | 0 | auto st = tablet_mgr().get_topn_tablets_to_compact(n, compaction_type, filter_out, &tablets, |
719 | 0 | &max_compaction_score); |
720 | 0 | if (!st.ok()) { |
721 | 0 | LOG(WARNING) << "failed to get tablets to compact, err=" << st; |
722 | 0 | break; |
723 | 0 | } |
724 | 0 | if (!need_pick_tablet) break; |
725 | 0 | tablets_compaction = std::move(tablets); |
726 | 0 | } while (false); |
727 | | |
728 | 0 | if (max_compaction_score > 0) { |
729 | 0 | if (compaction_type == CompactionType::BASE_COMPACTION) { |
730 | 0 | DorisMetrics::instance()->tablet_base_max_compaction_score->set_value( |
731 | 0 | max_compaction_score); |
732 | 0 | } else { |
733 | 0 | DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value( |
734 | 0 | max_compaction_score); |
735 | 0 | } |
736 | 0 | } |
737 | |
|
738 | 0 | return tablets_compaction; |
739 | 0 | } |
740 | | |
741 | | Status CloudStorageEngine::_request_tablet_global_compaction_lock( |
742 | | ReaderType compaction_type, const CloudTabletSPtr& tablet, |
743 | 0 | std::shared_ptr<CloudCompactionMixin> compaction) { |
744 | 0 | long now = duration_cast<std::chrono::milliseconds>( |
745 | 0 | std::chrono::system_clock::now().time_since_epoch()) |
746 | 0 | .count(); |
747 | 0 | if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) { |
748 | 0 | auto cumu_compaction = static_pointer_cast<CloudCumulativeCompaction>(compaction); |
749 | 0 | if (auto st = cumu_compaction->request_global_lock(); !st.ok()) { |
750 | 0 | LOG_WARNING("failed to request cumu compactoin global lock") |
751 | 0 | .tag("tablet id", tablet->tablet_id()) |
752 | 0 | .tag("msg", st.to_string()); |
753 | 0 | tablet->set_last_cumu_compaction_failure_time(now); |
754 | 0 | return st; |
755 | 0 | } |
756 | 0 | { |
757 | 0 | std::lock_guard lock(_compaction_mtx); |
758 | 0 | _executing_cumu_compactions[tablet->tablet_id()].push_back(cumu_compaction); |
759 | 0 | } |
760 | 0 | return Status::OK(); |
761 | 0 | } else if (compaction_type == ReaderType::READER_BASE_COMPACTION) { |
762 | 0 | auto base_compaction = static_pointer_cast<CloudBaseCompaction>(compaction); |
763 | 0 | if (auto st = base_compaction->request_global_lock(); !st.ok()) { |
764 | 0 | LOG_WARNING("failed to request base compactoin global lock") |
765 | 0 | .tag("tablet id", tablet->tablet_id()) |
766 | 0 | .tag("msg", st.to_string()); |
767 | 0 | tablet->set_last_base_compaction_failure_time(now); |
768 | 0 | return st; |
769 | 0 | } |
770 | 0 | { |
771 | 0 | std::lock_guard lock(_compaction_mtx); |
772 | 0 | _executing_base_compactions[tablet->tablet_id()] = base_compaction; |
773 | 0 | } |
774 | 0 | return Status::OK(); |
775 | 0 | } else if (compaction_type == ReaderType::READER_FULL_COMPACTION) { |
776 | 0 | auto full_compaction = static_pointer_cast<CloudFullCompaction>(compaction); |
777 | 0 | if (auto st = full_compaction->request_global_lock(); !st.ok()) { |
778 | 0 | LOG_WARNING("failed to request full compactoin global lock") |
779 | 0 | .tag("tablet id", tablet->tablet_id()) |
780 | 0 | .tag("msg", st.to_string()); |
781 | 0 | tablet->set_last_full_compaction_failure_time(now); |
782 | 0 | return st; |
783 | 0 | } |
784 | 0 | { |
785 | 0 | std::lock_guard lock(_compaction_mtx); |
786 | 0 | _executing_full_compactions[tablet->tablet_id()] = full_compaction; |
787 | 0 | } |
788 | 0 | return Status::OK(); |
789 | 0 | } else { |
790 | 0 | LOG(WARNING) << "unsupport compaction task for tablet: " << tablet->tablet_id() |
791 | 0 | << ", compaction name: " << compaction->compaction_name(); |
792 | 0 | return Status::NotFound("Unsupport compaction type {}", compaction->compaction_name()); |
793 | 0 | } |
794 | 0 | } |
795 | | |
796 | 0 | Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet) { |
797 | 0 | using namespace std::chrono; |
798 | 0 | { |
799 | 0 | std::lock_guard lock(_compaction_mtx); |
800 | | // Take a placeholder for base compaction |
801 | 0 | auto [_, success] = _submitted_base_compactions.emplace(tablet->tablet_id(), nullptr); |
802 | 0 | if (!success) { |
803 | 0 | return Status::AlreadyExist( |
804 | 0 | "other base compaction or full compaction is submitted, tablet_id={}", |
805 | 0 | tablet->tablet_id()); |
806 | 0 | } |
807 | 0 | } |
808 | 0 | auto compaction = std::make_shared<CloudBaseCompaction>(*this, tablet); |
809 | 0 | auto st = compaction->prepare_compact(); |
810 | 0 | if (!st.ok()) { |
811 | 0 | long now = duration_cast<std::chrono::milliseconds>( |
812 | 0 | std::chrono::system_clock::now().time_since_epoch()) |
813 | 0 | .count(); |
814 | 0 | tablet->set_last_base_compaction_failure_time(now); |
815 | 0 | std::lock_guard lock(_compaction_mtx); |
816 | 0 | _submitted_base_compactions.erase(tablet->tablet_id()); |
817 | 0 | return st; |
818 | 0 | } |
819 | 0 | { |
820 | 0 | std::lock_guard lock(_compaction_mtx); |
821 | 0 | _submitted_base_compactions[tablet->tablet_id()] = compaction; |
822 | 0 | } |
823 | 0 | st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { |
824 | 0 | DorisMetrics::instance()->base_compaction_task_running_total->increment(1); |
825 | 0 | DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
826 | 0 | _base_compaction_thread_pool->get_queue_size()); |
827 | 0 | g_base_compaction_running_task_count << 1; |
828 | 0 | signal::tablet_id = tablet->tablet_id(); |
829 | 0 | Defer defer {[&]() { |
830 | 0 | g_base_compaction_running_task_count << -1; |
831 | 0 | std::lock_guard lock(_compaction_mtx); |
832 | 0 | _submitted_base_compactions.erase(tablet->tablet_id()); |
833 | 0 | DorisMetrics::instance()->base_compaction_task_running_total->increment(-1); |
834 | 0 | DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
835 | 0 | _base_compaction_thread_pool->get_queue_size()); |
836 | 0 | }}; |
837 | 0 | auto st = _request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION, tablet, |
838 | 0 | compaction); |
839 | 0 | if (!st.ok()) return; |
840 | 0 | st = compaction->execute_compact(); |
841 | 0 | if (!st.ok()) { |
842 | | // Error log has been output in `execute_compact` |
843 | 0 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
844 | 0 | tablet->set_last_base_compaction_failure_time(now); |
845 | 0 | } |
846 | 0 | std::lock_guard lock(_compaction_mtx); |
847 | 0 | _executing_base_compactions.erase(tablet->tablet_id()); |
848 | 0 | }); |
849 | 0 | DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
850 | 0 | _base_compaction_thread_pool->get_queue_size()); |
851 | 0 | if (!st.ok()) { |
852 | 0 | std::lock_guard lock(_compaction_mtx); |
853 | 0 | _submitted_base_compactions.erase(tablet->tablet_id()); |
854 | 0 | return Status::InternalError("failed to submit base compaction, tablet_id={}", |
855 | 0 | tablet->tablet_id()); |
856 | 0 | } |
857 | 0 | return st; |
858 | 0 | } |
859 | | |
860 | 0 | Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet) { |
861 | 0 | using namespace std::chrono; |
862 | 0 | { |
863 | 0 | std::lock_guard lock(_compaction_mtx); |
864 | 0 | if (!config::enable_parallel_cumu_compaction && |
865 | 0 | _submitted_cumu_compactions.count(tablet->tablet_id())) { |
866 | 0 | return Status::AlreadyExist("other cumu compaction is submitted, tablet_id={}", |
867 | 0 | tablet->tablet_id()); |
868 | 0 | } |
869 | 0 | auto [_, success] = _tablet_preparing_cumu_compaction.insert(tablet->tablet_id()); |
870 | 0 | if (!success) { |
871 | 0 | return Status::AlreadyExist("other cumu compaction is preparing, tablet_id={}", |
872 | 0 | tablet->tablet_id()); |
873 | 0 | } |
874 | 0 | } |
875 | 0 | auto compaction = std::make_shared<CloudCumulativeCompaction>(*this, tablet); |
876 | 0 | auto st = compaction->prepare_compact(); |
877 | 0 | if (!st.ok()) { |
878 | 0 | long now = duration_cast<std::chrono::milliseconds>( |
879 | 0 | std::chrono::system_clock::now().time_since_epoch()) |
880 | 0 | .count(); |
881 | 0 | if (!st.is<ErrorCode::CUMULATIVE_MEET_DELETE_VERSION>()) { |
882 | 0 | if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) { |
883 | | // Backoff strategy if no suitable version |
884 | 0 | tablet->last_cumu_no_suitable_version_ms = now; |
885 | 0 | } else { |
886 | 0 | tablet->set_last_cumu_compaction_failure_time(now); |
887 | 0 | } |
888 | 0 | } |
889 | 0 | std::lock_guard lock(_compaction_mtx); |
890 | 0 | _tablet_preparing_cumu_compaction.erase(tablet->tablet_id()); |
891 | 0 | return st; |
892 | 0 | } |
893 | 0 | { |
894 | 0 | std::lock_guard lock(_compaction_mtx); |
895 | 0 | _tablet_preparing_cumu_compaction.erase(tablet->tablet_id()); |
896 | 0 | _submitted_cumu_compactions[tablet->tablet_id()].push_back(compaction); |
897 | 0 | } |
898 | 0 | auto erase_submitted_cumu_compaction = [=, this]() { |
899 | 0 | std::lock_guard lock(_compaction_mtx); |
900 | 0 | auto it = _submitted_cumu_compactions.find(tablet->tablet_id()); |
901 | 0 | DCHECK(it != _submitted_cumu_compactions.end()); |
902 | 0 | auto& compactions = it->second; |
903 | 0 | auto it1 = std::find(compactions.begin(), compactions.end(), compaction); |
904 | 0 | DCHECK(it1 != compactions.end()); |
905 | 0 | compactions.erase(it1); |
906 | 0 | if (compactions.empty()) { // No compactions on this tablet, erase key |
907 | 0 | _submitted_cumu_compactions.erase(it); |
908 | | // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to |
909 | | // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform |
910 | | // cumu compaction on tablet which has suitable versions for cumu compaction. |
911 | 0 | tablet->last_cumu_no_suitable_version_ms = 0; |
912 | 0 | } |
913 | 0 | }; |
914 | 0 | auto erase_executing_cumu_compaction = [=, this]() { |
915 | 0 | std::lock_guard lock(_compaction_mtx); |
916 | 0 | auto it = _executing_cumu_compactions.find(tablet->tablet_id()); |
917 | 0 | DCHECK(it != _executing_cumu_compactions.end()); |
918 | 0 | auto& compactions = it->second; |
919 | 0 | auto it1 = std::find(compactions.begin(), compactions.end(), compaction); |
920 | 0 | DCHECK(it1 != compactions.end()); |
921 | 0 | compactions.erase(it1); |
922 | 0 | if (compactions.empty()) { // No compactions on this tablet, erase key |
923 | 0 | _executing_cumu_compactions.erase(it); |
924 | | // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to |
925 | | // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform |
926 | | // cumu compaction on tablet which has suitable versions for cumu compaction. |
927 | 0 | tablet->last_cumu_no_suitable_version_ms = 0; |
928 | 0 | } |
929 | 0 | }; |
930 | 0 | st = _cumu_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { |
931 | 0 | DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1); |
932 | 0 | DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
933 | 0 | _cumu_compaction_thread_pool->get_queue_size()); |
934 | 0 | DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.wait_in_line", |
935 | 0 | { sleep(5); }) |
936 | 0 | signal::tablet_id = tablet->tablet_id(); |
937 | 0 | g_cumu_compaction_running_task_count << 1; |
938 | 0 | bool is_large_task = true; |
939 | 0 | Defer defer {[&]() { |
940 | 0 | DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.sleep", |
941 | 0 | { sleep(5); }) |
942 | 0 | std::lock_guard lock(_cumu_compaction_delay_mtx); |
943 | 0 | _cumu_compaction_thread_pool_used_threads--; |
944 | 0 | if (!is_large_task) { |
945 | 0 | _cumu_compaction_thread_pool_small_tasks_running--; |
946 | 0 | } |
947 | 0 | g_cumu_compaction_running_task_count << -1; |
948 | 0 | erase_submitted_cumu_compaction(); |
949 | 0 | DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(-1); |
950 | 0 | DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
951 | 0 | _cumu_compaction_thread_pool->get_queue_size()); |
952 | 0 | }}; |
953 | 0 | auto st = _request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION, |
954 | 0 | tablet, compaction); |
955 | 0 | if (!st.ok()) return; |
956 | 0 | do { |
957 | 0 | std::lock_guard lock(_cumu_compaction_delay_mtx); |
958 | 0 | _cumu_compaction_thread_pool_used_threads++; |
959 | 0 | if (config::large_cumu_compaction_task_min_thread_num > 1 && |
960 | 0 | _cumu_compaction_thread_pool->max_threads() >= |
961 | 0 | config::large_cumu_compaction_task_min_thread_num) { |
962 | | // Determine if this is a small task based on configured thresholds |
963 | 0 | is_large_task = (compaction->get_input_rowsets_bytes() > |
964 | 0 | config::large_cumu_compaction_task_bytes_threshold || |
965 | 0 | compaction->get_input_num_rows() > |
966 | 0 | config::large_cumu_compaction_task_row_num_threshold); |
967 | | // Small task. No delay needed |
968 | 0 | if (!is_large_task) { |
969 | 0 | _cumu_compaction_thread_pool_small_tasks_running++; |
970 | 0 | break; |
971 | 0 | } |
972 | | // Deal with large task |
973 | 0 | if (_should_delay_large_task()) { |
974 | 0 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()) |
975 | 0 | .count(); |
976 | | // sleep 5s for this tablet |
977 | 0 | tablet->set_last_cumu_compaction_failure_time(now); |
978 | 0 | erase_executing_cumu_compaction(); |
979 | 0 | LOG_WARNING( |
980 | 0 | "failed to do CloudCumulativeCompaction, cumu thread pool is " |
981 | 0 | "intensive, delay large task.") |
982 | 0 | .tag("tablet_id", tablet->tablet_id()) |
983 | 0 | .tag("input_rows", compaction->get_input_num_rows()) |
984 | 0 | .tag("input_rowsets_total_size", compaction->get_input_rowsets_bytes()) |
985 | 0 | .tag("config::large_cumu_compaction_task_bytes_threshold", |
986 | 0 | config::large_cumu_compaction_task_bytes_threshold) |
987 | 0 | .tag("config::large_cumu_compaction_task_row_num_threshold", |
988 | 0 | config::large_cumu_compaction_task_row_num_threshold) |
989 | 0 | .tag("remaining threads", _cumu_compaction_thread_pool_used_threads) |
990 | 0 | .tag("small_tasks_running", |
991 | 0 | _cumu_compaction_thread_pool_small_tasks_running); |
992 | 0 | return; |
993 | 0 | } |
994 | 0 | } |
995 | 0 | } while (false); |
996 | 0 | st = compaction->execute_compact(); |
997 | 0 | if (!st.ok()) { |
998 | | // Error log has been output in `execute_compact` |
999 | 0 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
1000 | 0 | tablet->set_last_cumu_compaction_failure_time(now); |
1001 | 0 | } |
1002 | 0 | erase_executing_cumu_compaction(); |
1003 | 0 | }); |
1004 | 0 | DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
1005 | 0 | _cumu_compaction_thread_pool->get_queue_size()); |
1006 | 0 | if (!st.ok()) { |
1007 | 0 | erase_submitted_cumu_compaction(); |
1008 | 0 | return Status::InternalError("failed to submit cumu compaction, tablet_id={}", |
1009 | 0 | tablet->tablet_id()); |
1010 | 0 | } |
1011 | 0 | return st; |
1012 | 0 | } |
1013 | | |
1014 | 0 | Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet) { |
1015 | 0 | using namespace std::chrono; |
1016 | 0 | { |
1017 | 0 | std::lock_guard lock(_compaction_mtx); |
1018 | | // Take a placeholder for full compaction |
1019 | 0 | auto [_, success] = _submitted_full_compactions.emplace(tablet->tablet_id(), nullptr); |
1020 | 0 | if (!success) { |
1021 | 0 | return Status::AlreadyExist( |
1022 | 0 | "other full compaction or base compaction is submitted, tablet_id={}", |
1023 | 0 | tablet->tablet_id()); |
1024 | 0 | } |
1025 | 0 | } |
1026 | | //auto compaction = std::make_shared<CloudFullCompaction>(tablet); |
1027 | 0 | auto compaction = std::make_shared<CloudFullCompaction>(*this, tablet); |
1028 | 0 | auto st = compaction->prepare_compact(); |
1029 | 0 | if (!st.ok()) { |
1030 | 0 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
1031 | 0 | tablet->set_last_full_compaction_failure_time(now); |
1032 | 0 | std::lock_guard lock(_compaction_mtx); |
1033 | 0 | _submitted_full_compactions.erase(tablet->tablet_id()); |
1034 | 0 | return st; |
1035 | 0 | } |
1036 | 0 | { |
1037 | 0 | std::lock_guard lock(_compaction_mtx); |
1038 | 0 | _submitted_full_compactions[tablet->tablet_id()] = compaction; |
1039 | 0 | } |
1040 | 0 | st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { |
1041 | 0 | g_full_compaction_running_task_count << 1; |
1042 | 0 | signal::tablet_id = tablet->tablet_id(); |
1043 | 0 | Defer defer {[&]() { |
1044 | 0 | g_full_compaction_running_task_count << -1; |
1045 | 0 | std::lock_guard lock(_compaction_mtx); |
1046 | 0 | _submitted_full_compactions.erase(tablet->tablet_id()); |
1047 | 0 | }}; |
1048 | 0 | auto st = _request_tablet_global_compaction_lock(ReaderType::READER_FULL_COMPACTION, tablet, |
1049 | 0 | compaction); |
1050 | 0 | if (!st.ok()) return; |
1051 | 0 | st = compaction->execute_compact(); |
1052 | 0 | if (!st.ok()) { |
1053 | | // Error log has been output in `execute_compact` |
1054 | 0 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
1055 | 0 | tablet->set_last_full_compaction_failure_time(now); |
1056 | 0 | } |
1057 | 0 | std::lock_guard lock(_compaction_mtx); |
1058 | 0 | _executing_full_compactions.erase(tablet->tablet_id()); |
1059 | 0 | }); |
1060 | 0 | if (!st.ok()) { |
1061 | 0 | std::lock_guard lock(_compaction_mtx); |
1062 | 0 | _submitted_full_compactions.erase(tablet->tablet_id()); |
1063 | 0 | return Status::InternalError("failed to submit full compaction, tablet_id={}", |
1064 | 0 | tablet->tablet_id()); |
1065 | 0 | } |
1066 | 0 | return st; |
1067 | 0 | } |
1068 | | |
1069 | | Status CloudStorageEngine::submit_compaction_task(const CloudTabletSPtr& tablet, |
1070 | 0 | CompactionType compaction_type) { |
1071 | 0 | DCHECK(compaction_type == CompactionType::CUMULATIVE_COMPACTION || |
1072 | 0 | compaction_type == CompactionType::BASE_COMPACTION || |
1073 | 0 | compaction_type == CompactionType::FULL_COMPACTION); |
1074 | 0 | switch (compaction_type) { |
1075 | 0 | case CompactionType::BASE_COMPACTION: |
1076 | 0 | RETURN_IF_ERROR(_submit_base_compaction_task(tablet)); |
1077 | 0 | return Status::OK(); |
1078 | 0 | case CompactionType::CUMULATIVE_COMPACTION: |
1079 | 0 | RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet)); |
1080 | 0 | return Status::OK(); |
1081 | 0 | case CompactionType::FULL_COMPACTION: |
1082 | 0 | RETURN_IF_ERROR(_submit_full_compaction_task(tablet)); |
1083 | 0 | return Status::OK(); |
1084 | 0 | default: |
1085 | 0 | return Status::InternalError("unknown compaction type!"); |
1086 | 0 | } |
1087 | 0 | } |
1088 | | |
1089 | 0 | void CloudStorageEngine::_lease_compaction_thread_callback() { |
1090 | 0 | while (!_stop_background_threads_latch.wait_for( |
1091 | 0 | std::chrono::seconds(config::lease_compaction_interval_seconds))) { |
1092 | 0 | std::vector<std::shared_ptr<CloudFullCompaction>> full_compactions; |
1093 | 0 | std::vector<std::shared_ptr<CloudBaseCompaction>> base_compactions; |
1094 | 0 | std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions; |
1095 | 0 | std::vector<std::shared_ptr<CloudCompactionStopToken>> compation_stop_tokens; |
1096 | 0 | std::vector<std::shared_ptr<CloudIndexChangeCompaction>> index_change_compations; |
1097 | 0 | { |
1098 | 0 | std::lock_guard lock(_compaction_mtx); |
1099 | 0 | for (auto& [_, base] : _executing_base_compactions) { |
1100 | 0 | if (base) { // `base` might be a nullptr placeholder |
1101 | 0 | base_compactions.push_back(base); |
1102 | 0 | } |
1103 | 0 | } |
1104 | 0 | for (auto& [_, cumus] : _executing_cumu_compactions) { |
1105 | 0 | for (auto& cumu : cumus) { |
1106 | 0 | cumu_compactions.push_back(cumu); |
1107 | 0 | } |
1108 | 0 | } |
1109 | 0 | for (auto& [_, full] : _executing_full_compactions) { |
1110 | 0 | if (full) { |
1111 | 0 | full_compactions.push_back(full); |
1112 | 0 | } |
1113 | 0 | } |
1114 | 0 | for (auto& [_, stop_token] : _active_compaction_stop_tokens) { |
1115 | 0 | if (stop_token) { |
1116 | 0 | compation_stop_tokens.push_back(stop_token); |
1117 | 0 | } |
1118 | 0 | } |
1119 | 0 | for (auto& [_, index_change] : _submitted_index_change_cumu_compaction) { |
1120 | 0 | if (index_change) { |
1121 | 0 | index_change_compations.push_back(index_change); |
1122 | 0 | } |
1123 | 0 | } |
1124 | 0 | for (auto& [_, index_change] : _submitted_index_change_base_compaction) { |
1125 | 0 | if (index_change) { |
1126 | 0 | index_change_compations.push_back(index_change); |
1127 | 0 | } |
1128 | 0 | } |
1129 | 0 | } |
1130 | | // TODO(plat1ko): Support batch lease rpc |
1131 | 0 | for (auto& stop_token : compation_stop_tokens) { |
1132 | 0 | stop_token->do_lease(); |
1133 | 0 | } |
1134 | 0 | for (auto& comp : full_compactions) { |
1135 | 0 | comp->do_lease(); |
1136 | 0 | } |
1137 | 0 | for (auto& comp : cumu_compactions) { |
1138 | 0 | comp->do_lease(); |
1139 | 0 | } |
1140 | 0 | for (auto& comp : base_compactions) { |
1141 | 0 | comp->do_lease(); |
1142 | 0 | } |
1143 | 0 | for (auto& comp : index_change_compations) { |
1144 | 0 | comp->do_lease(); |
1145 | 0 | } |
1146 | 0 | } |
1147 | 0 | } |
1148 | | |
1149 | 0 | void CloudStorageEngine::_check_tablet_delete_bitmap_score_callback() { |
1150 | 0 | LOG(INFO) << "try to start check tablet delete bitmap score!"; |
1151 | 0 | while (!_stop_background_threads_latch.wait_for( |
1152 | 0 | std::chrono::seconds(config::check_tablet_delete_bitmap_interval_seconds))) { |
1153 | 0 | if (!config::enable_check_tablet_delete_bitmap_score) { |
1154 | 0 | return; |
1155 | 0 | } |
1156 | 0 | uint64_t max_delete_bitmap_score = 0; |
1157 | 0 | uint64_t max_base_rowset_delete_bitmap_score = 0; |
1158 | 0 | tablet_mgr().get_topn_tablet_delete_bitmap_score(&max_delete_bitmap_score, |
1159 | 0 | &max_base_rowset_delete_bitmap_score); |
1160 | 0 | if (max_delete_bitmap_score > 0) { |
1161 | 0 | _tablet_max_delete_bitmap_score_metrics->set_value(max_delete_bitmap_score); |
1162 | 0 | } |
1163 | 0 | if (max_base_rowset_delete_bitmap_score > 0) { |
1164 | 0 | _tablet_max_base_rowset_delete_bitmap_score_metrics->set_value( |
1165 | 0 | max_base_rowset_delete_bitmap_score); |
1166 | 0 | } |
1167 | 0 | } |
1168 | 0 | } |
1169 | | |
1170 | 0 | Status CloudStorageEngine::get_compaction_status_json(std::string* result) { |
1171 | 0 | rapidjson::Document root; |
1172 | 0 | root.SetObject(); |
1173 | |
|
1174 | 0 | std::lock_guard lock(_compaction_mtx); |
1175 | | // cumu |
1176 | 0 | std::string_view cumu = "CumulativeCompaction"; |
1177 | 0 | rapidjson::Value cumu_key; |
1178 | 0 | cumu_key.SetString(cumu.data(), cast_set<uint32_t>(cumu.length()), root.GetAllocator()); |
1179 | 0 | rapidjson::Document cumu_arr; |
1180 | 0 | cumu_arr.SetArray(); |
1181 | 0 | for (auto& [tablet_id, v] : _submitted_cumu_compactions) { |
1182 | 0 | for (int i = 0; i < v.size(); ++i) { |
1183 | 0 | cumu_arr.PushBack(tablet_id, root.GetAllocator()); |
1184 | 0 | } |
1185 | 0 | } |
1186 | 0 | root.AddMember(cumu_key, cumu_arr, root.GetAllocator()); |
1187 | | // base |
1188 | 0 | std::string_view base = "BaseCompaction"; |
1189 | 0 | rapidjson::Value base_key; |
1190 | 0 | base_key.SetString(base.data(), cast_set<uint32_t>(base.length()), root.GetAllocator()); |
1191 | 0 | rapidjson::Document base_arr; |
1192 | 0 | base_arr.SetArray(); |
1193 | 0 | for (auto& [tablet_id, _] : _submitted_base_compactions) { |
1194 | 0 | base_arr.PushBack(tablet_id, root.GetAllocator()); |
1195 | 0 | } |
1196 | 0 | root.AddMember(base_key, base_arr, root.GetAllocator()); |
1197 | |
|
1198 | 0 | rapidjson::StringBuffer strbuf; |
1199 | 0 | rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf); |
1200 | 0 | root.Accept(writer); |
1201 | 0 | *result = std::string(strbuf.GetString()); |
1202 | 0 | return Status::OK(); |
1203 | 0 | } |
1204 | | |
1205 | | std::shared_ptr<CloudCumulativeCompactionPolicy> CloudStorageEngine::cumu_compaction_policy( |
1206 | 0 | std::string_view compaction_policy) { |
1207 | 0 | if (!_cumulative_compaction_policies.contains(compaction_policy)) { |
1208 | 0 | return _cumulative_compaction_policies.at(CUMULATIVE_SIZE_BASED_POLICY); |
1209 | 0 | } |
1210 | 0 | return _cumulative_compaction_policies.at(compaction_policy); |
1211 | 0 | } |
1212 | | |
1213 | | Status CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet, |
1214 | 0 | int64_t initiator) { |
1215 | 0 | { |
1216 | 0 | std::lock_guard lock(_compaction_mtx); |
1217 | 0 | auto [_, success] = _active_compaction_stop_tokens.emplace(tablet->tablet_id(), nullptr); |
1218 | 0 | if (!success) { |
1219 | 0 | return Status::AlreadyExist("stop token already exists for tablet_id={}", |
1220 | 0 | tablet->tablet_id()); |
1221 | 0 | } |
1222 | 0 | } |
1223 | | |
1224 | 0 | auto stop_token = std::make_shared<CloudCompactionStopToken>(*this, tablet, initiator); |
1225 | 0 | auto st = stop_token->do_register(); |
1226 | |
|
1227 | 0 | if (!st.ok()) { |
1228 | 0 | std::lock_guard lock(_compaction_mtx); |
1229 | 0 | _active_compaction_stop_tokens.erase(tablet->tablet_id()); |
1230 | 0 | return st; |
1231 | 0 | } |
1232 | | |
1233 | 0 | { |
1234 | 0 | std::lock_guard lock(_compaction_mtx); |
1235 | 0 | _active_compaction_stop_tokens[tablet->tablet_id()] = stop_token; |
1236 | 0 | } |
1237 | 0 | LOG_INFO( |
1238 | 0 | "successfully register compaction stop token for tablet_id={}, " |
1239 | 0 | "delete_bitmap_lock_initiator={}", |
1240 | 0 | tablet->tablet_id(), initiator); |
1241 | 0 | return st; |
1242 | 0 | } |
1243 | | |
1244 | 0 | Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms) { |
1245 | 0 | std::shared_ptr<CloudCompactionStopToken> stop_token; |
1246 | 0 | { |
1247 | 0 | std::lock_guard lock(_compaction_mtx); |
1248 | 0 | if (auto it = _active_compaction_stop_tokens.find(tablet->tablet_id()); |
1249 | 0 | it != _active_compaction_stop_tokens.end()) { |
1250 | 0 | stop_token = it->second; |
1251 | 0 | } else { |
1252 | 0 | return Status::NotFound("stop token not found for tablet_id={}", tablet->tablet_id()); |
1253 | 0 | } |
1254 | 0 | _active_compaction_stop_tokens.erase(tablet->tablet_id()); |
1255 | 0 | } |
1256 | 0 | LOG_INFO("successfully unregister compaction stop token for tablet_id={}", tablet->tablet_id()); |
1257 | 0 | if (stop_token && clear_ms) { |
1258 | 0 | RETURN_IF_ERROR(stop_token->do_unregister()); |
1259 | 0 | LOG_INFO( |
1260 | 0 | "successfully remove compaction stop token from MS for tablet_id={}, " |
1261 | 0 | "delete_bitmap_lock_initiator={}", |
1262 | 0 | tablet->tablet_id(), stop_token->initiator()); |
1263 | 0 | } |
1264 | 0 | return Status::OK(); |
1265 | 0 | } |
1266 | | |
1267 | 0 | Status CloudStorageEngine::_check_all_root_path_cluster_id() { |
1268 | | // Check if all root paths have the same cluster id |
1269 | 0 | std::set<int32_t> cluster_ids; |
1270 | 0 | for (const auto& path : _options.store_paths) { |
1271 | 0 | auto cluster_id_path = fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX); |
1272 | 0 | bool exists = false; |
1273 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists)); |
1274 | 0 | if (exists) { |
1275 | 0 | io::FileReaderSPtr reader; |
1276 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cluster_id_path, &reader)); |
1277 | 0 | size_t fsize = reader->size(); |
1278 | 0 | if (fsize > 0) { |
1279 | 0 | std::string content; |
1280 | 0 | content.resize(fsize, '\0'); |
1281 | 0 | size_t bytes_read = 0; |
1282 | 0 | RETURN_IF_ERROR(reader->read_at(0, {content.data(), fsize}, &bytes_read)); |
1283 | 0 | DCHECK_EQ(fsize, bytes_read); |
1284 | 0 | int32_t tmp_cluster_id = std::stoi(content); |
1285 | 0 | cluster_ids.insert(tmp_cluster_id); |
1286 | 0 | } |
1287 | 0 | } |
1288 | 0 | } |
1289 | 0 | _effective_cluster_id = config::cluster_id; |
1290 | | // first init |
1291 | 0 | if (cluster_ids.empty()) { |
1292 | | // not set configured cluster id |
1293 | 0 | if (_effective_cluster_id == -1) { |
1294 | 0 | return Status::OK(); |
1295 | 0 | } else { |
1296 | | // If no cluster id file exists, use the configured cluster id |
1297 | 0 | return set_cluster_id(_effective_cluster_id); |
1298 | 0 | } |
1299 | 0 | } |
1300 | 0 | if (cluster_ids.size() > 1) { |
1301 | 0 | return Status::InternalError( |
1302 | 0 | "All root paths must have the same cluster id, but you have " |
1303 | 0 | "different cluster ids: {}", |
1304 | 0 | fmt::join(cluster_ids, ", ")); |
1305 | 0 | } |
1306 | 0 | if (_effective_cluster_id != -1 && !cluster_ids.empty() && |
1307 | 0 | *cluster_ids.begin() != _effective_cluster_id) { |
1308 | 0 | return Status::Corruption( |
1309 | 0 | "multiple cluster ids is not equal. config::cluster_id={}, " |
1310 | 0 | "storage path cluster_id={}", |
1311 | 0 | _effective_cluster_id, *cluster_ids.begin()); |
1312 | 0 | } |
1313 | 0 | return Status::OK(); |
1314 | 0 | } |
1315 | | |
1316 | 0 | Status CloudStorageEngine::set_cluster_id(int32_t cluster_id) { |
1317 | 0 | std::lock_guard<std::mutex> l(_store_lock); |
1318 | 0 | for (auto& path : _options.store_paths) { |
1319 | 0 | auto cluster_id_path = fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX); |
1320 | 0 | bool exists = false; |
1321 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists)); |
1322 | 0 | if (!exists) { |
1323 | 0 | io::FileWriterPtr file_writer; |
1324 | 0 | RETURN_IF_ERROR( |
1325 | 0 | io::global_local_filesystem()->create_file(cluster_id_path, &file_writer)); |
1326 | 0 | RETURN_IF_ERROR(file_writer->append(std::to_string(cluster_id))); |
1327 | 0 | RETURN_IF_ERROR(file_writer->close()); |
1328 | 0 | } |
1329 | 0 | } |
1330 | 0 | _effective_cluster_id = cluster_id; |
1331 | 0 | return Status::OK(); |
1332 | 0 | } |
1333 | | |
1334 | | #include "common/compile_check_end.h" |
1335 | | } // namespace doris |