be/src/cloud/cloud_storage_engine.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_storage_engine.h" |
19 | | |
20 | | #include <bvar/reducer.h> |
21 | | #include <gen_cpp/PlanNodes_types.h> |
22 | | #include <gen_cpp/cloud.pb.h> |
23 | | #include <gen_cpp/olap_file.pb.h> |
24 | | #include <rapidjson/document.h> |
25 | | #include <rapidjson/encodings.h> |
26 | | #include <rapidjson/prettywriter.h> |
27 | | #include <rapidjson/stringbuffer.h> |
28 | | |
29 | | #include <algorithm> |
30 | | #include <memory> |
31 | | #include <variant> |
32 | | |
33 | | #include "cloud/cloud_base_compaction.h" |
34 | | #include "cloud/cloud_compaction_stop_token.h" |
35 | | #include "cloud/cloud_cumulative_compaction.h" |
36 | | #include "cloud/cloud_cumulative_compaction_policy.h" |
37 | | #include "cloud/cloud_full_compaction.h" |
38 | | #include "cloud/cloud_index_change_compaction.h" |
39 | | #include "cloud/cloud_meta_mgr.h" |
40 | | #include "cloud/cloud_snapshot_mgr.h" |
41 | | #include "cloud/cloud_tablet_hotspot.h" |
42 | | #include "cloud/cloud_tablet_mgr.h" |
43 | | #include "cloud/cloud_txn_delete_bitmap_cache.h" |
44 | | #include "cloud/cloud_warm_up_manager.h" |
45 | | #include "cloud/config.h" |
46 | | #include "common/config.h" |
47 | | #include "common/metrics/doris_metrics.h" |
48 | | #include "common/signal_handler.h" |
49 | | #include "common/status.h" |
50 | | #include "core/assert_cast.h" |
51 | | #include "io/cache/block_file_cache_downloader.h" |
52 | | #include "io/cache/block_file_cache_factory.h" |
53 | | #include "io/cache/file_cache_common.h" |
54 | | #include "io/fs/file_system.h" |
55 | | #include "io/fs/hdfs_file_system.h" |
56 | | #include "io/fs/s3_file_system.h" |
57 | | #include "io/hdfs_util.h" |
58 | | #include "io/io_common.h" |
59 | | #include "load/memtable/memtable_flush_executor.h" |
60 | | #include "runtime/exec_env.h" |
61 | | #include "runtime/memory/cache_manager.h" |
62 | | #include "storage/compaction/cumulative_compaction_policy.h" |
63 | | #include "storage/compaction/cumulative_compaction_time_series_policy.h" |
64 | | #include "storage/compaction_task_tracker.h" |
65 | | #include "storage/storage_policy.h" |
66 | | #include "util/parse_util.h" |
67 | | #include "util/time.h" |
68 | | |
69 | | namespace doris { |
70 | | #include "common/compile_check_begin.h" |
71 | | |
72 | | using namespace std::literals; |
73 | | |
74 | | bvar::Adder<uint64_t> g_base_compaction_running_task_count("base_compaction_running_task_count"); |
75 | | bvar::Adder<uint64_t> g_full_compaction_running_task_count("full_compaction_running_task_count"); |
76 | | bvar::Adder<uint64_t> g_cumu_compaction_running_task_count( |
77 | | "cumulative_compaction_running_task_count"); |
78 | | |
79 | 18.7k | int get_cumu_thread_num() { |
80 | 18.7k | if (config::max_cumu_compaction_threads > 0) { |
81 | 0 | return config::max_cumu_compaction_threads; |
82 | 0 | } |
83 | | |
84 | 18.7k | int num_cores = doris::CpuInfo::num_cores(); |
85 | 18.7k | return std::min(std::max(int(num_cores * config::cumu_compaction_thread_num_factor), 2), 20); |
86 | 18.7k | } |
87 | | |
88 | 18.7k | int get_base_thread_num() { |
89 | 18.7k | if (config::max_base_compaction_threads > 0) { |
90 | 18.7k | return config::max_base_compaction_threads; |
91 | 18.7k | } |
92 | | |
93 | 0 | int num_cores = doris::CpuInfo::num_cores(); |
94 | 0 | return std::min(std::max(int(num_cores * config::base_compaction_thread_num_factor), 1), 10); |
95 | 18.7k | } |
96 | | |
97 | | CloudStorageEngine::CloudStorageEngine(const EngineOptions& options) |
98 | 154 | : BaseStorageEngine(Type::CLOUD, options.backend_uid), |
99 | 154 | _meta_mgr(std::make_unique<cloud::CloudMetaMgr>()), |
100 | 154 | _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)), |
101 | 154 | _options(options) { |
102 | 154 | _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] = |
103 | 154 | std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>(); |
104 | 154 | _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] = |
105 | 154 | std::make_shared<CloudTimeSeriesCumulativeCompactionPolicy>(); |
106 | 154 | _startup_timepoint = std::chrono::system_clock::now(); |
107 | 154 | } |
108 | | |
109 | 153 | CloudStorageEngine::~CloudStorageEngine() { |
110 | 153 | stop(); |
111 | 153 | } |
112 | | |
113 | | static Status vault_process_error(std::string_view id, |
114 | 0 | std::variant<S3Conf, cloud::HdfsVaultInfo>& vault, Status err) { |
115 | 0 | std::stringstream ss; |
116 | 0 | std::visit( |
117 | 0 | [&]<typename T>(T& val) { |
118 | 0 | if constexpr (std::is_same_v<T, S3Conf>) { |
119 | 0 | ss << val.to_string(); |
120 | 0 | } else if constexpr (std::is_same_v<T, cloud::HdfsVaultInfo>) { |
121 | 0 | val.SerializeToOstream(&ss); |
122 | 0 | } |
123 | 0 | }, Unexecuted instantiation: cloud_storage_engine.cpp:_ZZN5dorisL19vault_process_errorESt17basic_string_viewIcSt11char_traitsIcEERSt7variantIJNS_6S3ConfENS_5cloud13HdfsVaultInfoEEENS_6StatusEENK3$_0clIS5_EEDaRT_ Unexecuted instantiation: cloud_storage_engine.cpp:_ZZN5dorisL19vault_process_errorESt17basic_string_viewIcSt11char_traitsIcEERSt7variantIJNS_6S3ConfENS_5cloud13HdfsVaultInfoEEENS_6StatusEENK3$_0clIS7_EEDaRT_ |
124 | 0 | vault); |
125 | 0 | return Status::IOError("Invalid vault, id {}, err {}, detail conf {}", id, err, ss.str()); |
126 | 0 | } |
127 | | |
128 | | struct VaultCreateFSVisitor { |
129 | | VaultCreateFSVisitor(const std::string& id, const cloud::StorageVaultPB_PathFormat& path_format, |
130 | | bool check_fs) |
131 | 1 | : id(id), path_format(path_format), check_fs(check_fs) {} |
132 | 1 | Status operator()(const S3Conf& s3_conf) const { |
133 | 1 | LOG(INFO) << "get new s3 info: " << s3_conf.to_string() << " resource_id=" << id |
134 | 1 | << " check_fs: " << check_fs; |
135 | | |
136 | 1 | auto fs = DORIS_TRY(io::S3FileSystem::create(s3_conf, id)); |
137 | 1 | if (check_fs && !s3_conf.client_conf.role_arn.empty()) { |
138 | 0 | bool res = false; |
139 | | // just check connectivity, not care object if exist |
140 | 0 | auto st = fs->exists("not_exist_object", &res); |
141 | 0 | if (!st.ok()) { |
142 | 0 | LOG(FATAL) << "failed to check s3 fs, resource_id: " << id << " st: " << st |
143 | 0 | << "s3_conf: " << s3_conf.to_string() |
144 | 0 | << "add enable_check_storage_vault=false to be.conf to skip the check"; |
145 | 0 | } |
146 | 0 | } |
147 | | |
148 | 1 | put_storage_resource(id, {std::move(fs), path_format}, 0); |
149 | 1 | LOG_INFO("successfully create s3 vault, vault id {}", id); |
150 | 1 | return Status::OK(); |
151 | 1 | } |
152 | | |
153 | | // TODO(ByteYue): Make sure enable_java_support is on |
154 | 0 | Status operator()(const cloud::HdfsVaultInfo& vault) const { |
155 | 0 | auto hdfs_params = io::to_hdfs_params(vault); |
156 | 0 | auto fs = DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, |
157 | 0 | nullptr, vault.prefix())); |
158 | 0 | put_storage_resource(id, {std::move(fs), path_format}, 0); |
159 | 0 | LOG_INFO("successfully create hdfs vault, vault id {}", id); |
160 | 0 | return Status::OK(); |
161 | 0 | } |
162 | | |
163 | | const std::string& id; |
164 | | const cloud::StorageVaultPB_PathFormat& path_format; |
165 | | bool check_fs; |
166 | | }; |
167 | | |
168 | | struct RefreshFSVaultVisitor { |
169 | | RefreshFSVaultVisitor(const std::string& id, io::FileSystemSPtr fs, |
170 | | const cloud::StorageVaultPB_PathFormat& path_format) |
171 | 110 | : id(id), fs(std::move(fs)), path_format(path_format) {} |
172 | | |
173 | 110 | Status operator()(const S3Conf& s3_conf) const { |
174 | 110 | DCHECK_EQ(fs->type(), io::FileSystemType::S3) << id; |
175 | 110 | auto s3_fs = std::static_pointer_cast<io::S3FileSystem>(fs); |
176 | 110 | auto client_holder = s3_fs->client_holder(); |
177 | 110 | auto st = client_holder->reset(s3_conf.client_conf); |
178 | 110 | if (!st.ok()) { |
179 | 0 | LOG(WARNING) << "failed to update s3 fs, resource_id=" << id << ": " << st; |
180 | 0 | } |
181 | 110 | return st; |
182 | 110 | } |
183 | | |
184 | 0 | Status operator()(const cloud::HdfsVaultInfo& vault) const { |
185 | 0 | auto hdfs_params = io::to_hdfs_params(vault); |
186 | 0 | auto hdfs_fs = |
187 | 0 | DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr, |
188 | 0 | vault.has_prefix() ? vault.prefix() : "")); |
189 | 0 | auto hdfs = std::static_pointer_cast<io::HdfsFileSystem>(hdfs_fs); |
190 | 0 | put_storage_resource(id, {std::move(hdfs), path_format}, 0); |
191 | 0 | return Status::OK(); |
192 | 0 | } |
193 | | |
194 | | const std::string& id; |
195 | | io::FileSystemSPtr fs; |
196 | | const cloud::StorageVaultPB_PathFormat& path_format; |
197 | | }; |
198 | | |
199 | 1 | Status CloudStorageEngine::open() { |
200 | 1 | sync_storage_vault(); |
201 | | |
202 | | // TODO(plat1ko): DeleteBitmapTxnManager |
203 | | |
204 | 1 | _memtable_flush_executor = std::make_unique<MemTableFlushExecutor>(); |
205 | | // Use file cache disks number |
206 | 1 | _memtable_flush_executor->init( |
207 | 1 | cast_set<int32_t>(io::FileCacheFactory::instance()->get_cache_instance_size())); |
208 | | |
209 | 1 | _calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>(); |
210 | 1 | _calc_delete_bitmap_executor->init(config::calc_delete_bitmap_max_thread); |
211 | | |
212 | 1 | _calc_delete_bitmap_executor_for_load = std::make_unique<CalcDeleteBitmapExecutor>(); |
213 | 1 | _calc_delete_bitmap_executor_for_load->init( |
214 | 1 | config::calc_delete_bitmap_for_load_max_thread > 0 |
215 | 1 | ? config::calc_delete_bitmap_for_load_max_thread |
216 | 1 | : std::max(1, CpuInfo::num_cores() / 2)); |
217 | | |
218 | | // The default cache is set to 100MB, use memory limit to dynamic adjustment |
219 | 1 | bool is_percent = false; |
220 | 1 | int64_t delete_bitmap_agg_cache_cache_limit = |
221 | 1 | ParseUtil::parse_mem_spec(config::delete_bitmap_dynamic_agg_cache_limit, |
222 | 1 | MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent); |
223 | 1 | _txn_delete_bitmap_cache = std::make_unique<CloudTxnDeleteBitmapCache>( |
224 | 1 | delete_bitmap_agg_cache_cache_limit > config::delete_bitmap_agg_cache_capacity |
225 | 1 | ? delete_bitmap_agg_cache_cache_limit |
226 | 1 | : config::delete_bitmap_agg_cache_capacity); |
227 | 1 | RETURN_IF_ERROR(_txn_delete_bitmap_cache->init()); |
228 | | |
229 | 1 | _committed_rs_mgr = std::make_unique<CloudCommittedRSMgr>(); |
230 | 1 | RETURN_IF_ERROR(_committed_rs_mgr->init()); |
231 | | |
232 | 1 | _file_cache_block_downloader = std::make_unique<io::FileCacheBlockDownloader>(*this); |
233 | | |
234 | 1 | _cloud_warm_up_manager = std::make_unique<CloudWarmUpManager>(*this); |
235 | | |
236 | 1 | _tablet_hotspot = std::make_unique<TabletHotspot>(); |
237 | | |
238 | 1 | _cloud_snapshot_mgr = std::make_unique<CloudSnapshotMgr>(*this); |
239 | | |
240 | 1 | RETURN_NOT_OK_STATUS_WITH_WARN( |
241 | 1 | init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path), |
242 | 1 | "init StreamLoadRecorder failed"); |
243 | | |
244 | | // check cluster id |
245 | 1 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id"); |
246 | | |
247 | 1 | RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("SyncLoadForTabletsThreadPool") |
248 | 1 | .set_max_threads(config::sync_load_for_tablets_thread) |
249 | 1 | .set_min_threads(config::sync_load_for_tablets_thread) |
250 | 1 | .build(&_sync_load_for_tablets_thread_pool), |
251 | 1 | "fail to build SyncLoadForTabletsThreadPool"); |
252 | | |
253 | 1 | RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("WarmupCacheAsyncThreadPool") |
254 | 1 | .set_max_threads(config::warmup_cache_async_thread) |
255 | 1 | .set_min_threads(config::warmup_cache_async_thread) |
256 | 1 | .build(&_warmup_cache_async_thread_pool), |
257 | 1 | "fail to build WarmupCacheAsyncThreadPool"); |
258 | | |
259 | 1 | return Status::OK(); |
260 | 1 | } |
261 | | |
262 | 153 | void CloudStorageEngine::stop() { |
263 | 153 | if (_stopped) { |
264 | 0 | return; |
265 | 0 | } |
266 | | |
267 | 153 | _stopped = true; |
268 | 153 | _stop_background_threads_latch.count_down(); |
269 | | |
270 | 153 | for (auto&& t : _bg_threads) { |
271 | 0 | if (t) { |
272 | 0 | t->join(); |
273 | 0 | } |
274 | 0 | } |
275 | | |
276 | 153 | if (_base_compaction_thread_pool) { |
277 | 0 | _base_compaction_thread_pool->shutdown(); |
278 | 0 | } |
279 | 153 | if (_cumu_compaction_thread_pool) { |
280 | 0 | _cumu_compaction_thread_pool->shutdown(); |
281 | 0 | } |
282 | 153 | _adaptive_thread_controller.stop(); |
283 | 153 | LOG(INFO) << "Cloud storage engine is stopped."; |
284 | | |
285 | 153 | if (_calc_tablet_delete_bitmap_task_thread_pool) { |
286 | 0 | _calc_tablet_delete_bitmap_task_thread_pool->shutdown(); |
287 | 0 | } |
288 | 153 | if (_sync_delete_bitmap_thread_pool) { |
289 | 0 | _sync_delete_bitmap_thread_pool->shutdown(); |
290 | 0 | } |
291 | 153 | } |
292 | | |
293 | 61.1k | bool CloudStorageEngine::stopped() { |
294 | 61.1k | return _stopped; |
295 | 61.1k | } |
296 | | |
297 | | Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id, |
298 | | SyncRowsetStats* sync_stats, |
299 | | bool force_use_only_cached, |
300 | 1.27M | bool cache_on_miss) { |
301 | 1.27M | return _tablet_mgr |
302 | 1.27M | ->get_tablet(tablet_id, false, true, sync_stats, force_use_only_cached, cache_on_miss) |
303 | 1.27M | .transform([](auto&& t) { return static_pointer_cast<BaseTablet>(std::move(t)); }); |
304 | 1.27M | } |
305 | | |
306 | | Status CloudStorageEngine::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta, |
307 | 158k | bool force_use_only_cached) { |
308 | 158k | if (tablet_meta == nullptr) { |
309 | 0 | return Status::InvalidArgument("tablet_meta output is null"); |
310 | 0 | } |
311 | | |
312 | | #if 0 |
313 | | if (_tablet_mgr && _tablet_mgr->peek_tablet_meta(tablet_id, tablet_meta)) { |
314 | | return Status::OK(); |
315 | | } |
316 | | |
317 | | if (force_use_only_cached) { |
318 | | return Status::NotFound("tablet meta {} not found in cache", tablet_id); |
319 | | } |
320 | | #endif |
321 | | |
322 | 158k | if (_meta_mgr == nullptr) { |
323 | 0 | return Status::InternalError("cloud meta manager is not initialized"); |
324 | 0 | } |
325 | | |
326 | 158k | return _meta_mgr->get_tablet_meta(tablet_id, tablet_meta); |
327 | 158k | } |
328 | | |
329 | 1 | Status CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) { |
330 | 1 | RETURN_IF_ERROR(Thread::create( |
331 | 1 | "CloudStorageEngine", "refresh_s3_info_thread", |
332 | 1 | [this]() { this->_refresh_storage_vault_info_thread_callback(); }, |
333 | 1 | &_bg_threads.emplace_back())); |
334 | 1 | LOG(INFO) << "refresh s3 info thread started"; |
335 | | |
336 | 1 | RETURN_IF_ERROR(Thread::create( |
337 | 1 | "CloudStorageEngine", "vacuum_stale_rowsets_thread", |
338 | 1 | [this]() { this->_vacuum_stale_rowsets_thread_callback(); }, |
339 | 1 | &_bg_threads.emplace_back())); |
340 | 1 | LOG(INFO) << "vacuum stale rowsets thread started"; |
341 | | |
342 | 1 | RETURN_IF_ERROR(Thread::create( |
343 | 1 | "CloudStorageEngine", "sync_tablets_thread", |
344 | 1 | [this]() { this->_sync_tablets_thread_callback(); }, &_bg_threads.emplace_back())); |
345 | 1 | LOG(INFO) << "sync tablets thread started"; |
346 | | |
347 | 1 | RETURN_IF_ERROR(Thread::create( |
348 | 1 | "CloudStorageEngine", "evict_querying_rowset_thread", |
349 | 1 | [this]() { this->_evict_quring_rowset_thread_callback(); }, |
350 | 1 | &_evict_quering_rowset_thread)); |
351 | 1 | LOG(INFO) << "evict quering thread started"; |
352 | | |
353 | | // add calculate tablet delete bitmap task thread pool |
354 | 1 | RETURN_IF_ERROR(ThreadPoolBuilder("TabletCalDeleteBitmapThreadPool") |
355 | 1 | .set_min_threads(config::calc_tablet_delete_bitmap_task_max_thread) |
356 | 1 | .set_max_threads(config::calc_tablet_delete_bitmap_task_max_thread) |
357 | 1 | .build(&_calc_tablet_delete_bitmap_task_thread_pool)); |
358 | 1 | RETURN_IF_ERROR(ThreadPoolBuilder("SyncDeleteBitmapThreadPool") |
359 | 1 | .set_min_threads(config::sync_delete_bitmap_task_max_thread) |
360 | 1 | .set_max_threads(config::sync_delete_bitmap_task_max_thread) |
361 | 1 | .build(&_sync_delete_bitmap_thread_pool)); |
362 | | |
363 | | // TODO(plat1ko): check_bucket_enable_versioning_thread |
364 | | |
365 | | // compaction tasks producer thread |
366 | 1 | int base_thread_num = get_base_thread_num(); |
367 | 1 | int cumu_thread_num = get_cumu_thread_num(); |
368 | | |
369 | 1 | RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool") |
370 | 1 | .set_min_threads(base_thread_num) |
371 | 1 | .set_max_threads(base_thread_num) |
372 | 1 | .build(&_base_compaction_thread_pool)); |
373 | 1 | RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool") |
374 | 1 | .set_min_threads(cumu_thread_num) |
375 | 1 | .set_max_threads(cumu_thread_num) |
376 | 1 | .build(&_cumu_compaction_thread_pool)); |
377 | 1 | RETURN_IF_ERROR(Thread::create( |
378 | 1 | "StorageEngine", "compaction_tasks_producer_thread", |
379 | 1 | [this]() { this->_compaction_tasks_producer_callback(); }, |
380 | 1 | &_bg_threads.emplace_back())); |
381 | 1 | LOG(INFO) << "compaction tasks producer thread started," |
382 | 1 | << " base thread num " << base_thread_num << " cumu thread num " << cumu_thread_num; |
383 | | |
384 | 1 | RETURN_IF_ERROR(Thread::create( |
385 | 1 | "StorageEngine", "lease_compaction_thread", |
386 | 1 | [this]() { this->_lease_compaction_thread_callback(); }, &_bg_threads.emplace_back())); |
387 | | |
388 | 1 | LOG(INFO) << "lease compaction thread started"; |
389 | | |
390 | 1 | RETURN_IF_ERROR(Thread::create( |
391 | 1 | "StorageEngine", "check_tablet_delete_bitmap_score_thread", |
392 | 1 | [this]() { this->_check_tablet_delete_bitmap_score_callback(); }, |
393 | 1 | &_bg_threads.emplace_back())); |
394 | 1 | LOG(INFO) << "check tablet delete bitmap score thread started"; |
395 | | |
396 | 1 | _start_adaptive_thread_controller(); |
397 | | |
398 | 1 | return Status::OK(); |
399 | 1 | } |
400 | | |
401 | 111 | void CloudStorageEngine::sync_storage_vault() { |
402 | 111 | cloud::StorageVaultInfos vault_infos; |
403 | 111 | bool enable_storage_vault = false; |
404 | | |
405 | 111 | auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault); |
406 | 111 | if (!st.ok()) { |
407 | 0 | LOG(WARNING) << "failed to get storage vault info. err=" << st; |
408 | 0 | return; |
409 | 0 | } |
410 | | |
411 | 111 | if (vault_infos.empty()) { |
412 | 0 | LOG(WARNING) << "empty storage vault info"; |
413 | 0 | return; |
414 | 0 | } |
415 | | |
416 | 111 | bool check_storage_vault = false; |
417 | 111 | bool expected = false; |
418 | 111 | if (first_sync_storage_vault.compare_exchange_strong(expected, true)) { |
419 | 0 | check_storage_vault = config::enable_check_storage_vault; |
420 | 0 | LOG(INFO) << "first sync storage vault info, BE try to check iam role connectivity, " |
421 | 0 | "check_storage_vault=" |
422 | 0 | << check_storage_vault; |
423 | 0 | } |
424 | | |
425 | 111 | for (auto& [id, vault_info, path_format] : vault_infos) { |
426 | 111 | auto fs = get_filesystem(id); |
427 | 111 | auto status = |
428 | 111 | (fs == nullptr) |
429 | 111 | ? std::visit(VaultCreateFSVisitor {id, path_format, check_storage_vault}, |
430 | 1 | vault_info) |
431 | 111 | : std::visit(RefreshFSVaultVisitor {id, std::move(fs), path_format}, |
432 | 110 | vault_info); |
433 | 111 | if (!status.ok()) [[unlikely]] { |
434 | 0 | LOG(WARNING) << vault_process_error(id, vault_info, std::move(st)); |
435 | 0 | } |
436 | 111 | } |
437 | | |
438 | 111 | if (auto& id = std::get<0>(vault_infos.back()); |
439 | 111 | (latest_fs() == nullptr || latest_fs()->id() != id) && !enable_storage_vault) { |
440 | 1 | set_latest_fs(get_filesystem(id)); |
441 | 1 | } |
442 | 111 | } |
443 | | |
444 | | // We should enable_java_support if we want to use hdfs vault |
445 | 1 | void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() { |
446 | 61 | while (!_stop_background_threads_latch.wait_for( |
447 | 61 | std::chrono::seconds(config::refresh_s3_info_interval_s))) { |
448 | 60 | sync_storage_vault(); |
449 | 60 | } |
450 | 1 | } |
451 | | |
452 | 1 | void CloudStorageEngine::_vacuum_stale_rowsets_thread_callback() { |
453 | 13 | while (!_stop_background_threads_latch.wait_for( |
454 | 13 | std::chrono::seconds(config::vacuum_stale_rowsets_interval_s))) { |
455 | 12 | _tablet_mgr->vacuum_stale_rowsets(_stop_background_threads_latch); |
456 | 12 | } |
457 | 1 | } |
458 | | |
459 | 1 | void CloudStorageEngine::_sync_tablets_thread_callback() { |
460 | 6 | while (!_stop_background_threads_latch.wait_for( |
461 | 6 | std::chrono::seconds(config::schedule_sync_tablets_interval_s))) { |
462 | 5 | _tablet_mgr->sync_tablets(_stop_background_threads_latch); |
463 | 5 | } |
464 | 1 | } |
465 | | |
466 | | void CloudStorageEngine::get_cumu_compaction( |
467 | 105k | int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) { |
468 | 105k | std::lock_guard lock(_compaction_mtx); |
469 | 105k | if (auto it = _submitted_cumu_compactions.find(tablet_id); |
470 | 105k | it != _submitted_cumu_compactions.end()) { |
471 | 0 | res = it->second; |
472 | 0 | } |
473 | 105k | } |
474 | | |
475 | 18.7k | Status CloudStorageEngine::_adjust_compaction_thread_num() { |
476 | 18.7k | int base_thread_num = get_base_thread_num(); |
477 | | |
478 | 18.7k | if (!_base_compaction_thread_pool || !_cumu_compaction_thread_pool) { |
479 | 0 | LOG(WARNING) << "base or cumu compaction thread pool is not created"; |
480 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>(""); |
481 | 0 | } |
482 | | |
483 | 18.7k | if (_base_compaction_thread_pool->max_threads() != base_thread_num) { |
484 | 0 | int old_max_threads = _base_compaction_thread_pool->max_threads(); |
485 | 0 | Status status = _base_compaction_thread_pool->set_max_threads(base_thread_num); |
486 | 0 | if (status.ok()) { |
487 | 0 | VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads |
488 | 0 | << " to " << base_thread_num; |
489 | 0 | } |
490 | 0 | } |
491 | 18.7k | if (_base_compaction_thread_pool->min_threads() != base_thread_num) { |
492 | 0 | int old_min_threads = _base_compaction_thread_pool->min_threads(); |
493 | 0 | Status status = _base_compaction_thread_pool->set_min_threads(base_thread_num); |
494 | 0 | if (status.ok()) { |
495 | 0 | VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads |
496 | 0 | << " to " << base_thread_num; |
497 | 0 | } |
498 | 0 | } |
499 | | |
500 | 18.7k | int cumu_thread_num = get_cumu_thread_num(); |
501 | 18.7k | if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) { |
502 | 0 | int old_max_threads = _cumu_compaction_thread_pool->max_threads(); |
503 | 0 | Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_thread_num); |
504 | 0 | if (status.ok()) { |
505 | 0 | VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads |
506 | 0 | << " to " << cumu_thread_num; |
507 | 0 | } |
508 | 0 | } |
509 | 18.7k | if (_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) { |
510 | 0 | int old_min_threads = _cumu_compaction_thread_pool->min_threads(); |
511 | 0 | Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_thread_num); |
512 | 0 | if (status.ok()) { |
513 | 0 | VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads |
514 | 0 | << " to " << cumu_thread_num; |
515 | 0 | } |
516 | 0 | } |
517 | 18.7k | return Status::OK(); |
518 | 18.7k | } |
519 | | |
520 | 1 | void CloudStorageEngine::_compaction_tasks_producer_callback() { |
521 | 1 | LOG(INFO) << "try to start compaction producer process!"; |
522 | | |
523 | 1 | int round = 0; |
524 | 1 | CompactionType compaction_type; |
525 | | |
526 | | // Used to record the time when the score metric was last updated. |
527 | | // The update of the score metric is accompanied by the logic of selecting the tablet. |
528 | | // If there is no slot available, the logic of selecting the tablet will be terminated, |
529 | | // which causes the score metric update to be terminated. |
530 | | // In order to avoid this situation, we need to update the score regularly. |
531 | 1 | int64_t last_cumulative_score_update_time = 0; |
532 | 1 | int64_t last_base_score_update_time = 0; |
533 | 1 | static const int64_t check_score_interval_ms = 5000; // 5 secs |
534 | | |
535 | 1 | int64_t interval = config::generate_compaction_tasks_interval_ms; |
536 | 18.7k | do { |
537 | 18.7k | int64_t cur_time = UnixMillis(); |
538 | 18.7k | if (!config::disable_auto_compaction) { |
539 | 18.7k | Status st = _adjust_compaction_thread_num(); |
540 | 18.7k | if (!st.ok()) { |
541 | 0 | break; |
542 | 0 | } |
543 | | |
544 | 18.7k | bool check_score = false; |
545 | 18.7k | if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { |
546 | 16.9k | compaction_type = CompactionType::CUMULATIVE_COMPACTION; |
547 | 16.9k | round++; |
548 | 16.9k | if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) { |
549 | 704 | check_score = true; |
550 | 704 | last_cumulative_score_update_time = cur_time; |
551 | 704 | } |
552 | 16.9k | } else { |
553 | 1.87k | compaction_type = CompactionType::BASE_COMPACTION; |
554 | 1.87k | round = 0; |
555 | 1.87k | if (cur_time - last_base_score_update_time >= check_score_interval_ms) { |
556 | 591 | check_score = true; |
557 | 591 | last_base_score_update_time = cur_time; |
558 | 591 | } |
559 | 1.87k | } |
560 | 18.7k | std::unique_ptr<ThreadPool>& thread_pool = |
561 | 18.7k | (compaction_type == CompactionType::CUMULATIVE_COMPACTION) |
562 | 18.7k | ? _cumu_compaction_thread_pool |
563 | 18.7k | : _base_compaction_thread_pool; |
564 | 18.7k | VLOG_CRITICAL << "compaction thread pool. type: " |
565 | 0 | << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU" |
566 | 0 | : "BASE") |
567 | 0 | << ", num_threads: " << thread_pool->num_threads() |
568 | 0 | << ", num_threads_pending_start: " |
569 | 0 | << thread_pool->num_threads_pending_start() |
570 | 0 | << ", num_active_threads: " << thread_pool->num_active_threads() |
571 | 0 | << ", max_threads: " << thread_pool->max_threads() |
572 | 0 | << ", min_threads: " << thread_pool->min_threads() |
573 | 0 | << ", num_total_queued_tasks: " << thread_pool->get_queue_size(); |
574 | 18.7k | std::vector<CloudTabletSPtr> tablets_compaction = |
575 | 18.7k | _generate_cloud_compaction_tasks(compaction_type, check_score); |
576 | | |
577 | | /// Regardless of whether the tablet is submitted for compaction or not, |
578 | | /// we need to call 'reset_compaction' to clean up the base_compaction or cumulative_compaction objects |
579 | | /// in the tablet, because these two objects store the tablet's own shared_ptr. |
580 | | /// If it is not cleaned up, the reference count of the tablet will always be greater than 1, |
581 | | /// thus cannot be collected by the garbage collector. (TabletManager::start_trash_sweep) |
582 | 108k | for (const auto& tablet : tablets_compaction) { |
583 | 108k | Status status = submit_compaction_task(tablet, compaction_type); |
584 | 108k | if (status.ok()) continue; |
585 | 101k | if ((!status.is<ErrorCode::BE_NO_SUITABLE_VERSION>() && |
586 | 101k | !status.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) || |
587 | 101k | VLOG_DEBUG_IS_ON) { |
588 | 24 | LOG(WARNING) << "failed to submit compaction task for tablet: " |
589 | 24 | << tablet->tablet_id() << ", err: " << status; |
590 | 24 | } |
591 | 101k | } |
592 | 18.7k | interval = config::generate_compaction_tasks_interval_ms; |
593 | 18.7k | } else { |
594 | 1 | interval = config::check_auto_compaction_interval_seconds * 1000; |
595 | 1 | } |
596 | 18.7k | int64_t end_time = UnixMillis(); |
597 | 18.7k | DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time - |
598 | 18.7k | cur_time); |
599 | 18.7k | } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval))); |
600 | 1 | } |
601 | | |
602 | | void CloudStorageEngine::unregister_index_change_compaction(int64_t tablet_id, |
603 | 885 | bool is_base_compact) { |
604 | 885 | std::lock_guard lock(_compaction_mtx); |
605 | 885 | if (is_base_compact) { |
606 | 0 | _submitted_index_change_base_compaction.erase(tablet_id); |
607 | 885 | } else { |
608 | 885 | _submitted_index_change_cumu_compaction.erase(tablet_id); |
609 | 885 | } |
610 | 885 | } |
611 | | |
612 | | bool CloudStorageEngine::register_index_change_compaction( |
613 | | std::shared_ptr<CloudIndexChangeCompaction> compact, int64_t tablet_id, |
614 | 888 | bool is_base_compact, std::string& err_reason) { |
615 | 888 | std::lock_guard lock(_compaction_mtx); |
616 | 888 | if (is_base_compact) { |
617 | 2 | if (_submitted_base_compactions.contains(tablet_id) || |
618 | 2 | _submitted_full_compactions.contains(tablet_id) || |
619 | 2 | _submitted_index_change_base_compaction.contains(tablet_id)) { |
620 | 1 | std::stringstream ss; |
621 | 1 | ss << "reason:" << ((int)_submitted_base_compactions.contains(tablet_id)) << ", " |
622 | 1 | << ((int)_submitted_full_compactions.contains(tablet_id)) << ", " |
623 | 1 | << ((int)_submitted_index_change_base_compaction.contains(tablet_id)); |
624 | 1 | err_reason = ss.str(); |
625 | 1 | return false; |
626 | 1 | } else { |
627 | 1 | _submitted_index_change_base_compaction[tablet_id] = compact; |
628 | 1 | return true; |
629 | 1 | } |
630 | 886 | } else { |
631 | 886 | if (_tablet_preparing_cumu_compaction.contains(tablet_id) || |
632 | 889 | _submitted_cumu_compactions.contains(tablet_id) || |
633 | 888 | _submitted_index_change_cumu_compaction.contains(tablet_id)) { |
634 | 1 | std::stringstream ss; |
635 | 1 | ss << "reason:" << ((int)_tablet_preparing_cumu_compaction.contains(tablet_id)) << ", " |
636 | 1 | << ((int)_submitted_cumu_compactions.contains(tablet_id)) << ", " |
637 | 1 | << ((int)_submitted_index_change_cumu_compaction.contains(tablet_id)); |
638 | 1 | err_reason = ss.str(); |
639 | 1 | return false; |
640 | 885 | } else { |
641 | 885 | _submitted_index_change_cumu_compaction[tablet_id] = compact; |
642 | 885 | } |
643 | 885 | return true; |
644 | 886 | } |
645 | 888 | } |
646 | | |
647 | | std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks( |
648 | 18.7k | CompactionType compaction_type, bool check_score) { |
649 | 18.7k | std::vector<std::shared_ptr<CloudTablet>> tablets_compaction; |
650 | | |
651 | 18.7k | int64_t max_compaction_score = 0; |
652 | 18.7k | std::unordered_set<int64_t> tablet_preparing_cumu_compaction; |
653 | 18.7k | std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>> |
654 | 18.7k | submitted_cumu_compactions; |
655 | 18.7k | std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> submitted_base_compactions; |
656 | 18.7k | std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> submitted_full_compactions; |
657 | 18.7k | std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>> |
658 | 18.7k | submitted_index_change_cumu_compactions; |
659 | 18.7k | std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>> |
660 | 18.7k | submitted_index_change_base_compactions; |
661 | 18.7k | { |
662 | 18.7k | std::lock_guard lock(_compaction_mtx); |
663 | 18.7k | tablet_preparing_cumu_compaction = _tablet_preparing_cumu_compaction; |
664 | 18.7k | submitted_cumu_compactions = _submitted_cumu_compactions; |
665 | 18.7k | submitted_base_compactions = _submitted_base_compactions; |
666 | 18.7k | submitted_full_compactions = _submitted_full_compactions; |
667 | 18.7k | submitted_index_change_cumu_compactions = _submitted_index_change_cumu_compaction; |
668 | 18.7k | submitted_index_change_base_compactions = _submitted_index_change_base_compaction; |
669 | 18.7k | } |
670 | | |
671 | 18.7k | bool need_pick_tablet = true; |
672 | 18.7k | int thread_per_disk = |
673 | 18.7k | config::compaction_task_num_per_fast_disk; // all disks are fast in cloud mode |
674 | 18.7k | int num_cumu = |
675 | 18.7k | std::accumulate(submitted_cumu_compactions.begin(), submitted_cumu_compactions.end(), 0, |
676 | 18.7k | [](int a, auto& b) { return a + b.second.size(); }); |
677 | 18.7k | int num_base = |
678 | 18.7k | cast_set<int>(submitted_base_compactions.size() + submitted_full_compactions.size()); |
679 | 18.7k | int n = thread_per_disk - num_cumu - num_base; |
680 | 18.7k | if (compaction_type == CompactionType::BASE_COMPACTION) { |
681 | | // We need to reserve at least one thread for cumulative compaction, |
682 | | // because base compactions may take too long to complete, which may |
683 | | // leads to "too many rowsets" error. |
684 | 1.87k | int base_n = std::min(config::max_base_compaction_task_num_per_disk, thread_per_disk - 1) - |
685 | 1.87k | num_base; |
686 | 1.87k | n = std::min(base_n, n); |
687 | 1.87k | } |
688 | 18.7k | if (n <= 0) { // No threads available |
689 | 361 | if (!check_score) return tablets_compaction; |
690 | 17 | need_pick_tablet = false; |
691 | 17 | n = 0; |
692 | 17 | } |
693 | | |
694 | | // Return true for skipping compaction |
695 | 18.4k | std::function<bool(CloudTablet*)> filter_out; |
696 | 18.4k | if (compaction_type == CompactionType::BASE_COMPACTION) { |
697 | 1.84k | filter_out = [&submitted_base_compactions, &submitted_full_compactions, |
698 | 58.1M | &submitted_index_change_base_compactions](CloudTablet* t) { |
699 | 58.1M | return submitted_base_compactions.contains(t->tablet_id()) || |
700 | 58.1M | submitted_full_compactions.contains(t->tablet_id()) || |
701 | 58.1M | submitted_index_change_base_compactions.contains(t->tablet_id()) || |
702 | 58.1M | t->tablet_state() != TABLET_RUNNING; |
703 | 58.1M | }; |
704 | 16.6k | } else if (config::enable_parallel_cumu_compaction) { |
705 | 0 | filter_out = [&tablet_preparing_cumu_compaction, |
706 | 0 | &submitted_index_change_cumu_compactions](CloudTablet* t) { |
707 | 0 | return tablet_preparing_cumu_compaction.contains(t->tablet_id()) || |
708 | 0 | submitted_index_change_cumu_compactions.contains(t->tablet_id()) || |
709 | 0 | (t->tablet_state() != TABLET_RUNNING && |
710 | 0 | (!config::enable_new_tablet_do_compaction || t->alter_version() == -1)); |
711 | 0 | }; |
712 | 16.6k | } else { |
713 | 16.6k | filter_out = [&tablet_preparing_cumu_compaction, &submitted_cumu_compactions, |
714 | 217M | &submitted_index_change_cumu_compactions](CloudTablet* t) { |
715 | 217M | return tablet_preparing_cumu_compaction.contains(t->tablet_id()) || |
716 | 217M | submitted_index_change_cumu_compactions.contains(t->tablet_id()) || |
717 | 217M | submitted_cumu_compactions.contains(t->tablet_id()) || |
718 | 217M | (t->tablet_state() != TABLET_RUNNING && |
719 | 217M | (!config::enable_new_tablet_do_compaction || t->alter_version() == -1)); |
720 | 217M | }; |
721 | 16.6k | } |
722 | | |
723 | | // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(), |
724 | | // So that we can update the max_compaction_score metric. |
725 | 18.4k | do { |
726 | 18.4k | std::vector<CloudTabletSPtr> tablets; |
727 | 18.4k | auto st = tablet_mgr().get_topn_tablets_to_compact(n, compaction_type, filter_out, &tablets, |
728 | 18.4k | &max_compaction_score); |
729 | 18.4k | if (!st.ok()) { |
730 | 0 | LOG(WARNING) << "failed to get tablets to compact, err=" << st; |
731 | 0 | break; |
732 | 0 | } |
733 | 18.4k | if (!need_pick_tablet) break; |
734 | 18.4k | tablets_compaction = std::move(tablets); |
735 | 18.4k | } while (false); |
736 | | |
737 | 18.4k | if (max_compaction_score > 0) { |
738 | 17.9k | if (compaction_type == CompactionType::BASE_COMPACTION) { |
739 | 1.80k | DorisMetrics::instance()->tablet_base_max_compaction_score->set_value( |
740 | 1.80k | max_compaction_score); |
741 | 16.1k | } else { |
742 | 16.1k | DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value( |
743 | 16.1k | max_compaction_score); |
744 | 16.1k | } |
745 | 17.9k | } |
746 | | |
747 | 18.4k | return tablets_compaction; |
748 | 18.7k | } |
749 | | |
750 | | Status CloudStorageEngine::_request_tablet_global_compaction_lock( |
751 | | ReaderType compaction_type, const CloudTabletSPtr& tablet, |
752 | 6.59k | std::shared_ptr<CloudCompactionMixin> compaction) { |
753 | 6.59k | long now = duration_cast<std::chrono::milliseconds>( |
754 | 6.59k | std::chrono::system_clock::now().time_since_epoch()) |
755 | 6.59k | .count(); |
756 | 6.59k | if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) { |
757 | 6.42k | auto cumu_compaction = static_pointer_cast<CloudCumulativeCompaction>(compaction); |
758 | 6.42k | if (auto st = cumu_compaction->request_global_lock(); !st.ok()) { |
759 | 307 | LOG_WARNING("failed to request cumu compactoin global lock") |
760 | 307 | .tag("tablet id", tablet->tablet_id()) |
761 | 307 | .tag("msg", st.to_string()); |
762 | 307 | tablet->set_last_cumu_compaction_failure_time(now); |
763 | 307 | return st; |
764 | 307 | } |
765 | 6.12k | { |
766 | 6.12k | std::lock_guard lock(_compaction_mtx); |
767 | 6.12k | _executing_cumu_compactions[tablet->tablet_id()].push_back(cumu_compaction); |
768 | 6.12k | } |
769 | 6.12k | return Status::OK(); |
770 | 6.42k | } else if (compaction_type == ReaderType::READER_BASE_COMPACTION) { |
771 | 64 | auto base_compaction = static_pointer_cast<CloudBaseCompaction>(compaction); |
772 | 64 | if (auto st = base_compaction->request_global_lock(); !st.ok()) { |
773 | 7 | LOG_WARNING("failed to request base compactoin global lock") |
774 | 7 | .tag("tablet id", tablet->tablet_id()) |
775 | 7 | .tag("msg", st.to_string()); |
776 | 7 | tablet->set_last_base_compaction_failure_time(now); |
777 | 7 | return st; |
778 | 7 | } |
779 | 57 | { |
780 | 57 | std::lock_guard lock(_compaction_mtx); |
781 | 57 | _executing_base_compactions[tablet->tablet_id()] = base_compaction; |
782 | 57 | } |
783 | 57 | return Status::OK(); |
784 | 104 | } else if (compaction_type == ReaderType::READER_FULL_COMPACTION) { |
785 | 104 | auto full_compaction = static_pointer_cast<CloudFullCompaction>(compaction); |
786 | 104 | if (auto st = full_compaction->request_global_lock(); !st.ok()) { |
787 | 0 | LOG_WARNING("failed to request full compactoin global lock") |
788 | 0 | .tag("tablet id", tablet->tablet_id()) |
789 | 0 | .tag("msg", st.to_string()); |
790 | 0 | tablet->set_last_full_compaction_failure_time(now); |
791 | 0 | return st; |
792 | 0 | } |
793 | 104 | { |
794 | 104 | std::lock_guard lock(_compaction_mtx); |
795 | 104 | _executing_full_compactions[tablet->tablet_id()] = full_compaction; |
796 | 104 | } |
797 | 104 | return Status::OK(); |
798 | 104 | } else { |
799 | 0 | LOG(WARNING) << "unsupport compaction task for tablet: " << tablet->tablet_id() |
800 | 0 | << ", compaction name: " << compaction->compaction_name(); |
801 | 0 | return Status::NotFound("Unsupport compaction type {}", compaction->compaction_name()); |
802 | 0 | } |
803 | 6.59k | } |
804 | | |
805 | | Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet, |
806 | 3.29k | int trigger_method) { |
807 | 3.29k | using namespace std::chrono; |
808 | 3.29k | { |
809 | 3.29k | std::lock_guard lock(_compaction_mtx); |
810 | | // Take a placeholder for base compaction |
811 | 3.29k | auto [_, success] = _submitted_base_compactions.emplace(tablet->tablet_id(), nullptr); |
812 | 3.29k | if (!success) { |
813 | 0 | return Status::AlreadyExist( |
814 | 0 | "other base compaction or full compaction is submitted, tablet_id={}", |
815 | 0 | tablet->tablet_id()); |
816 | 0 | } |
817 | 3.29k | } |
818 | 3.29k | auto compaction = std::make_shared<CloudBaseCompaction>(*this, tablet); |
819 | 3.29k | auto st = compaction->prepare_compact(); |
820 | 3.29k | if (!st.ok()) { |
821 | 3.23k | long now = duration_cast<std::chrono::milliseconds>( |
822 | 3.23k | std::chrono::system_clock::now().time_since_epoch()) |
823 | 3.23k | .count(); |
824 | 3.23k | tablet->set_last_base_compaction_failure_time(now); |
825 | 3.23k | std::lock_guard lock(_compaction_mtx); |
826 | 3.23k | _submitted_base_compactions.erase(tablet->tablet_id()); |
827 | 3.23k | return st; |
828 | 3.23k | } |
829 | | // Register task with CompactionTaskTracker as PENDING |
830 | 64 | auto* tracker = CompactionTaskTracker::instance(); |
831 | 64 | int64_t compaction_id = compaction->compaction_id(); |
832 | 64 | { |
833 | 64 | CompactionTaskInfo info; |
834 | 64 | info.compaction_id = compaction_id; |
835 | 64 | info.tablet_id = tablet->tablet_id(); |
836 | 64 | info.table_id = tablet->table_id(); |
837 | 64 | info.partition_id = tablet->partition_id(); |
838 | 64 | info.compaction_type = CompactionProfileType::BASE; |
839 | 64 | info.status = CompactionTaskStatus::PENDING; |
840 | 64 | info.trigger_method = static_cast<TriggerMethod>(trigger_method); |
841 | 64 | info.scheduled_time_ms = |
842 | 64 | duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
843 | 64 | info.backend_id = BackendOptions::get_backend_id(); |
844 | 64 | info.compaction_score = tablet->get_real_compaction_score(); |
845 | 64 | info.input_rowsets_count = compaction->input_rowsets_count(); |
846 | 64 | info.input_row_num = compaction->input_row_num_value(); |
847 | 64 | info.input_data_size = compaction->input_rowsets_data_size(); |
848 | 64 | info.input_index_size = compaction->input_rowsets_index_size(); |
849 | 64 | info.input_total_size = compaction->input_rowsets_total_size(); |
850 | 64 | info.input_segments_num = compaction->input_segments_num_value(); |
851 | 64 | info.input_version_range = compaction->input_version_range_str(); |
852 | 64 | info.is_vertical = compaction->is_vertical(); |
853 | 64 | tracker->register_task(std::move(info)); |
854 | 64 | } |
855 | 64 | { |
856 | 64 | std::lock_guard lock(_compaction_mtx); |
857 | 64 | _submitted_base_compactions[tablet->tablet_id()] = compaction; |
858 | 64 | } |
859 | 64 | st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { |
860 | 64 | DorisMetrics::instance()->base_compaction_task_running_total->increment(1); |
861 | 64 | DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
862 | 64 | _base_compaction_thread_pool->get_queue_size()); |
863 | 64 | g_base_compaction_running_task_count << 1; |
864 | 64 | signal::tablet_id = tablet->tablet_id(); |
865 | 64 | Defer defer {[&]() { |
866 | | // Idempotent cleanup: remove task from tracker |
867 | 64 | CompactionTaskTracker::instance()->remove_task(compaction_id); |
868 | 64 | g_base_compaction_running_task_count << -1; |
869 | 64 | std::lock_guard lock(_compaction_mtx); |
870 | 64 | _submitted_base_compactions.erase(tablet->tablet_id()); |
871 | 64 | DorisMetrics::instance()->base_compaction_task_running_total->increment(-1); |
872 | 64 | DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
873 | 64 | _base_compaction_thread_pool->get_queue_size()); |
874 | 64 | }}; |
875 | 64 | auto st = _request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION, tablet, |
876 | 64 | compaction); |
877 | 64 | if (!st.ok()) return; |
878 | | // Update tracker to RUNNING after acquiring global lock |
879 | 57 | { |
880 | 57 | RunningStats rs; |
881 | 57 | rs.start_time_ms = |
882 | 57 | duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
883 | 57 | CompactionTaskTracker::instance()->update_to_running(compaction_id, rs); |
884 | 57 | } |
885 | 57 | st = compaction->execute_compact(); |
886 | 57 | if (!st.ok()) { |
887 | | // Error log has been output in `execute_compact` |
888 | 0 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
889 | 0 | tablet->set_last_base_compaction_failure_time(now); |
890 | 0 | } |
891 | 57 | std::lock_guard lock(_compaction_mtx); |
892 | 57 | _executing_base_compactions.erase(tablet->tablet_id()); |
893 | 57 | }); |
894 | 64 | DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
895 | 64 | _base_compaction_thread_pool->get_queue_size()); |
896 | 64 | if (!st.ok()) { |
897 | 0 | tracker->remove_task(compaction_id); |
898 | 0 | std::lock_guard lock(_compaction_mtx); |
899 | 0 | _submitted_base_compactions.erase(tablet->tablet_id()); |
900 | 0 | return Status::InternalError("failed to submit base compaction, tablet_id={}", |
901 | 0 | tablet->tablet_id()); |
902 | 0 | } |
903 | 64 | return st; |
904 | 64 | } |
905 | | |
906 | | Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet, |
907 | 105k | int trigger_method) { |
908 | 105k | using namespace std::chrono; |
909 | 105k | { |
910 | 105k | std::lock_guard lock(_compaction_mtx); |
911 | 105k | if (!config::enable_parallel_cumu_compaction && |
912 | 105k | _submitted_cumu_compactions.count(tablet->tablet_id())) { |
913 | 5 | return Status::AlreadyExist("other cumu compaction is submitted, tablet_id={}", |
914 | 5 | tablet->tablet_id()); |
915 | 5 | } |
916 | 105k | auto [_, success] = _tablet_preparing_cumu_compaction.insert(tablet->tablet_id()); |
917 | 105k | if (!success) { |
918 | 0 | return Status::AlreadyExist("other cumu compaction is preparing, tablet_id={}", |
919 | 0 | tablet->tablet_id()); |
920 | 0 | } |
921 | 105k | } |
922 | 105k | auto compaction = std::make_shared<CloudCumulativeCompaction>(*this, tablet); |
923 | 105k | auto st = compaction->prepare_compact(); |
924 | 105k | if (!st.ok()) { |
925 | 98.7k | long now = duration_cast<std::chrono::milliseconds>( |
926 | 98.7k | std::chrono::system_clock::now().time_since_epoch()) |
927 | 98.7k | .count(); |
928 | 98.7k | if (!st.is<ErrorCode::CUMULATIVE_MEET_DELETE_VERSION>()) { |
929 | 98.7k | if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) { |
930 | | // Backoff strategy if no suitable version |
931 | 98.7k | tablet->last_cumu_no_suitable_version_ms = now; |
932 | 98.7k | } else { |
933 | 0 | tablet->set_last_cumu_compaction_failure_time(now); |
934 | 0 | } |
935 | 98.7k | } |
936 | 98.7k | std::lock_guard lock(_compaction_mtx); |
937 | 98.7k | _tablet_preparing_cumu_compaction.erase(tablet->tablet_id()); |
938 | 98.7k | return st; |
939 | 98.7k | } |
940 | | // Register task with CompactionTaskTracker as PENDING |
941 | | // IMPORTANT: use compaction->compaction_id(), NOT tracker->next_compaction_id(), |
942 | | // because the Compaction constructor already allocated an ID via the tracker. |
943 | 6.42k | auto* tracker = CompactionTaskTracker::instance(); |
944 | 6.42k | int64_t compaction_id = compaction->compaction_id(); |
945 | 6.42k | { |
946 | 6.42k | CompactionTaskInfo info; |
947 | 6.42k | info.compaction_id = compaction_id; |
948 | 6.42k | info.tablet_id = tablet->tablet_id(); |
949 | 6.42k | info.table_id = tablet->table_id(); |
950 | 6.42k | info.partition_id = tablet->partition_id(); |
951 | 6.42k | info.compaction_type = CompactionProfileType::CUMULATIVE; |
952 | 6.42k | info.status = CompactionTaskStatus::PENDING; |
953 | 6.42k | info.trigger_method = static_cast<TriggerMethod>(trigger_method); |
954 | 6.42k | info.scheduled_time_ms = |
955 | 6.42k | duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
956 | 6.42k | info.backend_id = BackendOptions::get_backend_id(); |
957 | 6.42k | info.compaction_score = tablet->get_real_compaction_score(); |
958 | 6.42k | info.input_rowsets_count = compaction->input_rowsets_count(); |
959 | 6.42k | info.input_row_num = compaction->input_row_num_value(); |
960 | 6.42k | info.input_data_size = compaction->input_rowsets_data_size(); |
961 | 6.42k | info.input_index_size = compaction->input_rowsets_index_size(); |
962 | 6.42k | info.input_total_size = compaction->input_rowsets_total_size(); |
963 | 6.42k | info.input_segments_num = compaction->input_segments_num_value(); |
964 | 6.42k | info.input_version_range = compaction->input_version_range_str(); |
965 | 6.42k | info.is_vertical = compaction->is_vertical(); |
966 | 6.42k | tracker->register_task(std::move(info)); |
967 | 6.42k | } |
968 | 6.42k | { |
969 | 6.42k | std::lock_guard lock(_compaction_mtx); |
970 | 6.42k | _tablet_preparing_cumu_compaction.erase(tablet->tablet_id()); |
971 | 6.42k | _submitted_cumu_compactions[tablet->tablet_id()].push_back(compaction); |
972 | 6.42k | } |
973 | 6.42k | auto erase_submitted_cumu_compaction = [=, this]() { |
974 | 6.42k | std::lock_guard lock(_compaction_mtx); |
975 | 6.42k | auto it = _submitted_cumu_compactions.find(tablet->tablet_id()); |
976 | 6.42k | DCHECK(it != _submitted_cumu_compactions.end()); |
977 | 6.42k | auto& compactions = it->second; |
978 | 6.42k | auto it1 = std::find(compactions.begin(), compactions.end(), compaction); |
979 | 6.42k | DCHECK(it1 != compactions.end()); |
980 | 6.42k | compactions.erase(it1); |
981 | 6.42k | if (compactions.empty()) { // No compactions on this tablet, erase key |
982 | 6.42k | _submitted_cumu_compactions.erase(it); |
983 | | // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to |
984 | | // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform |
985 | | // cumu compaction on tablet which has suitable versions for cumu compaction. |
986 | 6.42k | tablet->last_cumu_no_suitable_version_ms = 0; |
987 | 6.42k | } |
988 | 6.42k | }; |
989 | 6.42k | auto erase_executing_cumu_compaction = [=, this]() { |
990 | 6.11k | std::lock_guard lock(_compaction_mtx); |
991 | 6.11k | auto it = _executing_cumu_compactions.find(tablet->tablet_id()); |
992 | 6.11k | DCHECK(it != _executing_cumu_compactions.end()); |
993 | 6.11k | auto& compactions = it->second; |
994 | 6.11k | auto it1 = std::find(compactions.begin(), compactions.end(), compaction); |
995 | 6.11k | DCHECK(it1 != compactions.end()); |
996 | 6.11k | compactions.erase(it1); |
997 | 6.12k | if (compactions.empty()) { // No compactions on this tablet, erase key |
998 | 6.12k | _executing_cumu_compactions.erase(it); |
999 | | // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to |
1000 | | // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform |
1001 | | // cumu compaction on tablet which has suitable versions for cumu compaction. |
1002 | 6.12k | tablet->last_cumu_no_suitable_version_ms = 0; |
1003 | 6.12k | } |
1004 | 6.11k | }; |
1005 | 6.42k | st = _cumu_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { |
1006 | 6.42k | DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1); |
1007 | 6.42k | DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
1008 | 6.42k | _cumu_compaction_thread_pool->get_queue_size()); |
1009 | 6.42k | DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.wait_in_line", |
1010 | 6.42k | { sleep(5); }) |
1011 | 6.42k | signal::tablet_id = tablet->tablet_id(); |
1012 | 6.42k | g_cumu_compaction_running_task_count << 1; |
1013 | 6.42k | bool is_large_task = true; |
1014 | 6.42k | Defer defer {[&]() { |
1015 | 6.42k | DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.sleep", |
1016 | 6.42k | { sleep(5); }) |
1017 | | // Idempotent cleanup: remove task from tracker |
1018 | 6.42k | CompactionTaskTracker::instance()->remove_task(compaction_id); |
1019 | 6.42k | std::lock_guard lock(_cumu_compaction_delay_mtx); |
1020 | 6.42k | _cumu_compaction_thread_pool_used_threads--; |
1021 | 6.42k | if (!is_large_task) { |
1022 | 6.12k | _cumu_compaction_thread_pool_small_tasks_running--; |
1023 | 6.12k | } |
1024 | 6.42k | g_cumu_compaction_running_task_count << -1; |
1025 | 6.42k | erase_submitted_cumu_compaction(); |
1026 | 6.42k | DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(-1); |
1027 | 6.42k | DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
1028 | 6.42k | _cumu_compaction_thread_pool->get_queue_size()); |
1029 | 6.42k | }}; |
1030 | 6.42k | auto st = _request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION, |
1031 | 6.42k | tablet, compaction); |
1032 | 6.42k | if (!st.ok()) return; |
1033 | | // Update tracker to RUNNING after acquiring global lock |
1034 | 6.12k | { |
1035 | 6.12k | RunningStats rs; |
1036 | 6.12k | rs.start_time_ms = |
1037 | 6.12k | duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
1038 | 6.12k | CompactionTaskTracker::instance()->update_to_running(compaction_id, rs); |
1039 | 6.12k | } |
1040 | 6.12k | do { |
1041 | 6.12k | std::lock_guard lock(_cumu_compaction_delay_mtx); |
1042 | 6.12k | _cumu_compaction_thread_pool_used_threads++; |
1043 | 6.12k | if (config::large_cumu_compaction_task_min_thread_num > 1 && |
1044 | 6.12k | _cumu_compaction_thread_pool->max_threads() >= |
1045 | 6.12k | config::large_cumu_compaction_task_min_thread_num) { |
1046 | | // Determine if this is a small task based on configured thresholds |
1047 | 6.12k | is_large_task = (compaction->get_input_rowsets_bytes() > |
1048 | 6.12k | config::large_cumu_compaction_task_bytes_threshold || |
1049 | 6.12k | compaction->get_input_num_rows() > |
1050 | 6.12k | config::large_cumu_compaction_task_row_num_threshold); |
1051 | | // Small task. No delay needed |
1052 | 6.12k | if (!is_large_task) { |
1053 | 6.12k | _cumu_compaction_thread_pool_small_tasks_running++; |
1054 | 6.12k | break; |
1055 | 6.12k | } |
1056 | | // Deal with large task |
1057 | 1 | if (_should_delay_large_task()) { |
1058 | 0 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()) |
1059 | 0 | .count(); |
1060 | | // sleep 5s for this tablet |
1061 | 0 | tablet->set_last_cumu_compaction_failure_time(now); |
1062 | 0 | erase_executing_cumu_compaction(); |
1063 | 0 | LOG_WARNING( |
1064 | 0 | "failed to do CloudCumulativeCompaction, cumu thread pool is " |
1065 | 0 | "intensive, delay large task.") |
1066 | 0 | .tag("tablet_id", tablet->tablet_id()) |
1067 | 0 | .tag("input_rows", compaction->get_input_num_rows()) |
1068 | 0 | .tag("input_rowsets_total_size", compaction->get_input_rowsets_bytes()) |
1069 | 0 | .tag("config::large_cumu_compaction_task_bytes_threshold", |
1070 | 0 | config::large_cumu_compaction_task_bytes_threshold) |
1071 | 0 | .tag("config::large_cumu_compaction_task_row_num_threshold", |
1072 | 0 | config::large_cumu_compaction_task_row_num_threshold) |
1073 | 0 | .tag("remaining threads", _cumu_compaction_thread_pool_used_threads) |
1074 | 0 | .tag("small_tasks_running", |
1075 | 0 | _cumu_compaction_thread_pool_small_tasks_running); |
1076 | 0 | return; |
1077 | 0 | } |
1078 | 1 | } |
1079 | 6.12k | } while (false); |
1080 | 6.12k | st = compaction->execute_compact(); |
1081 | 6.12k | if (!st.ok()) { |
1082 | | // Error log has been output in `execute_compact` |
1083 | 47 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
1084 | 47 | tablet->set_last_cumu_compaction_failure_time(now); |
1085 | 47 | } |
1086 | 6.12k | erase_executing_cumu_compaction(); |
1087 | 6.12k | }); |
1088 | 6.42k | DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
1089 | 6.42k | _cumu_compaction_thread_pool->get_queue_size()); |
1090 | 6.42k | if (!st.ok()) { |
1091 | 0 | tracker->remove_task(compaction_id); |
1092 | 0 | erase_submitted_cumu_compaction(); |
1093 | 0 | return Status::InternalError("failed to submit cumu compaction, tablet_id={}", |
1094 | 0 | tablet->tablet_id()); |
1095 | 0 | } |
1096 | 6.42k | return st; |
1097 | 6.42k | } |
1098 | | |
1099 | | Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet, |
1100 | 105 | int trigger_method) { |
1101 | 105 | using namespace std::chrono; |
1102 | 105 | { |
1103 | 105 | std::lock_guard lock(_compaction_mtx); |
1104 | | // Take a placeholder for full compaction |
1105 | 105 | auto [_, success] = _submitted_full_compactions.emplace(tablet->tablet_id(), nullptr); |
1106 | 105 | if (!success) { |
1107 | 0 | return Status::AlreadyExist( |
1108 | 0 | "other full compaction or base compaction is submitted, tablet_id={}", |
1109 | 0 | tablet->tablet_id()); |
1110 | 0 | } |
1111 | 105 | } |
1112 | | //auto compaction = std::make_shared<CloudFullCompaction>(tablet); |
1113 | 105 | auto compaction = std::make_shared<CloudFullCompaction>(*this, tablet); |
1114 | 105 | auto st = compaction->prepare_compact(); |
1115 | 105 | if (!st.ok()) { |
1116 | 1 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
1117 | 1 | tablet->set_last_full_compaction_failure_time(now); |
1118 | 1 | std::lock_guard lock(_compaction_mtx); |
1119 | 1 | _submitted_full_compactions.erase(tablet->tablet_id()); |
1120 | 1 | return st; |
1121 | 1 | } |
1122 | | // Register task with CompactionTaskTracker as PENDING |
1123 | 104 | auto* tracker = CompactionTaskTracker::instance(); |
1124 | 104 | int64_t compaction_id = compaction->compaction_id(); |
1125 | 104 | { |
1126 | 104 | CompactionTaskInfo info; |
1127 | 104 | info.compaction_id = compaction_id; |
1128 | 104 | info.tablet_id = tablet->tablet_id(); |
1129 | 104 | info.table_id = tablet->table_id(); |
1130 | 104 | info.partition_id = tablet->partition_id(); |
1131 | 104 | info.compaction_type = CompactionProfileType::FULL; |
1132 | 104 | info.status = CompactionTaskStatus::PENDING; |
1133 | 104 | info.trigger_method = static_cast<TriggerMethod>(trigger_method); |
1134 | 104 | info.scheduled_time_ms = |
1135 | 104 | duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
1136 | 104 | info.backend_id = BackendOptions::get_backend_id(); |
1137 | 104 | info.compaction_score = tablet->get_real_compaction_score(); |
1138 | 104 | info.input_rowsets_count = compaction->input_rowsets_count(); |
1139 | 104 | info.input_row_num = compaction->input_row_num_value(); |
1140 | 104 | info.input_data_size = compaction->input_rowsets_data_size(); |
1141 | 104 | info.input_index_size = compaction->input_rowsets_index_size(); |
1142 | 104 | info.input_total_size = compaction->input_rowsets_total_size(); |
1143 | 104 | info.input_segments_num = compaction->input_segments_num_value(); |
1144 | 104 | info.input_version_range = compaction->input_version_range_str(); |
1145 | 104 | info.is_vertical = compaction->is_vertical(); |
1146 | 104 | tracker->register_task(std::move(info)); |
1147 | 104 | } |
1148 | 104 | { |
1149 | 104 | std::lock_guard lock(_compaction_mtx); |
1150 | 104 | _submitted_full_compactions[tablet->tablet_id()] = compaction; |
1151 | 104 | } |
1152 | 104 | st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { |
1153 | 104 | g_full_compaction_running_task_count << 1; |
1154 | 104 | signal::tablet_id = tablet->tablet_id(); |
1155 | 104 | Defer defer {[&]() { |
1156 | | // Idempotent cleanup: remove task from tracker |
1157 | 104 | CompactionTaskTracker::instance()->remove_task(compaction_id); |
1158 | 104 | g_full_compaction_running_task_count << -1; |
1159 | 104 | std::lock_guard lock(_compaction_mtx); |
1160 | 104 | _submitted_full_compactions.erase(tablet->tablet_id()); |
1161 | 104 | }}; |
1162 | 104 | auto st = _request_tablet_global_compaction_lock(ReaderType::READER_FULL_COMPACTION, tablet, |
1163 | 104 | compaction); |
1164 | 104 | if (!st.ok()) return; |
1165 | | // Update tracker to RUNNING after acquiring global lock |
1166 | 104 | { |
1167 | 104 | RunningStats rs; |
1168 | 104 | rs.start_time_ms = |
1169 | 104 | duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
1170 | 104 | CompactionTaskTracker::instance()->update_to_running(compaction_id, rs); |
1171 | 104 | } |
1172 | 104 | st = compaction->execute_compact(); |
1173 | 104 | if (!st.ok()) { |
1174 | | // Error log has been output in `execute_compact` |
1175 | 0 | long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
1176 | 0 | tablet->set_last_full_compaction_failure_time(now); |
1177 | 0 | } |
1178 | 104 | std::lock_guard lock(_compaction_mtx); |
1179 | 104 | _executing_full_compactions.erase(tablet->tablet_id()); |
1180 | 104 | }); |
1181 | 104 | if (!st.ok()) { |
1182 | 0 | tracker->remove_task(compaction_id); |
1183 | 0 | std::lock_guard lock(_compaction_mtx); |
1184 | 0 | _submitted_full_compactions.erase(tablet->tablet_id()); |
1185 | 0 | return Status::InternalError("failed to submit full compaction, tablet_id={}", |
1186 | 0 | tablet->tablet_id()); |
1187 | 0 | } |
1188 | 104 | return st; |
1189 | 104 | } |
1190 | | |
1191 | | Status CloudStorageEngine::submit_compaction_task(const CloudTabletSPtr& tablet, |
1192 | | CompactionType compaction_type, |
1193 | 108k | int trigger_method) { |
1194 | 108k | DCHECK(compaction_type == CompactionType::CUMULATIVE_COMPACTION || |
1195 | 108k | compaction_type == CompactionType::BASE_COMPACTION || |
1196 | 108k | compaction_type == CompactionType::FULL_COMPACTION); |
1197 | 108k | switch (compaction_type) { |
1198 | 3.29k | case CompactionType::BASE_COMPACTION: |
1199 | 3.29k | RETURN_IF_ERROR(_submit_base_compaction_task(tablet, trigger_method)); |
1200 | 64 | return Status::OK(); |
1201 | 105k | case CompactionType::CUMULATIVE_COMPACTION: |
1202 | 105k | RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet, trigger_method)); |
1203 | 6.42k | return Status::OK(); |
1204 | 105 | case CompactionType::FULL_COMPACTION: |
1205 | 105 | RETURN_IF_ERROR(_submit_full_compaction_task(tablet, trigger_method)); |
1206 | 104 | return Status::OK(); |
1207 | 0 | default: |
1208 | 0 | return Status::InternalError("unknown compaction type!"); |
1209 | 108k | } |
1210 | 108k | } |
1211 | | |
1212 | 1 | void CloudStorageEngine::_lease_compaction_thread_callback() { |
1213 | 183 | while (!_stop_background_threads_latch.wait_for( |
1214 | 183 | std::chrono::seconds(config::lease_compaction_interval_seconds))) { |
1215 | 182 | std::vector<std::shared_ptr<CloudFullCompaction>> full_compactions; |
1216 | 182 | std::vector<std::shared_ptr<CloudBaseCompaction>> base_compactions; |
1217 | 182 | std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions; |
1218 | 182 | std::vector<std::shared_ptr<CloudCompactionStopToken>> compation_stop_tokens; |
1219 | 182 | std::vector<std::shared_ptr<CloudIndexChangeCompaction>> index_change_compations; |
1220 | 182 | { |
1221 | 182 | std::lock_guard lock(_compaction_mtx); |
1222 | 182 | for (auto& [_, base] : _executing_base_compactions) { |
1223 | 1 | if (base) { // `base` might be a nullptr placeholder |
1224 | 1 | base_compactions.push_back(base); |
1225 | 1 | } |
1226 | 1 | } |
1227 | 182 | for (auto& [_, cumus] : _executing_cumu_compactions) { |
1228 | 175 | for (auto& cumu : cumus) { |
1229 | 175 | cumu_compactions.push_back(cumu); |
1230 | 175 | } |
1231 | 175 | } |
1232 | 182 | for (auto& [_, full] : _executing_full_compactions) { |
1233 | 7 | if (full) { |
1234 | 7 | full_compactions.push_back(full); |
1235 | 7 | } |
1236 | 7 | } |
1237 | 182 | for (auto& [_, stop_token] : _active_compaction_stop_tokens) { |
1238 | 8 | if (stop_token) { |
1239 | 8 | compation_stop_tokens.push_back(stop_token); |
1240 | 8 | } |
1241 | 8 | } |
1242 | 182 | for (auto& [_, index_change] : _submitted_index_change_cumu_compaction) { |
1243 | 4 | if (index_change) { |
1244 | 4 | index_change_compations.push_back(index_change); |
1245 | 4 | } |
1246 | 4 | } |
1247 | 182 | for (auto& [_, index_change] : _submitted_index_change_base_compaction) { |
1248 | 0 | if (index_change) { |
1249 | 0 | index_change_compations.push_back(index_change); |
1250 | 0 | } |
1251 | 0 | } |
1252 | 182 | } |
1253 | | // TODO(plat1ko): Support batch lease rpc |
1254 | 182 | for (auto& stop_token : compation_stop_tokens) { |
1255 | 8 | stop_token->do_lease(); |
1256 | 8 | } |
1257 | 182 | for (auto& comp : full_compactions) { |
1258 | 7 | comp->do_lease(); |
1259 | 7 | } |
1260 | 182 | for (auto& comp : cumu_compactions) { |
1261 | 175 | comp->do_lease(); |
1262 | 175 | } |
1263 | 182 | for (auto& comp : base_compactions) { |
1264 | 1 | comp->do_lease(); |
1265 | 1 | } |
1266 | 182 | for (auto& comp : index_change_compations) { |
1267 | 4 | comp->do_lease(); |
1268 | 4 | } |
1269 | 182 | } |
1270 | 1 | } |
1271 | | |
1272 | 1 | void CloudStorageEngine::_check_tablet_delete_bitmap_score_callback() { |
1273 | 1 | LOG(INFO) << "try to start check tablet delete bitmap score!"; |
1274 | 13 | while (!_stop_background_threads_latch.wait_for( |
1275 | 13 | std::chrono::seconds(config::check_tablet_delete_bitmap_interval_seconds))) { |
1276 | 12 | if (!config::enable_check_tablet_delete_bitmap_score) { |
1277 | 0 | return; |
1278 | 0 | } |
1279 | 12 | uint64_t max_delete_bitmap_score = 0; |
1280 | 12 | uint64_t max_base_rowset_delete_bitmap_score = 0; |
1281 | 12 | tablet_mgr().get_topn_tablet_delete_bitmap_score(&max_delete_bitmap_score, |
1282 | 12 | &max_base_rowset_delete_bitmap_score); |
1283 | 12 | if (max_delete_bitmap_score > 0) { |
1284 | 12 | _tablet_max_delete_bitmap_score_metrics->set_value(max_delete_bitmap_score); |
1285 | 12 | } |
1286 | 12 | if (max_base_rowset_delete_bitmap_score > 0) { |
1287 | 11 | _tablet_max_base_rowset_delete_bitmap_score_metrics->set_value( |
1288 | 11 | max_base_rowset_delete_bitmap_score); |
1289 | 11 | } |
1290 | 12 | } |
1291 | 1 | } |
1292 | | |
1293 | 0 | Status CloudStorageEngine::get_compaction_status_json(std::string* result) { |
1294 | 0 | rapidjson::Document root; |
1295 | 0 | root.SetObject(); |
1296 | |
|
1297 | 0 | std::lock_guard lock(_compaction_mtx); |
1298 | | // cumu |
1299 | 0 | std::string_view cumu = "CumulativeCompaction"; |
1300 | 0 | rapidjson::Value cumu_key; |
1301 | 0 | cumu_key.SetString(cumu.data(), cast_set<uint32_t>(cumu.length()), root.GetAllocator()); |
1302 | 0 | rapidjson::Document cumu_arr; |
1303 | 0 | cumu_arr.SetArray(); |
1304 | 0 | for (auto& [tablet_id, v] : _submitted_cumu_compactions) { |
1305 | 0 | for (int i = 0; i < v.size(); ++i) { |
1306 | 0 | cumu_arr.PushBack(tablet_id, root.GetAllocator()); |
1307 | 0 | } |
1308 | 0 | } |
1309 | 0 | root.AddMember(cumu_key, cumu_arr, root.GetAllocator()); |
1310 | | // base |
1311 | 0 | std::string_view base = "BaseCompaction"; |
1312 | 0 | rapidjson::Value base_key; |
1313 | 0 | base_key.SetString(base.data(), cast_set<uint32_t>(base.length()), root.GetAllocator()); |
1314 | 0 | rapidjson::Document base_arr; |
1315 | 0 | base_arr.SetArray(); |
1316 | 0 | for (auto& [tablet_id, _] : _submitted_base_compactions) { |
1317 | 0 | base_arr.PushBack(tablet_id, root.GetAllocator()); |
1318 | 0 | } |
1319 | 0 | root.AddMember(base_key, base_arr, root.GetAllocator()); |
1320 | |
|
1321 | 0 | rapidjson::StringBuffer strbuf; |
1322 | 0 | rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf); |
1323 | 0 | root.Accept(writer); |
1324 | 0 | *result = std::string(strbuf.GetString()); |
1325 | 0 | return Status::OK(); |
1326 | 0 | } |
1327 | | |
1328 | | std::shared_ptr<CloudCumulativeCompactionPolicy> CloudStorageEngine::cumu_compaction_policy( |
1329 | 116k | std::string_view compaction_policy) { |
1330 | 116k | if (!_cumulative_compaction_policies.contains(compaction_policy)) { |
1331 | 0 | return _cumulative_compaction_policies.at(CUMULATIVE_SIZE_BASED_POLICY); |
1332 | 0 | } |
1333 | 116k | return _cumulative_compaction_policies.at(compaction_policy); |
1334 | 116k | } |
1335 | | |
1336 | | Status CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet, |
1337 | 1.17k | int64_t initiator) { |
1338 | 1.17k | { |
1339 | 1.17k | std::lock_guard lock(_compaction_mtx); |
1340 | 1.17k | auto [_, success] = _active_compaction_stop_tokens.emplace(tablet->tablet_id(), nullptr); |
1341 | 1.17k | if (!success) { |
1342 | 0 | return Status::AlreadyExist("stop token already exists for tablet_id={}", |
1343 | 0 | tablet->tablet_id()); |
1344 | 0 | } |
1345 | 1.17k | } |
1346 | | |
1347 | 1.17k | auto stop_token = std::make_shared<CloudCompactionStopToken>(*this, tablet, initiator); |
1348 | 1.17k | auto st = stop_token->do_register(); |
1349 | | |
1350 | 1.17k | if (!st.ok()) { |
1351 | 0 | std::lock_guard lock(_compaction_mtx); |
1352 | 0 | _active_compaction_stop_tokens.erase(tablet->tablet_id()); |
1353 | 0 | return st; |
1354 | 0 | } |
1355 | | |
1356 | 1.17k | { |
1357 | 1.17k | std::lock_guard lock(_compaction_mtx); |
1358 | 1.17k | _active_compaction_stop_tokens[tablet->tablet_id()] = stop_token; |
1359 | 1.17k | } |
1360 | 1.17k | LOG_INFO( |
1361 | 1.17k | "successfully register compaction stop token for tablet_id={}, " |
1362 | 1.17k | "delete_bitmap_lock_initiator={}", |
1363 | 1.17k | tablet->tablet_id(), initiator); |
1364 | 1.17k | return st; |
1365 | 1.17k | } |
1366 | | |
1367 | 1.17k | Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms) { |
1368 | 1.17k | std::shared_ptr<CloudCompactionStopToken> stop_token; |
1369 | 1.17k | { |
1370 | 1.17k | std::lock_guard lock(_compaction_mtx); |
1371 | 1.17k | if (auto it = _active_compaction_stop_tokens.find(tablet->tablet_id()); |
1372 | 1.17k | it != _active_compaction_stop_tokens.end()) { |
1373 | 1.17k | stop_token = it->second; |
1374 | 18.4E | } else { |
1375 | 18.4E | return Status::NotFound("stop token not found for tablet_id={}", tablet->tablet_id()); |
1376 | 18.4E | } |
1377 | 1.17k | _active_compaction_stop_tokens.erase(tablet->tablet_id()); |
1378 | 1.17k | } |
1379 | 1.17k | LOG_INFO("successfully unregister compaction stop token for tablet_id={}", tablet->tablet_id()); |
1380 | 1.17k | if (stop_token && clear_ms) { |
1381 | 0 | RETURN_IF_ERROR(stop_token->do_unregister()); |
1382 | 0 | LOG_INFO( |
1383 | 0 | "successfully remove compaction stop token from MS for tablet_id={}, " |
1384 | 0 | "delete_bitmap_lock_initiator={}", |
1385 | 0 | tablet->tablet_id(), stop_token->initiator()); |
1386 | 0 | } |
1387 | 1.17k | return Status::OK(); |
1388 | 1.17k | } |
1389 | | |
1390 | 1 | Status CloudStorageEngine::_check_all_root_path_cluster_id() { |
1391 | | // Check if all root paths have the same cluster id |
1392 | 1 | std::set<int32_t> cluster_ids; |
1393 | 1 | for (const auto& path : _options.store_paths) { |
1394 | 1 | auto cluster_id_path = fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX); |
1395 | 1 | bool exists = false; |
1396 | 1 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists)); |
1397 | 1 | if (exists) { |
1398 | 0 | io::FileReaderSPtr reader; |
1399 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cluster_id_path, &reader)); |
1400 | 0 | size_t fsize = reader->size(); |
1401 | 0 | if (fsize > 0) { |
1402 | 0 | std::string content; |
1403 | 0 | content.resize(fsize, '\0'); |
1404 | 0 | size_t bytes_read = 0; |
1405 | 0 | RETURN_IF_ERROR(reader->read_at(0, {content.data(), fsize}, &bytes_read)); |
1406 | 0 | DCHECK_EQ(fsize, bytes_read); |
1407 | 0 | int32_t tmp_cluster_id = std::stoi(content); |
1408 | 0 | cluster_ids.insert(tmp_cluster_id); |
1409 | 0 | } |
1410 | 0 | } |
1411 | 1 | } |
1412 | 1 | _effective_cluster_id = config::cluster_id; |
1413 | | // first init |
1414 | 1 | if (cluster_ids.empty()) { |
1415 | | // not set configured cluster id |
1416 | 1 | if (_effective_cluster_id == -1) { |
1417 | 1 | return Status::OK(); |
1418 | 1 | } else { |
1419 | | // If no cluster id file exists, use the configured cluster id |
1420 | 0 | return set_cluster_id(_effective_cluster_id); |
1421 | 0 | } |
1422 | 1 | } |
1423 | 0 | if (cluster_ids.size() > 1) { |
1424 | 0 | return Status::InternalError( |
1425 | 0 | "All root paths must have the same cluster id, but you have " |
1426 | 0 | "different cluster ids: {}", |
1427 | 0 | fmt::join(cluster_ids, ", ")); |
1428 | 0 | } |
1429 | 0 | if (_effective_cluster_id != -1 && !cluster_ids.empty() && |
1430 | 0 | *cluster_ids.begin() != _effective_cluster_id) { |
1431 | 0 | return Status::Corruption( |
1432 | 0 | "multiple cluster ids is not equal. config::cluster_id={}, " |
1433 | 0 | "storage path cluster_id={}", |
1434 | 0 | _effective_cluster_id, *cluster_ids.begin()); |
1435 | 0 | } |
1436 | 0 | return Status::OK(); |
1437 | 0 | } |
1438 | | |
1439 | 1 | Status CloudStorageEngine::set_cluster_id(int32_t cluster_id) { |
1440 | 1 | std::lock_guard<std::mutex> l(_store_lock); |
1441 | 1 | for (auto& path : _options.store_paths) { |
1442 | 1 | auto cluster_id_path = fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX); |
1443 | 1 | bool exists = false; |
1444 | 1 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists)); |
1445 | 1 | if (!exists) { |
1446 | 1 | io::FileWriterPtr file_writer; |
1447 | 1 | RETURN_IF_ERROR( |
1448 | 1 | io::global_local_filesystem()->create_file(cluster_id_path, &file_writer)); |
1449 | 1 | RETURN_IF_ERROR(file_writer->append(std::to_string(cluster_id))); |
1450 | 1 | RETURN_IF_ERROR(file_writer->close()); |
1451 | 1 | } |
1452 | 1 | } |
1453 | 1 | _effective_cluster_id = cluster_id; |
1454 | 1 | return Status::OK(); |
1455 | 1 | } |
1456 | | |
1457 | | #include "common/compile_check_end.h" |
1458 | | } // namespace doris |