be/src/load/group_commit/wal/wal_manager.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/group_commit/wal/wal_manager.h" |
19 | | |
20 | | #include <absl/strings/str_split.h> |
21 | | #include <bvar/bvar.h> |
22 | | #include <glog/logging.h> |
23 | | |
24 | | #include <chrono> |
25 | | #include <filesystem> |
26 | | #include <shared_mutex> |
27 | | #include <thread> |
28 | | #include <unordered_map> |
29 | | #include <vector> |
30 | | |
31 | | #include "common/config.h" |
32 | | #include "common/status.h" |
33 | | #include "io/fs/local_file_system.h" |
34 | | #include "load/group_commit/wal/wal_dirs_info.h" |
35 | | #include "load/group_commit/wal/wal_reader.h" |
36 | | #include "runtime/exec_env.h" |
37 | | #include "runtime/fragment_mgr.h" |
38 | | #include "util/parse_util.h" |
39 | | |
40 | | namespace doris { |
41 | | |
// Exported metric: total number of WALs over all table queues. It is refreshed
// by the replay background loop from get_wal_queue_size(-1).
bvar::Status<size_t> g_wal_total_count("wal_total_count", 0);
// Exported metric: the largest WAL queue length of any single table. It is
// refreshed whenever get_wal_queue_size() aggregates over all tables.
bvar::Status<size_t> g_wal_max_count_per_table("wal_max_count_per_table", 0);
44 | | |
45 | | WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) |
46 | 2 | : _exec_env(exec_env), |
47 | 2 | _stop(false), |
48 | 2 | _stop_background_threads_latch(1), |
49 | 2 | _first_replay(true) { |
50 | 2 | _wal_dirs = absl::StrSplit(wal_dir_list, ";", absl::SkipWhitespace()); |
51 | 2 | static_cast<void>(ThreadPoolBuilder("GroupCommitReplayWalThreadPool") |
52 | 2 | .set_min_threads(1) |
53 | 2 | .set_max_threads(config::group_commit_relay_wal_threads) |
54 | 2 | .build(&_thread_pool)); |
55 | 2 | _wal_dirs_info = WalDirsInfo::create_unique(); |
56 | 2 | } |
57 | | |
58 | 2 | WalManager::~WalManager() { |
59 | 2 | LOG(INFO) << "WalManager is destoried"; |
60 | 2 | } |
61 | | |
62 | 0 | bool WalManager::is_running() { |
63 | 0 | return !_stop.load(); |
64 | 0 | } |
65 | | |
66 | 1 | void WalManager::stop() { |
67 | 1 | if (!this->_stop.load()) { |
68 | 1 | this->_stop.store(true); |
69 | 1 | _stop_relay_wal(); |
70 | 1 | _stop_background_threads_latch.count_down(); |
71 | 1 | if (_replay_thread) { |
72 | 0 | _replay_thread->join(); |
73 | 0 | } |
74 | 1 | if (_update_wal_dirs_info_thread) { |
75 | 0 | _update_wal_dirs_info_thread->join(); |
76 | 0 | } |
77 | 1 | _thread_pool->shutdown(); |
78 | 1 | LOG(INFO) << "WalManager is stopped"; |
79 | 1 | } |
80 | 1 | } |
81 | | |
82 | 0 | Status WalManager::init() { |
83 | 0 | RETURN_IF_ERROR(_init_wal_dirs_conf()); |
84 | 0 | RETURN_IF_ERROR(_init_wal_dirs()); |
85 | 0 | RETURN_IF_ERROR(_init_wal_dirs_info()); |
86 | 0 | return Thread::create( |
87 | 0 | "WalMgr", "replay_wal", [this]() { static_cast<void>(this->_replay_background()); }, |
88 | 0 | &_replay_thread); |
89 | 0 | } |
90 | | |
91 | 0 | Status WalManager::_init_wal_dirs_conf() { |
92 | 0 | std::vector<std::string> tmp_dirs; |
93 | 0 | if (_wal_dirs.empty()) { |
94 | | // default case. |
95 | 0 | for (const StorePath& path : ExecEnv::GetInstance()->store_paths()) { |
96 | 0 | tmp_dirs.emplace_back(path.path + "/wal"); |
97 | 0 | } |
98 | 0 | } else { |
99 | | // user config must be absolute path. |
100 | 0 | for (const std::string& wal_dir : _wal_dirs) { |
101 | 0 | if (std::filesystem::path(wal_dir).is_absolute()) { |
102 | 0 | tmp_dirs.emplace_back(wal_dir); |
103 | 0 | } else { |
104 | 0 | return Status::InternalError( |
105 | 0 | "BE config group_commit_replay_wal_dir has to be absolute path!"); |
106 | 0 | } |
107 | 0 | } |
108 | 0 | } |
109 | 0 | _wal_dirs = tmp_dirs; |
110 | 0 | return Status::OK(); |
111 | 0 | } |
112 | | |
113 | 0 | Status WalManager::_init_wal_dirs() { |
114 | 0 | bool exists = false; |
115 | 0 | for (auto wal_dir : _wal_dirs) { |
116 | 0 | std::string tmp_dir = wal_dir + "/" + _tmp; |
117 | 0 | LOG(INFO) << "wal_dir:" << wal_dir << ", tmp_dir:" << tmp_dir; |
118 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists)); |
119 | 0 | if (!exists) { |
120 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_dir)); |
121 | 0 | } |
122 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(tmp_dir, &exists)); |
123 | 0 | if (!exists) { |
124 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir)); |
125 | 0 | } |
126 | 0 | } |
127 | 0 | return Status::OK(); |
128 | 0 | } |
129 | | |
130 | 15 | Status WalManager::_init_wal_dirs_info() { |
131 | 15 | for (const std::string& wal_dir : _wal_dirs) { |
132 | 15 | size_t available_bytes; |
133 | | #ifndef BE_TEST |
134 | | size_t disk_capacity_bytes; |
135 | | RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(wal_dir, &disk_capacity_bytes, |
136 | | &available_bytes)); |
137 | | #else |
138 | 15 | available_bytes = wal_limit_test_bytes; |
139 | 15 | #endif |
140 | 15 | bool is_percent = true; |
141 | 15 | int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, |
142 | 15 | -1, available_bytes, &is_percent); |
143 | 15 | if (wal_disk_limit < 0) { |
144 | 3 | return Status::InternalError( |
145 | 3 | "group_commit_wal_max_disk_limit config is wrong, please check your config!"); |
146 | 3 | } |
147 | | // if there are some wal files in wal dir, we need to add it to wal disk limit. |
148 | 12 | size_t wal_dir_size = 0; |
149 | | #ifndef BE_TEST |
150 | | RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(wal_dir, &wal_dir_size)); |
151 | | #endif |
152 | 12 | if (is_percent) { |
153 | 4 | wal_disk_limit += wal_dir_size; |
154 | 4 | } |
155 | 12 | RETURN_IF_ERROR(_wal_dirs_info->add(wal_dir, wal_disk_limit, wal_dir_size, 0)); |
156 | | |
157 | 12 | #ifdef BE_TEST |
158 | 12 | wal_limit_test_bytes = wal_disk_limit; |
159 | 12 | #endif |
160 | 12 | } |
161 | | #ifndef BE_TEST |
162 | | return Thread::create( |
163 | | "WalMgr", "update_wal_dir_info", |
164 | | [this]() { static_cast<void>(this->_update_wal_dir_info_thread()); }, |
165 | | &_update_wal_dirs_info_thread); |
166 | | #else |
167 | 12 | return Status::OK(); |
168 | 15 | #endif |
169 | 15 | } |
170 | | |
171 | 0 | void WalManager::add_wal_queue(int64_t table_id, int64_t wal_id) { |
172 | 0 | std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock); |
173 | 0 | LOG(INFO) << "add wal to queue, table_id: " << table_id << ", wal_id: " << wal_id; |
174 | 0 | auto it = _wal_queues.find(table_id); |
175 | 0 | if (it == _wal_queues.end()) { |
176 | 0 | std::set<int64_t> tmp_set; |
177 | 0 | tmp_set.insert(wal_id); |
178 | 0 | _wal_queues.emplace(table_id, tmp_set); |
179 | 0 | } else { |
180 | 0 | it->second.insert(wal_id); |
181 | 0 | } |
182 | 0 | } |
183 | | |
184 | 0 | void WalManager::erase_wal_queue(int64_t table_id, int64_t wal_id) { |
185 | 0 | std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock); |
186 | 0 | auto it = _wal_queues.find(table_id); |
187 | 0 | if (it != _wal_queues.end()) { |
188 | 0 | LOG(INFO) << "remove wal from queue, table_id: " << table_id << ", wal_id: " << wal_id; |
189 | 0 | it->second.erase(wal_id); |
190 | 0 | if (it->second.empty()) { |
191 | 0 | _wal_queues.erase(table_id); |
192 | 0 | } |
193 | 0 | } |
194 | 0 | } |
195 | | |
196 | 0 | size_t WalManager::get_wal_queue_size(int64_t table_id) { |
197 | 0 | std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock); |
198 | 0 | size_t count = 0; |
199 | 0 | if (table_id > 0) { |
200 | 0 | auto it = _wal_queues.find(table_id); |
201 | 0 | if (it != _wal_queues.end()) { |
202 | 0 | return it->second.size(); |
203 | 0 | } else { |
204 | 0 | return 0; |
205 | 0 | } |
206 | 0 | } else { |
207 | | // table_id is -1 meaning get all table wal size |
208 | 0 | size_t max_count_per_table = 0; |
209 | 0 | for (auto& [_, table_wals] : _wal_queues) { |
210 | 0 | size_t table_wal_count = table_wals.size(); |
211 | 0 | count += table_wal_count; |
212 | 0 | if (table_wal_count > max_count_per_table) { |
213 | 0 | max_count_per_table = table_wal_count; |
214 | 0 | } |
215 | 0 | } |
216 | 0 | g_wal_max_count_per_table.set_value(max_count_per_table); |
217 | 0 | } |
218 | 0 | return count; |
219 | 0 | } |
220 | | |
221 | | Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, |
222 | | const std::string& label, std::string& base_path, |
223 | 2 | uint32_t wal_version) { |
224 | 2 | base_path = _wal_dirs_info->get_available_random_wal_dir(); |
225 | 2 | std::stringstream ss; |
226 | 2 | ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" |
227 | 2 | << std::to_string(wal_version) << "_" << _exec_env->cluster_info()->backend_id << "_" |
228 | 2 | << std::to_string(wal_id) << "_" << label; |
229 | 2 | { |
230 | 2 | std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock); |
231 | 2 | auto it = _wal_path_map.find(wal_id); |
232 | 2 | if (it != _wal_path_map.end()) { |
233 | 0 | return Status::InternalError("wal_id {} already in wal_path_map", wal_id); |
234 | 0 | } |
235 | 2 | _wal_path_map.emplace(wal_id, ss.str()); |
236 | 2 | } |
237 | 0 | return Status::OK(); |
238 | 2 | } |
239 | | |
240 | 0 | Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) { |
241 | 0 | std::shared_lock rdlock(_wal_path_lock); |
242 | 0 | auto it = _wal_path_map.find(wal_id); |
243 | 0 | if (it != _wal_path_map.end()) { |
244 | 0 | wal_path = _wal_path_map[wal_id]; |
245 | 0 | } else { |
246 | 0 | return Status::InternalError("can not find wal_id {} in wal_path_map", wal_id); |
247 | 0 | } |
248 | 0 | return Status::OK(); |
249 | 0 | } |
250 | | |
251 | | Status WalManager::parse_wal_path(const std::string& file_name, int64_t& version, |
252 | 0 | int64_t& backend_id, int64_t& wal_id, std::string& label) { |
253 | 0 | try { |
254 | | // find version |
255 | 0 | auto pos = file_name.find("_"); |
256 | 0 | version = std::strtoll(file_name.substr(0, pos).c_str(), NULL, 10); |
257 | | // find be id |
258 | 0 | auto substring1 = file_name.substr(pos + 1); |
259 | 0 | pos = substring1.find("_"); |
260 | 0 | backend_id = std::strtoll(substring1.substr(0, pos).c_str(), NULL, 10); |
261 | | // find wal id |
262 | 0 | auto substring2 = substring1.substr(pos + 1); |
263 | 0 | pos = substring2.find("_"); |
264 | 0 | wal_id = std::strtoll(substring2.substr(0, pos).c_str(), NULL, 10); |
265 | | // find label |
266 | 0 | label = substring2.substr(pos + 1); |
267 | 0 | VLOG_DEBUG << "version:" << version << "backend_id:" << backend_id << ",wal_id:" << wal_id |
268 | 0 | << ",label:" << label; |
269 | 0 | } catch (const std::invalid_argument& e) { |
270 | 0 | return Status::InvalidArgument("Invalid format, {}", e.what()); |
271 | 0 | } |
272 | 0 | return Status::OK(); |
273 | 0 | } |
274 | | |
275 | 0 | Status WalManager::_load_wals() { |
276 | 0 | std::vector<ScanWalInfo> wals; |
277 | 0 | for (auto wal_dir : _wal_dirs) { |
278 | 0 | WARN_IF_ERROR(_scan_wals(wal_dir, wals), fmt::format("fail to scan wal dir={}", wal_dir)); |
279 | 0 | } |
280 | 0 | for (const auto& wal : wals) { |
281 | 0 | bool exists = false; |
282 | 0 | WARN_IF_ERROR(io::global_local_filesystem()->exists(wal.wal_path, &exists), |
283 | 0 | fmt::format("fail to check exist on wal file={}", wal.wal_path)); |
284 | 0 | if (!exists) { |
285 | 0 | continue; |
286 | 0 | } |
287 | 0 | LOG(INFO) << "find wal: " << wal.wal_path; |
288 | 0 | { |
289 | 0 | std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock); |
290 | 0 | auto it = _wal_path_map.find(wal.wal_id); |
291 | 0 | if (it != _wal_path_map.end()) { |
292 | 0 | LOG(INFO) << "wal_id " << wal.wal_id << " already in wal_path_map, skip it"; |
293 | 0 | continue; |
294 | 0 | } |
295 | 0 | _wal_path_map.emplace(wal.wal_id, wal.wal_path); |
296 | 0 | } |
297 | | // this config is use for test p0 case in pipeline |
298 | 0 | if (config::group_commit_wait_replay_wal_finish) { |
299 | 0 | auto lock = std::make_shared<std::mutex>(); |
300 | 0 | auto cv = std::make_shared<std::condition_variable>(); |
301 | 0 | auto add_st = add_wal_cv_map(wal.wal_id, lock, cv); |
302 | 0 | if (!add_st.ok()) { |
303 | 0 | LOG(WARNING) << "fail to add wal_id " << wal.wal_id << " to wal_cv_map"; |
304 | 0 | continue; |
305 | 0 | } |
306 | 0 | } |
307 | 0 | _exec_env->wal_mgr()->add_wal_queue(wal.tb_id, wal.wal_id); |
308 | 0 | WARN_IF_ERROR(add_recover_wal(wal.db_id, wal.tb_id, wal.wal_id, wal.wal_path), |
309 | 0 | fmt::format("Failed to add recover wal={}", wal.wal_path)); |
310 | 0 | } |
311 | 0 | return Status::OK(); |
312 | 0 | } |
313 | | |
314 | 0 | Status WalManager::_scan_wals(const std::string& wal_path, std::vector<ScanWalInfo>& res) { |
315 | 0 | bool exists = false; |
316 | 0 | auto last_total_size = res.size(); |
317 | 0 | std::vector<io::FileInfo> dbs; |
318 | 0 | Status st = io::global_local_filesystem()->list(wal_path, false, &dbs, &exists); |
319 | 0 | if (!st.ok()) { |
320 | 0 | LOG(WARNING) << "failed list files for wal_dir=" << wal_path << ", st=" << st.to_string(); |
321 | 0 | return st; |
322 | 0 | } |
323 | 0 | for (const auto& database_id : dbs) { |
324 | 0 | if (database_id.is_file || database_id.file_name == _tmp) { |
325 | 0 | continue; |
326 | 0 | } |
327 | 0 | std::vector<io::FileInfo> tables; |
328 | 0 | auto db_path = wal_path + "/" + database_id.file_name; |
329 | 0 | st = io::global_local_filesystem()->list(db_path, false, &tables, &exists); |
330 | 0 | if (!st.ok()) { |
331 | 0 | LOG(WARNING) << "failed list files for wal_dir=" << db_path |
332 | 0 | << ", st=" << st.to_string(); |
333 | 0 | return st; |
334 | 0 | } |
335 | 0 | for (const auto& table_id : tables) { |
336 | 0 | if (table_id.is_file) { |
337 | 0 | continue; |
338 | 0 | } |
339 | 0 | std::vector<io::FileInfo> wals; |
340 | 0 | auto table_path = db_path + "/" + table_id.file_name; |
341 | 0 | st = io::global_local_filesystem()->list(table_path, false, &wals, &exists); |
342 | 0 | if (!st.ok()) { |
343 | 0 | LOG(WARNING) << "failed list files for wal_dir=" << table_path |
344 | 0 | << ", st=" << st.to_string(); |
345 | 0 | return st; |
346 | 0 | } |
347 | 0 | if (wals.empty()) { |
348 | 0 | continue; |
349 | 0 | } |
350 | 0 | int64_t db_id = -1; |
351 | 0 | int64_t tb_id = -1; |
352 | 0 | try { |
353 | 0 | db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10); |
354 | 0 | tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10); |
355 | 0 | } catch (const std::invalid_argument& e) { |
356 | 0 | return Status::InvalidArgument("Invalid format, {}", e.what()); |
357 | 0 | } |
358 | 0 | for (const auto& wal : wals) { |
359 | 0 | int64_t version = -1; |
360 | 0 | int64_t backend_id = -1; |
361 | 0 | int64_t wal_id = -1; |
362 | 0 | std::string label = ""; |
363 | 0 | auto parse_st = parse_wal_path(wal.file_name, version, backend_id, wal_id, label); |
364 | 0 | if (!parse_st.ok()) { |
365 | 0 | LOG(WARNING) << "fail to parse file=" << wal.file_name |
366 | 0 | << ",st=" << parse_st.to_string(); |
367 | 0 | continue; |
368 | 0 | } |
369 | 0 | auto wal_file = table_path + "/" + wal.file_name; |
370 | 0 | struct ScanWalInfo scan_wal_info; |
371 | 0 | scan_wal_info.wal_path = wal_file; |
372 | 0 | scan_wal_info.db_id = db_id; |
373 | 0 | scan_wal_info.tb_id = tb_id; |
374 | 0 | scan_wal_info.wal_id = wal_id; |
375 | 0 | scan_wal_info.be_id = backend_id; |
376 | 0 | res.emplace_back(scan_wal_info); |
377 | 0 | } |
378 | 0 | } |
379 | 0 | } |
380 | 0 | LOG(INFO) << "Finish list wal_dir=" << wal_path |
381 | 0 | << ", wal count=" << std::to_string(res.size() - last_total_size); |
382 | 0 | return Status::OK(); |
383 | 0 | } |
384 | | |
385 | 0 | Status WalManager::_replay_background() { |
386 | 0 | do { |
387 | 0 | if (_stop.load()) { |
388 | 0 | break; |
389 | 0 | } |
390 | | // port == 0 means not received heartbeat yet |
391 | 0 | if (_exec_env->cluster_info() != nullptr && |
392 | 0 | _exec_env->cluster_info()->master_fe_addr.port == 0) { |
393 | 0 | continue; |
394 | 0 | } |
395 | | // replay residual wal,only replay once |
396 | 0 | bool expected = true; |
397 | 0 | if (_first_replay.compare_exchange_strong(expected, false)) { |
398 | 0 | RETURN_IF_ERROR(_load_wals()); |
399 | 0 | } |
400 | 0 | g_wal_total_count.set_value(get_wal_queue_size(-1)); |
401 | | // replay wal of current process |
402 | 0 | std::vector<int64_t> replay_tables; |
403 | 0 | { |
404 | 0 | std::lock_guard<std::shared_mutex> wrlock(_table_lock); |
405 | 0 | auto it = _table_map.begin(); |
406 | 0 | while (it != _table_map.end()) { |
407 | 0 | if (it->second->size() > 0) { |
408 | 0 | replay_tables.push_back(it->first); |
409 | 0 | } |
410 | 0 | it++; |
411 | 0 | } |
412 | 0 | } |
413 | 0 | for (const auto& table_id : replay_tables) { |
414 | 0 | RETURN_IF_ERROR(_thread_pool->submit_func([table_id, this] { |
415 | 0 | auto st = this->_table_map[table_id]->replay_wals(); |
416 | 0 | if (!st.ok()) { |
417 | 0 | LOG(WARNING) << "failed to submit replay wal for table=" << table_id; |
418 | 0 | } |
419 | 0 | })); |
420 | 0 | } |
421 | 0 | } while (!_stop_background_threads_latch.wait_for( |
422 | 0 | std::chrono::seconds(config::group_commit_replay_wal_retry_interval_seconds))); |
423 | 0 | return Status::OK(); |
424 | 0 | } |
425 | | |
426 | | Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, |
427 | 0 | std::string wal) { |
428 | 0 | std::lock_guard<std::shared_mutex> wrlock(_table_lock); |
429 | 0 | std::shared_ptr<WalTable> table_ptr; |
430 | 0 | auto it = _table_map.find(table_id); |
431 | 0 | if (it == _table_map.end()) { |
432 | 0 | table_ptr = std::make_shared<WalTable>(_exec_env, db_id, table_id); |
433 | 0 | _table_map.emplace(table_id, table_ptr); |
434 | 0 | } else { |
435 | 0 | table_ptr = it->second; |
436 | 0 | } |
437 | 0 | table_ptr->add_wal(wal_id, wal); |
438 | | #ifndef BE_TEST |
439 | | WARN_IF_ERROR(update_wal_dir_limit(get_base_wal_path(wal)), |
440 | | "Failed to update wal dir limit while add recover wal!"); |
441 | | WARN_IF_ERROR(update_wal_dir_used(get_base_wal_path(wal)), |
442 | | "Failed to update wal dir used while add recove wal!"); |
443 | | #endif |
444 | 0 | return Status::OK(); |
445 | 0 | } |
446 | | |
447 | 0 | size_t WalManager::get_wal_table_size(int64_t table_id) { |
448 | 0 | std::shared_lock rdlock(_table_lock); |
449 | 0 | auto it = _table_map.find(table_id); |
450 | 0 | if (it != _table_map.end()) { |
451 | 0 | return it->second->size(); |
452 | 0 | } else { |
453 | 0 | return 0; |
454 | 0 | } |
455 | 0 | } |
456 | | |
457 | 1 | void WalManager::_stop_relay_wal() { |
458 | 1 | std::lock_guard<std::shared_mutex> wrlock(_table_lock); |
459 | 1 | for (auto& [_, wal_table] : _table_map) { |
460 | 0 | wal_table->stop(); |
461 | 0 | } |
462 | 1 | } |
463 | | |
464 | 2 | size_t WalManager::get_max_available_size() { |
465 | 2 | return _wal_dirs_info->get_max_available_size(); |
466 | 2 | } |
467 | | |
468 | 0 | std::string WalManager::get_wal_dirs_info_string() { |
469 | 0 | return _wal_dirs_info->get_wal_dirs_info_string(); |
470 | 0 | } |
471 | | |
472 | 0 | Status WalManager::update_wal_dir_limit(const std::string& wal_dir, size_t limit) { |
473 | 0 | return _wal_dirs_info->update_wal_dir_limit(wal_dir, limit); |
474 | 0 | } |
475 | | |
476 | 0 | Status WalManager::update_wal_dir_used(const std::string& wal_dir, size_t used) { |
477 | 0 | return _wal_dirs_info->update_wal_dir_used(wal_dir, used); |
478 | 0 | } |
479 | | |
480 | | Status WalManager::update_wal_dir_estimated_wal_bytes(const std::string& wal_dir, |
481 | | size_t increase_estimated_wal_bytes, |
482 | 0 | size_t decrease_estimated_wal_bytes) { |
483 | 0 | return _wal_dirs_info->update_wal_dir_estimated_wal_bytes(wal_dir, increase_estimated_wal_bytes, |
484 | 0 | decrease_estimated_wal_bytes); |
485 | 0 | } |
486 | | |
487 | 0 | Status WalManager::_update_wal_dir_info_thread() { |
488 | 0 | while (!_stop.load()) { |
489 | 0 | if (!ExecEnv::ready()) { |
490 | 0 | VLOG_DEBUG << "Sleep 1s to wait for storage engine init."; |
491 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(1000)); |
492 | 0 | continue; |
493 | 0 | } |
494 | 0 | static_cast<void>(_wal_dirs_info->update_all_wal_dir_limit()); |
495 | 0 | static_cast<void>(_wal_dirs_info->update_all_wal_dir_used()); |
496 | 0 | LOG_EVERY_N(INFO, 100) << "Scheduled(every 10s) WAL info: " << get_wal_dirs_info_string(); |
497 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
498 | 0 | } |
499 | 0 | return Status::OK(); |
500 | 0 | } |
501 | | |
502 | 0 | Status WalManager::get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes) { |
503 | 0 | return _wal_dirs_info->get_wal_dir_available_size(wal_dir, available_bytes); |
504 | 0 | } |
505 | | |
506 | 0 | std::string WalManager::get_base_wal_path(const std::string& wal_path_str) { |
507 | 0 | io::Path wal_path = wal_path_str; |
508 | 0 | for (int i = 0; i < 3; ++i) { |
509 | 0 | if (!wal_path.has_parent_path()) { |
510 | 0 | return ""; |
511 | 0 | } |
512 | 0 | wal_path = wal_path.parent_path(); |
513 | 0 | } |
514 | 0 | return wal_path.string(); |
515 | 0 | } |
516 | | |
517 | | Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> lock, |
518 | 0 | std::shared_ptr<std::condition_variable> cv) { |
519 | 0 | std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); |
520 | 0 | auto it = _wal_cv_map.find(wal_id); |
521 | 0 | if (it != _wal_cv_map.end()) { |
522 | 0 | return Status::InternalError("wal {} is already in _wal_cv_map ", wal_id); |
523 | 0 | } |
524 | 0 | auto pair = std::make_pair(lock, cv); |
525 | 0 | _wal_cv_map.emplace(wal_id, pair); |
526 | 0 | LOG(INFO) << "add " << wal_id << " to _wal_cv_map"; |
527 | 0 | return Status::OK(); |
528 | 0 | } |
529 | | |
530 | 0 | Status WalManager::erase_wal_cv_map(int64_t wal_id) { |
531 | 0 | std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); |
532 | 0 | if (_wal_cv_map.erase(wal_id)) { |
533 | 0 | LOG(INFO) << "erase " << wal_id << " from _wal_cv_map"; |
534 | 0 | } else { |
535 | 0 | return Status::InternalError("fail to erase wal {} from wal_cv_map", wal_id); |
536 | 0 | } |
537 | 0 | return Status::OK(); |
538 | 0 | } |
539 | | |
540 | 0 | Status WalManager::wait_replay_wal_finish(int64_t wal_id) { |
541 | 0 | std::shared_ptr<std::mutex> lock = nullptr; |
542 | 0 | std::shared_ptr<std::condition_variable> cv = nullptr; |
543 | 0 | auto st = get_lock_and_cv(wal_id, lock, cv); |
544 | 0 | if (st.ok()) { |
545 | 0 | std::unique_lock l(*(lock)); |
546 | 0 | LOG(INFO) << "start wait " << wal_id; |
547 | 0 | if (cv->wait_for(l, std::chrono::seconds(180)) == std::cv_status::timeout) { |
548 | 0 | LOG(WARNING) << "wait for " << wal_id << " is time out"; |
549 | 0 | } |
550 | 0 | LOG(INFO) << "get wal " << wal_id << ",finish wait"; |
551 | 0 | RETURN_IF_ERROR(erase_wal_cv_map(wal_id)); |
552 | 0 | LOG(INFO) << "erase wal " << wal_id; |
553 | 0 | } |
554 | 0 | return Status::OK(); |
555 | 0 | } |
556 | | |
557 | 0 | Status WalManager::notify_relay_wal(int64_t wal_id) { |
558 | 0 | std::shared_ptr<std::mutex> lock = nullptr; |
559 | 0 | std::shared_ptr<std::condition_variable> cv = nullptr; |
560 | 0 | auto st = get_lock_and_cv(wal_id, lock, cv); |
561 | 0 | if (st.ok()) { |
562 | 0 | std::unique_lock l(*(lock)); |
563 | 0 | cv->notify_all(); |
564 | 0 | LOG(INFO) << "get wal " << wal_id << ",notify all"; |
565 | 0 | } |
566 | 0 | return Status::OK(); |
567 | 0 | } |
568 | | |
569 | | Status WalManager::get_lock_and_cv(int64_t wal_id, std::shared_ptr<std::mutex>& lock, |
570 | 0 | std::shared_ptr<std::condition_variable>& cv) { |
571 | 0 | std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); |
572 | 0 | auto it = _wal_cv_map.find(wal_id); |
573 | 0 | if (it == _wal_cv_map.end()) { |
574 | 0 | return Status::InternalError("cannot find txn {} in _wal_cv_map", wal_id); |
575 | 0 | } |
576 | 0 | lock = it->second.first; |
577 | 0 | cv = it->second.second; |
578 | 0 | return Status::OK(); |
579 | 0 | } |
580 | | |
581 | 0 | Status WalManager::delete_wal(int64_t table_id, int64_t wal_id) { |
582 | 0 | std::string wal_path; |
583 | 0 | { |
584 | 0 | std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock); |
585 | 0 | auto it = _wal_path_map.find(wal_id); |
586 | 0 | if (it != _wal_path_map.end()) { |
587 | 0 | wal_path = it->second; |
588 | 0 | auto st = io::global_local_filesystem()->delete_file(wal_path); |
589 | 0 | if (st.ok()) { |
590 | 0 | LOG(INFO) << "delete wal=" << wal_path; |
591 | 0 | } else { |
592 | 0 | LOG(WARNING) << "failed to delete wal=" << wal_path << ", st=" << st.to_string(); |
593 | 0 | } |
594 | 0 | _wal_path_map.erase(wal_id); |
595 | 0 | } |
596 | 0 | } |
597 | 0 | erase_wal_queue(table_id, wal_id); |
598 | 0 | return Status::OK(); |
599 | 0 | } |
600 | | |
601 | 0 | Status WalManager::rename_to_tmp_path(const std::string wal, int64_t table_id, int64_t wal_id) { |
602 | 0 | io::Path wal_path = wal; |
603 | 0 | std::list<std::string> path_element; |
604 | 0 | for (int i = 0; i < 3; ++i) { |
605 | 0 | if (!wal_path.has_parent_path()) { |
606 | 0 | return Status::InternalError("parent path is not enough when rename " + wal); |
607 | 0 | } |
608 | 0 | path_element.push_front(wal_path.filename().string()); |
609 | 0 | wal_path = wal_path.parent_path(); |
610 | 0 | } |
611 | 0 | wal_path.append(_tmp); |
612 | 0 | for (auto path : path_element) { |
613 | 0 | wal_path.append(path); |
614 | 0 | } |
615 | 0 | bool exists = false; |
616 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), &exists)); |
617 | 0 | if (!exists) { |
618 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path())); |
619 | 0 | } |
620 | 0 | auto res = std::rename(wal.c_str(), wal_path.string().c_str()); |
621 | 0 | if (res < 0) { |
622 | 0 | LOG(INFO) << "failed to rename wal from " << wal << " to " << wal_path.string(); |
623 | 0 | return Status::InternalError("rename fail on path " + wal); |
624 | 0 | } |
625 | 0 | LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string(); |
626 | 0 | { |
627 | 0 | std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock); |
628 | 0 | auto it = _wal_path_map.find(wal_id); |
629 | 0 | if (it != _wal_path_map.end()) { |
630 | 0 | _wal_path_map.erase(wal_id); |
631 | 0 | } else { |
632 | | LOG(WARNING) << "can't find " << wal_id << " in _wal_path_map when trying to rename"; |
633 | 0 | } |
634 | 0 | } |
635 | 0 | erase_wal_queue(table_id, wal_id); |
636 | 0 | return Status::OK(); |
637 | 0 | } |
638 | | |
639 | | } // namespace doris |