Coverage Report

Created: 2026-03-15 16:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/file_block.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/cache/file_block.h"
19
20
#include <glog/logging.h>
21
// IWYU pragma: no_include <bits/chrono.h>
22
#include <chrono> // IWYU pragma: keep
23
#include <sstream>
24
#include <string>
25
#include <thread>
26
27
#include "common/status.h"
28
#include "cpp/sync_point.h"
29
#include "io/cache/block_file_cache.h"
30
31
namespace doris {
32
namespace io {
33
34
3
/// Writes the textual name of a file block state (as produced by
/// FileBlock::state_to_string) to `os` and returns the stream for chaining.
std::ostream& operator<<(std::ostream& os, const FileBlock::State& value) {
    return os << FileBlock::state_to_string(value);
}
38
39
/// Constructs a block whose range starts at `key.offset` and spans `size` bytes
/// (inclusive right bound is `key.offset + size - 1`).
///
/// @param key            cache key; its `offset` is the left bound of the range
/// @param size           number of bytes covered by the block
/// @param mgr            cache that this block refers back to
/// @param download_state initial state of the block
FileBlock::FileBlock(const FileCacheKey& key, size_t size, BlockFileCache* mgr,
                     State download_state)
        : _block_range(key.offset, key.offset + size - 1),
          _download_state(download_state),
          _mgr(mgr),
          _key(key) {
    /// On creation, file block state can be EMPTY, DOWNLOADED, SKIP_CACHE.
    /// DOWNLOADING is the only state rejected here (debug builds only).
    if (_download_state == State::DOWNLOADING) {
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE ";
    }
}
56
57
17.5k
/// Returns the current download state, read while holding the block mutex.
FileBlock::State FileBlock::state() const {
    std::lock_guard<std::mutex> guard(_mutex);
    return _download_state;
}
61
62
10.4k
/// Returns the current download state WITHOUT taking the block mutex.
/// Unlike state(), no lock is acquired here.
FileBlock::State FileBlock::state_unsafe() const {
    return _download_state;
}
65
66
23.2k
uint64_t FileBlock::get_caller_id() {
67
23.2k
    uint64_t id;
68
#if defined(__APPLE__)
69
    // On macOS, use pthread_threadid_np to get the thread ID
70
    pthread_threadid_np(nullptr, &id);
71
#else
72
23.2k
    id = static_cast<uint64_t>(pthread_self());
73
23.2k
#endif
74
23.2k
    DCHECK(id != 0);
75
23.2k
    return id;
76
23.2k
}
77
78
28
/// Overwrites the cache type stored in this block's key metadata, under the
/// block mutex. Only the in-memory key is touched here.
void FileBlock::set_cache_type(FileCacheType new_type) {
    std::lock_guard<std::mutex> guard(_mutex);
    _key.meta.type = new_type;
}
82
83
6.36k
uint64_t FileBlock::get_or_set_downloader() {
84
6.36k
    std::lock_guard block_lock(_mutex);
85
86
6.36k
    if (_downloader_id == 0 && _download_state != State::DOWNLOADED) {
87
6.34k
        DCHECK(_download_state != State::DOWNLOADING);
88
6.34k
        _downloader_id = get_caller_id();
89
6.34k
        _download_state = State::DOWNLOADING;
90
6.34k
    } else if (_downloader_id == get_caller_id()) {
91
4
        LOG(INFO) << "Attempt to set the same downloader for block " << range().to_string()
92
4
                  << " for the second time";
93
4
    }
94
95
6.36k
    return _downloader_id;
96
6.36k
}
97
98
18
void FileBlock::reset_downloader(std::lock_guard<std::mutex>& block_lock) {
99
18
    DCHECK(_downloader_id != 0) << "There is no downloader";
100
101
18
    DCHECK(get_caller_id() == _downloader_id) << "Downloader can be reset only by downloader";
102
103
18
    reset_downloader_impl(block_lock);
104
18
}
105
106
18
void FileBlock::reset_downloader_impl(std::lock_guard<std::mutex>& block_lock) {
107
18
    if (_downloaded_size == range().size()) {
108
2
        Status st = set_downloaded(block_lock);
109
2
        if (!st.ok()) {
110
1
            LOG(WARNING) << "reset downloader error" << st;
111
1
        }
112
16
    } else {
113
16
        _downloaded_size = 0;
114
16
        _download_state = State::EMPTY;
115
16
        _downloader_id = 0;
116
16
    }
117
18
}
118
119
6.34k
/// Finalizes the block in storage and updates its state accordingly.
///
/// - Nothing written: state becomes EMPTY, downloader is cleared, and an
///   InternalError is returned.
/// - Storage finalize succeeds: state becomes DOWNLOADED.
/// - Storage finalize fails: state becomes EMPTY and the written size is reset.
/// The downloader id is cleared in every case.
///
/// @return OK on success, otherwise the error described above.
Status FileBlock::set_downloaded(std::lock_guard<std::mutex>& /* block_lock */) {
    DCHECK(_download_state != State::DOWNLOADED);

    if (_downloaded_size == 0) {
        _download_state = State::EMPTY;
        _downloader_id = 0;
        return Status::InternalError("Try to set empty block {} as downloaded",
                                     _block_range.to_string());
    }

    Status status = _mgr->_storage->finalize(_key, this->_block_range.size());
    if (!status.ok()) [[unlikely]] {
        _download_state = State::EMPTY;
        _downloaded_size = 0;
    } else {
        _download_state = State::DOWNLOADED;
    }
    _downloader_id = 0;
    return status;
}
137
138
16
uint64_t FileBlock::get_downloader() const {
139
16
    std::lock_guard block_lock(_mutex);
140
16
    return _downloader_id;
141
16
}
142
143
1.00k
bool FileBlock::is_downloader() const {
144
1.00k
    std::lock_guard block_lock(_mutex);
145
1.00k
    return get_caller_id() == _downloader_id;
146
1.00k
}
147
148
10.4k
bool FileBlock::is_downloader_impl(std::lock_guard<std::mutex>& /* block_lock */) const {
149
10.4k
    return get_caller_id() == _downloader_id;
150
10.4k
}
151
152
6.34k
/// Appends `data` to this block through the cache's storage backend and, on
/// success, adds `data.size` to the downloaded byte count.
///
/// @param data bytes to write; a zero-sized slice is rejected by a DCHECK
/// @return the storage error if the append fails, OK otherwise
Status FileBlock::append(Slice data) {
    DCHECK(data.size != 0) << "Writing zero size is not allowed";

    // Progress is only recorded after the storage layer accepted the bytes.
    RETURN_IF_ERROR(_mgr->_storage->append(_key, data));
    _downloaded_size += data.size;

    return Status::OK();
}
158
159
6.33k
/// Completes a download: shrinks the block range if fewer bytes than the
/// range were written, marks the block as downloaded, and wakes waiters.
///
/// @return InternalError if nothing was written; otherwise the result of
///         set_downloaded.
Status FileBlock::finalize() {
    if (_downloaded_size == 0) {
        // Nothing was written: revert to EMPTY, drop ownership, wake waiters.
        std::lock_guard block_lock(_mutex);
        _download_state = State::EMPTY;
        _downloader_id = 0;
        _cv.notify_all();
        return Status::InternalError("Try to finalize an empty file block {}",
                                     _block_range.to_string());
    }

    if (_downloaded_size != _block_range.size()) {
        // Short write: trim the right bound to what was actually written and
        // tell the cache about the new size. This happens under the cache lock,
        // which is released at the end of this scope, before the block mutex
        // is taken below.
        SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
        const size_t size_before = _block_range.size();
        _block_range.right = _block_range.left + _downloaded_size - 1;
        const size_t size_after = _block_range.size();
        DCHECK(size_after < size_before);
        _mgr->reset_range(_key.hash, _block_range.left, size_before, size_after, cache_lock);
    }

    std::lock_guard block_lock(_mutex);
    Status st = set_downloaded(block_lock);
    _cv.notify_all();
    return st;
}
181
182
8.93k
/// Reads from this block's stored data into `buffer`, starting at
/// `read_offset`, by forwarding to the cache's storage backend.
///
/// @return the status reported by the storage backend
Status FileBlock::read(Slice buffer, size_t read_offset) {
    Status st = _mgr->_storage->read(_key, read_offset, buffer);
    return st;
}
185
186
9
/// Changes the cache type of this block. Acquires the cache lock and then
/// delegates to change_cache_type_lock.
///
/// @return the status returned by change_cache_type_lock
Status FileBlock::change_cache_type(FileCacheType new_type) {
    SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
    Status st = change_cache_type_lock(new_type, cache_lock);
    return st;
}
190
191
/// Changes the cache type of this block while the caller already holds the
/// cache lock. The block mutex is taken inside.
///
/// If the type is unchanged this is a no-op. For a DOWNLOADED block the
/// storage backend is asked to change the key's meta type first; a failure
/// there is returned before any in-memory state is modified.
///
/// @param new_type   type to switch to
/// @param cache_lock lock on the cache mutex, already held by the caller
/// @return OK, or the storage error
Status FileBlock::change_cache_type_lock(FileCacheType new_type,
                                         std::lock_guard<std::mutex>& cache_lock) {
    std::lock_guard<std::mutex> guard(_mutex);

    const bool same_type = (new_type == _key.meta.type);
    if (same_type) {
        return Status::OK();
    }

    if (_download_state == State::DOWNLOADED) {
        // NOTE(review): `st` is only handed to the test sync point; its value
        // is never inspected afterwards in this function.
        Status st;
        TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st);
        RETURN_IF_ERROR(_mgr->_storage->change_key_meta_type(_key, new_type, _block_range.size()));
    }

    // Update the cache's bookkeeping first, then the type stored in the key.
    _mgr->change_cache_type(_key.hash, _block_range.left, new_type, cache_lock);
    _key.meta.type = new_type;
    return Status::OK();
}
207
208
32
/// Waits for a download in progress and returns the block state afterwards.
///
/// If the block has a downloader and is DOWNLOADING, the caller waits on the
/// condition variable for at most config::block_cache_wait_timeout_ms. The
/// wait has no predicate, so the returned state is whatever is current after
/// the wait ends and may still be DOWNLOADING.
FileBlock::State FileBlock::wait() {
    std::unique_lock block_lock(_mutex);

    const bool has_downloader = (_downloader_id != 0);
    if (has_downloader && _download_state == State::DOWNLOADING) {
        DCHECK(_downloader_id != 0 && _downloader_id != get_caller_id());
        _cv.wait_for(block_lock, std::chrono::milliseconds(config::block_cache_wait_timeout_ms));
    }

    return _download_state;
}
222
223
10.4k
void FileBlock::complete_unlocked(std::lock_guard<std::mutex>& block_lock) {
224
10.4k
    if (is_downloader_impl(block_lock)) {
225
18
        reset_downloader(block_lock);
226
18
        _cv.notify_all();
227
18
    }
228
10.4k
}
229
230
1
std::string FileBlock::get_info_for_log() const {
231
1
    std::lock_guard block_lock(_mutex);
232
1
    return get_info_for_log_impl(block_lock);
233
1
}
234
235
1
std::string FileBlock::get_info_for_log_impl(std::lock_guard<std::mutex>& block_lock) const {
236
1
    std::stringstream info;
237
1
    info << "File block: " << range().to_string() << ", ";
238
1
    info << "state: " << state_to_string(_download_state) << ", ";
239
1
    info << "size: " << _block_range.size() << ", ";
240
1
    info << "downloader id: " << _downloader_id << ", ";
241
1
    info << "caller id: " << get_caller_id();
242
243
1
    return info.str();
244
1
}
245
246
18.1k
/// Returns the current download state for callers that already hold the
/// block lock (passed as an unnamed witness parameter). No lock is taken here.
FileBlock::State FileBlock::state_unlock(std::lock_guard<std::mutex>& /* block_lock */) const {
    return _download_state;
}
249
250
8
/// Maps a block state to its upper-case name. An unrecognised value trips a
/// DCHECK in debug builds and yields an empty string.
std::string FileBlock::state_to_string(FileBlock::State state) {
    if (state == FileBlock::State::DOWNLOADED) {
        return "DOWNLOADED";
    }
    if (state == FileBlock::State::EMPTY) {
        return "EMPTY";
    }
    if (state == FileBlock::State::DOWNLOADING) {
        return "DOWNLOADING";
    }
    if (state == FileBlock::State::SKIP_CACHE) {
        return "SKIP_CACHE";
    }
    DCHECK(false);
    return "";
}
265
266
1
std::string FileBlock::get_cache_file() const {
267
1
    return _mgr->_storage->get_local_file(this->_key);
268
1
}
269
270
9.72k
/// Releases every held block, one at a time from the front of the list.
///
/// For each block:
///  1. Under the block mutex, complete_unlocked() lets the block give up
///     downloader ownership if this thread owns it, and the block is flagged
///     as a removal candidate when its use_count is 2 and it is either being
///     deleted or EMPTY.
///  2. For candidates, the cache lock and then the block mutex are taken, the
///     same conditions are re-checked, and the block is removed from the cache.
///  3. The holder's own reference is erased from the list.
FileBlocksHolder::~FileBlocksHolder() {
    auto it = file_blocks.begin();
    while (it != file_blocks.end()) {
        auto& block = *it;
        BlockFileCache* mgr = block->_mgr;

        bool removal_candidate = false;
        {
            // First pass: only the block mutex is held.
            std::lock_guard block_lock(block->_mutex);
            block->complete_unlocked(block_lock);
            removal_candidate =
                    block.use_count() == 2 &&
                    (block->is_deleting() ||
                     block->state_unlock(block_lock) == FileBlock::State::EMPTY);
        }

        if (removal_candidate) {
            // Second pass: cache lock first, then the block mutex. The checks
            // are repeated because both locks were released in between.
            SCOPED_CACHE_LOCK(mgr->_mutex, mgr);
            std::lock_guard block_lock(block->_mutex);
            if (block.use_count() == 2) {
                DCHECK(block->state_unlock(block_lock) != FileBlock::State::DOWNLOADING);
                // one in cache, one in here
                const bool droppable =
                        block->is_deleting() ||
                        block->state_unlock(block_lock) == FileBlock::State::EMPTY;
                if (droppable) {
                    mgr->remove(block, cache_lock, block_lock, false);
                }
            }
        }

        it = file_blocks.erase(it);
    }
}
302
303
} // namespace io
304
} // namespace doris