be/src/cloud/cloud_storage_engine.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_storage_engine.h" |
19 | | |
20 | | #include <bvar/reducer.h> |
21 | | #include <gen_cpp/PlanNodes_types.h> |
22 | | #include <gen_cpp/cloud.pb.h> |
23 | | #include <gen_cpp/olap_file.pb.h> |
24 | | #include <rapidjson/document.h> |
25 | | #include <rapidjson/encodings.h> |
26 | | #include <rapidjson/prettywriter.h> |
27 | | #include <rapidjson/stringbuffer.h> |
28 | | |
29 | | #include <algorithm> |
30 | | #include <memory> |
31 | | #include <variant> |
32 | | |
33 | | #include "cloud/cloud_base_compaction.h" |
34 | | #include "cloud/cloud_compaction_stop_token.h" |
35 | | #include "cloud/cloud_cumulative_compaction.h" |
36 | | #include "cloud/cloud_cumulative_compaction_policy.h" |
37 | | #include "cloud/cloud_full_compaction.h" |
38 | | #include "cloud/cloud_index_change_compaction.h" |
39 | | #include "cloud/cloud_meta_mgr.h" |
40 | | #include "cloud/cloud_snapshot_mgr.h" |
41 | | #include "cloud/cloud_tablet_hotspot.h" |
42 | | #include "cloud/cloud_tablet_mgr.h" |
43 | | #include "cloud/cloud_txn_delete_bitmap_cache.h" |
44 | | #include "cloud/cloud_warm_up_manager.h" |
45 | | #include "cloud/config.h" |
46 | | #include "common/config.h" |
47 | | #include "common/signal_handler.h" |
48 | | #include "common/status.h" |
49 | | #include "core/assert_cast.h" |
50 | | #include "io/cache/block_file_cache_downloader.h" |
51 | | #include "io/cache/block_file_cache_factory.h" |
52 | | #include "io/cache/file_cache_common.h" |
53 | | #include "io/fs/file_system.h" |
54 | | #include "io/fs/hdfs_file_system.h" |
55 | | #include "io/fs/s3_file_system.h" |
56 | | #include "io/hdfs_util.h" |
57 | | #include "io/io_common.h" |
58 | | #include "load/memtable/memtable_flush_executor.h" |
59 | | #include "runtime/memory/cache_manager.h" |
60 | | #include "storage/compaction/cumulative_compaction_policy.h" |
61 | | #include "storage/compaction/cumulative_compaction_time_series_policy.h" |
62 | | #include "storage/storage_policy.h" |
63 | | #include "util/parse_util.h" |
64 | | #include "util/time.h" |
65 | | |
66 | | namespace doris { |
67 | | #include "common/compile_check_begin.h" |
68 | | |
69 | | using namespace std::literals; |
70 | | |
// bvar adders exported under the metric names passed to their constructors, one per
// compaction type (base, full, cumulative).
// NOTE(review): presumably these track the number of compaction tasks currently running;
// the code that updates them is not in this part of the file -- confirm.
bvar::Adder<uint64_t> g_base_compaction_running_task_count("base_compaction_running_task_count");
bvar::Adder<uint64_t> g_full_compaction_running_task_count("full_compaction_running_task_count");
bvar::Adder<uint64_t> g_cumu_compaction_running_task_count(
        "cumulative_compaction_running_task_count");
75 | | |
76 | 18.6k | int get_cumu_thread_num() { |
77 | 18.6k | if (config::max_cumu_compaction_threads > 0) { |
78 | 0 | return config::max_cumu_compaction_threads; |
79 | 0 | } |
80 | | |
81 | 18.6k | int num_cores = doris::CpuInfo::num_cores(); |
82 | 18.6k | return std::min(std::max(int(num_cores * config::cumu_compaction_thread_num_factor), 2), 20); |
83 | 18.6k | } |
84 | | |
85 | 18.6k | int get_base_thread_num() { |
86 | 18.6k | if (config::max_base_compaction_threads > 0) { |
87 | 18.6k | return config::max_base_compaction_threads; |
88 | 18.6k | } |
89 | | |
90 | 0 | int num_cores = doris::CpuInfo::num_cores(); |
91 | 0 | return std::min(std::max(int(num_cores * config::base_compaction_thread_num_factor), 1), 10); |
92 | 18.6k | } |
93 | | |
// Constructs the cloud storage engine.
// Initializes the base engine as Type::CLOUD with the backend uid from `options`, creates
// the meta manager and the tablet manager (the latter is handed a reference to this
// engine), stores `options` in _options, registers the size-based and time-series
// cumulative compaction policies, and records the current system time as the startup
// timepoint.
CloudStorageEngine::CloudStorageEngine(const EngineOptions& options)
        : BaseStorageEngine(Type::CLOUD, options.backend_uid),
          _meta_mgr(std::make_unique<cloud::CloudMetaMgr>()),
          _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)),
          _options(options) {
    _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
            std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>();
    _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
            std::make_shared<CloudTimeSeriesCumulativeCompactionPolicy>();
    _startup_timepoint = std::chrono::system_clock::now();
}
105 | | |
// Stops the engine on destruction. stop() returns immediately when the engine has
// already been stopped, so an explicit earlier stop() is fine.
CloudStorageEngine::~CloudStorageEngine() {
    stop();
}
109 | | |
110 | | static Status vault_process_error(std::string_view id, |
111 | 0 | std::variant<S3Conf, cloud::HdfsVaultInfo>& vault, Status err) { |
112 | 0 | std::stringstream ss; |
113 | 0 | std::visit( |
114 | 0 | [&]<typename T>(T& val) { |
115 | 0 | if constexpr (std::is_same_v<T, S3Conf>) { |
116 | 0 | ss << val.to_string(); |
117 | 0 | } else if constexpr (std::is_same_v<T, cloud::HdfsVaultInfo>) { |
118 | 0 | val.SerializeToOstream(&ss); |
119 | 0 | } |
120 | 0 | }, Unexecuted instantiation: cloud_storage_engine.cpp:_ZZN5dorisL19vault_process_errorESt17basic_string_viewIcSt11char_traitsIcEERSt7variantIJNS_6S3ConfENS_5cloud13HdfsVaultInfoEEENS_6StatusEENK3$_0clIS5_EEDaRT_ Unexecuted instantiation: cloud_storage_engine.cpp:_ZZN5dorisL19vault_process_errorESt17basic_string_viewIcSt11char_traitsIcEERSt7variantIJNS_6S3ConfENS_5cloud13HdfsVaultInfoEEENS_6StatusEENK3$_0clIS7_EEDaRT_ |
121 | 0 | vault); |
122 | 0 | return Status::IOError("Invalid vault, id {}, err {}, detail conf {}", id, err, ss.str()); |
123 | 0 | } |
124 | | |
125 | | struct VaultCreateFSVisitor { |
126 | | VaultCreateFSVisitor(const std::string& id, const cloud::StorageVaultPB_PathFormat& path_format, |
127 | | bool check_fs) |
128 | 1 | : id(id), path_format(path_format), check_fs(check_fs) {} |
129 | 1 | Status operator()(const S3Conf& s3_conf) const { |
130 | 1 | LOG(INFO) << "get new s3 info: " << s3_conf.to_string() << " resource_id=" << id |
131 | 1 | << " check_fs: " << check_fs; |
132 | | |
133 | 1 | auto fs = DORIS_TRY(io::S3FileSystem::create(s3_conf, id)); |
134 | 1 | if (check_fs && !s3_conf.client_conf.role_arn.empty()) { |
135 | 0 | bool res = false; |
136 | | // just check connectivity, not care object if exist |
137 | 0 | auto st = fs->exists("not_exist_object", &res); |
138 | 0 | if (!st.ok()) { |
139 | 0 | LOG(FATAL) << "failed to check s3 fs, resource_id: " << id << " st: " << st |
140 | 0 | << "s3_conf: " << s3_conf.to_string() |
141 | 0 | << "add enable_check_storage_vault=false to be.conf to skip the check"; |
142 | 0 | } |
143 | 0 | } |
144 | | |
145 | 1 | put_storage_resource(id, {std::move(fs), path_format}, 0); |
146 | 1 | LOG_INFO("successfully create s3 vault, vault id {}", id); |
147 | 1 | return Status::OK(); |
148 | 1 | } |
149 | | |
150 | | // TODO(ByteYue): Make sure enable_java_support is on |
151 | 0 | Status operator()(const cloud::HdfsVaultInfo& vault) const { |
152 | 0 | auto hdfs_params = io::to_hdfs_params(vault); |
153 | 0 | auto fs = DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, |
154 | 0 | nullptr, vault.prefix())); |
155 | 0 | put_storage_resource(id, {std::move(fs), path_format}, 0); |
156 | 0 | LOG_INFO("successfully create hdfs vault, vault id {}", id); |
157 | 0 | return Status::OK(); |
158 | 0 | } |
159 | | |
160 | | const std::string& id; |
161 | | const cloud::StorageVaultPB_PathFormat& path_format; |
162 | | bool check_fs; |
163 | | }; |
164 | | |
165 | | struct RefreshFSVaultVisitor { |
166 | | RefreshFSVaultVisitor(const std::string& id, io::FileSystemSPtr fs, |
167 | | const cloud::StorageVaultPB_PathFormat& path_format) |
168 | 119 | : id(id), fs(std::move(fs)), path_format(path_format) {} |
169 | | |
170 | 119 | Status operator()(const S3Conf& s3_conf) const { |
171 | 119 | DCHECK_EQ(fs->type(), io::FileSystemType::S3) << id; |
172 | 119 | auto s3_fs = std::static_pointer_cast<io::S3FileSystem>(fs); |
173 | 119 | auto client_holder = s3_fs->client_holder(); |
174 | 119 | auto st = client_holder->reset(s3_conf.client_conf); |
175 | 119 | if (!st.ok()) { |
176 | 0 | LOG(WARNING) << "failed to update s3 fs, resource_id=" << id << ": " << st; |
177 | 0 | } |
178 | 119 | return st; |
179 | 119 | } |
180 | | |
181 | 0 | Status operator()(const cloud::HdfsVaultInfo& vault) const { |
182 | 0 | auto hdfs_params = io::to_hdfs_params(vault); |
183 | 0 | auto hdfs_fs = |
184 | 0 | DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr, |
185 | 0 | vault.has_prefix() ? vault.prefix() : "")); |
186 | 0 | auto hdfs = std::static_pointer_cast<io::HdfsFileSystem>(hdfs_fs); |
187 | 0 | put_storage_resource(id, {std::move(hdfs), path_format}, 0); |
188 | 0 | return Status::OK(); |
189 | 0 | } |
190 | | |
191 | | const std::string& id; |
192 | | io::FileSystemSPtr fs; |
193 | | const cloud::StorageVaultPB_PathFormat& path_format; |
194 | | }; |
195 | | |
// Brings the engine's components up.
// In order: syncs the storage vaults, creates the memtable flush executor and the two
// delete-bitmap calculation executors, sizes and initializes the txn delete bitmap
// cache, creates the file cache block downloader, warm-up manager, tablet hotspot
// tracker and snapshot manager, initializes the stream load recorder, checks the
// cluster id of the root paths, and builds the SyncLoadForTablets and WarmupCacheAsync
// thread pools.
// Returns the first non-OK status from the steps that report one. sync_storage_vault()
// returns void, so a failure there does not fail open().
Status CloudStorageEngine::open() {
    sync_storage_vault();

    // TODO(plat1ko): DeleteBitmapTxnManager

    _memtable_flush_executor = std::make_unique<MemTableFlushExecutor>();
    // Use file cache disks number
    _memtable_flush_executor->init(
            cast_set<int32_t>(io::FileCacheFactory::instance()->get_cache_instance_size()));

    _calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>();
    _calc_delete_bitmap_executor->init(config::calc_delete_bitmap_max_thread);

    // The load-path executor uses the configured thread count when it is positive,
    // otherwise half of the cores (at least one).
    _calc_delete_bitmap_executor_for_load = std::make_unique<CalcDeleteBitmapExecutor>();
    _calc_delete_bitmap_executor_for_load->init(
            config::calc_delete_bitmap_for_load_max_thread > 0
                    ? config::calc_delete_bitmap_for_load_max_thread
                    : std::max(1, CpuInfo::num_cores() / 2));

    // The default cache is set to 100MB, use memory limit to dynamic adjustment
    bool is_percent = false;
    int64_t delete_bitmap_agg_cache_cache_limit =
            ParseUtil::parse_mem_spec(config::delete_bitmap_dynamic_agg_cache_limit,
                                      MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent);
    // The cache gets the larger of the parsed dynamic limit and the configured capacity.
    _txn_delete_bitmap_cache = std::make_unique<CloudTxnDeleteBitmapCache>(
            delete_bitmap_agg_cache_cache_limit > config::delete_bitmap_agg_cache_capacity
                    ? delete_bitmap_agg_cache_cache_limit
                    : config::delete_bitmap_agg_cache_capacity);
    RETURN_IF_ERROR(_txn_delete_bitmap_cache->init());

    _file_cache_block_downloader = std::make_unique<io::FileCacheBlockDownloader>(*this);

    _cloud_warm_up_manager = std::make_unique<CloudWarmUpManager>(*this);

    _tablet_hotspot = std::make_unique<TabletHotspot>();

    _cloud_snapshot_mgr = std::make_unique<CloudSnapshotMgr>(*this);

    // NOTE(review): indexes store_paths()[0] without an emptiness check -- assumes at
    // least one store path is always configured; confirm.
    RETURN_NOT_OK_STATUS_WITH_WARN(
            init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
            "init StreamLoadRecorder failed");

    // check cluster id
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id");

    RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
                                           .set_max_threads(config::sync_load_for_tablets_thread)
                                           .set_min_threads(config::sync_load_for_tablets_thread)
                                           .build(&_sync_load_for_tablets_thread_pool),
                                   "fail to build SyncLoadForTabletsThreadPool");

    RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("WarmupCacheAsyncThreadPool")
                                           .set_max_threads(config::warmup_cache_async_thread)
                                           .set_min_threads(config::warmup_cache_async_thread)
                                           .build(&_warmup_cache_async_thread_pool),
                                   "fail to build WarmupCacheAsyncThreadPool");

    return Status::OK();
}
255 | | |
256 | 131 | void CloudStorageEngine::stop() { |
257 | 131 | if (_stopped) { |
258 | 0 | return; |
259 | 0 | } |
260 | | |
261 | 131 | _stopped = true; |
262 | 131 | _stop_background_threads_latch.count_down(); |
263 | | |
264 | 131 | for (auto&& t : _bg_threads) { |
265 | 0 | if (t) { |
266 | 0 | t->join(); |
267 | 0 | } |
268 | 0 | } |
269 | | |
270 | 131 | if (_base_compaction_thread_pool) { |
271 | 0 | _base_compaction_thread_pool->shutdown(); |
272 | 0 | } |
273 | 131 | if (_cumu_compaction_thread_pool) { |
274 | 0 | _cumu_compaction_thread_pool->shutdown(); |
275 | 0 | } |
276 | 131 | LOG(INFO) << "Cloud storage engine is stopped."; |
277 | | |
278 | 131 | if (_calc_tablet_delete_bitmap_task_thread_pool) { |
279 | 0 | _calc_tablet_delete_bitmap_task_thread_pool->shutdown(); |
280 | 0 | } |
281 | 131 | if (_sync_delete_bitmap_thread_pool) { |
282 | 0 | _sync_delete_bitmap_thread_pool->shutdown(); |
283 | 0 | } |
284 | 131 | } |
285 | | |
// Returns whether stop() has already marked this engine as stopped.
bool CloudStorageEngine::stopped() {
    return _stopped;
}
289 | | |
// Looks a tablet up through the tablet manager and returns it as a BaseTablet pointer.
// `sync_stats`, `force_use_only_cached` and `cache_on_miss` are forwarded unchanged; an
// error result from the tablet manager is passed through as-is.
// NOTE(review): the literal `false, true` arguments are positional flags of
// CloudTabletMgr::get_tablet whose meaning is not visible here -- check its declaration.
Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id,
                                                      SyncRowsetStats* sync_stats,
                                                      bool force_use_only_cached,
                                                      bool cache_on_miss) {
    return _tablet_mgr
            ->get_tablet(tablet_id, false, true, sync_stats, force_use_only_cached, cache_on_miss)
            .transform([](auto&& t) { return static_pointer_cast<BaseTablet>(std::move(t)); });
}
298 | | |
299 | | Status CloudStorageEngine::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta, |
300 | 164k | bool force_use_only_cached) { |
301 | 164k | if (tablet_meta == nullptr) { |
302 | 0 | return Status::InvalidArgument("tablet_meta output is null"); |
303 | 0 | } |
304 | | |
305 | | #if 0 |
306 | | if (_tablet_mgr && _tablet_mgr->peek_tablet_meta(tablet_id, tablet_meta)) { |
307 | | return Status::OK(); |
308 | | } |
309 | | |
310 | | if (force_use_only_cached) { |
311 | | return Status::NotFound("tablet meta {} not found in cache", tablet_id); |
312 | | } |
313 | | #endif |
314 | | |
315 | 164k | if (_meta_mgr == nullptr) { |
316 | 0 | return Status::InternalError("cloud meta manager is not initialized"); |
317 | 0 | } |
318 | | |
319 | 164k | return _meta_mgr->get_tablet_meta(tablet_id, tablet_meta); |
320 | 164k | } |
321 | | |
// Starts the engine's background threads and builds the thread pools they feed.
// Started here: storage vault refresh, stale rowset vacuum, tablet sync, querying
// rowset eviction, compaction task producer, compaction lease, and delete bitmap score
// check threads. Built here: the tablet delete-bitmap calculation pool, the
// delete-bitmap sync pool, and the base and cumulative compaction pools.
// Returns the first failure from thread or pool creation; threads started before the
// failure are left running.
// `wg_sptr` is not used in this function body.
Status CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
    RETURN_IF_ERROR(Thread::create(
            "CloudStorageEngine", "refresh_s3_info_thread",
            [this]() { this->_refresh_storage_vault_info_thread_callback(); },
            &_bg_threads.emplace_back()));
    LOG(INFO) << "refresh s3 info thread started";

    RETURN_IF_ERROR(Thread::create(
            "CloudStorageEngine", "vacuum_stale_rowsets_thread",
            [this]() { this->_vacuum_stale_rowsets_thread_callback(); },
            &_bg_threads.emplace_back()));
    LOG(INFO) << "vacuum stale rowsets thread started";

    RETURN_IF_ERROR(Thread::create(
            "CloudStorageEngine", "sync_tablets_thread",
            [this]() { this->_sync_tablets_thread_callback(); }, &_bg_threads.emplace_back()));
    LOG(INFO) << "sync tablets thread started";

    // Unlike the other threads, this one is stored in its own member rather than in
    // _bg_threads.
    // NOTE(review): stop() only joins _bg_threads -- confirm this thread is joined
    // elsewhere.
    RETURN_IF_ERROR(Thread::create(
            "CloudStorageEngine", "evict_querying_rowset_thread",
            [this]() { this->_evict_quring_rowset_thread_callback(); },
            &_evict_quering_rowset_thread));
    LOG(INFO) << "evict quering thread started";

    // add calculate tablet delete bitmap task thread pool
    RETURN_IF_ERROR(ThreadPoolBuilder("TabletCalDeleteBitmapThreadPool")
                            .set_min_threads(config::calc_tablet_delete_bitmap_task_max_thread)
                            .set_max_threads(config::calc_tablet_delete_bitmap_task_max_thread)
                            .build(&_calc_tablet_delete_bitmap_task_thread_pool));
    RETURN_IF_ERROR(ThreadPoolBuilder("SyncDeleteBitmapThreadPool")
                            .set_min_threads(config::sync_delete_bitmap_task_max_thread)
                            .set_max_threads(config::sync_delete_bitmap_task_max_thread)
                            .build(&_sync_delete_bitmap_thread_pool));

    // TODO(plat1ko): check_bucket_enable_versioning_thread

    // compaction tasks producer thread
    int base_thread_num = get_base_thread_num();
    int cumu_thread_num = get_cumu_thread_num();

    // Both compaction pools are built with min == max threads, and they are built
    // before the producer thread that reads them is started.
    RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
                            .set_min_threads(base_thread_num)
                            .set_max_threads(base_thread_num)
                            .build(&_base_compaction_thread_pool));
    RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
                            .set_min_threads(cumu_thread_num)
                            .set_max_threads(cumu_thread_num)
                            .build(&_cumu_compaction_thread_pool));
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "compaction_tasks_producer_thread",
            [this]() { this->_compaction_tasks_producer_callback(); },
            &_bg_threads.emplace_back()));
    LOG(INFO) << "compaction tasks producer thread started,"
              << " base thread num " << base_thread_num << " cumu thread num " << cumu_thread_num;

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "lease_compaction_thread",
            [this]() { this->_lease_compaction_thread_callback(); }, &_bg_threads.emplace_back()));

    LOG(INFO) << "lease compaction thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "check_tablet_delete_bitmap_score_thread",
            [this]() { this->_check_tablet_delete_bitmap_score_callback(); },
            &_bg_threads.emplace_back()));
    LOG(INFO) << "check tablet delete bitmap score thread started";

    return Status::OK();
}
391 | | |
392 | 120 | void CloudStorageEngine::sync_storage_vault() { |
393 | 120 | cloud::StorageVaultInfos vault_infos; |
394 | 120 | bool enable_storage_vault = false; |
395 | | |
396 | 120 | auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault); |
397 | 120 | if (!st.ok()) { |
398 | 0 | LOG(WARNING) << "failed to get storage vault info. err=" << st; |
399 | 0 | return; |
400 | 0 | } |
401 | | |
402 | 120 | if (vault_infos.empty()) { |
403 | 0 | LOG(WARNING) << "empty storage vault info"; |
404 | 0 | return; |
405 | 0 | } |
406 | | |
407 | 120 | bool check_storage_vault = false; |
408 | 120 | bool expected = false; |
409 | 120 | if (first_sync_storage_vault.compare_exchange_strong(expected, true)) { |
410 | 0 | check_storage_vault = config::enable_check_storage_vault; |
411 | 0 | LOG(INFO) << "first sync storage vault info, BE try to check iam role connectivity, " |
412 | 0 | "check_storage_vault=" |
413 | 0 | << check_storage_vault; |
414 | 0 | } |
415 | | |
416 | 120 | for (auto& [id, vault_info, path_format] : vault_infos) { |
417 | 120 | auto fs = get_filesystem(id); |
418 | 120 | auto status = |
419 | 120 | (fs == nullptr) |
420 | 120 | ? std::visit(VaultCreateFSVisitor {id, path_format, check_storage_vault}, |
421 | 1 | vault_info) |
422 | 120 | : std::visit(RefreshFSVaultVisitor {id, std::move(fs), path_format}, |
423 | 119 | vault_info); |
424 | 120 | if (!status.ok()) [[unlikely]] { |
425 | 0 | LOG(WARNING) << vault_process_error(id, vault_info, std::move(st)); |
426 | 0 | } |
427 | 120 | } |
428 | | |
429 | 120 | if (auto& id = std::get<0>(vault_infos.back()); |
430 | 120 | (latest_fs() == nullptr || latest_fs()->id() != id) && !enable_storage_vault) { |
431 | 1 | set_latest_fs(get_filesystem(id)); |
432 | 1 | } |
433 | 120 | } |
434 | | |
435 | | // We should enable_java_support if we want to use hdfs vault |
436 | 1 | void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() { |
437 | 70 | while (!_stop_background_threads_latch.wait_for( |
438 | 70 | std::chrono::seconds(config::refresh_s3_info_interval_s))) { |
439 | 69 | sync_storage_vault(); |
440 | 69 | } |
441 | 1 | } |
442 | | |
443 | 1 | void CloudStorageEngine::_vacuum_stale_rowsets_thread_callback() { |
444 | 14 | while (!_stop_background_threads_latch.wait_for( |
445 | 14 | std::chrono::seconds(config::vacuum_stale_rowsets_interval_s))) { |
446 | 13 | _tablet_mgr->vacuum_stale_rowsets(_stop_background_threads_latch); |
447 | 13 | } |
448 | 1 | } |
449 | | |
450 | 1 | void CloudStorageEngine::_sync_tablets_thread_callback() { |
451 | 7 | while (!_stop_background_threads_latch.wait_for( |
452 | 7 | std::chrono::seconds(config::schedule_sync_tablets_interval_s))) { |
453 | 6 | _tablet_mgr->sync_tablets(_stop_background_threads_latch); |
454 | 6 | } |
455 | 1 | } |
456 | | |
457 | | void CloudStorageEngine::get_cumu_compaction( |
458 | 101k | int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) { |
459 | 101k | std::lock_guard lock(_compaction_mtx); |
460 | 101k | if (auto it = _submitted_cumu_compactions.find(tablet_id); |
461 | 101k | it != _submitted_cumu_compactions.end()) { |
462 | 0 | res = it->second; |
463 | 0 | } |
464 | 101k | } |
465 | | |
// Re-applies the currently configured thread counts to the base and cumulative
// compaction pools, so that changes to the thread-number settings take effect without
// rebuilding the pools.
// Returns an INTERNAL_ERROR status (empty message) when either pool has not been
// created yet, OK otherwise. A failed set_max_threads/set_min_threads call is not
// propagated: it only suppresses the corresponding VLOG line.
Status CloudStorageEngine::_adjust_compaction_thread_num() {
    int base_thread_num = get_base_thread_num();

    if (!_base_compaction_thread_pool || !_cumu_compaction_thread_pool) {
        LOG(WARNING) << "base or cumu compaction thread pool is not created";
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>("");
    }

    // Base pool: bring max, then min, in line with the desired count.
    if (_base_compaction_thread_pool->max_threads() != base_thread_num) {
        int old_max_threads = _base_compaction_thread_pool->max_threads();
        Status status = _base_compaction_thread_pool->set_max_threads(base_thread_num);
        if (status.ok()) {
            VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads
                        << " to " << base_thread_num;
        }
    }
    if (_base_compaction_thread_pool->min_threads() != base_thread_num) {
        int old_min_threads = _base_compaction_thread_pool->min_threads();
        Status status = _base_compaction_thread_pool->set_min_threads(base_thread_num);
        if (status.ok()) {
            VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads
                        << " to " << base_thread_num;
        }
    }

    // Cumulative pool: same procedure.
    int cumu_thread_num = get_cumu_thread_num();
    if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) {
        int old_max_threads = _cumu_compaction_thread_pool->max_threads();
        Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_thread_num);
        if (status.ok()) {
            VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads
                        << " to " << cumu_thread_num;
        }
    }
    if (_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) {
        int old_min_threads = _cumu_compaction_thread_pool->min_threads();
        Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_thread_num);
        if (status.ok()) {
            VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads
                        << " to " << cumu_thread_num;
        }
    }
    return Status::OK();
}
510 | | |
// Body of the compaction task producer thread.
// Each iteration, unless auto compaction is disabled: re-applies the pool thread
// counts, picks the compaction type for this round, generates candidate tablets and
// submits a compaction task for each. The loop then waits on the stop latch for the
// current interval and exits when the latch is released, or earlier if the compaction
// pools turn out not to exist.
void CloudStorageEngine::_compaction_tasks_producer_callback() {
    LOG(INFO) << "try to start compaction producer process!";

    // Number of consecutive cumulative rounds done since the last base round.
    int round = 0;
    CompactionType compaction_type;

    // Used to record the time when the score metric was last updated.
    // The update of the score metric is accompanied by the logic of selecting the tablet.
    // If there is no slot available, the logic of selecting the tablet will be terminated,
    // which causes the score metric update to be terminated.
    // In order to avoid this situation, we need to update the score regularly.
    int64_t last_cumulative_score_update_time = 0;
    int64_t last_base_score_update_time = 0;
    static const int64_t check_score_interval_ms = 5000; // 5 secs

    int64_t interval = config::generate_compaction_tasks_interval_ms;
    do {
        int64_t cur_time = UnixMillis();
        if (!config::disable_auto_compaction) {
            Status st = _adjust_compaction_thread_num();
            if (!st.ok()) {
                // The compaction pools are missing; the producer thread ends here.
                break;
            }

            // Run config::cumulative_compaction_rounds_for_each_base_compaction_round
            // cumulative rounds, then one base round, and repeat. check_score is set when
            // at least check_score_interval_ms has passed since the last score update for
            // the chosen type.
            bool check_score = false;
            if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
                round++;
                if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) {
                    check_score = true;
                    last_cumulative_score_update_time = cur_time;
                }
            } else {
                compaction_type = CompactionType::BASE_COMPACTION;
                round = 0;
                if (cur_time - last_base_score_update_time >= check_score_interval_ms) {
                    check_score = true;
                    last_base_score_update_time = cur_time;
                }
            }
            // The pool reference is only used for the verbose log below.
            std::unique_ptr<ThreadPool>& thread_pool =
                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
                            ? _cumu_compaction_thread_pool
                            : _base_compaction_thread_pool;
            VLOG_CRITICAL << "compaction thread pool. type: "
                          << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
                                                                                       : "BASE")
                          << ", num_threads: " << thread_pool->num_threads()
                          << ", num_threads_pending_start: "
                          << thread_pool->num_threads_pending_start()
                          << ", num_active_threads: " << thread_pool->num_active_threads()
                          << ", max_threads: " << thread_pool->max_threads()
                          << ", min_threads: " << thread_pool->min_threads()
                          << ", num_total_queued_tasks: " << thread_pool->get_queue_size();
            std::vector<CloudTabletSPtr> tablets_compaction =
                    _generate_cloud_compaction_tasks(compaction_type, check_score);

            /// Regardless of whether the tablet is submitted for compaction or not,
            /// we need to call 'reset_compaction' to clean up the base_compaction or cumulative_compaction objects
            /// in the tablet, because these two objects store the tablet's own shared_ptr.
            /// If it is not cleaned up, the reference count of the tablet will always be greater than 1,
            /// thus cannot be collected by the garbage collector. (TabletManager::start_trash_sweep)
            for (const auto& tablet : tablets_compaction) {
                Status status = submit_compaction_task(tablet, compaction_type);
                if (status.ok()) continue;
                // "No suitable version" failures are only logged when debug verbose
                // logging is on; every other failure is always logged.
                if ((!status.is<ErrorCode::BE_NO_SUITABLE_VERSION>() &&
                     !status.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) ||
                    VLOG_DEBUG_IS_ON) {
                    LOG(WARNING) << "failed to submit compaction task for tablet: "
                                 << tablet->tablet_id() << ", err: " << status;
                }
            }
            interval = config::generate_compaction_tasks_interval_ms;
        } else {
            // Auto compaction is disabled: just poll the switch at a coarser interval.
            interval = config::check_auto_compaction_interval_seconds * 1000;
        }
        int64_t end_time = UnixMillis();
        DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time -
                                                                                       cur_time);
    } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
}
592 | | |
593 | | void CloudStorageEngine::unregister_index_change_compaction(int64_t tablet_id, |
594 | 508 | bool is_base_compact) { |
595 | 508 | std::lock_guard lock(_compaction_mtx); |
596 | 508 | if (is_base_compact) { |
597 | 0 | _submitted_index_change_base_compaction.erase(tablet_id); |
598 | 508 | } else { |
599 | 508 | _submitted_index_change_cumu_compaction.erase(tablet_id); |
600 | 508 | } |
601 | 508 | } |
602 | | |
603 | | bool CloudStorageEngine::register_index_change_compaction( |
604 | | std::shared_ptr<CloudIndexChangeCompaction> compact, int64_t tablet_id, |
605 | 510 | bool is_base_compact, std::string& err_reason) { |
606 | 510 | std::lock_guard lock(_compaction_mtx); |
607 | 510 | if (is_base_compact) { |
608 | 2 | if (_submitted_base_compactions.contains(tablet_id) || |
609 | 2 | _submitted_full_compactions.contains(tablet_id) || |
610 | 2 | _submitted_index_change_base_compaction.contains(tablet_id)) { |
611 | 1 | std::stringstream ss; |
612 | 1 | ss << "reason:" << ((int)_submitted_base_compactions.contains(tablet_id)) << ", " |
613 | 1 | << ((int)_submitted_full_compactions.contains(tablet_id)) << ", " |
614 | 1 | << ((int)_submitted_index_change_base_compaction.contains(tablet_id)); |
615 | 1 | err_reason = ss.str(); |
616 | 1 | return false; |
617 | 1 | } else { |
618 | 1 | _submitted_index_change_base_compaction[tablet_id] = compact; |
619 | 1 | return true; |
620 | 1 | } |
621 | 508 | } else { |
622 | 508 | if (_tablet_preparing_cumu_compaction.contains(tablet_id) || |
623 | 512 | _submitted_cumu_compactions.contains(tablet_id) || |
624 | 511 | _submitted_index_change_cumu_compaction.contains(tablet_id)) { |
625 | 1 | std::stringstream ss; |
626 | 1 | ss << "reason:" << ((int)_tablet_preparing_cumu_compaction.contains(tablet_id)) << ", " |
627 | 1 | << ((int)_submitted_cumu_compactions.contains(tablet_id)) << ", " |
628 | 1 | << ((int)_submitted_index_change_cumu_compaction.contains(tablet_id)); |
629 | 1 | err_reason = ss.str(); |
630 | 1 | return false; |
631 | 507 | } else { |
632 | 507 | _submitted_index_change_cumu_compaction[tablet_id] = compact; |
633 | 507 | } |
634 | 507 | return true; |
635 | 508 | } |
636 | 510 | } |
637 | | |
638 | | std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks( |
639 | 18.6k | CompactionType compaction_type, bool check_score) { |
640 | 18.6k | std::vector<std::shared_ptr<CloudTablet>> tablets_compaction; |
641 | | |
642 | 18.6k | int64_t max_compaction_score = 0; |
643 | 18.6k | std::unordered_set<int64_t> tablet_preparing_cumu_compaction; |
644 | 18.6k | std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>> |
645 | 18.6k | submitted_cumu_compactions; |
646 | 18.6k | std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> submitted_base_compactions; |
647 | 18.6k | std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> submitted_full_compactions; |
648 | 18.6k | std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>> |
649 | 18.6k | submitted_index_change_cumu_compactions; |
650 | 18.6k | std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>> |
651 | 18.6k | submitted_index_change_base_compactions; |
652 | 18.6k | { |
653 | 18.6k | std::lock_guard lock(_compaction_mtx); |
654 | 18.6k | tablet_preparing_cumu_compaction = _tablet_preparing_cumu_compaction; |
655 | 18.6k | submitted_cumu_compactions = _submitted_cumu_compactions; |
656 | 18.6k | submitted_base_compactions = _submitted_base_compactions; |
657 | 18.6k | submitted_full_compactions = _submitted_full_compactions; |
658 | 18.6k | submitted_index_change_cumu_compactions = _submitted_index_change_cumu_compaction; |
659 | 18.6k | submitted_index_change_base_compactions = _submitted_index_change_base_compaction; |
660 | 18.6k | } |
661 | | |
662 | 18.6k | bool need_pick_tablet = true; |
663 | 18.6k | int thread_per_disk = |
664 | 18.6k | config::compaction_task_num_per_fast_disk; // all disks are fast in cloud mode |
665 | 18.6k | int num_cumu = |
666 | 18.6k | std::accumulate(submitted_cumu_compactions.begin(), submitted_cumu_compactions.end(), 0, |
667 | 20.8k | [](int a, auto& b) { return a + b.second.size(); }); |
668 | 18.6k | int num_base = |
669 | 18.6k | cast_set<int>(submitted_base_compactions.size() + submitted_full_compactions.size()); |
670 | 18.6k | int n = thread_per_disk - num_cumu - num_base; |
671 | 18.6k | if (compaction_type == CompactionType::BASE_COMPACTION) { |
672 | | // We need to reserve at least one thread for cumulative compaction, |
673 | | // because base compactions may take too long to complete, which may |
674 | | // leads to "too many rowsets" error. |
675 | 1.86k | int base_n = std::min(config::max_base_compaction_task_num_per_disk, thread_per_disk - 1) - |
676 | 1.86k | num_base; |
677 | 1.86k | n = std::min(base_n, n); |
678 | 1.86k | } |
679 | 18.6k | if (n <= 0) { // No threads available |
680 | 473 | if (!check_score) return tablets_compaction; |
681 | 30 | need_pick_tablet = false; |
682 | 30 | n = 0; |
683 | 30 | } |
684 | | |
685 | | // Return true for skipping compaction |
686 | 18.2k | std::function<bool(CloudTablet*)> filter_out; |
687 | 18.2k | if (compaction_type == CompactionType::BASE_COMPACTION) { |
688 | 1.82k | filter_out = [&submitted_base_compactions, &submitted_full_compactions, |
689 | 59.4M | &submitted_index_change_base_compactions](CloudTablet* t) { |
690 | 59.4M | return submitted_base_compactions.contains(t->tablet_id()) || |
691 | 59.4M | submitted_full_compactions.contains(t->tablet_id()) || |
692 | 59.4M | submitted_index_change_base_compactions.contains(t->tablet_id()) || |
693 | 59.4M | t->tablet_state() != TABLET_RUNNING; |
694 | 59.4M | }; |
695 | 16.4k | } else if (config::enable_parallel_cumu_compaction) { |
696 | 0 | filter_out = [&tablet_preparing_cumu_compaction, |
697 | 0 | &submitted_index_change_cumu_compactions](CloudTablet* t) { |
698 | 0 | return tablet_preparing_cumu_compaction.contains(t->tablet_id()) || |
699 | 0 | submitted_index_change_cumu_compactions.contains(t->tablet_id()) || |
700 | 0 | (t->tablet_state() != TABLET_RUNNING && |
701 | 0 | (!config::enable_new_tablet_do_compaction || t->alter_version() == -1)); |
702 | 0 | }; |
703 | 16.4k | } else { |
704 | 16.4k | filter_out = [&tablet_preparing_cumu_compaction, &submitted_cumu_compactions, |
705 | 415M | &submitted_index_change_cumu_compactions](CloudTablet* t) { |
706 | 415M | return tablet_preparing_cumu_compaction.contains(t->tablet_id()) || |
707 | 415M | submitted_index_change_cumu_compactions.contains(t->tablet_id()) || |
708 | 415M | submitted_cumu_compactions.contains(t->tablet_id()) || |
709 | 415M | (t->tablet_state() != TABLET_RUNNING && |
710 | 415M | (!config::enable_new_tablet_do_compaction || t->alter_version() == -1)); |
711 | 415M | }; |
712 | 16.4k | } |
713 | | |
714 | | // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(), |
715 | | // So that we can update the max_compaction_score metric. |
716 | 18.2k | do { |
717 | 18.2k | std::vector<CloudTabletSPtr> tablets; |
718 | 18.2k | auto st = tablet_mgr().get_topn_tablets_to_compact(n, compaction_type, filter_out, &tablets, |
719 | 18.2k | &max_compaction_score); |
720 | 18.2k | if (!st.ok()) { |
721 | 0 | LOG(WARNING) << "failed to get tablets to compact, err=" << st; |
722 | 0 | break; |
723 | 0 | } |
724 | 18.2k | if (!need_pick_tablet) break; |
725 | 18.2k | tablets_compaction = std::move(tablets); |
726 | 18.2k | } while (false); |
727 | | |
728 | 18.2k | if (max_compaction_score > 0) { |
729 | 16.4k | if (compaction_type == CompactionType::BASE_COMPACTION) { |
730 | 1.77k | DorisMetrics::instance()->tablet_base_max_compaction_score->set_value( |
731 | 1.77k | max_compaction_score); |
732 | 14.6k | } else { |
733 | 14.6k | DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value( |
734 | 14.6k | max_compaction_score); |
735 | 14.6k | } |
736 | 16.4k | } |
737 | | |
738 | 18.2k | return tablets_compaction; |
739 | 18.6k | } |
740 | | |
741 | | Status CloudStorageEngine::_request_tablet_global_compaction_lock( |
742 | | ReaderType compaction_type, const CloudTabletSPtr& tablet, |
743 | 10.0k | std::shared_ptr<CloudCompactionMixin> compaction) { |
744 | 10.0k | long now = duration_cast<std::chrono::milliseconds>( |
745 | 10.0k | std::chrono::system_clock::now().time_since_epoch()) |
746 | 10.0k | .count(); |
747 | 10.0k | if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) { |
748 | 9.89k | auto cumu_compaction = static_pointer_cast<CloudCumulativeCompaction>(compaction); |
749 | 9.89k | if (auto st = cumu_compaction->request_global_lock(); !st.ok()) { |
750 | 296 | LOG_WARNING("failed to request cumu compactoin global lock") |
751 | 296 | .tag("tablet id", tablet->tablet_id()) |
752 | 296 | .tag("msg", st.to_string()); |
753 | 296 | tablet->set_last_cumu_compaction_failure_time(now); |
754 | 296 | return st; |
755 | 296 | } |
756 | 9.60k | { |
757 | 9.60k | std::lock_guard lock(_compaction_mtx); |
758 | 9.60k | _executing_cumu_compactions[tablet->tablet_id()].push_back(cumu_compaction); |
759 | 9.60k | } |
760 | 9.60k | return Status::OK(); |
761 | 9.89k | } else if (compaction_type == ReaderType::READER_BASE_COMPACTION) { |
762 | 87 | auto base_compaction = static_pointer_cast<CloudBaseCompaction>(compaction); |
763 | 87 | if (auto st = base_compaction->request_global_lock(); !st.ok()) { |
764 | 7 | LOG_WARNING("failed to request base compactoin global lock") |
765 | 7 | .tag("tablet id", tablet->tablet_id()) |
766 | 7 | .tag("msg", st.to_string()); |
767 | 7 | tablet->set_last_base_compaction_failure_time(now); |
768 | 7 | return st; |
769 | 7 | } |
770 | 80 | { |
771 | 80 | std::lock_guard lock(_compaction_mtx); |
772 | 80 | _executing_base_compactions[tablet->tablet_id()] = base_compaction; |
773 | 80 | } |
774 | 80 | return Status::OK(); |
775 | 104 | } else if (compaction_type == ReaderType::READER_FULL_COMPACTION) { |
776 | 104 | auto full_compaction = static_pointer_cast<CloudFullCompaction>(compaction); |
777 | 104 | if (auto st = full_compaction->request_global_lock(); !st.ok()) { |
778 | 0 | LOG_WARNING("failed to request full compactoin global lock") |
779 | 0 | .tag("tablet id", tablet->tablet_id()) |
780 | 0 | .tag("msg", st.to_string()); |
781 | 0 | tablet->set_last_full_compaction_failure_time(now); |
782 | 0 | return st; |
783 | 0 | } |
784 | 104 | { |
785 | 104 | std::lock_guard lock(_compaction_mtx); |
786 | 104 | _executing_full_compactions[tablet->tablet_id()] = full_compaction; |
787 | 104 | } |
788 | 104 | return Status::OK(); |
789 | 104 | } else { |
790 | 0 | LOG(WARNING) << "unsupport compaction task for tablet: " << tablet->tablet_id() |
791 | 0 | << ", compaction name: " << compaction->compaction_name(); |
792 | 0 | return Status::NotFound("Unsupport compaction type {}", compaction->compaction_name()); |
793 | 0 | } |
794 | 10.0k | } |
795 | | |
796 | 3.24k | Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet) { |
797 | 3.24k | using namespace std::chrono; |
798 | 3.24k | { |
799 | 3.24k | std::lock_guard lock(_compaction_mtx); |
800 | | // Take a placeholder for base compaction |
801 | 3.24k | auto [_, success] = _submitted_base_compactions.emplace(tablet->tablet_id(), nullptr); |
802 | 3.24k | if (!success) { |
803 | 0 | return Status::AlreadyExist( |
804 | 0 | "other base compaction or full compaction is submitted, tablet_id={}", |
805 | 0 | tablet->tablet_id()); |
806 | 0 | } |
807 | 3.24k | } |
808 | 3.24k | auto compaction = std::make_shared<CloudBaseCompaction>(*this, tablet); |
809 | 3.24k | auto st = compaction->prepare_compact(); |
810 | 3.24k | if (!st.ok()) { |
811 | 3.15k | long now = duration_cast<std::chrono::milliseconds>( |
812 | 3.15k | std::chrono::system_clock::now().time_since_epoch()) |
813 | 3.15k | .count(); |
814 | 3.15k | tablet->set_last_base_compaction_failure_time(now); |
815 | 3.15k | std::lock_guard lock(_compaction_mtx); |
816 | 3.15k | _submitted_base_compactions.erase(tablet->tablet_id()); |
817 | 3.15k | return st; |
818 | 3.15k | } |
819 | 87 | { |
820 | 87 | std::lock_guard lock(_compaction_mtx); |
821 | 87 | _submitted_base_compactions[tablet->tablet_id()] = compaction; |
822 | 87 | } |
823 | 87 | st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { |
824 | 87 | DorisMetrics::instance()->base_compaction_task_running_total->increment(1); |
825 | 87 | DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
826 | 87 | _base_compaction_thread_pool->get_queue_size()); |
827 | 87 | g_base_compaction_running_task_count << 1; |
828 | 87 | signal::tablet_id = tablet->tablet_id(); |
829 | 87 | Defer defer {[&]() { |
830 | 87 | g_base_compaction_running_task_count << -1; |
831 | 87 | std::lock_guard lock(_compaction_mtx); |
832 | 87 | _submitted_base_compactions.erase(tablet->tablet_id()); |
833 | 87 | DorisMetrics::instance()->base_compaction_task_running_total->increment(-1); |
834 | 87 | DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
835 | 87 | _base_compaction_thread_pool->get_queue_size()); |
836 | 87 | }}; |
837 | 87 | auto st = _request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION, tablet, |
838 | 87 | compaction); |
839 | 87 | if (!st.ok()) return; |
840 | 80 | st = compaction->execute_compact(); |
841 | 80 | if (!st.ok()) { |
842 | | // Error log has been output in `execute_compact` |
843 | 0 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
844 | 0 | tablet->set_last_base_compaction_failure_time(now); |
845 | 0 | } |
846 | 80 | std::lock_guard lock(_compaction_mtx); |
847 | 80 | _executing_base_compactions.erase(tablet->tablet_id()); |
848 | 80 | }); |
849 | 87 | DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
850 | 87 | _base_compaction_thread_pool->get_queue_size()); |
851 | 87 | if (!st.ok()) { |
852 | 0 | std::lock_guard lock(_compaction_mtx); |
853 | 0 | _submitted_base_compactions.erase(tablet->tablet_id()); |
854 | 0 | return Status::InternalError("failed to submit base compaction, tablet_id={}", |
855 | 0 | tablet->tablet_id()); |
856 | 0 | } |
857 | 87 | return st; |
858 | 87 | } |
859 | | |
860 | 101k | Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet) { |
861 | 101k | using namespace std::chrono; |
862 | 101k | { |
863 | 101k | std::lock_guard lock(_compaction_mtx); |
864 | 101k | if (!config::enable_parallel_cumu_compaction && |
865 | 101k | _submitted_cumu_compactions.count(tablet->tablet_id())) { |
866 | 38 | return Status::AlreadyExist("other cumu compaction is submitted, tablet_id={}", |
867 | 38 | tablet->tablet_id()); |
868 | 38 | } |
869 | 101k | auto [_, success] = _tablet_preparing_cumu_compaction.insert(tablet->tablet_id()); |
870 | 101k | if (!success) { |
871 | 2 | return Status::AlreadyExist("other cumu compaction is preparing, tablet_id={}", |
872 | 2 | tablet->tablet_id()); |
873 | 2 | } |
874 | 101k | } |
875 | 101k | auto compaction = std::make_shared<CloudCumulativeCompaction>(*this, tablet); |
876 | 101k | auto st = compaction->prepare_compact(); |
877 | 101k | if (!st.ok()) { |
878 | 91.5k | long now = duration_cast<std::chrono::milliseconds>( |
879 | 91.5k | std::chrono::system_clock::now().time_since_epoch()) |
880 | 91.5k | .count(); |
881 | 91.5k | if (!st.is<ErrorCode::CUMULATIVE_MEET_DELETE_VERSION>()) { |
882 | 91.4k | if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) { |
883 | | // Backoff strategy if no suitable version |
884 | 91.4k | tablet->last_cumu_no_suitable_version_ms = now; |
885 | 91.4k | } else { |
886 | 0 | tablet->set_last_cumu_compaction_failure_time(now); |
887 | 0 | } |
888 | 91.4k | } |
889 | 91.5k | std::lock_guard lock(_compaction_mtx); |
890 | 91.5k | _tablet_preparing_cumu_compaction.erase(tablet->tablet_id()); |
891 | 91.5k | return st; |
892 | 91.5k | } |
893 | 9.89k | { |
894 | 9.89k | std::lock_guard lock(_compaction_mtx); |
895 | 9.89k | _tablet_preparing_cumu_compaction.erase(tablet->tablet_id()); |
896 | 9.89k | _submitted_cumu_compactions[tablet->tablet_id()].push_back(compaction); |
897 | 9.89k | } |
898 | 9.89k | auto erase_submitted_cumu_compaction = [=, this]() { |
899 | 9.89k | std::lock_guard lock(_compaction_mtx); |
900 | 9.89k | auto it = _submitted_cumu_compactions.find(tablet->tablet_id()); |
901 | 9.89k | DCHECK(it != _submitted_cumu_compactions.end()); |
902 | 9.89k | auto& compactions = it->second; |
903 | 9.89k | auto it1 = std::find(compactions.begin(), compactions.end(), compaction); |
904 | 9.89k | DCHECK(it1 != compactions.end()); |
905 | 9.89k | compactions.erase(it1); |
906 | 9.89k | if (compactions.empty()) { // No compactions on this tablet, erase key |
907 | 9.89k | _submitted_cumu_compactions.erase(it); |
908 | | // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to |
909 | | // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform |
910 | | // cumu compaction on tablet which has suitable versions for cumu compaction. |
911 | 9.89k | tablet->last_cumu_no_suitable_version_ms = 0; |
912 | 9.89k | } |
913 | 9.89k | }; |
914 | 9.89k | auto erase_executing_cumu_compaction = [=, this]() { |
915 | 9.56k | std::lock_guard lock(_compaction_mtx); |
916 | 9.56k | auto it = _executing_cumu_compactions.find(tablet->tablet_id()); |
917 | 9.56k | DCHECK(it != _executing_cumu_compactions.end()); |
918 | 9.56k | auto& compactions = it->second; |
919 | 9.56k | auto it1 = std::find(compactions.begin(), compactions.end(), compaction); |
920 | 9.56k | DCHECK(it1 != compactions.end()); |
921 | 9.56k | compactions.erase(it1); |
922 | 9.60k | if (compactions.empty()) { // No compactions on this tablet, erase key |
923 | 9.60k | _executing_cumu_compactions.erase(it); |
924 | | // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to |
925 | | // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform |
926 | | // cumu compaction on tablet which has suitable versions for cumu compaction. |
927 | 9.60k | tablet->last_cumu_no_suitable_version_ms = 0; |
928 | 9.60k | } |
929 | 9.56k | }; |
930 | 9.89k | st = _cumu_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { |
931 | 9.89k | DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1); |
932 | 9.89k | DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
933 | 9.89k | _cumu_compaction_thread_pool->get_queue_size()); |
934 | 9.89k | DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.wait_in_line", |
935 | 9.89k | { sleep(5); }) |
936 | 9.89k | signal::tablet_id = tablet->tablet_id(); |
937 | 9.89k | g_cumu_compaction_running_task_count << 1; |
938 | 9.89k | bool is_large_task = true; |
939 | 9.89k | Defer defer {[&]() { |
940 | 9.89k | DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.sleep", |
941 | 9.89k | { sleep(5); }) |
942 | 9.89k | std::lock_guard lock(_cumu_compaction_delay_mtx); |
943 | 9.89k | _cumu_compaction_thread_pool_used_threads--; |
944 | 9.89k | if (!is_large_task) { |
945 | 9.60k | _cumu_compaction_thread_pool_small_tasks_running--; |
946 | 9.60k | } |
947 | 9.89k | g_cumu_compaction_running_task_count << -1; |
948 | 9.89k | erase_submitted_cumu_compaction(); |
949 | 9.89k | DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(-1); |
950 | 9.89k | DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
951 | 9.89k | _cumu_compaction_thread_pool->get_queue_size()); |
952 | 9.89k | }}; |
953 | 9.89k | auto st = _request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION, |
954 | 9.89k | tablet, compaction); |
955 | 9.89k | if (!st.ok()) return; |
956 | 9.60k | do { |
957 | 9.60k | std::lock_guard lock(_cumu_compaction_delay_mtx); |
958 | 9.60k | _cumu_compaction_thread_pool_used_threads++; |
959 | 9.60k | if (config::large_cumu_compaction_task_min_thread_num > 1 && |
960 | 9.60k | _cumu_compaction_thread_pool->max_threads() >= |
961 | 9.60k | config::large_cumu_compaction_task_min_thread_num) { |
962 | | // Determine if this is a small task based on configured thresholds |
963 | 9.60k | is_large_task = (compaction->get_input_rowsets_bytes() > |
964 | 9.60k | config::large_cumu_compaction_task_bytes_threshold || |
965 | 9.60k | compaction->get_input_num_rows() > |
966 | 9.60k | config::large_cumu_compaction_task_row_num_threshold); |
967 | | // Small task. No delay needed |
968 | 9.60k | if (!is_large_task) { |
969 | 9.60k | _cumu_compaction_thread_pool_small_tasks_running++; |
970 | 9.60k | break; |
971 | 9.60k | } |
972 | | // Deal with large task |
973 | 0 | if (_should_delay_large_task()) { |
974 | 0 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()) |
975 | 0 | .count(); |
976 | | // sleep 5s for this tablet |
977 | 0 | tablet->set_last_cumu_compaction_failure_time(now); |
978 | 0 | erase_executing_cumu_compaction(); |
979 | 0 | LOG_WARNING( |
980 | 0 | "failed to do CloudCumulativeCompaction, cumu thread pool is " |
981 | 0 | "intensive, delay large task.") |
982 | 0 | .tag("tablet_id", tablet->tablet_id()) |
983 | 0 | .tag("input_rows", compaction->get_input_num_rows()) |
984 | 0 | .tag("input_rowsets_total_size", compaction->get_input_rowsets_bytes()) |
985 | 0 | .tag("config::large_cumu_compaction_task_bytes_threshold", |
986 | 0 | config::large_cumu_compaction_task_bytes_threshold) |
987 | 0 | .tag("config::large_cumu_compaction_task_row_num_threshold", |
988 | 0 | config::large_cumu_compaction_task_row_num_threshold) |
989 | 0 | .tag("remaining threads", _cumu_compaction_thread_pool_used_threads) |
990 | 0 | .tag("small_tasks_running", |
991 | 0 | _cumu_compaction_thread_pool_small_tasks_running); |
992 | 0 | return; |
993 | 0 | } |
994 | 0 | } |
995 | 9.60k | } while (false); |
996 | 9.60k | st = compaction->execute_compact(); |
997 | 9.60k | if (!st.ok()) { |
998 | | // Error log has been output in `execute_compact` |
999 | 106 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
1000 | 106 | tablet->set_last_cumu_compaction_failure_time(now); |
1001 | 106 | } |
1002 | 9.60k | erase_executing_cumu_compaction(); |
1003 | 9.60k | }); |
1004 | 9.89k | DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
1005 | 9.89k | _cumu_compaction_thread_pool->get_queue_size()); |
1006 | 9.89k | if (!st.ok()) { |
1007 | 0 | erase_submitted_cumu_compaction(); |
1008 | 0 | return Status::InternalError("failed to submit cumu compaction, tablet_id={}", |
1009 | 0 | tablet->tablet_id()); |
1010 | 0 | } |
1011 | 9.89k | return st; |
1012 | 9.89k | } |
1013 | | |
1014 | 105 | Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet) { |
1015 | 105 | using namespace std::chrono; |
1016 | 105 | { |
1017 | 105 | std::lock_guard lock(_compaction_mtx); |
1018 | | // Take a placeholder for full compaction |
1019 | 105 | auto [_, success] = _submitted_full_compactions.emplace(tablet->tablet_id(), nullptr); |
1020 | 105 | if (!success) { |
1021 | 0 | return Status::AlreadyExist( |
1022 | 0 | "other full compaction or base compaction is submitted, tablet_id={}", |
1023 | 0 | tablet->tablet_id()); |
1024 | 0 | } |
1025 | 105 | } |
1026 | | //auto compaction = std::make_shared<CloudFullCompaction>(tablet); |
1027 | 105 | auto compaction = std::make_shared<CloudFullCompaction>(*this, tablet); |
1028 | 105 | auto st = compaction->prepare_compact(); |
1029 | 105 | if (!st.ok()) { |
1030 | 1 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
1031 | 1 | tablet->set_last_full_compaction_failure_time(now); |
1032 | 1 | std::lock_guard lock(_compaction_mtx); |
1033 | 1 | _submitted_full_compactions.erase(tablet->tablet_id()); |
1034 | 1 | return st; |
1035 | 1 | } |
1036 | 104 | { |
1037 | 104 | std::lock_guard lock(_compaction_mtx); |
1038 | 104 | _submitted_full_compactions[tablet->tablet_id()] = compaction; |
1039 | 104 | } |
1040 | 104 | st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { |
1041 | 104 | g_full_compaction_running_task_count << 1; |
1042 | 104 | signal::tablet_id = tablet->tablet_id(); |
1043 | 104 | Defer defer {[&]() { |
1044 | 104 | g_full_compaction_running_task_count << -1; |
1045 | 104 | std::lock_guard lock(_compaction_mtx); |
1046 | 104 | _submitted_full_compactions.erase(tablet->tablet_id()); |
1047 | 104 | }}; |
1048 | 104 | auto st = _request_tablet_global_compaction_lock(ReaderType::READER_FULL_COMPACTION, tablet, |
1049 | 104 | compaction); |
1050 | 104 | if (!st.ok()) return; |
1051 | 104 | st = compaction->execute_compact(); |
1052 | 104 | if (!st.ok()) { |
1053 | | // Error log has been output in `execute_compact` |
1054 | 0 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
1055 | 0 | tablet->set_last_full_compaction_failure_time(now); |
1056 | 0 | } |
1057 | 104 | std::lock_guard lock(_compaction_mtx); |
1058 | 104 | _executing_full_compactions.erase(tablet->tablet_id()); |
1059 | 104 | }); |
1060 | 104 | if (!st.ok()) { |
1061 | 0 | std::lock_guard lock(_compaction_mtx); |
1062 | 0 | _submitted_full_compactions.erase(tablet->tablet_id()); |
1063 | 0 | return Status::InternalError("failed to submit full compaction, tablet_id={}", |
1064 | 0 | tablet->tablet_id()); |
1065 | 0 | } |
1066 | 104 | return st; |
1067 | 104 | } |
1068 | | |
1069 | | Status CloudStorageEngine::submit_compaction_task(const CloudTabletSPtr& tablet, |
1070 | 104k | CompactionType compaction_type) { |
1071 | 104k | DCHECK(compaction_type == CompactionType::CUMULATIVE_COMPACTION || |
1072 | 104k | compaction_type == CompactionType::BASE_COMPACTION || |
1073 | 104k | compaction_type == CompactionType::FULL_COMPACTION); |
1074 | 104k | switch (compaction_type) { |
1075 | 3.24k | case CompactionType::BASE_COMPACTION: |
1076 | 3.24k | RETURN_IF_ERROR(_submit_base_compaction_task(tablet)); |
1077 | 87 | return Status::OK(); |
1078 | 101k | case CompactionType::CUMULATIVE_COMPACTION: |
1079 | 101k | RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet)); |
1080 | 9.89k | return Status::OK(); |
1081 | 105 | case CompactionType::FULL_COMPACTION: |
1082 | 105 | RETURN_IF_ERROR(_submit_full_compaction_task(tablet)); |
1083 | 104 | return Status::OK(); |
1084 | 0 | default: |
1085 | 0 | return Status::InternalError("unknown compaction type!"); |
1086 | 104k | } |
1087 | 104k | } |
1088 | | |
1089 | 1 | void CloudStorageEngine::_lease_compaction_thread_callback() { |
1090 | 209 | while (!_stop_background_threads_latch.wait_for( |
1091 | 209 | std::chrono::seconds(config::lease_compaction_interval_seconds))) { |
1092 | 208 | std::vector<std::shared_ptr<CloudFullCompaction>> full_compactions; |
1093 | 208 | std::vector<std::shared_ptr<CloudBaseCompaction>> base_compactions; |
1094 | 208 | std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions; |
1095 | 208 | std::vector<std::shared_ptr<CloudCompactionStopToken>> compation_stop_tokens; |
1096 | 208 | std::vector<std::shared_ptr<CloudIndexChangeCompaction>> index_change_compations; |
1097 | 208 | { |
1098 | 208 | std::lock_guard lock(_compaction_mtx); |
1099 | 208 | for (auto& [_, base] : _executing_base_compactions) { |
1100 | 3 | if (base) { // `base` might be a nullptr placeholder |
1101 | 3 | base_compactions.push_back(base); |
1102 | 3 | } |
1103 | 3 | } |
1104 | 248 | for (auto& [_, cumus] : _executing_cumu_compactions) { |
1105 | 248 | for (auto& cumu : cumus) { |
1106 | 248 | cumu_compactions.push_back(cumu); |
1107 | 248 | } |
1108 | 248 | } |
1109 | 208 | for (auto& [_, full] : _executing_full_compactions) { |
1110 | 6 | if (full) { |
1111 | 6 | full_compactions.push_back(full); |
1112 | 6 | } |
1113 | 6 | } |
1114 | 208 | for (auto& [_, stop_token] : _active_compaction_stop_tokens) { |
1115 | 12 | if (stop_token) { |
1116 | 12 | compation_stop_tokens.push_back(stop_token); |
1117 | 12 | } |
1118 | 12 | } |
1119 | 208 | for (auto& [_, index_change] : _submitted_index_change_cumu_compaction) { |
1120 | 3 | if (index_change) { |
1121 | 3 | index_change_compations.push_back(index_change); |
1122 | 3 | } |
1123 | 3 | } |
1124 | 208 | for (auto& [_, index_change] : _submitted_index_change_base_compaction) { |
1125 | 0 | if (index_change) { |
1126 | 0 | index_change_compations.push_back(index_change); |
1127 | 0 | } |
1128 | 0 | } |
1129 | 208 | } |
1130 | | // TODO(plat1ko): Support batch lease rpc |
1131 | 208 | for (auto& stop_token : compation_stop_tokens) { |
1132 | 12 | stop_token->do_lease(); |
1133 | 12 | } |
1134 | 208 | for (auto& comp : full_compactions) { |
1135 | 6 | comp->do_lease(); |
1136 | 6 | } |
1137 | 248 | for (auto& comp : cumu_compactions) { |
1138 | 248 | comp->do_lease(); |
1139 | 248 | } |
1140 | 208 | for (auto& comp : base_compactions) { |
1141 | 3 | comp->do_lease(); |
1142 | 3 | } |
1143 | 208 | for (auto& comp : index_change_compations) { |
1144 | 3 | comp->do_lease(); |
1145 | 3 | } |
1146 | 208 | } |
1147 | 1 | } |
1148 | | |
1149 | 1 | void CloudStorageEngine::_check_tablet_delete_bitmap_score_callback() { |
1150 | 1 | LOG(INFO) << "try to start check tablet delete bitmap score!"; |
1151 | 14 | while (!_stop_background_threads_latch.wait_for( |
1152 | 14 | std::chrono::seconds(config::check_tablet_delete_bitmap_interval_seconds))) { |
1153 | 13 | if (!config::enable_check_tablet_delete_bitmap_score) { |
1154 | 0 | return; |
1155 | 0 | } |
1156 | 13 | uint64_t max_delete_bitmap_score = 0; |
1157 | 13 | uint64_t max_base_rowset_delete_bitmap_score = 0; |
1158 | 13 | tablet_mgr().get_topn_tablet_delete_bitmap_score(&max_delete_bitmap_score, |
1159 | 13 | &max_base_rowset_delete_bitmap_score); |
1160 | 13 | if (max_delete_bitmap_score > 0) { |
1161 | 13 | _tablet_max_delete_bitmap_score_metrics->set_value(max_delete_bitmap_score); |
1162 | 13 | } |
1163 | 13 | if (max_base_rowset_delete_bitmap_score > 0) { |
1164 | 13 | _tablet_max_base_rowset_delete_bitmap_score_metrics->set_value( |
1165 | 13 | max_base_rowset_delete_bitmap_score); |
1166 | 13 | } |
1167 | 13 | } |
1168 | 1 | } |
1169 | | |
1170 | 0 | Status CloudStorageEngine::get_compaction_status_json(std::string* result) { |
1171 | 0 | rapidjson::Document root; |
1172 | 0 | root.SetObject(); |
1173 | |
|
1174 | 0 | std::lock_guard lock(_compaction_mtx); |
1175 | | // cumu |
1176 | 0 | std::string_view cumu = "CumulativeCompaction"; |
1177 | 0 | rapidjson::Value cumu_key; |
1178 | 0 | cumu_key.SetString(cumu.data(), cast_set<uint32_t>(cumu.length()), root.GetAllocator()); |
1179 | 0 | rapidjson::Document cumu_arr; |
1180 | 0 | cumu_arr.SetArray(); |
1181 | 0 | for (auto& [tablet_id, v] : _submitted_cumu_compactions) { |
1182 | 0 | for (int i = 0; i < v.size(); ++i) { |
1183 | 0 | cumu_arr.PushBack(tablet_id, root.GetAllocator()); |
1184 | 0 | } |
1185 | 0 | } |
1186 | 0 | root.AddMember(cumu_key, cumu_arr, root.GetAllocator()); |
1187 | | // base |
1188 | 0 | std::string_view base = "BaseCompaction"; |
1189 | 0 | rapidjson::Value base_key; |
1190 | 0 | base_key.SetString(base.data(), cast_set<uint32_t>(base.length()), root.GetAllocator()); |
1191 | 0 | rapidjson::Document base_arr; |
1192 | 0 | base_arr.SetArray(); |
1193 | 0 | for (auto& [tablet_id, _] : _submitted_base_compactions) { |
1194 | 0 | base_arr.PushBack(tablet_id, root.GetAllocator()); |
1195 | 0 | } |
1196 | 0 | root.AddMember(base_key, base_arr, root.GetAllocator()); |
1197 | |
|
1198 | 0 | rapidjson::StringBuffer strbuf; |
1199 | 0 | rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf); |
1200 | 0 | root.Accept(writer); |
1201 | 0 | *result = std::string(strbuf.GetString()); |
1202 | 0 | return Status::OK(); |
1203 | 0 | } |
1204 | | |
1205 | | std::shared_ptr<CloudCumulativeCompactionPolicy> CloudStorageEngine::cumu_compaction_policy( |
1206 | 120k | std::string_view compaction_policy) { |
1207 | 120k | if (!_cumulative_compaction_policies.contains(compaction_policy)) { |
1208 | 0 | return _cumulative_compaction_policies.at(CUMULATIVE_SIZE_BASED_POLICY); |
1209 | 0 | } |
1210 | 120k | return _cumulative_compaction_policies.at(compaction_policy); |
1211 | 120k | } |
1212 | | |
1213 | | Status CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet, |
1214 | 1.16k | int64_t initiator) { |
1215 | 1.16k | { |
1216 | 1.16k | std::lock_guard lock(_compaction_mtx); |
1217 | 1.16k | auto [_, success] = _active_compaction_stop_tokens.emplace(tablet->tablet_id(), nullptr); |
1218 | 1.16k | if (!success) { |
1219 | 0 | return Status::AlreadyExist("stop token already exists for tablet_id={}", |
1220 | 0 | tablet->tablet_id()); |
1221 | 0 | } |
1222 | 1.16k | } |
1223 | | |
1224 | 1.16k | auto stop_token = std::make_shared<CloudCompactionStopToken>(*this, tablet, initiator); |
1225 | 1.16k | auto st = stop_token->do_register(); |
1226 | | |
1227 | 1.16k | if (!st.ok()) { |
1228 | 0 | std::lock_guard lock(_compaction_mtx); |
1229 | 0 | _active_compaction_stop_tokens.erase(tablet->tablet_id()); |
1230 | 0 | return st; |
1231 | 0 | } |
1232 | | |
1233 | 1.16k | { |
1234 | 1.16k | std::lock_guard lock(_compaction_mtx); |
1235 | 1.16k | _active_compaction_stop_tokens[tablet->tablet_id()] = stop_token; |
1236 | 1.16k | } |
1237 | 1.16k | LOG_INFO( |
1238 | 1.16k | "successfully register compaction stop token for tablet_id={}, " |
1239 | 1.16k | "delete_bitmap_lock_initiator={}", |
1240 | 1.16k | tablet->tablet_id(), initiator); |
1241 | 1.16k | return st; |
1242 | 1.16k | } |
1243 | | |
1244 | 1.16k | Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms) { |
1245 | 1.16k | std::shared_ptr<CloudCompactionStopToken> stop_token; |
1246 | 1.16k | { |
1247 | 1.16k | std::lock_guard lock(_compaction_mtx); |
1248 | 1.16k | if (auto it = _active_compaction_stop_tokens.find(tablet->tablet_id()); |
1249 | 1.16k | it != _active_compaction_stop_tokens.end()) { |
1250 | 1.16k | stop_token = it->second; |
1251 | 1.16k | } else { |
1252 | 0 | return Status::NotFound("stop token not found for tablet_id={}", tablet->tablet_id()); |
1253 | 0 | } |
1254 | 1.16k | _active_compaction_stop_tokens.erase(tablet->tablet_id()); |
1255 | 1.16k | } |
1256 | 1.16k | LOG_INFO("successfully unregister compaction stop token for tablet_id={}", tablet->tablet_id()); |
1257 | 1.16k | if (stop_token && clear_ms) { |
1258 | 0 | RETURN_IF_ERROR(stop_token->do_unregister()); |
1259 | 0 | LOG_INFO( |
1260 | 0 | "successfully remove compaction stop token from MS for tablet_id={}, " |
1261 | 0 | "delete_bitmap_lock_initiator={}", |
1262 | 0 | tablet->tablet_id(), stop_token->initiator()); |
1263 | 0 | } |
1264 | 1.16k | return Status::OK(); |
1265 | 1.16k | } |
1266 | | |
1267 | 1 | Status CloudStorageEngine::_check_all_root_path_cluster_id() { |
1268 | | // Check if all root paths have the same cluster id |
1269 | 1 | std::set<int32_t> cluster_ids; |
1270 | 1 | for (const auto& path : _options.store_paths) { |
1271 | 1 | auto cluster_id_path = fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX); |
1272 | 1 | bool exists = false; |
1273 | 1 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists)); |
1274 | 1 | if (exists) { |
1275 | 0 | io::FileReaderSPtr reader; |
1276 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cluster_id_path, &reader)); |
1277 | 0 | size_t fsize = reader->size(); |
1278 | 0 | if (fsize > 0) { |
1279 | 0 | std::string content; |
1280 | 0 | content.resize(fsize, '\0'); |
1281 | 0 | size_t bytes_read = 0; |
1282 | 0 | RETURN_IF_ERROR(reader->read_at(0, {content.data(), fsize}, &bytes_read)); |
1283 | 0 | DCHECK_EQ(fsize, bytes_read); |
1284 | 0 | int32_t tmp_cluster_id = std::stoi(content); |
1285 | 0 | cluster_ids.insert(tmp_cluster_id); |
1286 | 0 | } |
1287 | 0 | } |
1288 | 1 | } |
1289 | 1 | _effective_cluster_id = config::cluster_id; |
1290 | | // first init |
1291 | 1 | if (cluster_ids.empty()) { |
1292 | | // not set configured cluster id |
1293 | 1 | if (_effective_cluster_id == -1) { |
1294 | 1 | return Status::OK(); |
1295 | 1 | } else { |
1296 | | // If no cluster id file exists, use the configured cluster id |
1297 | 0 | return set_cluster_id(_effective_cluster_id); |
1298 | 0 | } |
1299 | 1 | } |
1300 | 0 | if (cluster_ids.size() > 1) { |
1301 | 0 | return Status::InternalError( |
1302 | 0 | "All root paths must have the same cluster id, but you have " |
1303 | 0 | "different cluster ids: {}", |
1304 | 0 | fmt::join(cluster_ids, ", ")); |
1305 | 0 | } |
1306 | 0 | if (_effective_cluster_id != -1 && !cluster_ids.empty() && |
1307 | 0 | *cluster_ids.begin() != _effective_cluster_id) { |
1308 | 0 | return Status::Corruption( |
1309 | 0 | "multiple cluster ids is not equal. config::cluster_id={}, " |
1310 | 0 | "storage path cluster_id={}", |
1311 | 0 | _effective_cluster_id, *cluster_ids.begin()); |
1312 | 0 | } |
1313 | 0 | return Status::OK(); |
1314 | 0 | } |
1315 | | |
1316 | 1 | Status CloudStorageEngine::set_cluster_id(int32_t cluster_id) { |
1317 | 1 | std::lock_guard<std::mutex> l(_store_lock); |
1318 | 1 | for (auto& path : _options.store_paths) { |
1319 | 1 | auto cluster_id_path = fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX); |
1320 | 1 | bool exists = false; |
1321 | 1 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists)); |
1322 | 1 | if (!exists) { |
1323 | 1 | io::FileWriterPtr file_writer; |
1324 | 1 | RETURN_IF_ERROR( |
1325 | 1 | io::global_local_filesystem()->create_file(cluster_id_path, &file_writer)); |
1326 | 1 | RETURN_IF_ERROR(file_writer->append(std::to_string(cluster_id))); |
1327 | 1 | RETURN_IF_ERROR(file_writer->close()); |
1328 | 1 | } |
1329 | 1 | } |
1330 | 1 | _effective_cluster_id = cluster_id; |
1331 | 1 | return Status::OK(); |
1332 | 1 | } |
1333 | | |
1334 | | #include "common/compile_check_end.h" |
1335 | | } // namespace doris |