Coverage Report

Created: 2026-04-14 05:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/file_handle_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// This file is copied from
19
// https://github.com/apache/impala/blob/master/be/src/runtime/io/handle-cache.inline.h
20
// and modified by Doris
21
22
#include "io/fs/file_handle_cache.h"
23
24
#include <thread>
25
#include <tuple>
26
27
#include "common/cast_set.h"
28
#include "io/fs/err_utils.h"
29
#include "util/hash_util.hpp"
30
#include "util/time.h"
31
namespace doris::io {
32
33
0
// Closes the underlying HDFS file (if one was opened on a valid filesystem) and clears the
// handle's pointers. hdfsCloseFile() signals failure through a non-zero return value; since a
// destructor cannot propagate a Status, such failures are logged instead of silently ignored.
HdfsFileHandle::~HdfsFileHandle() {
    if (_hdfs_file != nullptr && _fs != nullptr) {
        VLOG_FILE << "hdfsCloseFile() fid=" << _hdfs_file;
        if (hdfsCloseFile(_fs, _hdfs_file) != 0) {
            LOG(WARNING) << "failed to close hdfs file " << _fname << ": " << hdfs_error();
        }
    }
    _fs = nullptr;
    _hdfs_file = nullptr;
}
41
42
0
// Opens `_fname` read-only on `_fs` and records the file's size in `_file_size`.
//
// A positive `file_size` is taken as-is; otherwise the size is looked up with
// hdfsGetPathInfo(). Returns NotFound when the open error says the path does not exist, and
// InternalError for any other open or size-lookup failure.
Status HdfsFileHandle::init(int64_t file_size) {
    _hdfs_file = hdfsOpenFile(_fs, _fname.c_str(), O_RDONLY, 0, 0, 0);
    if (_hdfs_file == nullptr) {
        std::string err_msg = hdfs_error();
        // Invokers may just skip Status::NotFound and continue, so a missing file has to be
        // reported differently from every other kind of error.
        const bool missing = err_msg.find("No such file or directory") != std::string::npos;
        return missing ? Status::NotFound(err_msg)
                       : Status::InternalError("failed to open {}: {}", _fname, err_msg);
    }

    _file_size = file_size;
    if (_file_size > 0) {
        // Caller already knows the size; no metadata lookup needed.
        return Status::OK();
    }

    hdfsFileInfo* info = hdfsGetPathInfo(_fs, _fname.c_str());
    if (info == nullptr) {
        return Status::InternalError("failed to get file size of {}: {}", _fname, hdfs_error());
    }
    _file_size = info->mSize;
    hdfsFreeFileInfo(info, 1);
    return Status::OK();
}
65
66
// Construction only forwards to HdfsFileHandle; the file itself is opened later via init().
CachedHdfsFileHandle::CachedHdfsFileHandle(const hdfsFS& fs, const std::string& fname,
                                           int64_t mtime)
        : HdfsFileHandle(fs, fname, mtime) {}

// Nothing extra to clean up; HdfsFileHandle's destructor closes the file.
CachedHdfsFileHandle::~CachedHdfsFileHandle() = default;
71
72
0
FileHandleCache::Accessor::Accessor() : _cache_accessor() {}
73
74
FileHandleCache::Accessor::Accessor(FileHandleCachePartition::CacheType::Accessor&& cache_accessor)
75
0
        : _cache_accessor(std::move(cache_accessor)) {}
76
77
void FileHandleCache::Accessor::set(
78
0
        FileHandleCachePartition::CacheType::Accessor&& cache_accessor) {
79
0
    _cache_accessor = std::move(cache_accessor);
80
0
}
81
82
0
CachedHdfsFileHandle* FileHandleCache::Accessor::get() {
83
0
    return _cache_accessor.get();
84
0
}
85
86
0
void FileHandleCache::Accessor::release() {
87
0
    if (_cache_accessor.get()) {
88
0
        _cache_accessor.release();
89
0
    }
90
0
}
91
92
0
void FileHandleCache::Accessor::destroy() {
93
0
    if (_cache_accessor.get()) {
94
0
        _cache_accessor.destroy();
95
0
    }
96
0
}
97
98
0
// Gives up the held handle, if any.
// With USE_HADOOP_HDFS, the handle is released (kept) only if hdfsUnbufferFile() succeeds on
// it; if the filesystem cannot unbuffer, the handle is destroyed. Without USE_HADOOP_HDFS the
// handle is always destroyed.
FileHandleCache::Accessor::~Accessor() {
    if (get() == nullptr) {
        return;
    }
#ifdef USE_HADOOP_HDFS
    if (hdfsUnbufferFile(get()->file()) == 0) {
        // Calling explicit release to handle metrics
        release();
        return;
    }
    VLOG_FILE << "FS does not support file handle unbuffering, closing file="
              << _cache_accessor.get_key()->first;
#endif
    destroy();
}
114
115
// Builds `num_partitions` cache partitions, each with capacity ceil(capacity / num_partitions)
// (so the total may exceed `capacity` by less than `num_partitions`), then starts the eviction
// thread. Failing to start that thread is fatal.
FileHandleCache::FileHandleCache(size_t capacity, size_t num_partitions,
                                 uint64_t unused_handle_timeout_secs)
        : _cache_partitions(num_partitions),
          _unused_handle_timeout_secs(unused_handle_timeout_secs) {
    DCHECK_GT(num_partitions, 0);

    // Round the per-partition share up; written as quotient + (remainder != 0) so the
    // computation cannot overflow.
    const size_t per_partition =
            capacity / num_partitions + (capacity % num_partitions > 0 ? 1 : 0);
    for (auto& partition : _cache_partitions) {
        partition.cache.set_capacity(per_partition);
    }

    if (Status st = init(); !st) {
        LOG(FATAL) << "failed to start file handle cache thread: " << st.to_string();
    }
}
133
134
0
// Tells _evict_handles_loop() to stop, then waits for the eviction thread if it was started.
FileHandleCache::~FileHandleCache() {
    _is_shut_down.store(true);
    if (_eviction_thread != nullptr) _eviction_thread->join();
}
140
141
0
// Starts the background thread that runs _evict_handles_loop() on this cache and stores its
// handle in _eviction_thread. Returns whatever Thread::create() reports.
Status FileHandleCache::init() {
    Status started = Thread::create("file-handle-cache", "File Handle Timeout",
                                    &FileHandleCache::_evict_handles_loop, this,
                                    &_eviction_thread);
    return started;
}
145
146
// Hands out, via `accessor`, a handle for the key (`fname`, `mtime`) from the partition chosen
// by hashing `fname`.
//
// Unless `require_new_handle` is set, a cached handle for the same key is reused and
// `*cache_hit` becomes true. Otherwise (or when the lookup misses) a new handle is emplaced
// into the partition and opened with init(file_size), and `*cache_hit` becomes false. If that
// open fails, the new entry is destroyed and the init() error is returned.
Status FileHandleCache::get_file_handle(const hdfsFS& fs, const std::string& fname, int64_t mtime,
                                        int64_t file_size, bool require_new_handle,
                                        FileHandleCache::Accessor* accessor, bool* cache_hit) {
    DCHECK_GE(mtime, 0);

    const size_t partition_idx =
            HashUtil::hash(fname.data(), cast_set<int>(fname.size()), 0) % _cache_partitions.size();
    FileHandleCachePartition& partition = _cache_partitions[partition_idx];
    auto key = std::make_pair(fname, mtime);

    // Try to reuse an entry with the same name and mtime, unless a fresh handle was requested.
    if (!require_new_handle) {
        if (auto cached = partition.cache.get(key); cached.get()) {
            // Found a handle in the cache and reserved it.
            *cache_hit = true;
            accessor->set(std::move(cached));
            return Status::OK();
        }
    }

    // Cache miss, or the caller insisted on a new handle.
    *cache_hit = false;

    // Opening a file handle requires talking to the NameNode, so this may take some time.
    auto fresh = partition.cache.emplace_and_get(key, fs, fname, mtime);
    if (Status status = fresh.get()->init(file_size); UNLIKELY(!status.ok())) {
        // Do not leave a handle that failed to open behind in the cache.
        fresh.destroy();
        return status;
    }

    accessor->set(std::move(fresh));
    return Status::OK();
}
189
190
0
void FileHandleCache::_evict_handles_loop() {
191
0
    while (!_is_shut_down.load()) {
192
0
        if (_unused_handle_timeout_secs) {
193
0
            for (FileHandleCachePartition& p : _cache_partitions) {
194
0
                uint64_t now = MonotonicSeconds();
195
0
                uint64_t oldest_allowed_timestamp =
196
0
                        now > _unused_handle_timeout_secs ? now - _unused_handle_timeout_secs : 0;
197
0
                p.cache.evict_older_than(oldest_allowed_timestamp);
198
0
            }
199
0
        }
200
201
0
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
202
0
    }
203
0
}
204
205
} // namespace doris::io