Coverage Report

Created: 2026-04-15 07:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/file_handle_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// This file is copied from
19
// https://github.com/apache/impala/blob/master/be/src/runtime/io/handle-cache.inline.h
20
// and modified by Doris
21
22
#include "io/fs/file_handle_cache.h"
23
24
#include <thread>
25
#include <tuple>
26
27
#include "common/cast_set.h"
28
#include "io/fs/err_utils.h"
29
#include "util/hash_util.hpp"
30
#include "util/time.h"
31
namespace doris::io {
32
33
2.99k
HdfsFileHandle::~HdfsFileHandle() {
34
2.99k
    if (_hdfs_file != nullptr && _fs != nullptr) {
35
2.99k
        VLOG_FILE << "hdfsCloseFile() fid=" << _hdfs_file;
36
2.99k
        hdfsCloseFile(_fs, _hdfs_file); // TODO: check return code
37
2.99k
    }
38
2.99k
    _fs = nullptr;
39
2.99k
    _hdfs_file = nullptr;
40
2.99k
}
41
42
5.99k
Status HdfsFileHandle::init(int64_t file_size) {
43
5.99k
    _hdfs_file = hdfsOpenFile(_fs, _fname.c_str(), O_RDONLY, 0, 0, 0);
44
5.99k
    if (_hdfs_file == nullptr) {
45
0
        std::string _err_msg = hdfs_error();
46
        // invoker maybe just skip Status.NotFound and continue
47
        // so we need distinguish between it and other kinds of errors
48
0
        if (_err_msg.find("No such file or directory") != std::string::npos) {
49
0
            return Status::NotFound(_err_msg);
50
0
        }
51
0
        return Status::InternalError("failed to open {}: {}", _fname, _err_msg);
52
0
    }
53
54
5.99k
    _file_size = file_size;
55
5.99k
    if (_file_size <= 0) {
56
238
        hdfsFileInfo* file_info = hdfsGetPathInfo(_fs, _fname.c_str());
57
238
        if (file_info == nullptr) {
58
0
            return Status::InternalError("failed to get file size of {}: {}", _fname, hdfs_error());
59
0
        }
60
238
        _file_size = file_info->mSize;
61
238
        hdfsFreeFileInfo(file_info, 1);
62
238
    }
63
5.99k
    return Status::OK();
64
5.99k
}
65
66
CachedHdfsFileHandle::CachedHdfsFileHandle(const hdfsFS& fs, const std::string& fname,
67
                                           int64_t mtime)
68
5.99k
        : HdfsFileHandle(fs, fname, mtime) {}
69
70
CachedHdfsFileHandle::~CachedHdfsFileHandle() {}
71
72
41.0k
FileHandleCache::Accessor::Accessor() : _cache_accessor() {}
73
74
FileHandleCache::Accessor::Accessor(FileHandleCachePartition::CacheType::Accessor&& cache_accessor)
75
0
        : _cache_accessor(std::move(cache_accessor)) {}
76
77
void FileHandleCache::Accessor::set(
78
40.9k
        FileHandleCachePartition::CacheType::Accessor&& cache_accessor) {
79
40.9k
    _cache_accessor = std::move(cache_accessor);
80
40.9k
}
81
82
81.9k
CachedHdfsFileHandle* FileHandleCache::Accessor::get() {
83
81.9k
    return _cache_accessor.get();
84
81.9k
}
85
86
40.9k
void FileHandleCache::Accessor::release() {
87
40.9k
    if (_cache_accessor.get()) {
88
40.9k
        _cache_accessor.release();
89
40.9k
    }
90
40.9k
}
91
92
0
void FileHandleCache::Accessor::destroy() {
93
0
    if (_cache_accessor.get()) {
94
0
        _cache_accessor.destroy();
95
0
    }
96
0
}
97
98
163k
FileHandleCache::Accessor::~Accessor() {
99
163k
    if (_cache_accessor.get()) {
100
40.9k
#ifdef USE_HADOOP_HDFS
101
40.9k
        if (hdfsUnbufferFile(get()->file()) != 0) {
102
0
            VLOG_FILE << "FS does not support file handle unbuffering, closing file="
103
0
                      << _cache_accessor.get_key()->first;
104
0
            destroy();
105
40.9k
        } else {
106
            // Calling explicit release to handle metrics
107
40.9k
            release();
108
40.9k
        }
109
#else
110
        destroy();
111
#endif
112
40.9k
    }
113
163k
}
114
115
FileHandleCache::FileHandleCache(size_t capacity, size_t num_partitions,
116
                                 uint64_t unused_handle_timeout_secs)
117
2
        : _cache_partitions(num_partitions),
118
2
          _unused_handle_timeout_secs(unused_handle_timeout_secs) {
119
2
    DCHECK_GT(num_partitions, 0);
120
121
2
    size_t remainder = capacity % num_partitions;
122
2
    size_t base_capacity = capacity / num_partitions;
123
2
    size_t partition_capacity = (remainder > 0 ? base_capacity + 1 : base_capacity);
124
125
32
    for (FileHandleCachePartition& p : _cache_partitions) {
126
32
        p.cache.set_capacity(partition_capacity);
127
32
    }
128
2
    Status st = init();
129
2
    if (!st) {
130
0
        LOG(FATAL) << "failed to start file handle cache thread: " << st.to_string();
131
0
    }
132
2
}
133
134
1
FileHandleCache::~FileHandleCache() {
135
1
    _is_shut_down.store(true);
136
1
    if (_eviction_thread != nullptr) {
137
1
        _eviction_thread->join();
138
1
    }
139
1
}
140
141
2
Status FileHandleCache::init() {
142
2
    return Thread::create("file-handle-cache", "File Handle Timeout",
143
2
                          &FileHandleCache::_evict_handles_loop, this, &_eviction_thread);
144
2
}
145
146
Status FileHandleCache::get_file_handle(const hdfsFS& fs, const std::string& fname, int64_t mtime,
147
                                        int64_t file_size, bool require_new_handle,
148
41.0k
                                        FileHandleCache::Accessor* accessor, bool* cache_hit) {
149
41.0k
    DCHECK_GE(mtime, 0);
150
    // Hash the key and get appropriate partition
151
41.0k
    int index =
152
41.0k
            HashUtil::hash(fname.data(), cast_set<int>(fname.size()), 0) % _cache_partitions.size();
153
41.0k
    FileHandleCachePartition& p = _cache_partitions[index];
154
155
41.0k
    auto cache_key = std::make_pair(fname, mtime);
156
157
    // If this requires a new handle, skip to the creation codepath. Otherwise,
158
    // find an unused entry with the same mtime
159
41.0k
    if (!require_new_handle) {
160
41.0k
        auto cache_accessor = p.cache.get(cache_key);
161
162
41.0k
        if (cache_accessor.get()) {
163
            // Found a handler in cache and reserved it
164
35.0k
            *cache_hit = true;
165
35.0k
            accessor->set(std::move(cache_accessor));
166
35.0k
            return Status::OK();
167
35.0k
        }
168
41.0k
    }
169
170
    // There was no entry that was free or caller asked for a new handle
171
5.99k
    *cache_hit = false;
172
173
    // Emplace a new file handle and get access
174
5.99k
    auto accessor_tmp = p.cache.emplace_and_get(cache_key, fs, fname, mtime);
175
176
    // Opening a file handle requires talking to the NameNode so it can take some time.
177
5.99k
    Status status = accessor_tmp.get()->init(file_size);
178
5.99k
    if (UNLIKELY(!status.ok())) {
179
        // Removing the handler from the cache after failed initialization.
180
0
        accessor_tmp.destroy();
181
0
        return status;
182
0
    }
183
184
    // Moving the cache accessor to the in/out parameter
185
5.99k
    accessor->set(std::move(accessor_tmp));
186
187
5.99k
    return Status::OK();
188
5.99k
}
189
190
2
void FileHandleCache::_evict_handles_loop() {
191
4.87k
    while (!_is_shut_down.load()) {
192
4.87k
        if (_unused_handle_timeout_secs) {
193
78.0k
            for (FileHandleCachePartition& p : _cache_partitions) {
194
78.0k
                uint64_t now = MonotonicSeconds();
195
78.0k
                uint64_t oldest_allowed_timestamp =
196
78.0k
                        now > _unused_handle_timeout_secs ? now - _unused_handle_timeout_secs : 0;
197
78.0k
                p.cache.evict_older_than(oldest_allowed_timestamp);
198
78.0k
            }
199
4.87k
        }
200
201
4.87k
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
202
4.87k
    }
203
2
}
204
205
} // namespace doris::io