be/src/io/cache/file_block.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/cache/file_block.h" |
19 | | |
20 | | #include <butil/iobuf.h> |
21 | | #include <glog/logging.h> |
22 | | // IWYU pragma: no_include <bits/chrono.h> |
23 | | #include <chrono> // IWYU pragma: keep |
24 | | #include <sstream> |
25 | | #include <string> |
26 | | #include <thread> |
27 | | |
28 | | #include "common/status.h" |
29 | | #include "cpp/sync_point.h" |
30 | | #include "io/cache/block_file_cache.h" |
31 | | #include "io/cache/fs_file_cache_storage.h" |
32 | | #include "io/cache/mem_file_cache_storage.h" |
33 | | |
34 | | namespace doris { |
35 | | namespace io { |
36 | | |
37 | | namespace { |
38 | | |
39 | 12 | Status abort_pending_cache_write(FileCacheStorage* storage, const FileCacheKey& key) { |
40 | 12 | if (auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(storage); fs_storage != nullptr) { |
41 | 12 | return fs_storage->abort(key); |
42 | 12 | } |
43 | 0 | if (auto* mem_storage = dynamic_cast<MemFileCacheStorage*>(storage); mem_storage != nullptr) { |
44 | 0 | return mem_storage->abort(key); |
45 | 0 | } |
46 | 0 | return Status::OK(); |
47 | 0 | } |
48 | | |
49 | | } // namespace |
50 | | |
51 | 8.09k | std::ostream& operator<<(std::ostream& os, const FileBlock::State& value) { |
52 | 8.09k | os << FileBlock::state_to_string(value); |
53 | 8.09k | return os; |
54 | 8.09k | } |
55 | | |
56 | | FileBlock::FileBlock(const FileCacheKey& key, size_t size, BlockFileCache* mgr, |
57 | | State download_state) |
58 | 145k | : _block_range(key.offset, key.offset + size - 1), |
59 | 145k | _download_state(download_state), |
60 | 145k | _mgr(mgr), |
61 | 145k | _key(key) { |
62 | | /// On creation, file block state can be EMPTY, DOWNLOADED, SKIP_CACHE. |
63 | 145k | switch (_download_state) { |
64 | 0 | case State::DOWNLOADING: { |
65 | 0 | DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE "; |
66 | 0 | break; |
67 | 0 | } |
68 | 145k | default: { |
69 | 145k | break; |
70 | 0 | } |
71 | 145k | } |
72 | 145k | } |
73 | | |
74 | 1.95M | FileBlock::State FileBlock::state() const { |
75 | 1.95M | std::lock_guard block_lock(_mutex); |
76 | 1.95M | return _download_state; |
77 | 1.95M | } |
78 | | |
79 | 892k | FileBlock::State FileBlock::state_unsafe() const { |
80 | 892k | return _download_state; |
81 | 892k | } |
82 | | |
83 | 1.13M | uint64_t FileBlock::get_caller_id() { |
84 | 1.13M | uint64_t id; |
85 | | #if defined(__APPLE__) |
86 | | // On macOS, use pthread_threadid_np to get the thread ID |
87 | | pthread_threadid_np(nullptr, &id); |
88 | | #else |
89 | 1.13M | id = static_cast<uint64_t>(pthread_self()); |
90 | 1.13M | #endif |
91 | 1.13M | DCHECK(id != 0); |
92 | 1.13M | return id; |
93 | 1.13M | } |
94 | | |
95 | 120k | uint64_t FileBlock::get_or_set_downloader() { |
96 | 120k | std::lock_guard block_lock(_mutex); |
97 | | |
98 | 120k | if (_downloader_id == 0 && _download_state != State::DOWNLOADED) { |
99 | 120k | DCHECK(_download_state != State::DOWNLOADING); |
100 | 120k | _downloader_id = get_caller_id(); |
101 | 120k | _download_state = State::DOWNLOADING; |
102 | 120k | } else if (_downloader_id == get_caller_id()) { |
103 | 4 | LOG(INFO) << "Attempt to set the same downloader for block " << range().to_string() |
104 | 4 | << " for the second time"; |
105 | 4 | } |
106 | | |
107 | 120k | return _downloader_id; |
108 | 120k | } |
109 | | |
110 | 23 | void FileBlock::reset_downloader(std::lock_guard<std::mutex>& block_lock) { |
111 | 23 | DCHECK(_downloader_id != 0) << "There is no downloader"; |
112 | | |
113 | 23 | DCHECK(get_caller_id() == _downloader_id) << "Downloader can be reset only by downloader"; |
114 | | |
115 | 23 | reset_downloader_impl(block_lock); |
116 | 23 | } |
117 | | |
118 | 23 | void FileBlock::reset_downloader_impl(std::lock_guard<std::mutex>& block_lock) { |
119 | 23 | if (_downloaded_size == range().size()) { |
120 | 2 | Status st = set_downloaded(block_lock); |
121 | 2 | if (!st.ok()) { |
122 | 1 | LOG(WARNING) << "reset downloader failed, err=" << st |
123 | 1 | << ", block=" << get_info_for_log_impl(block_lock); |
124 | 1 | } |
125 | 21 | } else { |
126 | 21 | _downloaded_size = 0; |
127 | 21 | _download_state = State::EMPTY; |
128 | 21 | _downloader_id = 0; |
129 | 21 | } |
130 | 23 | } |
131 | | |
132 | 120k | Status FileBlock::set_downloaded(std::lock_guard<std::mutex>& block_lock) { |
133 | 120k | DCHECK(_download_state != State::DOWNLOADED); |
134 | 120k | if (_downloaded_size == 0) { |
135 | 1 | _download_state = State::EMPTY; |
136 | 1 | _downloader_id = 0; |
137 | 1 | LOG(WARNING) << "set file cache block downloaded failed: empty block, block=" |
138 | 1 | << get_info_for_log_impl(block_lock); |
139 | 1 | return Status::InternalError("Try to set empty block {} as downloaded", |
140 | 1 | _block_range.to_string()); |
141 | 1 | } |
142 | 120k | Status status = _mgr->_storage->finalize(_key, this->_block_range.size()); |
143 | 120k | if (status.ok()) [[likely]] { |
144 | 120k | _download_state = State::DOWNLOADED; |
145 | 120k | } else { |
146 | 477 | auto abort_st = abort_pending_cache_write(_mgr->_storage.get(), _key); |
147 | 477 | LOG(WARNING) << "finalize file cache block failed, err=" << status |
148 | 477 | << ", abort_status=" << abort_st |
149 | 477 | << ", block=" << get_info_for_log_impl(block_lock); |
150 | 477 | _download_state = State::EMPTY; |
151 | 477 | _downloaded_size = 0; |
152 | 477 | } |
153 | 120k | _downloader_id = 0; |
154 | 120k | return status; |
155 | 120k | } |
156 | | |
157 | 16 | uint64_t FileBlock::get_downloader() const { |
158 | 16 | std::lock_guard block_lock(_mutex); |
159 | 16 | return _downloader_id; |
160 | 16 | } |
161 | | |
162 | 115k | bool FileBlock::is_downloader() const { |
163 | 115k | std::lock_guard block_lock(_mutex); |
164 | 115k | return get_caller_id() == _downloader_id; |
165 | 115k | } |
166 | | |
167 | 892k | bool FileBlock::is_downloader_impl(std::lock_guard<std::mutex>& /* block_lock */) const { |
168 | 892k | return get_caller_id() == _downloader_id; |
169 | 892k | } |
170 | | |
171 | 120k | Status FileBlock::append(Slice data) { |
172 | 120k | return appendv(&data, 1); |
173 | 120k | } |
174 | | |
175 | 120k | Status FileBlock::appendv(const Slice* data, size_t data_cnt) { |
176 | 120k | size_t appended_size = 0; |
177 | 240k | for (size_t idx = 0; idx < data_cnt; ++idx) { |
178 | 120k | appended_size += data[idx].size; |
179 | 120k | } |
180 | 18.4E | DCHECK(appended_size != 0) << "Writing zero size is not allowed"; |
181 | 120k | auto st = _mgr->_storage->appendv(_key, data, data_cnt); |
182 | 120k | if (!st.ok()) { |
183 | 5 | auto abort_st = abort_pending_cache_write(_mgr->_storage.get(), _key); |
184 | 5 | LOG(WARNING) << "appendv file cache block failed, append_size=" << appended_size |
185 | 5 | << ", slice_count=" << data_cnt << ", err=" << st |
186 | 5 | << ", abort_status=" << abort_st << ", block=" << get_info_for_log(); |
187 | 5 | return st; |
188 | 5 | } |
189 | 120k | _downloaded_size += appended_size; |
190 | 120k | return Status::OK(); |
191 | 120k | } |
192 | | |
193 | 48 | Status FileBlock::append_iobuf(const butil::IOBuf& data) { |
194 | 48 | const size_t appended_size = data.length(); |
195 | 48 | DCHECK(appended_size != 0) << "Writing zero size is not allowed"; |
196 | 48 | auto st = _mgr->_storage->append_iobuf(_key, data); |
197 | 48 | if (!st.ok()) { |
198 | 2 | auto abort_st = abort_pending_cache_write(_mgr->_storage.get(), _key); |
199 | 2 | LOG(WARNING) << "append_iobuf file cache block failed, append_size=" << appended_size |
200 | 2 | << ", err=" << st << ", abort_status=" << abort_st |
201 | 2 | << ", block=" << get_info_for_log(); |
202 | 2 | return st; |
203 | 2 | } |
204 | 46 | _downloaded_size += appended_size; |
205 | 46 | return Status::OK(); |
206 | 48 | } |
207 | | |
208 | 120k | Status FileBlock::finalize() { |
209 | 120k | if (_downloaded_size == 0) { |
210 | 1 | std::lock_guard block_lock(_mutex); |
211 | 1 | _download_state = State::EMPTY; |
212 | 1 | _downloader_id = 0; |
213 | 1 | _cv.notify_all(); |
214 | 1 | LOG(WARNING) << "finalize file cache block failed: empty block, block=" |
215 | 1 | << get_info_for_log_impl(block_lock); |
216 | 1 | return Status::InternalError("Try to finalize an empty file block {}", |
217 | 1 | _block_range.to_string()); |
218 | 1 | } |
219 | 120k | if (_downloaded_size != _block_range.size()) { |
220 | 5.79k | SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr); |
221 | 5.79k | size_t old_size = _block_range.size(); |
222 | 5.79k | size_t new_size = _downloaded_size; |
223 | 5.79k | DCHECK(new_size < old_size); |
224 | 5.79k | _mgr->reset_range(_key.hash, _block_range.left, old_size, new_size, cache_lock); |
225 | 5.79k | } |
226 | 120k | std::lock_guard block_lock(_mutex); |
227 | 120k | Status st = set_downloaded(block_lock); |
228 | 120k | _cv.notify_all(); |
229 | 120k | return st; |
230 | 120k | } |
231 | | |
232 | 5.56M | Status FileBlock::read(Slice buffer, size_t read_offset) { |
233 | 5.56M | return _mgr->_storage->read(_key, read_offset, buffer); |
234 | 5.56M | } |
235 | | |
236 | | Status FileBlock::read_to_iobuf(butil::IOBuf* out, size_t read_offset, size_t bytes_req, |
237 | 9 | size_t* bytes_read) { |
238 | 9 | return _mgr->_storage->read_to_iobuf(_key, read_offset, bytes_req, out, bytes_read); |
239 | 9 | } |
240 | | |
241 | 42 | Status FileBlock::change_cache_type(FileCacheType new_type) { |
242 | 42 | SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr); |
243 | 42 | return change_cache_type_lock(new_type, cache_lock); |
244 | 42 | } |
245 | | |
246 | | Status FileBlock::change_cache_type_lock(FileCacheType new_type, |
247 | 43 | std::lock_guard<std::mutex>& cache_lock) { |
248 | 43 | std::lock_guard block_lock(_mutex); |
249 | | |
250 | 43 | if (new_type == _key.meta.type) { |
251 | 1 | return Status::OK(); |
252 | 1 | } |
253 | 42 | if (_download_state == State::DOWNLOADED) { |
254 | 6 | Status st; |
255 | 6 | TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st); |
256 | 6 | RETURN_IF_ERROR(_mgr->_storage->change_key_meta_type(_key, new_type, _block_range.size())); |
257 | 6 | } |
258 | 42 | _mgr->change_cache_type(_key.hash, _block_range.left, new_type, cache_lock); |
259 | 42 | _key.meta.type = new_type; |
260 | 42 | return Status::OK(); |
261 | 42 | } |
262 | | |
263 | 5.10k | FileBlock::State FileBlock::wait() { |
264 | 5.10k | std::unique_lock block_lock(_mutex); |
265 | | |
266 | 5.10k | if (_downloader_id == 0) { |
267 | 2 | return _download_state; |
268 | 2 | } |
269 | | |
270 | 5.10k | if (_download_state == State::DOWNLOADING) { |
271 | 5.10k | DCHECK(_downloader_id != 0 && _downloader_id != get_caller_id()); |
272 | 5.10k | _cv.wait_for(block_lock, std::chrono::milliseconds(config::block_cache_wait_timeout_ms)); |
273 | 5.10k | } |
274 | | |
275 | 5.10k | return _download_state; |
276 | 5.10k | } |
277 | | |
278 | 892k | void FileBlock::complete_unlocked(std::lock_guard<std::mutex>& block_lock) { |
279 | 892k | if (is_downloader_impl(block_lock)) { |
280 | 23 | reset_downloader(block_lock); |
281 | 23 | _cv.notify_all(); |
282 | 23 | } |
283 | 892k | } |
284 | | |
285 | 12 | std::string FileBlock::get_info_for_log() const { |
286 | 12 | std::lock_guard block_lock(_mutex); |
287 | 12 | return get_info_for_log_impl(block_lock); |
288 | 12 | } |
289 | | |
290 | 20 | std::string FileBlock::get_info_for_log_impl(std::lock_guard<std::mutex>& block_lock) const { |
291 | 20 | std::stringstream info; |
292 | 20 | info << "File block: " << range().to_string() << ", "; |
293 | 20 | info << "hash: " << _key.hash.to_string() << ", "; |
294 | 20 | info << "cache_type: " << cache_type_to_string(_key.meta.type) << ", "; |
295 | 20 | info << "state: " << state_to_string(_download_state) << ", "; |
296 | 20 | info << "size: " << _block_range.size() << ", "; |
297 | 20 | info << "downloaded_size: " << _downloaded_size << ", "; |
298 | 20 | info << "downloader id: " << _downloader_id << ", "; |
299 | 20 | info << "caller id: " << get_caller_id() << ", "; |
300 | | |
301 | 20 | std::string cache_file = "<unknown>"; |
302 | 20 | if (_mgr != nullptr && _mgr->_storage != nullptr) { |
303 | 19 | cache_file = _mgr->_storage->get_local_file(_key); |
304 | 19 | } |
305 | 20 | info << "cache_file: " << cache_file; |
306 | | |
307 | 20 | return info.str(); |
308 | 20 | } |
309 | | |
310 | 870k | FileBlock::State FileBlock::state_unlock(std::lock_guard<std::mutex>&) const { |
311 | 870k | return _download_state; |
312 | 870k | } |
313 | | |
314 | 8.12k | std::string FileBlock::state_to_string(FileBlock::State state) { |
315 | 8.12k | switch (state) { |
316 | 8.09k | case FileBlock::State::DOWNLOADED: |
317 | 8.09k | return "DOWNLOADED"; |
318 | 9 | case FileBlock::State::EMPTY: |
319 | 9 | return "EMPTY"; |
320 | 15 | case FileBlock::State::DOWNLOADING: |
321 | 15 | return "DOWNLOADING"; |
322 | 1 | case FileBlock::State::SKIP_CACHE: |
323 | 1 | return "SKIP_CACHE"; |
324 | 0 | default: |
325 | 0 | DCHECK(false); |
326 | 0 | return ""; |
327 | 8.12k | } |
328 | 8.12k | } |
329 | | |
330 | 7 | std::string FileBlock::get_cache_file() const { |
331 | 7 | return _mgr->_storage->get_local_file(this->_key); |
332 | 7 | } |
333 | | |
334 | 870k | FileBlocksHolder::~FileBlocksHolder() { |
335 | 1.76M | for (auto file_block_it = file_blocks.begin(); file_block_it != file_blocks.end();) { |
336 | 892k | auto current_file_block_it = file_block_it; |
337 | 892k | auto& file_block = *current_file_block_it; |
338 | 892k | BlockFileCache* _mgr = file_block->_mgr; |
339 | 892k | { |
340 | 892k | bool should_remove = false; |
341 | 892k | { |
342 | 892k | std::lock_guard block_lock(file_block->_mutex); |
343 | 892k | file_block->complete_unlocked(block_lock); |
344 | 892k | if (file_block.use_count() == 2 && |
345 | 892k | (file_block->is_deleting() || |
346 | 662k | file_block->state_unlock(block_lock) == FileBlock::State::EMPTY)) { |
347 | 24.7k | should_remove = true; |
348 | 24.7k | } |
349 | 892k | } |
350 | 892k | if (should_remove) { |
351 | 24.7k | SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr); |
352 | 24.7k | std::lock_guard block_lock(file_block->_mutex); |
353 | 24.7k | if (file_block.use_count() == 2) { |
354 | 24.7k | DCHECK(file_block->state_unlock(block_lock) != FileBlock::State::DOWNLOADING); |
355 | | // one in cache, one in here |
356 | 24.7k | if (file_block->is_deleting() || |
357 | 24.7k | file_block->state_unlock(block_lock) == FileBlock::State::EMPTY) { |
358 | 24.7k | _mgr->remove(file_block, cache_lock, block_lock, false); |
359 | 24.7k | } |
360 | 24.7k | } |
361 | 24.7k | } |
362 | 892k | } |
363 | 892k | file_block_it = file_blocks.erase(current_file_block_it); |
364 | 892k | } |
365 | 870k | } |
366 | | |
367 | | } // namespace io |
368 | | } // namespace doris |