be/src/exec/spill/spill_file_manager.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/spill/spill_file_manager.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <glog/logging.h> |
22 | | |
23 | | #include <algorithm> |
24 | | #include <filesystem> |
25 | | #include <memory> |
26 | | #include <string> |
27 | | |
28 | | #include "common/logging.h" |
29 | | #include "common/metrics/doris_metrics.h" |
30 | | #include "exec/spill/spill_file.h" |
31 | | #include "io/fs/file_system.h" |
32 | | #include "io/fs/local_file_system.h" |
33 | | #include "storage/olap_define.h" |
34 | | #include "util/parse_util.h" |
35 | | #include "util/pretty_printer.h" |
36 | | #include "util/time.h" |
37 | | |
38 | | namespace doris { |
39 | | #include "common/compile_check_begin.h" |
40 | | |
41 | 155 | SpillFileManager::~SpillFileManager() { |
42 | 155 | DorisMetrics::instance()->metric_registry()->deregister_entity(_entity); |
43 | 155 | } |
44 | | |
45 | | SpillFileManager::SpillFileManager( |
46 | | std::unordered_map<std::string, std::unique_ptr<SpillDataDir>>&& spill_store_map) |
47 | 170 | : _spill_store_map(std::move(spill_store_map)), _stop_background_threads_latch(1) {} |
48 | | |
49 | 170 | Status SpillFileManager::init() { |
50 | 170 | LOG(INFO) << "init spill stream manager"; |
51 | 170 | RETURN_IF_ERROR(_init_spill_store_map()); |
52 | | |
53 | 174 | for (const auto& [path, store] : _spill_store_map) { |
54 | 174 | auto gc_dir_root_dir = store->get_spill_data_gc_path(); |
55 | 174 | bool exists = true; |
56 | 174 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(gc_dir_root_dir, &exists)); |
57 | 174 | if (!exists) { |
58 | 64 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(gc_dir_root_dir)); |
59 | 64 | } |
60 | | |
61 | 174 | auto spill_dir = store->get_spill_data_path(); |
62 | 174 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(spill_dir, &exists)); |
63 | 174 | if (!exists) { |
64 | 64 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(spill_dir)); |
65 | 110 | } else { |
66 | 110 | auto suffix = ToStringFromUnixMillis(UnixMillis()); |
67 | 110 | auto gc_dir = store->get_spill_data_gc_path(suffix); |
68 | 110 | if (std::filesystem::exists(gc_dir)) { |
69 | 0 | LOG(WARNING) << "gc dir already exists: " << gc_dir; |
70 | 0 | } |
71 | 110 | (void)io::global_local_filesystem()->rename(spill_dir, gc_dir); |
72 | 110 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(spill_dir)); |
73 | 110 | } |
74 | 174 | } |
75 | | |
76 | 170 | RETURN_IF_ERROR(Thread::create( |
77 | 170 | "Spill", "spill_gc_thread", [this]() { this->_spill_gc_thread_callback(); }, |
78 | 170 | &_spill_gc_thread)); |
79 | 170 | LOG(INFO) << "spill gc thread started"; |
80 | | |
81 | 170 | _init_metrics(); |
82 | | |
83 | 170 | return Status::OK(); |
84 | 170 | } |
85 | | |
86 | 170 | void SpillFileManager::_init_metrics() { |
87 | 170 | _entity = DorisMetrics::instance()->metric_registry()->register_entity("spill", |
88 | 170 | {{"name", "spill"}}); |
89 | | |
90 | 170 | _spill_write_bytes_metric = std::make_unique<doris::MetricPrototype>( |
91 | 170 | doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "spill_write_bytes"); |
92 | 170 | _spill_write_bytes_counter = (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>( |
93 | 170 | _spill_write_bytes_metric.get())); |
94 | | |
95 | 170 | _spill_read_bytes_metric = std::make_unique<doris::MetricPrototype>( |
96 | 170 | doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "spill_read_bytes"); |
97 | 170 | _spill_read_bytes_counter = (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>( |
98 | 170 | _spill_read_bytes_metric.get())); |
99 | 170 | } |
100 | | |
101 | | // clean up stale spilled files |
102 | 170 | void SpillFileManager::_spill_gc_thread_callback() { |
103 | 11.2k | while (!_stop_background_threads_latch.wait_for( |
104 | 11.2k | std::chrono::milliseconds(config::spill_gc_interval_ms))) { |
105 | 11.1k | gc(config::spill_gc_work_time_ms); |
106 | 11.5k | for (auto& [path, dir] : _spill_store_map) { |
107 | 11.5k | static_cast<void>(dir->update_capacity()); |
108 | 11.5k | } |
109 | 11.1k | } |
110 | 170 | } |
111 | | |
112 | 170 | Status SpillFileManager::_init_spill_store_map() { |
113 | 174 | for (const auto& store : _spill_store_map) { |
114 | 174 | RETURN_IF_ERROR(store.second->init()); |
115 | 174 | } |
116 | | |
117 | 170 | return Status::OK(); |
118 | 170 | } |
119 | | |
120 | | std::vector<SpillDataDir*> SpillFileManager::_get_stores_for_spill( |
121 | 660 | TStorageMedium::type storage_medium) { |
122 | 660 | std::vector<std::pair<SpillDataDir*, double>> stores_with_usage; |
123 | 674 | for (auto& [_, store] : _spill_store_map) { |
124 | 674 | if (store->storage_medium() == storage_medium && !store->reach_capacity_limit(0)) { |
125 | 337 | stores_with_usage.emplace_back(store.get(), store->_get_disk_usage(0)); |
126 | 337 | } |
127 | 674 | } |
128 | 660 | if (stores_with_usage.empty()) { |
129 | 323 | return {}; |
130 | 323 | } |
131 | | |
132 | 337 | std::ranges::sort(stores_with_usage, [](auto&& a, auto&& b) { return a.second < b.second; }); |
133 | | |
134 | 337 | std::vector<SpillDataDir*> stores; |
135 | 337 | for (const auto& [store, _] : stores_with_usage) { |
136 | 337 | stores.emplace_back(store); |
137 | 337 | } |
138 | 337 | return stores; |
139 | 660 | } |
140 | | |
141 | | Status SpillFileManager::create_spill_file(const std::string& relative_path, |
142 | 337 | SpillFileSPtr& spill_file) { |
143 | 337 | auto data_dirs = _get_stores_for_spill(TStorageMedium::type::SSD); |
144 | 337 | if (data_dirs.empty()) { |
145 | 323 | data_dirs = _get_stores_for_spill(TStorageMedium::type::HDD); |
146 | 323 | } |
147 | 337 | if (data_dirs.empty()) { |
148 | 0 | return Status::Error<ErrorCode::NO_AVAILABLE_ROOT_PATH>( |
149 | 0 | "no available disk can be used for spill."); |
150 | 0 | } |
151 | | |
152 | | // Select the first available data dir (sorted by usage ascending) |
153 | 337 | SpillDataDir* data_dir = data_dirs.front(); |
154 | 337 | spill_file = std::make_shared<SpillFile>(data_dir, relative_path); |
155 | 337 | return Status::OK(); |
156 | 337 | } |
157 | | |
158 | 112 | void SpillFileManager::delete_spill_file(SpillFileSPtr spill_file) { |
159 | 112 | if (!spill_file) { |
160 | 0 | LOG(WARNING) << "[spill][delete] null spill_file"; |
161 | 0 | return; |
162 | 0 | } |
163 | 112 | spill_file->gc(); |
164 | 112 | } |
165 | | |
166 | 11.1k | void SpillFileManager::gc(int32_t max_work_time_ms) { |
167 | 11.1k | bool exists = true; |
168 | 11.1k | bool has_work = false; |
169 | 11.1k | int64_t max_work_time_ns = max_work_time_ms * 1000L * 1000L; |
170 | 11.1k | MonotonicStopWatch watch; |
171 | 11.1k | watch.start(); |
172 | 11.1k | Defer defer {[&]() { |
173 | 11.1k | if (has_work) { |
174 | 14 | std::string msg( |
175 | 14 | fmt::format("spill gc time: {}", |
176 | 14 | PrettyPrinter::print(watch.elapsed_time(), TUnit::TIME_NS))); |
177 | 14 | msg += ", spill storage:\n"; |
178 | 22 | for (const auto& [path, store_dir] : _spill_store_map) { |
179 | 22 | msg += " " + store_dir->debug_string(); |
180 | 22 | msg += "\n"; |
181 | 22 | } |
182 | 14 | LOG(INFO) << msg; |
183 | 14 | } |
184 | 11.1k | }}; |
185 | 11.6k | for (const auto& [path, store_dir] : _spill_store_map) { |
186 | 11.6k | std::string gc_root_dir = store_dir->get_spill_data_gc_path(); |
187 | | |
188 | 11.6k | std::error_code ec; |
189 | 11.6k | exists = std::filesystem::exists(gc_root_dir, ec); |
190 | 11.6k | if (ec || !exists) { |
191 | 8.04k | continue; |
192 | 8.04k | } |
193 | | // dirs of queries |
194 | 3.62k | std::vector<io::FileInfo> dirs; |
195 | 3.62k | auto st = io::global_local_filesystem()->list(gc_root_dir, false, &dirs, &exists); |
196 | 3.62k | if (!st.ok()) { |
197 | 0 | continue; |
198 | 0 | } |
199 | | |
200 | 3.62k | for (const auto& dir : dirs) { |
201 | 164 | has_work = true; |
202 | 164 | if (dir.is_file) { |
203 | 0 | continue; |
204 | 0 | } |
205 | 164 | std::string abs_dir = fmt::format("{}/{}", gc_root_dir, dir.file_name); |
206 | | // operator spill sub dirs of a query |
207 | 164 | std::vector<io::FileInfo> files; |
208 | 164 | st = io::global_local_filesystem()->list(abs_dir, false, &files, &exists); |
209 | 164 | if (!st.ok()) { |
210 | 0 | continue; |
211 | 0 | } |
212 | 164 | if (files.empty()) { |
213 | 110 | static_cast<void>(io::global_local_filesystem()->delete_directory(abs_dir)); |
214 | 110 | continue; |
215 | 110 | } |
216 | | |
217 | 278 | for (const auto& file : files) { |
218 | 278 | auto abs_file_path = fmt::format("{}/{}", abs_dir, file.file_name); |
219 | 278 | if (file.is_file) { |
220 | 0 | static_cast<void>(io::global_local_filesystem()->delete_file(abs_file_path)); |
221 | 278 | } else { |
222 | 278 | static_cast<void>( |
223 | 278 | io::global_local_filesystem()->delete_directory(abs_file_path)); |
224 | 278 | } |
225 | 278 | if (watch.elapsed_time() > max_work_time_ns) { |
226 | 0 | break; |
227 | 0 | } |
228 | 278 | } |
229 | 54 | } |
230 | 3.62k | } |
231 | 11.1k | } |
232 | | |
233 | | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_capacity, MetricUnit::BYTES); |
234 | | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_limit, MetricUnit::BYTES); |
235 | | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_avail_capacity, MetricUnit::BYTES); |
236 | | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_data_size, MetricUnit::BYTES); |
237 | | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_has_spill_data, MetricUnit::BYTES); |
238 | | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_has_spill_gc_data, MetricUnit::BYTES); |
239 | | |
240 | | SpillDataDir::SpillDataDir(std::string path, int64_t capacity_bytes, |
241 | | TStorageMedium::type storage_medium) |
242 | 174 | : _path(std::move(path)), |
243 | 174 | _disk_capacity_bytes(capacity_bytes), |
244 | 174 | _storage_medium(storage_medium) { |
245 | 174 | spill_data_dir_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( |
246 | 174 | std::string("spill_data_dir.") + _path, {{"path", _path + "/" + SPILL_DIR_PREFIX}}); |
247 | 174 | INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_capacity); |
248 | 174 | INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_limit); |
249 | 174 | INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_avail_capacity); |
250 | 174 | INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_data_size); |
251 | 174 | INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_has_spill_data); |
252 | 174 | INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_has_spill_gc_data); |
253 | 174 | } |
254 | | |
// Reports whether `dir` is a directory with no entries.
//
// Returns false when `dir` is not a directory (including when it does not exist)
// or when it contains at least one entry. Returns true for an empty directory,
// and also when a filesystem error is raised while inspecting it.
bool is_directory_empty(const std::filesystem::path& dir) {
    try {
        if (!std::filesystem::is_directory(dir)) {
            return false;
        }
        // A default-constructed directory_iterator is the end iterator, so an
        // iterator that already equals it means there is nothing to visit.
        const std::filesystem::directory_iterator first_entry(dir);
        return first_entry == std::filesystem::directory_iterator {};
    } catch (const std::filesystem::filesystem_error&) {
        // This check is not thread safe: the directory being iterated may be moved
        // to the spill_gc dir during this call, so the exception has to be caught.
        return true;
    }
}
266 | | |
267 | 174 | Status SpillDataDir::init() { |
268 | 174 | bool exists = false; |
269 | 174 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(_path, &exists)); |
270 | 174 | if (!exists) { |
271 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, path={}", _path), |
272 | 0 | "check file exist failed"); |
273 | 0 | } |
274 | 174 | RETURN_IF_ERROR(update_capacity()); |
275 | 174 | LOG(INFO) << fmt::format( |
276 | 174 | "spill storage path: {}, capacity: {}, limit: {}, available: " |
277 | 174 | "{}", |
278 | 174 | _path, PrettyPrinter::print_bytes(_disk_capacity_bytes), |
279 | 174 | PrettyPrinter::print_bytes(_spill_data_limit_bytes), |
280 | 174 | PrettyPrinter::print_bytes(_available_bytes)); |
281 | 174 | return Status::OK(); |
282 | 174 | } |
283 | | |
284 | 4.23k | std::string SpillDataDir::get_spill_data_path(const std::string& query_id) const { |
285 | 4.23k | auto dir = fmt::format("{}/{}", _path, SPILL_DIR_PREFIX); |
286 | 4.23k | if (!query_id.empty()) { |
287 | 0 | dir = fmt::format("{}/{}", dir, query_id); |
288 | 0 | } |
289 | 4.23k | return dir; |
290 | 4.23k | } |
291 | | |
292 | 15.6k | std::string SpillDataDir::get_spill_data_gc_path(const std::string& sub_dir_name) const { |
293 | 15.6k | auto dir = fmt::format("{}/{}", _path, SPILL_GC_DIR_PREFIX); |
294 | 15.6k | if (!sub_dir_name.empty()) { |
295 | 110 | dir = fmt::format("{}/{}", dir, sub_dir_name); |
296 | 110 | } |
297 | 15.6k | return dir; |
298 | 15.6k | } |
299 | | |
300 | 11.8k | Status SpillDataDir::update_capacity() { |
301 | 11.8k | std::lock_guard<std::mutex> l(_mutex); |
302 | 11.8k | RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, &_disk_capacity_bytes, |
303 | 11.8k | &_available_bytes)); |
304 | 3.84k | spill_disk_capacity->set_value(_disk_capacity_bytes); |
305 | 3.84k | spill_disk_avail_capacity->set_value(_available_bytes); |
306 | 3.84k | auto disk_use_max_bytes = |
307 | 3.84k | (int64_t)(_disk_capacity_bytes * config::storage_flood_stage_usage_percent / 100); |
308 | 3.84k | bool is_percent = true; |
309 | 3.84k | _spill_data_limit_bytes = ParseUtil::parse_mem_spec(config::spill_storage_limit, -1, |
310 | 3.84k | _disk_capacity_bytes, &is_percent); |
311 | 3.84k | if (_spill_data_limit_bytes <= 0) { |
312 | 0 | spill_disk_limit->set_value(_spill_data_limit_bytes); |
313 | 0 | auto err_msg = fmt::format("Failed to parse spill storage limit from '{}'", |
314 | 0 | config::spill_storage_limit); |
315 | 0 | LOG(WARNING) << err_msg; |
316 | 0 | return Status::InvalidArgument(err_msg); |
317 | 0 | } |
318 | 3.84k | if (is_percent) { |
319 | 3.72k | _spill_data_limit_bytes = (int64_t)(_spill_data_limit_bytes * |
320 | 3.72k | config::storage_flood_stage_usage_percent / 100); |
321 | 3.72k | } |
322 | 3.84k | _spill_data_limit_bytes = std::min(_spill_data_limit_bytes, disk_use_max_bytes); |
323 | 3.84k | spill_disk_limit->set_value(_spill_data_limit_bytes); |
324 | | |
325 | 3.84k | std::string spill_root_dir = get_spill_data_path(); |
326 | 3.84k | std::string spill_gc_root_dir = get_spill_data_gc_path(); |
327 | 3.84k | spill_disk_has_spill_data->set_value(is_directory_empty(spill_root_dir) ? 0 : 1); |
328 | 3.84k | spill_disk_has_spill_gc_data->set_value(is_directory_empty(spill_gc_root_dir) ? 0 : 1); |
329 | | |
330 | 3.84k | return Status::OK(); |
331 | 3.84k | } |
332 | | |
333 | 828 | bool SpillDataDir::_reach_disk_capacity_limit(int64_t incoming_data_size) { |
334 | 828 | double used_pct = _get_disk_usage(incoming_data_size); |
335 | 828 | int64_t left_bytes = _available_bytes - incoming_data_size; |
336 | 828 | if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 && |
337 | 828 | left_bytes <= config::storage_flood_stage_left_capacity_bytes) { |
338 | 0 | LOG(WARNING) << "reach capacity limit. used pct: " << used_pct |
339 | 0 | << ", left bytes: " << left_bytes << ", path: " << _path; |
340 | 0 | return true; |
341 | 0 | } |
342 | 828 | return false; |
343 | 828 | } |
344 | 828 | bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) { |
345 | 828 | std::lock_guard<std::mutex> l(_mutex); |
346 | 828 | if (_reach_disk_capacity_limit(incoming_data_size)) { |
347 | 0 | return true; |
348 | 0 | } |
349 | 828 | if (_spill_data_bytes + incoming_data_size > _spill_data_limit_bytes) { |
350 | 0 | LOG_EVERY_T(WARNING, 1) << fmt::format( |
351 | 0 | "spill data reach limit, path: {}, capacity: {}, limit: {}, used: {}, " |
352 | 0 | "available: " |
353 | 0 | "{}, " |
354 | 0 | "incoming " |
355 | 0 | "bytes: {}", |
356 | 0 | _path, PrettyPrinter::print_bytes(_disk_capacity_bytes), |
357 | 0 | PrettyPrinter::print_bytes(_spill_data_limit_bytes), |
358 | 0 | PrettyPrinter::print_bytes(_spill_data_bytes), |
359 | 0 | PrettyPrinter::print_bytes(_available_bytes), |
360 | 0 | PrettyPrinter::print_bytes(incoming_data_size)); |
361 | 0 | return true; |
362 | 0 | } |
363 | 828 | return false; |
364 | 828 | } |
365 | 22 | std::string SpillDataDir::debug_string() { |
366 | 22 | return fmt::format( |
367 | 22 | "path: {}, capacity: {}, limit: {}, used: {}, available: " |
368 | 22 | "{}", |
369 | 22 | _path, PrettyPrinter::print_bytes(_disk_capacity_bytes), |
370 | 22 | PrettyPrinter::print_bytes(_spill_data_limit_bytes), |
371 | 22 | PrettyPrinter::print_bytes(_spill_data_bytes), |
372 | 22 | PrettyPrinter::print_bytes(_available_bytes)); |
373 | 22 | } |
374 | | } // namespace doris |