/root/doris/be/src/io/hdfs_util.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
#include "io/hdfs_util.h"

#include <bthread/bthread.h>
#include <bthread/butex.h>
#include <bvar/latency_recorder.h>
#include <gen_cpp/cloud.pb.h>

#include <cerrno>
#include <chrono>
#include <climits>
#include <cstring>
#include <ostream>
#include <thread>

#include "common/logging.h"
#include "io/fs/err_utils.h"
#include "io/hdfs_builder.h"
#include "vec/common/string_ref.h"
32 | | |
33 | | namespace doris::io { |
34 | | namespace { |
35 | | |
36 | 0 | Status _create_hdfs_fs(const THdfsParams& hdfs_params, const std::string& fs_name, hdfsFS* fs) { |
37 | 0 | HDFSCommonBuilder builder; |
38 | 0 | RETURN_IF_ERROR(create_hdfs_builder(hdfs_params, fs_name, &builder)); |
39 | 0 | hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get()); |
40 | 0 | if (hdfs_fs == nullptr) { |
41 | 0 | return Status::InternalError("failed to connect to hdfs {}: {}", fs_name, hdfs_error()); |
42 | 0 | } |
43 | 0 | *fs = hdfs_fs; |
44 | 0 | return Status::OK(); |
45 | 0 | } |
46 | | |
47 | | // https://brpc.apache.org/docs/server/basics/ |
48 | | // According to the brpc doc, JNI code checks stack layout and cannot be run in |
49 | | // bthreads so create a pthread for creating hdfs connection if necessary. |
50 | 0 | Status create_hdfs_fs(const THdfsParams& hdfs_params, const std::string& fs_name, hdfsFS* fs) { |
51 | 0 | bool is_pthread = bthread_self() == 0; |
52 | 0 | LOG(INFO) << "create hfdfs fs, is_pthread=" << is_pthread << " fs_name=" << fs_name; |
53 | 0 | if (is_pthread) { // running in pthread |
54 | 0 | return _create_hdfs_fs(hdfs_params, fs_name, fs); |
55 | 0 | } |
56 | | |
57 | | // running in bthread, switch to a pthread and wait |
58 | 0 | Status st; |
59 | 0 | auto btx = bthread::butex_create(); |
60 | 0 | *(int*)btx = 0; |
61 | 0 | std::thread t([&] { |
62 | 0 | st = _create_hdfs_fs(hdfs_params, fs_name, fs); |
63 | 0 | *(int*)btx = 1; |
64 | 0 | bthread::butex_wake_all(btx); |
65 | 0 | }); |
66 | 0 | std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&t, &btx](...) { |
67 | 0 | if (t.joinable()) t.join(); |
68 | 0 | bthread::butex_destroy(btx); |
69 | 0 | }); |
70 | 0 | timespec tmout {.tv_sec = std::chrono::system_clock::now().time_since_epoch().count() + 60, |
71 | 0 | .tv_nsec = 0}; |
72 | 0 | if (int ret = bthread::butex_wait(btx, 1, &tmout); ret != 0) { |
73 | 0 | std::string msg = "failed to wait _create_hdfs_fs fs_name=" + fs_name; |
74 | 0 | LOG(WARNING) << msg << " error=" << std::strerror(errno); |
75 | 0 | st = Status::Error<ErrorCode::INTERNAL_ERROR, false>(msg); |
76 | 0 | } |
77 | 0 | return st; |
78 | 0 | } |
79 | | |
80 | 0 | uint64_t hdfs_hash_code(const THdfsParams& hdfs_params, const std::string& fs_name) { |
81 | 0 | uint64_t hash_code = 0; |
82 | | // The specified fsname is used first. |
83 | | // If there is no specified fsname, the default fsname is used |
84 | 0 | if (!fs_name.empty()) { |
85 | 0 | hash_code ^= crc32_hash(fs_name); |
86 | 0 | } else if (hdfs_params.__isset.fs_name) { |
87 | 0 | hash_code ^= crc32_hash(hdfs_params.fs_name); |
88 | 0 | } |
89 | |
|
90 | 0 | if (hdfs_params.__isset.user) { |
91 | 0 | hash_code ^= crc32_hash(hdfs_params.user); |
92 | 0 | } |
93 | 0 | if (hdfs_params.__isset.hdfs_kerberos_principal) { |
94 | 0 | hash_code ^= crc32_hash(hdfs_params.hdfs_kerberos_principal); |
95 | 0 | } |
96 | 0 | if (hdfs_params.__isset.hdfs_kerberos_keytab) { |
97 | 0 | hash_code ^= crc32_hash(hdfs_params.hdfs_kerberos_keytab); |
98 | 0 | } |
99 | 0 | if (hdfs_params.__isset.hdfs_conf) { |
100 | 0 | std::map<std::string, std::string> conf_map; |
101 | 0 | for (const auto& conf : hdfs_params.hdfs_conf) { |
102 | 0 | conf_map[conf.key] = conf.value; |
103 | 0 | } |
104 | 0 | for (auto& conf : conf_map) { |
105 | 0 | hash_code ^= crc32_hash(conf.first); |
106 | 0 | hash_code ^= crc32_hash(conf.second); |
107 | 0 | } |
108 | 0 | } |
109 | 0 | return hash_code; |
110 | 0 | } |
111 | | |
112 | | } // namespace |
113 | | |
// Process-wide bvar latency recorders for HDFS operations; each records the
// latency distribution of the correspondingly named libhdfs call site.
namespace hdfs_bvar {
bvar::LatencyRecorder hdfs_read_latency("hdfs_read");
bvar::LatencyRecorder hdfs_write_latency("hdfs_write");
bvar::LatencyRecorder hdfs_create_dir_latency("hdfs_create_dir");
bvar::LatencyRecorder hdfs_open_latency("hdfs_open");
bvar::LatencyRecorder hdfs_close_latency("hdfs_close");
bvar::LatencyRecorder hdfs_flush_latency("hdfs_flush");
bvar::LatencyRecorder hdfs_hflush_latency("hdfs_hflush");
bvar::LatencyRecorder hdfs_hsync_latency("hdfs_hsync");
}; // namespace hdfs_bvar
124 | | |
125 | 0 | void HdfsHandlerCache::_clean_invalid() { |
126 | 0 | std::vector<uint64_t> removed_handle; |
127 | 0 | for (auto& item : _cache) { |
128 | 0 | if (item.second.use_count() == 1 && item.second->invalid()) { |
129 | 0 | removed_handle.emplace_back(item.first); |
130 | 0 | } |
131 | 0 | } |
132 | 0 | for (auto& handle : removed_handle) { |
133 | 0 | _cache.erase(handle); |
134 | 0 | } |
135 | 0 | } |
136 | | |
137 | 0 | void HdfsHandlerCache::_clean_oldest() { |
138 | 0 | uint64_t oldest_time = ULONG_MAX; |
139 | 0 | uint64_t oldest = 0; |
140 | 0 | for (auto& item : _cache) { |
141 | 0 | if (item.second.use_count() == 1 && item.second->last_access_time() < oldest_time) { |
142 | 0 | oldest_time = item.second->last_access_time(); |
143 | 0 | oldest = item.first; |
144 | 0 | } |
145 | 0 | } |
146 | 0 | _cache.erase(oldest); |
147 | 0 | } |
148 | | |
// Returns a usable HDFS handle for (hdfs_params, fs_name), serving it from the
// cache when a valid entry exists and otherwise creating a fresh connection.
// Thread-safe: all cache access happens under _lock.
// NOTE(review): the lock is held across create_hdfs_fs(), which may block for
// up to ~60s on a slow namenode — concurrent callers serialize here; confirm
// whether that is acceptable for the callers of this cache.
Status HdfsHandlerCache::get_connection(const THdfsParams& hdfs_params, const std::string& fs_name,
                                        std::shared_ptr<HdfsHandler>* fs_handle) {
    // Cache key is derived from fs_name/user/kerberos/conf fields.
    uint64_t hash_code = hdfs_hash_code(hdfs_params, fs_name);
    {
        std::lock_guard<std::mutex> l(_lock);
        auto it = _cache.find(hash_code);
        if (it != _cache.end()) {
            std::shared_ptr<HdfsHandler> handle = it->second;
            if (!handle->invalid()) {
                // Cache hit with a healthy handle: refresh its LRU timestamp
                // and hand it out.
                handle->update_last_access_time();
                *fs_handle = std::move(handle);
                return Status::OK();
            }
            // fs handle is invalid, erase it.
            _cache.erase(it);
            LOG(INFO) << "erase the hdfs handle, fs name: " << fs_name;
        }

        // not find in cache, or fs handle is invalid
        // create a new one and try to put it into cache
        hdfsFS hdfs_fs = nullptr;
        RETURN_IF_ERROR(create_hdfs_fs(hdfs_params, fs_name, &hdfs_fs));
        if (_cache.size() >= MAX_CACHE_HANDLE) {
            // Cache is full: first drop invalid idle entries, then evict the
            // least-recently-used idle entry to try to make room.
            _clean_invalid();
            _clean_oldest();
        }
        if (_cache.size() < MAX_CACHE_HANDLE) {
            // Room available: cache the new handle (second ctor arg
            // presumably marks it as cache-owned — confirm in HdfsHandler).
            auto handle = std::make_shared<HdfsHandler>(hdfs_fs, true);
            handle->update_last_access_time();
            *fs_handle = handle;
            _cache[hash_code] = std::move(handle);
        } else {
            // Still no room (every entry is in use): hand out an uncached
            // handle that lives only as long as the caller keeps it.
            *fs_handle = std::make_shared<HdfsHandler>(hdfs_fs, false);
        }
    }
    return Status::OK();
}
186 | | |
187 | 0 | Path convert_path(const Path& path, const std::string& namenode) { |
188 | 0 | std::string fs_path; |
189 | 0 | if (path.native().find(namenode) != std::string::npos) { |
190 | | // `path` is uri format, remove the namenode part in `path` |
191 | | // FIXME(plat1ko): Not robust if `namenode` doesn't appear at the beginning of `path` |
192 | 0 | fs_path = path.native().substr(namenode.size()); |
193 | 0 | } else { |
194 | 0 | fs_path = path; |
195 | 0 | } |
196 | | |
197 | | // Always use absolute path (start with '/') in hdfs |
198 | 0 | if (fs_path.empty() || fs_path[0] != '/') { |
199 | 0 | fs_path.insert(fs_path.begin(), '/'); |
200 | 0 | } |
201 | 0 | return fs_path; |
202 | 0 | } |
203 | | |
// Returns true when `path_or_fs` starts with the "hdfs://" scheme.
// Uses rfind with pos = 0, which only matches at the very beginning. The
// previous bare rfind("hdfs://") returned the position of the LAST occurrence,
// so a valid HDFS path that also contained "hdfs://" later in the string
// (e.g. in a file name) was wrongly rejected.
bool is_hdfs(const std::string& path_or_fs) {
    return path_or_fs.rfind("hdfs://", 0) == 0;
}
207 | | |
208 | 0 | THdfsParams to_hdfs_params(const cloud::HdfsVaultInfo& vault) { |
209 | 0 | THdfsParams params; |
210 | 0 | auto build_conf = vault.build_conf(); |
211 | 0 | params.__set_fs_name(build_conf.fs_name()); |
212 | 0 | if (build_conf.has_user()) { |
213 | 0 | params.__set_user(build_conf.user()); |
214 | 0 | } |
215 | 0 | if (build_conf.has_hdfs_kerberos_principal()) { |
216 | 0 | params.__set_hdfs_kerberos_principal(build_conf.hdfs_kerberos_principal()); |
217 | 0 | } |
218 | 0 | if (build_conf.has_hdfs_kerberos_keytab()) { |
219 | 0 | params.__set_hdfs_kerberos_keytab(build_conf.hdfs_kerberos_keytab()); |
220 | 0 | } |
221 | 0 | std::vector<THdfsConf> tconfs; |
222 | 0 | for (const auto& confs : vault.build_conf().hdfs_confs()) { |
223 | 0 | THdfsConf conf; |
224 | 0 | conf.__set_key(confs.key()); |
225 | 0 | conf.__set_value(confs.value()); |
226 | 0 | tconfs.emplace_back(conf); |
227 | 0 | } |
228 | 0 | params.__set_hdfs_conf(tconfs); |
229 | 0 | return params; |
230 | 0 | } |
231 | | |
232 | | } // namespace doris::io |