Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "util/thrift_server.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | #include <thrift/Thrift.h> |
22 | | #include <thrift/concurrency/ThreadFactory.h> |
23 | | #include <thrift/concurrency/ThreadManager.h> |
24 | | #include <thrift/protocol/TBinaryProtocol.h> |
25 | | #include <thrift/protocol/TProtocol.h> |
26 | | #include <thrift/server/TNonblockingServer.h> |
27 | | #include <thrift/server/TThreadPoolServer.h> |
28 | | #include <thrift/server/TThreadedServer.h> |
29 | | #include <thrift/transport/TBufferTransports.h> |
30 | | #include <thrift/transport/TNonblockingServerSocket.h> |
31 | | #include <thrift/transport/TServerSocket.h> |
32 | | #include <thrift/transport/TSocket.h> |
33 | | #include <thrift/transport/TTransport.h> |
34 | | #include <thrift/transport/TTransportException.h> |
35 | | // IWYU pragma: no_include <bits/chrono.h> |
36 | | #include <chrono> // IWYU pragma: keep |
37 | | #include <condition_variable> |
38 | | #include <memory> |
39 | | #include <mutex> |
40 | | #include <sstream> |
41 | | #include <string_view> |
42 | | #include <thread> |
43 | | #include <utility> |
44 | | |
45 | | #include "common/config.h" |
46 | | #include "common/metrics/doris_metrics.h" |
47 | | #include "service/backend_options.h" |
48 | | |
49 | | namespace apache { |
50 | | namespace thrift { |
51 | | class TProcessor; |
52 | | |
53 | | namespace transport { |
54 | | class TServerTransport; |
55 | | } // namespace transport |
56 | | } // namespace thrift |
57 | | } // namespace apache |
58 | | |
59 | | namespace doris { |
60 | | |
61 | | DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_current_connections, MetricUnit::CONNECTIONS, |
62 | | "Number of currently active connections"); |
63 | | DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(thrift_connections_total, MetricUnit::CONNECTIONS, |
64 | | "Total connections made over the lifetime of this server"); |
65 | | |
66 | 32 | bool is_unknown_eintr(const apache::thrift::transport::TTransportException& e) { |
67 | 32 | return e.getType() == apache::thrift::transport::TTransportException::UNKNOWN && |
68 | 32 | std::string_view(e.what()).find("Interrupted system call") != std::string_view::npos; |
69 | 32 | } |
70 | | |
71 | | // Full-process stack collection sends a diagnostic signal to many BE threads. Even with |
72 | | // SA_RESTART, Linux can still return EINTR from poll/select/epoll-style waits, and Thrift wraps |
73 | | // that as an UNKNOWN TTransportException with "Interrupted system call". If this exception escapes |
74 | | // the blocking thrift server loop, heartbeat/backend thrift services can exit while the BE process |
75 | | // is still partly alive, making FE report the BE as Alive=false. This wrapper keeps the normal |
76 | | // TServerSocket behavior but retries only that narrow EINTR case on accept/read/peek; all other |
77 | | // transport errors are still propagated. |
78 | | class ImprovedServerSocket final : public apache::thrift::transport::TServerSocket { |
79 | | using TConfiguration = apache::thrift::TConfiguration; |
80 | | using TServerSocket = apache::thrift::transport::TServerSocket; |
81 | | using TSocket = apache::thrift::transport::TSocket; |
82 | | using TTransport = apache::thrift::transport::TTransport; |
83 | | using TTransportException = apache::thrift::transport::TTransportException; |
84 | | |
85 | | class RetryingSocket final : public TSocket { |
86 | | public: |
87 | | RetryingSocket(THRIFT_SOCKET socket, std::shared_ptr<THRIFT_SOCKET> interrupt_listener, |
88 | | std::shared_ptr<TConfiguration> config) |
89 | 66 | : TSocket(socket, std::move(interrupt_listener), std::move(config)) {} |
90 | | |
91 | 253k | uint32_t read(uint8_t* buf, uint32_t len) override { |
92 | 253k | while (true) { |
93 | 253k | try { |
94 | 253k | return TSocket::read(buf, len); |
95 | 253k | } catch (const TTransportException& e) { |
96 | 26 | if (!is_unknown_eintr(e)) { |
97 | 26 | throw; |
98 | 26 | } |
99 | 26 | } |
100 | 253k | } |
101 | 253k | } |
102 | | |
103 | 0 | bool peek() override { |
104 | 0 | while (true) { |
105 | 0 | try { |
106 | 0 | return TSocket::peek(); |
107 | 0 | } catch (const TTransportException& e) { |
108 | 0 | if (!is_unknown_eintr(e)) { |
109 | 0 | throw; |
110 | 0 | } |
111 | 0 | } |
112 | 0 | } |
113 | 0 | } |
114 | | }; |
115 | | |
116 | | public: |
117 | | ImprovedServerSocket(const std::string& address, int port) |
118 | 14 | : TServerSocket(address, port), |
119 | 14 | _config(std::make_shared<TConfiguration>(config::thrift_max_message_size)) {} |
120 | 6 | ~ImprovedServerSocket() override = default; |
121 | | |
122 | | protected: |
123 | 80 | std::shared_ptr<TTransport> acceptImpl() override { |
124 | 80 | while (true) { |
125 | 80 | try { |
126 | 80 | return TServerSocket::acceptImpl(); |
127 | 80 | } catch (const TTransportException& e) { |
128 | 6 | if (!is_unknown_eintr(e)) { |
129 | 6 | throw; |
130 | 6 | } |
131 | 6 | } |
132 | 80 | } |
133 | 80 | } |
134 | | |
135 | 66 | std::shared_ptr<TSocket> createSocket(THRIFT_SOCKET client) override { |
136 | 66 | return std::make_shared<RetryingSocket>( |
137 | 66 | client, interruptableChildren_ ? pChildInterruptSockReader_ : nullptr, _config); |
138 | 66 | } |
139 | | |
140 | | private: |
141 | | std::shared_ptr<TConfiguration> _config; |
142 | | }; |
143 | | |
144 | | // Nonblocking Server socket implementation of TNonblockingServerTransport. Wrapper around a unix |
145 | | // socket listen and accept calls. |
146 | | class ImprovedNonblockingServerSocket : public apache::thrift::transport::TNonblockingServerSocket { |
147 | | using TConfiguration = apache::thrift::TConfiguration; |
148 | | using TSocket = apache::thrift::transport::TSocket; |
149 | | |
150 | | public: |
151 | | // Constructor. |
152 | | ImprovedNonblockingServerSocket(int port) |
153 | 0 | : TNonblockingServerSocket(port), |
154 | 0 | config(std::make_shared<TConfiguration>(config::thrift_max_message_size)) {} |
155 | 0 | ~ImprovedNonblockingServerSocket() override = default; |
156 | | |
157 | | protected: |
158 | 0 | std::shared_ptr<TSocket> createSocket(THRIFT_SOCKET clientSocket) override { |
159 | 0 | return std::make_shared<TSocket>(clientSocket, config); |
160 | 0 | } |
161 | | |
162 | | private: |
163 | | std::shared_ptr<TConfiguration> config; |
164 | | }; |
165 | | |
166 | | class ImprovedBufferedTransportFactory |
167 | | : public apache::thrift::transport::TBufferedTransportFactory { |
168 | | using TConfiguration = apache::thrift::TConfiguration; |
169 | | using TTransport = apache::thrift::transport::TTransport; |
170 | | using TBufferedTransport = apache::thrift::transport::TBufferedTransport; |
171 | | |
172 | | public: |
173 | | ImprovedBufferedTransportFactory() |
174 | 14 | : config(std::make_shared<TConfiguration>(config::thrift_max_message_size)) {} |
175 | 6 | ~ImprovedBufferedTransportFactory() override = default; |
176 | | |
177 | 132 | std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override { |
178 | 132 | return std::make_shared<TBufferedTransport>(std::move(trans), config); |
179 | 132 | } |
180 | | |
181 | | private: |
182 | | std::shared_ptr<TConfiguration> config; |
183 | | }; |
184 | | |
185 | | // Helper class that starts a server in a separate thread, and handles |
186 | | // the inter-thread communication to monitor whether it started |
187 | | // correctly. |
188 | | class ThriftServer::ThriftServerEventProcessor |
189 | | : public apache::thrift::server::TServerEventHandler { |
190 | | public: |
191 | | ThriftServerEventProcessor(ThriftServer* thrift_server) |
192 | 14 | : _thrift_server(thrift_server), _signal_fired(false) {} |
193 | | |
194 | | // friendly to code style |
195 | 6 | ~ThriftServerEventProcessor() override = default; |
196 | | |
197 | | // Called by TNonBlockingServer when server has acquired its resources and is ready to |
198 | | // serve, and signals to StartAndWaitForServer that start-up is finished. |
199 | | // From TServerEventHandler. |
200 | | void preServe() override; |
201 | | |
202 | | // Called when a client connects; we create per-client state and call any |
203 | | // SessionHandlerIf handler. |
204 | | void* createContext(std::shared_ptr<apache::thrift::protocol::TProtocol> input, |
205 | | std::shared_ptr<apache::thrift::protocol::TProtocol> output) override; |
206 | | |
207 | | // Called when a client starts an RPC; we set the thread-local session key. |
208 | | void processContext(void* context, |
209 | | std::shared_ptr<apache::thrift::transport::TTransport> output) override; |
210 | | |
211 | | // Called when a client disconnects; we call any SessionHandlerIf handler. |
212 | | void deleteContext(void* serverContext, |
213 | | std::shared_ptr<apache::thrift::protocol::TProtocol> input, |
214 | | std::shared_ptr<apache::thrift::protocol::TProtocol> output) override; |
215 | | |
216 | | // Waits for a timeout of TIMEOUT_MS for a server to signal that it has started |
217 | | // correctly. |
218 | | Status start_and_wait_for_server(); |
219 | | |
220 | | private: |
221 | | // Lock used to ensure that there are no missed notifications between starting the |
222 | | // supervision thread and calling _signal_cond.timed_wait. Also used to ensure |
223 | | // thread-safe access to members of _thrift_server |
224 | | std::mutex _signal_lock; |
225 | | |
226 | | // Condition variable that is notified by the supervision thread once either |
227 | | // a) all is well or b) an error occurred. |
228 | | std::condition_variable _signal_cond; |
229 | | |
230 | | // The ThriftServer under management. This class is a friend of ThriftServer, and |
231 | | // reaches in to change member variables at will. |
232 | | ThriftServer* _thrift_server; |
233 | | |
234 | | // Guards against spurious condition variable wakeups |
235 | | bool _signal_fired; |
236 | | |
237 | | // The time, in milliseconds, to wait for a server to come up |
238 | | const static int TIMEOUT_MS; |
239 | | |
240 | | // Called in a separate thread; wraps TNonBlockingServer::serve in an exception handler |
241 | | void supervise(); |
242 | | }; |
243 | | |
244 | | // https://stackoverflow.com/questions/5391973/undefined-reference-to-static-const-int |
245 | | const int ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS = 2500; |
246 | | |
247 | 14 | Status ThriftServer::ThriftServerEventProcessor::start_and_wait_for_server() { |
248 | | // Locking here protects against missed notifications if Supervise executes quickly |
249 | 14 | std::unique_lock<std::mutex> lock(_signal_lock); |
250 | 14 | _thrift_server->_started = false; |
251 | | |
252 | 14 | _thrift_server->_server_thread = std::make_unique<std::thread>( |
253 | 14 | &ThriftServer::ThriftServerEventProcessor::supervise, this); |
254 | | |
255 | | // Loop protects against spurious wakeup. Locks provide necessary fences to ensure |
256 | | // visibility. |
257 | 28 | while (!_signal_fired) { |
258 | | // Yields lock and allows supervision thread to continue and signal |
259 | 14 | std::cv_status cvsts = _signal_cond.wait_for(lock, std::chrono::milliseconds(TIMEOUT_MS)); |
260 | 14 | if (cvsts == std::cv_status::timeout) { |
261 | 0 | std::stringstream ss; |
262 | 0 | ss << "ThriftServer '" << _thrift_server->_name |
263 | 0 | << "' (on port: " << _thrift_server->_port << ") did not start within " << TIMEOUT_MS |
264 | 0 | << "ms"; |
265 | 0 | LOG(ERROR) << ss.str(); |
266 | 0 | return Status::InternalError(ss.str()); |
267 | 0 | } |
268 | 14 | } |
269 | | |
270 | | // _started == true only if preServe was called. May be false if there was an exception |
271 | | // after preServe that was caught by Supervise, causing it to reset the error condition. |
272 | 14 | if (!_thrift_server->_started) { |
273 | 0 | std::stringstream ss; |
274 | 0 | ss << "ThriftServer '" << _thrift_server->_name << "' (on port: " << _thrift_server->_port |
275 | 0 | << ") did not start correctly "; |
276 | 0 | LOG(ERROR) << ss.str(); |
277 | 0 | return Status::InternalError(ss.str()); |
278 | 0 | } |
279 | | |
280 | 14 | return Status::OK(); |
281 | 14 | } |
282 | | |
283 | 14 | void ThriftServer::ThriftServerEventProcessor::supervise() { |
284 | 14 | DCHECK(_thrift_server->_server.get() != nullptr); |
285 | | |
286 | 14 | try { |
287 | 14 | _thrift_server->_server->serve(); |
288 | 14 | } catch (apache::thrift::TException& e) { |
289 | 0 | LOG(ERROR) << "ThriftServer '" << _thrift_server->_name |
290 | 0 | << "' (on port: " << _thrift_server->_port |
291 | 0 | << ") exited due to TException: " << e.what(); |
292 | 0 | } |
293 | | |
294 | 14 | LOG(INFO) << "ThriftServer " << _thrift_server->_name << " exited"; |
295 | | |
296 | 6 | { |
297 | | // _signal_lock ensures mutual exclusion of access to _thrift_server |
298 | 6 | std::lock_guard<std::mutex> lock(_signal_lock); |
299 | 6 | _thrift_server->_started = false; |
300 | | |
301 | | // There may not be anyone waiting on this signal (if the |
302 | | // exception occurs after startup). That's not a problem, this is |
303 | | // just to avoid waiting for the timeout in case of a bind |
304 | | // failure, for example. |
305 | 6 | _signal_fired = true; |
306 | 6 | } |
307 | | |
308 | 6 | _signal_cond.notify_all(); |
309 | 6 | } |
310 | | |
311 | 14 | void ThriftServer::ThriftServerEventProcessor::preServe() { |
312 | | // Acquire the signal lock to ensure that StartAndWaitForServer is |
313 | | // waiting on _signal_cond when we notify. |
314 | 14 | std::lock_guard<std::mutex> lock(_signal_lock); |
315 | 14 | _signal_fired = true; |
316 | | |
317 | | // This is the (only) success path - if this is not reached within TIMEOUT_MS, |
318 | | // StartAndWaitForServer will indicate failure. |
319 | 14 | _thrift_server->_started = true; |
320 | | |
321 | | // Should only be one thread waiting on _signal_cond, but wake all just in case. |
322 | 14 | _signal_cond.notify_all(); |
323 | 14 | } |
324 | | |
325 | | // This thread-local variable contains the current session key for whichever |
326 | | // thrift server is currently serving a request on the current thread. |
327 | | __thread ThriftServer::SessionKey* _session_key; |
328 | | |
329 | 0 | ThriftServer::SessionKey* ThriftServer::get_thread_session_key() { |
330 | 0 | return _session_key; |
331 | 0 | } |
332 | | |
333 | | void* ThriftServer::ThriftServerEventProcessor::createContext( |
334 | | std::shared_ptr<apache::thrift::protocol::TProtocol> input, |
335 | 66 | std::shared_ptr<apache::thrift::protocol::TProtocol> output) { |
336 | 66 | std::stringstream ss; |
337 | | |
338 | 66 | apache::thrift::transport::TSocket* socket = nullptr; |
339 | 66 | apache::thrift::transport::TTransport* transport = input->getTransport().get(); |
340 | 66 | { |
341 | 66 | switch (_thrift_server->_server_type) { |
342 | 0 | case NON_BLOCKING: |
343 | 0 | socket = static_cast<apache::thrift::transport::TSocket*>( |
344 | 0 | static_cast<apache::thrift::transport::TFramedTransport*>(transport) |
345 | 0 | ->getUnderlyingTransport() |
346 | 0 | .get()); |
347 | 0 | break; |
348 | | |
349 | 0 | case THREAD_POOL: |
350 | 66 | case THREADED: |
351 | 66 | socket = static_cast<apache::thrift::transport::TSocket*>( |
352 | 66 | static_cast<apache::thrift::transport::TBufferedTransport*>(transport) |
353 | 66 | ->getUnderlyingTransport() |
354 | 66 | .get()); |
355 | 66 | break; |
356 | | |
357 | 0 | default: |
358 | 0 | DCHECK(false) << "Unexpected thrift server type"; |
359 | 66 | } |
360 | 66 | } |
361 | | |
362 | 66 | ss << socket->getPeerAddress() << ":" << socket->getPeerPort(); |
363 | | |
364 | 66 | { |
365 | 66 | std::lock_guard<std::mutex> _l(_thrift_server->_session_keys_lock); |
366 | | |
367 | 66 | std::shared_ptr<SessionKey> key_ptr(new std::string(ss.str())); |
368 | | |
369 | 66 | _session_key = key_ptr.get(); |
370 | 66 | _thrift_server->_session_keys[key_ptr.get()] = key_ptr; |
371 | 66 | } |
372 | | |
373 | 66 | if (_thrift_server->_session_handler != nullptr) { |
374 | 0 | _thrift_server->_session_handler->session_start(*_session_key); |
375 | 0 | } |
376 | | |
377 | 66 | _thrift_server->thrift_connections_total->increment(1L); |
378 | 66 | _thrift_server->thrift_current_connections->increment(1L); |
379 | | |
380 | | // Store the _session_key in the per-client context to avoid recomputing |
381 | | // it. If only this were accessible from RPC method calls, we wouldn't have to |
382 | | // mess around with thread locals. |
383 | 66 | return (void*)_session_key; |
384 | 66 | } |
385 | | |
386 | | void ThriftServer::ThriftServerEventProcessor::processContext( |
387 | 58.0k | void* context, std::shared_ptr<apache::thrift::transport::TTransport> transport) { |
388 | 58.0k | _session_key = reinterpret_cast<SessionKey*>(context); |
389 | 58.0k | } |
390 | | |
391 | | void ThriftServer::ThriftServerEventProcessor::deleteContext( |
392 | | void* serverContext, std::shared_ptr<apache::thrift::protocol::TProtocol> input, |
393 | 30 | std::shared_ptr<apache::thrift::protocol::TProtocol> output) { |
394 | 30 | _session_key = (SessionKey*)serverContext; |
395 | | |
396 | 30 | if (_thrift_server->_session_handler != nullptr) { |
397 | 0 | _thrift_server->_session_handler->session_end(*_session_key); |
398 | 0 | } |
399 | | |
400 | 30 | { |
401 | 30 | std::lock_guard<std::mutex> _l(_thrift_server->_session_keys_lock); |
402 | 30 | _thrift_server->_session_keys.erase(_session_key); |
403 | 30 | } |
404 | | |
405 | 30 | _thrift_server->thrift_current_connections->increment(-1L); |
406 | 30 | } |
407 | | |
408 | | ThriftServer::ThriftServer(const std::string& name, |
409 | | const std::shared_ptr<apache::thrift::TProcessor>& processor, int port, |
410 | | int num_worker_threads, ServerType server_type) |
411 | 14 | : _started(false), |
412 | 14 | _port(port), |
413 | 14 | _num_worker_threads(num_worker_threads), |
414 | 14 | _server_type(server_type), |
415 | 14 | _name(name), |
416 | 14 | _server_thread(nullptr), |
417 | 14 | _server(nullptr), |
418 | 14 | _processor(processor), |
419 | 14 | _session_handler(nullptr) { |
420 | 14 | _thrift_server_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( |
421 | 14 | std::string("thrift_server.") + name, {{"name", name}}); |
422 | 14 | INT_GAUGE_METRIC_REGISTER(_thrift_server_metric_entity, thrift_current_connections); |
423 | 14 | INT_COUNTER_METRIC_REGISTER(_thrift_server_metric_entity, thrift_connections_total); |
424 | 14 | } |
425 | | |
426 | 6 | ThriftServer::~ThriftServer() { |
427 | 6 | stop(); |
428 | 6 | join(); |
429 | 6 | } |
430 | | |
431 | 14 | Status ThriftServer::start() { |
432 | 14 | DCHECK(!_started); |
433 | 14 | std::shared_ptr<apache::thrift::protocol::TProtocolFactory> protocol_factory( |
434 | 14 | new apache::thrift::protocol::TBinaryProtocolFactory()); |
435 | 14 | std::shared_ptr<apache::thrift::concurrency::ThreadManager> thread_mgr; |
436 | 14 | std::shared_ptr<apache::thrift::concurrency::ThreadFactory> thread_factory = |
437 | 14 | std::make_shared<apache::thrift::concurrency::ThreadFactory>(); |
438 | 14 | std::shared_ptr<apache::thrift::transport::TServerTransport> fe_server_transport; |
439 | 14 | std::shared_ptr<apache::thrift::transport::TTransportFactory> transport_factory; |
440 | 14 | std::shared_ptr<apache::thrift::transport::TNonblockingServerSocket> socket; |
441 | 14 | if (_server_type != THREADED) { |
442 | 0 | thread_mgr = apache::thrift::concurrency::ThreadManager::newSimpleThreadManager( |
443 | 0 | _num_worker_threads); |
444 | 0 | thread_mgr->threadFactory(thread_factory); |
445 | 0 | thread_mgr->start(); |
446 | 0 | } |
447 | | |
448 | | // Note - if you change the transport types here, you must check that the |
449 | | // logic in createContext is still accurate. |
450 | 14 | apache::thrift::transport::TServerSocket* server_socket = nullptr; |
451 | | |
452 | 14 | switch (_server_type) { |
453 | 0 | case NON_BLOCKING: |
454 | 0 | socket = std::make_shared<ImprovedNonblockingServerSocket>(_port); |
455 | 0 | if (transport_factory == nullptr) { |
456 | 0 | transport_factory.reset(new apache::thrift::transport::TTransportFactory()); |
457 | 0 | } |
458 | |
|
459 | 0 | _server = std::make_unique<apache::thrift::server::TNonblockingServer>( |
460 | 0 | _processor, transport_factory, transport_factory, protocol_factory, |
461 | 0 | protocol_factory, socket, thread_mgr); |
462 | 0 | break; |
463 | | |
464 | 0 | case THREAD_POOL: |
465 | 0 | fe_server_transport.reset(new ImprovedServerSocket( |
466 | 0 | BackendOptions::get_service_bind_address_without_bracket(), _port)); |
467 | |
|
468 | 0 | if (transport_factory == nullptr) { |
469 | 0 | transport_factory = std::make_shared<ImprovedBufferedTransportFactory>(); |
470 | 0 | } |
471 | |
|
472 | 0 | _server = std::make_unique<apache::thrift::server::TThreadPoolServer>( |
473 | 0 | _processor, fe_server_transport, transport_factory, protocol_factory, thread_mgr); |
474 | 0 | break; |
475 | | |
476 | 14 | case THREADED: |
477 | 14 | server_socket = new ImprovedServerSocket( |
478 | 14 | BackendOptions::get_service_bind_address_without_bracket(), _port); |
479 | 14 | fe_server_transport.reset(server_socket); |
480 | 14 | server_socket->setKeepAlive(true); |
481 | | |
482 | 14 | if (transport_factory == nullptr) { |
483 | 14 | transport_factory = std::make_shared<ImprovedBufferedTransportFactory>(); |
484 | 14 | } |
485 | | |
486 | 14 | _server = std::make_unique<apache::thrift::server::TThreadedServer>( |
487 | 14 | _processor, fe_server_transport, transport_factory, protocol_factory, |
488 | 14 | thread_factory); |
489 | 14 | break; |
490 | | |
491 | 0 | default: |
492 | 0 | std::stringstream error_msg; |
493 | 0 | error_msg << "Unsupported server type: " << _server_type; |
494 | 0 | LOG(ERROR) << error_msg.str(); |
495 | 0 | return Status::InternalError(error_msg.str()); |
496 | 14 | } |
497 | | |
498 | 14 | std::shared_ptr<ThriftServer::ThriftServerEventProcessor> event_processor( |
499 | 14 | new ThriftServer::ThriftServerEventProcessor(this)); |
500 | 14 | _server->setServerEventHandler(event_processor); |
501 | | |
502 | 14 | RETURN_IF_ERROR(event_processor->start_and_wait_for_server()); |
503 | | |
504 | 14 | LOG(INFO) << "ThriftServer '" << _name << "' started on port: " << _port; |
505 | | |
506 | 14 | DCHECK(_started); |
507 | 14 | return Status::OK(); |
508 | 14 | } |
509 | | |
510 | 12 | void ThriftServer::stop() { |
511 | 12 | _server->stop(); |
512 | 12 | } |
513 | | |
514 | 6 | void ThriftServer::join() { |
515 | 6 | DCHECK(_server_thread != nullptr); |
516 | 6 | _server_thread->join(); |
517 | 6 | } |
518 | | |
519 | 0 | void ThriftServer::stop_for_testing() { |
520 | 0 | DCHECK(_server_thread != nullptr); |
521 | 0 | DCHECK(_server); |
522 | 0 | DCHECK_EQ(_server_type, THREADED); |
523 | 0 | _server->stop(); |
524 | |
|
525 | 0 | if (_started) { |
526 | 0 | join(); |
527 | 0 | } |
528 | 0 | } |
529 | | } // namespace doris |