Coverage Report

Created: 2025-04-12 02:12

/root/doris/be/src/io/hdfs_util.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/hdfs_util.h"
19
20
#include <bthread/bthread.h>
21
#include <bthread/butex.h>
22
#include <bvar/latency_recorder.h>
23
#include <gen_cpp/cloud.pb.h>
24
25
#include <ostream>
26
#include <thread>
27
28
#include "common/logging.h"
29
#include "io/fs/err_utils.h"
30
#include "io/hdfs_builder.h"
31
#include "vec/common/string_ref.h"
32
33
namespace doris::io {
34
35
namespace hdfs_bvar {
36
bvar::LatencyRecorder hdfs_read_latency("hdfs_read");
37
bvar::LatencyRecorder hdfs_write_latency("hdfs_write");
38
bvar::LatencyRecorder hdfs_create_dir_latency("hdfs_create_dir");
39
bvar::LatencyRecorder hdfs_open_latency("hdfs_open");
40
bvar::LatencyRecorder hdfs_close_latency("hdfs_close");
41
bvar::LatencyRecorder hdfs_flush_latency("hdfs_flush");
42
bvar::LatencyRecorder hdfs_hflush_latency("hdfs_hflush");
43
bvar::LatencyRecorder hdfs_hsync_latency("hdfs_hsync");
44
}; // namespace hdfs_bvar
45
46
0
// Converts `path` into the absolute in-filesystem path used for HDFS calls.
//
// If `path` is given in URI form and begins with `namenode`, that leading
// `namenode` part is removed. Otherwise `path` is taken as is. In both cases
// the result is guaranteed to start with '/'.
//
// @param path      path to convert, either URI form or a plain path
// @param namenode  namenode prefix (e.g. the fs name) to strip from `path`
// @return the path without the namenode prefix, always starting with '/'
Path convert_path(const Path& path, const std::string& namenode) {
    const std::string& native = path.native();

    std::string fs_path;
    // Only strip `namenode` when it is a real prefix of `path`. Searching for it
    // anywhere in the string and then cutting `namenode.size()` leading characters
    // would chop unrelated characters when the match is in the middle of the path.
    // NOTE(review): the match is purely textual, so a namenode of "hdfs://nn"
    // also matches "hdfs://nn2/..." -- callers should pass a full authority.
    if (native.rfind(namenode, 0) == 0) {
        fs_path = native.substr(namenode.size());
    } else {
        fs_path = native;
    }

    // Always use absolute path (start with '/') in hdfs
    if (fs_path.empty() || fs_path[0] != '/') {
        fs_path.insert(fs_path.begin(), '/');
    }
    return fs_path;
}
62
63
0
// Returns true when `path_or_fs` begins with the "hdfs://" scheme.
//
// The search is anchored at position 0: `rfind(prefix, 0)` can only match at
// the very start of the string. An unanchored `rfind(prefix)` returns the LAST
// occurrence, which wrongly rejects inputs such as "hdfs://nn/a/hdfs://b" and
// needlessly scans the whole string.
//
// @param path_or_fs  a path or filesystem name to classify
// @return true if the string starts with "hdfs://", false otherwise
bool is_hdfs(const std::string& path_or_fs) {
    return path_or_fs.rfind("hdfs://", 0) == 0;
}
66
67
0
// Builds a `THdfsParams` from the build configuration stored in `vault`.
//
// `fs_name` is always copied. `user`, `hdfs_kerberos_principal` and
// `hdfs_kerberos_keytab` are copied only when present in the build
// configuration. Every key/value entry of `hdfs_confs` is converted into a
// `THdfsConf` and attached to the result.
//
// @param vault  HDFS vault description to convert
// @return the populated `THdfsParams`
THdfsParams to_hdfs_params(const cloud::HdfsVaultInfo& vault) {
    // Bind by reference: a plain `auto` would copy the whole build-conf message,
    // including every entry of `hdfs_confs`, just to read a few fields.
    const auto& build_conf = vault.build_conf();

    THdfsParams params;
    params.__set_fs_name(build_conf.fs_name());
    if (build_conf.has_user()) {
        params.__set_user(build_conf.user());
    }
    if (build_conf.has_hdfs_kerberos_principal()) {
        params.__set_hdfs_kerberos_principal(build_conf.hdfs_kerberos_principal());
    }
    if (build_conf.has_hdfs_kerberos_keytab()) {
        params.__set_hdfs_kerberos_keytab(build_conf.hdfs_kerberos_keytab());
    }

    std::vector<THdfsConf> tconfs;
    for (const auto& kv : build_conf.hdfs_confs()) {
        // Construct the element in place and fill it, instead of building a
        // temporary and copying it into the vector.
        THdfsConf& conf = tconfs.emplace_back();
        conf.__set_key(kv.key());
        conf.__set_value(kv.value());
    }
    params.__set_hdfs_conf(tconfs);
    return params;
}
90
91
} // namespace doris::io