Coverage Report

Created: 2026-04-09 12:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/hdfs_file_system.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/hdfs_file_system.h"
19
20
#include <errno.h>
21
#include <fcntl.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
24
#include <algorithm>
25
#include <filesystem>
26
#include <map>
27
#include <mutex>
28
#include <ostream>
29
#include <unordered_map>
30
#include <utility>
31
32
#include "common/config.h"
33
#include "common/status.h"
34
#include "core/pod_array.h"
35
#include "io/fs/err_utils.h"
36
#include "io/fs/hdfs/hdfs_mgr.h"
37
#include "io/fs/hdfs_file_reader.h"
38
#include "io/fs/hdfs_file_writer.h"
39
#include "io/fs/local_file_system.h"
40
#include "io/hdfs_builder.h"
41
#include "io/hdfs_util.h"
42
#include "runtime/exec_env.h"
43
#include "util/obj_lru_cache.h"
44
#include "util/slice.h"
45
46
namespace doris::io {
47
48
#ifndef CHECK_HDFS_HANDLER
49
#define CHECK_HDFS_HANDLER(handler)                        \
50
178
    if (!handler) {                                        \
51
0
        return Status::IOError("init Hdfs handler error"); \
52
0
    }
53
#endif
54
55
Result<std::shared_ptr<HdfsFileSystem>> HdfsFileSystem::create(
56
        const std::map<std::string, std::string>& properties, std::string fs_name, std::string id,
57
2.60k
        RuntimeProfile* profile, std::string root_path) {
58
2.60k
    return HdfsFileSystem::create(parse_properties(properties), std::move(fs_name), std::move(id),
59
2.60k
                                  profile, std::move(root_path));
60
2.60k
}
61
62
Result<std::shared_ptr<HdfsFileSystem>> HdfsFileSystem::create(const THdfsParams& hdfs_params,
63
                                                               std::string fs_name, std::string id,
64
                                                               RuntimeProfile* profile,
65
2.64k
                                                               std::string root_path) {
66
2.64k
#ifdef USE_HADOOP_HDFS
67
2.64k
    if (!config::enable_java_support) {
68
0
        return ResultError(Status::InternalError(
69
0
                "hdfs file system is not enabled, you can change be config enable_java_support to "
70
0
                "true."));
71
0
    }
72
2.64k
#endif
73
2.64k
    std::shared_ptr<HdfsFileSystem> fs(new HdfsFileSystem(
74
2.64k
            hdfs_params, std::move(fs_name), std::move(id), profile, std::move(root_path)));
75
2.64k
    RETURN_IF_ERROR_RESULT(fs->init());
76
2.63k
    return fs;
77
2.64k
}
78
79
HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, std::string fs_name, std::string id,
80
                               RuntimeProfile* profile, std::string root_path)
81
2.64k
        : RemoteFileSystem(std::move(root_path), std::move(id), FileSystemType::HDFS),
82
2.64k
          _hdfs_params(hdfs_params),
83
2.64k
          _fs_name(std::move(fs_name)),
84
2.64k
          _profile(profile) {
85
2.64k
    if (_fs_name.empty()) {
86
8
        _fs_name = hdfs_params.fs_name;
87
8
    }
88
2.64k
}
89
90
2.63k
HdfsFileSystem::~HdfsFileSystem() = default;
91
92
2.64k
Status HdfsFileSystem::init() {
93
2.64k
    RETURN_IF_ERROR(ExecEnv::GetInstance()->hdfs_mgr()->get_or_create_fs(_hdfs_params, _fs_name,
94
2.64k
                                                                         &_fs_handler));
95
2.63k
    if (!_fs_handler) {
96
0
        return Status::InternalError("failed to init Hdfs handler, please check hdfs params.");
97
0
    }
98
2.63k
    return Status::OK();
99
2.63k
}
100
101
Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
102
2.60k
                                        const FileWriterOptions* opts) {
103
2.60k
    auto res = io::HdfsFileWriter::create(file, _fs_handler, _fs_name, opts);
104
2.60k
    if (res.has_value()) {
105
2.60k
        *writer = std::move(res).value();
106
2.60k
        return Status::OK();
107
2.60k
    } else {
108
0
        return std::move(res).error();
109
0
    }
110
2.60k
}
111
112
Status HdfsFileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader,
113
178
                                          const FileReaderOptions& opts) {
114
178
    CHECK_HDFS_HANDLER(_fs_handler);
115
178
    *reader =
116
178
            DORIS_TRY(HdfsFileReader::create(file, _fs_handler->hdfs_fs, _fs_name, opts, _profile));
117
178
    return Status::OK();
118
178
}
119
120
0
Status HdfsFileSystem::create_directory_impl(const Path& dir, bool failed_if_exists) {
121
0
    CHECK_HDFS_HANDLER(_fs_handler);
122
0
    Path real_path = convert_path(dir, _fs_name);
123
0
    int res = hdfsCreateDirectory(_fs_handler->hdfs_fs, real_path.string().c_str());
124
0
    if (res == -1) {
125
0
        return Status::IOError("failed to create directory {}: {}", dir.native(), hdfs_error());
126
0
    }
127
0
    return Status::OK();
128
0
}
129
130
0
Status HdfsFileSystem::delete_file_impl(const Path& file) {
131
0
    return delete_internal(file, 0);
132
0
}
133
134
0
Status HdfsFileSystem::delete_directory_impl(const Path& dir) {
135
0
    return delete_internal(dir, 1);
136
0
}
137
138
0
Status HdfsFileSystem::batch_delete_impl(const std::vector<Path>& files) {
139
0
    for (auto& file : files) {
140
0
        RETURN_IF_ERROR(delete_file_impl(file));
141
0
    }
142
0
    return Status::OK();
143
0
}
144
145
0
Status HdfsFileSystem::delete_internal(const Path& path, int is_recursive) {
146
0
    bool exists = true;
147
0
    RETURN_IF_ERROR(exists_impl(path, &exists));
148
0
    if (!exists) {
149
0
        return Status::OK();
150
0
    }
151
0
    CHECK_HDFS_HANDLER(_fs_handler);
152
0
    Path real_path = convert_path(path, _fs_name);
153
0
    int res = hdfsDelete(_fs_handler->hdfs_fs, real_path.string().c_str(), is_recursive);
154
0
    if (res == -1) {
155
0
        return Status::IOError("failed to delete directory {}: {}", path.native(), hdfs_error());
156
0
    }
157
0
    return Status::OK();
158
0
}
159
160
0
Status HdfsFileSystem::exists_impl(const Path& path, bool* res) const {
161
0
    CHECK_HDFS_HANDLER(_fs_handler);
162
0
    Path real_path = convert_path(path, _fs_name);
163
0
    int is_exists = hdfsExists(_fs_handler->hdfs_fs, real_path.string().c_str());
164
0
#ifdef USE_HADOOP_HDFS
165
    // when calling hdfsExists() and it returns a non-zero code,
166
    // if errno is ENOENT, which means the file does not exist.
167
    // if errno is not ENOENT, it encountered some other error and should return.
168
    // NOTE: not for libhdfs3 since it only runs on macOS; we don't have to support it.
169
    //
170
    // See details:
171
    //  https://github.com/apache/hadoop/blob/5cda162a804fb0cfc2a5ac0058ab407662c5fb00/
172
    //  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c#L1923-L1924
173
0
    if (is_exists != 0 && errno != ENOENT) {
174
0
        char* root_cause = hdfsGetLastExceptionRootCause();
175
0
        return Status::IOError("failed to check path existence {}: {}", path.native(),
176
0
                               (root_cause ? root_cause : "unknown"));
177
0
    }
178
0
#endif
179
0
    *res = (is_exists == 0);
180
0
    return Status::OK();
181
0
}
182
183
0
Status HdfsFileSystem::file_size_impl(const Path& path, int64_t* file_size) const {
184
0
    CHECK_HDFS_HANDLER(_fs_handler);
185
0
    Path real_path = convert_path(path, _fs_name);
186
0
    hdfsFileInfo* file_info = hdfsGetPathInfo(_fs_handler->hdfs_fs, real_path.string().c_str());
187
0
    if (file_info == nullptr) {
188
0
        return Status::IOError("failed to get file size of {}: {}", path.native(), hdfs_error());
189
0
    }
190
0
    *file_size = file_info->mSize;
191
0
    hdfsFreeFileInfo(file_info, 1);
192
0
    return Status::OK();
193
0
}
194
195
Status HdfsFileSystem::list_impl(const Path& path, bool only_file, std::vector<FileInfo>* files,
196
0
                                 bool* exists) {
197
0
    RETURN_IF_ERROR(exists_impl(path, exists));
198
0
    if (!(*exists)) {
199
0
        return Status::OK();
200
0
    }
201
202
0
    CHECK_HDFS_HANDLER(_fs_handler);
203
0
    Path real_path = convert_path(path, _fs_name);
204
0
    int numEntries = 0;
205
0
    hdfsFileInfo* hdfs_file_info =
206
0
            hdfsListDirectory(_fs_handler->hdfs_fs, real_path.c_str(), &numEntries);
207
0
    if (hdfs_file_info == nullptr) {
208
0
        return Status::IOError("failed to list files/directories {}: {}", path.native(),
209
0
                               hdfs_error());
210
0
    }
211
0
    for (int idx = 0; idx < numEntries; ++idx) {
212
0
        auto& file = hdfs_file_info[idx];
213
0
        if (only_file && file.mKind == kObjectKindDirectory) {
214
0
            continue;
215
0
        }
216
0
        auto& file_info = files->emplace_back();
217
0
        std::string_view fname(file.mName);
218
0
        fname.remove_prefix(fname.rfind('/') + 1);
219
0
        file_info.file_name = fname;
220
0
        file_info.file_size = file.mSize;
221
0
        file_info.is_file = (file.mKind != kObjectKindDirectory);
222
0
    }
223
0
    hdfsFreeFileInfo(hdfs_file_info, numEntries);
224
0
    return Status::OK();
225
0
}
226
227
0
Status HdfsFileSystem::rename_impl(const Path& orig_name, const Path& new_name) {
228
0
    Path normal_orig_name = convert_path(orig_name, _fs_name);
229
0
    Path normal_new_name = convert_path(new_name, _fs_name);
230
0
    int ret = hdfsRename(_fs_handler->hdfs_fs, normal_orig_name.c_str(), normal_new_name.c_str());
231
0
    if (ret == 0) {
232
0
        LOG(INFO) << "finished to rename file. orig: " << normal_orig_name
233
0
                  << ", new: " << normal_new_name;
234
0
        return Status::OK();
235
0
    } else {
236
0
        return Status::IOError("fail to rename from {} to {}: {}", normal_orig_name.native(),
237
0
                               normal_new_name.native(), hdfs_error());
238
0
    }
239
0
    return Status::OK();
240
0
}
241
242
0
Status HdfsFileSystem::upload_impl(const Path& local_file, const Path& remote_file) {
243
    // 1. open local file for read
244
0
    FileSystemSPtr local_fs = global_local_filesystem();
245
0
    FileReaderSPtr local_reader = nullptr;
246
0
    RETURN_IF_ERROR(local_fs->open_file(local_file, &local_reader));
247
0
    int64_t file_len = local_reader->size();
248
0
    if (file_len == -1) {
249
0
        return Status::IOError("failed to get size of file: {}", local_file.string());
250
0
    }
251
252
    // 2. open remote file for write
253
0
    FileWriterPtr hdfs_writer = nullptr;
254
0
    RETURN_IF_ERROR(create_file_impl(remote_file, &hdfs_writer, nullptr));
255
256
0
    constexpr size_t buf_sz = 1024 * 1024;
257
0
    char read_buf[buf_sz];
258
0
    size_t left_len = file_len;
259
0
    size_t read_offset = 0;
260
0
    size_t bytes_read = 0;
261
0
    while (left_len > 0) {
262
0
        size_t read_len = left_len > buf_sz ? buf_sz : left_len;
263
0
        RETURN_IF_ERROR(local_reader->read_at(read_offset, {read_buf, read_len}, &bytes_read));
264
0
        RETURN_IF_ERROR(hdfs_writer->append({read_buf, read_len}));
265
266
0
        read_offset += read_len;
267
0
        left_len -= read_len;
268
0
    }
269
270
0
    return hdfs_writer->close();
271
0
}
272
273
Status HdfsFileSystem::batch_upload_impl(const std::vector<Path>& local_files,
274
0
                                         const std::vector<Path>& remote_files) {
275
0
    DCHECK(local_files.size() == remote_files.size());
276
0
    for (int i = 0; i < local_files.size(); ++i) {
277
0
        RETURN_IF_ERROR(upload_impl(local_files[i], remote_files[i]));
278
0
    }
279
0
    return Status::OK();
280
0
}
281
282
0
Status HdfsFileSystem::download_impl(const Path& remote_file, const Path& local_file) {
283
    // 1. open remote file for read
284
0
    FileReaderSPtr hdfs_reader = nullptr;
285
0
    RETURN_IF_ERROR(open_file_internal(remote_file, &hdfs_reader, FileReaderOptions::DEFAULT));
286
287
    // 2. remove the existing local file if exist
288
0
    if (std::filesystem::remove(local_file)) {
289
0
        LOG(INFO) << "remove the previously exist local file: " << local_file;
290
0
    }
291
292
    // 3. open local file for write
293
0
    FileSystemSPtr local_fs = global_local_filesystem();
294
0
    FileWriterPtr local_writer = nullptr;
295
0
    RETURN_IF_ERROR(local_fs->create_file(local_file, &local_writer));
296
297
    // 4. read remote and write to local
298
0
    LOG(INFO) << "read remote file: " << remote_file << " to local: " << local_file;
299
0
    constexpr size_t buf_sz = 1024 * 1024;
300
0
    PODArray<char> read_buf;
301
0
    read_buf.resize(buf_sz);
302
0
    size_t cur_offset = 0;
303
0
    while (true) {
304
0
        size_t read_len = 0;
305
0
        Slice file_slice(read_buf.data(), buf_sz);
306
0
        RETURN_IF_ERROR(hdfs_reader->read_at(cur_offset, file_slice, &read_len));
307
0
        cur_offset += read_len;
308
0
        if (read_len == 0) {
309
0
            break;
310
0
        }
311
312
0
        RETURN_IF_ERROR(local_writer->append({read_buf.data(), read_len}));
313
0
    }
314
0
    return local_writer->close();
315
0
}
316
317
} // namespace doris::io