be/src/load/load_path_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/load_path_mgr.h" |
19 | | |
20 | | // IWYU pragma: no_include <bthread/errno.h> |
21 | | #include <errno.h> // IWYU pragma: keep |
22 | | #include <gen_cpp/Types_types.h> |
23 | | #include <glog/logging.h> |
24 | | #include <string.h> |
25 | | #include <sys/stat.h> |
26 | | |
27 | | #include <algorithm> |
28 | | #include <boost/algorithm/string/join.hpp> |
29 | | |
30 | | #include "common/cast_set.h" // IWYU pragma: keep |
31 | | #include "common/compiler_util.h" // IWYU pragma: keep |
32 | | // IWYU pragma: no_include <bits/chrono.h> |
33 | | #include <chrono> // IWYU pragma: keep |
34 | | #include <memory> |
35 | | #include <ostream> |
36 | | #include <string> |
37 | | |
38 | | #include "common/config.h" |
39 | | #include "io/fs/file_system.h" |
40 | | #include "io/fs/local_file_system.h" |
41 | | #include "runtime/exec_env.h" |
42 | | #include "storage/olap_define.h" |
43 | | #include "storage/options.h" |
44 | | #include "util/thread.h" |
45 | | |
46 | | namespace doris { |
47 | | |
48 | | #include "common/compile_check_begin.h" |
49 | | |
50 | | using namespace ErrorCode; |
51 | | |
// Number of shard sub-directories that load data and error-log files are spread across.
static constexpr uint32_t MAX_SHARD_NUM = 1024;
// Name prefix of a shard sub-directory; the shard number is appended (e.g. "__shard_7").
static const std::string SHARD_PREFIX = "__shard_";
54 | | |
55 | | LoadPathMgr::LoadPathMgr(ExecEnv* exec_env) |
56 | 8 | : _exec_env(exec_env), |
57 | 8 | _idx(0), |
58 | 8 | _next_shard(0), |
59 | 8 | _error_path_next_shard(0), |
60 | 8 | _stop_background_threads_latch(1) {} |
61 | | |
62 | 4 | void LoadPathMgr::stop() { |
63 | 4 | _stop_background_threads_latch.count_down(); |
64 | 4 | if (_clean_thread) { |
65 | 3 | _clean_thread->join(); |
66 | 3 | } |
67 | 4 | } |
68 | | |
69 | 7 | Status LoadPathMgr::init() { |
70 | 7 | _path_vec.clear(); |
71 | 7 | LOG(INFO) << "Load path configured to [" << boost::join(_path_vec, ",") << "]"; |
72 | | |
73 | | // error log is saved in first root path |
74 | 7 | _error_log_dir = _exec_env->store_paths()[0].path + "/" + ERROR_LOG_PREFIX; |
75 | | // check and make dir |
76 | 7 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(_error_log_dir)); |
77 | | |
78 | 7 | _idx = 0; |
79 | 7 | _reserved_hours = std::max<int64_t>(config::load_data_reserve_hours, 1L); |
80 | 7 | RETURN_IF_ERROR(Thread::create( |
81 | 7 | "LoadPathMgr", "clean_expired_temp_path", |
82 | 7 | [this]() { |
83 | | // TODO(zc): add this thread to cgroup for control resource it use |
84 | 7 | while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(3600))) { |
85 | 7 | this->clean(); |
86 | 7 | } |
87 | 7 | }, |
88 | 7 | &_clean_thread)); |
89 | 7 | return Status::OK(); |
90 | 7 | } |
91 | | |
92 | | Status LoadPathMgr::allocate_dir(const std::string& db, const std::string& label, |
93 | 24 | std::string* prefix, int64_t file_bytes) { |
94 | 24 | Status status = _init_once.call([this] { |
95 | 4 | for (auto& store_path : _exec_env->store_paths()) { |
96 | 4 | _path_vec.push_back(store_path.path + "/" + MINI_PREFIX); |
97 | 4 | } |
98 | 4 | return Status::OK(); |
99 | 4 | }); |
100 | 24 | std::string path; |
101 | 24 | auto size = _path_vec.size(); |
102 | 24 | auto retry = size; |
103 | 24 | auto exceed_capacity_path_num = 0; |
104 | 24 | size_t disk_capacity_bytes = 0; |
105 | 24 | size_t available_bytes = 0; |
106 | 25 | while (retry--) { |
107 | | // Call get_space_info to get disk space information. |
108 | | // If the call fails or the disk space is insufficient, |
109 | | // increment the count of paths exceeding capacity and proceed to the next loop iteration. |
110 | 24 | std::string base_path = _path_vec[_idx].substr(0, _path_vec[_idx].find("/" + MINI_PREFIX)); |
111 | 24 | if (!io::global_local_filesystem() |
112 | 24 | ->get_space_info(base_path, &disk_capacity_bytes, &available_bytes) |
113 | 24 | .ok() || |
114 | 24 | !check_disk_space(disk_capacity_bytes, available_bytes, file_bytes)) { |
115 | 1 | ++exceed_capacity_path_num; |
116 | 1 | continue; |
117 | 1 | } |
118 | | // add SHARD_PREFIX for compatible purpose |
119 | 23 | std::lock_guard<std::mutex> l(_lock); |
120 | 23 | std::string shard = SHARD_PREFIX + std::to_string(_next_shard++ % MAX_SHARD_NUM); |
121 | 23 | path = _path_vec[_idx] + "/" + db + "/" + shard + "/" + label; |
122 | 23 | _idx = (_idx + 1) % size; |
123 | 23 | status = io::global_local_filesystem()->create_directory(path); |
124 | 23 | if (LIKELY(status.ok())) { |
125 | 23 | *prefix = path; |
126 | 23 | return Status::OK(); |
127 | 23 | } |
128 | 23 | } |
129 | 1 | if (exceed_capacity_path_num == size) { |
130 | 1 | return Status::Error<DISK_REACH_CAPACITY_LIMIT, false>("exceed capacity limit."); |
131 | 1 | } |
132 | 0 | return status; |
133 | 1 | } |
134 | | |
135 | | bool LoadPathMgr::check_disk_space(size_t disk_capacity_bytes, size_t available_bytes, |
136 | 26 | int64_t file_bytes) { |
137 | 26 | bool is_available = false; |
138 | 26 | int64_t remaining_bytes = available_bytes - file_bytes; |
139 | 26 | double used_ratio = |
140 | 26 | 1.0 - cast_set<double>(remaining_bytes) / cast_set<double>(disk_capacity_bytes); |
141 | 26 | is_available = !(used_ratio >= config::storage_flood_stage_usage_percent / 100.0 && |
142 | 26 | remaining_bytes <= config::storage_flood_stage_left_capacity_bytes); |
143 | 26 | if (!is_available) { |
144 | 2 | LOG(WARNING) << "Exceed capacity limit. disk_capacity: " << disk_capacity_bytes |
145 | 2 | << ", available: " << available_bytes << ", file_bytes: " << file_bytes; |
146 | 2 | } |
147 | 26 | return is_available; |
148 | 26 | } |
149 | | |
150 | 577 | bool LoadPathMgr::is_too_old(time_t cur_time, const std::string& label_dir, int64_t reserve_hours) { |
151 | 577 | struct stat dir_stat; |
152 | 577 | if (stat(label_dir.c_str(), &dir_stat)) { |
153 | 0 | char buf[64]; |
154 | | // State failed, just information |
155 | 0 | LOG(WARNING) << "stat directory failed.path=" << label_dir |
156 | 0 | << ",code=" << strerror_r(errno, buf, 64); |
157 | 0 | return false; |
158 | 0 | } |
159 | | |
160 | 577 | if ((cur_time - dir_stat.st_mtime) < reserve_hours * 3600) { |
161 | 577 | return false; |
162 | 577 | } |
163 | | |
164 | 0 | return true; |
165 | 577 | } |
166 | | |
167 | 0 | void LoadPathMgr::get_load_data_path(std::vector<std::string>* data_paths) { |
168 | 0 | data_paths->insert(data_paths->end(), _path_vec.begin(), _path_vec.end()); |
169 | 0 | return; |
170 | 0 | } |
171 | | |
172 | | const std::string ERROR_FILE_NAME = "error_log"; |
173 | | |
174 | | Status LoadPathMgr::get_load_error_file_name(const std::string& db, const std::string& label, |
175 | | const TUniqueId& fragment_instance_id, |
176 | 577 | std::string* error_path) { |
177 | 577 | std::stringstream ss; |
178 | 577 | std::string shard = ""; |
179 | 577 | { |
180 | 577 | std::lock_guard<std::mutex> l(_lock); |
181 | 577 | shard = SHARD_PREFIX + std::to_string(_error_path_next_shard++ % MAX_SHARD_NUM); |
182 | 577 | } |
183 | 577 | std::string shard_path = _error_log_dir + "/" + shard; |
184 | | // check and create shard path |
185 | 577 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(shard_path)); |
186 | | // add shard sub dir to file path |
187 | 577 | ss << shard << "/" << ERROR_FILE_NAME << "_" << db << "_" << label << "_" << std::hex |
188 | 577 | << fragment_instance_id.hi << "_" << fragment_instance_id.lo; |
189 | 577 | *error_path = ss.str(); |
190 | 577 | return Status::OK(); |
191 | 577 | } |
192 | | |
193 | 1.10k | std::string LoadPathMgr::get_load_error_absolute_path(const std::string& file_path) { |
194 | 1.10k | std::string path; |
195 | 1.10k | path.append(_error_log_dir); |
196 | 1.10k | path.append("/"); |
197 | 1.10k | path.append(file_path); |
198 | 1.10k | return path; |
199 | 1.10k | } |
200 | | |
201 | 577 | void LoadPathMgr::process_path(time_t now, const std::string& path, int64_t reserve_hours) { |
202 | 577 | if (!is_too_old(now, path, reserve_hours)) { |
203 | 577 | return; |
204 | 577 | } |
205 | 577 | LOG(INFO) << "Going to remove path. path=" << path; |
206 | 0 | Status status = io::global_local_filesystem()->delete_directory_or_file(path); |
207 | 0 | if (status.ok()) { |
208 | 0 | LOG(INFO) << "Remove path success. path=" << path; |
209 | 0 | } else { |
210 | 0 | LOG(WARNING) << "Remove path failed. path=" << path << ", error=" << status; |
211 | 0 | } |
212 | 0 | } |
213 | | |
214 | 23 | void LoadPathMgr::clean_files_in_path_vec(const std::string& path) { |
215 | | // Check if the path contains "/"+MINI_PREFIX. If not, return directly. |
216 | 23 | if (path.find("/" + MINI_PREFIX) == std::string::npos) { |
217 | 0 | return; |
218 | 0 | } |
219 | | |
220 | 23 | bool exists = false; |
221 | | // Check if the path exists |
222 | 23 | Status status = io::global_local_filesystem()->exists(path, &exists); |
223 | 23 | if (!status.ok()) { |
224 | 0 | LOG(WARNING) << "Failed to check if path exists: " << path << ", error: " << status; |
225 | 0 | return; |
226 | 0 | } |
227 | 23 | if (exists) { |
228 | | // If the path exists, delete the file or directory corresponding to that path |
229 | 22 | status = io::global_local_filesystem()->delete_directory_or_file(path); |
230 | 22 | if (status.ok()) { |
231 | 22 | LOG(INFO) << "Delete path success: " << path; |
232 | 22 | } else { |
233 | 0 | LOG(WARNING) << "Delete path failed: " << path << ", error: " << status; |
234 | 0 | } |
235 | 22 | } |
236 | 23 | } |
237 | | |
238 | 3 | void LoadPathMgr::clean_one_path(const std::string& path) { |
239 | 3 | bool exists = true; |
240 | 3 | std::vector<io::FileInfo> dbs; |
241 | 3 | Status st = io::global_local_filesystem()->list(path, false, &dbs, &exists); |
242 | 3 | if (!st) { |
243 | 0 | return; |
244 | 0 | } |
245 | | |
246 | 3 | Status status; |
247 | 3 | time_t now = time(nullptr); |
248 | 12 | for (auto& db : dbs) { |
249 | 12 | if (db.is_file) { |
250 | 0 | continue; |
251 | 0 | } |
252 | 12 | std::string db_dir = path + "/" + db.file_name; |
253 | 12 | std::vector<io::FileInfo> sub_dirs; |
254 | 12 | status = io::global_local_filesystem()->list(db_dir, false, &sub_dirs, &exists); |
255 | 12 | if (!status.ok()) { |
256 | 0 | LOG(WARNING) << "scan db of trash dir failed: " << status; |
257 | 0 | continue; |
258 | 0 | } |
259 | | // delete this file |
260 | 22 | for (auto& sub_dir : sub_dirs) { |
261 | 22 | if (sub_dir.is_file) { |
262 | 0 | continue; |
263 | 0 | } |
264 | 22 | std::string sub_path = db_dir + "/" + sub_dir.file_name; |
265 | | // for compatible |
266 | 22 | if (sub_dir.file_name.find(SHARD_PREFIX) == 0) { |
267 | | // sub_dir starts with SHARD_PREFIX |
268 | | // process shard sub dir |
269 | 22 | std::vector<io::FileInfo> labels; |
270 | 22 | status = io::global_local_filesystem()->list(sub_path, false, &labels, &exists); |
271 | 22 | if (!status.ok()) { |
272 | 0 | LOG(WARNING) << "scan one path to delete directory failed: " << status; |
273 | 0 | continue; |
274 | 0 | } |
275 | 22 | for (auto& label : labels) { |
276 | 0 | std::string label_dir = sub_path + "/" + label.file_name; |
277 | 0 | process_path(now, label_dir, config::load_data_reserve_hours); |
278 | 0 | } |
279 | 22 | } else { |
280 | | // process label dir |
281 | 0 | process_path(now, sub_path, config::load_data_reserve_hours); |
282 | 0 | } |
283 | 22 | } |
284 | 12 | } |
285 | 3 | } |
286 | | |
287 | 3 | void LoadPathMgr::clean() { |
288 | 3 | for (auto& path : _path_vec) { |
289 | 3 | clean_one_path(path); |
290 | 3 | } |
291 | 3 | clean_error_log(); |
292 | 3 | } |
293 | | |
294 | 3 | void LoadPathMgr::clean_error_log() { |
295 | 3 | time_t now = time(nullptr); |
296 | 3 | bool exists = true; |
297 | 3 | std::vector<io::FileInfo> sub_dirs; |
298 | 3 | { |
299 | 3 | Status status = |
300 | 3 | io::global_local_filesystem()->list(_error_log_dir, false, &sub_dirs, &exists); |
301 | 3 | if (!status.ok()) { |
302 | 0 | LOG(WARNING) << "scan error_log dir failed: " << status; |
303 | 0 | return; |
304 | 0 | } |
305 | 3 | } |
306 | | |
307 | 577 | for (auto& sub_dir : sub_dirs) { |
308 | 577 | if (sub_dir.is_file) { |
309 | 0 | continue; |
310 | 0 | } |
311 | 577 | std::string sub_path = _error_log_dir + "/" + sub_dir.file_name; |
312 | | // for compatible |
313 | 577 | if (sub_dir.file_name.find(SHARD_PREFIX) == 0) { |
314 | | // sub_dir starts with SHARD_PREFIX |
315 | | // process shard sub dir |
316 | 577 | std::vector<io::FileInfo> error_log_files; |
317 | 577 | Status status = |
318 | 577 | io::global_local_filesystem()->list(sub_path, false, &error_log_files, &exists); |
319 | 577 | if (!status.ok()) { |
320 | 0 | LOG(WARNING) << "scan one path to delete directory failed: " << status; |
321 | 0 | continue; |
322 | 0 | } |
323 | 577 | for (auto& error_log : error_log_files) { |
324 | 577 | std::string error_log_path = sub_path + "/" + error_log.file_name; |
325 | 577 | process_path(now, error_log_path, config::load_error_log_reserve_hours); |
326 | 577 | } |
327 | 577 | } else { |
328 | | // process error log file |
329 | 0 | process_path(now, sub_path, config::load_error_log_reserve_hours); |
330 | 0 | } |
331 | 577 | } |
332 | 3 | } |
333 | | |
334 | | #include "common/compile_check_end.h" |
335 | | |
336 | | } // namespace doris |