Coverage Report

Created: 2026-03-15 01:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/hdfs_file_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/hdfs_file_reader.h"
19
20
#include <stdint.h>
21
22
#include <algorithm>
23
#include <filesystem>
24
#include <ostream>
25
#include <utility>
26
27
#include "bvar/latency_recorder.h"
28
#include "bvar/reducer.h"
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/logging.h"
31
#include "common/metrics/doris_metrics.h"
32
#include "cpp/sync_point.h"
33
#include "io/fs/err_utils.h"
34
#include "io/hdfs_util.h"
35
#include "runtime/thread_context.h"
36
#include "runtime/workload_management/io_throttle.h"
37
#include "service/backend_options.h"
38
39
namespace doris::io {
40
#include "common/compile_check_begin.h"
41
42
// Process-wide bvar metrics shared by every HdfsFileReader in this process.

// Running total of bytes returned by successful reads.
bvar::Adder<uint64_t> hdfs_bytes_read_total("hdfs_file_reader", "bytes_read");

// Bytes returned per read call; the LatencyRecorder also exposes the call rate (QPS).
bvar::LatencyRecorder hdfs_bytes_per_read("hdfs_file_reader", "bytes_per_read");

// Per-second rate derived from hdfs_bytes_read_total.
// NOTE(review): the C++ identifier is misspelled ("througthput"); it is left as-is because the
// symbol has external linkage and may be referenced from other translation units.
bvar::PerSecond<bvar::Adder<uint64_t>> hdfs_read_througthput(
        "hdfs_file_reader", "hdfs_read_throughput", &hdfs_bytes_read_total);
47
48
namespace {
49
50
Result<FileHandleCache::Accessor> get_file(const hdfsFS& fs, const Path& file, int64_t mtime,
51
0
                                           int64_t file_size) {
52
0
    static FileHandleCache cache(config::max_hdfs_file_handle_cache_num, 16,
53
0
                                 config::max_hdfs_file_handle_cache_time_sec);
54
0
    bool cache_hit;
55
0
    FileHandleCache::Accessor accessor;
56
0
    RETURN_IF_ERROR_RESULT(cache.get_file_handle(fs, file.native(), mtime, file_size, false,
57
0
                                                 &accessor, &cache_hit));
58
0
    return accessor;
59
0
}
60
61
} // namespace
62
63
// Factory: resolves `full_path` against `fs_name`, fetches a cached file handle and wraps it
// in a new HdfsFileReader. Errors from the handle lookup are propagated unchanged.
Result<FileReaderSPtr> HdfsFileReader::create(Path full_path, const hdfsFS& fs, std::string fs_name,
                                              const FileReaderOptions& opts,
                                              RuntimeProfile* profile) {
    auto converted_path = convert_path(full_path, fs_name);
    auto handle = get_file(fs, converted_path, opts.mtime, opts.file_size);

    // transform() invokes the lambda only when a handle was obtained. It runs synchronously,
    // so capturing locals by reference and moving out of them is safe.
    return std::move(handle).transform([&](auto&& accessor) {
        return std::make_shared<HdfsFileReader>(std::move(converted_path), std::move(fs_name),
                                                std::move(accessor), profile, opts.mtime);
    });
}
72
73
// Takes ownership of the cached-handle accessor, bumps the reader metrics and, for real
// HDFS filesystems with an attached profile, registers the "HdfsIO" profile counters.
HdfsFileReader::HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor,
                               RuntimeProfile* profile, int64_t mtime)
        : _path(std::move(path)),
          _fs_name(std::move(fs_name)),
          _accessor(std::move(accessor)),
          _profile(profile),
          _mtime(mtime) {
    // Raw pointer to the cached handle used by the read path; read_at_impl() resets it to
    // nullptr after a failed read.
    _handle = _accessor.get();

    DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
    DorisMetrics::instance()->hdfs_file_reader_total->increment(1);

    // Profile counters only make sense with a profile and a real HDFS filesystem.
    if (_profile == nullptr || !is_hdfs(_fs_name)) {
        return;
    }

#ifdef USE_HADOOP_HDFS
    // Every counter below is a child of the "HdfsIO" timer; registration order is kept
    // stable so the profile layout does not change.
    const char* hdfs_profile_name = "HdfsIO";
    ADD_TIMER(_profile, hdfs_profile_name);

    // Byte counters, filled from hdfsFileGetReadStatistics() before close.
    _hdfs_profile.total_bytes_read =
            ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, hdfs_profile_name);
    _hdfs_profile.total_local_bytes_read =
            ADD_CHILD_COUNTER(_profile, "TotalLocalBytesRead", TUnit::BYTES, hdfs_profile_name);
    _hdfs_profile.total_short_circuit_bytes_read = ADD_CHILD_COUNTER(
            _profile, "TotalShortCircuitBytesRead", TUnit::BYTES, hdfs_profile_name);
    _hdfs_profile.total_total_zero_copy_bytes_read = ADD_CHILD_COUNTER(
            _profile, "TotalZeroCopyBytesRead", TUnit::BYTES, hdfs_profile_name);

    // Hedged-read counters, filled from hdfsGetHedgedReadMetrics() before close.
    _hdfs_profile.total_hedged_read =
            ADD_CHILD_COUNTER(_profile, "TotalHedgedRead", TUnit::UNIT, hdfs_profile_name);
    _hdfs_profile.hedged_read_in_cur_thread = ADD_CHILD_COUNTER(
            _profile, "HedgedReadInCurThread", TUnit::UNIT, hdfs_profile_name);
    _hdfs_profile.hedged_read_wins =
            ADD_CHILD_COUNTER(_profile, "HedgedReadWins", TUnit::UNIT, hdfs_profile_name);
#endif
}
106
107
0
// Closes the reader if the owner has not already done so. A destructor cannot report a
// Status, so the result of close() is deliberately discarded.
HdfsFileReader::~HdfsFileReader() {
    static_cast<void>(close());
}
110
111
0
// Marks the reader closed. Only the call that actually flips _closed from false to true
// decrements the open-reading gauge, so repeated calls (e.g. an explicit close() followed
// by the destructor) are harmless. Always returns OK.
Status HdfsFileReader::close() {
    const bool was_already_closed = _closed.exchange(true, std::memory_order_acq_rel);
    if (!was_already_closed) {
        DorisMetrics::instance()->hdfs_file_open_reading->increment(-1);
    }
    return Status::OK();
}
118
119
// Reads through do_read_at_impl(). On any failure the cached handle is treated as unusable:
// the raw pointer is cleared and the accessor's destroy() is invoked (presumably dropping
// the handle from the FileHandleCache — confirm against FileHandleCache::Accessor).
// The original error status is returned unchanged.
Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                    const IOContext* io_ctx) {
    Status status = do_read_at_impl(offset, result, bytes_read, io_ctx);
    if (status.ok()) {
        return status;
    }

    _handle = nullptr;
    _accessor.destroy();
    return status;
}
128
129
#ifdef USE_HADOOP_HDFS
130
// Positional read via hdfsPread() (hedged-read capable).
//
// Reads up to result.size bytes starting at `offset`, never past end of file, and stores the
// number of bytes actually read in *bytes_read. A zero-byte hdfsPread() result ends the loop
// early with a short read. Errors whose message contains "No such file or directory" are
// returned as NotFound; all other read failures become InternalError.
Status HdfsFileReader::do_read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                       const IOContext* /*io_ctx*/) {
    if (closed()) [[unlikely]] {
        return Status::InternalError("read closed file: {}", _path.native());
    }

    // read_at_impl() clears _handle after a failed read.
    if (_handle == nullptr) [[unlikely]] {
        return Status::InternalError("cached hdfs file handle has been destroyed: {}",
                                     _path.native());
    }

    if (offset > _handle->file_size()) {
        return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})",
                               offset, _handle->file_size(), _path.native());
    }

    // Clamp the request so it never extends past end of file.
    const size_t requested = result.size;
    const size_t bytes_req =
            std::min(requested, static_cast<size_t>(_handle->file_size() - offset));
    *bytes_read = 0;
    if (UNLIKELY(bytes_req == 0)) {
        return Status::OK();
    }

    LIMIT_REMOTE_SCAN_IO(bytes_read);

    char* const dest = result.data;
    size_t total = 0;
    while (total < bytes_req) {
        // hdfsPread() takes a tSize length, so issue at most tSize's maximum per call.
        int64_t remaining = bytes_req - total;
        const tSize chunk = static_cast<tSize>(
                std::min(remaining, static_cast<int64_t>(std::numeric_limits<tSize>::max())));
        const tSize got =
                hdfsPread(_handle->fs(), _handle->file(), offset + total, dest + total, chunk);
        {
            [[maybe_unused]] Status error_ret;
            TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileReader:read_error", error_ret);
        }

        if (got < 0) {
            // Callers may deliberately skip NotFound and continue, so a missing file has to
            // be reported distinctly from every other kind of read error.
            std::string err_msg = hdfs_error();
            if (err_msg.find("No such file or directory") != std::string::npos) {
                return Status::NotFound(err_msg);
            }
            return Status::InternalError(
                    "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
                    BackendOptions::get_localhost(), _fs_name, _path.string(), err_msg);
        }
        if (got == 0) {
            // No more data returned (presumably EOF); report what was read so far.
            break;
        }
        total += got;
    }

    *bytes_read = total;
    hdfs_bytes_read_total << *bytes_read;
    hdfs_bytes_per_read << *bytes_read;
    return Status::OK();
}
188
189
#else
190
// The hedged read only support hdfsPread().
191
// TODO: rethink here to see if there are some difference between hdfsPread() and hdfsRead()
192
Status HdfsFileReader::do_read_at_impl(size_t offset, Slice result, size_t* bytes_read,
193
                                       const IOContext* /*io_ctx*/) {
194
    if (closed()) [[unlikely]] {
195
        return Status::InternalError("read closed file: ", _path.native());
196
    }
197
198
    if (offset > _handle->file_size()) {
199
        return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})",
200
                               offset, _handle->file_size(), _path.native());
201
    }
202
203
    int res = hdfsSeek(_handle->fs(), _handle->file(), offset);
204
    if (res != 0) {
205
        // invoker maybe just skip Status.NotFound and continue
206
        // so we need distinguish between it and other kinds of errors
207
        std::string _err_msg = hdfs_error();
208
        if (_err_msg.find("No such file or directory") != std::string::npos) {
209
            return Status::NotFound(_err_msg);
210
        }
211
        return Status::InternalError("Seek to offset failed. (BE: {}) offset={}, err: {}",
212
                                     BackendOptions::get_localhost(), offset, _err_msg);
213
    }
214
215
    size_t bytes_req = result.size;
216
    char* to = result.data;
217
    bytes_req = std::min(bytes_req, (size_t)(_handle->file_size() - offset));
218
    *bytes_read = 0;
219
    if (UNLIKELY(bytes_req == 0)) {
220
        return Status::OK();
221
    }
222
223
    LIMIT_REMOTE_SCAN_IO(bytes_read);
224
225
    size_t has_read = 0;
226
    while (has_read < bytes_req) {
227
        int64_t loop_read = hdfsRead(_handle->fs(), _handle->file(), to + has_read,
228
                                     static_cast<int32_t>(bytes_req - has_read));
229
        if (loop_read < 0) {
230
            // invoker maybe just skip Status.NotFound and continue
231
            // so we need distinguish between it and other kinds of errors
232
            std::string _err_msg = hdfs_error();
233
            if (_err_msg.find("No such file or directory") != std::string::npos) {
234
                return Status::NotFound(_err_msg);
235
            }
236
            return Status::InternalError(
237
                    "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
238
                    BackendOptions::get_localhost(), _fs_name, _path.string(), _err_msg);
239
        }
240
        if (loop_read == 0) {
241
            break;
242
        }
243
        has_read += loop_read;
244
    }
245
    *bytes_read = has_read;
246
    hdfs_bytes_read_total << *bytes_read;
247
    hdfs_bytes_per_read << *bytes_read;
248
    return Status::OK();
249
}
250
#endif
251
252
0
void HdfsFileReader::_collect_profile_before_close() {
253
0
    if (_profile != nullptr && is_hdfs(_fs_name)) {
254
0
#ifdef USE_HADOOP_HDFS
255
0
        if (_handle == nullptr) [[unlikely]] {
256
0
            return;
257
0
        }
258
259
0
        struct hdfsReadStatistics* hdfs_statistics = nullptr;
260
0
        auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics);
261
0
        if (r != 0) {
262
0
            LOG(WARNING) << "Failed to run hdfsFileGetReadStatistics(): " << r
263
0
                         << ", name node: " << _fs_name;
264
0
            return;
265
0
        }
266
0
        COUNTER_UPDATE(_hdfs_profile.total_bytes_read, hdfs_statistics->totalBytesRead);
267
0
        COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read, hdfs_statistics->totalLocalBytesRead);
268
0
        COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
269
0
                       hdfs_statistics->totalShortCircuitBytesRead);
270
0
        COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
271
0
                       hdfs_statistics->totalZeroCopyBytesRead);
272
0
        hdfsFileFreeReadStatistics(hdfs_statistics);
273
274
0
        struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics = nullptr;
275
0
        r = hdfsGetHedgedReadMetrics(_handle->fs(), &hdfs_hedged_read_statistics);
276
0
        if (r != 0) {
277
0
            LOG(WARNING) << "Failed to run hdfsGetHedgedReadMetrics(): " << r
278
0
                         << ", name node: " << _fs_name;
279
0
            return;
280
0
        }
281
282
0
        COUNTER_UPDATE(_hdfs_profile.total_hedged_read, hdfs_hedged_read_statistics->hedgedReadOps);
283
0
        COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread,
284
0
                       hdfs_hedged_read_statistics->hedgedReadOpsInCurThread);
285
0
        COUNTER_UPDATE(_hdfs_profile.hedged_read_wins,
286
0
                       hdfs_hedged_read_statistics->hedgedReadOpsWin);
287
288
0
        hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics);
289
0
        hdfsFileClearReadStatistics(_handle->file());
290
0
#endif
291
0
    }
292
0
}
293
#include "common/compile_check_end.h"
294
295
} // namespace doris::io