be/src/io/fs/hdfs/hdfs_mgr.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/PlanNodes_types.h> |
21 | | |
22 | | #include <atomic> |
23 | | #include <memory> |
24 | | #include <mutex> |
25 | | #include <string> |
26 | | #include <thread> |
27 | | #include <unordered_map> |
28 | | |
29 | | #include "common/status.h" |
30 | | #include "io/fs/hdfs.h" |
31 | | #include "io/hdfs_util.h" |
32 | | |
33 | | namespace doris::io { |
34 | | |
35 | | // A manager class to handle multiple hdfsFS instances |
36 | | class HdfsMgr { |
37 | | public: |
38 | | HdfsMgr(); |
39 | | |
40 | | // Get or create a hdfsFS instance based on the given parameters |
41 | | Status get_or_create_fs(const THdfsParams& hdfs_params, const std::string& fs_name, |
42 | | std::shared_ptr<HdfsHandler>* fs_handler); |
43 | | |
44 | | virtual ~HdfsMgr(); |
45 | | |
46 | | protected: |
47 | | // For testing purpose |
48 | | friend class HdfsMgrTest; |
49 | | size_t get_fs_handlers_size() const { return _fs_handlers.size(); } |
50 | 0 | bool has_fs_handler(uint64_t hash_code) const { |
51 | 0 | return _fs_handlers.find(hash_code) != _fs_handlers.end(); |
52 | 0 | } |
53 | | void set_instance_timeout_seconds(int64_t timeout_seconds) { |
54 | | _instance_timeout_seconds = timeout_seconds; |
55 | | } |
56 | | void set_cleanup_interval_seconds(int64_t interval_seconds) { |
57 | | _cleanup_interval_seconds = interval_seconds; |
58 | | } |
59 | | uint64_t _hdfs_hash_code(const THdfsParams& hdfs_params, const std::string& fs_name); |
60 | | |
61 | | virtual Status _create_hdfs_fs_impl(const THdfsParams& hdfs_params, const std::string& fs_name, |
62 | | std::shared_ptr<HdfsHandler>* fs_handler); |
63 | | |
64 | | private: |
65 | | HdfsMgr(const HdfsMgr&) = delete; |
66 | | HdfsMgr& operator=(const HdfsMgr&) = delete; |
67 | | |
68 | | // Start the cleanup thread |
69 | | void _start_cleanup_thread(); |
70 | | // Stop the cleanup thread |
71 | | void _stop_cleanup_thread(); |
72 | | // Cleanup thread function |
73 | | void _cleanup_loop(); |
74 | | // Remove kerberos ticket cache if instance is using kerberos auth |
75 | | void _cleanup_kerberos_ticket(const HdfsHandler& handler); |
76 | | |
77 | | Status _create_hdfs_fs(const THdfsParams& hdfs_params, const std::string& fs_name, |
78 | | std::shared_ptr<HdfsHandler>* fs_handler); |
79 | | |
80 | | private: |
81 | | std::mutex _mutex; |
82 | | std::unordered_map<uint64_t, std::shared_ptr<HdfsHandler>> _fs_handlers; |
83 | | |
84 | | std::atomic<bool> _should_stop_cleanup_thread; |
85 | | std::unique_ptr<std::thread> _cleanup_thread; |
86 | | int64_t _cleanup_interval_seconds = 3600; // Run cleanup every hour |
87 | | int64_t _instance_timeout_seconds = 86400; // 24 hours timeout |
88 | | }; |
89 | | |
90 | | } // namespace doris::io |