be/src/storage/storage_engine.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/storage_engine.h" |
19 | | |
20 | | // IWYU pragma: no_include <bthread/errno.h> |
21 | | #include <fmt/format.h> |
22 | | #include <gen_cpp/AgentService_types.h> |
23 | | #include <gen_cpp/FrontendService.h> |
24 | | #include <gen_cpp/Types_types.h> |
25 | | #include <glog/logging.h> |
26 | | #include <rapidjson/document.h> |
27 | | #include <rapidjson/encodings.h> |
28 | | #include <rapidjson/prettywriter.h> |
29 | | #include <rapidjson/stringbuffer.h> |
30 | | #include <sys/resource.h> |
31 | | #include <thrift/protocol/TDebugProtocol.h> |
32 | | |
#include <algorithm>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/container/detail/std_fwd.hpp>
#include <cassert>
#include <cerrno> // IWYU pragma: keep
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <filesystem>
#include <iterator>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <system_error>
#include <thread>
#include <unordered_set>
#include <utility>
50 | | |
51 | | #include "agent/task_worker_pool.h" |
52 | | #include "cloud/cloud_storage_engine.h" |
53 | | #include "common/config.h" |
54 | | #include "common/logging.h" |
55 | | #include "common/metrics/doris_metrics.h" |
56 | | #include "common/metrics/metrics.h" |
57 | | #include "common/status.h" |
58 | | #include "core/assert_cast.h" |
59 | | #include "io/fs/local_file_system.h" |
60 | | #include "load/memtable/memtable_flush_executor.h" |
61 | | #include "load/stream_load/stream_load_recorder.h" |
62 | | #include "storage/binlog.h" |
63 | | #include "storage/cache/schema_cache.h" |
64 | | #include "storage/compaction/single_replica_compaction.h" |
65 | | #include "storage/data_dir.h" |
66 | | #include "storage/id_manager.h" |
67 | | #include "storage/olap_common.h" |
68 | | #include "storage/olap_define.h" |
69 | | #include "storage/rowset/rowset_fwd.h" |
70 | | #include "storage/rowset/rowset_meta.h" |
71 | | #include "storage/rowset/rowset_meta_manager.h" |
72 | | #include "storage/rowset/unique_rowset_id_generator.h" |
73 | | #include "storage/snapshot/snapshot_manager.h" |
74 | | #include "storage/tablet/tablet_manager.h" |
75 | | #include "storage/tablet/tablet_meta.h" |
76 | | #include "storage/tablet/tablet_meta_manager.h" |
77 | | #include "storage/txn/txn_manager.h" |
78 | | #include "util/client_cache.h" |
79 | | #include "util/mem_info.h" |
80 | | #include "util/stopwatch.hpp" |
81 | | #include "util/thread.h" |
82 | | #include "util/threadpool.h" |
83 | | #include "util/thrift_rpc_helper.h" |
84 | | #include "util/uid_util.h" |
85 | | #include "util/work_thread_pool.hpp" |
86 | | |
87 | | using std::filesystem::directory_iterator; |
88 | | using std::filesystem::path; |
89 | | using std::map; |
90 | | using std::set; |
91 | | using std::string; |
92 | | using std::stringstream; |
93 | | using std::vector; |
94 | | |
95 | | namespace doris { |
96 | | #include "common/compile_check_begin.h" |
97 | | using namespace ErrorCode; |
// Forward declaration; defined later in this file. Distributes candidate
// data dirs into `stores` in round-robin order starting at `curr_index`.
extern void get_round_robin_stores(int64_t curr_index, const std::vector<DirInfo>& dir_infos,
                                   std::vector<DataDir*>& stores);
// Gauge metric prototype for the count of rowsets pending GC; the hook is
// registered in the StorageEngine constructor.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(unused_rowsets_count, MetricUnit::ROWSETS);
// bvar gauges exposing the maximum count / version of rowsets that still
// carry delete-bitmap entries which are no longer needed.
bvar::Status<int64_t> g_max_rowsets_with_useless_delete_bitmap(
        "max_rowsets_with_useless_delete_bitmap", 0);
bvar::Status<int64_t> g_max_rowsets_with_useless_delete_bitmap_version(
        "max_rowsets_with_useless_delete_bitmap_version", 0);
105 | | |
106 | | namespace { |
107 | | bvar::Adder<uint64_t> unused_rowsets_counter("ununsed_rowsets_counter"); |
108 | | }; |
109 | | |
// Shared base-class constructor: records the engine type (LOCAL or CLOUD),
// seeds the rowset id generator with this backend's unique id, and creates
// the stop latch with count 1 so a single count_down() releases all waiters.
BaseStorageEngine::BaseStorageEngine(Type type, const UniqueId& backend_uid)
        : _type(type),
          _rowset_id_generator(std::make_unique<UniqueRowsetIdGenerator>(backend_uid)),
          _stop_background_threads_latch(1) {
    // Engine-wide schema-change memory budget = soft memory limit scaled by
    // the configured fraction.
    _memory_limitation_bytes_for_schema_change = static_cast<int64_t>(
            static_cast<double>(MemInfo::soft_mem_limit()) * config::schema_change_mem_limit_frac);
    _tablet_max_delete_bitmap_score_metrics =
            std::make_shared<bvar::Status<size_t>>("tablet_max", "delete_bitmap_score", 0);
    _tablet_max_base_rowset_delete_bitmap_score_metrics = std::make_shared<bvar::Status<size_t>>(
            "tablet_max_base_rowset", "delete_bitmap_score", 0);
}
121 | | |
122 | 474 | BaseStorageEngine::~BaseStorageEngine() = default; |
123 | | |
// Returns the next unique rowset id from this backend's generator.
RowsetId BaseStorageEngine::next_rowset_id() {
    return _rowset_id_generator->next_id();
}
127 | | |
128 | 23.2k | StorageEngine& BaseStorageEngine::to_local() { |
129 | 23.2k | CHECK_EQ(_type, Type::LOCAL); |
130 | 23.2k | return *static_cast<StorageEngine*>(this); |
131 | 23.2k | } |
132 | | |
133 | 2.07M | CloudStorageEngine& BaseStorageEngine::to_cloud() { |
134 | 2.07M | CHECK_EQ(_type, Type::CLOUD); |
135 | 2.07M | return *static_cast<CloudStorageEngine*>(this); |
136 | 2.07M | } |
137 | | |
138 | 34.7k | int64_t BaseStorageEngine::memory_limitation_bytes_per_thread_for_schema_change() const { |
139 | 34.7k | return std::max(_memory_limitation_bytes_for_schema_change / config::alter_tablet_worker_count, |
140 | 34.7k | config::memory_limitation_per_thread_for_schema_change_bytes); |
141 | 34.7k | } |
142 | | |
// Creates and initializes the RocksDB-backed stream-load recorder rooted at
// `stream_load_record_path`. Returns MemoryAllocFailed if allocation fails
// and IOError if the recorder's RocksDB initialization fails.
Status BaseStorageEngine::init_stream_load_recorder(const std::string& stream_load_record_path) {
    LOG(INFO) << "stream load record path: " << stream_load_record_path;
    // init stream load record rocksdb
    _stream_load_recorder = StreamLoadRecorder::create_shared(stream_load_record_path);
    if (_stream_load_recorder == nullptr) {
        RETURN_NOT_OK_STATUS_WITH_WARN(
                Status::MemoryAllocFailed("allocate memory for StreamLoadRecorder failed"),
                "new StreamLoadRecorder failed");
    }
    auto st = _stream_load_recorder->init();
    if (!st.ok()) {
        RETURN_NOT_OK_STATUS_WITH_WARN(
                Status::IOError("open StreamLoadRecorder rocksdb failed, path={}",
                                stream_load_record_path),
                "init StreamLoadRecorder failed");
    }
    return Status::OK();
}
161 | | |
// Serializes the currently-submitted compaction tasks into pretty-printed
// JSON of the shape:
//   { "<CompactionType>": { "<data dir path>": ["<tablet_id>", ...] } }
// covering the base, cumulative and full compaction registries.
void CompactionSubmitRegistry::jsonfy_compaction_status(std::string* result) {
    rapidjson::Document root;
    root.SetObject();

    // Adds one top-level member: name -> { dir path -> [tablet id strings] }.
    // All values are allocated from root's allocator so they stay valid for
    // the lifetime of `root`.
    auto add_node = [&root](const std::string& name, const Registry& registry) {
        rapidjson::Value compaction_name;
        compaction_name.SetString(name.c_str(), cast_set<uint32_t>(name.length()),
                                  root.GetAllocator());
        rapidjson::Document path_obj;
        path_obj.SetObject();
        for (const auto& it : registry) {
            const auto& dir = it.first->path();
            rapidjson::Value path_key;
            path_key.SetString(dir.c_str(), cast_set<uint32_t>(dir.length()), root.GetAllocator());

            rapidjson::Document arr;
            arr.SetArray();

            for (const auto& tablet : it.second) {
                rapidjson::Value tablet_id;
                auto tablet_id_str = std::to_string(tablet->tablet_id());
                tablet_id.SetString(tablet_id_str.c_str(),
                                    cast_set<uint32_t>(tablet_id_str.length()),
                                    root.GetAllocator());
                arr.PushBack(tablet_id, root.GetAllocator());
            }
            path_obj.AddMember(path_key, arr, root.GetAllocator());
        }
        root.AddMember(compaction_name, path_obj, root.GetAllocator());
    };

    // Snapshot all three registries under the submission mutex.
    std::unique_lock<std::mutex> l(_tablet_submitted_compaction_mutex);
    add_node("BaseCompaction", _tablet_submitted_base_compaction);
    add_node("CumulativeCompaction", _tablet_submitted_cumu_compaction);
    add_node("FullCompaction", _tablet_submitted_full_compaction);

    rapidjson::StringBuffer str_buf;
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(str_buf);
    root.Accept(writer);
    *result = std::string(str_buf.GetString());
}
203 | | |
204 | 44 | static Status _validate_options(const EngineOptions& options) { |
205 | 44 | if (options.store_paths.empty()) { |
206 | 0 | return Status::InternalError("store paths is empty"); |
207 | 0 | } |
208 | 44 | return Status::OK(); |
209 | 44 | } |
210 | | |
// Public entry point: validates the configured store paths, then runs the
// full _open() bootstrap sequence. Returns the first failure encountered.
Status StorageEngine::open() {
    RETURN_IF_ERROR(_validate_options(_options));
    LOG(INFO) << "starting backend using uid:" << _options.backend_uid.to_string();
    RETURN_NOT_OK_STATUS_WITH_WARN(_open(), "open engine failed");
    LOG(INFO) << "success to init storage engine.";
    return Status::OK();
}
218 | | |
// Local (non-cloud) storage engine: wires up the tablet and transaction
// managers, the create-tablet round-robin LRU cache and the snapshot
// manager, then registers the unused-rowset gauge hook.
StorageEngine::StorageEngine(const EngineOptions& options)
        : BaseStorageEngine(Type::LOCAL, options.backend_uid),
          _options(options),
          _available_storage_medium_type_count(0),
          _is_all_cluster_id_exist(true),
          _stopped(false),
          _tablet_manager(new TabletManager(*this, config::tablet_map_shard_size)),
          _txn_manager(new TxnManager(*this, config::txn_map_shard_size, config::txn_shard_size)),
          _default_rowset_type(BETA_ROWSET),
          _create_tablet_idx_lru_cache(
                  new CreateTabletRRIdxCache(config::partition_disk_index_lru_size)),
          _snapshot_mgr(std::make_unique<SnapshotManager>(*this)) {
    REGISTER_HOOK_METRIC(unused_rowsets_count, [this]() {
        // std::lock_guard<std::mutex> lock(_gc_mutex);
        // NOTE(review): size() is read without the (commented-out) _gc_mutex
        // — presumably an accepted race for a metrics-only value; confirm
        // before relying on exactness.
        return _unused_rowsets.size();
    });

    _broken_paths = options.broken_paths;
}
238 | | |
// Ensures background threads and pools are torn down even when stop() was
// not called explicitly; stop() itself is idempotent.
StorageEngine::~StorageEngine() {
    stop();
}
242 | | |
// Loads every data dir concurrently on a temporary thread pool sized by
// config::load_data_dirs_threads (<= 0 means one thread per dir). Once any
// dir fails, not-yet-started tasks short-circuit; the first recorded error
// is returned after all submitted tasks have finished.
static Status load_data_dirs(const std::vector<DataDir*>& data_dirs) {
    std::unique_ptr<ThreadPool> pool;

    int num_threads = config::load_data_dirs_threads;
    if (num_threads <= 0) {
        num_threads = cast_set<int>(data_dirs.size());
    }

    auto st = ThreadPoolBuilder("load_data_dir")
                      .set_min_threads(num_threads)
                      .set_max_threads(num_threads)
                      .build(&pool);
    CHECK(st.ok()) << st;

    // `result` holds the first failure; guarded by result_mtx because worker
    // tasks both read (short-circuit check) and write it.
    std::mutex result_mtx;
    Status result;

    for (auto* data_dir : data_dirs) {
        st = pool->submit_func([&, data_dir] {
            SCOPED_INIT_THREAD_CONTEXT();
            {
                std::lock_guard lock(result_mtx);
                if (!result.ok()) { // Some data dir has failed
                    return;
                }
            }

            auto st = data_dir->load();
            if (!st.ok()) {
                LOG(WARNING) << "error occured when init load tables. res=" << st
                             << ", data dir=" << data_dir->path();
                std::lock_guard lock(result_mtx);
                result = std::move(st);
            }
        });

        // NOTE(review): returning here with tasks still in flight relies on
        // ~ThreadPool waiting for/cancelling them — presumably it does;
        // confirm against ThreadPool's destructor contract.
        if (!st.ok()) {
            return st;
        }
    }

    pool->wait();

    return result;
}
288 | | |
// Engine bootstrap: builds the store map, reconciles cluster ids, checks the
// fd limit, loads all usable data dirs, and initializes the memtable-flush
// and delete-bitmap executors.
Status StorageEngine::_open() {
    // init store_map
    RETURN_NOT_OK_STATUS_WITH_WARN(_init_store_map(), "_init_store_map failed");

    _effective_cluster_id = config::cluster_id;
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id");

    _update_storage_medium_type_count();

    RETURN_NOT_OK_STATUS_WITH_WARN(_check_file_descriptor_number(), "check fd number failed");

    auto dirs = get_stores();
    RETURN_IF_ERROR(load_data_dirs(dirs));

    _disk_num = cast_set<int>(dirs.size());
    _memtable_flush_executor = std::make_unique<MemTableFlushExecutor>();
    _memtable_flush_executor->init(_disk_num);

    _calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>();
    _calc_delete_bitmap_executor->init(config::calc_delete_bitmap_max_thread);

    _calc_delete_bitmap_executor_for_load = std::make_unique<CalcDeleteBitmapExecutor>();
    // Fall back to half the CPU cores (at least 1) when the load-specific
    // thread count is not configured.
    _calc_delete_bitmap_executor_for_load->init(
            config::calc_delete_bitmap_for_load_max_thread > 0
                    ? config::calc_delete_bitmap_for_load_max_thread
                    : std::max(1, CpuInfo::num_cores() / 2));

    _parse_default_rowset_type();

    return Status::OK();
}
320 | | |
// Constructs a DataDir for every configured store path and initializes them
// all in parallel (one thread per path). Every path MUST initialize
// successfully, otherwise the concatenated error messages are returned.
// Also initializes the stream-load recorder on the first store path.
Status StorageEngine::_init_store_map() {
    std::vector<std::thread> threads;
    std::mutex error_msg_lock;
    std::string error_msg;
    for (auto& path : _options.store_paths) {
        auto store = std::make_unique<DataDir>(*this, path.path, path.capacity_bytes,
                                               path.storage_medium);
        // The captured raw pointer stays valid: ownership moves into
        // _store_map below, which does not relocate the DataDir object.
        threads.emplace_back([store = store.get(), &error_msg_lock, &error_msg]() {
            SCOPED_INIT_THREAD_CONTEXT();
            auto st = store->init();
            if (!st.ok()) {
                {
                    std::lock_guard<std::mutex> l(error_msg_lock);
                    error_msg.append(st.to_string() + ";");
                }
                LOG(WARNING) << "Store load failed, status=" << st.to_string()
                             << ", path=" << store->path();
            }
        });
        _store_map.emplace(store->path(), std::move(store));
    }
    for (auto& thread : threads) {
        thread.join();
    }

    // All store paths MUST init successfully
    if (!error_msg.empty()) {
        return Status::InternalError("init path failed, error={}", error_msg);
    }

    RETURN_NOT_OK_STATUS_WITH_WARN(init_stream_load_recorder(_options.store_paths[0].path),
                                   "init StreamLoadRecorder failed");

    return Status::OK();
}
356 | | |
357 | 1.81k | void StorageEngine::_update_storage_medium_type_count() { |
358 | 1.81k | set<TStorageMedium::type> available_storage_medium_types; |
359 | | |
360 | 1.81k | std::lock_guard<std::mutex> l(_store_lock); |
361 | 2.04k | for (auto& it : _store_map) { |
362 | 2.04k | if (it.second->is_used()) { |
363 | 2.04k | available_storage_medium_types.insert(it.second->storage_medium()); |
364 | 2.04k | } |
365 | 2.04k | } |
366 | | |
367 | 1.81k | _available_storage_medium_type_count = |
368 | 1.81k | cast_set<uint32_t>(available_storage_medium_types.size()); |
369 | 1.81k | } |
370 | | |
371 | 44 | Status StorageEngine::_judge_and_update_effective_cluster_id(int32_t cluster_id) { |
372 | 44 | if (cluster_id == -1 && _effective_cluster_id == -1) { |
373 | | // maybe this is a new cluster, cluster id will get from heartbeat message |
374 | 40 | return Status::OK(); |
375 | 40 | } else if (cluster_id != -1 && _effective_cluster_id == -1) { |
376 | 4 | _effective_cluster_id = cluster_id; |
377 | 4 | return Status::OK(); |
378 | 4 | } else if (cluster_id == -1 && _effective_cluster_id != -1) { |
379 | | // _effective_cluster_id is the right effective cluster id |
380 | 0 | return Status::OK(); |
381 | 0 | } else { |
382 | 0 | if (cluster_id != _effective_cluster_id) { |
383 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN( |
384 | 0 | Status::Corruption("multiple cluster ids is not equal. one={}, other={}", |
385 | 0 | _effective_cluster_id, cluster_id), |
386 | 0 | "cluster id not equal"); |
387 | 0 | } |
388 | 0 | } |
389 | | |
390 | 0 | return Status::OK(); |
391 | 44 | } |
392 | | |
393 | 708 | std::vector<DataDir*> StorageEngine::get_stores(bool include_unused) { |
394 | 708 | std::vector<DataDir*> stores; |
395 | 708 | stores.reserve(_store_map.size()); |
396 | | |
397 | 708 | std::lock_guard<std::mutex> l(_store_lock); |
398 | 708 | if (include_unused) { |
399 | 0 | for (auto&& [_, store] : _store_map) { |
400 | 0 | stores.push_back(store.get()); |
401 | 0 | } |
402 | 708 | } else { |
403 | 775 | for (auto&& [_, store] : _store_map) { |
404 | 775 | if (store->is_used()) { |
405 | 775 | stores.push_back(store.get()); |
406 | 775 | } |
407 | 775 | } |
408 | 708 | } |
409 | 708 | return stores; |
410 | 708 | } |
411 | | |
// Builds a DataDirInfo snapshot for every store: optionally refreshes disk
// capacities, aggregates per-dir tablet data sizes via the tablet manager,
// pushes the sizes back into each DataDir's metrics, and appends the results
// to `data_dir_infos` (which is cleared first).
Status StorageEngine::get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos,
                                            bool need_update) {
    Status res = Status::OK();
    data_dir_infos->clear();

    MonotonicStopWatch timer;
    timer.start();

    // 1. update available capacity of each data dir
    // get all root path info and construct a path map.
    // path -> DataDirInfo
    std::map<std::string, DataDirInfo> path_map;
    {
        std::lock_guard<std::mutex> l(_store_lock);
        for (auto& it : _store_map) {
            if (need_update) {
                RETURN_IF_ERROR(it.second->update_capacity());
            }
            path_map.emplace(it.first, it.second->get_dir_info());
        }
    }

    // 2. get total tablets' size of each data dir
    size_t tablet_count = 0;
    _tablet_manager->update_root_path_info(&path_map, &tablet_count);

    // 3. update metrics in DataDir
    // NOTE(review): _store_lock is re-acquired on every iteration; hoisting
    // it out of the loop would be cheaper — left as-is to preserve behavior.
    for (auto& path : path_map) {
        std::lock_guard<std::mutex> l(_store_lock);
        auto data_dir = _store_map.find(path.first);
        DCHECK(data_dir != _store_map.end());
        data_dir->second->update_local_data_size(path.second.local_used_capacity);
        data_dir->second->update_remote_data_size(path.second.remote_used_capacity);
    }

    // add path info to data_dir_infos
    for (auto& entry : path_map) {
        data_dir_infos->emplace_back(entry.second);
    }

    timer.stop();
    LOG(INFO) << "get root path info cost: " << timer.elapsed_time() / 1000000
              << " ms. tablet counter: " << tablet_count;

    return res;
}
458 | | |
459 | 60 | int64_t StorageEngine::get_file_or_directory_size(const std::string& file_path) { |
460 | 60 | if (!std::filesystem::exists(file_path)) { |
461 | 60 | return 0; |
462 | 60 | } |
463 | 0 | if (!std::filesystem::is_directory(file_path)) { |
464 | 0 | return std::filesystem::file_size(file_path); |
465 | 0 | } |
466 | 0 | int64_t sum_size = 0; |
467 | 0 | for (const auto& it : std::filesystem::directory_iterator(file_path)) { |
468 | 0 | sum_size += get_file_or_directory_size(it.path()); |
469 | 0 | } |
470 | 0 | return sum_size; |
471 | 0 | } |
472 | | |
// One pass of the periodic disk monitor: health-checks every store,
// refreshes the available-medium count, and aborts the process when too
// many disks have failed.
// NOTE(review): iterates _store_map without taking _store_lock — presumably
// safe because the map is not mutated after startup (see the TODO in
// _exit_if_too_many_disks_are_failed); confirm before changing.
void StorageEngine::_start_disk_stat_monitor() {
    for (auto& it : _store_map) {
        it.second->health_check();
    }

    _update_storage_medium_type_count();

    _exit_if_too_many_disks_are_failed();
}
482 | | |
// TODO(lingbin): Should be in EnvPosix?
// Validates that the process fd limit (RLIMIT_NOFILE) is at least
// config::min_file_descriptor_number. Setting SKIP_CHECK_ULIMIT=true in the
// environment skips the check; any other value still performs it. If
// getrlimit() itself fails, the check is skipped with a warning.
Status StorageEngine::_check_file_descriptor_number() {
    struct rlimit l;
    int ret = getrlimit(RLIMIT_NOFILE, &l);
    if (ret != 0) {
        LOG(WARNING) << "call getrlimit() failed. errno=" << strerror(errno)
                     << ", use default configuration instead.";
        return Status::OK();
    }
    if (getenv("SKIP_CHECK_ULIMIT") == nullptr) {
        LOG(INFO) << "will check 'ulimit' value.";
    } else if (std::string(getenv("SKIP_CHECK_ULIMIT")) == "true") {
        LOG(INFO) << "the 'ulimit' value check is skipped"
                  << ", the SKIP_CHECK_ULIMIT env value is " << getenv("SKIP_CHECK_ULIMIT");
        return Status::OK();
    } else {
        LOG(INFO) << "the SKIP_CHECK_ULIMIT env value is " << getenv("SKIP_CHECK_ULIMIT")
                  << ", will check ulimit value.";
    }
    if (l.rlim_cur < config::min_file_descriptor_number) {
        LOG(ERROR) << "File descriptor number is less than " << config::min_file_descriptor_number
                   << ". Please use (ulimit -n) to set a value equal or greater than "
                   << config::min_file_descriptor_number;
        return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
                "file descriptors limit {} is small than {}", l.rlim_cur,
                config::min_file_descriptor_number);
    }
    return Status::OK();
}
512 | | |
// Scans each store's persisted cluster id: all stores with a complete id
// must agree. The agreed id (or -1 if none) is reconciled with the effective
// id, which is then written back to any store missing it.
Status StorageEngine::_check_all_root_path_cluster_id() {
    int32_t cluster_id = -1;
    for (auto& it : _store_map) {
        int32_t tmp_cluster_id = it.second->cluster_id();
        if (it.second->cluster_id_incomplete()) {
            _is_all_cluster_id_exist = false;
        } else if (tmp_cluster_id == cluster_id) {
            // both have right cluster id, do nothing
        } else if (cluster_id == -1) {
            cluster_id = tmp_cluster_id;
        } else {
            RETURN_NOT_OK_STATUS_WITH_WARN(
                    Status::Corruption("multiple cluster ids is not equal. one={}, other={}",
                                       cluster_id, tmp_cluster_id),
                    "cluster id not equal");
        }
    }

    // judge and get effective cluster id
    RETURN_IF_ERROR(_judge_and_update_effective_cluster_id(cluster_id));

    // write cluster id into cluster_id_path if get effective cluster id success
    if (_effective_cluster_id != -1 && !_is_all_cluster_id_exist) {
        RETURN_IF_ERROR(set_cluster_id(_effective_cluster_id));
    }

    return Status::OK();
}
541 | | |
542 | 2 | Status StorageEngine::set_cluster_id(int32_t cluster_id) { |
543 | 2 | std::lock_guard<std::mutex> l(_store_lock); |
544 | 2 | for (auto& it : _store_map) { |
545 | 2 | RETURN_IF_ERROR(it.second->set_cluster_id(cluster_id)); |
546 | 2 | } |
547 | 2 | _effective_cluster_id = cluster_id; |
548 | 2 | _is_all_cluster_id_exist = true; |
549 | 2 | return Status::OK(); |
550 | 2 | } |
551 | | |
// Returns the round-robin disk index for (partition, medium) and advances
// it. On an LRU-cache miss the rotation resumes from the last index used
// for this storage medium, so new partitions keep rotating across disks.
// Called with _store_lock held (see get_stores_for_create_tablet).
int StorageEngine::_get_and_set_next_disk_index(int64_t partition_id,
                                                TStorageMedium::type storage_medium) {
    auto key = CreateTabletRRIdxCache::get_key(partition_id, storage_medium);
    int curr_index = _create_tablet_idx_lru_cache->get_index(key);
    // -1, lru can't find key
    if (curr_index == -1) {
        curr_index = std::max(0, _last_use_index[storage_medium] + 1);
    }
    _last_use_index[storage_medium] = curr_index;
    _create_tablet_idx_lru_cache->set_index(key, std::max(0, curr_index + 1));
    return curr_index;
}
564 | | |
// Collects usable data dirs matching `storage_medium` into `dir_infos` and
// assigns each an "available level" derived from disk usage so less-full
// disks are preferred for new tablets. Called with _store_lock held.
void StorageEngine::_get_candidate_stores(TStorageMedium::type storage_medium,
                                          std::vector<DirInfo>& dir_infos) {
    std::vector<double> usages;
    for (auto& it : _store_map) {
        DataDir* data_dir = it.second.get();
        if (data_dir->is_used()) {
            // The medium must match unless only one medium type is
            // available; disks at their capacity limit are skipped entirely.
            if ((_available_storage_medium_type_count == 1 ||
                 data_dir->storage_medium() == storage_medium) &&
                !data_dir->reach_capacity_limit(0)) {
                double usage = data_dir->get_usage(0);
                DirInfo dir_info;
                dir_info.data_dir = data_dir;
                dir_info.usage = usage;
                dir_info.available_level = 0;
                usages.push_back(usage);
                dir_infos.push_back(dir_info);
            }
        }
    }

    // Zero or one candidate: nothing to balance.
    if (dir_infos.size() <= 1) {
        return;
    }

    std::sort(usages.begin(), usages.end());
    // Every disk under 70% usage: treat them all as one level.
    if (usages.back() < 0.7) {
        return;
    }

    std::vector<double> level_min_usages;
    level_min_usages.push_back(usages[0]);
    for (auto usage : usages) {
        // usage < 0.7 consider as one level, give a small skew
        if (usage < 0.7 - (config::high_disk_avail_level_diff_usages / 2.0)) {
            continue;
        }

        // at high usages, default 15% is one level
        // for example: there disk usages are: 0.66, 0.72, 0.83
        // then level_min_usages = [0.66, 0.83], divide disks into 2 levels: [0.66, 0.72], [0.83]
        if (usage >= level_min_usages.back() + config::high_disk_avail_level_diff_usages) {
            level_min_usages.push_back(usage);
        }
    }
    for (auto& dir_info : dir_infos) {
        double usage = dir_info.usage;
        for (size_t i = 1; i < level_min_usages.size() && usage >= level_min_usages[i]; i++) {
            dir_info.available_level++;
        }

        // when usage is too high, no matter consider balance now,
        // make it a higher level.
        // for example, two disks and usages are: 0.85 and 0.92, then let tablets fall on the first disk.
        // by default, storage_flood_stage_usage_percent = 90
        if (usage > config::storage_flood_stage_usage_percent / 100.0) {
            dir_info.available_level++;
        }
    }
}
624 | | |
// Returns the data dirs to try, in order, when creating a tablet for
// `partition_id` on `storage_medium`: candidates are grouped by usage level
// and round-robin-rotated within each group, starting from the partition's
// persisted rotation index.
std::vector<DataDir*> StorageEngine::get_stores_for_create_tablet(
        int64_t partition_id, TStorageMedium::type storage_medium) {
    std::vector<DirInfo> dir_infos;
    int curr_index = 0;
    std::vector<DataDir*> stores;
    {
        std::lock_guard<std::mutex> l(_store_lock);
        curr_index = _get_and_set_next_disk_index(partition_id, storage_medium);
        _get_candidate_stores(storage_medium, dir_infos);
    }

    // Order candidates (DirInfo's own comparison), then rotate within each
    // run of equal available_level.
    std::sort(dir_infos.begin(), dir_infos.end());
    get_round_robin_stores(curr_index, dir_infos, stores);

    return stores;
}
641 | | |
642 | | // maintain in stores LOW,MID,HIGH level round robin |
643 | | void get_round_robin_stores(int64_t curr_index, const std::vector<DirInfo>& dir_infos, |
644 | 6.13k | std::vector<DataDir*>& stores) { |
645 | 12.2k | for (size_t i = 0; i < dir_infos.size();) { |
646 | 6.13k | size_t end = i + 1; |
647 | 6.13k | while (end < dir_infos.size() && |
648 | 6.13k | dir_infos[i].available_level == dir_infos[end].available_level) { |
649 | 1 | end++; |
650 | 1 | } |
651 | | // data dirs [i, end) have the same tablet size, round robin range [i, end) |
652 | 6.13k | size_t count = end - i; |
653 | 12.2k | for (size_t k = 0; k < count; k++) { |
654 | 6.13k | size_t index = i + ((k + curr_index) % count); |
655 | 6.13k | stores.push_back(dir_infos[index].data_dir); |
656 | 6.13k | } |
657 | 6.13k | i = end; |
658 | 6.13k | } |
659 | 6.13k | } |
660 | | |
661 | 176 | DataDir* StorageEngine::get_store(const std::string& path) { |
662 | | // _store_map is unchanged, no need to lock |
663 | 176 | auto it = _store_map.find(path); |
664 | 176 | if (it == _store_map.end()) { |
665 | 0 | return nullptr; |
666 | 0 | } |
667 | 176 | return it->second.get(); |
668 | 176 | } |
669 | | |
670 | 1.77k | static bool too_many_disks_are_failed(uint32_t unused_num, uint32_t total_num) { |
671 | 1.77k | return ((total_num == 0) || |
672 | 1.77k | (unused_num * 100 / total_num > config::max_percentage_of_error_disk)); |
673 | 1.77k | } |
674 | | |
// Counts unused (failed) stores and terminates the process when the failed
// ratio exceeds config::max_percentage_of_error_disk. No-op when the store
// map is empty.
void StorageEngine::_exit_if_too_many_disks_are_failed() {
    uint32_t unused_root_path_num = 0;
    uint32_t total_root_path_num = 0;

    {
        // TODO(yingchun): _store_map is only updated in main and ~StorageEngine, maybe we can remove it?
        std::lock_guard<std::mutex> l(_store_lock);
        if (_store_map.empty()) {
            return;
        }

        for (auto& it : _store_map) {
            ++total_root_path_num;
            if (it.second->is_used()) {
                continue;
            }
            ++unused_root_path_num;
        }
    }

    if (too_many_disks_are_failed(unused_root_path_num, total_root_path_num)) {
        LOG(FATAL) << "meet too many error disks, process exit. "
                   << "max_ratio_allowed=" << config::max_percentage_of_error_disk << "%"
                   << ", error_disk_count=" << unused_root_path_num
                   << ", total_disk_count=" << total_root_path_num;
        exit(0);
    }
}
703 | | |
// Shuts the engine down: stops per-store background workers, releases the
// stop latch so background loops exit, joins every background thread, shuts
// down the compaction/checkpoint/cooldown pools, and resets the executors.
// Idempotent: a second call only logs a warning.
void StorageEngine::stop() {
    if (_stopped) {
        LOG(WARNING) << "Storage engine is stopped twice.";
        return;
    }
    // trigger the waiting threads
    notify_listeners();

    {
        std::lock_guard<std::mutex> l(_store_lock);
        for (auto& store_pair : _store_map) {
            store_pair.second->stop_bg_worker();
        }
    }

    // Release the latch all background loops wait on, then join each thread
    // (a thread pointer is null when it was never started).
    _stop_background_threads_latch.count_down();
#define THREAD_JOIN(thread) \
    if (thread) {           \
        thread->join();     \
    }

    THREAD_JOIN(_compaction_tasks_producer_thread);
    THREAD_JOIN(_update_replica_infos_thread);
    THREAD_JOIN(_unused_rowset_monitor_thread);
    THREAD_JOIN(_garbage_sweeper_thread);
    THREAD_JOIN(_disk_stat_monitor_thread);
    THREAD_JOIN(_cache_clean_thread);
    THREAD_JOIN(_tablet_checkpoint_tasks_producer_thread);
    THREAD_JOIN(_async_publish_thread);
    THREAD_JOIN(_cold_data_compaction_producer_thread);
    THREAD_JOIN(_cooldown_tasks_producer_thread);
    THREAD_JOIN(_check_delete_bitmap_score_thread);
#undef THREAD_JOIN

#define THREADS_JOIN(threads)            \
    for (const auto& thread : threads) { \
        if (thread) {                    \
            thread->join();              \
        }                                \
    }

    THREADS_JOIN(_path_gc_threads);
#undef THREADS_JOIN

    if (_base_compaction_thread_pool) {
        _base_compaction_thread_pool->shutdown();
    }
    if (_cumu_compaction_thread_pool) {
        _cumu_compaction_thread_pool->shutdown();
    }
    if (_single_replica_compaction_thread_pool) {
        _single_replica_compaction_thread_pool->shutdown();
    }

    if (_seg_compaction_thread_pool) {
        _seg_compaction_thread_pool->shutdown();
    }
    if (_tablet_meta_checkpoint_thread_pool) {
        _tablet_meta_checkpoint_thread_pool->shutdown();
    }
    if (_cold_data_compaction_thread_pool) {
        _cold_data_compaction_thread_pool->shutdown();
    }

    if (_cooldown_thread_pool) {
        _cooldown_thread_pool->shutdown();
    }

    _memtable_flush_executor.reset(nullptr);
    _calc_delete_bitmap_executor.reset(nullptr);
    _calc_delete_bitmap_executor_for_load.reset();

    _stopped = true;
    LOG(INFO) << "Storage engine is stopped.";
}
779 | | |
780 | 12 | void StorageEngine::clear_transaction_task(const TTransactionId transaction_id) { |
781 | | // clear transaction task may not contains partitions ids, we should get partition id from txn manager. |
782 | 12 | std::vector<int64_t> partition_ids; |
783 | 12 | _txn_manager->get_partition_ids(transaction_id, &partition_ids); |
784 | 12 | clear_transaction_task(transaction_id, partition_ids); |
785 | 12 | } |
786 | | |
787 | | void StorageEngine::clear_transaction_task(const TTransactionId transaction_id, |
788 | 14 | const std::vector<TPartitionId>& partition_ids) { |
789 | 14 | LOG(INFO) << "begin to clear transaction task. transaction_id=" << transaction_id; |
790 | | |
791 | 14 | for (const TPartitionId& partition_id : partition_ids) { |
792 | 14 | std::map<TabletInfo, RowsetSharedPtr> tablet_infos; |
793 | 14 | _txn_manager->get_txn_related_tablets(transaction_id, partition_id, &tablet_infos); |
794 | | |
795 | | // each tablet |
796 | 272 | for (auto& tablet_info : tablet_infos) { |
797 | | // should use tablet uid to ensure clean txn correctly |
798 | 272 | TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_info.first.tablet_id, |
799 | 272 | tablet_info.first.tablet_uid); |
800 | | // The tablet may be dropped or altered, leave a INFO log and go on process other tablet |
801 | 272 | if (tablet == nullptr) { |
802 | 0 | LOG(INFO) << "tablet is no longer exist. tablet_id=" << tablet_info.first.tablet_id |
803 | 0 | << ", tablet_uid=" << tablet_info.first.tablet_uid; |
804 | 0 | continue; |
805 | 0 | } |
806 | 272 | Status s = _txn_manager->delete_txn(partition_id, tablet, transaction_id); |
807 | 272 | if (!s.ok()) { |
808 | 0 | LOG(WARNING) << "failed to clear transaction. txn_id=" << transaction_id |
809 | 0 | << ", partition_id=" << partition_id |
810 | 0 | << ", tablet_id=" << tablet_info.first.tablet_id |
811 | 0 | << ", status=" << s.to_string(); |
812 | 0 | } |
813 | 272 | } |
814 | 14 | } |
815 | 14 | LOG(INFO) << "finish to clear transaction task. transaction_id=" << transaction_id; |
816 | 14 | } |
817 | | |
Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
    // Sweep expired trash/snapshot directories on every data dir, then clean
    // stale metadata (txns, rowset/binlog metas, delete bitmaps, pending
    // publish and partial update infos) and remote-storage garbage.
    //
    // @param usage        out (optional): max disk usage ratio seen across dirs
    // @param ignore_guard when true, sweep regardless of the free-space guard
    // @return the last error encountered (sweeping continues past errors)
    Status res = Status::OK();

    // Only one sweep runs at a time. If another sweep holds the lock and this
    // call wanted a forced clean, leave a flag for the running sweep to pick up.
    std::unique_lock<std::mutex> l(_trash_sweep_lock, std::defer_lock);
    if (!l.try_lock()) {
        LOG(INFO) << "trash and snapshot sweep is running.";
        if (ignore_guard) {
            _need_clean_trash.store(true, std::memory_order_relaxed);
        }
        return res;
    }

    LOG(INFO) << "start trash and snapshot sweep. is_clean=" << ignore_guard;

    const int32_t snapshot_expire = config::snapshot_expire_time_sec;
    const int32_t trash_expire = config::trash_file_expire_time_sec;
    // the guard space should be lower than storage_flood_stage_usage_percent,
    // so here we multiply 0.9
    // if ignore_guard is true, set guard_space to 0.
    const double guard_space =
            ignore_guard ? 0 : config::storage_flood_stage_usage_percent / 100.0 * 0.9;
    std::vector<DataDirInfo> data_dir_infos;
    RETURN_NOT_OK_STATUS_WITH_WARN(get_all_data_dir_info(&data_dir_infos, false),
                                   "failed to get root path stat info when sweep trash.")
    std::sort(data_dir_infos.begin(), data_dir_infos.end(), DataDirInfoLessAvailability());

    time_t now = time(nullptr); // current UTC time
    tm local_tm_now;
    local_tm_now.tm_isdst = 0;
    if (localtime_r(&now, &local_tm_now) == nullptr) {
        return Status::Error<OS_ERROR>("fail to localtime_r time. time={}", now);
    }
    const time_t local_now = mktime(&local_tm_now); // local calendar time

    double tmp_usage = 0.0;
    for (DataDirInfo& info : data_dir_infos) {
        LOG(INFO) << "Start to sweep path " << info.path;
        if (!info.is_used) {
            continue;
        }

        double curr_usage =
                (double)(info.disk_capacity - info.available) / (double)info.disk_capacity;
        tmp_usage = std::max(tmp_usage, curr_usage);

        Status curr_res = Status::OK();
        auto snapshot_path = fmt::format("{}/{}", info.path, SNAPSHOT_PREFIX);
        curr_res = _do_sweep(snapshot_path, local_now, snapshot_expire);
        if (!curr_res.ok()) {
            LOG(WARNING) << "failed to sweep snapshot. path=" << snapshot_path
                         << ", err_code=" << curr_res;
            res = curr_res;
        }

        // If this disk is above the guard threshold, expire trash immediately
        // (expire time 0) to reclaim space.
        auto trash_path = fmt::format("{}/{}", info.path, TRASH_PREFIX);
        curr_res = _do_sweep(trash_path, local_now, curr_usage > guard_space ? 0 : trash_expire);
        if (!curr_res.ok()) {
            LOG(WARNING) << "failed to sweep trash. path=" << trash_path
                         << ", err_code=" << curr_res;
            res = curr_res;
        }
    }

    if (usage != nullptr) {
        *usage = tmp_usage; // update usage
    }

    // clear expire incremental rowset, move deleted tablet to trash
    RETURN_IF_ERROR(_tablet_manager->start_trash_sweep());

    // clean rubbish transactions
    _clean_unused_txns();

    // clean unused rowset metas in OlapMeta
    _clean_unused_rowset_metas();

    // clean unused binlog metas in OlapMeta
    _clean_unused_binlog_metas();

    // clean unused delete bitmap for deleted tablet
    _clean_unused_delete_bitmap();

    // clean unused pending publish info for deleted tablet
    _clean_unused_pending_publish_info();

    // clean unused partial update info for finished txns
    _clean_unused_partial_update_info();

    // clean unused rowsets in remote storage backends
    for (auto data_dir : get_stores()) {
        data_dir->perform_remote_rowset_gc();
        data_dir->perform_remote_tablet_gc();
        data_dir->update_trash_capacity();
    }

    return res;
}
915 | | |
void StorageEngine::_clean_unused_rowset_metas() {
    // Scan the rowset metas persisted in each data dir's OlapMeta and delete
    // the ones that are unparsable, orphaned (tablet dropped or uid mismatch),
    // or VISIBLE but no longer referenced by their tablet.
    std::vector<RowsetMetaSharedPtr> invalid_rowset_metas;
    auto clean_rowset_func = [this, &invalid_rowset_metas](TabletUid tablet_uid, RowsetId rowset_id,
                                                           std::string_view meta_str) -> bool {
        // return false will break meta iterator, return true to skip this error
        RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
        bool parsed = rowset_meta->init(meta_str);
        if (!parsed) {
            LOG(WARNING) << "parse rowset meta string failed for rowset_id:" << rowset_id;
            invalid_rowset_metas.push_back(rowset_meta);
            return true;
        }
        if (rowset_meta->tablet_uid() != tablet_uid) {
            // Key/value disagreement: the meta was stored under a different
            // tablet uid than the one recorded inside it.
            LOG(WARNING) << "tablet uid is not equal, skip the rowset"
                         << ", rowset_id=" << rowset_meta->rowset_id()
                         << ", in_put_tablet_uid=" << tablet_uid
                         << ", tablet_uid in rowset meta=" << rowset_meta->tablet_uid();
            invalid_rowset_metas.push_back(rowset_meta);
            return true;
        }

        TabletSharedPtr tablet = _tablet_manager->get_tablet(rowset_meta->tablet_id());
        if (tablet == nullptr) {
            // tablet may be dropped
            // TODO(cmy): this is better to be a VLOG, because drop table is a very common case.
            // leave it as INFO log for observation. Maybe change it in future.
            LOG(INFO) << "failed to find tablet " << rowset_meta->tablet_id()
                      << " for rowset: " << rowset_meta->rowset_id() << ", tablet may be dropped";
            invalid_rowset_metas.push_back(rowset_meta);
            return true;
        }
        if (tablet->tablet_uid() != rowset_meta->tablet_uid()) {
            // In this case, we get the tablet using the tablet id recorded in the rowset meta.
            // but the uid in the tablet is different from the one recorded in the rowset meta.
            // How this happened:
            // Replica1 of Tablet A exists on BE1. Because of the clone task, a new replica2 is createed on BE2,
            // and then replica1 deleted from BE1. After some time, we created replica again on BE1,
            // which will creates a new tablet with the same id but a different uid.
            // And in the historical version, when we deleted the replica, we did not delete the corresponding rowset meta,
            // thus causing the original rowset meta to remain(with same tablet id but different uid).
            LOG(WARNING) << "rowset's tablet uid " << rowset_meta->tablet_uid()
                         << " does not equal to tablet uid: " << tablet->tablet_uid();
            invalid_rowset_metas.push_back(rowset_meta);
            return true;
        }
        // A VISIBLE rowset that the tablet no longer uses and that is not
        // queued for deferred deletion can have its meta removed.
        if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
            (!tablet->rowset_meta_is_useful(rowset_meta)) &&
            !check_rowset_id_in_unused_rowsets(rowset_id)) {
            LOG(INFO) << "rowset meta is not used any more, remove it. rowset_id="
                      << rowset_meta->rowset_id();
            invalid_rowset_metas.push_back(rowset_meta);
        }
        return true;
    };
    auto data_dirs = get_stores();
    for (auto data_dir : data_dirs) {
        static_cast<void>(
                RowsetMetaManager::traverse_rowset_metas(data_dir->get_meta(), clean_rowset_func));
        // 1. delete delete_bitmap
        std::set<int64_t> tablets_to_save_meta;
        for (auto& rowset_meta : invalid_rowset_metas) {
            TabletSharedPtr tablet = _tablet_manager->get_tablet(rowset_meta->tablet_id());
            if (tablet && tablet->tablet_meta()->enable_unique_key_merge_on_write()) {
                tablet->tablet_meta()->remove_rowset_delete_bitmap(rowset_meta->rowset_id(),
                                                                   rowset_meta->version());
                tablets_to_save_meta.emplace(tablet->tablet_id());
            }
        }
        // Persist the metas of tablets whose delete bitmap changed above.
        for (const auto& tablet_id : tablets_to_save_meta) {
            auto tablet = _tablet_manager->get_tablet(tablet_id);
            if (tablet) {
                std::shared_lock rlock(tablet->get_header_lock());
                tablet->save_meta();
            }
        }
        // 2. delete rowset meta
        for (auto& rowset_meta : invalid_rowset_metas) {
            static_cast<void>(RowsetMetaManager::remove(
                    data_dir->get_meta(), rowset_meta->tablet_uid(), rowset_meta->rowset_id()));
        }
        LOG(INFO) << "remove " << invalid_rowset_metas.size()
                  << " invalid rowset meta from dir: " << data_dir->path();
        invalid_rowset_metas.clear();
    }
}
1001 | | |
void StorageEngine::_clean_unused_binlog_metas() {
    // Remove binlog metas whose payload is unparsable or whose tablet no
    // longer exists.
    std::vector<std::string> unused_binlog_key_suffixes;
    auto unused_binlog_collector = [this, &unused_binlog_key_suffixes](std::string_view key,
                                                                       std::string_view value,
                                                                       bool need_check) -> bool {
        // NOTE(review): here `false` is returned for HEALTHY entries (parsable
        // meta with a live tablet) — presumably the traversal treats it as
        // "keep this entry / skip related keys"; confirm against
        // RowsetMetaManager::traverse_binlog_metas before changing.
        if (need_check) {
            BinlogMetaEntryPB binlog_meta_pb;
            if (UNLIKELY(!binlog_meta_pb.ParseFromArray(value.data(),
                                                        cast_set<int>(value.size())))) {
                LOG(WARNING) << "parse rowset meta string failed for binlog meta key: " << key;
            } else if (_tablet_manager->get_tablet(binlog_meta_pb.tablet_id()) == nullptr) {
                LOG(INFO) << "failed to find tablet " << binlog_meta_pb.tablet_id()
                          << " for binlog rowset: " << binlog_meta_pb.rowset_id()
                          << ", tablet may be dropped";
            } else {
                return false;
            }
        }

        // Record the key (minus the common prefix) so it can be deleted below.
        unused_binlog_key_suffixes.emplace_back(key.substr(kBinlogMetaPrefix.size()));
        return true;
    };
    auto data_dirs = get_stores();
    for (auto data_dir : data_dirs) {
        static_cast<void>(RowsetMetaManager::traverse_binlog_metas(data_dir->get_meta(),
                                                                   unused_binlog_collector));
        for (const auto& suffix : unused_binlog_key_suffixes) {
            static_cast<void>(RowsetMetaManager::remove_binlog(data_dir->get_meta(), suffix));
        }
        LOG(INFO) << "remove " << unused_binlog_key_suffixes.size()
                  << " invalid binlog meta from dir: " << data_dir->path();
        unused_binlog_key_suffixes.clear();
    }
}
1036 | | |
1037 | 52 | void StorageEngine::_clean_unused_delete_bitmap() { |
1038 | 52 | std::unordered_set<int64_t> removed_tablets; |
1039 | 52 | auto clean_delete_bitmap_func = [this, &removed_tablets](int64_t tablet_id, int64_t version, |
1040 | 284 | std::string_view val) -> bool { |
1041 | 284 | TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
1042 | 284 | if (tablet == nullptr) { |
1043 | 0 | if (removed_tablets.insert(tablet_id).second) { |
1044 | 0 | LOG(INFO) << "clean ununsed delete bitmap for deleted tablet, tablet_id: " |
1045 | 0 | << tablet_id; |
1046 | 0 | } |
1047 | 0 | } |
1048 | 284 | return true; |
1049 | 284 | }; |
1050 | 52 | auto data_dirs = get_stores(); |
1051 | 60 | for (auto data_dir : data_dirs) { |
1052 | 60 | static_cast<void>(TabletMetaManager::traverse_delete_bitmap(data_dir->get_meta(), |
1053 | 60 | clean_delete_bitmap_func)); |
1054 | 60 | for (auto id : removed_tablets) { |
1055 | 0 | static_cast<void>( |
1056 | 0 | TabletMetaManager::remove_old_version_delete_bitmap(data_dir, id, INT64_MAX)); |
1057 | 0 | } |
1058 | 60 | LOG(INFO) << "removed invalid delete bitmap from dir: " << data_dir->path() |
1059 | 60 | << ", deleted tablets size: " << removed_tablets.size(); |
1060 | 60 | removed_tablets.clear(); |
1061 | 60 | } |
1062 | 52 | } |
1063 | | |
1064 | 52 | void StorageEngine::_clean_unused_pending_publish_info() { |
1065 | 52 | std::vector<std::pair<int64_t, int64_t>> removed_infos; |
1066 | 52 | auto clean_pending_publish_info_func = [this, &removed_infos](int64_t tablet_id, |
1067 | 52 | int64_t publish_version, |
1068 | 52 | std::string_view info) -> bool { |
1069 | 0 | TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
1070 | 0 | if (tablet == nullptr) { |
1071 | 0 | removed_infos.emplace_back(tablet_id, publish_version); |
1072 | 0 | } |
1073 | 0 | return true; |
1074 | 0 | }; |
1075 | 52 | auto data_dirs = get_stores(); |
1076 | 60 | for (auto data_dir : data_dirs) { |
1077 | 60 | static_cast<void>(TabletMetaManager::traverse_pending_publish( |
1078 | 60 | data_dir->get_meta(), clean_pending_publish_info_func)); |
1079 | 60 | for (auto& [tablet_id, publish_version] : removed_infos) { |
1080 | 0 | static_cast<void>(TabletMetaManager::remove_pending_publish_info(data_dir, tablet_id, |
1081 | 0 | publish_version)); |
1082 | 0 | } |
1083 | 60 | LOG(INFO) << "removed invalid pending publish info from dir: " << data_dir->path() |
1084 | 60 | << ", deleted pending publish info size: " << removed_infos.size(); |
1085 | 60 | removed_infos.clear(); |
1086 | 60 | } |
1087 | 52 | } |
1088 | | |
1089 | 52 | void StorageEngine::_clean_unused_partial_update_info() { |
1090 | 52 | std::vector<std::tuple<int64_t, int64_t, int64_t>> remove_infos; |
1091 | 52 | auto unused_partial_update_info_collector = |
1092 | 52 | [this, &remove_infos](int64_t tablet_id, int64_t partition_id, int64_t txn_id, |
1093 | 52 | std::string_view value) -> bool { |
1094 | 0 | TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
1095 | 0 | if (tablet == nullptr) { |
1096 | 0 | remove_infos.emplace_back(tablet_id, partition_id, txn_id); |
1097 | 0 | return true; |
1098 | 0 | } |
1099 | 0 | TxnState txn_state = |
1100 | 0 | _txn_manager->get_txn_state(partition_id, txn_id, tablet_id, tablet->tablet_uid()); |
1101 | 0 | if (txn_state == TxnState::NOT_FOUND || txn_state == TxnState::ABORTED || |
1102 | 0 | txn_state == TxnState::DELETED) { |
1103 | 0 | remove_infos.emplace_back(tablet_id, partition_id, txn_id); |
1104 | 0 | return true; |
1105 | 0 | } |
1106 | 0 | return true; |
1107 | 0 | }; |
1108 | 52 | auto data_dirs = get_stores(); |
1109 | 60 | for (auto* data_dir : data_dirs) { |
1110 | 60 | static_cast<void>(RowsetMetaManager::traverse_partial_update_info( |
1111 | 60 | data_dir->get_meta(), unused_partial_update_info_collector)); |
1112 | 60 | static_cast<void>( |
1113 | 60 | RowsetMetaManager::remove_partial_update_infos(data_dir->get_meta(), remove_infos)); |
1114 | 60 | } |
1115 | 52 | } |
1116 | | |
1117 | 0 | void StorageEngine::gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos) { |
1118 | 0 | for (auto [tablet_id, version] : gc_tablet_infos) { |
1119 | 0 | LOG(INFO) << fmt::format("start to gc binlogs for tablet_id: {}, version: {}", tablet_id, |
1120 | 0 | version); |
1121 | |
|
1122 | 0 | TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
1123 | 0 | if (tablet == nullptr) { |
1124 | 0 | LOG(WARNING) << fmt::format("tablet_id: {} not found", tablet_id); |
1125 | 0 | continue; |
1126 | 0 | } |
1127 | 0 | tablet->gc_binlogs(version); |
1128 | 0 | } |
1129 | 0 | } |
1130 | | |
1131 | 52 | void StorageEngine::_clean_unused_txns() { |
1132 | 52 | std::set<TabletInfo> tablet_infos; |
1133 | 52 | _txn_manager->get_all_related_tablets(&tablet_infos); |
1134 | 980 | for (auto& tablet_info : tablet_infos) { |
1135 | 980 | TabletSharedPtr tablet = |
1136 | 980 | _tablet_manager->get_tablet(tablet_info.tablet_id, tablet_info.tablet_uid, true); |
1137 | 980 | if (tablet == nullptr) { |
1138 | | // TODO(ygl) : should check if tablet still in meta, it's a improvement |
1139 | | // case 1: tablet still in meta, just remove from memory |
1140 | | // case 2: tablet not in meta store, remove rowset from meta |
1141 | | // currently just remove them from memory |
1142 | | // nullptr to indicate not remove them from meta store |
1143 | 0 | _txn_manager->force_rollback_tablet_related_txns(nullptr, tablet_info.tablet_id, |
1144 | 0 | tablet_info.tablet_uid); |
1145 | 0 | } |
1146 | 980 | } |
1147 | 52 | } |
1148 | | |
1149 | | Status StorageEngine::_do_sweep(const std::string& scan_root, const time_t& local_now, |
1150 | 120 | const int32_t expire) { |
1151 | 120 | Status res = Status::OK(); |
1152 | 120 | bool exists = true; |
1153 | 120 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(scan_root, &exists)); |
1154 | 120 | if (!exists) { |
1155 | | // dir not existed. no need to sweep trash. |
1156 | 90 | return res; |
1157 | 90 | } |
1158 | | |
1159 | 30 | int curr_sweep_batch_size = 0; |
1160 | 30 | try { |
1161 | | // Sort pathes by name, that is by delete time. |
1162 | 30 | std::vector<path> sorted_pathes; |
1163 | 30 | std::copy(directory_iterator(scan_root), directory_iterator(), |
1164 | 30 | std::back_inserter(sorted_pathes)); |
1165 | 30 | std::sort(sorted_pathes.begin(), sorted_pathes.end()); |
1166 | 30 | for (const auto& sorted_path : sorted_pathes) { |
1167 | 4 | string dir_name = sorted_path.filename().string(); |
1168 | 4 | string str_time = dir_name.substr(0, dir_name.find('.')); |
1169 | 4 | tm local_tm_create; |
1170 | 4 | local_tm_create.tm_isdst = 0; |
1171 | 4 | if (strptime(str_time.c_str(), "%Y%m%d%H%M%S", &local_tm_create) == nullptr) { |
1172 | 0 | res = Status::Error<OS_ERROR>("fail to strptime time. time={}", str_time); |
1173 | 0 | continue; |
1174 | 0 | } |
1175 | | |
1176 | 4 | int32_t actual_expire = expire; |
1177 | | // try get timeout in dir name, the old snapshot dir does not contain timeout |
1178 | | // eg: 20190818221123.3.86400, the 86400 is timeout, in second |
1179 | 4 | size_t pos = dir_name.find('.', str_time.size() + 1); |
1180 | 4 | if (pos != string::npos) { |
1181 | 4 | actual_expire = std::stoi(dir_name.substr(pos + 1)); |
1182 | 4 | } |
1183 | 4 | VLOG_TRACE << "get actual expire time " << actual_expire << " of dir: " << dir_name; |
1184 | | |
1185 | 4 | string path_name = sorted_path.string(); |
1186 | 4 | if (difftime(local_now, mktime(&local_tm_create)) >= actual_expire) { |
1187 | 0 | res = io::global_local_filesystem()->delete_directory(path_name); |
1188 | 0 | LOG(INFO) << "do sweep delete directory " << path_name << " local_now " << local_now |
1189 | 0 | << "actual_expire " << actual_expire << " res " << res; |
1190 | 0 | if (!res.ok()) { |
1191 | 0 | continue; |
1192 | 0 | } |
1193 | | |
1194 | 0 | curr_sweep_batch_size++; |
1195 | 0 | if (config::garbage_sweep_batch_size > 0 && |
1196 | 0 | curr_sweep_batch_size >= config::garbage_sweep_batch_size) { |
1197 | 0 | curr_sweep_batch_size = 0; |
1198 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
1199 | 0 | } |
1200 | 4 | } else { |
1201 | | // Because files are ordered by filename, i.e. by create time, so all the left files are not expired. |
1202 | 4 | break; |
1203 | 4 | } |
1204 | 4 | } |
1205 | 30 | } catch (...) { |
1206 | 0 | res = Status::Error<IO_ERROR>("Exception occur when scan directory. path_desc={}", |
1207 | 0 | scan_root); |
1208 | 0 | } |
1209 | | |
1210 | 30 | return res; |
1211 | 30 | } |
1212 | | |
1213 | | // invalid rowset type config will return ALPHA_ROWSET for system to run smoothly |
1214 | 44 | void StorageEngine::_parse_default_rowset_type() { |
1215 | 44 | std::string default_rowset_type_config = config::default_rowset_type; |
1216 | 44 | boost::to_upper(default_rowset_type_config); |
1217 | 44 | if (default_rowset_type_config == "BETA") { |
1218 | 44 | _default_rowset_type = BETA_ROWSET; |
1219 | 44 | } else if (default_rowset_type_config == "ALPHA") { |
1220 | 0 | _default_rowset_type = ALPHA_ROWSET; |
1221 | 0 | LOG(WARNING) << "default_rowset_type in be.conf should be set to beta, alpha is not " |
1222 | 0 | "supported any more"; |
1223 | 0 | } else { |
1224 | 0 | LOG(FATAL) << "unknown value " << default_rowset_type_config |
1225 | 0 | << " in default_rowset_type in be.conf"; |
1226 | 0 | } |
1227 | 44 | } |
1228 | | |
void StorageEngine::start_delete_unused_rowset() {
    // Physically remove rowsets queued by add_unused_rowset() once nothing else
    // references them, and purge delete-bitmap ranges queued by
    // add_unused_delete_bitmap_key_ranges() once their rowsets are gone.
    DBUG_EXECUTE_IF("StorageEngine::start_delete_unused_rowset.block", DBUG_BLOCK);
    LOG(INFO) << "start to delete unused rowset, size: " << _unused_rowsets.size()
              << ", unused delete bitmap size: " << _unused_delete_bitmap.size();
    std::vector<RowsetSharedPtr> unused_rowsets_copy;
    unused_rowsets_copy.reserve(_unused_rowsets.size());
    auto due_to_use_count = 0;
    auto due_to_not_delete_file = 0;
    auto due_to_delayed_expired_ts = 0;
    std::set<int64_t> tablets_to_save_meta;
    {
        std::lock_guard<std::mutex> lock(_gc_mutex);
        for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
            auto&& rs = it->second;
            // use_count() == 1 means this map holds the only reference, so the
            // rowset can be deleted safely.
            if (rs.use_count() == 1 && rs->need_delete_file()) {
                // remote rowset data will be reclaimed by `remove_unused_remote_files`
                if (rs->is_local()) {
                    unused_rowsets_copy.push_back(std::move(rs));
                }
                it = _unused_rowsets.erase(it);
            } else {
                // Keep the entry; record why it was skipped for the log below.
                if (rs.use_count() != 1) {
                    ++due_to_use_count;
                } else if (!rs->need_delete_file()) {
                    ++due_to_not_delete_file;
                } else {
                    ++due_to_delayed_expired_ts;
                }
                ++it;
            }
        }
        // check remove delete bitmaps
        for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) {
            auto tablet_id = std::get<0>(*it);
            auto tablet = _tablet_manager->get_tablet(tablet_id);
            if (tablet == nullptr) {
                it = _unused_delete_bitmap.erase(it);
                continue;
            }
            auto& rowset_ids = std::get<1>(*it);
            auto& key_ranges = std::get<2>(*it);
            // Only remove the bitmap ranges once none of the associated rowsets
            // is still waiting for deletion in _unused_rowsets.
            bool find_unused_rowset = false;
            for (const auto& rowset_id : rowset_ids) {
                if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) {
                    VLOG_DEBUG << "can not remove pre rowset delete bitmap because rowset is in use"
                               << ", tablet_id=" << tablet_id
                               << ", rowset_id=" << rowset_id.to_string();
                    find_unused_rowset = true;
                    break;
                }
            }
            if (find_unused_rowset) {
                ++it;
                continue;
            }
            tablet->tablet_meta()->delete_bitmap().remove(key_ranges);
            tablets_to_save_meta.emplace(tablet_id);
            it = _unused_delete_bitmap.erase(it);
        }
    }
    LOG(INFO) << "collected " << unused_rowsets_copy.size() << " unused rowsets to remove, skipped "
              << due_to_use_count << " rowsets due to use count > 1, skipped "
              << due_to_not_delete_file << " rowsets due to don't need to delete file, skipped "
              << due_to_delayed_expired_ts << " rowsets due to delayed expired timestamp. left "
              << _unused_delete_bitmap.size() << " unused delete bitmap.";
    // File removal runs outside _gc_mutex to keep the critical section short.
    for (auto&& rs : unused_rowsets_copy) {
        VLOG_NOTICE << "start to remove rowset:" << rs->rowset_id()
                    << ", version:" << rs->version();
        // delete delete_bitmap of unused rowsets
        if (auto tablet = _tablet_manager->get_tablet(rs->rowset_meta()->tablet_id());
            tablet && tablet->enable_unique_key_merge_on_write()) {
            tablet->tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
            tablets_to_save_meta.emplace(tablet->tablet_id());
        }
        Status status = rs->remove();
        unused_rowsets_counter << -1;
        VLOG_NOTICE << "remove rowset:" << rs->rowset_id() << " finished. status:" << status;
    }
    // Persist the metas of tablets whose delete bitmap was modified above.
    for (const auto& tablet_id : tablets_to_save_meta) {
        auto tablet = _tablet_manager->get_tablet(tablet_id);
        if (tablet) {
            std::shared_lock rlock(tablet->get_header_lock());
            tablet->save_meta();
        }
    }
    LOG(INFO) << "removed all collected unused rowsets";
}
1316 | | |
1317 | 15.8k | void StorageEngine::add_unused_rowset(RowsetSharedPtr rowset) { |
1318 | 15.8k | if (rowset == nullptr) { |
1319 | 5 | return; |
1320 | 5 | } |
1321 | 15.8k | VLOG_NOTICE << "add unused rowset, rowset id:" << rowset->rowset_id() |
1322 | 39 | << ", version:" << rowset->version(); |
1323 | 15.8k | std::lock_guard<std::mutex> lock(_gc_mutex); |
1324 | 15.8k | auto it = _unused_rowsets.find(rowset->rowset_id()); |
1325 | 15.8k | if (it == _unused_rowsets.end()) { |
1326 | 15.8k | rowset->set_need_delete_file(); |
1327 | 15.8k | rowset->close(); |
1328 | 15.8k | _unused_rowsets[rowset->rowset_id()] = std::move(rowset); |
1329 | 15.8k | unused_rowsets_counter << 1; |
1330 | 15.8k | } |
1331 | 15.8k | } |
1332 | | |
1333 | | void StorageEngine::add_unused_delete_bitmap_key_ranges(int64_t tablet_id, |
1334 | | const std::vector<RowsetId>& rowsets, |
1335 | 0 | const DeleteBitmapKeyRanges& key_ranges) { |
1336 | 0 | VLOG_NOTICE << "add unused delete bitmap key ranges, tablet id:" << tablet_id; |
1337 | 0 | std::lock_guard<std::mutex> lock(_gc_mutex); |
1338 | 0 | _unused_delete_bitmap.push_back(std::make_tuple(tablet_id, rowsets, key_ranges)); |
1339 | 0 | } |
1340 | | |
1341 | | // TODO(zc): refactor this funciton |
1342 | 6.13k | Status StorageEngine::create_tablet(const TCreateTabletReq& request, RuntimeProfile* profile) { |
1343 | | // Get all available stores, use ref_root_path if the caller specified |
1344 | 6.13k | std::vector<DataDir*> stores; |
1345 | 6.13k | { |
1346 | 6.13k | SCOPED_TIMER(ADD_TIMER(profile, "GetStores")); |
1347 | 6.13k | stores = get_stores_for_create_tablet(request.partition_id, request.storage_medium); |
1348 | 6.13k | } |
1349 | 6.13k | if (stores.empty()) { |
1350 | 0 | return Status::Error<CE_CMD_PARAMS_ERROR>( |
1351 | 0 | "there is no available disk that can be used to create tablet."); |
1352 | 0 | } |
1353 | 6.13k | return _tablet_manager->create_tablet(request, stores, profile); |
1354 | 6.13k | } |
1355 | | |
1356 | | Result<BaseTabletSPtr> StorageEngine::get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats, |
1357 | 357k | bool force_use_only_cached, bool cache_on_miss) { |
1358 | 357k | BaseTabletSPtr tablet; |
1359 | 357k | std::string err; |
1360 | 357k | tablet = _tablet_manager->get_tablet(tablet_id, true, &err); |
1361 | 357k | if (tablet == nullptr) { |
1362 | 1 | return unexpected( |
1363 | 1 | Status::InternalError("failed to get tablet: {}, reason: {}", tablet_id, err)); |
1364 | 1 | } |
1365 | 357k | return tablet; |
1366 | 357k | } |
1367 | | |
1368 | | Status StorageEngine::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta, |
1369 | 0 | bool force_use_only_cached) { |
1370 | 0 | if (tablet_meta == nullptr) { |
1371 | 0 | return Status::InvalidArgument("tablet_meta output is null"); |
1372 | 0 | } |
1373 | | |
1374 | 0 | auto res = get_tablet(tablet_id, nullptr, force_use_only_cached, true); |
1375 | 0 | if (!res.has_value()) { |
1376 | 0 | return res.error(); |
1377 | 0 | } |
1378 | | |
1379 | 0 | *tablet_meta = res.value()->tablet_meta(); |
1380 | 0 | return Status::OK(); |
1381 | 0 | } |
1382 | | |
1383 | | Status StorageEngine::obtain_shard_path(TStorageMedium::type storage_medium, int64_t path_hash, |
1384 | | std::string* shard_path, DataDir** store, |
1385 | 0 | int64_t partition_id) { |
1386 | 0 | LOG(INFO) << "begin to process obtain root path. storage_medium=" << storage_medium; |
1387 | |
|
1388 | 0 | if (shard_path == nullptr) { |
1389 | 0 | return Status::Error<CE_CMD_PARAMS_ERROR>( |
1390 | 0 | "invalid output parameter which is null pointer."); |
1391 | 0 | } |
1392 | | |
1393 | 0 | auto stores = get_stores_for_create_tablet(partition_id, storage_medium); |
1394 | 0 | if (stores.empty()) { |
1395 | 0 | return Status::Error<NO_AVAILABLE_ROOT_PATH>( |
1396 | 0 | "no available disk can be used to create tablet."); |
1397 | 0 | } |
1398 | | |
1399 | 0 | *store = nullptr; |
1400 | 0 | if (path_hash != -1) { |
1401 | 0 | for (auto data_dir : stores) { |
1402 | 0 | if (data_dir->path_hash() == path_hash) { |
1403 | 0 | *store = data_dir; |
1404 | 0 | break; |
1405 | 0 | } |
1406 | 0 | } |
1407 | 0 | } |
1408 | 0 | if (*store == nullptr) { |
1409 | 0 | *store = stores[0]; |
1410 | 0 | } |
1411 | |
|
1412 | 0 | uint64_t shard = (*store)->get_shard(); |
1413 | |
|
1414 | 0 | std::stringstream root_path_stream; |
1415 | 0 | root_path_stream << (*store)->path() << "/" << DATA_PREFIX << "/" << shard; |
1416 | 0 | *shard_path = root_path_stream.str(); |
1417 | |
|
1418 | 0 | LOG(INFO) << "success to process obtain root path. path=" << *shard_path; |
1419 | 0 | return Status::OK(); |
1420 | 0 | } |
1421 | | |
1422 | | Status StorageEngine::load_header(const string& shard_path, const TCloneReq& request, |
1423 | 0 | bool restore) { |
1424 | 0 | LOG(INFO) << "begin to process load headers." |
1425 | 0 | << "tablet_id=" << request.tablet_id << ", schema_hash=" << request.schema_hash; |
1426 | 0 | Status res = Status::OK(); |
1427 | |
|
1428 | 0 | DataDir* store = nullptr; |
1429 | 0 | { |
1430 | | // TODO(zc) |
1431 | 0 | try { |
1432 | 0 | auto store_path = |
1433 | 0 | std::filesystem::path(shard_path).parent_path().parent_path().string(); |
1434 | 0 | store = get_store(store_path); |
1435 | 0 | if (store == nullptr) { |
1436 | 0 | return Status::Error<INVALID_ROOT_PATH>("invalid shard path, path={}", shard_path); |
1437 | 0 | } |
1438 | 0 | } catch (...) { |
1439 | 0 | return Status::Error<INVALID_ROOT_PATH>("invalid shard path, path={}", shard_path); |
1440 | 0 | } |
1441 | 0 | } |
1442 | | |
1443 | 0 | std::stringstream schema_hash_path_stream; |
1444 | 0 | schema_hash_path_stream << shard_path << "/" << request.tablet_id << "/" << request.schema_hash; |
1445 | | // not surely, reload and restore tablet action call this api |
1446 | | // reset tablet uid here |
1447 | |
|
1448 | 0 | string header_path = TabletMeta::construct_header_file_path(schema_hash_path_stream.str(), |
1449 | 0 | request.tablet_id); |
1450 | 0 | res = _tablet_manager->load_tablet_from_dir(store, request.tablet_id, request.schema_hash, |
1451 | 0 | schema_hash_path_stream.str(), false, restore); |
1452 | 0 | if (!res.ok()) { |
1453 | 0 | LOG(WARNING) << "fail to process load headers. res=" << res; |
1454 | 0 | return res; |
1455 | 0 | } |
1456 | | |
1457 | 0 | LOG(INFO) << "success to process load headers."; |
1458 | 0 | return res; |
1459 | 0 | } |
1460 | | |
1461 | 29 | void BaseStorageEngine::register_report_listener(ReportWorker* listener) { |
1462 | 29 | std::lock_guard<std::mutex> l(_report_mtx); |
1463 | 29 | if (std::find(_report_listeners.begin(), _report_listeners.end(), listener) != |
1464 | 29 | _report_listeners.end()) [[unlikely]] { |
1465 | 0 | return; |
1466 | 0 | } |
1467 | 29 | _report_listeners.push_back(listener); |
1468 | 29 | } |
1469 | | |
1470 | 13 | void BaseStorageEngine::deregister_report_listener(ReportWorker* listener) { |
1471 | 13 | std::lock_guard<std::mutex> l(_report_mtx); |
1472 | 13 | if (auto it = std::find(_report_listeners.begin(), _report_listeners.end(), listener); |
1473 | 13 | it != _report_listeners.end()) { |
1474 | 13 | _report_listeners.erase(it); |
1475 | 13 | } |
1476 | 13 | } |
1477 | | |
1478 | 355 | void BaseStorageEngine::notify_listeners() { |
1479 | 355 | std::lock_guard<std::mutex> l(_report_mtx); |
1480 | 355 | for (auto& listener : _report_listeners) { |
1481 | 56 | listener->notify(); |
1482 | 56 | } |
1483 | 355 | } |
1484 | | |
1485 | 2 | bool BaseStorageEngine::notify_listener(std::string_view name) { |
1486 | 2 | bool found = false; |
1487 | 2 | std::lock_guard<std::mutex> l(_report_mtx); |
1488 | 5 | for (auto& listener : _report_listeners) { |
1489 | 5 | if (listener->name() == name) { |
1490 | 2 | listener->notify(); |
1491 | 2 | found = true; |
1492 | 2 | } |
1493 | 5 | } |
1494 | 2 | return found; |
1495 | 2 | } |
1496 | | |
1497 | 7 | void BaseStorageEngine::_evict_quring_rowset_thread_callback() { |
1498 | 7 | int32_t interval = config::quering_rowsets_evict_interval; |
1499 | 448 | do { |
1500 | 448 | _evict_querying_rowset(); |
1501 | 448 | interval = config::quering_rowsets_evict_interval; |
1502 | 448 | if (interval <= 0) { |
1503 | 0 | LOG(WARNING) << "quering_rowsets_evict_interval config is illegal: " << interval |
1504 | 0 | << ", force set to 1"; |
1505 | 0 | interval = 1; |
1506 | 0 | } |
1507 | 448 | } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); |
1508 | 7 | } |
1509 | | |
1510 | | // check whether any unused rowsets's id equal to rowset_id |
1511 | 4.39k | bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id) { |
1512 | 4.39k | std::lock_guard<std::mutex> lock(_gc_mutex); |
1513 | 4.39k | return _unused_rowsets.contains(rowset_id); |
1514 | 4.39k | } |
1515 | | |
1516 | 1.39k | PendingRowsetGuard StorageEngine::add_pending_rowset(const RowsetWriterContext& ctx) { |
1517 | 1.39k | if (ctx.is_local_rowset()) { |
1518 | 1.38k | return _pending_local_rowsets.add(ctx.rowset_id); |
1519 | 1.38k | } |
1520 | 2 | return _pending_remote_rowsets.add(ctx.rowset_id); |
1521 | 1.39k | } |
1522 | | |
1523 | | bool StorageEngine::get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica, |
1524 | 0 | std::string* token) { |
1525 | 0 | TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
1526 | 0 | if (tablet == nullptr) { |
1527 | 0 | LOG(WARNING) << "tablet is no longer exist: tablet_id=" << tablet_id; |
1528 | 0 | return false; |
1529 | 0 | } |
1530 | 0 | std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex); |
1531 | 0 | if (_peer_replica_infos.contains(tablet_id) && |
1532 | 0 | _peer_replica_infos[tablet_id].replica_id != tablet->replica_id()) { |
1533 | 0 | *replica = _peer_replica_infos[tablet_id]; |
1534 | 0 | *token = _token; |
1535 | 0 | return true; |
1536 | 0 | } |
1537 | 0 | return false; |
1538 | 0 | } |
1539 | | |
1540 | 0 | bool StorageEngine::get_peers_replica_backends(int64_t tablet_id, std::vector<TBackend>* backends) { |
1541 | 0 | TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
1542 | 0 | if (tablet == nullptr) { |
1543 | 0 | LOG(WARNING) << "tablet is no longer exist: tablet_id=" << tablet_id; |
1544 | 0 | return false; |
1545 | 0 | } |
1546 | 0 | int64_t cur_time = UnixMillis(); |
1547 | 0 | if (cur_time - _last_get_peers_replica_backends_time_ms < 10000) { |
1548 | 0 | LOG_WARNING("failed to get peers replica backens.") |
1549 | 0 | .tag("tablet_id", tablet_id) |
1550 | 0 | .tag("last time", _last_get_peers_replica_backends_time_ms) |
1551 | 0 | .tag("cur time", cur_time); |
1552 | 0 | return false; |
1553 | 0 | } |
1554 | 0 | LOG_INFO("start get peers replica backends info.").tag("tablet id", tablet_id); |
1555 | 0 | ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info(); |
1556 | 0 | if (cluster_info == nullptr) { |
1557 | 0 | LOG(WARNING) << "Have not get FE Master heartbeat yet"; |
1558 | 0 | return false; |
1559 | 0 | } |
1560 | 0 | TNetworkAddress master_addr = cluster_info->master_fe_addr; |
1561 | 0 | if (master_addr.hostname.empty() || master_addr.port == 0) { |
1562 | 0 | LOG(WARNING) << "Have not get FE Master heartbeat yet"; |
1563 | 0 | return false; |
1564 | 0 | } |
1565 | 0 | TGetTabletReplicaInfosRequest request; |
1566 | 0 | TGetTabletReplicaInfosResult result; |
1567 | 0 | request.tablet_ids.emplace_back(tablet_id); |
1568 | 0 | Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>( |
1569 | 0 | master_addr.hostname, master_addr.port, |
1570 | 0 | [&request, &result](FrontendServiceConnection& client) { |
1571 | 0 | client->getTabletReplicaInfos(result, request); |
1572 | 0 | }); |
1573 | |
|
1574 | 0 | if (!rpc_st.ok()) { |
1575 | 0 | LOG(WARNING) << "Failed to get tablet replica infos, encounter rpc failure, " |
1576 | 0 | "tablet id: " |
1577 | 0 | << tablet_id; |
1578 | 0 | return false; |
1579 | 0 | } |
1580 | 0 | std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex); |
1581 | 0 | if (result.tablet_replica_infos.contains(tablet_id)) { |
1582 | 0 | std::vector<TReplicaInfo> reps = result.tablet_replica_infos[tablet_id]; |
1583 | 0 | if (reps.empty()) [[unlikely]] { |
1584 | 0 | VLOG_DEBUG << "get_peers_replica_backends reps is empty, maybe this tablet is in " |
1585 | 0 | "schema change. Go to FE to see more info. Tablet id: " |
1586 | 0 | << tablet_id; |
1587 | 0 | } |
1588 | 0 | for (const auto& rep : reps) { |
1589 | 0 | if (rep.replica_id != tablet->replica_id()) { |
1590 | 0 | TBackend backend; |
1591 | 0 | backend.__set_host(rep.host); |
1592 | 0 | backend.__set_be_port(rep.be_port); |
1593 | 0 | backend.__set_http_port(rep.http_port); |
1594 | 0 | backend.__set_brpc_port(rep.brpc_port); |
1595 | 0 | if (rep.__isset.is_alive) { |
1596 | 0 | backend.__set_is_alive(rep.is_alive); |
1597 | 0 | } |
1598 | 0 | if (rep.__isset.backend_id) { |
1599 | 0 | backend.__set_id(rep.backend_id); |
1600 | 0 | } |
1601 | 0 | backends->emplace_back(backend); |
1602 | 0 | std::stringstream backend_string; |
1603 | 0 | backend.printTo(backend_string); |
1604 | 0 | LOG_INFO("get 1 peer replica backend info.") |
1605 | 0 | .tag("tablet id", tablet_id) |
1606 | 0 | .tag("backend info", backend_string.str()); |
1607 | 0 | } |
1608 | 0 | } |
1609 | 0 | _last_get_peers_replica_backends_time_ms = UnixMillis(); |
1610 | 0 | LOG_INFO("succeed get peers replica backends info.") |
1611 | 0 | .tag("tablet id", tablet_id) |
1612 | 0 | .tag("replica num", backends->size()); |
1613 | 0 | return true; |
1614 | 0 | } |
1615 | 0 | return false; |
1616 | 0 | } |
1617 | | |
1618 | 0 | bool StorageEngine::should_fetch_from_peer(int64_t tablet_id) { |
1619 | | #ifdef BE_TEST |
1620 | | if (tablet_id % 2 == 0) { |
1621 | | return true; |
1622 | | } |
1623 | | return false; |
1624 | | #endif |
1625 | 0 | TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
1626 | 0 | if (tablet == nullptr) { |
1627 | 0 | LOG(WARNING) << "tablet is no longer exist: tablet_id=" << tablet_id; |
1628 | 0 | return false; |
1629 | 0 | } |
1630 | 0 | std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex); |
1631 | 0 | if (_peer_replica_infos.contains(tablet_id)) { |
1632 | 0 | return _peer_replica_infos[tablet_id].replica_id != tablet->replica_id(); |
1633 | 0 | } |
1634 | 0 | return false; |
1635 | 0 | } |
1636 | | |
1637 | | // Return json: |
1638 | | // { |
1639 | | // "CumulativeCompaction": { |
1640 | | // "/home/disk1" : [10001, 10002], |
1641 | | // "/home/disk2" : [10003] |
1642 | | // }, |
1643 | | // "BaseCompaction": { |
1644 | | // "/home/disk1" : [10001, 10002], |
1645 | | // "/home/disk2" : [10003] |
1646 | | // } |
1647 | | // } |
// Serialize the currently submitted compaction tasks into `result` as a JSON
// document (compaction type -> disk path -> tablet ids; see the example
// format in the comment above this function).
void StorageEngine::get_compaction_status_json(std::string* result) {
    // The submit registry owns the authoritative record of in-flight
    // compaction submissions; delegate the JSON rendering to it.
    _compaction_submit_registry.jsonfy_compaction_status(result);
}
1651 | | |
1652 | 13 | void BaseStorageEngine::add_quering_rowset(RowsetSharedPtr rs) { |
1653 | 13 | std::lock_guard<std::mutex> lock(_quering_rowsets_mutex); |
1654 | 13 | _querying_rowsets.emplace(rs->rowset_id(), rs); |
1655 | 13 | } |
1656 | | |
1657 | 25 | RowsetSharedPtr BaseStorageEngine::get_quering_rowset(RowsetId rs_id) { |
1658 | 25 | std::lock_guard<std::mutex> lock(_quering_rowsets_mutex); |
1659 | 25 | auto it = _querying_rowsets.find(rs_id); |
1660 | 25 | if (it != _querying_rowsets.end()) { |
1661 | 25 | return it->second; |
1662 | 25 | } |
1663 | 0 | return nullptr; |
1664 | 25 | } |
1665 | | |
1666 | 448 | void BaseStorageEngine::_evict_querying_rowset() { |
1667 | 448 | { |
1668 | 448 | std::lock_guard<std::mutex> lock(_quering_rowsets_mutex); |
1669 | 646 | for (auto it = _querying_rowsets.begin(); it != _querying_rowsets.end();) { |
1670 | 198 | uint64_t now = UnixSeconds(); |
1671 | | // We delay the GC time of this rowset since it's maybe still needed, see #20732 |
1672 | 198 | if (now > it->second->delayed_expired_timestamp()) { |
1673 | 6 | it = _querying_rowsets.erase(it); |
1674 | 192 | } else { |
1675 | 192 | ++it; |
1676 | 192 | } |
1677 | 198 | } |
1678 | 448 | } |
1679 | | |
1680 | 448 | uint64_t now = UnixSeconds(); |
1681 | 448 | ExecEnv::GetInstance()->get_id_manager()->gc_expired_id_file_map(now); |
1682 | 448 | } |
1683 | | |
1684 | 4 | bool BaseStorageEngine::_should_delay_large_task() { |
1685 | 4 | DCHECK_GE(_cumu_compaction_thread_pool->max_threads(), |
1686 | 4 | _cumu_compaction_thread_pool_used_threads); |
1687 | 4 | DCHECK_GE(_cumu_compaction_thread_pool_small_tasks_running, 0); |
1688 | | // Case 1: Multiple threads available => accept large task |
1689 | 4 | if (_cumu_compaction_thread_pool->max_threads() - _cumu_compaction_thread_pool_used_threads > |
1690 | 4 | 0) { |
1691 | 2 | return false; // No delay needed |
1692 | 2 | } |
1693 | | // Case 2: Only one thread left => accept large task only if another small task is already running |
1694 | 2 | if (_cumu_compaction_thread_pool_small_tasks_running > 0) { |
1695 | 1 | return false; // No delay needed |
1696 | 1 | } |
1697 | | // Case 3: Only one thread left, this is a large task, and no small tasks are running |
1698 | | // Delay this task to reserve capacity for potential small tasks |
1699 | 1 | return true; // Delay this large task |
1700 | 2 | } |
1701 | | |
1702 | 3 | bool StorageEngine::add_broken_path(std::string path) { |
1703 | 3 | std::lock_guard<std::mutex> lock(_broken_paths_mutex); |
1704 | 3 | auto success = _broken_paths.emplace(path).second; |
1705 | 3 | if (success) { |
1706 | 2 | static_cast<void>(_persist_broken_paths()); |
1707 | 2 | } |
1708 | 3 | return success; |
1709 | 3 | } |
1710 | | |
1711 | 1 | bool StorageEngine::remove_broken_path(std::string path) { |
1712 | 1 | std::lock_guard<std::mutex> lock(_broken_paths_mutex); |
1713 | 1 | auto count = _broken_paths.erase(path); |
1714 | 1 | if (count > 0) { |
1715 | 1 | static_cast<void>(_persist_broken_paths()); |
1716 | 1 | } |
1717 | 1 | return count > 0; |
1718 | 1 | } |
1719 | | |
1720 | 3 | Status StorageEngine::_persist_broken_paths() { |
1721 | 3 | std::string config_value; |
1722 | 4 | for (const std::string& path : _broken_paths) { |
1723 | 4 | config_value += path + ";"; |
1724 | 4 | } |
1725 | | |
1726 | 3 | if (config_value.length() > 0) { |
1727 | 3 | auto st = config::set_config("broken_storage_path", config_value, true); |
1728 | 3 | LOG(INFO) << "persist broken_storage_path " << config_value << st; |
1729 | 3 | return st; |
1730 | 3 | } |
1731 | | |
1732 | 0 | return Status::OK(); |
1733 | 3 | } |
1734 | | |
1735 | 0 | Status StorageEngine::submit_clone_task(Tablet* tablet, int64_t version) { |
1736 | 0 | std::vector<TBackend> backends; |
1737 | 0 | if (!get_peers_replica_backends(tablet->tablet_id(), &backends)) { |
1738 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
1739 | 0 | "get_peers_replica_backends failed."); |
1740 | 0 | } |
1741 | 0 | TAgentTaskRequest task; |
1742 | 0 | TCloneReq req; |
1743 | 0 | req.__set_tablet_id(tablet->tablet_id()); |
1744 | 0 | req.__set_schema_hash(tablet->schema_hash()); |
1745 | 0 | req.__set_src_backends(backends); |
1746 | 0 | req.__set_version(version); |
1747 | 0 | req.__set_replica_id(tablet->replica_id()); |
1748 | 0 | req.__set_partition_id(tablet->partition_id()); |
1749 | 0 | req.__set_table_id(tablet->table_id()); |
1750 | 0 | task.__set_task_type(TTaskType::CLONE); |
1751 | 0 | task.__set_clone_req(req); |
1752 | 0 | task.__set_priority(TPriority::HIGH); |
1753 | 0 | task.__set_signature(tablet->tablet_id()); |
1754 | 0 | LOG_INFO("BE start to submit missing rowset clone task.") |
1755 | 0 | .tag("tablet_id", tablet->tablet_id()) |
1756 | 0 | .tag("version", version) |
1757 | 0 | .tag("replica_id", tablet->replica_id()) |
1758 | 0 | .tag("partition_id", tablet->partition_id()) |
1759 | 0 | .tag("table_id", tablet->table_id()); |
1760 | 0 | RETURN_IF_ERROR(assert_cast<PriorTaskWorkerPool*>(workers->at(TTaskType::CLONE).get()) |
1761 | 0 | ->submit_high_prior_and_cancel_low(task)); |
1762 | 0 | return Status::OK(); |
1763 | 0 | } |
1764 | | |
1765 | 6.13k | int CreateTabletRRIdxCache::get_index(const std::string& key) { |
1766 | 6.13k | auto* lru_handle = lookup(key); |
1767 | 6.13k | if (lru_handle) { |
1768 | 4.80k | Defer release([cache = this, lru_handle] { cache->release(lru_handle); }); |
1769 | 4.80k | auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle); |
1770 | 4.80k | VLOG_DEBUG << "use create tablet idx cache key=" << key << " value=" << value->idx; |
1771 | 4.80k | return value->idx; |
1772 | 4.80k | } |
1773 | 1.32k | return -1; |
1774 | 6.13k | } |
1775 | | |
1776 | 6.13k | void CreateTabletRRIdxCache::set_index(const std::string& key, int next_idx) { |
1777 | 6.13k | assert(next_idx >= 0); |
1778 | 6.13k | auto* value = new CacheValue; |
1779 | 6.13k | value->idx = next_idx; |
1780 | 6.13k | auto* lru_handle = insert(key, value, 1, sizeof(int), CachePriority::NORMAL); |
1781 | 6.13k | release(lru_handle); |
1782 | 6.13k | } |
1783 | | #include "common/compile_check_end.h" |
1784 | | } // namespace doris |