be/src/io/fs/s3_file_bufferpool.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <crc32c/crc32c.h> |
21 | | |
22 | | #include <condition_variable> |
23 | | #include <cstdint> |
24 | | #include <fstream> |
25 | | #include <functional> |
26 | | #include <list> |
27 | | #include <memory> |
28 | | #include <mutex> |
29 | | |
30 | | #include "common/status.h" |
31 | | #include "io/cache/file_block.h" |
32 | | #include "util/slice.h" |
33 | | #include "util/threadpool.h" |
34 | | |
35 | | namespace doris { |
36 | | namespace io { |
37 | | enum class BufferType : uint32_t { DOWNLOAD, UPLOAD }; /* tag handed to the FileBuffer base: DownloadFileBuffer passes DOWNLOAD, UploadFileBuffer passes UPLOAD */ |
38 | | using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>; /* owning pointer to a FileBlocksHolder; that type is not declared in this header (presumably io/cache/file_block.h, confirm) */ |
39 | | struct OperationState { /* completion callback, cancellation probe and result flags for one task; FileBuffer keeps one as _state */ |
40 | | OperationState(std::function<bool(Status)> sync_after_complete_task,
41 | | std::function<bool()> is_cancelled)
42 | 71.7k | : _sync_after_complete_task(std::move(sync_after_complete_task)),
43 | 71.7k | _is_cancelled(std::move(is_cancelled)) {} |
44 | | /** |
45 | | * Report the result of this operation. Only the first call has an effect: it hands `s` to the sync callback (when one is set), stores the returned bool in _fail_after_sync and marks the value as set. |
46 | | * |
47 | | * @param s the execution result (defaults to Status::OK()) |
48 | | */ |
49 | 71.7k | void set_status(Status s = Status::OK()) {
50 | | // first call wins, so the sync callback never runs twice. NOTE(review): _value_set is a plain bool with no locking here; confirm callers never race
51 | 71.7k | if (_value_set) [[unlikely]] {
52 | 14 | return;
53 | 14 | }
54 | 71.7k | if (nullptr != _sync_after_complete_task) {
55 | 71.7k | _fail_after_sync = _sync_after_complete_task(std::move(s));
56 | 71.7k | }
57 | 71.7k | _value_set = true;
58 | 71.7k | } |
59 | | |
60 | | /** |
61 | | * Check whether this operation should be treated as cancelled (the _is_cancelled callback must be set; this is DCHECKed). |
62 | | * |
63 | | * @return true when the sync callback already returned true, otherwise whatever _is_cancelled() returns |
64 | | */ |
65 | 72.1k | [[nodiscard]] bool is_cancelled() const {
66 | 72.1k | DCHECK(nullptr != _is_cancelled);
67 | | // _fail_after_sync holds what the sync callback returned. When it is true the sync
68 | | // task already reported that the task failed and the outside file writer might already
69 | | // be destructed, so answer true without invoking _is_cancelled()
70 | 72.1k | return _fail_after_sync ? true : _is_cancelled();
71 | 72.1k | } |
72 | | |
73 | | std::function<bool(Status)> _sync_after_complete_task; /* invoked at most once, from set_status(); may be empty */ |
74 | | std::function<bool()> _is_cancelled; /* queried by is_cancelled(); expected to be non-empty */ |
75 | | bool _value_set = false; /* true once set_status() has completed its first call */ |
76 | | bool _fail_after_sync = false; /* return value of the sync callback */ |
77 | | }; |
78 | | |
79 | | struct FileBuffer { /* abstract base for one buffered piece of a file; subclasses implement append_data() and execute_async() */ |
80 | | FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset,
81 | | OperationState state); |
82 | | virtual ~FileBuffer(); |
83 | | /** |
84 | | * submit the corresponding task to the async executor (defined out of line; see the .cpp for details) |
85 | | */ |
86 | | static Status submit(std::shared_ptr<FileBuffer> buf); |
87 | | /** |
88 | | * append data to the inner memory buffer |
89 | | * |
90 | | * @param s the content to be appended |
91 | | */ |
92 | | virtual Status append_data(const Slice& s) = 0; |
93 | | virtual void execute_async() = 0; /* the work of this buffer; the subclasses in this header forward to on_download() / on_upload() */ |
94 | | /** |
95 | | * forward the execution result to the operation state of this buffer (see OperationState::set_status) |
96 | | * |
97 | | * @param s the execution result |
98 | | */ |
99 | 16 | void set_status(Status s) { _state.set_status(s); } |
100 | | /** |
101 | | * get the start offset of this file buffer |
102 | | * |
103 | | * @return start offset of this file buffer |
104 | | */ |
105 | 6.92M | size_t get_file_offset() const { return _offset; } |
106 | | /** |
107 | | * get the size of the buffered data |
108 | | * |
109 | | * @return the size of the buffered data |
110 | | */ |
111 | 6.99M | size_t get_size() const { return _size; } |
112 | 0 | size_t get_capacaticy() const { return _capacity; } /* returns _capacity; NOTE(review): the accessor name misspells capacity, kept because callers may depend on it */ |
113 | | Slice get_slice() const; |
114 | | /** |
115 | | * ask the operation state whether this task should be treated as cancelled |
116 | | * |
117 | | * @return the result of _state.is_cancelled() |
118 | | */ |
119 | 72.1k | bool is_cancelled() const { return _state.is_cancelled(); } |
120 | | |
121 | | std::string_view get_string_view_data() const; |
122 | | |
123 | | BufferType _type; |
124 | | std::function<FileBlocksHolderPtr()> _alloc_holder; |
125 | | size_t _offset; /* start offset of this buffer in the file, see get_file_offset() */ |
126 | | size_t _size; /* size of the buffered data, see get_size() */ |
127 | | OperationState _state; |
128 | | struct PartData; /* only forward-declared here; the definition is not visible in this header */ |
129 | | std::unique_ptr<PartData> _inner_data; |
130 | | size_t _capacity; |
131 | | }; |
132 | | |
133 | | struct DownloadFileBuffer final : public FileBuffer { /* FileBuffer tagged BufferType::DOWNLOAD whose async work is on_download() */ |
134 | | DownloadFileBuffer(std::function<Status(Slice&)> download,
135 | | std::function<void(FileBlocksHolderPtr, Slice)> write_to_cache,
136 | | std::function<void(Slice, size_t)> write_to_use_buffer, OperationState state,
137 | | size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder)
138 | 0 | : FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state),
139 | 0 | _download(std::move(download)),
140 | 0 | _write_to_local_file_cache(std::move(write_to_cache)),
141 | 0 | _write_to_use_buffer(std::move(write_to_use_buffer)) {} |
142 | 0 | ~DownloadFileBuffer() override = default; |
143 | | /** |
144 | | * do the download work; per the original note it writes the content into the local memory buffer (defined out of line) |
145 | | */ |
146 | | void on_download(); |
147 | 0 | void execute_async() override { on_download(); } /* the async work of a download buffer is simply on_download() */ |
148 | 0 | Status append_data(const Slice& s) override { return Status::OK(); } /* ignores `s` and always returns Status::OK() */ |
149 | | |
150 | | std::function<Status(Slice&)> _download; /* download callback supplied to the constructor */ |
151 | | std::function<void(FileBlocksHolderPtr, Slice)> _write_to_local_file_cache; /* the write_to_cache constructor argument */ |
152 | | std::function<void(Slice, size_t)> _write_to_use_buffer; /* the write_to_use_buffer constructor argument */ |
153 | | }; |
154 | | |
155 | | struct UploadFileBuffer final : public FileBuffer { /* FileBuffer tagged BufferType::UPLOAD whose async work is on_upload() */ |
156 | | UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb, OperationState state,
157 | | size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder)
158 | 71.7k | : FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state),
159 | 71.7k | _upload_to_remote(std::move(upload_cb)) {} |
160 | 71.5k | ~UploadFileBuffer() override = default; |
161 | | Status append_data(const Slice& s) override; |
162 | | /** |
163 | | * read the content back from the local file cache |
164 | | * (per the original note: needed when a memory buffer was lacking earlier) |
165 | | */ |
166 | | void read_from_cache(); |
167 | | /** |
168 | | * write the content inside the memory buffer into the |
169 | | * local file cache. NOTE(review): the bool parameter is unnamed and its meaning is not visible in this header |
170 | | */ |
171 | | void upload_to_local_file_cache(bool); |
172 | | |
173 | 71.7k | void execute_async() override { on_upload(); } /* the async work of an upload buffer is simply on_upload() */ |
174 | | /** |
175 | | * do the upload work (steps as listed by the original author; the body is defined out of line) |
176 | | * 1. read from cache if the data was written to cache first |
177 | | * 2. upload the content of the buffer to S3 |
178 | | * 3. upload the content to the file cache if necessary |
179 | | * 4. call the finish callback the caller specified |
180 | | * 5. reclaim self |
181 | | */ |
182 | | void on_upload(); |
183 | | /** |
184 | | * get the stream held in _stream_ptr |
185 | | * @return the stream representing the inner memory buffer |
186 | | */ |
187 | 0 | std::shared_ptr<std::iostream> get_stream() const { return _stream_ptr; } |
188 | | |
189 | | /** |
190 | | * replace the upload callback; currently only used for small files |
191 | | */ |
192 | 71.3k | void set_upload_to_remote(std::function<void(UploadFileBuffer&)> cb) {
193 | 71.3k | _upload_to_remote = std::move(cb);
194 | 71.3k | } |
195 | | |
196 | | private: |
197 | | std::function<void(UploadFileBuffer&)> _upload_to_remote = nullptr; |
198 | | std::shared_ptr<std::iostream> _stream_ptr; // original note: points to _buffer.get_data(); NOTE(review): no _buffer member is visible in this header, confirm |
199 | | |
200 | | bool _is_cache_allocated {false}; |
201 | | FileBlocksHolderPtr _holder; |
202 | | decltype(_holder->file_blocks.begin()) _cur_file_block; /* iterator into _holder->file_blocks */ |
203 | | size_t _append_offset {0}; |
204 | | uint32_t _crc_value = 0; |
205 | | }; |
206 | | |
207 | | struct FileBufferBuilder { /* collects type, offset and callbacks through chained setters, then build() creates the FileBuffer */ |
208 | 71.7k | FileBufferBuilder() = default; |
209 | 71.7k | ~FileBufferBuilder() = default; |
210 | | /** |
211 | | * build one file buffer using previously set properties |
212 | | * @param buf out-parameter for the base shared pointer of the file buffer; @return a Status (presumably the build outcome, confirm in the .cpp) |
213 | | */ |
214 | | Status build(std::shared_ptr<FileBuffer>* buf); |
215 | | /** |
216 | | * set the file buffer type |
217 | | * |
218 | | * @param type enum class for buffer type |
219 | | */ |
220 | | FileBufferBuilder& set_type(BufferType type); |
221 | | /** |
222 | | * set the download callback which would download the content on cloud into file buffer |
223 | | * |
224 | | * @param cb the download callback, stored in _download; returns *this for chaining |
225 | | */ |
226 | 0 | FileBufferBuilder& set_download_callback(std::function<Status(Slice&)> cb) {
227 | 0 | _download = std::move(cb);
228 | 0 | return *this;
229 | 0 | } |
230 | | /** |
231 | | * set the upload callback which would upload the content inside buffer into remote storage |
232 | | * |
233 | | * @param cb the upload callback; it receives the UploadFileBuffer by reference |
234 | | */ |
235 | | FileBufferBuilder& set_upload_callback(std::function<void(UploadFileBuffer& buf)> cb); |
236 | | /** |
237 | | * set the callback which would do task sync for the caller |
238 | | * |
239 | | * @param cb callback taking the task Status and returning a bool (same signature as OperationState::_sync_after_complete_task) |
240 | | */ |
241 | | FileBufferBuilder& set_sync_after_complete_task(std::function<bool(Status)> cb); |
242 | | /** |
243 | | * set the callback which detects whether the task is cancelled |
244 | | * |
245 | | * @param cb the cancellation probe, stored in _is_cancelled; returns *this for chaining |
246 | | */ |
247 | 71.7k | FileBufferBuilder& set_is_cancelled(std::function<bool()> cb) {
248 | 71.7k | _is_cancelled = std::move(cb);
249 | 71.7k | return *this;
250 | 71.7k | } |
251 | | /** |
252 | | * set the callback which allocates the file cache block holder |
253 | | * **Notice**: Because the load file cache workload could be done |
254 | | * asynchronously, you must make sure all the dependencies of this |
255 | | * cb last until this cb is invoked |
256 | | * @param cb callback returning a FileBlocksHolderPtr |
257 | | */ |
258 | | FileBufferBuilder& set_allocate_file_blocks_holder(std::function<FileBlocksHolderPtr()> cb); |
259 | | /** |
260 | | * set the file offset of the file buffer |
261 | | * |
262 | | * @param offset the file offset, stored in _offset; returns *this for chaining |
263 | | */ |
264 | 71.7k | FileBufferBuilder& set_file_offset(size_t offset) {
265 | 71.7k | _offset = offset;
266 | 71.7k | return *this;
267 | 71.7k | } |
268 | | /** |
269 | | * set the callback which writes the content into local file cache |
270 | | * |
271 | | * @param cb callback taking a FileBlocksHolderPtr and a Slice; returns *this for chaining |
272 | | */ |
273 | | FileBufferBuilder& set_write_to_local_file_cache(
274 | 0 | std::function<void(FileBlocksHolderPtr, Slice)> cb) {
275 | 0 | _write_to_local_file_cache = std::move(cb);
276 | 0 | return *this;
277 | 0 | } |
278 | | /** |
279 | | * set the callback which would write the downloaded content into the buffer of the user |
280 | | * |
281 | | * @param cb callback taking a Slice and a size_t; returns *this for chaining |
282 | | */ |
283 | 0 | FileBufferBuilder& set_write_to_use_buffer(std::function<void(Slice, size_t)> cb) {
284 | 0 | _write_to_use_buffer = std::move(cb);
285 | 0 | return *this;
286 | 0 | } |
287 | | |
288 | | BufferType _type; /* NOTE(review): no default initializer; presumably set_type() must run before build(), confirm */ |
289 | | std::function<void(UploadFileBuffer& buf)> _upload_cb = nullptr; |
290 | | std::function<bool(Status)> _sync_after_complete_task = nullptr; |
291 | | std::function<FileBlocksHolderPtr()> _alloc_holder_cb = nullptr; |
292 | | std::function<bool()> _is_cancelled = nullptr; |
293 | | std::function<void(FileBlocksHolderPtr, Slice)> _write_to_local_file_cache; |
294 | | std::function<Status(Slice&)> _download; |
295 | | std::function<void(Slice, size_t)> _write_to_use_buffer; |
296 | | size_t _offset; /* NOTE(review): no default initializer; assigned only by set_file_offset() */ |
297 | | }; |
298 | | } // namespace io |
299 | | } // namespace doris |