Coverage Report

Created: 2026-05-27 13:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/file_handle_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// This file is copied from
19
// https://github.com/apache/impala/blob/master/be/src/runtime/io/handle-cache.inline.h
20
// and modified by Doris
21
22
#include "io/fs/file_handle_cache.h"
23
24
#include <cstdint>
25
#include <thread>
26
#include <tuple>
27
28
#include "common/cast_set.h"
29
#include "io/fs/err_utils.h"
30
#include "util/hash_util.hpp"
31
#include "util/time.h"
32
namespace doris::io {
33
34
3.01k
// Closes the underlying HDFS file, if one was opened, and clears the raw
// handles held by this object.
//
// A destructor cannot propagate an error, so a failed hdfsCloseFile() is
// reported in the log instead of being silently dropped.
HdfsFileHandle::~HdfsFileHandle() {
    if (_hdfs_file != nullptr && _fs != nullptr) {
        VLOG_FILE << "hdfsCloseFile() fid=" << _hdfs_file;
        // libhdfs reports success with 0; anything else means the close failed.
        if (hdfsCloseFile(_fs, _hdfs_file) != 0) {
            LOG(WARNING) << "failed to close hdfs file " << _fname << ": " << hdfs_error();
        }
    }
    _fs = nullptr;
    _hdfs_file = nullptr;
}
42
43
6.03k
// Opens `_fname` on `_fs` read-only and records the size of the file.
//
// `file_size` is used as-is when it is positive; otherwise the size is looked
// up through hdfsGetPathInfo().
//
// Returns:
//   - Status::NotFound when the open fails and the HDFS error text contains
//     "No such file or directory",
//   - Status::InternalError for any other open failure or when the size
//     lookup fails,
//   - Status::OK otherwise.
Status HdfsFileHandle::init(int64_t file_size) {
    _hdfs_file = hdfsOpenFile(_fs, _fname.c_str(), O_RDONLY, 0, 0, 0);
    if (_hdfs_file == nullptr) {
        std::string open_error = hdfs_error();
        // Callers may decide to skip over Status::NotFound and keep going, so a
        // missing file must be told apart from every other kind of failure.
        const bool file_missing =
                open_error.find("No such file or directory") != std::string::npos;
        if (file_missing) {
            return Status::NotFound(open_error);
        }
        return Status::InternalError("failed to open {}: {}", _fname, open_error);
    }

    _file_size = file_size;
    if (_file_size > 0) {
        // The caller supplied a usable size, no extra round trip is needed.
        return Status::OK();
    }

    hdfsFileInfo* path_info = hdfsGetPathInfo(_fs, _fname.c_str());
    if (path_info == nullptr) {
        return Status::InternalError("failed to get file size of {}: {}", _fname, hdfs_error());
    }
    _file_size = path_info->mSize;
    hdfsFreeFileInfo(path_info, 1);
    return Status::OK();
}
66
67
// Builds a cacheable handle by forwarding the filesystem, file name and
// modification time to the HdfsFileHandle base. Nothing else is done here.
CachedHdfsFileHandle::CachedHdfsFileHandle(const hdfsFS& fs, const std::string& fname,
                                           int64_t mtime)
        : HdfsFileHandle(fs, fname, mtime) {
}
70
71
// No extra cleanup of its own; the base class destructor still runs.
CachedHdfsFileHandle::~CachedHdfsFileHandle() = default;
72
73
40.8k
// Creates an accessor whose wrapped cache accessor is value-initialized.
FileHandleCache::Accessor::Accessor()
        : _cache_accessor() {
}
74
75
// Creates an accessor by moving an existing partition-level cache accessor
// into this wrapper.
FileHandleCache::Accessor::Accessor(FileHandleCachePartition::CacheType::Accessor&& cache_accessor)
        : _cache_accessor(std::move(cache_accessor)) {
}
77
78
void FileHandleCache::Accessor::set(
79
40.8k
        FileHandleCachePartition::CacheType::Accessor&& cache_accessor) {
80
40.8k
    _cache_accessor = std::move(cache_accessor);
81
40.8k
}
82
83
81.6k
// Returns the file handle held by the wrapped cache accessor (whatever its
// get() yields; other code in this file treats a false-y result as "empty").
CachedHdfsFileHandle* FileHandleCache::Accessor::get() {
    CachedHdfsFileHandle* handle = _cache_accessor.get();
    return handle;
}
86
87
40.8k
void FileHandleCache::Accessor::release() {
88
40.8k
    if (_cache_accessor.get()) {
89
40.8k
        _cache_accessor.release();
90
40.8k
    }
91
40.8k
}
92
93
0
void FileHandleCache::Accessor::destroy() {
94
0
    if (_cache_accessor.get()) {
95
0
        _cache_accessor.destroy();
96
0
    }
97
0
}
98
99
163k
// Gives the held handle (if any) back on destruction.
//
// With USE_HADOOP_HDFS the file is first unbuffered: on success the handle is
// released, otherwise it is destroyed. Without USE_HADOOP_HDFS the handle is
// always destroyed.
FileHandleCache::Accessor::~Accessor() {
    if (!_cache_accessor.get()) {
        // Nothing is held, so there is nothing to hand back.
        return;
    }
#ifdef USE_HADOOP_HDFS
    const bool unbuffered = hdfsUnbufferFile(get()->file()) == 0;
    if (unbuffered) {
        // Calling explicit release to handle metrics
        release();
        return;
    }
    VLOG_FILE << "FS does not support file handle unbuffering, closing file="
              << _cache_accessor.get_key()->second.first;
    destroy();
#else
    destroy();
#endif
}
115
116
// Creates `num_partitions` cache partitions, gives each one a capacity of
// ceil(capacity / num_partitions), and starts the eviction thread via init().
//
// `num_partitions` must be greater than zero (DCHECK). A failure to start the
// eviction thread is logged with LOG(FATAL).
FileHandleCache::FileHandleCache(size_t capacity, size_t num_partitions,
                                 uint64_t unused_handle_timeout_secs)
        : _cache_partitions(num_partitions),
          _unused_handle_timeout_secs(unused_handle_timeout_secs) {
    DCHECK_GT(num_partitions, 0);

    // Round up so the partitions together can hold at least `capacity` entries.
    size_t per_partition_capacity = capacity / num_partitions;
    if (capacity % num_partitions > 0) {
        ++per_partition_capacity;
    }

    for (FileHandleCachePartition& partition : _cache_partitions) {
        partition.cache.set_capacity(per_partition_capacity);
    }

    if (Status st = init(); !st) {
        LOG(FATAL) << "failed to start file handle cache thread: " << st.to_string();
    }
}
134
135
1
// Raises the shutdown flag polled by _evict_handles_loop() and then joins the
// eviction thread if one was created.
FileHandleCache::~FileHandleCache() {
    // The flag must be set before joining; the loop only exits once it sees it.
    _is_shut_down.store(true);

    if (_eviction_thread != nullptr) {
        _eviction_thread->join();
    }
}
141
142
2
// Starts the thread that runs _evict_handles_loop() on this cache and stores
// it in `_eviction_thread`. Returns whatever Thread::create() reports.
Status FileHandleCache::init() {
    Status create_status =
            Thread::create("file-handle-cache", "File Handle Timeout",
                           &FileHandleCache::_evict_handles_loop, this, &_eviction_thread);
    return create_status;
}
146
147
// Hands out a file handle for (`fs`, `fname`, `mtime`) through `accessor`.
//
// Unless `require_new_handle` is set, the partition selected by hashing the
// filesystem pointer and file name is searched for an existing entry first.
// When none is obtained (or a new handle was requested) a fresh entry is
// emplaced and initialized with `file_size`.
//
// `*cache_hit` is set to true when an existing entry was used and to false
// otherwise. `accessor` and `cache_hit` are dereferenced unconditionally.
//
// Returns Status::OK on success, or the error reported by
// CachedHdfsFileHandle::init(), in which case the new entry is destroyed and
// `accessor` is left untouched.
Status FileHandleCache::get_file_handle(const hdfsFS& fs, const std::string& fname, int64_t mtime,
                                        int64_t file_size, bool require_new_handle,
                                        FileHandleCache::Accessor* accessor, bool* cache_hit) {
    DCHECK_GE(mtime, 0);

    // Pick the partition: the filesystem pointer seeds the hash of the file name.
    uintptr_t fs_identity = reinterpret_cast<uintptr_t>(fs);
    uint32_t seed = HashUtil::hash(&fs_identity, sizeof(fs_identity), 0);
    const size_t partition_idx =
            HashUtil::hash(fname.data(), cast_set<int>(fname.size()), seed) %
            _cache_partitions.size();
    FileHandleCachePartition& partition = _cache_partitions[partition_idx];

    auto cache_key = make_cache_key(fs, fname, mtime);

    // A caller that insists on a new handle goes straight to creation;
    // everyone else first tries to take an entry stored under the same key.
    if (!require_new_handle) {
        if (auto cached = partition.cache.get(cache_key); cached.get()) {
            // Found a handler in cache and reserved it
            *cache_hit = true;
            accessor->set(std::move(cached));
            return Status::OK();
        }
    }

    // Either nothing was available or the caller asked for a new handle.
    *cache_hit = false;

    // Emplace a new file handle and get access
    auto created = partition.cache.emplace_and_get(cache_key, fs, fname, mtime);

    // Opening a file handle requires talking to the NameNode so it can take some time.
    if (Status init_status = created.get()->init(file_size); UNLIKELY(!init_status.ok())) {
        // Removing the handler from the cache after failed initialization.
        created.destroy();
        return init_status;
    }

    // Hand the ready accessor over through the in/out parameter.
    accessor->set(std::move(created));
    return Status::OK();
}
192
193
#ifdef BE_TEST
194
bool FileHandleCache::same_cache_key_for_test(const hdfsFS& lhs_fs, const std::string& lhs_fname,
195
                                              int64_t lhs_mtime, const hdfsFS& rhs_fs,
196
                                              const std::string& rhs_fname, int64_t rhs_mtime) {
197
    return make_cache_key(lhs_fs, lhs_fname, lhs_mtime) ==
198
           make_cache_key(rhs_fs, rhs_fname, rhs_mtime);
199
}
200
#endif
201
202
2
void FileHandleCache::_evict_handles_loop() {
203
5.51k
    while (!_is_shut_down.load()) {
204
5.50k
        if (_unused_handle_timeout_secs) {
205
88.1k
            for (FileHandleCachePartition& p : _cache_partitions) {
206
88.1k
                uint64_t now = MonotonicSeconds();
207
88.1k
                uint64_t oldest_allowed_timestamp =
208
88.1k
                        now > _unused_handle_timeout_secs ? now - _unused_handle_timeout_secs : 0;
209
88.1k
                p.cache.evict_older_than(oldest_allowed_timestamp);
210
88.1k
            }
211
5.50k
        }
212
213
5.50k
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
214
5.50k
    }
215
2
}
216
217
} // namespace doris::io