Coverage Report

Created: 2026-03-13 09:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/file_handle_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// This file is copied from
19
// https://github.com/apache/impala/blob/master/be/src/runtime/io/handle-cache.inline.h
20
// and modified by Doris
21
22
#include "io/fs/file_handle_cache.h"
23
24
#include <thread>
25
#include <tuple>
26
27
#include "common/cast_set.h"
28
#include "io/fs/err_utils.h"
29
#include "util/hash_util.hpp"
30
#include "util/time.h"
31
namespace doris::io {
32
33
#include "common/compile_check_begin.h"
34
35
3.63k
HdfsFileHandle::~HdfsFileHandle() {
36
3.63k
    if (_hdfs_file != nullptr && _fs != nullptr) {
37
3.63k
        VLOG_FILE << "hdfsCloseFile() fid=" << _hdfs_file;
38
3.63k
        hdfsCloseFile(_fs, _hdfs_file); // TODO: check return code
39
3.63k
    }
40
3.63k
    _fs = nullptr;
41
3.63k
    _hdfs_file = nullptr;
42
3.63k
}
43
44
7.26k
Status HdfsFileHandle::init(int64_t file_size) {
45
7.26k
    _hdfs_file = hdfsOpenFile(_fs, _fname.c_str(), O_RDONLY, 0, 0, 0);
46
7.26k
    if (_hdfs_file == nullptr) {
47
0
        std::string _err_msg = hdfs_error();
48
        // invoker maybe just skip Status.NotFound and continue
49
        // so we need distinguish between it and other kinds of errors
50
0
        if (_err_msg.find("No such file or directory") != std::string::npos) {
51
0
            return Status::NotFound(_err_msg);
52
0
        }
53
0
        return Status::InternalError("failed to open {}: {}", _fname, _err_msg);
54
0
    }
55
56
7.26k
    _file_size = file_size;
57
7.26k
    if (_file_size <= 0) {
58
236
        hdfsFileInfo* file_info = hdfsGetPathInfo(_fs, _fname.c_str());
59
236
        if (file_info == nullptr) {
60
0
            return Status::InternalError("failed to get file size of {}: {}", _fname, hdfs_error());
61
0
        }
62
236
        _file_size = file_info->mSize;
63
236
        hdfsFreeFileInfo(file_info, 1);
64
236
    }
65
7.26k
    return Status::OK();
66
7.26k
}
67
68
CachedHdfsFileHandle::CachedHdfsFileHandle(const hdfsFS& fs, const std::string& fname,
69
                                           int64_t mtime)
70
7.26k
        : HdfsFileHandle(fs, fname, mtime) {}
71
72
CachedHdfsFileHandle::~CachedHdfsFileHandle() {}
73
74
51.5k
FileHandleCache::Accessor::Accessor() : _cache_accessor() {}
75
76
FileHandleCache::Accessor::Accessor(FileHandleCachePartition::CacheType::Accessor&& cache_accessor)
77
0
        : _cache_accessor(std::move(cache_accessor)) {}
78
79
void FileHandleCache::Accessor::set(
80
51.5k
        FileHandleCachePartition::CacheType::Accessor&& cache_accessor) {
81
51.5k
    _cache_accessor = std::move(cache_accessor);
82
51.5k
}
83
84
102k
CachedHdfsFileHandle* FileHandleCache::Accessor::get() {
85
102k
    return _cache_accessor.get();
86
102k
}
87
88
51.5k
void FileHandleCache::Accessor::release() {
89
51.5k
    if (_cache_accessor.get()) {
90
51.5k
        _cache_accessor.release();
91
51.5k
    }
92
51.5k
}
93
94
0
void FileHandleCache::Accessor::destroy() {
95
0
    if (_cache_accessor.get()) {
96
0
        _cache_accessor.destroy();
97
0
    }
98
0
}
99
100
206k
FileHandleCache::Accessor::~Accessor() {
101
206k
    if (_cache_accessor.get()) {
102
51.5k
#ifdef USE_HADOOP_HDFS
103
51.5k
        if (hdfsUnbufferFile(get()->file()) != 0) {
104
0
            VLOG_FILE << "FS does not support file handle unbuffering, closing file="
105
0
                      << _cache_accessor.get_key()->first;
106
0
            destroy();
107
51.5k
        } else {
108
            // Calling explicit release to handle metrics
109
51.5k
            release();
110
51.5k
        }
111
#else
112
        destroy();
113
#endif
114
51.5k
    }
115
206k
}
116
117
FileHandleCache::FileHandleCache(size_t capacity, size_t num_partitions,
118
                                 uint64_t unused_handle_timeout_secs)
119
2
        : _cache_partitions(num_partitions),
120
2
          _unused_handle_timeout_secs(unused_handle_timeout_secs) {
121
2
    DCHECK_GT(num_partitions, 0);
122
123
2
    size_t remainder = capacity % num_partitions;
124
2
    size_t base_capacity = capacity / num_partitions;
125
2
    size_t partition_capacity = (remainder > 0 ? base_capacity + 1 : base_capacity);
126
127
32
    for (FileHandleCachePartition& p : _cache_partitions) {
128
32
        p.cache.set_capacity(partition_capacity);
129
32
    }
130
2
    Status st = init();
131
2
    if (!st) {
132
0
        LOG(FATAL) << "failed to start file handle cache thread: " << st.to_string();
133
0
    }
134
2
}
135
136
1
FileHandleCache::~FileHandleCache() {
137
1
    _is_shut_down.store(true);
138
1
    if (_eviction_thread != nullptr) {
139
1
        _eviction_thread->join();
140
1
    }
141
1
}
142
143
2
Status FileHandleCache::init() {
144
2
    return Thread::create("file-handle-cache", "File Handle Timeout",
145
2
                          &FileHandleCache::_evict_handles_loop, this, &_eviction_thread);
146
2
}
147
148
Status FileHandleCache::get_file_handle(const hdfsFS& fs, const std::string& fname, int64_t mtime,
149
                                        int64_t file_size, bool require_new_handle,
150
51.5k
                                        FileHandleCache::Accessor* accessor, bool* cache_hit) {
151
51.5k
    DCHECK_GE(mtime, 0);
152
    // Hash the key and get appropriate partition
153
51.5k
    int index =
154
51.5k
            HashUtil::hash(fname.data(), cast_set<int>(fname.size()), 0) % _cache_partitions.size();
155
51.5k
    FileHandleCachePartition& p = _cache_partitions[index];
156
157
51.5k
    auto cache_key = std::make_pair(fname, mtime);
158
159
    // If this requires a new handle, skip to the creation codepath. Otherwise,
160
    // find an unused entry with the same mtime
161
51.5k
    if (!require_new_handle) {
162
51.5k
        auto cache_accessor = p.cache.get(cache_key);
163
164
51.5k
        if (cache_accessor.get()) {
165
            // Found a handler in cache and reserved it
166
44.2k
            *cache_hit = true;
167
44.2k
            accessor->set(std::move(cache_accessor));
168
44.2k
            return Status::OK();
169
44.2k
        }
170
51.5k
    }
171
172
    // There was no entry that was free or caller asked for a new handle
173
7.26k
    *cache_hit = false;
174
175
    // Emplace a new file handle and get access
176
7.26k
    auto accessor_tmp = p.cache.emplace_and_get(cache_key, fs, fname, mtime);
177
178
    // Opening a file handle requires talking to the NameNode so it can take some time.
179
7.26k
    Status status = accessor_tmp.get()->init(file_size);
180
7.26k
    if (UNLIKELY(!status.ok())) {
181
        // Removing the handler from the cache after failed initialization.
182
0
        accessor_tmp.destroy();
183
0
        return status;
184
0
    }
185
186
    // Moving the cache accessor to the in/out parameter
187
7.26k
    accessor->set(std::move(accessor_tmp));
188
189
7.26k
    return Status::OK();
190
7.26k
}
191
192
2
void FileHandleCache::_evict_handles_loop() {
193
4.49k
    while (!_is_shut_down.load()) {
194
4.49k
        if (_unused_handle_timeout_secs) {
195
71.8k
            for (FileHandleCachePartition& p : _cache_partitions) {
196
71.8k
                uint64_t now = MonotonicSeconds();
197
71.8k
                uint64_t oldest_allowed_timestamp =
198
71.8k
                        now > _unused_handle_timeout_secs ? now - _unused_handle_timeout_secs : 0;
199
71.8k
                p.cache.evict_older_than(oldest_allowed_timestamp);
200
71.8k
            }
201
4.49k
        }
202
203
4.49k
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
204
4.49k
    }
205
2
}
206
207
#include "common/compile_check_end.h"
208
209
} // namespace doris::io