Coverage Report

Created: 2026-03-15 15:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/hdfs_file_system.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/hdfs_file_system.h"
19
20
#include <errno.h>
21
#include <fcntl.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
24
#include <algorithm>
25
#include <filesystem>
26
#include <map>
27
#include <mutex>
28
#include <ostream>
29
#include <unordered_map>
30
#include <utility>
31
32
#include "common/config.h"
33
#include "common/status.h"
34
#include "io/fs/err_utils.h"
35
#include "io/fs/hdfs/hdfs_mgr.h"
36
#include "io/fs/hdfs_file_reader.h"
37
#include "io/fs/hdfs_file_writer.h"
38
#include "io/fs/local_file_system.h"
39
#include "io/hdfs_builder.h"
40
#include "io/hdfs_util.h"
41
#include "runtime/exec_env.h"
42
#include "util/obj_lru_cache.h"
43
#include "util/slice.h"
44
45
namespace doris::io {
46
47
#ifndef CHECK_HDFS_HANDLER
48
#define CHECK_HDFS_HANDLER(handler)                        \
49
0
    if (!handler) {                                        \
50
0
        return Status::IOError("init Hdfs handler error"); \
51
0
    }
52
#endif
53
54
Result<std::shared_ptr<HdfsFileSystem>> HdfsFileSystem::create(
55
        const std::map<std::string, std::string>& properties, std::string fs_name, std::string id,
56
0
        RuntimeProfile* profile, std::string root_path) {
57
0
    return HdfsFileSystem::create(parse_properties(properties), std::move(fs_name), std::move(id),
58
0
                                  profile, std::move(root_path));
59
0
}
60
61
Result<std::shared_ptr<HdfsFileSystem>> HdfsFileSystem::create(const THdfsParams& hdfs_params,
62
                                                               std::string fs_name, std::string id,
63
                                                               RuntimeProfile* profile,
64
34
                                                               std::string root_path) {
65
34
#ifdef USE_HADOOP_HDFS
66
34
    if (!config::enable_java_support) {
67
0
        return ResultError(Status::InternalError(
68
0
                "hdfs file system is not enabled, you can change be config enable_java_support to "
69
0
                "true."));
70
0
    }
71
34
#endif
72
34
    std::shared_ptr<HdfsFileSystem> fs(new HdfsFileSystem(
73
34
            hdfs_params, std::move(fs_name), std::move(id), profile, std::move(root_path)));
74
34
    RETURN_IF_ERROR_RESULT(fs->init());
75
22
    return fs;
76
34
}
77
78
HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, std::string fs_name, std::string id,
79
                               RuntimeProfile* profile, std::string root_path)
80
34
        : RemoteFileSystem(std::move(root_path), std::move(id), FileSystemType::HDFS),
81
34
          _hdfs_params(hdfs_params),
82
34
          _fs_name(std::move(fs_name)),
83
34
          _profile(profile) {
84
34
    if (_fs_name.empty()) {
85
6
        _fs_name = hdfs_params.fs_name;
86
6
    }
87
34
}
88
89
23
HdfsFileSystem::~HdfsFileSystem() = default;
90
91
34
Status HdfsFileSystem::init() {
92
34
    RETURN_IF_ERROR(ExecEnv::GetInstance()->hdfs_mgr()->get_or_create_fs(_hdfs_params, _fs_name,
93
34
                                                                         &_fs_handler));
94
22
    if (!_fs_handler) {
95
0
        return Status::InternalError("failed to init Hdfs handler, please check hdfs params.");
96
0
    }
97
22
    return Status::OK();
98
22
}
99
100
Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
101
0
                                        const FileWriterOptions* opts) {
102
0
    auto res = io::HdfsFileWriter::create(file, _fs_handler, _fs_name, opts);
103
0
    if (res.has_value()) {
104
0
        *writer = std::move(res).value();
105
0
        return Status::OK();
106
0
    } else {
107
0
        return std::move(res).error();
108
0
    }
109
0
}
110
111
Status HdfsFileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader,
112
0
                                          const FileReaderOptions& opts) {
113
0
    CHECK_HDFS_HANDLER(_fs_handler);
114
0
    *reader =
115
0
            DORIS_TRY(HdfsFileReader::create(file, _fs_handler->hdfs_fs, _fs_name, opts, _profile));
116
0
    return Status::OK();
117
0
}
118
119
0
Status HdfsFileSystem::create_directory_impl(const Path& dir, bool failed_if_exists) {
120
0
    CHECK_HDFS_HANDLER(_fs_handler);
121
0
    Path real_path = convert_path(dir, _fs_name);
122
0
    int res = hdfsCreateDirectory(_fs_handler->hdfs_fs, real_path.string().c_str());
123
0
    if (res == -1) {
124
0
        return Status::IOError("failed to create directory {}: {}", dir.native(), hdfs_error());
125
0
    }
126
0
    return Status::OK();
127
0
}
128
129
0
Status HdfsFileSystem::delete_file_impl(const Path& file) {
130
0
    return delete_internal(file, 0);
131
0
}
132
133
0
Status HdfsFileSystem::delete_directory_impl(const Path& dir) {
134
0
    return delete_internal(dir, 1);
135
0
}
136
137
0
Status HdfsFileSystem::batch_delete_impl(const std::vector<Path>& files) {
138
0
    for (auto& file : files) {
139
0
        RETURN_IF_ERROR(delete_file_impl(file));
140
0
    }
141
0
    return Status::OK();
142
0
}
143
144
0
Status HdfsFileSystem::delete_internal(const Path& path, int is_recursive) {
145
0
    bool exists = true;
146
0
    RETURN_IF_ERROR(exists_impl(path, &exists));
147
0
    if (!exists) {
148
0
        return Status::OK();
149
0
    }
150
0
    CHECK_HDFS_HANDLER(_fs_handler);
151
0
    Path real_path = convert_path(path, _fs_name);
152
0
    int res = hdfsDelete(_fs_handler->hdfs_fs, real_path.string().c_str(), is_recursive);
153
0
    if (res == -1) {
154
0
        return Status::IOError("failed to delete directory {}: {}", path.native(), hdfs_error());
155
0
    }
156
0
    return Status::OK();
157
0
}
158
159
0
Status HdfsFileSystem::exists_impl(const Path& path, bool* res) const {
160
0
    CHECK_HDFS_HANDLER(_fs_handler);
161
0
    Path real_path = convert_path(path, _fs_name);
162
0
    int is_exists = hdfsExists(_fs_handler->hdfs_fs, real_path.string().c_str());
163
0
#ifdef USE_HADOOP_HDFS
164
    // when calling hdfsExists() and return non-zero code,
165
    // if errno is ENOENT, which means the file does not exist.
166
    // if errno is not ENOENT, which means it encounter other error, should return.
167
    // NOTE: not for libhdfs3 since it only runs on MacOS, don't have to support it.
168
    //
169
    // See details:
170
    //  https://github.com/apache/hadoop/blob/5cda162a804fb0cfc2a5ac0058ab407662c5fb00/
171
    //  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c#L1923-L1924
172
0
    if (is_exists != 0 && errno != ENOENT) {
173
0
        char* root_cause = hdfsGetLastExceptionRootCause();
174
0
        return Status::IOError("failed to check path existence {}: {}", path.native(),
175
0
                               (root_cause ? root_cause : "unknown"));
176
0
    }
177
0
#endif
178
0
    *res = (is_exists == 0);
179
0
    return Status::OK();
180
0
}
181
182
0
Status HdfsFileSystem::file_size_impl(const Path& path, int64_t* file_size) const {
183
0
    CHECK_HDFS_HANDLER(_fs_handler);
184
0
    Path real_path = convert_path(path, _fs_name);
185
0
    hdfsFileInfo* file_info = hdfsGetPathInfo(_fs_handler->hdfs_fs, real_path.string().c_str());
186
0
    if (file_info == nullptr) {
187
0
        return Status::IOError("failed to get file size of {}: {}", path.native(), hdfs_error());
188
0
    }
189
0
    *file_size = file_info->mSize;
190
0
    hdfsFreeFileInfo(file_info, 1);
191
0
    return Status::OK();
192
0
}
193
194
Status HdfsFileSystem::list_impl(const Path& path, bool only_file, std::vector<FileInfo>* files,
195
0
                                 bool* exists) {
196
0
    RETURN_IF_ERROR(exists_impl(path, exists));
197
0
    if (!(*exists)) {
198
0
        return Status::OK();
199
0
    }
200
201
0
    CHECK_HDFS_HANDLER(_fs_handler);
202
0
    Path real_path = convert_path(path, _fs_name);
203
0
    int numEntries = 0;
204
0
    hdfsFileInfo* hdfs_file_info =
205
0
            hdfsListDirectory(_fs_handler->hdfs_fs, real_path.c_str(), &numEntries);
206
0
    if (hdfs_file_info == nullptr) {
207
0
        return Status::IOError("failed to list files/directories {}: {}", path.native(),
208
0
                               hdfs_error());
209
0
    }
210
0
    for (int idx = 0; idx < numEntries; ++idx) {
211
0
        auto& file = hdfs_file_info[idx];
212
0
        if (only_file && file.mKind == kObjectKindDirectory) {
213
0
            continue;
214
0
        }
215
0
        auto& file_info = files->emplace_back();
216
0
        std::string_view fname(file.mName);
217
0
        fname.remove_prefix(fname.rfind('/') + 1);
218
0
        file_info.file_name = fname;
219
0
        file_info.file_size = file.mSize;
220
0
        file_info.is_file = (file.mKind != kObjectKindDirectory);
221
0
    }
222
0
    hdfsFreeFileInfo(hdfs_file_info, numEntries);
223
0
    return Status::OK();
224
0
}
225
226
0
Status HdfsFileSystem::rename_impl(const Path& orig_name, const Path& new_name) {
227
0
    Path normal_orig_name = convert_path(orig_name, _fs_name);
228
0
    Path normal_new_name = convert_path(new_name, _fs_name);
229
0
    int ret = hdfsRename(_fs_handler->hdfs_fs, normal_orig_name.c_str(), normal_new_name.c_str());
230
0
    if (ret == 0) {
231
0
        LOG(INFO) << "finished to rename file. orig: " << normal_orig_name
232
0
                  << ", new: " << normal_new_name;
233
0
        return Status::OK();
234
0
    } else {
235
0
        return Status::IOError("fail to rename from {} to {}: {}", normal_orig_name.native(),
236
0
                               normal_new_name.native(), hdfs_error());
237
0
    }
238
0
    return Status::OK();
239
0
}
240
241
0
Status HdfsFileSystem::upload_impl(const Path& local_file, const Path& remote_file) {
242
    // 1. open local file for read
243
0
    FileSystemSPtr local_fs = global_local_filesystem();
244
0
    FileReaderSPtr local_reader = nullptr;
245
0
    RETURN_IF_ERROR(local_fs->open_file(local_file, &local_reader));
246
0
    int64_t file_len = local_reader->size();
247
0
    if (file_len == -1) {
248
0
        return Status::IOError("failed to get size of file: {}", local_file.string());
249
0
    }
250
251
    // 2. open remote file for write
252
0
    FileWriterPtr hdfs_writer = nullptr;
253
0
    RETURN_IF_ERROR(create_file_impl(remote_file, &hdfs_writer, nullptr));
254
255
0
    constexpr size_t buf_sz = 1024 * 1024;
256
0
    char read_buf[buf_sz];
257
0
    size_t left_len = file_len;
258
0
    size_t read_offset = 0;
259
0
    size_t bytes_read = 0;
260
0
    while (left_len > 0) {
261
0
        size_t read_len = left_len > buf_sz ? buf_sz : left_len;
262
0
        RETURN_IF_ERROR(local_reader->read_at(read_offset, {read_buf, read_len}, &bytes_read));
263
0
        RETURN_IF_ERROR(hdfs_writer->append({read_buf, read_len}));
264
265
0
        read_offset += read_len;
266
0
        left_len -= read_len;
267
0
    }
268
269
0
    return hdfs_writer->close();
270
0
}
271
272
Status HdfsFileSystem::batch_upload_impl(const std::vector<Path>& local_files,
273
0
                                         const std::vector<Path>& remote_files) {
274
0
    DCHECK(local_files.size() == remote_files.size());
275
0
    for (int i = 0; i < local_files.size(); ++i) {
276
0
        RETURN_IF_ERROR(upload_impl(local_files[i], remote_files[i]));
277
0
    }
278
0
    return Status::OK();
279
0
}
280
281
0
Status HdfsFileSystem::download_impl(const Path& remote_file, const Path& local_file) {
282
    // 1. open remote file for read
283
0
    FileReaderSPtr hdfs_reader = nullptr;
284
0
    RETURN_IF_ERROR(open_file_internal(remote_file, &hdfs_reader, FileReaderOptions::DEFAULT));
285
286
    // 2. remove the existing local file if exist
287
0
    if (std::filesystem::remove(local_file)) {
288
0
        LOG(INFO) << "remove the previously exist local file: " << local_file;
289
0
    }
290
291
    // 3. open local file for write
292
0
    FileSystemSPtr local_fs = global_local_filesystem();
293
0
    FileWriterPtr local_writer = nullptr;
294
0
    RETURN_IF_ERROR(local_fs->create_file(local_file, &local_writer));
295
296
    // 4. read remote and write to local
297
0
    LOG(INFO) << "read remote file: " << remote_file << " to local: " << local_file;
298
0
    constexpr size_t buf_sz = 1024 * 1024;
299
0
    std::unique_ptr<char[]> read_buf(new char[buf_sz]);
300
0
    size_t cur_offset = 0;
301
0
    while (true) {
302
0
        size_t read_len = 0;
303
0
        Slice file_slice(read_buf.get(), buf_sz);
304
0
        RETURN_IF_ERROR(hdfs_reader->read_at(cur_offset, file_slice, &read_len));
305
0
        cur_offset += read_len;
306
0
        if (read_len == 0) {
307
0
            break;
308
0
        }
309
310
0
        RETURN_IF_ERROR(local_writer->append({read_buf.get(), read_len}));
311
0
    }
312
0
    return local_writer->close();
313
0
}
314
315
} // namespace doris::io