Coverage Report

Created: 2026-06-16 17:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/thrift_server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/thrift_server.h"
19
20
#include <glog/logging.h>
21
#include <thrift/Thrift.h>
22
#include <thrift/concurrency/ThreadFactory.h>
23
#include <thrift/concurrency/ThreadManager.h>
24
#include <thrift/protocol/TBinaryProtocol.h>
25
#include <thrift/protocol/TProtocol.h>
26
#include <thrift/server/TNonblockingServer.h>
27
#include <thrift/server/TThreadPoolServer.h>
28
#include <thrift/server/TThreadedServer.h>
29
#include <thrift/transport/TBufferTransports.h>
30
#include <thrift/transport/TNonblockingServerSocket.h>
31
#include <thrift/transport/TServerSocket.h>
32
#include <thrift/transport/TSocket.h>
33
#include <thrift/transport/TTransport.h>
34
#include <thrift/transport/TTransportException.h>
35
// IWYU pragma: no_include <bits/chrono.h>
36
#include <chrono> // IWYU pragma: keep
37
#include <condition_variable>
38
#include <memory>
39
#include <mutex>
40
#include <sstream>
41
#include <string_view>
42
#include <thread>
43
#include <utility>
44
45
#include "common/config.h"
46
#include "common/metrics/doris_metrics.h"
47
#include "service/backend_options.h"
48
49
namespace apache {
50
namespace thrift {
51
class TProcessor;
52
53
namespace transport {
54
class TServerTransport;
55
} // namespace transport
56
} // namespace thrift
57
} // namespace apache
58
59
namespace doris {
60
61
DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_current_connections, MetricUnit::CONNECTIONS,
62
                                   "Number of currently active connections");
63
DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(thrift_connections_total, MetricUnit::CONNECTIONS,
64
                                     "Total connections made over the lifetime of this server");
65
66
32
// Returns true when `e` has type UNKNOWN and its message contains the text
// "Interrupted system call"; returns false for every other exception.
bool is_unknown_eintr(const apache::thrift::transport::TTransportException& e) {
    using apache::thrift::transport::TTransportException;

    if (e.getType() != TTransportException::UNKNOWN) {
        return false;
    }
    const std::string_view message(e.what());
    return message.find("Interrupted system call") != std::string_view::npos;
}
70
71
// Full-process stack collection sends a diagnostic signal to many BE threads. Even with
72
// SA_RESTART, Linux can still return EINTR from poll/select/epoll-style waits, and Thrift wraps
73
// that as an UNKNOWN TTransportException with "Interrupted system call". If this exception escapes
74
// the blocking thrift server loop, heartbeat/backend thrift services can exit while the BE process
75
// is still partly alive, making FE report the BE as Alive=false. This wrapper keeps the normal
76
// TServerSocket behavior but retries only that narrow EINTR case on accept/read/peek; all other
77
// transport errors are still propagated.
78
class ImprovedServerSocket final : public apache::thrift::transport::TServerSocket {
79
    using TConfiguration = apache::thrift::TConfiguration;
80
    using TServerSocket = apache::thrift::transport::TServerSocket;
81
    using TSocket = apache::thrift::transport::TSocket;
82
    using TTransport = apache::thrift::transport::TTransport;
83
    using TTransportException = apache::thrift::transport::TTransportException;
84
85
    class RetryingSocket final : public TSocket {
86
    public:
87
        RetryingSocket(THRIFT_SOCKET socket, std::shared_ptr<THRIFT_SOCKET> interrupt_listener,
88
                       std::shared_ptr<TConfiguration> config)
89
66
                : TSocket(socket, std::move(interrupt_listener), std::move(config)) {}
90
91
253k
        uint32_t read(uint8_t* buf, uint32_t len) override {
92
253k
            while (true) {
93
253k
                try {
94
253k
                    return TSocket::read(buf, len);
95
253k
                } catch (const TTransportException& e) {
96
26
                    if (!is_unknown_eintr(e)) {
97
26
                        throw;
98
26
                    }
99
26
                }
100
253k
            }
101
253k
        }
102
103
0
        bool peek() override {
104
0
            while (true) {
105
0
                try {
106
0
                    return TSocket::peek();
107
0
                } catch (const TTransportException& e) {
108
0
                    if (!is_unknown_eintr(e)) {
109
0
                        throw;
110
0
                    }
111
0
                }
112
0
            }
113
0
        }
114
    };
115
116
public:
117
    ImprovedServerSocket(const std::string& address, int port)
118
14
            : TServerSocket(address, port),
119
14
              _config(std::make_shared<TConfiguration>(config::thrift_max_message_size)) {}
120
6
    ~ImprovedServerSocket() override = default;
121
122
protected:
123
80
    std::shared_ptr<TTransport> acceptImpl() override {
124
80
        while (true) {
125
80
            try {
126
80
                return TServerSocket::acceptImpl();
127
80
            } catch (const TTransportException& e) {
128
6
                if (!is_unknown_eintr(e)) {
129
6
                    throw;
130
6
                }
131
6
            }
132
80
        }
133
80
    }
134
135
66
    std::shared_ptr<TSocket> createSocket(THRIFT_SOCKET client) override {
136
66
        return std::make_shared<RetryingSocket>(
137
66
                client, interruptableChildren_ ? pChildInterruptSockReader_ : nullptr, _config);
138
66
    }
139
140
private:
141
    std::shared_ptr<TConfiguration> _config;
142
};
143
144
// Nonblocking Server socket implementation of TNonblockingServerTransport. Wrapper around a unix
145
// socket listen and accept calls.
146
class ImprovedNonblockingServerSocket : public apache::thrift::transport::TNonblockingServerSocket {
147
    using TConfiguration = apache::thrift::TConfiguration;
148
    using TSocket = apache::thrift::transport::TSocket;
149
150
public:
151
    // Constructor.
152
    ImprovedNonblockingServerSocket(int port)
153
0
            : TNonblockingServerSocket(port),
154
0
              config(std::make_shared<TConfiguration>(config::thrift_max_message_size)) {}
155
0
    ~ImprovedNonblockingServerSocket() override = default;
156
157
protected:
158
0
    std::shared_ptr<TSocket> createSocket(THRIFT_SOCKET clientSocket) override {
159
0
        return std::make_shared<TSocket>(clientSocket, config);
160
0
    }
161
162
private:
163
    std::shared_ptr<TConfiguration> config;
164
};
165
166
class ImprovedBufferedTransportFactory
167
        : public apache::thrift::transport::TBufferedTransportFactory {
168
    using TConfiguration = apache::thrift::TConfiguration;
169
    using TTransport = apache::thrift::transport::TTransport;
170
    using TBufferedTransport = apache::thrift::transport::TBufferedTransport;
171
172
public:
173
    ImprovedBufferedTransportFactory()
174
14
            : config(std::make_shared<TConfiguration>(config::thrift_max_message_size)) {}
175
6
    ~ImprovedBufferedTransportFactory() override = default;
176
177
132
    std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override {
178
132
        return std::make_shared<TBufferedTransport>(std::move(trans), config);
179
132
    }
180
181
private:
182
    std::shared_ptr<TConfiguration> config;
183
};
184
185
// Helper class that starts a server in a separate thread, and handles
186
// the inter-thread communication to monitor whether it started
187
// correctly.
188
class ThriftServer::ThriftServerEventProcessor
189
        : public apache::thrift::server::TServerEventHandler {
190
public:
191
    ThriftServerEventProcessor(ThriftServer* thrift_server)
192
14
            : _thrift_server(thrift_server), _signal_fired(false) {}
193
194
    // friendly to code style
195
6
    ~ThriftServerEventProcessor() override = default;
196
197
    // Called by TNonBlockingServer when server has acquired its resources and is ready to
198
    // serve, and signals to StartAndWaitForServer that start-up is finished.
199
    // From TServerEventHandler.
200
    void preServe() override;
201
202
    // Called when a client connects; we create per-client state and call any
203
    // SessionHandlerIf handler.
204
    void* createContext(std::shared_ptr<apache::thrift::protocol::TProtocol> input,
205
                        std::shared_ptr<apache::thrift::protocol::TProtocol> output) override;
206
207
    // Called when a client starts an RPC; we set the thread-local session key.
208
    void processContext(void* context,
209
                        std::shared_ptr<apache::thrift::transport::TTransport> output) override;
210
211
    // Called when a client disconnects; we call any SessionHandlerIf handler.
212
    void deleteContext(void* serverContext,
213
                       std::shared_ptr<apache::thrift::protocol::TProtocol> input,
214
                       std::shared_ptr<apache::thrift::protocol::TProtocol> output) override;
215
216
    // Waits for a timeout of TIMEOUT_MS for a server to signal that it has started
217
    // correctly.
218
    Status start_and_wait_for_server();
219
220
private:
221
    // Lock used to ensure that there are no missed notifications between starting the
222
    // supervision thread and calling _signal_cond.timed_wait. Also used to ensure
223
    // thread-safe access to members of _thrift_server
224
    std::mutex _signal_lock;
225
226
    // Condition variable that is notified by the supervision thread once either
227
    // a) all is well or b) an error occurred.
228
    std::condition_variable _signal_cond;
229
230
    // The ThriftServer under management. This class is a friend of ThriftServer, and
231
    // reaches in to change member variables at will.
232
    ThriftServer* _thrift_server;
233
234
    // Guards against spurious condition variable wakeups
235
    bool _signal_fired;
236
237
    // The time, in milliseconds, to wait for a server to come up
238
    const static int TIMEOUT_MS;
239
240
    // Called in a separate thread; wraps TNonBlockingServer::serve in an exception handler
241
    void supervise();
242
};
243
244
// https://stackoverflow.com/questions/5391973/undefined-reference-to-static-const-int
245
// Out-of-class definition of the start-up timeout, in milliseconds.
const int ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS = 2500;
246
247
14
// Launches the supervision thread and blocks until it signals or TIMEOUT_MS elapses.
//
// Returns Status::OK() when preServe() reported a successful start, and
// Status::InternalError when the wait timed out or the server exited before it
// started serving.
Status ThriftServer::ThriftServerEventProcessor::start_and_wait_for_server() {
    // Taking the lock before the thread exists guarantees that a fast supervise() cannot
    // notify before this thread is waiting.
    std::unique_lock<std::mutex> lock(_signal_lock);
    _thrift_server->_started = false;

    _thrift_server->_server_thread = std::make_unique<std::thread>(
            &ThriftServer::ThriftServerEventProcessor::supervise, this);

    // The predicate overload re-checks _signal_fired after every wake-up against a single
    // deadline. A spurious wake-up therefore cannot extend the wait, and a signal that
    // arrives exactly at the deadline is not misreported as a timeout.
    const bool signalled = _signal_cond.wait_for(lock, std::chrono::milliseconds(TIMEOUT_MS),
                                                 [this] { return _signal_fired; });
    if (!signalled) {
        std::stringstream ss;
        ss << "ThriftServer '" << _thrift_server->_name
           << "' (on port: " << _thrift_server->_port << ") did not start within " << TIMEOUT_MS
           << "ms";
        LOG(ERROR) << ss.str();
        return Status::InternalError(ss.str());
    }

    // _started is true only if preServe() ran. supervise() resets it to false when
    // serve() returns or throws, which covers failures such as being unable to bind.
    if (!_thrift_server->_started) {
        std::stringstream ss;
        ss << "ThriftServer '" << _thrift_server->_name << "' (on port: " << _thrift_server->_port
           << ") did not start correctly ";
        LOG(ERROR) << ss.str();
        return Status::InternalError(ss.str());
    }

    return Status::OK();
}
282
283
14
void ThriftServer::ThriftServerEventProcessor::supervise() {
284
14
    DCHECK(_thrift_server->_server.get() != nullptr);
285
286
14
    try {
287
14
        _thrift_server->_server->serve();
288
14
    } catch (apache::thrift::TException& e) {
289
0
        LOG(ERROR) << "ThriftServer '" << _thrift_server->_name
290
0
                   << "' (on port: " << _thrift_server->_port
291
0
                   << ") exited due to TException: " << e.what();
292
0
    }
293
294
14
    LOG(INFO) << "ThriftServer " << _thrift_server->_name << " exited";
295
296
6
    {
297
        // _signal_lock ensures mutual exclusion of access to _thrift_server
298
6
        std::lock_guard<std::mutex> lock(_signal_lock);
299
6
        _thrift_server->_started = false;
300
301
        // There may not be anyone waiting on this signal (if the
302
        // exception occurs after startup). That's not a problem, this is
303
        // just to avoid waiting for the timeout in case of a bind
304
        // failure, for example.
305
6
        _signal_fired = true;
306
6
    }
307
308
6
    _signal_cond.notify_all();
309
6
}
310
311
14
void ThriftServer::ThriftServerEventProcessor::preServe() {
312
    // Acquire the signal lock to ensure that StartAndWaitForServer is
313
    // waiting on _signal_cond when we notify.
314
14
    std::lock_guard<std::mutex> lock(_signal_lock);
315
14
    _signal_fired = true;
316
317
    // This is the (only) success path - if this is not reached within TIMEOUT_MS,
318
    // StartAndWaitForServer will indicate failure.
319
14
    _thrift_server->_started = true;
320
321
    // Should only be one thread waiting on _signal_cond, but wake all just in case.
322
14
    _signal_cond.notify_all();
323
14
}
324
325
// This thread-local variable contains the current session key for whichever
326
// thrift server is currently serving a request on the current thread.
327
// Assigned in createContext(), processContext() and deleteContext(); returned by
// ThriftServer::get_thread_session_key().
__thread ThriftServer::SessionKey* _session_key;
328
329
0
// Returns the session key currently stored in the calling thread's thread-local slot.
ThriftServer::SessionKey* ThriftServer::get_thread_session_key() {
    ThriftServer::SessionKey* const current_key = _session_key;
    return current_key;
}
332
333
void* ThriftServer::ThriftServerEventProcessor::createContext(
334
        std::shared_ptr<apache::thrift::protocol::TProtocol> input,
335
66
        std::shared_ptr<apache::thrift::protocol::TProtocol> output) {
336
66
    std::stringstream ss;
337
338
66
    apache::thrift::transport::TSocket* socket = nullptr;
339
66
    apache::thrift::transport::TTransport* transport = input->getTransport().get();
340
66
    {
341
66
        switch (_thrift_server->_server_type) {
342
0
        case NON_BLOCKING:
343
0
            socket = static_cast<apache::thrift::transport::TSocket*>(
344
0
                    static_cast<apache::thrift::transport::TFramedTransport*>(transport)
345
0
                            ->getUnderlyingTransport()
346
0
                            .get());
347
0
            break;
348
349
0
        case THREAD_POOL:
350
66
        case THREADED:
351
66
            socket = static_cast<apache::thrift::transport::TSocket*>(
352
66
                    static_cast<apache::thrift::transport::TBufferedTransport*>(transport)
353
66
                            ->getUnderlyingTransport()
354
66
                            .get());
355
66
            break;
356
357
0
        default:
358
0
            DCHECK(false) << "Unexpected thrift server type";
359
66
        }
360
66
    }
361
362
66
    ss << socket->getPeerAddress() << ":" << socket->getPeerPort();
363
364
66
    {
365
66
        std::lock_guard<std::mutex> _l(_thrift_server->_session_keys_lock);
366
367
66
        std::shared_ptr<SessionKey> key_ptr(new std::string(ss.str()));
368
369
66
        _session_key = key_ptr.get();
370
66
        _thrift_server->_session_keys[key_ptr.get()] = key_ptr;
371
66
    }
372
373
66
    if (_thrift_server->_session_handler != nullptr) {
374
0
        _thrift_server->_session_handler->session_start(*_session_key);
375
0
    }
376
377
66
    _thrift_server->thrift_connections_total->increment(1L);
378
66
    _thrift_server->thrift_current_connections->increment(1L);
379
380
    // Store the _session_key in the per-client context to avoid recomputing
381
    // it. If only this were accessible from RPC method calls, we wouldn't have to
382
    // mess around with thread locals.
383
66
    return (void*)_session_key;
384
66
}
385
386
void ThriftServer::ThriftServerEventProcessor::processContext(
387
58.0k
        void* context, std::shared_ptr<apache::thrift::transport::TTransport> transport) {
388
58.0k
    _session_key = reinterpret_cast<SessionKey*>(context);
389
58.0k
}
390
391
void ThriftServer::ThriftServerEventProcessor::deleteContext(
392
        void* serverContext, std::shared_ptr<apache::thrift::protocol::TProtocol> input,
393
30
        std::shared_ptr<apache::thrift::protocol::TProtocol> output) {
394
30
    _session_key = (SessionKey*)serverContext;
395
396
30
    if (_thrift_server->_session_handler != nullptr) {
397
0
        _thrift_server->_session_handler->session_end(*_session_key);
398
0
    }
399
400
30
    {
401
30
        std::lock_guard<std::mutex> _l(_thrift_server->_session_keys_lock);
402
30
        _thrift_server->_session_keys.erase(_session_key);
403
30
    }
404
405
30
    _thrift_server->thrift_current_connections->increment(-1L);
406
30
}
407
408
// Records the server parameters and registers the connection metrics under the metric
// entity "thrift_server.<name>". The underlying thrift server is not created here.
ThriftServer::ThriftServer(const std::string& name,
                           const std::shared_ptr<apache::thrift::TProcessor>& processor, int port,
                           int num_worker_threads, ServerType server_type)
        : _started(false),
          _port(port),
          _num_worker_threads(num_worker_threads),
          _server_type(server_type),
          _name(name),
          _server_thread(nullptr),
          _server(nullptr),
          _processor(processor),
          _session_handler(nullptr) {
    const std::string entity_name = std::string("thrift_server.") + name;
    _thrift_server_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
            entity_name, {{"name", name}});

    INT_GAUGE_METRIC_REGISTER(_thrift_server_metric_entity, thrift_current_connections);
    INT_COUNTER_METRIC_REGISTER(_thrift_server_metric_entity, thrift_connections_total);
}
425
426
6
// Stops the server and waits for its thread. Both steps are skipped when there is nothing
// to stop or join: a server that was never started has no _server or _server_thread, and
// a thread already joined by stop_for_testing() must not be joined again, because
// std::thread::join() throws on a non-joinable thread.
ThriftServer::~ThriftServer() {
    if (_server != nullptr) {
        stop();
    }
    if (_server_thread != nullptr && _server_thread->joinable()) {
        join();
    }
}
430
431
14
// Builds the thrift server matching _server_type, installs the event processor and
// starts serving on a background thread.
//
// Returns Status::OK() once the server has signalled that it is serving, and
// Status::InternalError for an unsupported server type or a failed start-up.
Status ThriftServer::start() {
    DCHECK(!_started);

    std::shared_ptr<apache::thrift::protocol::TProtocolFactory> protocol_factory =
            std::make_shared<apache::thrift::protocol::TBinaryProtocolFactory>();
    std::shared_ptr<apache::thrift::concurrency::ThreadFactory> thread_factory =
            std::make_shared<apache::thrift::concurrency::ThreadFactory>();
    std::shared_ptr<apache::thrift::concurrency::ThreadManager> thread_mgr;
    std::shared_ptr<apache::thrift::transport::TServerTransport> fe_server_transport;
    std::shared_ptr<apache::thrift::transport::TTransportFactory> transport_factory;
    std::shared_ptr<apache::thrift::transport::TNonblockingServerSocket> socket;

    // A thread manager is created for every server type except THREADED.
    if (_server_type != THREADED) {
        thread_mgr = apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(
                _num_worker_threads);
        thread_mgr->threadFactory(thread_factory);
        thread_mgr->start();
    }

    // Note - if you change the transport types here, you must check that the
    // logic in createContext is still accurate.
    switch (_server_type) {
    case NON_BLOCKING:
        socket = std::make_shared<ImprovedNonblockingServerSocket>(_port);
        transport_factory = std::make_shared<apache::thrift::transport::TTransportFactory>();

        _server = std::make_unique<apache::thrift::server::TNonblockingServer>(
                _processor, transport_factory, transport_factory, protocol_factory,
                protocol_factory, socket, thread_mgr);
        break;

    case THREAD_POOL:
        fe_server_transport = std::make_shared<ImprovedServerSocket>(
                BackendOptions::get_service_bind_address_without_bracket(), _port);
        transport_factory = std::make_shared<ImprovedBufferedTransportFactory>();

        _server = std::make_unique<apache::thrift::server::TThreadPoolServer>(
                _processor, fe_server_transport, transport_factory, protocol_factory, thread_mgr);
        break;

    case THREADED: {
        // Keep the concrete type long enough to enable keep-alive before the socket is
        // handed to the server as a generic transport.
        auto server_socket = std::make_shared<ImprovedServerSocket>(
                BackendOptions::get_service_bind_address_without_bracket(), _port);
        server_socket->setKeepAlive(true);
        fe_server_transport = server_socket;
        transport_factory = std::make_shared<ImprovedBufferedTransportFactory>();

        _server = std::make_unique<apache::thrift::server::TThreadedServer>(
                _processor, fe_server_transport, transport_factory, protocol_factory,
                thread_factory);
        break;
    }

    default: {
        std::stringstream error_msg;
        error_msg << "Unsupported server type: " << _server_type;
        LOG(ERROR) << error_msg.str();
        return Status::InternalError(error_msg.str());
    }
    }

    auto event_processor = std::make_shared<ThriftServer::ThriftServerEventProcessor>(this);
    _server->setServerEventHandler(event_processor);

    RETURN_IF_ERROR(event_processor->start_and_wait_for_server());

    LOG(INFO) << "ThriftServer '" << _name << "' started on port: " << _port;

    DCHECK(_started);
    return Status::OK();
}
509
510
12
void ThriftServer::stop() {
511
12
    _server->stop();
512
12
}
513
514
6
void ThriftServer::join() {
515
6
    DCHECK(_server_thread != nullptr);
516
6
    _server_thread->join();
517
6
}
518
519
0
void ThriftServer::stop_for_testing() {
520
0
    DCHECK(_server_thread != nullptr);
521
0
    DCHECK(_server);
522
0
    DCHECK_EQ(_server_type, THREADED);
523
0
    _server->stop();
524
525
0
    if (_started) {
526
0
        join();
527
0
    }
528
0
}
529
} // namespace doris