be/src/io/fs/hdfs/hdfs_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
#include "io/fs/hdfs/hdfs_mgr.h"

#include <bthread/bthread.h>
#include <bthread/butex.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstring>
#include <thread>

#include "common/config.h"
#include "common/kerberos/kerberos_ticket_mgr.h"
#include "common/logging.h"
#include "core/string_ref.h"
#include "io/fs/err_utils.h"
#include "io/hdfs_builder.h"
#include "io/hdfs_util.h"
#include "runtime/exec_env.h"
34 | | |
35 | | namespace doris::io { |
36 | | |
37 | 12 | HdfsMgr::HdfsMgr() : _should_stop_cleanup_thread(false) { |
38 | 12 | _start_cleanup_thread(); |
39 | 12 | } |
40 | | |
41 | 8 | HdfsMgr::~HdfsMgr() { |
42 | 8 | _stop_cleanup_thread(); |
43 | 8 | } |
44 | | |
45 | 12 | void HdfsMgr::_start_cleanup_thread() { |
46 | 12 | _cleanup_thread = std::make_unique<std::thread>(&HdfsMgr::_cleanup_loop, this); |
47 | 12 | } |
48 | | |
49 | 8 | void HdfsMgr::_stop_cleanup_thread() { |
50 | 8 | if (_cleanup_thread) { |
51 | 8 | _should_stop_cleanup_thread = true; |
52 | 8 | _cleanup_thread->join(); |
53 | 8 | _cleanup_thread.reset(); |
54 | 8 | } |
55 | 8 | } |
56 | | |
57 | 12 | void HdfsMgr::_cleanup_loop() { |
58 | | #ifdef BE_TEST |
59 | | static constexpr int64_t CHECK_INTERVAL_SECONDS = 1; // For testing purpose |
60 | | #else |
61 | 12 | static constexpr int64_t CHECK_INTERVAL_SECONDS = 5; // Check stop flag every 5 seconds |
62 | 12 | #endif |
63 | 12 | uint64_t last_cleanup_time = std::time(nullptr); |
64 | | |
65 | 2.87k | while (!_should_stop_cleanup_thread) { |
66 | 2.85k | uint64_t current_time = std::time(nullptr); |
67 | | |
68 | | // Only perform cleanup if enough time has passed |
69 | 2.85k | if (current_time - last_cleanup_time >= _cleanup_interval_seconds) { |
70 | | // Collect expired handlers under lock |
71 | 6 | std::vector<std::shared_ptr<HdfsHandler>> handlers_to_cleanup; |
72 | 6 | { |
73 | 6 | std::lock_guard<std::mutex> lock(_mutex); |
74 | 6 | std::vector<uint64_t> to_remove; |
75 | | |
76 | | // Find expired handlers |
77 | 42 | for (const auto& entry : _fs_handlers) { |
78 | 42 | bool is_expired = current_time - entry.second->last_access_time >= |
79 | 42 | _instance_timeout_seconds; |
80 | | // bool is_krb_expired = |
81 | | // entry.second->is_kerberos_auth && |
82 | | // (current_time - entry.second->create_time >= |
83 | | // entry.second->ticket_cache->get_ticket_lifetime_sec() / 2); |
84 | 42 | if (is_expired) { |
85 | 1 | LOG(INFO) << "Found expired HDFS handler, hash_code=" << entry.first |
86 | 1 | << ", last_access_time=" << entry.second->last_access_time |
87 | 1 | << ", is_kerberos=" << entry.second->is_kerberos_auth |
88 | 1 | << ", principal=" << entry.second->principal |
89 | 1 | << ", fs_name=" << entry.second->fs_name |
90 | 1 | << ", is_expired=" << is_expired; |
91 | | // << ", is_krb_expire=" << is_krb_expired; |
92 | 1 | to_remove.push_back(entry.first); |
93 | 1 | handlers_to_cleanup.push_back(entry.second); |
94 | 1 | } |
95 | 42 | } |
96 | | |
97 | | // Remove expired handlers from map under lock |
98 | 6 | for (uint64_t hash_code : to_remove) { |
99 | 1 | _fs_handlers.erase(hash_code); |
100 | 1 | } |
101 | 6 | } |
102 | | |
103 | | // Cleanup handlers outside lock |
104 | 6 | for (const auto& handler : handlers_to_cleanup) { |
105 | 1 | LOG(INFO) << "Start to cleanup HDFS handler" |
106 | 1 | << ", is_kerberos=" << handler->is_kerberos_auth |
107 | 1 | << ", principal=" << handler->principal |
108 | 1 | << ", fs_name=" << handler->fs_name; |
109 | | |
110 | | // The kerberos ticket cache will be automatically cleaned up when the last reference is gone |
111 | | // DO NOT call hdfsDisconnect(), or we will meet "Filesystem closed" |
112 | | // even if we create a new one |
113 | | // hdfsDisconnect(handler->hdfs_fs); |
114 | | |
115 | 1 | LOG(INFO) << "Finished cleanup HDFS handler" |
116 | 1 | << ", fs_name=" << handler->fs_name; |
117 | 1 | } |
118 | | |
119 | 6 | handlers_to_cleanup.clear(); |
120 | 6 | last_cleanup_time = current_time; |
121 | 6 | } |
122 | | |
123 | | // Sleep for a short interval to check stop flag more frequently |
124 | 2.85k | std::this_thread::sleep_for(std::chrono::seconds(CHECK_INTERVAL_SECONDS)); |
125 | 2.85k | } |
126 | 12 | } |
127 | | |
128 | | Status HdfsMgr::get_or_create_fs(const THdfsParams& hdfs_params, const std::string& fs_name, |
129 | 54.0k | std::shared_ptr<HdfsHandler>* fs_handler) { |
130 | 54.0k | #ifdef USE_HADOOP_HDFS |
131 | 54.0k | if (!config::enable_java_support) { |
132 | 0 | return Status::InvalidArgument( |
133 | 0 | "hdfs file system is not enabled, you can change be config enable_java_support to " |
134 | 0 | "true."); |
135 | 0 | } |
136 | 54.0k | #endif |
137 | 54.0k | uint64_t hash_code = _hdfs_hash_code(hdfs_params, fs_name); |
138 | | |
139 | | // First check without lock |
140 | 54.0k | { |
141 | 54.0k | std::lock_guard<std::mutex> lock(_mutex); |
142 | 54.0k | auto it = _fs_handlers.find(hash_code); |
143 | 54.0k | if (it != _fs_handlers.end()) { |
144 | 54.0k | LOG(INFO) << "Reuse existing HDFS handler, hash_code=" << hash_code |
145 | 54.0k | << ", is_kerberos=" << it->second->is_kerberos_auth |
146 | 54.0k | << ", principal=" << it->second->principal << ", fs_name=" << fs_name; |
147 | 54.0k | it->second->update_access_time(); |
148 | 54.0k | *fs_handler = it->second; |
149 | 54.0k | return Status::OK(); |
150 | 54.0k | } |
151 | 54.0k | } |
152 | | |
153 | | // Create new hdfsFS handler outside the lock |
154 | 54.0k | LOG(INFO) << "Start to create new HDFS handler, hash_code=" << hash_code |
155 | 4 | << ", fs_name=" << fs_name; |
156 | | |
157 | 4 | std::shared_ptr<HdfsHandler> new_fs_handler; |
158 | 4 | RETURN_IF_ERROR(_create_hdfs_fs(hdfs_params, fs_name, &new_fs_handler)); |
159 | | |
160 | | // Double check with lock before inserting |
161 | 18.4E | { |
162 | 18.4E | std::lock_guard<std::mutex> lock(_mutex); |
163 | 18.4E | auto it = _fs_handlers.find(hash_code); |
164 | 18.4E | if (it != _fs_handlers.end()) { |
165 | | // Another thread has created the handler, use it instead |
166 | 39 | LOG(INFO) << "Another thread created HDFS handler, reuse it, hash_code=" << hash_code |
167 | 39 | << ", is_kerberos=" << it->second->is_kerberos_auth |
168 | 39 | << ", principal=" << it->second->principal << ", fs_name=" << fs_name; |
169 | 39 | it->second->update_access_time(); |
170 | 39 | *fs_handler = it->second; |
171 | 39 | return Status::OK(); |
172 | 39 | } |
173 | | |
174 | | // Store the new handler |
175 | 18.4E | *fs_handler = new_fs_handler; |
176 | 18.4E | _fs_handlers[hash_code] = new_fs_handler; |
177 | | |
178 | 18.4E | LOG(INFO) << "Finished create new HDFS handler, hash_code=" << hash_code |
179 | 18.4E | << ", is_kerberos=" << new_fs_handler->is_kerberos_auth |
180 | 18.4E | << ", principal=" << new_fs_handler->principal << ", fs_name=" << fs_name; |
181 | 18.4E | } |
182 | | |
183 | 0 | return Status::OK(); |
184 | 18.4E | } |
185 | | |
186 | | Status HdfsMgr::_create_hdfs_fs_impl(const THdfsParams& hdfs_params, const std::string& fs_name, |
187 | 96 | std::shared_ptr<HdfsHandler>* fs_handler) { |
188 | 96 | HDFSCommonBuilder builder; |
189 | 96 | RETURN_IF_ERROR(create_hdfs_builder(hdfs_params, fs_name, &builder)); |
190 | 96 | hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get()); |
191 | 96 | if (hdfs_fs == nullptr) { |
192 | 14 | return Status::InternalError("failed to connect to hdfs {}: {}", fs_name, hdfs_error()); |
193 | 14 | } |
194 | | |
195 | 82 | bool is_kerberos = builder.is_kerberos(); |
196 | 82 | *fs_handler = std::make_shared<HdfsHandler>( |
197 | 82 | hdfs_fs, is_kerberos, is_kerberos ? hdfs_params.hdfs_kerberos_principal : "", |
198 | 82 | is_kerberos ? hdfs_params.hdfs_kerberos_keytab : "", fs_name); |
199 | | // builder.get_ticket_cache()); |
200 | 82 | return Status::OK(); |
201 | 96 | } |
202 | | |
203 | | // https://brpc.apache.org/docs/server/basics/ |
204 | | // According to the brpc doc, JNI code checks stack layout and cannot be run in |
205 | | // bthreads so create a pthread for creating hdfs connection if necessary. |
206 | | Status HdfsMgr::_create_hdfs_fs(const THdfsParams& hdfs_params, const std::string& fs_name, |
207 | 102 | std::shared_ptr<HdfsHandler>* fs_handler) { |
208 | 102 | bool is_pthread = bthread_self() == 0; |
209 | 102 | LOG(INFO) << "create hdfs fs, is_pthread=" << is_pthread << " fs_name=" << fs_name; |
210 | 102 | if (is_pthread) { // running in pthread |
211 | 102 | return _create_hdfs_fs_impl(hdfs_params, fs_name, fs_handler); |
212 | 102 | } |
213 | | |
214 | | // running in bthread, switch to a pthread and wait |
215 | 0 | Status st; |
216 | 0 | auto btx = bthread::butex_create(); |
217 | 0 | *(int*)btx = 0; |
218 | 0 | std::thread t([&] { |
219 | 0 | st = _create_hdfs_fs_impl(hdfs_params, fs_name, fs_handler); |
220 | 0 | *(int*)btx = 1; |
221 | 0 | bthread::butex_wake_all(btx); |
222 | 0 | }); |
223 | 0 | std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&t, &btx](...) { |
224 | 0 | if (t.joinable()) t.join(); |
225 | 0 | bthread::butex_destroy(btx); |
226 | 0 | }); |
227 | 0 | timespec tmout {.tv_sec = std::chrono::system_clock::now().time_since_epoch().count() + 60, |
228 | 0 | .tv_nsec = 0}; |
229 | 0 | if (int ret = bthread::butex_wait(btx, 1, &tmout); ret != 0) { |
230 | 0 | std::string msg = "failed to wait create_hdfs_fs finish. fs_name=" + fs_name; |
231 | 0 | LOG(WARNING) << msg << " error=" << std::strerror(errno); |
232 | 0 | st = Status::Error<ErrorCode::INTERNAL_ERROR, false>(msg); |
233 | 0 | } |
234 | 0 | return st; |
235 | 102 | } |
236 | | |
237 | 54.0k | uint64_t HdfsMgr::_hdfs_hash_code(const THdfsParams& hdfs_params, const std::string& fs_name) { |
238 | 54.0k | uint64_t hash_code = 0; |
239 | | // The specified fsname is used first. |
240 | | // If there is no specified fsname, the default fsname is used |
241 | 54.0k | if (!fs_name.empty()) { |
242 | 53.9k | hash_code ^= crc32_hash(fs_name); |
243 | 53.9k | } else if (hdfs_params.__isset.fs_name) { |
244 | 0 | hash_code ^= crc32_hash(hdfs_params.fs_name); |
245 | 0 | } |
246 | | |
247 | 54.0k | if (hdfs_params.__isset.user) { |
248 | 9.08k | hash_code ^= crc32_hash(hdfs_params.user); |
249 | 9.08k | } |
250 | 54.0k | if (hdfs_params.__isset.hdfs_kerberos_principal) { |
251 | 2.07k | hash_code ^= crc32_hash(hdfs_params.hdfs_kerberos_principal); |
252 | 2.07k | } |
253 | 54.0k | if (hdfs_params.__isset.hdfs_kerberos_keytab) { |
254 | 2.07k | hash_code ^= crc32_hash(hdfs_params.hdfs_kerberos_keytab); |
255 | 2.07k | } |
256 | 54.0k | if (hdfs_params.__isset.hdfs_conf) { |
257 | 53.9k | std::map<std::string, std::string> conf_map; |
258 | 114k | for (const auto& conf : hdfs_params.hdfs_conf) { |
259 | 114k | conf_map[conf.key] = conf.value; |
260 | 114k | } |
261 | 114k | for (auto& conf : conf_map) { |
262 | 114k | hash_code ^= crc32_hash(conf.first); |
263 | 114k | hash_code ^= crc32_hash(conf.second); |
264 | 114k | } |
265 | 53.9k | } |
266 | 54.0k | return hash_code; |
267 | 54.0k | } |
268 | | |
269 | | } // namespace doris::io |