Coverage Report

Created: 2026-03-13 05:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/thrift_server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/thrift_server.h"
19
20
#include <glog/logging.h>
21
#include <thrift/Thrift.h>
22
#include <thrift/concurrency/ThreadFactory.h>
23
#include <thrift/concurrency/ThreadManager.h>
24
#include <thrift/protocol/TBinaryProtocol.h>
25
#include <thrift/protocol/TProtocol.h>
26
#include <thrift/server/TNonblockingServer.h>
27
#include <thrift/server/TThreadPoolServer.h>
28
#include <thrift/server/TThreadedServer.h>
29
#include <thrift/transport/TBufferTransports.h>
30
#include <thrift/transport/TNonblockingServerSocket.h>
31
#include <thrift/transport/TServerSocket.h>
32
#include <thrift/transport/TSocket.h>
33
#include <thrift/transport/TTransport.h>
34
// IWYU pragma: no_include <bits/chrono.h>
35
#include <chrono> // IWYU pragma: keep
36
#include <condition_variable>
37
#include <memory>
38
#include <mutex>
39
#include <sstream>
40
#include <thread>
41
42
#include "common/config.h"
43
#include "common/metrics/doris_metrics.h"
44
#include "service/backend_options.h"
45
46
namespace apache {
47
namespace thrift {
48
class TProcessor;
49
50
namespace transport {
51
class TServerTransport;
52
} // namespace transport
53
} // namespace thrift
54
} // namespace apache
55
56
namespace doris {
57
58
// Per-server connection metrics. Both prototypes are registered against each ThriftServer's
// own metric entity in the ThriftServer constructor.
DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(
        thrift_current_connections,
        MetricUnit::CONNECTIONS,
        "Number of currently active connections");
DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(
        thrift_connections_total,
        MetricUnit::CONNECTIONS,
        "Total connections made over the lifetime of this server");
62
63
// Nonblocking Server socket implementation of TNonblockingServerTransport. Wrapper around a unix
64
// socket listen and accept calls.
65
class ImprovedNonblockingServerSocket : public apache::thrift::transport::TNonblockingServerSocket {
66
    using TConfiguration = apache::thrift::TConfiguration;
67
    using TSocket = apache::thrift::transport::TSocket;
68
69
public:
70
    // Constructor.
71
    ImprovedNonblockingServerSocket(int port)
72
0
            : TNonblockingServerSocket(port),
73
0
              config(std::make_shared<TConfiguration>(config::thrift_max_message_size)) {}
74
0
    ~ImprovedNonblockingServerSocket() override = default;
75
76
protected:
77
0
    std::shared_ptr<TSocket> createSocket(THRIFT_SOCKET clientSocket) override {
78
0
        return std::make_shared<TSocket>(clientSocket, config);
79
0
    }
80
81
private:
82
    std::shared_ptr<TConfiguration> config;
83
};
84
85
class ImprovedBufferedTransportFactory
86
        : public apache::thrift::transport::TBufferedTransportFactory {
87
    using TConfiguration = apache::thrift::TConfiguration;
88
    using TTransport = apache::thrift::transport::TTransport;
89
    using TBufferedTransport = apache::thrift::transport::TBufferedTransport;
90
91
public:
92
    ImprovedBufferedTransportFactory()
93
14
            : config(std::make_shared<TConfiguration>(config::thrift_max_message_size)) {}
94
6
    ~ImprovedBufferedTransportFactory() override = default;
95
96
94
    std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override {
97
94
        return std::make_shared<TBufferedTransport>(std::move(trans), config);
98
94
    }
99
100
private:
101
    std::shared_ptr<TConfiguration> config;
102
};
103
104
// Helper class that starts a server in a separate thread, and handles
105
// the inter-thread communication to monitor whether it started
106
// correctly.
107
class ThriftServer::ThriftServerEventProcessor
108
        : public apache::thrift::server::TServerEventHandler {
109
public:
110
    ThriftServerEventProcessor(ThriftServer* thrift_server)
111
14
            : _thrift_server(thrift_server), _signal_fired(false) {}
112
113
    // friendly to code style
114
6
    ~ThriftServerEventProcessor() override = default;
115
116
    // Called by TNonBlockingServer when server has acquired its resources and is ready to
117
    // serve, and signals to StartAndWaitForServer that start-up is finished.
118
    // From TServerEventHandler.
119
    void preServe() override;
120
121
    // Called when a client connects; we create per-client state and call any
122
    // SessionHandlerIf handler.
123
    void* createContext(std::shared_ptr<apache::thrift::protocol::TProtocol> input,
124
                        std::shared_ptr<apache::thrift::protocol::TProtocol> output) override;
125
126
    // Called when a client starts an RPC; we set the thread-local session key.
127
    void processContext(void* context,
128
                        std::shared_ptr<apache::thrift::transport::TTransport> output) override;
129
130
    // Called when a client disconnects; we call any SessionHandlerIf handler.
131
    void deleteContext(void* serverContext,
132
                       std::shared_ptr<apache::thrift::protocol::TProtocol> input,
133
                       std::shared_ptr<apache::thrift::protocol::TProtocol> output) override;
134
135
    // Waits for a timeout of TIMEOUT_MS for a server to signal that it has started
136
    // correctly.
137
    Status start_and_wait_for_server();
138
139
private:
140
    // Lock used to ensure that there are no missed notifications between starting the
141
    // supervision thread and calling _signal_cond.timed_wait. Also used to ensure
142
    // thread-safe access to members of _thrift_server
143
    std::mutex _signal_lock;
144
145
    // Condition variable that is notified by the supervision thread once either
146
    // a) all is well or b) an error occurred.
147
    std::condition_variable _signal_cond;
148
149
    // The ThriftServer under management. This class is a friend of ThriftServer, and
150
    // reaches in to change member variables at will.
151
    ThriftServer* _thrift_server;
152
153
    // Guards against spurious condition variable wakeups
154
    bool _signal_fired;
155
156
    // The time, in milliseconds, to wait for a server to come up
157
    const static int TIMEOUT_MS;
158
159
    // Called in a separate thread; wraps TNonBlockingServer::serve in an exception handler
160
    void supervise();
161
};
162
163
// https://stackoverflow.com/questions/5391973/undefined-reference-to-static-const-int
164
const int ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS = 2500;
165
166
14
Status ThriftServer::ThriftServerEventProcessor::start_and_wait_for_server() {
    // Hold _signal_lock while spawning the supervision thread. preServe() and supervise()
    // must acquire this lock before they can set _signal_fired, and wait_for() below only
    // releases it once we are actually waiting, so no notification can be missed.
    std::unique_lock<std::mutex> lock(_signal_lock);
    _thrift_server->_started = false;

    _thrift_server->_server_thread = std::make_unique<std::thread>(
            &ThriftServer::ThriftServerEventProcessor::supervise, this);

    // The predicate overload re-checks _signal_fired after every wakeup, so spurious wakeups
    // are handled, and it measures one overall TIMEOUT_MS deadline instead of restarting
    // the timeout after each spurious wakeup. It returns false only if the deadline expired
    // with _signal_fired still unset. A signal that lands right at the deadline is therefore
    // not misreported as a timeout.
    const bool signal_fired = _signal_cond.wait_for(
            lock, std::chrono::milliseconds(TIMEOUT_MS), [this] { return _signal_fired; });
    if (!signal_fired) {
        std::stringstream ss;
        ss << "ThriftServer '" << _thrift_server->_name
           << "' (on port: " << _thrift_server->_port << ") did not start within " << TIMEOUT_MS
           << "ms";
        LOG(ERROR) << ss.str();
        return Status::InternalError(ss.str());
    }

    // _started == true only if preServe was called. It may be false if serve() returned or
    // threw (e.g. a bind failure) and supervise() reset it before signalling.
    if (!_thrift_server->_started) {
        std::stringstream ss;
        ss << "ThriftServer '" << _thrift_server->_name << "' (on port: " << _thrift_server->_port
           << ") did not start correctly ";
        LOG(ERROR) << ss.str();
        return Status::InternalError(ss.str());
    }

    return Status::OK();
}
201
202
14
void ThriftServer::ThriftServerEventProcessor::supervise() {
203
14
    DCHECK(_thrift_server->_server.get() != nullptr);
204
205
14
    try {
206
14
        _thrift_server->_server->serve();
207
14
    } catch (apache::thrift::TException& e) {
208
0
        LOG(ERROR) << "ThriftServer '" << _thrift_server->_name
209
0
                   << "' (on port: " << _thrift_server->_port
210
0
                   << ") exited due to TException: " << e.what();
211
0
    }
212
213
14
    LOG(INFO) << "ThriftServer " << _thrift_server->_name << " exited";
214
215
6
    {
216
        // _signal_lock ensures mutual exclusion of access to _thrift_server
217
6
        std::lock_guard<std::mutex> lock(_signal_lock);
218
6
        _thrift_server->_started = false;
219
220
        // There may not be anyone waiting on this signal (if the
221
        // exception occurs after startup). That's not a problem, this is
222
        // just to avoid waiting for the timeout in case of a bind
223
        // failure, for example.
224
6
        _signal_fired = true;
225
6
    }
226
227
6
    _signal_cond.notify_all();
228
6
}
229
230
14
void ThriftServer::ThriftServerEventProcessor::preServe() {
231
    // Acquire the signal lock to ensure that StartAndWaitForServer is
232
    // waiting on _signal_cond when we notify.
233
14
    std::lock_guard<std::mutex> lock(_signal_lock);
234
14
    _signal_fired = true;
235
236
    // This is the (only) success path - if this is not reached within TIMEOUT_MS,
237
    // StartAndWaitForServer will indicate failure.
238
14
    _thrift_server->_started = true;
239
240
    // Should only be one thread waiting on _signal_cond, but wake all just in case.
241
14
    _signal_cond.notify_all();
242
14
}
243
244
// This thread-local variable contains the current session key for whichever
245
// thrift server is currently serving a request on the current thread.
246
__thread ThriftServer::SessionKey* _session_key;
247
248
0
ThriftServer::SessionKey* ThriftServer::get_thread_session_key() {
249
0
    return _session_key;
250
0
}
251
252
void* ThriftServer::ThriftServerEventProcessor::createContext(
253
        std::shared_ptr<apache::thrift::protocol::TProtocol> input,
254
47
        std::shared_ptr<apache::thrift::protocol::TProtocol> output) {
255
47
    std::stringstream ss;
256
257
47
    apache::thrift::transport::TSocket* socket = nullptr;
258
47
    apache::thrift::transport::TTransport* transport = input->getTransport().get();
259
47
    {
260
47
        switch (_thrift_server->_server_type) {
261
0
        case NON_BLOCKING:
262
0
            socket = static_cast<apache::thrift::transport::TSocket*>(
263
0
                    static_cast<apache::thrift::transport::TFramedTransport*>(transport)
264
0
                            ->getUnderlyingTransport()
265
0
                            .get());
266
0
            break;
267
268
0
        case THREAD_POOL:
269
47
        case THREADED:
270
47
            socket = static_cast<apache::thrift::transport::TSocket*>(
271
47
                    static_cast<apache::thrift::transport::TBufferedTransport*>(transport)
272
47
                            ->getUnderlyingTransport()
273
47
                            .get());
274
47
            break;
275
276
0
        default:
277
0
            DCHECK(false) << "Unexpected thrift server type";
278
47
        }
279
47
    }
280
281
47
    ss << socket->getPeerAddress() << ":" << socket->getPeerPort();
282
283
47
    {
284
47
        std::lock_guard<std::mutex> _l(_thrift_server->_session_keys_lock);
285
286
47
        std::shared_ptr<SessionKey> key_ptr(new std::string(ss.str()));
287
288
47
        _session_key = key_ptr.get();
289
47
        _thrift_server->_session_keys[key_ptr.get()] = key_ptr;
290
47
    }
291
292
47
    if (_thrift_server->_session_handler != nullptr) {
293
0
        _thrift_server->_session_handler->session_start(*_session_key);
294
0
    }
295
296
47
    _thrift_server->thrift_connections_total->increment(1L);
297
47
    _thrift_server->thrift_current_connections->increment(1L);
298
299
    // Store the _session_key in the per-client context to avoid recomputing
300
    // it. If only this were accessible from RPC method calls, we wouldn't have to
301
    // mess around with thread locals.
302
47
    return (void*)_session_key;
303
47
}
304
305
void ThriftServer::ThriftServerEventProcessor::processContext(
306
25.2k
        void* context, std::shared_ptr<apache::thrift::transport::TTransport> transport) {
307
25.2k
    _session_key = reinterpret_cast<SessionKey*>(context);
308
25.2k
}
309
310
void ThriftServer::ThriftServerEventProcessor::deleteContext(
311
        void* serverContext, std::shared_ptr<apache::thrift::protocol::TProtocol> input,
312
21
        std::shared_ptr<apache::thrift::protocol::TProtocol> output) {
313
21
    _session_key = (SessionKey*)serverContext;
314
315
21
    if (_thrift_server->_session_handler != nullptr) {
316
0
        _thrift_server->_session_handler->session_end(*_session_key);
317
0
    }
318
319
21
    {
320
21
        std::lock_guard<std::mutex> _l(_thrift_server->_session_keys_lock);
321
21
        _thrift_server->_session_keys.erase(_session_key);
322
21
    }
323
324
21
    _thrift_server->thrift_current_connections->increment(-1L);
325
21
}
326
327
ThriftServer::ThriftServer(const std::string& name,
                           const std::shared_ptr<apache::thrift::TProcessor>& processor, int port,
                           int num_worker_threads, ServerType server_type)
        : _started(false),
          _port(port),
          _num_worker_threads(num_worker_threads),
          _server_type(server_type),
          _name(name),
          _server_thread(nullptr),
          _server(nullptr),
          _processor(processor),
          _session_handler(nullptr) {
    // Each server gets its own metric entity "thrift_server.<name>", labelled with its name,
    // under which the per-server connection metrics are registered.
    const std::string entity_name = "thrift_server." + name;
    _thrift_server_metric_entity =
            DorisMetrics::instance()->metric_registry()->register_entity(entity_name,
                                                                         {{"name", name}});
    INT_GAUGE_METRIC_REGISTER(_thrift_server_metric_entity, thrift_current_connections);
    INT_COUNTER_METRIC_REGISTER(_thrift_server_metric_entity, thrift_connections_total);
}
344
345
6
ThriftServer::~ThriftServer() {
    // If start() never launched the supervision thread (start() was not called, or it failed
    // before creating the server), there is nothing to stop or join. Calling stop()/join()
    // then would dereference null pointers.
    if (_server_thread == nullptr) {
        return;
    }
    stop();
    join();
}
349
350
14
Status ThriftServer::start() {
    namespace tconcurrency = apache::thrift::concurrency;
    namespace tprotocol = apache::thrift::protocol;
    namespace tserver = apache::thrift::server;
    namespace ttransport = apache::thrift::transport;

    DCHECK(!_started);

    std::shared_ptr<tprotocol::TProtocolFactory> protocol_factory =
            std::make_shared<tprotocol::TBinaryProtocolFactory>();
    std::shared_ptr<tconcurrency::ThreadFactory> thread_factory =
            std::make_shared<tconcurrency::ThreadFactory>();

    // THREADED servers use thread_factory directly; every other type gets a started
    // ThreadManager with _num_worker_threads workers.
    std::shared_ptr<tconcurrency::ThreadManager> thread_mgr;
    if (_server_type != THREADED) {
        thread_mgr = tconcurrency::ThreadManager::newSimpleThreadManager(_num_worker_threads);
        thread_mgr->threadFactory(thread_factory);
        thread_mgr->start();
    }

    // Note - if you change the transport types here, you must check that the
    // logic in createContext is still accurate.
    switch (_server_type) {
    case NON_BLOCKING: {
        std::shared_ptr<ttransport::TNonblockingServerSocket> socket =
                std::make_shared<ImprovedNonblockingServerSocket>(_port);
        std::shared_ptr<ttransport::TTransportFactory> transport_factory =
                std::make_shared<ttransport::TTransportFactory>();
        _server = std::make_unique<tserver::TNonblockingServer>(
                _processor, transport_factory, transport_factory, protocol_factory,
                protocol_factory, socket, thread_mgr);
        break;
    }

    case THREAD_POOL: {
        std::shared_ptr<ttransport::TServerTransport> server_transport =
                std::make_shared<ttransport::TServerSocket>(
                        BackendOptions::get_service_bind_address_without_bracket(), _port);
        std::shared_ptr<ttransport::TTransportFactory> transport_factory =
                std::make_shared<ImprovedBufferedTransportFactory>();
        _server = std::make_unique<tserver::TThreadPoolServer>(
                _processor, server_transport, transport_factory, protocol_factory, thread_mgr);
        break;
    }

    case THREADED: {
        std::shared_ptr<ttransport::TServerSocket> server_socket =
                std::make_shared<ttransport::TServerSocket>(
                        BackendOptions::get_service_bind_address_without_bracket(), _port);
        server_socket->setKeepAlive(true);
        std::shared_ptr<ttransport::TServerTransport> server_transport = server_socket;
        std::shared_ptr<ttransport::TTransportFactory> transport_factory =
                std::make_shared<ImprovedBufferedTransportFactory>();
        _server = std::make_unique<tserver::TThreadedServer>(
                _processor, server_transport, transport_factory, protocol_factory,
                thread_factory);
        break;
    }

    default: {
        std::stringstream error_msg;
        error_msg << "Unsupported server type: " << _server_type;
        LOG(ERROR) << error_msg.str();
        return Status::InternalError(error_msg.str());
    }
    }

    // The event processor both launches the server thread and tracks per-connection state.
    auto event_processor = std::make_shared<ThriftServer::ThriftServerEventProcessor>(this);
    _server->setServerEventHandler(event_processor);

    RETURN_IF_ERROR(event_processor->start_and_wait_for_server());

    LOG(INFO) << "ThriftServer '" << _name << "' started on port: " << _port;

    DCHECK(_started);
    return Status::OK();
}
428
429
12
void ThriftServer::stop() {
430
12
    _server->stop();
431
12
}
432
433
6
void ThriftServer::join() {
434
6
    DCHECK(_server_thread != nullptr);
435
6
    _server_thread->join();
436
6
}
437
438
0
void ThriftServer::stop_for_testing() {
439
0
    DCHECK(_server_thread != nullptr);
440
0
    DCHECK(_server);
441
0
    DCHECK_EQ(_server_type, THREADED);
442
0
    _server->stop();
443
444
0
    if (_started) {
445
0
        join();
446
0
    }
447
0
}
448
} // namespace doris