be/src/io/cache/file_block.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/cache/file_block.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | // IWYU pragma: no_include <bits/chrono.h> |
22 | | #include <chrono> // IWYU pragma: keep |
23 | | #include <sstream> |
24 | | #include <string> |
25 | | #include <thread> |
26 | | |
27 | | #include "common/status.h" |
28 | | #include "cpp/sync_point.h" |
29 | | #include "io/cache/block_file_cache.h" |
30 | | |
31 | | namespace doris { |
32 | | namespace io { |
33 | | |
34 | 1.50k | std::ostream& operator<<(std::ostream& os, const FileBlock::State& value) { |
35 | 1.50k | os << FileBlock::state_to_string(value); |
36 | 1.50k | return os; |
37 | 1.50k | } |
38 | | |
39 | | FileBlock::FileBlock(const FileCacheKey& key, size_t size, BlockFileCache* mgr, |
40 | | State download_state) |
41 | 147k | : _block_range(key.offset, key.offset + size - 1), |
42 | 147k | _download_state(download_state), |
43 | 147k | _mgr(mgr), |
44 | 147k | _key(key) { |
45 | | /// On creation, file block state can be EMPTY, DOWNLOADED, SKIP_CACHE. |
46 | 147k | switch (_download_state) { |
47 | 0 | case State::DOWNLOADING: { |
48 | 0 | DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE "; |
49 | 0 | break; |
50 | 0 | } |
51 | 147k | default: { |
52 | 147k | break; |
53 | 0 | } |
54 | 147k | } |
55 | 147k | } |
56 | | |
57 | 7.28M | FileBlock::State FileBlock::state() const { |
58 | 7.28M | std::lock_guard block_lock(_mutex); |
59 | 7.28M | return _download_state; |
60 | 7.28M | } |
61 | | |
62 | 3.73M | FileBlock::State FileBlock::state_unsafe() const { |
63 | 3.73M | return _download_state; |
64 | 3.73M | } |
65 | | |
66 | 3.97M | uint64_t FileBlock::get_caller_id() { |
67 | 3.97M | uint64_t id; |
68 | | #if defined(__APPLE__) |
69 | | // On macOS, use pthread_threadid_np to get the thread ID |
70 | | pthread_threadid_np(nullptr, &id); |
71 | | #else |
72 | 3.97M | id = static_cast<uint64_t>(pthread_self()); |
73 | 3.97M | #endif |
74 | 3.97M | DCHECK(id != 0); |
75 | 3.97M | return id; |
76 | 3.97M | } |
77 | | |
78 | 116k | uint64_t FileBlock::get_or_set_downloader() { |
79 | 116k | std::lock_guard block_lock(_mutex); |
80 | | |
81 | 116k | if (_downloader_id == 0 && _download_state != State::DOWNLOADED) { |
82 | 116k | DCHECK(_download_state != State::DOWNLOADING); |
83 | 116k | _downloader_id = get_caller_id(); |
84 | 116k | _download_state = State::DOWNLOADING; |
85 | 116k | } else if (_downloader_id == get_caller_id()) { |
86 | 4 | LOG(INFO) << "Attempt to set the same downloader for block " << range().to_string() |
87 | 4 | << " for the second time"; |
88 | 4 | } |
89 | | |
90 | 116k | return _downloader_id; |
91 | 116k | } |
92 | | |
93 | 18 | void FileBlock::reset_downloader(std::lock_guard<std::mutex>& block_lock) { |
94 | 18 | DCHECK(_downloader_id != 0) << "There is no downloader"; |
95 | | |
96 | 18 | DCHECK(get_caller_id() == _downloader_id) << "Downloader can be reset only by downloader"; |
97 | | |
98 | 18 | reset_downloader_impl(block_lock); |
99 | 18 | } |
100 | | |
101 | 18 | void FileBlock::reset_downloader_impl(std::lock_guard<std::mutex>& block_lock) { |
102 | 18 | if (_downloaded_size == range().size()) { |
103 | 2 | Status st = set_downloaded(block_lock); |
104 | 2 | if (!st.ok()) { |
105 | 1 | LOG(WARNING) << "reset downloader error" << st; |
106 | 1 | } |
107 | 16 | } else { |
108 | 16 | _downloaded_size = 0; |
109 | 16 | _download_state = State::EMPTY; |
110 | 16 | _downloader_id = 0; |
111 | 16 | } |
112 | 18 | } |
113 | | |
114 | 116k | Status FileBlock::set_downloaded(std::lock_guard<std::mutex>& /* block_lock */) { |
115 | 116k | DCHECK(_download_state != State::DOWNLOADED); |
116 | 116k | if (_downloaded_size == 0) { |
117 | 1 | _download_state = State::EMPTY; |
118 | 1 | _downloader_id = 0; |
119 | 1 | return Status::InternalError("Try to set empty block {} as downloaded", |
120 | 1 | _block_range.to_string()); |
121 | 1 | } |
122 | 116k | Status status = _mgr->_storage->finalize(_key, this->_block_range.size()); |
123 | 116k | if (status.ok()) [[likely]] { |
124 | 115k | _download_state = State::DOWNLOADED; |
125 | 115k | } else { |
126 | 759 | _download_state = State::EMPTY; |
127 | 759 | _downloaded_size = 0; |
128 | 759 | } |
129 | 116k | _downloader_id = 0; |
130 | 116k | return status; |
131 | 116k | } |
132 | | |
133 | 16 | uint64_t FileBlock::get_downloader() const { |
134 | 16 | std::lock_guard block_lock(_mutex); |
135 | 16 | return _downloader_id; |
136 | 16 | } |
137 | | |
138 | 111k | bool FileBlock::is_downloader() const { |
139 | 111k | std::lock_guard block_lock(_mutex); |
140 | 111k | return get_caller_id() == _downloader_id; |
141 | 111k | } |
142 | | |
143 | 3.72M | bool FileBlock::is_downloader_impl(std::lock_guard<std::mutex>& /* block_lock */) const { |
144 | 3.72M | return get_caller_id() == _downloader_id; |
145 | 3.72M | } |
146 | | |
147 | 116k | Status FileBlock::append(Slice data) { |
148 | 116k | DCHECK(data.size != 0) << "Writing zero size is not allowed"; |
149 | 116k | RETURN_IF_ERROR(_mgr->_storage->append(_key, data)); |
150 | 116k | _downloaded_size += data.size; |
151 | 116k | return Status::OK(); |
152 | 116k | } |
153 | | |
154 | 116k | Status FileBlock::finalize() { |
155 | 116k | if (_downloaded_size == 0) { |
156 | 1 | std::lock_guard block_lock(_mutex); |
157 | 1 | _download_state = State::EMPTY; |
158 | 1 | _downloader_id = 0; |
159 | 1 | _cv.notify_all(); |
160 | 1 | return Status::InternalError("Try to finalize an empty file block {}", |
161 | 1 | _block_range.to_string()); |
162 | 1 | } |
163 | 116k | if (_downloaded_size != _block_range.size()) { |
164 | 7.04k | SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr); |
165 | 7.04k | size_t old_size = _block_range.size(); |
166 | 7.04k | _block_range.right = _block_range.left + _downloaded_size - 1; |
167 | 7.04k | size_t new_size = _block_range.size(); |
168 | 7.04k | DCHECK(new_size < old_size); |
169 | 7.04k | _mgr->reset_range(_key.hash, _block_range.left, old_size, new_size, cache_lock); |
170 | 7.04k | } |
171 | 116k | std::lock_guard block_lock(_mutex); |
172 | 116k | Status st = set_downloaded(block_lock); |
173 | 116k | _cv.notify_all(); |
174 | 116k | return st; |
175 | 116k | } |
176 | | |
177 | 2.25M | Status FileBlock::read(Slice buffer, size_t read_offset) { |
178 | 2.25M | return _mgr->_storage->read(_key, read_offset, buffer); |
179 | 2.25M | } |
180 | | |
181 | 37 | Status FileBlock::change_cache_type(FileCacheType new_type) { |
182 | 37 | SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr); |
183 | 37 | return change_cache_type_lock(new_type, cache_lock); |
184 | 37 | } |
185 | | |
186 | | Status FileBlock::change_cache_type_lock(FileCacheType new_type, |
187 | 38 | std::lock_guard<std::mutex>& cache_lock) { |
188 | 38 | std::lock_guard block_lock(_mutex); |
189 | | |
190 | 38 | if (new_type == _key.meta.type) { |
191 | 1 | return Status::OK(); |
192 | 1 | } |
193 | 37 | if (_download_state == State::DOWNLOADED) { |
194 | 6 | Status st; |
195 | 6 | TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st); |
196 | 6 | RETURN_IF_ERROR(_mgr->_storage->change_key_meta_type(_key, new_type, _block_range.size())); |
197 | 6 | } |
198 | 37 | _mgr->change_cache_type(_key.hash, _block_range.left, new_type, cache_lock); |
199 | 37 | _key.meta.type = new_type; |
200 | 37 | return Status::OK(); |
201 | 37 | } |
202 | | |
203 | 14.8k | FileBlock::State FileBlock::wait() { |
204 | 14.8k | std::unique_lock block_lock(_mutex); |
205 | | |
206 | 14.8k | if (_downloader_id == 0) { |
207 | 3 | return _download_state; |
208 | 3 | } |
209 | | |
210 | 14.8k | if (_download_state == State::DOWNLOADING) { |
211 | 14.8k | DCHECK(_downloader_id != 0 && _downloader_id != get_caller_id()); |
212 | 14.8k | _cv.wait_for(block_lock, std::chrono::milliseconds(config::block_cache_wait_timeout_ms)); |
213 | 14.8k | } |
214 | | |
215 | 14.8k | return _download_state; |
216 | 14.8k | } |
217 | | |
218 | 3.72M | void FileBlock::complete_unlocked(std::lock_guard<std::mutex>& block_lock) { |
219 | 3.72M | if (is_downloader_impl(block_lock)) { |
220 | 18 | reset_downloader(block_lock); |
221 | 18 | _cv.notify_all(); |
222 | 18 | } |
223 | 3.72M | } |
224 | | |
225 | 1 | std::string FileBlock::get_info_for_log() const { |
226 | 1 | std::lock_guard block_lock(_mutex); |
227 | 1 | return get_info_for_log_impl(block_lock); |
228 | 1 | } |
229 | | |
230 | 1 | std::string FileBlock::get_info_for_log_impl(std::lock_guard<std::mutex>& block_lock) const { |
231 | 1 | std::stringstream info; |
232 | 1 | info << "File block: " << range().to_string() << ", "; |
233 | 1 | info << "state: " << state_to_string(_download_state) << ", "; |
234 | 1 | info << "size: " << _block_range.size() << ", "; |
235 | 1 | info << "downloader id: " << _downloader_id << ", "; |
236 | 1 | info << "caller id: " << get_caller_id(); |
237 | | |
238 | 1 | return info.str(); |
239 | 1 | } |
240 | | |
241 | 3.88M | FileBlock::State FileBlock::state_unlock(std::lock_guard<std::mutex>&) const { |
242 | 3.88M | return _download_state; |
243 | 3.88M | } |
244 | | |
245 | 1.51k | std::string FileBlock::state_to_string(FileBlock::State state) { |
246 | 1.51k | switch (state) { |
247 | 1.51k | case FileBlock::State::DOWNLOADED: |
248 | 1.51k | return "DOWNLOADED"; |
249 | 1 | case FileBlock::State::EMPTY: |
250 | 1 | return "EMPTY"; |
251 | 2 | case FileBlock::State::DOWNLOADING: |
252 | 2 | return "DOWNLOADING"; |
253 | 1 | case FileBlock::State::SKIP_CACHE: |
254 | 1 | return "SKIP_CACHE"; |
255 | 0 | default: |
256 | 0 | DCHECK(false); |
257 | 0 | return ""; |
258 | 1.51k | } |
259 | 1.51k | } |
260 | | |
261 | 1 | std::string FileBlock::get_cache_file() const { |
262 | 1 | return _mgr->_storage->get_local_file(this->_key); |
263 | 1 | } |
264 | | |
265 | 3.61M | FileBlocksHolder::~FileBlocksHolder() { |
266 | 7.34M | for (auto file_block_it = file_blocks.begin(); file_block_it != file_blocks.end();) { |
267 | 3.72M | auto current_file_block_it = file_block_it; |
268 | 3.72M | auto& file_block = *current_file_block_it; |
269 | 3.72M | BlockFileCache* _mgr = file_block->_mgr; |
270 | 3.72M | { |
271 | 3.72M | bool should_remove = false; |
272 | 3.72M | { |
273 | 3.72M | std::lock_guard block_lock(file_block->_mutex); |
274 | 3.72M | file_block->complete_unlocked(block_lock); |
275 | 3.72M | if (file_block.use_count() == 2 && |
276 | 3.72M | (file_block->is_deleting() || |
277 | 3.65M | file_block->state_unlock(block_lock) == FileBlock::State::EMPTY)) { |
278 | 31.1k | should_remove = true; |
279 | 31.1k | } |
280 | 3.72M | } |
281 | 3.72M | if (should_remove) { |
282 | 31.1k | SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr); |
283 | 31.1k | std::lock_guard block_lock(file_block->_mutex); |
284 | 31.1k | if (file_block.use_count() == 2) { |
285 | 31.1k | DCHECK(file_block->state_unlock(block_lock) != FileBlock::State::DOWNLOADING); |
286 | | // one in cache, one in here |
287 | 31.1k | if (file_block->is_deleting() || |
288 | 31.1k | file_block->state_unlock(block_lock) == FileBlock::State::EMPTY) { |
289 | 31.1k | _mgr->remove(file_block, cache_lock, block_lock, false); |
290 | 31.1k | } |
291 | 31.1k | } |
292 | 31.1k | } |
293 | 3.72M | } |
294 | 3.72M | file_block_it = file_blocks.erase(current_file_block_it); |
295 | 3.72M | } |
296 | 3.61M | } |
297 | | |
298 | | } // namespace io |
299 | | } // namespace doris |