Coverage Report

Created: 2026-03-13 09:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/http/ev_http_server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/http/ev_http_server.h"
19
20
#include <arpa/inet.h>
21
#include <butil/endpoint.h>
22
#include <butil/fd_utility.h>
23
// IWYU pragma: no_include <bthread/errno.h>
24
#include <errno.h> // IWYU pragma: keep
25
#include <event2/event.h>
26
#include <event2/http.h>
27
#include <event2/http_struct.h>
28
#include <event2/thread.h>
29
#include <netinet/in.h>
30
#include <string.h>
31
#include <sys/socket.h>
32
#include <unistd.h>
33
34
#include <algorithm>
35
#include <memory>
36
#include <sstream>
37
38
#include "common/logging.h"
39
#include "service/backend_options.h"
40
#include "service/http/http_channel.h"
41
#include "service/http/http_handler.h"
42
#include "service/http/http_headers.h"
43
#include "service/http/http_request.h"
44
#include "service/http/http_status.h"
45
#include "util/threadpool.h"
46
47
struct event_base;
48
struct evhttp;
49
50
namespace doris {
51
52
326k
// Chunked-body callback, installed by EvHttpServer::on_header() through
// evhttp_request_set_chunked_cb() for handlers that read the body progressively.
// The HttpRequest is recovered from the request's on_free callback argument
// (set by EvHttpServer::on_header()) and passed to its handler's on_chunk_data().
// `param` is not used.
static void on_chunked(struct evhttp_request* ev_req, void* param) {
53
326k
    HttpRequest* request = (HttpRequest*)ev_req->on_free_cb_arg;
54
326k
    request->handler()->on_chunk_data(request);
55
326k
}
56
57
6.78k
// on_free callback installed by EvHttpServer::on_header() through
// evhttp_request_set_on_free_cb(). `arg` is the HttpRequest whose ownership was
// released to the callback there; this function is what finally deletes it.
// `ev_req` is not used.
static void on_free(struct evhttp_request* ev_req, void* arg) {
58
6.78k
    HttpRequest* request = (HttpRequest*)arg;
59
6.78k
    // Calls wait_finish_send_reply() before the request object is destroyed.
    request->wait_finish_send_reply();
60
6.78k
    delete request;
61
6.78k
}
62
63
6.78k
// Generic request callback registered with evhttp_set_gencb() in
// EvHttpServer::start(). Dispatches the request to the handler that
// EvHttpServer::on_header() attached to the HttpRequest. `arg` is not used.
static void on_request(struct evhttp_request* ev_req, void* arg) {
64
6.78k
    auto request = (HttpRequest*)ev_req->on_free_cb_arg;
65
6.78k
    if (request == nullptr) {
66
        // In this case, the request's on_header returned -1.
        // NOTE(review): EvHttpServer::on_header() also leaves the on_free argument
        // unset on its "handler not found" and "handler rejected header" paths,
        // which return 0 - confirm whether those can reach this branch too.
67
2
        return;
68
2
    }
69
6.78k
    request->handler()->handle(request);
70
6.78k
}
71
72
6.99k
// Header callback installed by on_connection(). Recovers the EvHttpServer that
// on_connection() stored as the request's on_complete callback argument and
// forwards to EvHttpServer::on_header(), returning its result. `param` is not used.
static int on_header(struct evhttp_request* ev_req, void* param) {
73
6.99k
    EvHttpServer* server = (EvHttpServer*)ev_req->on_complete_cb_arg;
74
6.99k
    return server->on_header(ev_req);
75
6.99k
}
76
77
// New-request callback registered with evhttp_set_newreqcb() in
// EvHttpServer::start(). `param` is a pointer to the EvHttpServer that was
// passed at registration. Always returns 0.
78
13.9k
static int on_connection(struct evhttp_request* req, void* param) {
79
13.9k
    evhttp_request_set_header_cb(req, on_header);
80
    // No completion callback is installed (nullptr); this call is only used to
    // store `param` as on_complete_cb_arg, which the static on_header() reads back.
81
13.9k
    evhttp_request_set_on_complete_cb(req, nullptr, param);
82
13.9k
    return 0;
83
13.9k
}
84
85
// Constructs a server for `port` with `num_workers` workers. The host is taken
// from BackendOptions::get_service_bind_address(). One event_base per worker is
// created here and stored in _event_bases; nothing is bound or started until
// start() is called.
EvHttpServer::EvHttpServer(int port, int num_workers)
86
10
        : _port(port), _num_workers(num_workers), _real_port(0) {
87
10
    _host = BackendOptions::get_service_bind_address();
88
89
10
    // Called before any event_base is created below.
    evthread_use_pthreads();
90
10
    DCHECK_GT(_num_workers, 0);
91
10
    _event_bases.resize(_num_workers);
92
911
    for (int i = 0; i < _num_workers; ++i) {
93
901
        // The custom deleter frees the event_base when the last shared_ptr copy goes away.
        std::shared_ptr<event_base> base(event_base_new(),
94
901
                                         [](event_base* base) { event_base_free(base); });
95
901
        CHECK(base != nullptr) << "Couldn't create an event_base.";
96
901
        std::lock_guard lock(_event_bases_lock);
97
901
        _event_bases[i] = base;
98
901
    }
99
10
}
100
101
// Constructs a server bound to an explicit `host` instead of the backend's
// service bind address.
// NOTE(review): unlike the (port, num_workers) constructor, this one neither
// calls evthread_use_pthreads() nor populates _event_bases, while start() reads
// _event_bases[i] for every worker - confirm this constructor is never followed
// by start(), or create the event bases here as well.
EvHttpServer::EvHttpServer(const std::string& host, int port, int num_workers)
102
0
        : _host(host), _port(port), _num_workers(num_workers), _real_port(0) {
103
0
    DCHECK_GT(_num_workers, 0);
104
0
}
105
106
6
// Stops the server if it is still marked as started. stop() resets _started,
// so a server that was already stopped explicitly is not stopped again here.
EvHttpServer::~EvHttpServer() {
107
6
    if (_started) {
108
2
        stop();
109
2
    }
110
6
}
111
112
10
// Binds the listening socket, builds the worker thread pool, creates one evhttp
// per event_base that accepts on the shared listening fd, and submits one task
// per worker that runs event_base_dispatch() on its event_base.
// Failures of bind, evhttp creation, accept-socket setup and task submission
// abort through CHECK.
void EvHttpServer::start() {
113
10
    // NOTE(review): _started is set before _bind() has succeeded.
    _started = true;
114
    // Bind and listen on the configured host/port.
115
10
    auto s = _bind();
116
10
    CHECK(s.ok()) << s.to_string();
117
10
    // NOTE(review): the result of build() is deliberately discarded here, yet
    // _workers is dereferenced below - confirm build() cannot fail in practice.
    static_cast<void>(ThreadPoolBuilder("EvHttpServer")
118
10
                              .set_min_threads(_num_workers)
119
10
                              .set_max_threads(_num_workers)
120
10
                              .build(&_workers));
121
122
    // Pre-create all evhttp objects and store them as class members
123
    // to ensure proper lifecycle management during shutdown
124
10
    {
125
10
        std::lock_guard lock(_event_bases_lock);
126
10
        _evhttp_servers.resize(_num_workers);
127
911
        for (int i = 0; i < _num_workers; ++i) {
128
901
            std::shared_ptr<evhttp> http(evhttp_new(_event_bases[i].get()),
129
901
                                         [](evhttp* http) { evhttp_free(http); });
130
901
            CHECK(http != nullptr) << "Couldn't create an evhttp.";
131
132
901
            // Every evhttp accepts on the same listening fd.
            auto res = evhttp_accept_socket(http.get(), _server_fd);
133
901
            CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;
134
135
901
            // `this` is handed to on_connection() as its param; on_request() ignores its arg.
            evhttp_set_newreqcb(http.get(), on_connection, this);
136
901
            evhttp_set_gencb(http.get(), on_request, this);
137
138
901
            _evhttp_servers[i] = http;
139
901
        }
140
10
    }
141
142
911
    for (int i = 0; i < _num_workers; ++i) {
143
901
        auto status = _workers->submit_func([this, i]() {
144
898
            // Take a shared_ptr copy under the lock so this task holds its own
            // reference to the event_base while dispatch runs.
            std::shared_ptr<event_base> base;
145
898
            {
146
898
                std::lock_guard lock(_event_bases_lock);
147
898
                base = _event_bases[i];
148
898
            }
149
898
            event_base_dispatch(base.get());
150
898
        });
151
901
        CHECK(status.ok());
152
901
    }
153
10
}
154
155
6
// Shuts the server down in four ordered steps: close the listening fd, break
// every event loop, shut down the worker pool, then release the evhttp and
// event_base objects. Resets _started at the end.
// NOTE(review): nothing here guards against a second call or a call before
// start(): the loop indexes _event_bases[i] up to _num_workers even after the
// vector was cleared, _workers is dereferenced unconditionally, and close() is
// called on whatever _server_fd holds (-1 after a previous stop) - confirm
// callers invoke stop() exactly once after start().
void EvHttpServer::stop() {
156
    // 1. Close server fd first to reject new connections
157
6
    close(_server_fd);
158
6
    _server_fd = -1;
159
160
    // 2. Break all event loops to make dispatch return
161
6
    {
162
6
        std::lock_guard<std::mutex> lock(_event_bases_lock);
163
395
        for (int i = 0; i < _num_workers; ++i) {
164
389
            if (_event_bases[i]) {
165
389
                event_base_loopbreak(_event_bases[i].get());
166
389
            }
167
389
        }
168
6
    }
169
170
    // 3. Wait for all worker threads to finish event_base_dispatch
171
6
    _workers->shutdown();
172
173
    // 4. Now it's safe to clean up - all worker threads have exited
174
    // Clear evhttp before event_base since evhttp depends on event_base
175
6
    {
176
6
        std::lock_guard<std::mutex> lock(_event_bases_lock);
177
6
        _evhttp_servers.clear();
178
6
        _event_bases.clear();
179
6
    }
180
181
6
    _started = false;
182
6
}
183
184
0
// No-op: the body is empty, so this returns immediately without waiting on anything.
void EvHttpServer::join() {}
185
186
7
// Resolves _host/_port to an endpoint, opens a listening TCP socket on it,
// stores the fd in _server_fd and switches it to non-blocking mode.
// Returns Status::OK() on success, or Status::InternalError describing the step
// that failed (address conversion, listen, or make_non_blocking).
// When _port is 0, the port reported by getsockname() is recorded in _real_port.
Status EvHttpServer::_bind() {
187
7
    butil::EndPoint point;
188
7
    auto res = butil::str2endpoint(_host.c_str(), _port, &point);
189
7
    if (res < 0) {
190
0
        return Status::InternalError("convert address failed, host={}, port={}", _host, _port);
191
0
    }
192
7
    _server_fd = butil::tcp_listen(point);
193
7
    if (_server_fd < 0) {
194
0
        char buf[64];
195
0
        std::stringstream ss;
        // NOTE(review): the strerror_r() result is streamed directly; the GNU and
        // XSI variants return different types (char* vs int) - confirm which one
        // this build selects so the message contains text rather than a number.
196
0
        ss << "tcp listen failed, errno=" << errno
197
0
           << ", errmsg=" << strerror_r(errno, buf, sizeof(buf));
198
0
        return Status::InternalError(ss.str());
199
0
    }
200
7
    if (_port == 0) {
        // A getsockname() failure is ignored; _real_port then keeps its previous value.
        // NOTE(review): the address is read into a sockaddr_in - confirm the
        // listening socket cannot be IPv6 here.
201
0
        struct sockaddr_in addr;
202
0
        socklen_t socklen = sizeof(addr);
203
0
        const int rc = getsockname(_server_fd, (struct sockaddr*)&addr, &socklen);
204
0
        if (rc == 0) {
205
0
            _real_port = ntohs(addr.sin_port);
206
0
        }
207
0
    }
208
7
    res = butil::make_non_blocking(_server_fd);
209
7
    if (res < 0) {
        // NOTE(review): this error path returns without closing _server_fd.
210
0
        char buf[64];
211
0
        std::stringstream ss;
212
0
        ss << "make socket to non_blocking failed, errno=" << errno
213
0
           << ", errmsg=" << strerror_r(errno, buf, sizeof(buf));
214
0
        return Status::InternalError(ss.str());
215
0
    }
216
7
    return Status::OK();
217
7
}
218
219
// Registers `handler` for `path` under the given HTTP `method`.
// Each supported method (GET, PUT, POST, DELETE, HEAD, OPTIONS) has its own
// PathTrie; the handler is inserted into the matching one while holding
// _handler_lock.
// Returns false, after logging a warning, when `handler` is null or the method
// is not one of the supported ones; otherwise returns the result of the insert.
bool EvHttpServer::register_handler(const HttpMethod& method, const std::string& path,
220
641
                                    HttpHandler* handler) {
221
641
    if (handler == nullptr) {
222
0
        LOG(WARNING) << "dummy handler for http method " << method << " with path " << path;
223
0
        return false;
224
0
    }
225
226
641
    bool result = true;
227
641
    std::lock_guard<std::mutex> lock(_handler_lock);
228
641
    // Stays null only on the default branch, where `result` is set to false
    // so that `root` is never dereferenced.
    PathTrie<HttpHandler*>* root = nullptr;
229
641
    switch (method) {
230
447
    case GET:
231
447
        root = &_get_handlers;
232
447
        break;
233
48
    case PUT:
234
48
        root = &_put_handlers;
235
48
        break;
236
96
    case POST:
237
96
        root = &_post_handlers;
238
96
        break;
239
0
    case DELETE:
240
0
        root = &_delete_handlers;
241
0
        break;
242
50
    case HEAD:
243
50
        root = &_head_handlers;
244
50
        break;
245
0
    case OPTIONS:
246
0
        root = &_options_handlers;
247
0
        break;
248
0
    default:
249
0
        LOG(WARNING) << "unknown HTTP method, method=" << method;
250
0
        result = false;
251
641
    }
252
641
    if (result) {
253
641
        result = root->insert(path, handler);
254
641
    }
255
256
641
    return result;
257
641
}
258
259
8
// Sets the handler that _find_handler() falls back to for GET requests that
// match no registered path. Debug builds assert that `handler` is non-null and
// that no static file handler has been set before.
// NOTE(review): the DCHECK reads _static_file_handler before _handler_lock is
// acquired - confirm registration only happens before the server is serving.
void EvHttpServer::register_static_file_handler(HttpHandler* handler) {
260
8
    DCHECK(handler != nullptr);
261
8
    DCHECK(_static_file_handler == nullptr);
262
8
    std::lock_guard<std::mutex> lock(_handler_lock);
263
8
    _static_file_handler = handler;
264
8
}
265
266
6.99k
// Called through the static on_header() trampoline with the evhttp request
// whose headers are being processed.
// Wraps `ev_req` in an HttpRequest, looks up the handler for its method and
// path, and lets the handler inspect the headers. On success the HttpRequest is
// released to the request's on_free callback, which deletes it later.
// Returns -1 only when HttpRequest initialisation fails. Returns 0 otherwise,
// including the "no handler" path (404 sent here) and the path where the
// handler's own on_header() rejected the request; in both of those the local
// unique_ptr destroys the HttpRequest when this function returns.
int EvHttpServer::on_header(struct evhttp_request* ev_req) {
267
6.99k
    std::unique_ptr<HttpRequest> request(new HttpRequest(ev_req));
268
6.99k
    auto res = request->init_from_evhttp();
269
6.99k
    if (res < 0) {
270
0
        return -1;
271
0
    }
272
6.99k
    auto handler = _find_handler(request.get());
273
6.99k
    if (handler == nullptr) {
274
3
        // The Expect header is removed from the input headers before replying.
        evhttp_remove_header(evhttp_request_get_input_headers(ev_req), HttpHeaders::EXPECT);
275
3
        HttpChannel::send_reply(request.get(), HttpStatus::NOT_FOUND, "Not Found");
276
3
        return 0;
277
3
    }
278
    // Set the handler before calling on_header, because handler_ctx will be set in on_header.
279
6.98k
    request->set_handler(handler);
280
6.98k
    res = handler->on_header(request.get());
281
6.98k
    if (res < 0) {
282
        // The reply has already been sent by the handler's on_header.
283
202
        evhttp_remove_header(evhttp_request_get_input_headers(ev_req), HttpHeaders::EXPECT);
284
202
        return 0;
285
202
    }
286
287
    // If the request body may be big (greater than 1GB),
288
    // it is better for request_will_be_read_progressively to return true,
289
    // so that the body is read in chunks rather than all at once.
290
6.78k
    if (handler->request_will_be_read_progressively()) {
291
2.23k
        evhttp_request_set_chunked_cb(ev_req, on_chunked);
292
2.23k
    }
293
294
6.78k
    // Ownership of the HttpRequest moves to on_free(); on_chunked() and
    // on_request() read it back through on_free_cb_arg.
    evhttp_request_set_on_free_cb(ev_req, on_free, request.release());
295
6.78k
    return 0;
296
6.98k
}
297
298
6.99k
// Looks up the handler for `req` by its HTTP method and raw path, holding
// _handler_lock for the whole lookup. retrieve() is also given req->params().
// Returns nullptr when nothing matches or the method is unknown. For GET only,
// a miss falls back to _static_file_handler (which may itself be null).
HttpHandler* EvHttpServer::_find_handler(HttpRequest* req) {
299
6.99k
    auto& path = req->raw_path();
300
301
6.99k
    HttpHandler* handler = nullptr;
302
303
6.99k
    std::lock_guard<std::mutex> lock(_handler_lock);
304
6.99k
    switch (req->method()) {
305
3.86k
    case GET:
306
3.86k
        _get_handlers.retrieve(path, &handler, req->params());
307
        // Static file handler is a fallback handler
308
3.86k
        if (handler == nullptr) {
309
1
            handler = _static_file_handler;
310
1
        }
311
3.86k
        break;
312
2.47k
    case PUT:
313
2.47k
        _put_handlers.retrieve(path, &handler, req->params());
314
2.47k
        break;
315
645
    case POST:
316
645
        _post_handlers.retrieve(path, &handler, req->params());
317
645
        break;
318
0
    case DELETE:
319
0
        _delete_handlers.retrieve(path, &handler, req->params());
320
0
        break;
321
7
    case HEAD:
322
7
        _head_handlers.retrieve(path, &handler, req->params());
323
7
        break;
324
0
    case OPTIONS:
325
0
        _options_handlers.retrieve(path, &handler, req->params());
326
0
        break;
327
0
    default:
328
0
        LOG(WARNING) << "unknown HTTP method, method=" << req->method();
329
0
        break;
330
6.99k
    }
331
6.99k
    return handler;
332
6.99k
}
333
334
} // namespace doris