Coverage Report

Created: 2026-07-03 18:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/file_block.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/cache/file_block.h"
19
20
#include <butil/iobuf.h>
21
#include <glog/logging.h>
22
// IWYU pragma: no_include <bits/chrono.h>
23
#include <chrono> // IWYU pragma: keep
24
#include <sstream>
25
#include <string>
26
#include <thread>
27
28
#include "common/status.h"
29
#include "cpp/sync_point.h"
30
#include "io/cache/block_file_cache.h"
31
#include "io/cache/fs_file_cache_storage.h"
32
#include "io/cache/mem_file_cache_storage.h"
33
34
namespace doris {
35
namespace io {
36
37
namespace {
38
39
13
Status abort_pending_cache_write(FileCacheStorage* storage, const FileCacheKey& key) {
40
13
    if (auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(storage); fs_storage != nullptr) {
41
13
        return fs_storage->abort(key);
42
13
    }
43
0
    if (auto* mem_storage = dynamic_cast<MemFileCacheStorage*>(storage); mem_storage != nullptr) {
44
0
        return mem_storage->abort(key);
45
0
    }
46
0
    return Status::OK();
47
0
}
48
49
} // namespace
50
51
10.8k
std::ostream& operator<<(std::ostream& os, const FileBlock::State& value) {
52
10.8k
    os << FileBlock::state_to_string(value);
53
10.8k
    return os;
54
10.8k
}
55
56
FileBlock::FileBlock(const FileCacheKey& key, size_t size, BlockFileCache* mgr,
57
                     State download_state)
58
586k
        : _block_range(key.offset, key.offset + size - 1),
59
586k
          _download_state(download_state),
60
586k
          _mgr(mgr),
61
586k
          _key(key) {
62
    /// On creation, file block state can be EMPTY, DOWNLOADED, SKIP_CACHE.
63
586k
    switch (_download_state) {
64
0
    case State::DOWNLOADING: {
65
0
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE ";
66
0
        break;
67
0
    }
68
586k
    default: {
69
586k
        break;
70
0
    }
71
586k
    }
72
586k
}
73
74
2.21M
FileBlock::State FileBlock::state() const {
75
2.21M
    std::lock_guard block_lock(_mutex);
76
2.21M
    return _download_state;
77
2.21M
}
78
79
1.22M
FileBlock::State FileBlock::state_unsafe() const {
80
1.22M
    return _download_state;
81
1.22M
}
82
83
1.56M
uint64_t FileBlock::get_caller_id() {
84
1.56M
    uint64_t id;
85
#if defined(__APPLE__)
86
    // On macOS, use pthread_threadid_np to get the thread ID
87
    pthread_threadid_np(nullptr, &id);
88
#else
89
1.56M
    id = static_cast<uint64_t>(pthread_self());
90
1.56M
#endif
91
1.56M
    DCHECK(id != 0);
92
1.56M
    return id;
93
1.56M
}
94
95
167k
uint64_t FileBlock::get_or_set_downloader() {
96
167k
    std::lock_guard block_lock(_mutex);
97
98
167k
    if (_downloader_id == 0 && _download_state != State::DOWNLOADED) {
99
167k
        DCHECK(_download_state != State::DOWNLOADING);
100
167k
        _downloader_id = get_caller_id();
101
167k
        _download_state = State::DOWNLOADING;
102
167k
    } else if (_downloader_id == get_caller_id()) {
103
4
        LOG(INFO) << "Attempt to set the same downloader for block " << range().to_string()
104
4
                  << " for the second time";
105
4
    }
106
107
167k
    return _downloader_id;
108
167k
}
109
110
24
void FileBlock::reset_downloader(std::lock_guard<std::mutex>& block_lock) {
111
24
    DCHECK(_downloader_id != 0) << "There is no downloader";
112
113
24
    DCHECK(get_caller_id() == _downloader_id) << "Downloader can be reset only by downloader";
114
115
24
    reset_downloader_impl(block_lock);
116
24
}
117
118
24
void FileBlock::reset_downloader_impl(std::lock_guard<std::mutex>& block_lock) {
119
24
    if (_downloaded_size == range().size()) {
120
2
        Status st = set_downloaded(block_lock);
121
2
        if (!st.ok()) {
122
1
            LOG(WARNING) << "reset downloader failed, err=" << st
123
1
                         << ", block=" << get_info_for_log_impl(block_lock);
124
1
        }
125
22
    } else {
126
22
        _downloaded_size = 0;
127
22
        _download_state = State::EMPTY;
128
22
        _downloader_id = 0;
129
22
    }
130
24
}
131
132
166k
Status FileBlock::set_downloaded(std::lock_guard<std::mutex>& block_lock) {
133
166k
    DCHECK(_download_state != State::DOWNLOADED);
134
166k
    if (_downloaded_size == 0) {
135
1
        _download_state = State::EMPTY;
136
1
        _downloader_id = 0;
137
1
        LOG(WARNING) << "set file cache block downloaded failed: empty block, block="
138
1
                     << get_info_for_log_impl(block_lock);
139
1
        return Status::InternalError("Try to set empty block {} as downloaded",
140
1
                                     _block_range.to_string());
141
1
    }
142
166k
    Status status = _mgr->_storage->finalize(_key, this->_block_range.size());
143
166k
    if (status.ok()) [[likely]] {
144
166k
        _download_state = State::DOWNLOADED;
145
166k
    } else {
146
568
        auto abort_st = abort_pending_cache_write(_mgr->_storage.get(), _key);
147
568
        LOG(WARNING) << "finalize file cache block failed, err=" << status
148
568
                     << ", abort_status=" << abort_st
149
568
                     << ", block=" << get_info_for_log_impl(block_lock);
150
568
        _download_state = State::EMPTY;
151
568
        _downloaded_size = 0;
152
568
    }
153
166k
    _downloader_id = 0;
154
166k
    return status;
155
166k
}
156
157
16
uint64_t FileBlock::get_downloader() const {
158
16
    std::lock_guard block_lock(_mutex);
159
16
    return _downloader_id;
160
16
}
161
162
162k
bool FileBlock::is_downloader() const {
163
162k
    std::lock_guard block_lock(_mutex);
164
162k
    return get_caller_id() == _downloader_id;
165
162k
}
166
167
1.22M
bool FileBlock::is_downloader_impl(std::lock_guard<std::mutex>& /* block_lock */) const {
168
1.22M
    return get_caller_id() == _downloader_id;
169
1.22M
}
170
171
166k
Status FileBlock::append(Slice data) {
172
166k
    return appendv(&data, 1);
173
166k
}
174
175
166k
Status FileBlock::appendv(const Slice* data, size_t data_cnt) {
176
166k
    size_t appended_size = 0;
177
333k
    for (size_t idx = 0; idx < data_cnt; ++idx) {
178
166k
        appended_size += data[idx].size;
179
166k
    }
180
18.4E
    DCHECK(appended_size != 0) << "Writing zero size is not allowed";
181
166k
    auto st = _mgr->_storage->appendv(_key, data, data_cnt);
182
166k
    if (!st.ok()) {
183
6
        auto abort_st = abort_pending_cache_write(_mgr->_storage.get(), _key);
184
6
        LOG(WARNING) << "appendv file cache block failed, append_size=" << appended_size
185
6
                     << ", slice_count=" << data_cnt << ", err=" << st
186
6
                     << ", abort_status=" << abort_st << ", block=" << get_info_for_log();
187
6
        return st;
188
6
    }
189
166k
    _downloaded_size += appended_size;
190
166k
    return Status::OK();
191
166k
}
192
193
48
Status FileBlock::append_iobuf(const butil::IOBuf& data) {
194
48
    const size_t appended_size = data.length();
195
48
    DCHECK(appended_size != 0) << "Writing zero size is not allowed";
196
48
    auto st = _mgr->_storage->append_iobuf(_key, data);
197
48
    if (!st.ok()) {
198
2
        auto abort_st = abort_pending_cache_write(_mgr->_storage.get(), _key);
199
2
        LOG(WARNING) << "append_iobuf file cache block failed, append_size=" << appended_size
200
2
                     << ", err=" << st << ", abort_status=" << abort_st
201
2
                     << ", block=" << get_info_for_log();
202
2
        return st;
203
2
    }
204
46
    _downloaded_size += appended_size;
205
46
    return Status::OK();
206
48
}
207
208
166k
Status FileBlock::finalize() {
209
166k
    if (_downloaded_size == 0) {
210
1
        std::lock_guard block_lock(_mutex);
211
1
        _download_state = State::EMPTY;
212
1
        _downloader_id = 0;
213
1
        _cv.notify_all();
214
1
        LOG(WARNING) << "finalize file cache block failed: empty block, block="
215
1
                     << get_info_for_log_impl(block_lock);
216
1
        return Status::InternalError("Try to finalize an empty file block {}",
217
1
                                     _block_range.to_string());
218
1
    }
219
166k
    if (_downloaded_size != _block_range.size()) {
220
43.9k
        SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
221
43.9k
        size_t old_size = _block_range.size();
222
43.9k
        size_t new_size = _downloaded_size;
223
43.9k
        DCHECK(new_size < old_size);
224
43.9k
        _mgr->reset_range(_key.hash, _block_range.left, old_size, new_size, cache_lock);
225
43.9k
    }
226
166k
    std::lock_guard block_lock(_mutex);
227
166k
    Status st = set_downloaded(block_lock);
228
166k
    _cv.notify_all();
229
166k
    return st;
230
166k
}
231
232
3.62M
Status FileBlock::read(Slice buffer, size_t read_offset) {
233
3.62M
    return _mgr->_storage->read(_key, read_offset, buffer);
234
3.62M
}
235
236
Status FileBlock::read_to_iobuf(butil::IOBuf* out, size_t read_offset, size_t bytes_req,
237
9
                                size_t* bytes_read) {
238
9
    return _mgr->_storage->read_to_iobuf(_key, read_offset, bytes_req, out, bytes_read);
239
9
}
240
241
2.19k
Status FileBlock::change_cache_type(FileCacheType new_type) {
242
2.19k
    SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
243
2.19k
    return change_cache_type_lock(new_type, cache_lock);
244
2.19k
}
245
246
Status FileBlock::change_cache_type_lock(FileCacheType new_type,
247
2.19k
                                         std::lock_guard<std::mutex>& cache_lock) {
248
2.19k
    std::lock_guard block_lock(_mutex);
249
250
2.19k
    if (new_type == _key.meta.type) {
251
1
        return Status::OK();
252
1
    }
253
2.19k
    if (_download_state == State::DOWNLOADED) {
254
8
        Status st;
255
8
        TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st);
256
8
        RETURN_IF_ERROR(_mgr->_storage->change_key_meta_type(_key, new_type, _block_range.size()));
257
8
    }
258
2.19k
    _mgr->change_cache_type(_key.hash, _block_range.left, new_type, cache_lock);
259
2.19k
    _key.meta.type = new_type;
260
2.19k
    return Status::OK();
261
2.19k
}
262
263
9.17k
FileBlock::State FileBlock::wait() {
264
9.17k
    std::unique_lock block_lock(_mutex);
265
266
9.17k
    if (_downloader_id == 0) {
267
2
        return _download_state;
268
2
    }
269
270
9.17k
    if (_download_state == State::DOWNLOADING) {
271
9.17k
        DCHECK(_downloader_id != 0 && _downloader_id != get_caller_id());
272
9.17k
        _cv.wait_for(block_lock, std::chrono::milliseconds(config::block_cache_wait_timeout_ms));
273
9.17k
    }
274
275
9.17k
    return _download_state;
276
9.17k
}
277
278
1.22M
void FileBlock::complete_unlocked(std::lock_guard<std::mutex>& block_lock) {
279
1.22M
    if (is_downloader_impl(block_lock)) {
280
24
        reset_downloader(block_lock);
281
24
        _cv.notify_all();
282
24
    }
283
1.22M
}
284
285
14
std::string FileBlock::get_info_for_log() const {
286
14
    std::lock_guard block_lock(_mutex);
287
14
    return get_info_for_log_impl(block_lock);
288
14
}
289
290
22
std::string FileBlock::get_info_for_log_impl(std::lock_guard<std::mutex>& block_lock) const {
291
22
    std::stringstream info;
292
22
    info << "File block: " << range().to_string() << ", ";
293
22
    info << "hash: " << _key.hash.to_string() << ", ";
294
22
    info << "cache_type: " << cache_type_to_string(_key.meta.type) << ", ";
295
22
    info << "state: " << state_to_string(_download_state) << ", ";
296
22
    info << "size: " << _block_range.size() << ", ";
297
22
    info << "downloaded_size: " << _downloaded_size << ", ";
298
22
    info << "downloader id: " << _downloader_id << ", ";
299
22
    info << "caller id: " << get_caller_id() << ", ";
300
301
22
    std::string cache_file = "<unknown>";
302
22
    if (_mgr != nullptr && _mgr->_storage != nullptr) {
303
21
        cache_file = _mgr->_storage->get_local_file(_key);
304
21
    }
305
22
    info << "cache_file: " << cache_file;
306
307
22
    return info.str();
308
22
}
309
310
1.69M
FileBlock::State FileBlock::state_unlock(std::lock_guard<std::mutex>&) const {
311
1.69M
    return _download_state;
312
1.69M
}
313
314
10.9k
std::string FileBlock::state_to_string(FileBlock::State state) {
315
10.9k
    switch (state) {
316
10.8k
    case FileBlock::State::DOWNLOADED:
317
10.8k
        return "DOWNLOADED";
318
9
    case FileBlock::State::EMPTY:
319
9
        return "EMPTY";
320
17
    case FileBlock::State::DOWNLOADING:
321
17
        return "DOWNLOADING";
322
1
    case FileBlock::State::SKIP_CACHE:
323
1
        return "SKIP_CACHE";
324
0
    default:
325
0
        DCHECK(false);
326
0
        return "";
327
10.9k
    }
328
10.9k
}
329
330
8
std::string FileBlock::get_cache_file() const {
331
8
    return _mgr->_storage->get_local_file(this->_key);
332
8
}
333
334
1.04M
FileBlocksHolder::~FileBlocksHolder() {
335
2.26M
    for (auto file_block_it = file_blocks.begin(); file_block_it != file_blocks.end();) {
336
1.22M
        auto current_file_block_it = file_block_it;
337
1.22M
        auto& file_block = *current_file_block_it;
338
1.22M
        BlockFileCache* _mgr = file_block->_mgr;
339
1.22M
        {
340
1.22M
            bool should_remove = false;
341
1.22M
            {
342
1.22M
                std::lock_guard block_lock(file_block->_mutex);
343
1.22M
                file_block->complete_unlocked(block_lock);
344
1.22M
                if (file_block.use_count() == 2 &&
345
1.22M
                    (file_block->is_deleting() ||
346
787k
                     file_block->state_unlock(block_lock) == FileBlock::State::EMPTY)) {
347
160k
                    should_remove = true;
348
160k
                }
349
1.22M
            }
350
1.22M
            if (should_remove) {
351
160k
                SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
352
160k
                std::lock_guard block_lock(file_block->_mutex);
353
160k
                if (file_block.use_count() == 2) {
354
160k
                    DCHECK(file_block->state_unlock(block_lock) != FileBlock::State::DOWNLOADING);
355
                    // one in cache, one in here
356
160k
                    if (file_block->is_deleting() ||
357
160k
                        file_block->state_unlock(block_lock) == FileBlock::State::EMPTY) {
358
160k
                        _mgr->remove(file_block, cache_lock, block_lock, false);
359
160k
                    }
360
160k
                }
361
160k
            }
362
1.22M
        }
363
1.22M
        file_block_it = file_blocks.erase(current_file_block_it);
364
1.22M
    }
365
1.04M
}
366
367
} // namespace io
368
} // namespace doris