Coverage Report

Created: 2026-03-16 21:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/s3_file_bufferpool.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/s3_file_bufferpool.h"
19
20
#include <bvar/bvar.h>
21
#include <crc32c/crc32c.h>
22
23
#include <chrono>
24
#include <memory>
25
26
#include "common/config.h"
27
#include "common/exception.h"
28
#include "common/logging.h"
29
#include "common/status.h"
30
#include "core/arena.h"
31
#include "cpp/sync_point.h"
32
#include "io/cache/file_block.h"
33
#include "io/cache/file_cache_common.h"
34
#include "io/fs/s3_common.h"
35
#include "runtime/exec_env.h"
36
#include "runtime/thread_context.h"
37
#include "util/defer_op.h"
38
#include "util/slice.h"
39
40
namespace doris {
41
namespace io {
42
43
// bvar counter of live Memory<> allocations backing S3 file buffers.
// Memory<> feeds it +1 when it allocates and -1 when it releases; on this unsigned adder the
// -1 is converted to uint64_t and wraps, which nets out to a decrement.
bvar::Adder<uint64_t> s3_file_buffer_allocated("s3_file_buffer_allocated");
44
45
template <typename Allocator = Allocator<false>>
46
struct Memory : boost::noncopyable, Allocator {
47
    Memory() = default;
48
1.31k
    explicit Memory(size_t size) : _size(size) {
49
1.31k
        alloc(size);
50
1.31k
        s3_file_buffer_allocated << 1;
51
1.31k
    }
52
1.31k
    ~Memory() {
53
1.31k
        dealloc();
54
1.31k
        s3_file_buffer_allocated << -1;
55
1.31k
    }
56
1.31k
    void alloc(size_t size) { _data = static_cast<char*>(Allocator::alloc(size, 0)); }
57
1.31k
    void dealloc() {
58
1.31k
        if (_data == nullptr) {
59
0
            return;
60
0
        }
61
1.31k
        Allocator::free(_data, _size);
62
1.31k
        _data = nullptr;
63
1.31k
    }
64
    size_t _size;
65
    char* _data;
66
};
67
68
// Storage behind one FileBuffer: a single Memory<> allocation of
// config::s3_write_buffer_size bytes, made when the PartData is constructed.
struct FileBuffer::PartData {
    Memory<> _memory;

    PartData() : _memory(config::s3_write_buffer_size) {}
    ~PartData() = default;

    // The entire allocation (its full capacity), independent of how many bytes were written.
    [[nodiscard]] Slice data() const {
        char* const base = _memory._data;
        return Slice {base, size()};
    }

    // Capacity of the allocation in bytes.
    [[nodiscard]] size_t size() const { return _memory._size; }
};
75
76
0
// Whole backing buffer of this FileBuffer (full capacity, not only the bytes written so far).
Slice FileBuffer::get_slice() const {
    const PartData& part = *_inner_data;
    return part.data();
}
79
80
// Creates an empty buffer (_size == 0) that owns a freshly allocated PartData; _capacity is
// taken from that allocation's size. The mem-initializer order is intentionally unchanged:
// _capacity reads _inner_data, so it must come after it (and the list presumably mirrors the
// member declaration order in the header — keep them in sync).
FileBuffer::FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder,
                       size_t offset, OperationState state)
        : _type(type),
          _alloc_holder(std::move(alloc_holder)),
          _offset(offset),
          _size(0),
          _state(std::move(state)),
          _inner_data(std::make_unique<PartData>()),
          _capacity(_inner_data->size()) {}
89
90
1.31k
// Frees the part buffer while the thread's memory tracker is switched to
// s3_file_buffer_tracker, the same tracker FileBufferBuilder::build() switches to when the
// buffer is allocated.
FileBuffer::~FileBuffer() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
    _inner_data = nullptr;
}
94
95
/**
96
 * 0. when there is memory preserved, directly write data to buf
97
 * 1. write to file cache otherwise, then we'll wait for free buffer and to rob it
98
 */
99
10.6k
Status UploadFileBuffer::append_data(const Slice& data) {
100
10.6k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("UploadFileBuffer::append_data", Status::OK(), this,
101
9.63k
                                      data.get_size());
102
9.63k
    std::memcpy((void*)(_inner_data->data().get_data() + _size), data.get_data(), data.get_size());
103
9.63k
    _size += data.get_size();
104
9.63k
    _crc_value = crc32c::Extend(_crc_value, (const uint8_t*)data.get_data(), data.get_size());
105
9.63k
    return Status::OK();
106
10.6k
}
107
108
/**
109
 * 0. constrcut the stream ptr if the buffer is not empty
110
 * 1. submit the on_upload() callback to executor
111
 */
112
1.31k
static Status submit_upload_buffer(std::shared_ptr<FileBuffer> buffer) {
113
1.31k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("UploadFileBuffer::submit", Status::OK(), buffer.get());
114
1.31k
    return ExecEnv::GetInstance()->s3_file_upload_thread_pool()->submit_func(
115
1.31k
            [buf = std::move(buffer)]() { buf->execute_async(); });
116
1.31k
}
117
118
0
std::ostream& operator<<(std::ostream& os, const BufferType& value) {
119
0
    switch (value) {
120
0
    case BufferType::UPLOAD:
121
0
        os << "upload";
122
0
        break;
123
0
    case BufferType::DOWNLOAD:
124
0
        os << "download";
125
0
        break;
126
0
    default:
127
0
        auto cast_value = static_cast<uint32_t>(value);
128
0
        os << cast_value;
129
0
    }
130
0
    return os;
131
0
}
132
133
1.31k
// Dispatch `buf` for asynchronous execution. Only UPLOAD buffers can be submitted; any other
// type trips the CHECK (the trailing return only satisfies the compiler).
Status FileBuffer::submit(std::shared_ptr<FileBuffer> buf) {
    if (buf->_type == BufferType::UPLOAD) {
        return submit_upload_buffer(std::move(buf));
    }
    CHECK(false) << "should never come here, the illegal type is " << buf->_type;
    return Status::InternalError("should never come here");
}
143
144
1.30k
std::string_view FileBuffer::get_string_view_data() const {
145
1.30k
    return {_inner_data->data().get_data(), _size};
146
1.30k
}
147
148
1.31k
void UploadFileBuffer::on_upload() {
149
1.31k
    _stream_ptr = std::make_shared<StringViewStream>(_inner_data->data().get_data(), _size);
150
1.31k
    if (_crc_value != crc32c::Crc32c(_inner_data->data().get_data(), _size)) {
151
0
        DCHECK(false);
152
0
        set_status(Status::IOError("Buffer checksum not match"));
153
0
        return;
154
0
    }
155
1.31k
    _upload_to_remote(*this);
156
1.31k
    if (config::enable_flush_file_cache_async) {
157
        // If we call is_cancelled() after _state.set_status() then there might one situation where
158
        // s3 file writer is already destructed
159
1.31k
        bool cancelled = is_cancelled();
160
1.31k
        _state.set_status();
161
        // this control flow means the buf and the stream shares one memory
162
        // so we can directly use buf here
163
1.31k
        upload_to_local_file_cache(cancelled);
164
1.31k
    } else {
165
0
        upload_to_local_file_cache(is_cancelled());
166
0
        _state.set_status();
167
0
    }
168
1.31k
}
169
170
/**
171
 * write the content of the memory buffer to local file cache
172
 */
173
1.31k
void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) {
174
1.31k
    if (!config::enable_file_cache || _alloc_holder == nullptr) {
175
1.31k
        return;
176
1.31k
    }
177
0
    if (_holder) {
178
0
        return;
179
0
    }
180
0
    if (is_cancelled) {
181
0
        return;
182
0
    }
183
0
    TEST_INJECTION_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache");
184
    // the data is already written to S3 in this situation
185
    // so i didn't handle the file cache write error
186
0
    _holder = _alloc_holder();
187
0
    size_t pos = 0;
188
0
    size_t data_remain_size = _size;
189
0
    for (auto& block : _holder->file_blocks) {
190
0
        if (data_remain_size == 0) {
191
0
            break;
192
0
        }
193
0
        size_t block_size = block->range().size();
194
0
        size_t append_size = std::min(data_remain_size, block_size);
195
0
        if (block->state() == FileBlock::State::EMPTY) {
196
0
            block->get_or_set_downloader();
197
            // Another thread may have started downloading due to a query
198
            // Just skip putting to cache from UploadFileBuffer
199
0
            if (block->is_downloader()) {
200
0
                Slice s(_inner_data->data().get_data() + pos, append_size);
201
0
                Status st = block->append(s);
202
0
                TEST_INJECTION_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache_inject",
203
0
                                              &st);
204
0
                if (st.ok()) {
205
0
                    st = block->finalize();
206
0
                }
207
0
                if (!st.ok()) {
208
0
                    {
209
0
                        [[maybe_unused]] bool ret = false;
210
0
                        TEST_SYNC_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache",
211
0
                                                 &ret);
212
0
                    }
213
0
                    LOG_WARNING("failed to append data to file cache").error(st);
214
0
                }
215
0
            }
216
0
        }
217
0
        data_remain_size -= append_size;
218
0
        pos += append_size;
219
0
    }
220
0
}
221
222
1.31k
// Select which FileBuffer subclass build() creates.
FileBufferBuilder& FileBufferBuilder::set_type(BufferType type) {
    this->_type = type;
    return *this;
}
226
// Upload callback handed to the UploadFileBuffer created by build().
FileBufferBuilder& FileBufferBuilder::set_upload_callback(
        std::function<void(UploadFileBuffer& buf)> cb) {
    this->_upload_cb = std::move(cb);
    return *this;
}
231
// set callback to do task sync for the caller
232
1.31k
FileBufferBuilder& FileBufferBuilder::set_sync_after_complete_task(std::function<bool(Status)> cb) {
233
1.31k
    _sync_after_complete_task = std::move(cb);
234
1.31k
    return *this;
235
1.31k
}
236
237
// Factory the built buffer calls to obtain file-cache blocks; leave it unset to bypass the
// local file cache.
FileBufferBuilder& FileBufferBuilder::set_allocate_file_blocks_holder(
        std::function<FileBlocksHolderPtr()> cb) {
    this->_alloc_holder_cb = std::move(cb);
    return *this;
}
242
243
1.31k
/**
 * Create the configured buffer into `*buf`.
 *
 * The allocation runs with the thread's memory tracker switched to s3_file_buffer_tracker.
 * Exceptions thrown while constructing the buffer are handled by RETURN_IF_CATCH_EXCEPTION
 * (presumably converted into an error Status). The builder's callbacks are moved out into the
 * new buffer, so they are consumed by a successful build.
 */
Status FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
    OperationState state(_sync_after_complete_task, _is_cancelled);

    switch (_type) {
    case BufferType::UPLOAD: {
        RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<UploadFileBuffer>(
                                          std::move(_upload_cb), std::move(state), _offset,
                                          std::move(_alloc_holder_cb)));
        return Status::OK();
    }
    case BufferType::DOWNLOAD: {
        RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<DownloadFileBuffer>(
                                          std::move(_download),
                                          std::move(_write_to_local_file_cache),
                                          std::move(_write_to_use_buffer), std::move(state),
                                          _offset, std::move(_alloc_holder_cb)));
        return Status::OK();
    }
    default:
        break;
    }
    // should never come here
    return Status::InternalError("unsupport buffer type {}", _type);
}
264
265
/**
266
 * 0. check if we need to write into cache
267
 * 1. check if there is free space inside the file cache
268
 * 2. call the download callback
269
 * 3. write the downloaded content into user buffer if necessary
270
 */
271
0
void DownloadFileBuffer::on_download() {
272
0
    auto s = Status::OK();
273
0
    Defer def {[&]() { _state.set_status(std::move(s)); }};
274
0
    if (is_cancelled()) {
275
0
        return;
276
0
    }
277
0
    FileBlocksHolderPtr holder = nullptr;
278
0
    bool need_to_download_into_cache = false;
279
0
    if (_alloc_holder != nullptr) {
280
0
        holder = _alloc_holder();
281
0
        std::for_each(holder->file_blocks.begin(), holder->file_blocks.end(),
282
0
                      [&need_to_download_into_cache](FileBlockSPtr& file_block) {
283
0
                          if (file_block->state() == FileBlock::State::EMPTY) {
284
0
                              file_block->get_or_set_downloader();
285
0
                              if (file_block->is_downloader()) {
286
0
                                  need_to_download_into_cache = true;
287
0
                              }
288
0
                          }
289
0
                      });
290
0
        if (!need_to_download_into_cache && !_write_to_use_buffer) [[unlikely]] {
291
0
            LOG(INFO) << "Skipping download because that there is no space for catch data.";
292
0
        } else {
293
0
            Slice tmp = _inner_data->data();
294
0
            s = _download(tmp);
295
0
            if (s) {
296
0
                _size = tmp.get_size();
297
0
                if (_write_to_use_buffer != nullptr) {
298
0
                    _write_to_use_buffer({_inner_data->data().get_data(), get_size()},
299
0
                                         get_file_offset());
300
0
                }
301
0
                if (need_to_download_into_cache) {
302
0
                    _write_to_local_file_cache(std::move(holder),
303
0
                                               Slice {_inner_data->data().get_data(), _size});
304
0
                }
305
0
            } else {
306
0
                LOG(WARNING) << s;
307
0
            }
308
0
            _state.set_status(std::move(s));
309
0
        }
310
0
    } else {
311
0
        Slice tmp = _inner_data->data();
312
0
        s = _download(tmp);
313
0
        _size = tmp.get_size();
314
0
        if (s.ok() && _write_to_use_buffer != nullptr) {
315
0
            _write_to_use_buffer({_inner_data->data().get_data(), get_size()}, get_file_offset());
316
0
        }
317
0
    }
318
0
}
319
320
} // namespace io
321
} // namespace doris