/root/doris/be/src/io/hdfs_util.cpp
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "io/hdfs_util.h"

#include <bthread/bthread.h>
#include <bthread/butex.h>
#include <bvar/latency_recorder.h>
#include <gen_cpp/cloud.pb.h>

#include <ostream>
#include <thread>

#include "common/logging.h"
#include "io/fs/err_utils.h"
#include "io/hdfs_builder.h"
#include "vec/common/string_ref.h"

namespace doris::io {

namespace hdfs_bvar {
bvar::LatencyRecorder hdfs_read_latency("hdfs_read");
bvar::LatencyRecorder hdfs_write_latency("hdfs_write");
bvar::LatencyRecorder hdfs_create_dir_latency("hdfs_create_dir");
bvar::LatencyRecorder hdfs_open_latency("hdfs_open");
bvar::LatencyRecorder hdfs_close_latency("hdfs_close");
bvar::LatencyRecorder hdfs_flush_latency("hdfs_flush");
bvar::LatencyRecorder hdfs_hflush_latency("hdfs_hflush");
bvar::LatencyRecorder hdfs_hsync_latency("hdfs_hsync");
} // namespace hdfs_bvar
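
// Usage sketch (editorial illustration, not part of the original source):
// bvar::LatencyRecorder accepts latency samples in microseconds via
// operator<<. A call site timing an HDFS read might look like the lines
// below; the timing helper and variable names are assumptions.
//
//   int64_t start_us = butil::gettimeofday_us();
//   // ... perform the HDFS read ...
//   hdfs_bvar::hdfs_read_latency << (butil::gettimeofday_us() - start_us);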

Path convert_path(const Path& path, const std::string& namenode) {
    std::string fs_path;
    if (path.native().find(namenode) != std::string::npos) {
        // `path` is uri format, remove the namenode part in `path`
        // FIXME(plat1ko): Not robust if `namenode` doesn't appear at the beginning of `path`
        fs_path = path.native().substr(namenode.size());
    } else {
        fs_path = path;
    }

    // Always use absolute path (start with '/') in hdfs
    if (fs_path.empty() || fs_path[0] != '/') {
        fs_path.insert(fs_path.begin(), '/');
    }
    return fs_path;
}
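
// Illustrative behavior of convert_path, derived from the code above (the
// concrete namenode and paths are made-up examples): the namenode prefix is
// stripped and a leading '/' is guaranteed.
//
//   convert_path("hdfs://nn1:8020/user/doris/data", "hdfs://nn1:8020")
//       -> "/user/doris/data"
//   convert_path("user/doris/data", "hdfs://nn1:8020")
//       -> "/user/doris/data"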

bool is_hdfs(const std::string& path_or_fs) {
    return path_or_fs.rfind("hdfs://") == 0;
}
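
// Note (editorial, not from the original source): rfind(prefix) == 0 is the
// common C++17 idiom for a "starts with" check, so is_hdfs("hdfs://nn/path")
// is true while is_hdfs("s3://bucket/key") and is_hdfs("/local/path") are
// false. Under C++20 this could be written as
// path_or_fs.starts_with("hdfs://").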

THdfsParams to_hdfs_params(const cloud::HdfsVaultInfo& vault) {
    THdfsParams params;
    auto build_conf = vault.build_conf();
    params.__set_fs_name(build_conf.fs_name());
    if (build_conf.has_user()) {
        params.__set_user(build_conf.user());
    }
    if (build_conf.has_hdfs_kerberos_principal()) {
        params.__set_hdfs_kerberos_principal(build_conf.hdfs_kerberos_principal());
    }
    if (build_conf.has_hdfs_kerberos_keytab()) {
        params.__set_hdfs_kerberos_keytab(build_conf.hdfs_kerberos_keytab());
    }
    std::vector<THdfsConf> tconfs;
    for (const auto& confs : vault.build_conf().hdfs_confs()) {
        THdfsConf conf;
        conf.__set_key(confs.key());
        conf.__set_value(confs.value());
        tconfs.emplace_back(conf);
    }
    params.__set_hdfs_conf(tconfs);
    return params;
}
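
// Usage sketch (hypothetical, not taken from the original file): converting a
// cloud storage-vault description into Thrift HDFS params before constructing
// an HDFS filesystem handle. The surrounding steps are assumptions for
// illustration only.
//
//   cloud::HdfsVaultInfo vault_info = /* fetched from the meta service */;
//   THdfsParams params = to_hdfs_params(vault_info);
//   // `params` can then be handed to the HDFS builder / filesystem factory
//   // that consumes THdfsParams (see io/hdfs_builder.h).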

} // namespace doris::io