Coverage Report

Created: 2026-03-12 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/brpc_service.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/brpc_service.h"
19
20
#include <brpc/server.h>
21
#include <brpc/ssl_options.h>
22
#include <butil/endpoint.h>
23
// IWYU pragma: no_include <bthread/errno.h>
24
#include <errno.h> // IWYU pragma: keep
25
#include <gflags/gflags_declare.h>
26
#include <string.h>
27
28
#include <ostream>
29
30
#include "cloud/cloud_internal_service.h"
31
#include "cloud/config.h"
32
#include "common/config.h"
33
#include "common/logging.h"
34
#include "runtime/exec_env.h"
35
#include "service/backend_options.h"
36
#include "service/internal_service.h"
37
#include "storage/storage_engine.h"
38
#include "util/mem_info.h"
39
40
namespace brpc {
41
42
DECLARE_uint64(max_body_size);
43
DECLARE_int64(socket_max_unwritten_bytes);
44
DECLARE_bool(usercode_in_pthread);
45
46
} // namespace brpc
47
48
namespace doris {
49
50
7
BRpcService::BRpcService(ExecEnv* exec_env) : _exec_env(exec_env), _server(new brpc::Server()) {
51
    // Set config
52
7
    if (config::brpc_usercode_in_pthread) {
53
0
        brpc::FLAGS_usercode_in_pthread = true;
54
0
    }
55
7
    brpc::FLAGS_max_body_size = config::brpc_max_body_size;
56
7
    brpc::FLAGS_socket_max_unwritten_bytes =
57
7
            config::brpc_socket_max_unwritten_bytes != -1
58
7
                    ? config::brpc_socket_max_unwritten_bytes
59
7
                    : std::max((int64_t)1073741824, (MemInfo::mem_limit() / 1024) * 20);
60
7
}
61
62
3
BRpcService::~BRpcService() {
63
3
    join();
64
3
}
65
66
7
Status BRpcService::start(int port, int num_threads) {
67
    // Add service
68
7
    if (config::is_cloud_mode()) {
69
1
        _server->AddService(
70
1
                new CloudInternalServiceImpl(_exec_env->storage_engine().to_cloud(), _exec_env),
71
1
                brpc::SERVER_OWNS_SERVICE);
72
6
    } else {
73
6
        _server->AddService(
74
6
                new PInternalServiceImpl(_exec_env->storage_engine().to_local(), _exec_env),
75
6
                brpc::SERVER_OWNS_SERVICE);
76
6
    }
77
    // start service
78
7
    brpc::ServerOptions options;
79
7
    if (num_threads != -1) {
80
7
        options.num_threads = num_threads;
81
7
    }
82
7
    options.idle_timeout_sec = config::brpc_idle_timeout_sec;
83
84
7
    if (config::enable_https) {
85
0
        auto sslOptions = options.mutable_ssl_options();
86
0
        sslOptions->default_cert.certificate = config::ssl_certificate_path;
87
0
        sslOptions->default_cert.private_key = config::ssl_private_key_path;
88
0
    }
89
90
7
    options.has_builtin_services = config::enable_brpc_builtin_services;
91
92
7
    butil::EndPoint point;
93
7
    if (butil::str2endpoint(BackendOptions::get_service_bind_address(), port, &point) < 0) {
94
0
        return Status::InternalError("convert address failed, host={}, port={}", "[::0]", port);
95
0
    }
96
7
    LOG(INFO) << "BRPC server bind to host: " << BackendOptions::get_service_bind_address()
97
7
              << ", port: " << port;
98
7
    if (_server->Start(point, &options) != 0) {
99
0
        char buf[64];
100
0
        LOG(WARNING) << "start brpc failed, errno=" << errno
101
0
                     << ", errmsg=" << strerror_r(errno, buf, 64) << ", port=" << port;
102
0
        return Status::InternalError("start brpc service failed");
103
0
    }
104
7
    return Status::OK();
105
7
}
106
107
3
void BRpcService::join() {
108
3
    int stop_succeed = _server->Stop(1000);
109
110
3
    if (stop_succeed == 0) {
111
3
        _server->Join();
112
3
    } else {
113
0
        LOG(WARNING) << "Failed to stop brpc service, "
114
0
                     << "not calling brpc server join since it will never retrun."
115
0
                     << "maybe something bad will happen, let us know if you meet something error.";
116
0
    }
117
118
3
    _server->ClearServices();
119
3
}
120
121
} // namespace doris