be/src/common/kerberos/kerberos_ticket_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "common/kerberos/kerberos_ticket_mgr.h" |
19 | | |
20 | | #include <chrono> |
21 | | #include <iomanip> |
22 | | #include <sstream> |
23 | | |
24 | | #include "common/logging.h" |
25 | | #include "core/block/block.h" |
26 | | #include "information_schema/schema_scanner_helper.h" |
27 | | #include "service/backend_options.h" |
28 | | |
29 | | namespace doris::kerberos { |
30 | | |
31 | 9 | KerberosTicketMgr::KerberosTicketMgr(const std::string& root_path) { |
32 | 9 | _root_path = root_path; |
33 | 9 | _start_cleanup_thread(); |
34 | 9 | } |
35 | | |
36 | 5 | KerberosTicketMgr::~KerberosTicketMgr() { |
37 | 5 | _stop_cleanup_thread(); |
38 | 5 | } |
39 | | |
40 | 9 | void KerberosTicketMgr::_start_cleanup_thread() { |
41 | 9 | _cleanup_thread = std::make_unique<std::thread>(&KerberosTicketMgr::_cleanup_loop, this); |
42 | 9 | } |
43 | | |
44 | 5 | void KerberosTicketMgr::_stop_cleanup_thread() { |
45 | 5 | if (_cleanup_thread) { |
46 | 5 | _should_stop_cleanup_thread = true; |
47 | 5 | _cleanup_thread->join(); |
48 | 5 | _cleanup_thread.reset(); |
49 | 5 | } |
50 | 5 | } |
51 | | |
52 | 9 | void KerberosTicketMgr::_cleanup_loop() { |
53 | | #ifdef BE_TEST |
54 | | static constexpr int64_t CHECK_INTERVAL_SECONDS = 1; // For testing purpose |
55 | | #else |
56 | 9 | static constexpr int64_t CHECK_INTERVAL_SECONDS = 5; // Check stop flag every 5 seconds |
57 | 9 | #endif |
58 | 9 | uint64_t last_cleanup_time = std::time(nullptr); |
59 | | |
60 | 1.17k | while (!_should_stop_cleanup_thread) { |
61 | 1.16k | uint64_t current_time = std::time(nullptr); |
62 | | |
63 | | // Only perform cleanup if enough time has passed |
64 | 1.16k | if (current_time - last_cleanup_time >= _cleanup_interval.count()) { |
65 | 1 | std::vector<std::string> keys_to_remove; |
66 | 1 | { |
67 | 1 | std::lock_guard<std::mutex> lock(_mutex); |
68 | 1 | for (const auto& entry : _ticket_caches) { |
69 | | // Check if this is the last reference to the ticket cache |
70 | | // use_count() == 1 means it is only referenced in the _ticket_caches map |
71 | 0 | if (entry.second.cache.use_count() == 1) { |
72 | 0 | LOG(INFO) << "Found unused Kerberos ticket cache for principal: " |
73 | 0 | << entry.second.cache->get_config().get_principal() |
74 | 0 | << ", keytab: " |
75 | 0 | << entry.second.cache->get_config().get_keytab_path(); |
76 | 0 | keys_to_remove.push_back(entry.first); |
77 | 0 | } |
78 | 0 | } |
79 | | |
80 | | // Remove entries under lock |
81 | 1 | for (const auto& key : keys_to_remove) { |
82 | 0 | LOG(INFO) << "Removing unused Kerberos ticket cache for key: " << key; |
83 | 0 | _ticket_caches.erase(key); |
84 | 0 | } |
85 | 1 | } |
86 | | |
87 | 1 | last_cleanup_time = current_time; |
88 | 1 | } |
89 | | |
90 | | // Sleep for a short interval to check stop flag more frequently |
91 | 1.16k | std::this_thread::sleep_for(std::chrono::seconds(CHECK_INTERVAL_SECONDS)); |
92 | 1.16k | } |
93 | 9 | } |
94 | | |
95 | | Status KerberosTicketMgr::get_or_set_ticket_cache( |
96 | 3 | const KerberosConfig& config, std::shared_ptr<KerberosTicketCache>* ticket_cache) { |
97 | 3 | std::string key = config.get_hash_code(); |
98 | | |
99 | 3 | std::lock_guard<std::mutex> lock(_mutex); |
100 | | |
101 | | // Check if already exists |
102 | 3 | auto it = _ticket_caches.find(key); |
103 | 3 | if (it != _ticket_caches.end()) { |
104 | 1 | *ticket_cache = it->second.cache; |
105 | 1 | return Status::OK(); |
106 | 1 | } |
107 | | |
108 | | // Create new ticket cache |
109 | 2 | auto new_ticket_cache = _make_new_ticket_cache(config); |
110 | 2 | RETURN_IF_ERROR(new_ticket_cache->initialize()); |
111 | 1 | RETURN_IF_ERROR(new_ticket_cache->login()); |
112 | 1 | RETURN_IF_ERROR(new_ticket_cache->write_ticket_cache()); |
113 | 1 | new_ticket_cache->start_periodic_refresh(); |
114 | | |
115 | | // Insert into _ticket_caches |
116 | 1 | KerberosTicketEntry entry {.cache = new_ticket_cache, |
117 | 1 | .last_access_time = std::chrono::steady_clock::now()}; |
118 | 1 | auto [inserted_it, success] = _ticket_caches.emplace(key, std::move(entry)); |
119 | 1 | if (!success) { |
120 | 0 | return Status::InternalError("Failed to insert ticket cache into map"); |
121 | 0 | } |
122 | | |
123 | 1 | *ticket_cache = new_ticket_cache; |
124 | 1 | LOG(INFO) << "create new kerberos ticket cache: " |
125 | 1 | << inserted_it->second.cache->get_ticket_cache_path(); |
126 | 1 | return Status::OK(); |
127 | 1 | } |
128 | | |
129 | | std::shared_ptr<KerberosTicketCache> KerberosTicketMgr::get_ticket_cache( |
130 | 1 | const std::string& principal, const std::string& keytab_path) { |
131 | 1 | std::string key = KerberosConfig::get_hash_code(principal, keytab_path); |
132 | | |
133 | 1 | std::lock_guard<std::mutex> lock(_mutex); |
134 | 1 | auto it = _ticket_caches.find(key); |
135 | 1 | if (it != _ticket_caches.end()) { |
136 | 0 | return it->second.cache; |
137 | 0 | } |
138 | 1 | return nullptr; |
139 | 1 | } |
140 | | |
141 | | std::shared_ptr<KerberosTicketCache> KerberosTicketMgr::_make_new_ticket_cache( |
142 | 0 | const KerberosConfig& config) { |
143 | 0 | return std::make_shared<KerberosTicketCache>(config, _root_path); |
144 | 0 | } |
145 | | |
146 | 1 | std::vector<KerberosTicketInfo> KerberosTicketMgr::get_krb_ticket_cache_info() { |
147 | 1 | std::vector<KerberosTicketInfo> result; |
148 | 1 | std::lock_guard<std::mutex> lock(_mutex); |
149 | | |
150 | 1 | for (const auto& entry : _ticket_caches) { |
151 | 0 | auto cache_info = entry.second.cache->get_ticket_info(); |
152 | 0 | result.insert(result.end(), cache_info.begin(), cache_info.end()); |
153 | 0 | } |
154 | | |
155 | 1 | return result; |
156 | 1 | } |
157 | | |
158 | 1 | void KerberosTicketMgr::get_ticket_cache_info_block(Block* block, const cctz::time_zone& ctz) { |
159 | 1 | TBackend be = BackendOptions::get_local_backend(); |
160 | 1 | int64_t be_id = be.id; |
161 | 1 | std::string be_ip = be.host; |
162 | 1 | std::vector<KerberosTicketInfo> infos = get_krb_ticket_cache_info(); |
163 | 1 | for (auto& info : infos) { |
164 | 0 | SchemaScannerHelper::insert_int64_value(0, be_id, block); |
165 | 0 | SchemaScannerHelper::insert_string_value(1, be_ip, block); |
166 | 0 | SchemaScannerHelper::insert_string_value(2, info.principal, block); |
167 | 0 | SchemaScannerHelper::insert_string_value(3, info.keytab_path, block); |
168 | 0 | SchemaScannerHelper::insert_string_value(4, info.service_principal, block); |
169 | 0 | SchemaScannerHelper::insert_string_value(5, info.cache_path, block); |
170 | 0 | SchemaScannerHelper::insert_string_value(6, info.hash_code, block); |
171 | 0 | SchemaScannerHelper::insert_datetime_value(7, info.start_time, ctz, block); |
172 | 0 | SchemaScannerHelper::insert_datetime_value(8, info.expiry_time, ctz, block); |
173 | 0 | SchemaScannerHelper::insert_datetime_value(9, info.auth_time, ctz, block); |
174 | 0 | SchemaScannerHelper::insert_int64_value(10, info.use_count, block); |
175 | 0 | SchemaScannerHelper::insert_int64_value(11, info.refresh_interval_second, block); |
176 | 0 | } |
177 | 1 | } |
178 | | |
179 | | } // namespace doris::kerberos |