Coverage Report

Created: 2026-04-16 10:20

Jump keys: next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/hdfs_file_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/hdfs_file_reader.h"
19
20
#include <stdint.h>
21
22
#include <algorithm>
23
#include <filesystem>
24
#include <ostream>
25
#include <utility>
26
27
#include "bvar/latency_recorder.h"
28
#include "bvar/reducer.h"
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/logging.h"
31
#include "common/metrics/doris_metrics.h"
32
#include "cpp/sync_point.h"
33
#include "io/fs/err_utils.h"
34
#include "io/hdfs_util.h"
35
#include "runtime/thread_context.h"
36
#include "runtime/workload_management/io_throttle.h"
37
#include "service/backend_options.h"
38
39
namespace doris::io {
40
41
bvar::Adder<uint64_t> hdfs_bytes_read_total("hdfs_file_reader", "bytes_read");
42
bvar::LatencyRecorder hdfs_bytes_per_read("hdfs_file_reader", "bytes_per_read"); // also QPS
43
bvar::PerSecond<bvar::Adder<uint64_t>> hdfs_read_througthput("hdfs_file_reader",
44
                                                             "hdfs_read_throughput",
45
                                                             &hdfs_bytes_read_total);
46
47
namespace {
48
49
Result<FileHandleCache::Accessor> get_file(const hdfsFS& fs, const Path& file, int64_t mtime,
50
37.6k
                                           int64_t file_size) {
51
37.6k
    static FileHandleCache cache(config::max_hdfs_file_handle_cache_num, 16,
52
37.6k
                                 config::max_hdfs_file_handle_cache_time_sec);
53
37.6k
    bool cache_hit;
54
37.6k
    FileHandleCache::Accessor accessor;
55
37.6k
    RETURN_IF_ERROR_RESULT(cache.get_file_handle(fs, file.native(), mtime, file_size, false,
56
37.6k
                                                 &accessor, &cache_hit));
57
37.6k
    return accessor;
58
37.6k
}
59
60
} // namespace
61
62
Result<FileReaderSPtr> HdfsFileReader::create(Path full_path, const hdfsFS& fs, std::string fs_name,
63
                                              const FileReaderOptions& opts,
64
37.6k
                                              RuntimeProfile* profile) {
65
37.6k
    auto path = convert_path(full_path, fs_name);
66
37.6k
    return get_file(fs, path, opts.mtime, opts.file_size).transform([&](auto&& accessor) {
67
37.5k
        return std::make_shared<HdfsFileReader>(std::move(path), std::move(fs_name),
68
37.5k
                                                std::move(accessor), profile, opts.mtime);
69
37.5k
    });
70
37.6k
}
71
72
HdfsFileReader::HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor,
73
                               RuntimeProfile* profile, int64_t mtime)
74
37.5k
        : _path(std::move(path)),
75
37.5k
          _fs_name(std::move(fs_name)),
76
37.5k
          _accessor(std::move(accessor)),
77
37.5k
          _profile(profile),
78
37.5k
          _mtime(mtime) {
79
37.5k
    _handle = _accessor.get();
80
81
37.5k
    DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
82
37.5k
    DorisMetrics::instance()->hdfs_file_reader_total->increment(1);
83
37.5k
    if (_profile != nullptr && is_hdfs(_fs_name)) {
84
35.1k
#ifdef USE_HADOOP_HDFS
85
35.1k
        const char* hdfs_profile_name = "HdfsIO";
86
35.1k
        ADD_TIMER(_profile, hdfs_profile_name);
87
35.1k
        _hdfs_profile.total_bytes_read =
88
35.1k
                ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, hdfs_profile_name);
89
35.1k
        _hdfs_profile.total_local_bytes_read =
90
35.1k
                ADD_CHILD_COUNTER(_profile, "TotalLocalBytesRead", TUnit::BYTES, hdfs_profile_name);
91
35.1k
        _hdfs_profile.total_short_circuit_bytes_read = ADD_CHILD_COUNTER(
92
35.1k
                _profile, "TotalShortCircuitBytesRead", TUnit::BYTES, hdfs_profile_name);
93
35.1k
        _hdfs_profile.total_total_zero_copy_bytes_read = ADD_CHILD_COUNTER(
94
35.1k
                _profile, "TotalZeroCopyBytesRead", TUnit::BYTES, hdfs_profile_name);
95
96
35.1k
        _hdfs_profile.total_hedged_read =
97
35.1k
                ADD_CHILD_COUNTER(_profile, "TotalHedgedRead", TUnit::UNIT, hdfs_profile_name);
98
35.1k
        _hdfs_profile.hedged_read_in_cur_thread = ADD_CHILD_COUNTER(
99
35.1k
                _profile, "HedgedReadInCurThread", TUnit::UNIT, hdfs_profile_name);
100
35.1k
        _hdfs_profile.hedged_read_wins =
101
35.1k
                ADD_CHILD_COUNTER(_profile, "HedgedReadWins", TUnit::UNIT, hdfs_profile_name);
102
35.1k
#endif
103
35.1k
    }
104
37.5k
}
105
106
37.5k
HdfsFileReader::~HdfsFileReader() {
107
37.5k
    static_cast<void>(close());
108
37.5k
}
109
110
39.9k
Status HdfsFileReader::close() {
111
39.9k
    bool expected = false;
112
39.9k
    if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
113
37.6k
        DorisMetrics::instance()->hdfs_file_open_reading->increment(-1);
114
37.6k
    }
115
39.9k
    return Status::OK();
116
39.9k
}
117
118
Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
119
284k
                                    const IOContext* io_ctx) {
120
284k
    auto st = do_read_at_impl(offset, result, bytes_read, io_ctx);
121
284k
    if (!st.ok()) {
122
0
        _handle = nullptr;
123
0
        _accessor.destroy();
124
0
    }
125
284k
    return st;
126
284k
}
127
128
#ifdef USE_HADOOP_HDFS
129
Status HdfsFileReader::do_read_at_impl(size_t offset, Slice result, size_t* bytes_read,
130
284k
                                       const IOContext* /*io_ctx*/) {
131
284k
    if (closed()) [[unlikely]] {
132
0
        return Status::InternalError("read closed file: {}", _path.native());
133
0
    }
134
135
284k
    if (_handle == nullptr) [[unlikely]] {
136
0
        return Status::InternalError("cached hdfs file handle has been destroyed: {}",
137
0
                                     _path.native());
138
0
    }
139
140
284k
    if (offset > _handle->file_size()) {
141
0
        return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})",
142
0
                               offset, _handle->file_size(), _path.native());
143
0
    }
144
145
284k
    size_t bytes_req = result.size;
146
284k
    char* to = result.data;
147
284k
    bytes_req = std::min(bytes_req, (size_t)(_handle->file_size() - offset));
148
284k
    *bytes_read = 0;
149
284k
    if (UNLIKELY(bytes_req == 0)) {
150
350
        return Status::OK();
151
350
    }
152
153
284k
    LIMIT_REMOTE_SCAN_IO(bytes_read);
154
155
284k
    size_t has_read = 0;
156
568k
    while (has_read < bytes_req) {
157
284k
        int64_t max_to_read = bytes_req - has_read;
158
284k
        tSize to_read = static_cast<tSize>(
159
284k
                std::min(max_to_read, static_cast<int64_t>(std::numeric_limits<tSize>::max())));
160
284k
        tSize loop_read = hdfsPread(_handle->fs(), _handle->file(), offset + has_read,
161
284k
                                    to + has_read, to_read);
162
284k
        {
163
284k
            [[maybe_unused]] Status error_ret;
164
284k
            TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileReader:read_error", error_ret);
165
284k
        }
166
284k
        if (loop_read < 0) {
167
            // invoker maybe just skip Status.NotFound and continue
168
            // so we need distinguish between it and other kinds of errors
169
0
            std::string _err_msg = hdfs_error();
170
0
            if (_err_msg.find("No such file or directory") != std::string::npos) {
171
0
                return Status::NotFound(_err_msg);
172
0
            }
173
0
            return Status::InternalError(
174
0
                    "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
175
0
                    BackendOptions::get_localhost(), _fs_name, _path.string(), _err_msg);
176
0
        }
177
284k
        if (loop_read == 0) {
178
0
            break;
179
0
        }
180
284k
        has_read += loop_read;
181
284k
    }
182
284k
    *bytes_read = has_read;
183
284k
    hdfs_bytes_read_total << *bytes_read;
184
284k
    hdfs_bytes_per_read << *bytes_read;
185
284k
    return Status::OK();
186
284k
}
187
188
#else
189
// The hedged read only support hdfsPread().
190
// TODO: rethink here to see if there are some difference between hdfsPread() and hdfsRead()
191
Status HdfsFileReader::do_read_at_impl(size_t offset, Slice result, size_t* bytes_read,
192
                                       const IOContext* /*io_ctx*/) {
193
    if (closed()) [[unlikely]] {
194
        return Status::InternalError("read closed file: ", _path.native());
195
    }
196
197
    if (offset > _handle->file_size()) {
198
        return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})",
199
                               offset, _handle->file_size(), _path.native());
200
    }
201
202
    int res = hdfsSeek(_handle->fs(), _handle->file(), offset);
203
    if (res != 0) {
204
        // invoker maybe just skip Status.NotFound and continue
205
        // so we need distinguish between it and other kinds of errors
206
        std::string _err_msg = hdfs_error();
207
        if (_err_msg.find("No such file or directory") != std::string::npos) {
208
            return Status::NotFound(_err_msg);
209
        }
210
        return Status::InternalError("Seek to offset failed. (BE: {}) offset={}, err: {}",
211
                                     BackendOptions::get_localhost(), offset, _err_msg);
212
    }
213
214
    size_t bytes_req = result.size;
215
    char* to = result.data;
216
    bytes_req = std::min(bytes_req, (size_t)(_handle->file_size() - offset));
217
    *bytes_read = 0;
218
    if (UNLIKELY(bytes_req == 0)) {
219
        return Status::OK();
220
    }
221
222
    LIMIT_REMOTE_SCAN_IO(bytes_read);
223
224
    size_t has_read = 0;
225
    while (has_read < bytes_req) {
226
        int64_t loop_read = hdfsRead(_handle->fs(), _handle->file(), to + has_read,
227
                                     static_cast<int32_t>(bytes_req - has_read));
228
        if (loop_read < 0) {
229
            // invoker maybe just skip Status.NotFound and continue
230
            // so we need distinguish between it and other kinds of errors
231
            std::string _err_msg = hdfs_error();
232
            if (_err_msg.find("No such file or directory") != std::string::npos) {
233
                return Status::NotFound(_err_msg);
234
            }
235
            return Status::InternalError(
236
                    "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
237
                    BackendOptions::get_localhost(), _fs_name, _path.string(), _err_msg);
238
        }
239
        if (loop_read == 0) {
240
            break;
241
        }
242
        has_read += loop_read;
243
    }
244
    *bytes_read = has_read;
245
    hdfs_bytes_read_total << *bytes_read;
246
    hdfs_bytes_per_read << *bytes_read;
247
    return Status::OK();
248
}
249
#endif
250
251
31.2k
void HdfsFileReader::_collect_profile_before_close() {
252
31.2k
    if (_profile != nullptr && is_hdfs(_fs_name)) {
253
30.6k
#ifdef USE_HADOOP_HDFS
254
30.6k
        if (_handle == nullptr) [[unlikely]] {
255
0
            return;
256
0
        }
257
258
30.6k
        struct hdfsReadStatistics* hdfs_statistics = nullptr;
259
30.6k
        auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics);
260
30.6k
        if (r != 0) {
261
0
            LOG(WARNING) << "Failed to run hdfsFileGetReadStatistics(): " << r
262
0
                         << ", name node: " << _fs_name;
263
0
            return;
264
0
        }
265
30.6k
        COUNTER_UPDATE(_hdfs_profile.total_bytes_read, hdfs_statistics->totalBytesRead);
266
30.6k
        COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read, hdfs_statistics->totalLocalBytesRead);
267
30.6k
        COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
268
30.6k
                       hdfs_statistics->totalShortCircuitBytesRead);
269
30.6k
        COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
270
30.6k
                       hdfs_statistics->totalZeroCopyBytesRead);
271
30.6k
        hdfsFileFreeReadStatistics(hdfs_statistics);
272
273
30.6k
        struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics = nullptr;
274
30.6k
        r = hdfsGetHedgedReadMetrics(_handle->fs(), &hdfs_hedged_read_statistics);
275
30.6k
        if (r != 0) {
276
0
            LOG(WARNING) << "Failed to run hdfsGetHedgedReadMetrics(): " << r
277
0
                         << ", name node: " << _fs_name;
278
0
            return;
279
0
        }
280
281
30.6k
        COUNTER_UPDATE(_hdfs_profile.total_hedged_read, hdfs_hedged_read_statistics->hedgedReadOps);
282
30.6k
        COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread,
283
30.6k
                       hdfs_hedged_read_statistics->hedgedReadOpsInCurThread);
284
30.6k
        COUNTER_UPDATE(_hdfs_profile.hedged_read_wins,
285
30.6k
                       hdfs_hedged_read_statistics->hedgedReadOpsWin);
286
287
30.6k
        hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics);
288
30.6k
        hdfsFileClearReadStatistics(_handle->file());
289
30.6k
#endif
290
30.6k
    }
291
31.2k
}
292
293
} // namespace doris::io