be/src/io/fs/s3_file_bufferpool.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/s3_file_bufferpool.h" |
19 | | |
20 | | #include <bvar/bvar.h> |
21 | | #include <crc32c/crc32c.h> |
22 | | |
23 | | #include <chrono> |
24 | | #include <memory> |
25 | | |
26 | | #include "common/config.h" |
27 | | #include "common/exception.h" |
28 | | #include "common/logging.h" |
29 | | #include "common/status.h" |
30 | | #include "core/arena.h" |
31 | | #include "cpp/sync_point.h" |
32 | | #include "io/cache/file_block.h" |
33 | | #include "io/cache/file_cache_common.h" |
34 | | #include "io/fs/s3_common.h" |
35 | | #include "runtime/exec_env.h" |
36 | | #include "runtime/thread_context.h" |
37 | | #include "util/defer_op.h" |
38 | | #include "util/slice.h" |
39 | | |
40 | | namespace doris { |
41 | | namespace io { |
42 | | |
43 | | bvar::Adder<uint64_t> s3_file_buffer_allocated("s3_file_buffer_allocated"); |
44 | | |
45 | | template <typename Allocator = Allocator<false>> |
46 | | struct Memory : boost::noncopyable, Allocator { |
47 | | Memory() = default; |
48 | 1.31k | explicit Memory(size_t size) : _size(size) { |
49 | 1.31k | alloc(size); |
50 | 1.31k | s3_file_buffer_allocated << 1; |
51 | 1.31k | } |
52 | 1.31k | ~Memory() { |
53 | 1.31k | dealloc(); |
54 | 1.31k | s3_file_buffer_allocated << -1; |
55 | 1.31k | } |
56 | 1.31k | void alloc(size_t size) { _data = static_cast<char*>(Allocator::alloc(size, 0)); } |
57 | 1.31k | void dealloc() { |
58 | 1.31k | if (_data == nullptr) { |
59 | 0 | return; |
60 | 0 | } |
61 | 1.31k | Allocator::free(_data, _size); |
62 | 1.31k | _data = nullptr; |
63 | 1.31k | } |
64 | | size_t _size; |
65 | | char* _data; |
66 | | }; |
67 | | |
68 | | struct FileBuffer::PartData { |
69 | | Memory<> _memory; |
70 | 1.31k | PartData() : _memory(config::s3_write_buffer_size) {} |
71 | 1.31k | ~PartData() = default; |
72 | 13.5k | [[nodiscard]] Slice data() const { return Slice {_memory._data, _memory._size}; } |
73 | 1.31k | [[nodiscard]] size_t size() const { return _memory._size; } |
74 | | }; |
75 | | |
76 | 0 | Slice FileBuffer::get_slice() const { |
77 | 0 | return _inner_data->data(); |
78 | 0 | } |
79 | | |
80 | | FileBuffer::FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder, |
81 | | size_t offset, OperationState state) |
82 | 1.31k | : _type(type), |
83 | 1.31k | _alloc_holder(std::move(alloc_holder)), |
84 | 1.31k | _offset(offset), |
85 | 1.31k | _size(0), |
86 | 1.31k | _state(std::move(state)), |
87 | 1.31k | _inner_data(std::make_unique<FileBuffer::PartData>()), |
88 | 1.31k | _capacity(_inner_data->size()) {} |
89 | | |
90 | 1.31k | FileBuffer::~FileBuffer() { |
91 | 1.31k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker()); |
92 | 1.31k | _inner_data.reset(); |
93 | 1.31k | } |
94 | | |
95 | | /** |
96 | | * 0. when there is memory preserved, directly write data to buf |
97 | | * 1. write to file cache otherwise, then we'll wait for free buffer and to rob it |
98 | | */ |
99 | 10.6k | Status UploadFileBuffer::append_data(const Slice& data) { |
100 | 10.6k | TEST_SYNC_POINT_RETURN_WITH_VALUE("UploadFileBuffer::append_data", Status::OK(), this, |
101 | 9.63k | data.get_size()); |
102 | 9.63k | std::memcpy((void*)(_inner_data->data().get_data() + _size), data.get_data(), data.get_size()); |
103 | 9.63k | _size += data.get_size(); |
104 | 9.63k | _crc_value = crc32c::Extend(_crc_value, (const uint8_t*)data.get_data(), data.get_size()); |
105 | 9.63k | return Status::OK(); |
106 | 10.6k | } |
107 | | |
108 | | /** |
109 | | * 0. constrcut the stream ptr if the buffer is not empty |
110 | | * 1. submit the on_upload() callback to executor |
111 | | */ |
112 | 1.31k | static Status submit_upload_buffer(std::shared_ptr<FileBuffer> buffer) { |
113 | 1.31k | TEST_SYNC_POINT_RETURN_WITH_VALUE("UploadFileBuffer::submit", Status::OK(), buffer.get()); |
114 | 1.31k | return ExecEnv::GetInstance()->s3_file_upload_thread_pool()->submit_func( |
115 | 1.31k | [buf = std::move(buffer)]() { buf->execute_async(); }); |
116 | 1.31k | } |
117 | | |
118 | 0 | std::ostream& operator<<(std::ostream& os, const BufferType& value) { |
119 | 0 | switch (value) { |
120 | 0 | case BufferType::UPLOAD: |
121 | 0 | os << "upload"; |
122 | 0 | break; |
123 | 0 | case BufferType::DOWNLOAD: |
124 | 0 | os << "download"; |
125 | 0 | break; |
126 | 0 | default: |
127 | 0 | auto cast_value = static_cast<uint32_t>(value); |
128 | 0 | os << cast_value; |
129 | 0 | } |
130 | 0 | return os; |
131 | 0 | } |
132 | | |
133 | 1.31k | Status FileBuffer::submit(std::shared_ptr<FileBuffer> buf) { |
134 | 1.31k | switch (buf->_type) { |
135 | 1.31k | case BufferType::UPLOAD: |
136 | 1.31k | return submit_upload_buffer(std::move(buf)); |
137 | 0 | break; |
138 | 0 | default: |
139 | 0 | CHECK(false) << "should never come here, the illegal type is " << buf->_type; |
140 | 1.31k | }; |
141 | 0 | return Status::InternalError("should never come here"); |
142 | 1.31k | } |
143 | | |
144 | 1.30k | std::string_view FileBuffer::get_string_view_data() const { |
145 | 1.30k | return {_inner_data->data().get_data(), _size}; |
146 | 1.30k | } |
147 | | |
148 | 1.31k | void UploadFileBuffer::on_upload() { |
149 | 1.31k | _stream_ptr = std::make_shared<StringViewStream>(_inner_data->data().get_data(), _size); |
150 | 1.31k | if (_crc_value != crc32c::Crc32c(_inner_data->data().get_data(), _size)) { |
151 | 0 | DCHECK(false); |
152 | 0 | set_status(Status::IOError("Buffer checksum not match")); |
153 | 0 | return; |
154 | 0 | } |
155 | 1.31k | _upload_to_remote(*this); |
156 | 1.31k | if (config::enable_flush_file_cache_async) { |
157 | | // If we call is_cancelled() after _state.set_status() then there might one situation where |
158 | | // s3 file writer is already destructed |
159 | 1.31k | bool cancelled = is_cancelled(); |
160 | 1.31k | _state.set_status(); |
161 | | // this control flow means the buf and the stream shares one memory |
162 | | // so we can directly use buf here |
163 | 1.31k | upload_to_local_file_cache(cancelled); |
164 | 1.31k | } else { |
165 | 0 | upload_to_local_file_cache(is_cancelled()); |
166 | 0 | _state.set_status(); |
167 | 0 | } |
168 | 1.31k | } |
169 | | |
170 | | /** |
171 | | * write the content of the memory buffer to local file cache |
172 | | */ |
173 | 1.31k | void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) { |
174 | 1.31k | if (!config::enable_file_cache || _alloc_holder == nullptr) { |
175 | 1.31k | return; |
176 | 1.31k | } |
177 | 0 | if (_holder) { |
178 | 0 | return; |
179 | 0 | } |
180 | 0 | if (is_cancelled) { |
181 | 0 | return; |
182 | 0 | } |
183 | 0 | TEST_INJECTION_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache"); |
184 | | // the data is already written to S3 in this situation |
185 | | // so i didn't handle the file cache write error |
186 | 0 | _holder = _alloc_holder(); |
187 | 0 | size_t pos = 0; |
188 | 0 | size_t data_remain_size = _size; |
189 | 0 | for (auto& block : _holder->file_blocks) { |
190 | 0 | if (data_remain_size == 0) { |
191 | 0 | break; |
192 | 0 | } |
193 | 0 | size_t block_size = block->range().size(); |
194 | 0 | size_t append_size = std::min(data_remain_size, block_size); |
195 | 0 | if (block->state() == FileBlock::State::EMPTY) { |
196 | 0 | block->get_or_set_downloader(); |
197 | | // Another thread may have started downloading due to a query |
198 | | // Just skip putting to cache from UploadFileBuffer |
199 | 0 | if (block->is_downloader()) { |
200 | 0 | Slice s(_inner_data->data().get_data() + pos, append_size); |
201 | 0 | Status st = block->append(s); |
202 | 0 | TEST_INJECTION_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache_inject", |
203 | 0 | &st); |
204 | 0 | if (st.ok()) { |
205 | 0 | st = block->finalize(); |
206 | 0 | } |
207 | 0 | if (!st.ok()) { |
208 | 0 | { |
209 | 0 | [[maybe_unused]] bool ret = false; |
210 | 0 | TEST_SYNC_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache", |
211 | 0 | &ret); |
212 | 0 | } |
213 | 0 | LOG_WARNING("failed to append data to file cache").error(st); |
214 | 0 | } |
215 | 0 | } |
216 | 0 | } |
217 | 0 | data_remain_size -= append_size; |
218 | 0 | pos += append_size; |
219 | 0 | } |
220 | 0 | } |
221 | | |
222 | 1.31k | FileBufferBuilder& FileBufferBuilder::set_type(BufferType type) { |
223 | 1.31k | _type = type; |
224 | 1.31k | return *this; |
225 | 1.31k | } |
226 | | FileBufferBuilder& FileBufferBuilder::set_upload_callback( |
227 | 1.31k | std::function<void(UploadFileBuffer& buf)> cb) { |
228 | 1.31k | _upload_cb = std::move(cb); |
229 | 1.31k | return *this; |
230 | 1.31k | } |
231 | | // set callback to do task sync for the caller |
232 | 1.31k | FileBufferBuilder& FileBufferBuilder::set_sync_after_complete_task(std::function<bool(Status)> cb) { |
233 | 1.31k | _sync_after_complete_task = std::move(cb); |
234 | 1.31k | return *this; |
235 | 1.31k | } |
236 | | |
237 | | FileBufferBuilder& FileBufferBuilder::set_allocate_file_blocks_holder( |
238 | 0 | std::function<FileBlocksHolderPtr()> cb) { |
239 | 0 | _alloc_holder_cb = std::move(cb); |
240 | 0 | return *this; |
241 | 0 | } |
242 | | |
243 | 1.31k | Status FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) { |
244 | 1.31k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker()); |
245 | 1.31k | OperationState state(_sync_after_complete_task, _is_cancelled); |
246 | | |
247 | 1.31k | if (_type == BufferType::UPLOAD) { |
248 | 1.31k | RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<UploadFileBuffer>( |
249 | 1.31k | std::move(_upload_cb), std::move(state), _offset, |
250 | 1.31k | std::move(_alloc_holder_cb))); |
251 | 1.31k | return Status::OK(); |
252 | 1.31k | } |
253 | 0 | if (_type == BufferType::DOWNLOAD) { |
254 | 0 | RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<DownloadFileBuffer>( |
255 | 0 | std::move(_download), |
256 | 0 | std::move(_write_to_local_file_cache), |
257 | 0 | std::move(_write_to_use_buffer), std::move(state), |
258 | 0 | _offset, std::move(_alloc_holder_cb))); |
259 | 0 | return Status::OK(); |
260 | 0 | } |
261 | | // should never come here |
262 | 0 | return Status::InternalError("unsupport buffer type {}", _type); |
263 | 0 | } |
264 | | |
265 | | /** |
266 | | * 0. check if we need to write into cache |
267 | | * 1. check if there is free space inside the file cache |
268 | | * 2. call the download callback |
269 | | * 3. write the downloaded content into user buffer if necessary |
270 | | */ |
271 | 0 | void DownloadFileBuffer::on_download() { |
272 | 0 | auto s = Status::OK(); |
273 | 0 | Defer def {[&]() { _state.set_status(std::move(s)); }}; |
274 | 0 | if (is_cancelled()) { |
275 | 0 | return; |
276 | 0 | } |
277 | 0 | FileBlocksHolderPtr holder = nullptr; |
278 | 0 | bool need_to_download_into_cache = false; |
279 | 0 | if (_alloc_holder != nullptr) { |
280 | 0 | holder = _alloc_holder(); |
281 | 0 | std::for_each(holder->file_blocks.begin(), holder->file_blocks.end(), |
282 | 0 | [&need_to_download_into_cache](FileBlockSPtr& file_block) { |
283 | 0 | if (file_block->state() == FileBlock::State::EMPTY) { |
284 | 0 | file_block->get_or_set_downloader(); |
285 | 0 | if (file_block->is_downloader()) { |
286 | 0 | need_to_download_into_cache = true; |
287 | 0 | } |
288 | 0 | } |
289 | 0 | }); |
290 | 0 | if (!need_to_download_into_cache && !_write_to_use_buffer) [[unlikely]] { |
291 | 0 | LOG(INFO) << "Skipping download because that there is no space for catch data."; |
292 | 0 | } else { |
293 | 0 | Slice tmp = _inner_data->data(); |
294 | 0 | s = _download(tmp); |
295 | 0 | if (s) { |
296 | 0 | _size = tmp.get_size(); |
297 | 0 | if (_write_to_use_buffer != nullptr) { |
298 | 0 | _write_to_use_buffer({_inner_data->data().get_data(), get_size()}, |
299 | 0 | get_file_offset()); |
300 | 0 | } |
301 | 0 | if (need_to_download_into_cache) { |
302 | 0 | _write_to_local_file_cache(std::move(holder), |
303 | 0 | Slice {_inner_data->data().get_data(), _size}); |
304 | 0 | } |
305 | 0 | } else { |
306 | 0 | LOG(WARNING) << s; |
307 | 0 | } |
308 | 0 | _state.set_status(std::move(s)); |
309 | 0 | } |
310 | 0 | } else { |
311 | 0 | Slice tmp = _inner_data->data(); |
312 | 0 | s = _download(tmp); |
313 | 0 | _size = tmp.get_size(); |
314 | 0 | if (s.ok() && _write_to_use_buffer != nullptr) { |
315 | 0 | _write_to_use_buffer({_inner_data->data().get_data(), get_size()}, get_file_offset()); |
316 | 0 | } |
317 | 0 | } |
318 | 0 | } |
319 | | |
320 | | } // namespace io |
321 | | } // namespace doris |