Coverage Report

Created: 2026-03-13 03:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/s3_file_bufferpool.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <crc32c/crc32c.h>
21
22
#include <condition_variable>
23
#include <cstdint>
24
#include <fstream>
25
#include <functional>
26
#include <list>
27
#include <memory>
28
#include <mutex>
29
30
#include "common/status.h"
31
#include "io/cache/file_block.h"
32
#include "util/slice.h"
33
#include "util/threadpool.h"
34
35
namespace doris {
36
namespace io {
37
enum class BufferType : uint32_t { DOWNLOAD, UPLOAD };
38
using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>;
39
struct OperationState {
40
    OperationState(std::function<bool(Status)> sync_after_complete_task,
41
                   std::function<bool()> is_cancelled)
42
71.7k
            : _sync_after_complete_task(std::move(sync_after_complete_task)),
43
71.7k
              _is_cancelled(std::move(is_cancelled)) {}
44
    /**
45
    * set the val of this operation state which indicates it failed or succeeded
46
    *
47
    * @param S the execution result
48
    */
49
71.7k
    void set_status(Status s = Status::OK()) {
50
        // make sure we wouldn't sync twice
51
71.7k
        if (_value_set) [[unlikely]] {
52
14
            return;
53
14
        }
54
71.7k
        if (nullptr != _sync_after_complete_task) {
55
71.7k
            _fail_after_sync = _sync_after_complete_task(std::move(s));
56
71.7k
        }
57
71.7k
        _value_set = true;
58
71.7k
    }
59
60
    /**
61
    * detect whether the execution task is done
62
    *
63
    * @return is the execution task is done
64
    */
65
72.1k
    [[nodiscard]] bool is_cancelled() const {
66
72.1k
        DCHECK(nullptr != _is_cancelled);
67
        // If _fail_after_sync is true then it means the sync task already returns
68
        // that the task failed and if the outside file writer might already be
69
        // destructed
70
72.1k
        return _fail_after_sync ? true : _is_cancelled();
71
72.1k
    }
72
73
    std::function<bool(Status)> _sync_after_complete_task;
74
    std::function<bool()> _is_cancelled;
75
    bool _value_set = false;
76
    bool _fail_after_sync = false;
77
};
78
79
struct FileBuffer {
80
    FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset,
81
               OperationState state);
82
    virtual ~FileBuffer();
83
    /**
84
    * submit the correspoding task to async executor
85
    */
86
    static Status submit(std::shared_ptr<FileBuffer> buf);
87
    /**
88
    * append data to the inner memory buffer
89
    *
90
    * @param S the content to be appended
91
    */
92
    virtual Status append_data(const Slice& s) = 0;
93
    virtual void execute_async() = 0;
94
    /**
95
    * set the val of it's operation state
96
    *
97
    * @param S the execution result
98
    */
99
16
    void set_status(Status s) { _state.set_status(s); }
100
    /**
101
    * get the start offset of this file buffer
102
    *
103
    * @return start offset of this file buffer
104
    */
105
6.92M
    size_t get_file_offset() const { return _offset; }
106
    /**
107
    * get the size of the buffered data
108
    *
109
    * @return the size of the buffered data
110
    */
111
6.99M
    size_t get_size() const { return _size; }
112
0
    size_t get_capacaticy() const { return _capacity; }
113
    Slice get_slice() const;
114
    /**
115
    * detect whether the execution task is done
116
    *
117
    * @return is the execution task is done
118
    */
119
72.1k
    bool is_cancelled() const { return _state.is_cancelled(); }
120
121
    std::string_view get_string_view_data() const;
122
123
    BufferType _type;
124
    std::function<FileBlocksHolderPtr()> _alloc_holder;
125
    size_t _offset;
126
    size_t _size;
127
    OperationState _state;
128
    struct PartData;
129
    std::unique_ptr<PartData> _inner_data;
130
    size_t _capacity;
131
};
132
133
struct DownloadFileBuffer final : public FileBuffer {
134
    DownloadFileBuffer(std::function<Status(Slice&)> download,
135
                       std::function<void(FileBlocksHolderPtr, Slice)> write_to_cache,
136
                       std::function<void(Slice, size_t)> write_to_use_buffer, OperationState state,
137
                       size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder)
138
0
            : FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state),
139
0
              _download(std::move(download)),
140
0
              _write_to_local_file_cache(std::move(write_to_cache)),
141
0
              _write_to_use_buffer(std::move(write_to_use_buffer)) {}
142
0
    ~DownloadFileBuffer() override = default;
143
    /**
144
    * do the download work, it would write the content into local memory buffer
145
    */
146
    void on_download();
147
0
    void execute_async() override { on_download(); }
148
0
    Status append_data(const Slice& s) override { return Status::OK(); }
149
150
    std::function<Status(Slice&)> _download;
151
    std::function<void(FileBlocksHolderPtr, Slice)> _write_to_local_file_cache;
152
    std::function<void(Slice, size_t)> _write_to_use_buffer;
153
};
154
155
struct UploadFileBuffer final : public FileBuffer {
156
    UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb, OperationState state,
157
                     size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder)
158
71.7k
            : FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state),
159
71.7k
              _upload_to_remote(std::move(upload_cb)) {}
160
71.5k
    ~UploadFileBuffer() override = default;
161
    Status append_data(const Slice& s) override;
162
    /**
163
    * read the content from local file cache
164
    * because previously lack of  memory buffer
165
    */
166
    void read_from_cache();
167
    /**
168
    * write the content inside memory buffer into 
169
    * local file cache
170
    */
171
    void upload_to_local_file_cache(bool);
172
173
71.7k
    void execute_async() override { on_upload(); }
174
    /**
175
    * do the upload work
176
    * 1. read from cache if the data is written to cache first
177
    * 2. upload content of buffer to S3
178
    * 3. upload content to file cache if necessary
179
    * 4. call the finish callback caller specified
180
    * 5. reclaim self
181
    */
182
    void on_upload();
183
    /**
184
    *
185
    * @return the stream representing the inner memory buffer
186
    */
187
0
    std::shared_ptr<std::iostream> get_stream() const { return _stream_ptr; }
188
189
    /**
190
    * Currently only used for small file to set callback
191
    */
192
71.3k
    void set_upload_to_remote(std::function<void(UploadFileBuffer&)> cb) {
193
71.3k
        _upload_to_remote = std::move(cb);
194
71.3k
    }
195
196
private:
197
    std::function<void(UploadFileBuffer&)> _upload_to_remote = nullptr;
198
    std::shared_ptr<std::iostream> _stream_ptr; // point to _buffer.get_data()
199
200
    bool _is_cache_allocated {false};
201
    FileBlocksHolderPtr _holder;
202
    decltype(_holder->file_blocks.begin()) _cur_file_block;
203
    size_t _append_offset {0};
204
    uint32_t _crc_value = 0;
205
};
206
207
struct FileBufferBuilder {
208
71.7k
    FileBufferBuilder() = default;
209
71.7k
    ~FileBufferBuilder() = default;
210
    /**
211
    * build one file buffer using previously set properties
212
    * @return the file buffer's base shared pointer
213
    */
214
    Status build(std::shared_ptr<FileBuffer>* buf);
215
    /**
216
    * set the file buffer type
217
    *
218
    * @param type enum class for buffer type
219
    */
220
    FileBufferBuilder& set_type(BufferType type);
221
    /**
222
    * set the download callback which would download the content on cloud into file buffer
223
    *
224
    * @param cb 
225
    */
226
0
    FileBufferBuilder& set_download_callback(std::function<Status(Slice&)> cb) {
227
0
        _download = std::move(cb);
228
0
        return *this;
229
0
    }
230
    /**
231
    * set the upload callback which would upload the content inside buffer into remote storage
232
    *
233
    * @param cb 
234
    */
235
    FileBufferBuilder& set_upload_callback(std::function<void(UploadFileBuffer& buf)> cb);
236
    /**
237
    * set the callback which would do task sync for the caller
238
    *
239
    * @param cb 
240
    */
241
    FileBufferBuilder& set_sync_after_complete_task(std::function<bool(Status)> cb);
242
    /**
243
    * set the callback which detect whether the task is done
244
    *
245
    * @param cb 
246
    */
247
71.7k
    FileBufferBuilder& set_is_cancelled(std::function<bool()> cb) {
248
71.7k
        _is_cancelled = std::move(cb);
249
71.7k
        return *this;
250
71.7k
    }
251
    /**
252
    * set the callback which allocate file cache block holder
253
    * **Notice**: Because the load file cache workload coule be done
254
    * asynchronously so you must make sure all the dependencies of this
255
    * cb could last until this cb is invoked
256
    * @param cb 
257
    */
258
    FileBufferBuilder& set_allocate_file_blocks_holder(std::function<FileBlocksHolderPtr()> cb);
259
    /**
260
    * set the file offset of the file buffer
261
    *
262
    * @param cb 
263
    */
264
71.7k
    FileBufferBuilder& set_file_offset(size_t offset) {
265
71.7k
        _offset = offset;
266
71.7k
        return *this;
267
71.7k
    }
268
    /**
269
    * set the callback which write the content into local file cache
270
    *
271
    * @param cb 
272
    */
273
    FileBufferBuilder& set_write_to_local_file_cache(
274
0
            std::function<void(FileBlocksHolderPtr, Slice)> cb) {
275
0
        _write_to_local_file_cache = std::move(cb);
276
0
        return *this;
277
0
    }
278
    /**
279
    * set the callback which would write the downloaded content into user's buffer
280
    *
281
    * @param cb 
282
    */
283
0
    FileBufferBuilder& set_write_to_use_buffer(std::function<void(Slice, size_t)> cb) {
284
0
        _write_to_use_buffer = std::move(cb);
285
0
        return *this;
286
0
    }
287
288
    BufferType _type;
289
    std::function<void(UploadFileBuffer& buf)> _upload_cb = nullptr;
290
    std::function<bool(Status)> _sync_after_complete_task = nullptr;
291
    std::function<FileBlocksHolderPtr()> _alloc_holder_cb = nullptr;
292
    std::function<bool()> _is_cancelled = nullptr;
293
    std::function<void(FileBlocksHolderPtr, Slice)> _write_to_local_file_cache;
294
    std::function<Status(Slice&)> _download;
295
    std::function<void(Slice, size_t)> _write_to_use_buffer;
296
    size_t _offset;
297
};
298
} // namespace io
299
} // namespace doris