Coverage Report

Created: 2026-05-27 13:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/file_handle_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// This file is copied from
19
// https://github.com/apache/impala/blob/master/be/src/runtime/io/handle-cache.inline.h
20
// and modified by Doris
21
22
#include "io/fs/file_handle_cache.h"
23
24
#include <cstdint>
25
#include <thread>
26
#include <tuple>
27
28
#include "common/cast_set.h"
29
#include "io/fs/err_utils.h"
30
#include "util/hash_util.hpp"
31
#include "util/time.h"
32
namespace doris::io {
33
34
0
// Closes the underlying HDFS file when both the file handle and the filesystem handle are set,
// then clears both raw handles.
HdfsFileHandle::~HdfsFileHandle() {
    if (_hdfs_file != nullptr && _fs != nullptr) {
        VLOG_FILE << "hdfsCloseFile() fid=" << _hdfs_file;
        // A destructor has no way to hand a Status back to the caller, so a failed close is
        // reported in the log instead of being silently dropped.
        if (hdfsCloseFile(_fs, _hdfs_file) != 0) {
            LOG(WARNING) << "failed to close hdfs file " << _fname << ": " << hdfs_error();
        }
    }
    _fs = nullptr;
    _hdfs_file = nullptr;
}
42
43
0
// Opens `_fname` read-only on `_fs` and records the file size.
//
// `file_size` is stored as-is when it is positive; otherwise the size is looked up with
// hdfsGetPathInfo().
//
// Returns Status::NotFound when the open error text contains "No such file or directory",
// Status::InternalError for any other open failure or when the size lookup fails, and
// Status::OK otherwise.
Status HdfsFileHandle::init(int64_t file_size) {
    _hdfs_file = hdfsOpenFile(_fs, _fname.c_str(), O_RDONLY, 0, 0, 0);
    if (_hdfs_file == nullptr) {
        std::string open_error = hdfs_error();
        // Callers may decide to skip a NotFound status and keep going, so a missing file
        // has to be told apart from every other kind of failure.
        const bool file_missing =
                open_error.find("No such file or directory") != std::string::npos;
        if (file_missing) {
            return Status::NotFound(open_error);
        }
        return Status::InternalError("failed to open {}: {}", _fname, open_error);
    }

    _file_size = file_size;
    if (_file_size > 0) {
        // The caller supplied a usable size; no extra lookup is needed.
        return Status::OK();
    }

    hdfsFileInfo* path_info = hdfsGetPathInfo(_fs, _fname.c_str());
    if (path_info == nullptr) {
        return Status::InternalError("failed to get file size of {}: {}", _fname, hdfs_error());
    }
    _file_size = path_info->mSize;
    hdfsFreeFileInfo(path_info, 1);
    return Status::OK();
}
66
67
// Forwards the filesystem handle, file name and modification time to the HdfsFileHandle base.
CachedHdfsFileHandle::CachedHdfsFileHandle(const hdfsFS& fs, const std::string& fname,
                                           int64_t mtime)
        : HdfsFileHandle(fs, fname, mtime) {
}
70
71
0
// Nothing to do here beyond what the base class destructor already does.
CachedHdfsFileHandle::~CachedHdfsFileHandle() = default;
72
73
0
// Builds an accessor whose wrapped cache accessor is value-initialized.
FileHandleCache::Accessor::Accessor()
        : _cache_accessor() {
}
74
75
// Builds an accessor by moving the given cache accessor into this object.
FileHandleCache::Accessor::Accessor(FileHandleCachePartition::CacheType::Accessor&& cache_accessor)
        : _cache_accessor(std::move(cache_accessor)) {
}
77
78
void FileHandleCache::Accessor::set(
79
0
        FileHandleCachePartition::CacheType::Accessor&& cache_accessor) {
80
0
    _cache_accessor = std::move(cache_accessor);
81
0
}
82
83
0
// Returns whatever the wrapped cache accessor currently yields from get().
CachedHdfsFileHandle* FileHandleCache::Accessor::get() {
    CachedHdfsFileHandle* handle = _cache_accessor.get();
    return handle;
}
86
87
0
void FileHandleCache::Accessor::release() {
88
0
    if (_cache_accessor.get()) {
89
0
        _cache_accessor.release();
90
0
    }
91
0
}
92
93
0
void FileHandleCache::Accessor::destroy() {
94
0
    if (_cache_accessor.get()) {
95
0
        _cache_accessor.destroy();
96
0
    }
97
0
}
98
99
0
// Disposes of the held handle, if any.
//
// With USE_HADOOP_HDFS the file is unbuffered first: on success the handle is released,
// on failure it is destroyed. Without USE_HADOOP_HDFS the handle is always destroyed.
FileHandleCache::Accessor::~Accessor() {
    if (!_cache_accessor.get()) {
        return;
    }
#ifdef USE_HADOOP_HDFS
    const bool unbuffered = (hdfsUnbufferFile(get()->file()) == 0);
    if (unbuffered) {
        // release() is called explicitly so that metrics are handled.
        release();
        return;
    }
    VLOG_FILE << "FS does not support file handle unbuffering, closing file="
              << _cache_accessor.get_key()->second.first;
    destroy();
#else
    destroy();
#endif
}
115
116
// Creates `num_partitions` cache partitions, gives each the same capacity, and starts the
// eviction thread through init().
//
// The per-partition capacity is `capacity / num_partitions`, rounded up when the division
// leaves a remainder. A failure to start the thread is logged with LOG(FATAL).
FileHandleCache::FileHandleCache(size_t capacity, size_t num_partitions,
                                 uint64_t unused_handle_timeout_secs)
        : _cache_partitions(num_partitions),
          _unused_handle_timeout_secs(unused_handle_timeout_secs) {
    DCHECK_GT(num_partitions, 0);

    size_t partition_capacity = capacity / num_partitions;
    if (capacity % num_partitions > 0) {
        // Round up so the partitions together hold at least `capacity` entries.
        ++partition_capacity;
    }

    for (auto& partition : _cache_partitions) {
        partition.cache.set_capacity(partition_capacity);
    }

    Status start_status = init();
    if (!start_status) {
        LOG(FATAL) << "failed to start file handle cache thread: " << start_status.to_string();
    }
}
134
135
0
// Raises the shutdown flag polled by the eviction loop, then joins the eviction thread if
// one was created.
FileHandleCache::~FileHandleCache() {
    // The flag must be set before joining, otherwise the loop would never exit.
    _is_shut_down.store(true);

    if (_eviction_thread != nullptr) {
        _eviction_thread->join();
    }
}
141
142
0
// Creates the thread that runs _evict_handles_loop() on this object and stores it in
// `_eviction_thread`. Returns the Status reported by Thread::create().
Status FileHandleCache::init() {
    Status create_status =
            Thread::create("file-handle-cache", "File Handle Timeout",
                           &FileHandleCache::_evict_handles_loop, this, &_eviction_thread);
    return create_status;
}
146
147
// Hands out a file handle for (`fs`, `fname`, `mtime`) through `accessor`.
//
// Unless `require_new_handle` is set, the partition's cache is asked for an existing entry
// first; on success `*cache_hit` is set to true. Otherwise `*cache_hit` is set to false, a
// new entry is emplaced and initialized with `file_size`.
//
// Returns Status::OK on success. When initializing a new handle fails, the new entry is
// destroyed, `accessor` is left untouched, and the failing Status is returned.
Status FileHandleCache::get_file_handle(const hdfsFS& fs, const std::string& fname, int64_t mtime,
                                        int64_t file_size, bool require_new_handle,
                                        FileHandleCache::Accessor* accessor, bool* cache_hit) {
    DCHECK_GE(mtime, 0);

    // Choose the partition: the hash of the filesystem identity seeds the hash of the name.
    uintptr_t fs_identity = reinterpret_cast<uintptr_t>(fs);
    uint32_t seed = HashUtil::hash(&fs_identity, sizeof(fs_identity), 0);
    const auto name_hash = HashUtil::hash(fname.data(), cast_set<int>(fname.size()), seed);
    int index = name_hash % _cache_partitions.size();
    FileHandleCachePartition& partition = _cache_partitions[index];

    auto cache_key = make_cache_key(fs, fname, mtime);

    // A caller that insists on a new handle goes straight to creation; everyone else first
    // looks for an unused entry with the same mtime.
    if (!require_new_handle) {
        auto cached = partition.cache.get(cache_key);
        if (cached.get()) {
            // An existing handle was found and is now reserved for this caller.
            *cache_hit = true;
            accessor->set(std::move(cached));
            return Status::OK();
        }
    }

    // Either nothing was free or the caller asked for a new handle.
    *cache_hit = false;

    // Emplace a new entry and obtain access to it.
    auto fresh = partition.cache.emplace_and_get(cache_key, fs, fname, mtime);

    // Opening a file handle requires talking to the NameNode, so this can take a while.
    Status open_status = fresh.get()->init(file_size);
    if (UNLIKELY(!open_status.ok())) {
        // Initialization failed: take the entry back out of the cache.
        fresh.destroy();
        return open_status;
    }

    // Transfer the new entry to the caller's in/out parameter.
    accessor->set(std::move(fresh));
    return Status::OK();
}
192
193
#ifdef BE_TEST
194
bool FileHandleCache::same_cache_key_for_test(const hdfsFS& lhs_fs, const std::string& lhs_fname,
195
                                              int64_t lhs_mtime, const hdfsFS& rhs_fs,
196
4
                                              const std::string& rhs_fname, int64_t rhs_mtime) {
197
4
    return make_cache_key(lhs_fs, lhs_fname, lhs_mtime) ==
198
4
           make_cache_key(rhs_fs, rhs_fname, rhs_mtime);
199
4
}
200
#endif
201
202
0
void FileHandleCache::_evict_handles_loop() {
203
0
    while (!_is_shut_down.load()) {
204
0
        if (_unused_handle_timeout_secs) {
205
0
            for (FileHandleCachePartition& p : _cache_partitions) {
206
0
                uint64_t now = MonotonicSeconds();
207
0
                uint64_t oldest_allowed_timestamp =
208
0
                        now > _unused_handle_timeout_secs ? now - _unused_handle_timeout_secs : 0;
209
0
                p.cache.evict_older_than(oldest_allowed_timestamp);
210
0
            }
211
0
        }
212
213
0
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
214
0
    }
215
0
}
216
217
} // namespace doris::io