be/src/util/thrift_server.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
#include "util/thrift_server.h"

#include <glog/logging.h>
#include <thrift/Thrift.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TNonblockingServerSocket.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransport.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <condition_variable>
#include <exception>
#include <memory>
#include <mutex>
#include <sstream>
#include <thread>

#include "common/config.h"
#include "common/metrics/doris_metrics.h"
#include "service/backend_options.h"
45 | | |
46 | | namespace apache { |
47 | | namespace thrift { |
48 | | class TProcessor; |
49 | | |
50 | | namespace transport { |
51 | | class TServerTransport; |
52 | | } // namespace transport |
53 | | } // namespace thrift |
54 | | } // namespace apache |
55 | | |
56 | | namespace doris { |
57 | | |
// Per-server connection metrics. Both prototypes are registered against each server's own
// metric entity in the ThriftServer constructor. The gauge is incremented in createContext()
// and decremented in deleteContext(); the counter is only ever incremented (createContext()).
DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_current_connections, MetricUnit::CONNECTIONS,
                                   "Number of currently active connections");
DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(thrift_connections_total, MetricUnit::CONNECTIONS,
                                     "Total connections made over the lifetime of this server");
62 | | |
63 | | // Nonblocking Server socket implementation of TNonblockingServerTransport. Wrapper around a unix |
64 | | // socket listen and accept calls. |
65 | | class ImprovedNonblockingServerSocket : public apache::thrift::transport::TNonblockingServerSocket { |
66 | | using TConfiguration = apache::thrift::TConfiguration; |
67 | | using TSocket = apache::thrift::transport::TSocket; |
68 | | |
69 | | public: |
70 | | // Constructor. |
71 | | ImprovedNonblockingServerSocket(int port) |
72 | 0 | : TNonblockingServerSocket(port), |
73 | 0 | config(std::make_shared<TConfiguration>(config::thrift_max_message_size)) {} |
74 | 0 | ~ImprovedNonblockingServerSocket() override = default; |
75 | | |
76 | | protected: |
77 | 0 | std::shared_ptr<TSocket> createSocket(THRIFT_SOCKET clientSocket) override { |
78 | 0 | return std::make_shared<TSocket>(clientSocket, config); |
79 | 0 | } |
80 | | |
81 | | private: |
82 | | std::shared_ptr<TConfiguration> config; |
83 | | }; |
84 | | |
85 | | class ImprovedBufferedTransportFactory |
86 | | : public apache::thrift::transport::TBufferedTransportFactory { |
87 | | using TConfiguration = apache::thrift::TConfiguration; |
88 | | using TTransport = apache::thrift::transport::TTransport; |
89 | | using TBufferedTransport = apache::thrift::transport::TBufferedTransport; |
90 | | |
91 | | public: |
92 | | ImprovedBufferedTransportFactory() |
93 | 14 | : config(std::make_shared<TConfiguration>(config::thrift_max_message_size)) {} |
94 | 6 | ~ImprovedBufferedTransportFactory() override = default; |
95 | | |
96 | 94 | std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override { |
97 | 94 | return std::make_shared<TBufferedTransport>(std::move(trans), config); |
98 | 94 | } |
99 | | |
100 | | private: |
101 | | std::shared_ptr<TConfiguration> config; |
102 | | }; |
103 | | |
104 | | // Helper class that starts a server in a separate thread, and handles |
105 | | // the inter-thread communication to monitor whether it started |
106 | | // correctly. |
107 | | class ThriftServer::ThriftServerEventProcessor |
108 | | : public apache::thrift::server::TServerEventHandler { |
109 | | public: |
110 | | ThriftServerEventProcessor(ThriftServer* thrift_server) |
111 | 14 | : _thrift_server(thrift_server), _signal_fired(false) {} |
112 | | |
113 | | // friendly to code style |
114 | 6 | ~ThriftServerEventProcessor() override = default; |
115 | | |
116 | | // Called by TNonBlockingServer when server has acquired its resources and is ready to |
117 | | // serve, and signals to StartAndWaitForServer that start-up is finished. |
118 | | // From TServerEventHandler. |
119 | | void preServe() override; |
120 | | |
121 | | // Called when a client connects; we create per-client state and call any |
122 | | // SessionHandlerIf handler. |
123 | | void* createContext(std::shared_ptr<apache::thrift::protocol::TProtocol> input, |
124 | | std::shared_ptr<apache::thrift::protocol::TProtocol> output) override; |
125 | | |
126 | | // Called when a client starts an RPC; we set the thread-local session key. |
127 | | void processContext(void* context, |
128 | | std::shared_ptr<apache::thrift::transport::TTransport> output) override; |
129 | | |
130 | | // Called when a client disconnects; we call any SessionHandlerIf handler. |
131 | | void deleteContext(void* serverContext, |
132 | | std::shared_ptr<apache::thrift::protocol::TProtocol> input, |
133 | | std::shared_ptr<apache::thrift::protocol::TProtocol> output) override; |
134 | | |
135 | | // Waits for a timeout of TIMEOUT_MS for a server to signal that it has started |
136 | | // correctly. |
137 | | Status start_and_wait_for_server(); |
138 | | |
139 | | private: |
140 | | // Lock used to ensure that there are no missed notifications between starting the |
141 | | // supervision thread and calling _signal_cond.timed_wait. Also used to ensure |
142 | | // thread-safe access to members of _thrift_server |
143 | | std::mutex _signal_lock; |
144 | | |
145 | | // Condition variable that is notified by the supervision thread once either |
146 | | // a) all is well or b) an error occurred. |
147 | | std::condition_variable _signal_cond; |
148 | | |
149 | | // The ThriftServer under management. This class is a friend of ThriftServer, and |
150 | | // reaches in to change member variables at will. |
151 | | ThriftServer* _thrift_server; |
152 | | |
153 | | // Guards against spurious condition variable wakeups |
154 | | bool _signal_fired; |
155 | | |
156 | | // The time, in milliseconds, to wait for a server to come up |
157 | | const static int TIMEOUT_MS; |
158 | | |
159 | | // Called in a separate thread; wraps TNonBlockingServer::serve in an exception handler |
160 | | void supervise(); |
161 | | }; |
162 | | |
163 | | // https://stackoverflow.com/questions/5391973/undefined-reference-to-static-const-int |
164 | | const int ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS = 2500; |
165 | | |
166 | 14 | Status ThriftServer::ThriftServerEventProcessor::start_and_wait_for_server() { |
167 | | // Locking here protects against missed notifications if Supervise executes quickly |
168 | 14 | std::unique_lock<std::mutex> lock(_signal_lock); |
169 | 14 | _thrift_server->_started = false; |
170 | | |
171 | 14 | _thrift_server->_server_thread = std::make_unique<std::thread>( |
172 | 14 | &ThriftServer::ThriftServerEventProcessor::supervise, this); |
173 | | |
174 | | // Loop protects against spurious wakeup. Locks provide necessary fences to ensure |
175 | | // visibility. |
176 | 28 | while (!_signal_fired) { |
177 | | // Yields lock and allows supervision thread to continue and signal |
178 | 14 | std::cv_status cvsts = _signal_cond.wait_for(lock, std::chrono::milliseconds(TIMEOUT_MS)); |
179 | 14 | if (cvsts == std::cv_status::timeout) { |
180 | 0 | std::stringstream ss; |
181 | 0 | ss << "ThriftServer '" << _thrift_server->_name |
182 | 0 | << "' (on port: " << _thrift_server->_port << ") did not start within " << TIMEOUT_MS |
183 | 0 | << "ms"; |
184 | 0 | LOG(ERROR) << ss.str(); |
185 | 0 | return Status::InternalError(ss.str()); |
186 | 0 | } |
187 | 14 | } |
188 | | |
189 | | // _started == true only if preServe was called. May be false if there was an exception |
190 | | // after preServe that was caught by Supervise, causing it to reset the error condition. |
191 | 14 | if (!_thrift_server->_started) { |
192 | 0 | std::stringstream ss; |
193 | 0 | ss << "ThriftServer '" << _thrift_server->_name << "' (on port: " << _thrift_server->_port |
194 | 0 | << ") did not start correctly "; |
195 | 0 | LOG(ERROR) << ss.str(); |
196 | 0 | return Status::InternalError(ss.str()); |
197 | 0 | } |
198 | | |
199 | 14 | return Status::OK(); |
200 | 14 | } |
201 | | |
202 | 14 | void ThriftServer::ThriftServerEventProcessor::supervise() { |
203 | 14 | DCHECK(_thrift_server->_server.get() != nullptr); |
204 | | |
205 | 14 | try { |
206 | 14 | _thrift_server->_server->serve(); |
207 | 14 | } catch (apache::thrift::TException& e) { |
208 | 0 | LOG(ERROR) << "ThriftServer '" << _thrift_server->_name |
209 | 0 | << "' (on port: " << _thrift_server->_port |
210 | 0 | << ") exited due to TException: " << e.what(); |
211 | 0 | } |
212 | | |
213 | 14 | LOG(INFO) << "ThriftServer " << _thrift_server->_name << " exited"; |
214 | | |
215 | 6 | { |
216 | | // _signal_lock ensures mutual exclusion of access to _thrift_server |
217 | 6 | std::lock_guard<std::mutex> lock(_signal_lock); |
218 | 6 | _thrift_server->_started = false; |
219 | | |
220 | | // There may not be anyone waiting on this signal (if the |
221 | | // exception occurs after startup). That's not a problem, this is |
222 | | // just to avoid waiting for the timeout in case of a bind |
223 | | // failure, for example. |
224 | 6 | _signal_fired = true; |
225 | 6 | } |
226 | | |
227 | 6 | _signal_cond.notify_all(); |
228 | 6 | } |
229 | | |
230 | 14 | void ThriftServer::ThriftServerEventProcessor::preServe() { |
231 | | // Acquire the signal lock to ensure that StartAndWaitForServer is |
232 | | // waiting on _signal_cond when we notify. |
233 | 14 | std::lock_guard<std::mutex> lock(_signal_lock); |
234 | 14 | _signal_fired = true; |
235 | | |
236 | | // This is the (only) success path - if this is not reached within TIMEOUT_MS, |
237 | | // StartAndWaitForServer will indicate failure. |
238 | 14 | _thrift_server->_started = true; |
239 | | |
240 | | // Should only be one thread waiting on _signal_cond, but wake all just in case. |
241 | 14 | _signal_cond.notify_all(); |
242 | 14 | } |
243 | | |
244 | | // This thread-local variable contains the current session key for whichever |
245 | | // thrift server is currently serving a request on the current thread. |
246 | | __thread ThriftServer::SessionKey* _session_key; |
247 | | |
248 | 0 | ThriftServer::SessionKey* ThriftServer::get_thread_session_key() { |
249 | 0 | return _session_key; |
250 | 0 | } |
251 | | |
252 | | void* ThriftServer::ThriftServerEventProcessor::createContext( |
253 | | std::shared_ptr<apache::thrift::protocol::TProtocol> input, |
254 | 47 | std::shared_ptr<apache::thrift::protocol::TProtocol> output) { |
255 | 47 | std::stringstream ss; |
256 | | |
257 | 47 | apache::thrift::transport::TSocket* socket = nullptr; |
258 | 47 | apache::thrift::transport::TTransport* transport = input->getTransport().get(); |
259 | 47 | { |
260 | 47 | switch (_thrift_server->_server_type) { |
261 | 0 | case NON_BLOCKING: |
262 | 0 | socket = static_cast<apache::thrift::transport::TSocket*>( |
263 | 0 | static_cast<apache::thrift::transport::TFramedTransport*>(transport) |
264 | 0 | ->getUnderlyingTransport() |
265 | 0 | .get()); |
266 | 0 | break; |
267 | | |
268 | 0 | case THREAD_POOL: |
269 | 47 | case THREADED: |
270 | 47 | socket = static_cast<apache::thrift::transport::TSocket*>( |
271 | 47 | static_cast<apache::thrift::transport::TBufferedTransport*>(transport) |
272 | 47 | ->getUnderlyingTransport() |
273 | 47 | .get()); |
274 | 47 | break; |
275 | | |
276 | 0 | default: |
277 | 0 | DCHECK(false) << "Unexpected thrift server type"; |
278 | 47 | } |
279 | 47 | } |
280 | | |
281 | 47 | ss << socket->getPeerAddress() << ":" << socket->getPeerPort(); |
282 | | |
283 | 47 | { |
284 | 47 | std::lock_guard<std::mutex> _l(_thrift_server->_session_keys_lock); |
285 | | |
286 | 47 | std::shared_ptr<SessionKey> key_ptr(new std::string(ss.str())); |
287 | | |
288 | 47 | _session_key = key_ptr.get(); |
289 | 47 | _thrift_server->_session_keys[key_ptr.get()] = key_ptr; |
290 | 47 | } |
291 | | |
292 | 47 | if (_thrift_server->_session_handler != nullptr) { |
293 | 0 | _thrift_server->_session_handler->session_start(*_session_key); |
294 | 0 | } |
295 | | |
296 | 47 | _thrift_server->thrift_connections_total->increment(1L); |
297 | 47 | _thrift_server->thrift_current_connections->increment(1L); |
298 | | |
299 | | // Store the _session_key in the per-client context to avoid recomputing |
300 | | // it. If only this were accessible from RPC method calls, we wouldn't have to |
301 | | // mess around with thread locals. |
302 | 47 | return (void*)_session_key; |
303 | 47 | } |
304 | | |
305 | | void ThriftServer::ThriftServerEventProcessor::processContext( |
306 | 25.2k | void* context, std::shared_ptr<apache::thrift::transport::TTransport> transport) { |
307 | 25.2k | _session_key = reinterpret_cast<SessionKey*>(context); |
308 | 25.2k | } |
309 | | |
310 | | void ThriftServer::ThriftServerEventProcessor::deleteContext( |
311 | | void* serverContext, std::shared_ptr<apache::thrift::protocol::TProtocol> input, |
312 | 21 | std::shared_ptr<apache::thrift::protocol::TProtocol> output) { |
313 | 21 | _session_key = (SessionKey*)serverContext; |
314 | | |
315 | 21 | if (_thrift_server->_session_handler != nullptr) { |
316 | 0 | _thrift_server->_session_handler->session_end(*_session_key); |
317 | 0 | } |
318 | | |
319 | 21 | { |
320 | 21 | std::lock_guard<std::mutex> _l(_thrift_server->_session_keys_lock); |
321 | 21 | _thrift_server->_session_keys.erase(_session_key); |
322 | 21 | } |
323 | | |
324 | 21 | _thrift_server->thrift_current_connections->increment(-1L); |
325 | 21 | } |
326 | | |
327 | | ThriftServer::ThriftServer(const std::string& name, |
328 | | const std::shared_ptr<apache::thrift::TProcessor>& processor, int port, |
329 | | int num_worker_threads, ServerType server_type) |
330 | 14 | : _started(false), |
331 | 14 | _port(port), |
332 | 14 | _num_worker_threads(num_worker_threads), |
333 | 14 | _server_type(server_type), |
334 | 14 | _name(name), |
335 | 14 | _server_thread(nullptr), |
336 | 14 | _server(nullptr), |
337 | 14 | _processor(processor), |
338 | 14 | _session_handler(nullptr) { |
339 | 14 | _thrift_server_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( |
340 | 14 | std::string("thrift_server.") + name, {{"name", name}}); |
341 | 14 | INT_GAUGE_METRIC_REGISTER(_thrift_server_metric_entity, thrift_current_connections); |
342 | 14 | INT_COUNTER_METRIC_REGISTER(_thrift_server_metric_entity, thrift_connections_total); |
343 | 14 | } |
344 | | |
345 | 6 | ThriftServer::~ThriftServer() { |
346 | 6 | stop(); |
347 | 6 | join(); |
348 | 6 | } |
349 | | |
350 | 14 | Status ThriftServer::start() { |
351 | 14 | DCHECK(!_started); |
352 | 14 | std::shared_ptr<apache::thrift::protocol::TProtocolFactory> protocol_factory( |
353 | 14 | new apache::thrift::protocol::TBinaryProtocolFactory()); |
354 | 14 | std::shared_ptr<apache::thrift::concurrency::ThreadManager> thread_mgr; |
355 | 14 | std::shared_ptr<apache::thrift::concurrency::ThreadFactory> thread_factory = |
356 | 14 | std::make_shared<apache::thrift::concurrency::ThreadFactory>(); |
357 | 14 | std::shared_ptr<apache::thrift::transport::TServerTransport> fe_server_transport; |
358 | 14 | std::shared_ptr<apache::thrift::transport::TTransportFactory> transport_factory; |
359 | 14 | std::shared_ptr<apache::thrift::transport::TNonblockingServerSocket> socket; |
360 | 14 | if (_server_type != THREADED) { |
361 | 0 | thread_mgr = apache::thrift::concurrency::ThreadManager::newSimpleThreadManager( |
362 | 0 | _num_worker_threads); |
363 | 0 | thread_mgr->threadFactory(thread_factory); |
364 | 0 | thread_mgr->start(); |
365 | 0 | } |
366 | | |
367 | | // Note - if you change the transport types here, you must check that the |
368 | | // logic in createContext is still accurate. |
369 | 14 | apache::thrift::transport::TServerSocket* server_socket = nullptr; |
370 | | |
371 | 14 | switch (_server_type) { |
372 | 0 | case NON_BLOCKING: |
373 | 0 | socket = std::make_shared<ImprovedNonblockingServerSocket>(_port); |
374 | 0 | if (transport_factory == nullptr) { |
375 | 0 | transport_factory.reset(new apache::thrift::transport::TTransportFactory()); |
376 | 0 | } |
377 | |
|
378 | 0 | _server = std::make_unique<apache::thrift::server::TNonblockingServer>( |
379 | 0 | _processor, transport_factory, transport_factory, protocol_factory, |
380 | 0 | protocol_factory, socket, thread_mgr); |
381 | 0 | break; |
382 | | |
383 | 0 | case THREAD_POOL: |
384 | 0 | fe_server_transport.reset(new apache::thrift::transport::TServerSocket( |
385 | 0 | BackendOptions::get_service_bind_address_without_bracket(), _port)); |
386 | |
|
387 | 0 | if (transport_factory == nullptr) { |
388 | 0 | transport_factory = std::make_shared<ImprovedBufferedTransportFactory>(); |
389 | 0 | } |
390 | |
|
391 | 0 | _server = std::make_unique<apache::thrift::server::TThreadPoolServer>( |
392 | 0 | _processor, fe_server_transport, transport_factory, protocol_factory, thread_mgr); |
393 | 0 | break; |
394 | | |
395 | 14 | case THREADED: |
396 | 14 | server_socket = new apache::thrift::transport::TServerSocket( |
397 | 14 | BackendOptions::get_service_bind_address_without_bracket(), _port); |
398 | 14 | fe_server_transport.reset(server_socket); |
399 | 14 | server_socket->setKeepAlive(true); |
400 | | |
401 | 14 | if (transport_factory == nullptr) { |
402 | 14 | transport_factory = std::make_shared<ImprovedBufferedTransportFactory>(); |
403 | 14 | } |
404 | | |
405 | 14 | _server = std::make_unique<apache::thrift::server::TThreadedServer>( |
406 | 14 | _processor, fe_server_transport, transport_factory, protocol_factory, |
407 | 14 | thread_factory); |
408 | 14 | break; |
409 | | |
410 | 0 | default: |
411 | 0 | std::stringstream error_msg; |
412 | 0 | error_msg << "Unsupported server type: " << _server_type; |
413 | 0 | LOG(ERROR) << error_msg.str(); |
414 | 0 | return Status::InternalError(error_msg.str()); |
415 | 14 | } |
416 | | |
417 | 14 | std::shared_ptr<ThriftServer::ThriftServerEventProcessor> event_processor( |
418 | 14 | new ThriftServer::ThriftServerEventProcessor(this)); |
419 | 14 | _server->setServerEventHandler(event_processor); |
420 | | |
421 | 14 | RETURN_IF_ERROR(event_processor->start_and_wait_for_server()); |
422 | | |
423 | 14 | LOG(INFO) << "ThriftServer '" << _name << "' started on port: " << _port; |
424 | | |
425 | 14 | DCHECK(_started); |
426 | 14 | return Status::OK(); |
427 | 14 | } |
428 | | |
429 | 12 | void ThriftServer::stop() { |
430 | 12 | _server->stop(); |
431 | 12 | } |
432 | | |
433 | 6 | void ThriftServer::join() { |
434 | 6 | DCHECK(_server_thread != nullptr); |
435 | 6 | _server_thread->join(); |
436 | 6 | } |
437 | | |
438 | 0 | void ThriftServer::stop_for_testing() { |
439 | 0 | DCHECK(_server_thread != nullptr); |
440 | 0 | DCHECK(_server); |
441 | 0 | DCHECK_EQ(_server_type, THREADED); |
442 | 0 | _server->stop(); |
443 | |
|
444 | 0 | if (_started) { |
445 | 0 | join(); |
446 | 0 | } |
447 | 0 | } |
448 | | } // namespace doris |