Coverage Report

Created: 2026-03-13 19:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/http/ev_http_server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/http/ev_http_server.h"
19
20
#include <arpa/inet.h>
21
#include <butil/endpoint.h>
22
#include <butil/fd_utility.h>
23
// IWYU pragma: no_include <bthread/errno.h>
24
#include <errno.h> // IWYU pragma: keep
25
#include <event2/event.h>
26
#include <event2/http.h>
27
#include <event2/http_struct.h>
28
#include <event2/thread.h>
29
#include <netinet/in.h>
30
#include <string.h>
31
#include <sys/socket.h>
32
#include <unistd.h>
33
34
#include <algorithm>
35
#include <memory>
36
#include <sstream>
37
38
#include "common/logging.h"
39
#include "service/backend_options.h"
40
#include "service/http/http_channel.h"
41
#include "service/http/http_handler.h"
42
#include "service/http/http_headers.h"
43
#include "service/http/http_request.h"
44
#include "service/http/http_status.h"
45
#include "util/threadpool.h"
46
47
struct event_base;
48
struct evhttp;
49
50
namespace doris {
51
52
0
static void on_chunked(struct evhttp_request* ev_req, void* param) {
53
0
    HttpRequest* request = (HttpRequest*)ev_req->on_free_cb_arg;
54
0
    request->handler()->on_chunk_data(request);
55
0
}
56
57
31
static void on_free(struct evhttp_request* ev_req, void* arg) {
58
31
    HttpRequest* request = (HttpRequest*)arg;
59
31
    request->wait_finish_send_reply();
60
31
    delete request;
61
31
}
62
63
33
static void on_request(struct evhttp_request* ev_req, void* arg) {
64
33
    auto request = (HttpRequest*)ev_req->on_free_cb_arg;
65
33
    if (request == nullptr) {
66
        // In this case, request's on_header return -1
67
2
        return;
68
2
    }
69
31
    request->handler()->handle(request);
70
31
}
71
72
54
static int on_header(struct evhttp_request* ev_req, void* param) {
73
54
    EvHttpServer* server = (EvHttpServer*)ev_req->on_complete_cb_arg;
74
54
    return server->on_header(ev_req);
75
54
}
76
77
// param is pointer of EvHttpServer
78
106
static int on_connection(struct evhttp_request* req, void* param) {
79
106
    evhttp_request_set_header_cb(req, on_header);
80
    // only used on_complete_cb's argument
81
106
    evhttp_request_set_on_complete_cb(req, nullptr, param);
82
106
    return 0;
83
106
}
84
85
EvHttpServer::EvHttpServer(int port, int num_workers)
86
3
        : _port(port), _num_workers(num_workers), _real_port(0) {
87
3
    _host = BackendOptions::get_service_bind_address();
88
89
3
    evthread_use_pthreads();
90
3
    DCHECK_GT(_num_workers, 0);
91
3
    _event_bases.resize(_num_workers);
92
8
    for (int i = 0; i < _num_workers; ++i) {
93
5
        std::shared_ptr<event_base> base(event_base_new(),
94
5
                                         [](event_base* base) { event_base_free(base); });
95
5
        CHECK(base != nullptr) << "Couldn't create an event_base.";
96
5
        std::lock_guard lock(_event_bases_lock);
97
5
        _event_bases[i] = base;
98
5
    }
99
3
}
100
101
EvHttpServer::EvHttpServer(const std::string& host, int port, int num_workers)
102
0
        : _host(host), _port(port), _num_workers(num_workers), _real_port(0) {
103
0
    DCHECK_GT(_num_workers, 0);
104
0
}
105
106
3
EvHttpServer::~EvHttpServer() {
107
3
    if (_started) {
108
2
        stop();
109
2
    }
110
3
}
111
112
3
void EvHttpServer::start() {
113
3
    _started = true;
114
    // bind to
115
3
    auto s = _bind();
116
3
    CHECK(s.ok()) << s.to_string();
117
3
    static_cast<void>(ThreadPoolBuilder("EvHttpServer")
118
3
                              .set_min_threads(_num_workers)
119
3
                              .set_max_threads(_num_workers)
120
3
                              .build(&_workers));
121
122
    // Pre-create all evhttp objects and store them as class members
123
    // to ensure proper lifecycle management during shutdown
124
3
    {
125
3
        std::lock_guard lock(_event_bases_lock);
126
3
        _evhttp_servers.resize(_num_workers);
127
8
        for (int i = 0; i < _num_workers; ++i) {
128
5
            std::shared_ptr<evhttp> http(evhttp_new(_event_bases[i].get()),
129
5
                                         [](evhttp* http) { evhttp_free(http); });
130
5
            CHECK(http != nullptr) << "Couldn't create an evhttp.";
131
132
5
            auto res = evhttp_accept_socket(http.get(), _server_fd);
133
5
            CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;
134
135
5
            evhttp_set_newreqcb(http.get(), on_connection, this);
136
5
            evhttp_set_gencb(http.get(), on_request, this);
137
138
5
            _evhttp_servers[i] = http;
139
5
        }
140
3
    }
141
142
8
    for (int i = 0; i < _num_workers; ++i) {
143
5
        auto status = _workers->submit_func([this, i]() {
144
5
            std::shared_ptr<event_base> base;
145
5
            {
146
5
                std::lock_guard lock(_event_bases_lock);
147
5
                base = _event_bases[i];
148
5
            }
149
5
            event_base_dispatch(base.get());
150
5
        });
151
5
        CHECK(status.ok());
152
5
    }
153
3
}
154
155
3
void EvHttpServer::stop() {
156
    // 1. Close server fd first to reject new connections
157
3
    close(_server_fd);
158
3
    _server_fd = -1;
159
160
    // 2. Break all event loops to make dispatch return
161
3
    {
162
3
        std::lock_guard<std::mutex> lock(_event_bases_lock);
163
8
        for (int i = 0; i < _num_workers; ++i) {
164
5
            if (_event_bases[i]) {
165
5
                event_base_loopbreak(_event_bases[i].get());
166
5
            }
167
5
        }
168
3
    }
169
170
    // 3. Wait for all worker threads to finish event_base_dispatch
171
3
    _workers->shutdown();
172
173
    // 4. Now it's safe to cleanup - all worker threads have exited
174
    // Clear evhttp before event_base since evhttp depends on event_base
175
3
    {
176
3
        std::lock_guard<std::mutex> lock(_event_bases_lock);
177
3
        _evhttp_servers.clear();
178
3
        _event_bases.clear();
179
3
    }
180
181
3
    _started = false;
182
3
}
183
184
0
void EvHttpServer::join() {}
185
186
3
Status EvHttpServer::_bind() {
187
3
    butil::EndPoint point;
188
3
    auto res = butil::str2endpoint(_host.c_str(), _port, &point);
189
3
    if (res < 0) {
190
0
        return Status::InternalError("convert address failed, host={}, port={}", _host, _port);
191
0
    }
192
3
    _server_fd = butil::tcp_listen(point);
193
3
    if (_server_fd < 0) {
194
0
        char buf[64];
195
0
        std::stringstream ss;
196
0
        ss << "tcp listen failed, errno=" << errno
197
0
           << ", errmsg=" << strerror_r(errno, buf, sizeof(buf));
198
0
        return Status::InternalError(ss.str());
199
0
    }
200
3
    if (_port == 0) {
201
2
        struct sockaddr_in addr;
202
2
        socklen_t socklen = sizeof(addr);
203
2
        const int rc = getsockname(_server_fd, (struct sockaddr*)&addr, &socklen);
204
2
        if (rc == 0) {
205
2
            _real_port = ntohs(addr.sin_port);
206
2
        }
207
2
    }
208
3
    res = butil::make_non_blocking(_server_fd);
209
3
    if (res < 0) {
210
0
        char buf[64];
211
0
        std::stringstream ss;
212
0
        ss << "make socket to non_blocking failed, errno=" << errno
213
0
           << ", errmsg=" << strerror_r(errno, buf, sizeof(buf));
214
0
        return Status::InternalError(ss.str());
215
0
    }
216
3
    return Status::OK();
217
3
}
218
219
bool EvHttpServer::register_handler(const HttpMethod& method, const std::string& path,
220
65
                                    HttpHandler* handler) {
221
65
    if (handler == nullptr) {
222
0
        LOG(WARNING) << "dummy handler for http method " << method << " with path " << path;
223
0
        return false;
224
0
    }
225
226
65
    bool result = true;
227
65
    std::lock_guard<std::mutex> lock(_handler_lock);
228
65
    PathTrie<HttpHandler*>* root = nullptr;
229
65
    switch (method) {
230
43
    case GET:
231
43
        root = &_get_handlers;
232
43
        break;
233
6
    case PUT:
234
6
        root = &_put_handlers;
235
6
        break;
236
10
    case POST:
237
10
        root = &_post_handlers;
238
10
        break;
239
0
    case DELETE:
240
0
        root = &_delete_handlers;
241
0
        break;
242
6
    case HEAD:
243
6
        root = &_head_handlers;
244
6
        break;
245
0
    case OPTIONS:
246
0
        root = &_options_handlers;
247
0
        break;
248
0
    default:
249
0
        LOG(WARNING) << "unknown HTTP method, method=" << method;
250
0
        result = false;
251
65
    }
252
65
    if (result) {
253
65
        result = root->insert(path, handler);
254
65
    }
255
256
65
    return result;
257
65
}
258
259
1
void EvHttpServer::register_static_file_handler(HttpHandler* handler) {
260
1
    DCHECK(handler != nullptr);
261
1
    DCHECK(_static_file_handler == nullptr);
262
1
    std::lock_guard<std::mutex> lock(_handler_lock);
263
1
    _static_file_handler = handler;
264
1
}
265
266
54
int EvHttpServer::on_header(struct evhttp_request* ev_req) {
267
54
    std::unique_ptr<HttpRequest> request(new HttpRequest(ev_req));
268
54
    auto res = request->init_from_evhttp();
269
54
    if (res < 0) {
270
0
        return -1;
271
0
    }
272
54
    auto handler = _find_handler(request.get());
273
54
    if (handler == nullptr) {
274
1
        evhttp_remove_header(evhttp_request_get_input_headers(ev_req), HttpHeaders::EXPECT);
275
1
        HttpChannel::send_reply(request.get(), HttpStatus::NOT_FOUND, "Not Found");
276
1
        return 0;
277
1
    }
278
    // set handler before call on_header, because handler_ctx will set in on_header
279
53
    request->set_handler(handler);
280
53
    res = handler->on_header(request.get());
281
53
    if (res < 0) {
282
        // reply has already sent by handler's on_header
283
22
        evhttp_remove_header(evhttp_request_get_input_headers(ev_req), HttpHeaders::EXPECT);
284
22
        return 0;
285
22
    }
286
287
    // If request body would be big(greater than 1GB),
288
    // it is better that request_will_be_read_progressively is set true,
289
    // this can make body read in chunk, not in total
290
31
    if (handler->request_will_be_read_progressively()) {
291
0
        evhttp_request_set_chunked_cb(ev_req, on_chunked);
292
0
    }
293
294
31
    evhttp_request_set_on_free_cb(ev_req, on_free, request.release());
295
31
    return 0;
296
53
}
297
298
54
HttpHandler* EvHttpServer::_find_handler(HttpRequest* req) {
299
54
    auto& path = req->raw_path();
300
301
54
    HttpHandler* handler = nullptr;
302
303
54
    std::lock_guard<std::mutex> lock(_handler_lock);
304
54
    switch (req->method()) {
305
35
    case GET:
306
35
        _get_handlers.retrieve(path, &handler, req->params());
307
        // Static file handler is a fallback handler
308
35
        if (handler == nullptr) {
309
0
            handler = _static_file_handler;
310
0
        }
311
35
        break;
312
0
    case PUT:
313
0
        _put_handlers.retrieve(path, &handler, req->params());
314
0
        break;
315
12
    case POST:
316
12
        _post_handlers.retrieve(path, &handler, req->params());
317
12
        break;
318
0
    case DELETE:
319
0
        _delete_handlers.retrieve(path, &handler, req->params());
320
0
        break;
321
7
    case HEAD:
322
7
        _head_handlers.retrieve(path, &handler, req->params());
323
7
        break;
324
0
    case OPTIONS:
325
0
        _options_handlers.retrieve(path, &handler, req->params());
326
0
        break;
327
0
    default:
328
0
        LOG(WARNING) << "unknown HTTP method, method=" << req->method();
329
0
        break;
330
54
    }
331
54
    return handler;
332
54
}
333
334
} // namespace doris