be/src/io/fs/hdfs_file_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/hdfs_file_reader.h" |
19 | | |
20 | | #include <stdint.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <filesystem> |
24 | | #include <ostream> |
25 | | #include <utility> |
26 | | |
27 | | #include "bvar/latency_recorder.h" |
28 | | #include "bvar/reducer.h" |
29 | | #include "common/compiler_util.h" // IWYU pragma: keep |
30 | | #include "common/logging.h" |
31 | | #include "common/metrics/doris_metrics.h" |
32 | | #include "cpp/sync_point.h" |
33 | | #include "io/fs/err_utils.h" |
34 | | #include "io/hdfs_util.h" |
35 | | #include "runtime/thread_context.h" |
36 | | #include "runtime/workload_management/io_throttle.h" |
37 | | #include "service/backend_options.h" |
38 | | |
39 | | namespace doris::io { |
40 | | #include "common/compile_check_begin.h" |
41 | | |
// Process-wide bvar metrics for HDFS reads, exported under "hdfs_file_reader".
bvar::Adder<uint64_t> hdfs_bytes_read_total("hdfs_file_reader", "bytes_read");
bvar::LatencyRecorder hdfs_bytes_per_read("hdfs_file_reader", "bytes_per_read"); // also QPS
// NOTE(review): the identifier carries a historical typo ("througthput") and is
// kept as-is since it may be referenced elsewhere; the exported metric name
// itself ("hdfs_read_throughput") is spelled correctly.
bvar::PerSecond<bvar::Adder<uint64_t>> hdfs_read_througthput("hdfs_file_reader",
                                                             "hdfs_read_throughput",
                                                             &hdfs_bytes_read_total);
47 | | |
48 | | namespace { |
49 | | |
50 | | Result<FileHandleCache::Accessor> get_file(const hdfsFS& fs, const Path& file, int64_t mtime, |
51 | 0 | int64_t file_size) { |
52 | 0 | static FileHandleCache cache(config::max_hdfs_file_handle_cache_num, 16, |
53 | 0 | config::max_hdfs_file_handle_cache_time_sec); |
54 | 0 | bool cache_hit; |
55 | 0 | FileHandleCache::Accessor accessor; |
56 | 0 | RETURN_IF_ERROR_RESULT(cache.get_file_handle(fs, file.native(), mtime, file_size, false, |
57 | 0 | &accessor, &cache_hit)); |
58 | 0 | return accessor; |
59 | 0 | } |
60 | | |
61 | | } // namespace |
62 | | |
63 | | Result<FileReaderSPtr> HdfsFileReader::create(Path full_path, const hdfsFS& fs, std::string fs_name, |
64 | | const FileReaderOptions& opts, |
65 | 0 | RuntimeProfile* profile) { |
66 | 0 | auto path = convert_path(full_path, fs_name); |
67 | 0 | return get_file(fs, path, opts.mtime, opts.file_size).transform([&](auto&& accessor) { |
68 | 0 | return std::make_shared<HdfsFileReader>(std::move(path), std::move(fs_name), |
69 | 0 | std::move(accessor), profile, opts.mtime); |
70 | 0 | }); |
71 | 0 | } |
72 | | |
// Takes ownership of the cached file-handle accessor; the raw handle pointer
// is cached in _handle for the reader's lifetime (or until a failed read
// destroys it — see read_at_impl).
//
// @param path      file path (already converted/qualified by create())
// @param fs_name   namenode / filesystem name, used for metrics and profiles
// @param accessor  accessor owning the cached hdfs file handle
// @param profile   optional runtime profile; HDFS IO counters are registered
//                  on it only when the target is real HDFS (is_hdfs)
// @param mtime     file modification time the cached handle is keyed on
HdfsFileReader::HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor,
                               RuntimeProfile* profile, int64_t mtime)
        : _path(std::move(path)),
          _fs_name(std::move(fs_name)),
          _accessor(std::move(accessor)),
          _profile(profile),
          _mtime(mtime) {
    _handle = _accessor.get();

    DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
    DorisMetrics::instance()->hdfs_file_reader_total->increment(1);
    if (_profile != nullptr && is_hdfs(_fs_name)) {
        // Detailed read statistics are only available from the Hadoop libhdfs
        // build; with other hdfs client builds no counters are registered.
#ifdef USE_HADOOP_HDFS
        const char* hdfs_profile_name = "HdfsIO";
        ADD_TIMER(_profile, hdfs_profile_name);
        _hdfs_profile.total_bytes_read =
                ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, hdfs_profile_name);
        _hdfs_profile.total_local_bytes_read =
                ADD_CHILD_COUNTER(_profile, "TotalLocalBytesRead", TUnit::BYTES, hdfs_profile_name);
        _hdfs_profile.total_short_circuit_bytes_read = ADD_CHILD_COUNTER(
                _profile, "TotalShortCircuitBytesRead", TUnit::BYTES, hdfs_profile_name);
        _hdfs_profile.total_total_zero_copy_bytes_read = ADD_CHILD_COUNTER(
                _profile, "TotalZeroCopyBytesRead", TUnit::BYTES, hdfs_profile_name);

        // Hedged-read counters; populated in _collect_profile_before_close().
        _hdfs_profile.total_hedged_read =
                ADD_CHILD_COUNTER(_profile, "TotalHedgedRead", TUnit::UNIT, hdfs_profile_name);
        _hdfs_profile.hedged_read_in_cur_thread = ADD_CHILD_COUNTER(
                _profile, "HedgedReadInCurThread", TUnit::UNIT, hdfs_profile_name);
        _hdfs_profile.hedged_read_wins =
                ADD_CHILD_COUNTER(_profile, "HedgedReadWins", TUnit::UNIT, hdfs_profile_name);
#endif
    }
}
106 | | |
107 | 0 | HdfsFileReader::~HdfsFileReader() { |
108 | 0 | static_cast<void>(close()); |
109 | 0 | } |
110 | | |
111 | 0 | Status HdfsFileReader::close() { |
112 | 0 | bool expected = false; |
113 | 0 | if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { |
114 | 0 | DorisMetrics::instance()->hdfs_file_open_reading->increment(-1); |
115 | 0 | } |
116 | 0 | return Status::OK(); |
117 | 0 | } |
118 | | |
119 | | Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, |
120 | 0 | const IOContext* io_ctx) { |
121 | 0 | auto st = do_read_at_impl(offset, result, bytes_read, io_ctx); |
122 | 0 | if (!st.ok()) { |
123 | 0 | _handle = nullptr; |
124 | 0 | _accessor.destroy(); |
125 | 0 | } |
126 | 0 | return st; |
127 | 0 | } |
128 | | |
129 | | #ifdef USE_HADOOP_HDFS |
130 | | Status HdfsFileReader::do_read_at_impl(size_t offset, Slice result, size_t* bytes_read, |
131 | 0 | const IOContext* /*io_ctx*/) { |
132 | 0 | if (closed()) [[unlikely]] { |
133 | 0 | return Status::InternalError("read closed file: {}", _path.native()); |
134 | 0 | } |
135 | | |
136 | 0 | if (_handle == nullptr) [[unlikely]] { |
137 | 0 | return Status::InternalError("cached hdfs file handle has been destroyed: {}", |
138 | 0 | _path.native()); |
139 | 0 | } |
140 | | |
141 | 0 | if (offset > _handle->file_size()) { |
142 | 0 | return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})", |
143 | 0 | offset, _handle->file_size(), _path.native()); |
144 | 0 | } |
145 | | |
146 | 0 | size_t bytes_req = result.size; |
147 | 0 | char* to = result.data; |
148 | 0 | bytes_req = std::min(bytes_req, (size_t)(_handle->file_size() - offset)); |
149 | 0 | *bytes_read = 0; |
150 | 0 | if (UNLIKELY(bytes_req == 0)) { |
151 | 0 | return Status::OK(); |
152 | 0 | } |
153 | | |
154 | 0 | LIMIT_REMOTE_SCAN_IO(bytes_read); |
155 | |
|
156 | 0 | size_t has_read = 0; |
157 | 0 | while (has_read < bytes_req) { |
158 | 0 | int64_t max_to_read = bytes_req - has_read; |
159 | 0 | tSize to_read = static_cast<tSize>( |
160 | 0 | std::min(max_to_read, static_cast<int64_t>(std::numeric_limits<tSize>::max()))); |
161 | 0 | tSize loop_read = hdfsPread(_handle->fs(), _handle->file(), offset + has_read, |
162 | 0 | to + has_read, to_read); |
163 | 0 | { |
164 | 0 | [[maybe_unused]] Status error_ret; |
165 | 0 | TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileReader:read_error", error_ret); |
166 | 0 | } |
167 | 0 | if (loop_read < 0) { |
168 | | // invoker maybe just skip Status.NotFound and continue |
169 | | // so we need distinguish between it and other kinds of errors |
170 | 0 | std::string _err_msg = hdfs_error(); |
171 | 0 | if (_err_msg.find("No such file or directory") != std::string::npos) { |
172 | 0 | return Status::NotFound(_err_msg); |
173 | 0 | } |
174 | 0 | return Status::InternalError( |
175 | 0 | "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}", |
176 | 0 | BackendOptions::get_localhost(), _fs_name, _path.string(), _err_msg); |
177 | 0 | } |
178 | 0 | if (loop_read == 0) { |
179 | 0 | break; |
180 | 0 | } |
181 | 0 | has_read += loop_read; |
182 | 0 | } |
183 | 0 | *bytes_read = has_read; |
184 | 0 | hdfs_bytes_read_total << *bytes_read; |
185 | 0 | hdfs_bytes_per_read << *bytes_read; |
186 | 0 | return Status::OK(); |
187 | 0 | } |
188 | | |
189 | | #else |
190 | | // The hedged read only support hdfsPread(). |
191 | | // TODO: rethink here to see if there are some difference between hdfsPread() and hdfsRead() |
192 | | Status HdfsFileReader::do_read_at_impl(size_t offset, Slice result, size_t* bytes_read, |
193 | | const IOContext* /*io_ctx*/) { |
194 | | if (closed()) [[unlikely]] { |
195 | | return Status::InternalError("read closed file: ", _path.native()); |
196 | | } |
197 | | |
198 | | if (offset > _handle->file_size()) { |
199 | | return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})", |
200 | | offset, _handle->file_size(), _path.native()); |
201 | | } |
202 | | |
203 | | int res = hdfsSeek(_handle->fs(), _handle->file(), offset); |
204 | | if (res != 0) { |
205 | | // invoker maybe just skip Status.NotFound and continue |
206 | | // so we need distinguish between it and other kinds of errors |
207 | | std::string _err_msg = hdfs_error(); |
208 | | if (_err_msg.find("No such file or directory") != std::string::npos) { |
209 | | return Status::NotFound(_err_msg); |
210 | | } |
211 | | return Status::InternalError("Seek to offset failed. (BE: {}) offset={}, err: {}", |
212 | | BackendOptions::get_localhost(), offset, _err_msg); |
213 | | } |
214 | | |
215 | | size_t bytes_req = result.size; |
216 | | char* to = result.data; |
217 | | bytes_req = std::min(bytes_req, (size_t)(_handle->file_size() - offset)); |
218 | | *bytes_read = 0; |
219 | | if (UNLIKELY(bytes_req == 0)) { |
220 | | return Status::OK(); |
221 | | } |
222 | | |
223 | | LIMIT_REMOTE_SCAN_IO(bytes_read); |
224 | | |
225 | | size_t has_read = 0; |
226 | | while (has_read < bytes_req) { |
227 | | int64_t loop_read = hdfsRead(_handle->fs(), _handle->file(), to + has_read, |
228 | | static_cast<int32_t>(bytes_req - has_read)); |
229 | | if (loop_read < 0) { |
230 | | // invoker maybe just skip Status.NotFound and continue |
231 | | // so we need distinguish between it and other kinds of errors |
232 | | std::string _err_msg = hdfs_error(); |
233 | | if (_err_msg.find("No such file or directory") != std::string::npos) { |
234 | | return Status::NotFound(_err_msg); |
235 | | } |
236 | | return Status::InternalError( |
237 | | "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}", |
238 | | BackendOptions::get_localhost(), _fs_name, _path.string(), _err_msg); |
239 | | } |
240 | | if (loop_read == 0) { |
241 | | break; |
242 | | } |
243 | | has_read += loop_read; |
244 | | } |
245 | | *bytes_read = has_read; |
246 | | hdfs_bytes_read_total << *bytes_read; |
247 | | hdfs_bytes_per_read << *bytes_read; |
248 | | return Status::OK(); |
249 | | } |
250 | | #endif |
251 | | |
// Flush libhdfs read statistics into the profile counters registered by the
// constructor. Per-file stats come from hdfsFileGetReadStatistics(); hedged
// read metrics from hdfsGetHedgedReadMetrics() (tracked per filesystem, not
// per file). No-op unless a profile was supplied and the target is real HDFS;
// with a non-Hadoop libhdfs build the whole body compiles out.
void HdfsFileReader::_collect_profile_before_close() {
    if (_profile != nullptr && is_hdfs(_fs_name)) {
#ifdef USE_HADOOP_HDFS
        if (_handle == nullptr) [[unlikely]] {
            // Handle was destroyed after a failed read; nothing to collect.
            return;
        }

        struct hdfsReadStatistics* hdfs_statistics = nullptr;
        auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics);
        if (r != 0) {
            // Best-effort: log and skip profile collection on failure.
            LOG(WARNING) << "Failed to run hdfsFileGetReadStatistics(): " << r
                         << ", name node: " << _fs_name;
            return;
        }
        COUNTER_UPDATE(_hdfs_profile.total_bytes_read, hdfs_statistics->totalBytesRead);
        COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read, hdfs_statistics->totalLocalBytesRead);
        COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
                       hdfs_statistics->totalShortCircuitBytesRead);
        COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
                       hdfs_statistics->totalZeroCopyBytesRead);
        hdfsFileFreeReadStatistics(hdfs_statistics);

        struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics = nullptr;
        r = hdfsGetHedgedReadMetrics(_handle->fs(), &hdfs_hedged_read_statistics);
        if (r != 0) {
            LOG(WARNING) << "Failed to run hdfsGetHedgedReadMetrics(): " << r
                         << ", name node: " << _fs_name;
            return;
        }

        COUNTER_UPDATE(_hdfs_profile.total_hedged_read, hdfs_hedged_read_statistics->hedgedReadOps);
        COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread,
                       hdfs_hedged_read_statistics->hedgedReadOpsInCurThread);
        COUNTER_UPDATE(_hdfs_profile.hedged_read_wins,
                       hdfs_hedged_read_statistics->hedgedReadOpsWin);

        hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics);
        // Clear per-file stats after collecting them — presumably to avoid
        // double counting if collection runs again; TODO confirm intent.
        hdfsFileClearReadStatistics(_handle->file());
#endif
    }
}
293 | | #include "common/compile_check_end.h" |
294 | | |
295 | | } // namespace doris::io |