be/src/service/http/ev_http_server.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "service/http/ev_http_server.h" |
19 | | |
20 | | #include <arpa/inet.h> |
21 | | #include <butil/endpoint.h> |
22 | | #include <butil/fd_utility.h> |
23 | | // IWYU pragma: no_include <bthread/errno.h> |
24 | | #include <errno.h> // IWYU pragma: keep |
25 | | #include <event2/event.h> |
26 | | #include <event2/http.h> |
27 | | #include <event2/http_struct.h> |
28 | | #include <event2/thread.h> |
29 | | #include <netinet/in.h> |
30 | | #include <string.h> |
31 | | #include <sys/socket.h> |
32 | | #include <unistd.h> |
33 | | |
34 | | #include <algorithm> |
35 | | #include <memory> |
36 | | #include <sstream> |
37 | | |
38 | | #include "common/logging.h" |
39 | | #include "service/backend_options.h" |
40 | | #include "service/http/http_channel.h" |
41 | | #include "service/http/http_handler.h" |
42 | | #include "service/http/http_headers.h" |
43 | | #include "service/http/http_request.h" |
44 | | #include "service/http/http_status.h" |
45 | | #include "util/threadpool.h" |
46 | | |
47 | | struct event_base; |
48 | | struct evhttp; |
49 | | |
50 | | namespace doris { |
51 | | |
52 | 326k | static void on_chunked(struct evhttp_request* ev_req, void* param) { |
53 | 326k | HttpRequest* request = (HttpRequest*)ev_req->on_free_cb_arg; |
54 | 326k | request->handler()->on_chunk_data(request); |
55 | 326k | } |
56 | | |
57 | 6.78k | static void on_free(struct evhttp_request* ev_req, void* arg) { |
58 | 6.78k | HttpRequest* request = (HttpRequest*)arg; |
59 | 6.78k | request->wait_finish_send_reply(); |
60 | 6.78k | delete request; |
61 | 6.78k | } |
62 | | |
63 | 6.78k | static void on_request(struct evhttp_request* ev_req, void* arg) { |
64 | 6.78k | auto request = (HttpRequest*)ev_req->on_free_cb_arg; |
65 | 6.78k | if (request == nullptr) { |
66 | | // In this case, request's on_header return -1 |
67 | 2 | return; |
68 | 2 | } |
69 | 6.78k | request->handler()->handle(request); |
70 | 6.78k | } |
71 | | |
72 | 6.99k | static int on_header(struct evhttp_request* ev_req, void* param) { |
73 | 6.99k | EvHttpServer* server = (EvHttpServer*)ev_req->on_complete_cb_arg; |
74 | 6.99k | return server->on_header(ev_req); |
75 | 6.99k | } |
76 | | |
77 | | // param is pointer of EvHttpServer |
78 | 13.9k | static int on_connection(struct evhttp_request* req, void* param) { |
79 | 13.9k | evhttp_request_set_header_cb(req, on_header); |
80 | | // only used on_complete_cb's argument |
81 | 13.9k | evhttp_request_set_on_complete_cb(req, nullptr, param); |
82 | 13.9k | return 0; |
83 | 13.9k | } |
84 | | |
85 | | EvHttpServer::EvHttpServer(int port, int num_workers) |
86 | 10 | : _port(port), _num_workers(num_workers), _real_port(0) { |
87 | 10 | _host = BackendOptions::get_service_bind_address(); |
88 | | |
89 | 10 | evthread_use_pthreads(); |
90 | 10 | DCHECK_GT(_num_workers, 0); |
91 | 10 | _event_bases.resize(_num_workers); |
92 | 911 | for (int i = 0; i < _num_workers; ++i) { |
93 | 901 | std::shared_ptr<event_base> base(event_base_new(), |
94 | 901 | [](event_base* base) { event_base_free(base); }); |
95 | 901 | CHECK(base != nullptr) << "Couldn't create an event_base."; |
96 | 901 | std::lock_guard lock(_event_bases_lock); |
97 | 901 | _event_bases[i] = base; |
98 | 901 | } |
99 | 10 | } |
100 | | |
101 | | EvHttpServer::EvHttpServer(const std::string& host, int port, int num_workers) |
102 | 0 | : _host(host), _port(port), _num_workers(num_workers), _real_port(0) { |
103 | 0 | DCHECK_GT(_num_workers, 0); |
104 | 0 | } |
105 | | |
106 | 6 | EvHttpServer::~EvHttpServer() { |
107 | 6 | if (_started) { |
108 | 2 | stop(); |
109 | 2 | } |
110 | 6 | } |
111 | | |
112 | 10 | void EvHttpServer::start() { |
113 | 10 | _started = true; |
114 | | // bind to |
115 | 10 | auto s = _bind(); |
116 | 10 | CHECK(s.ok()) << s.to_string(); |
117 | 10 | static_cast<void>(ThreadPoolBuilder("EvHttpServer") |
118 | 10 | .set_min_threads(_num_workers) |
119 | 10 | .set_max_threads(_num_workers) |
120 | 10 | .build(&_workers)); |
121 | | |
122 | | // Pre-create all evhttp objects and store them as class members |
123 | | // to ensure proper lifecycle management during shutdown |
124 | 10 | { |
125 | 10 | std::lock_guard lock(_event_bases_lock); |
126 | 10 | _evhttp_servers.resize(_num_workers); |
127 | 911 | for (int i = 0; i < _num_workers; ++i) { |
128 | 901 | std::shared_ptr<evhttp> http(evhttp_new(_event_bases[i].get()), |
129 | 901 | [](evhttp* http) { evhttp_free(http); }); |
130 | 901 | CHECK(http != nullptr) << "Couldn't create an evhttp."; |
131 | | |
132 | 901 | auto res = evhttp_accept_socket(http.get(), _server_fd); |
133 | 901 | CHECK(res >= 0) << "evhttp accept socket failed, res=" << res; |
134 | | |
135 | 901 | evhttp_set_newreqcb(http.get(), on_connection, this); |
136 | 901 | evhttp_set_gencb(http.get(), on_request, this); |
137 | | |
138 | 901 | _evhttp_servers[i] = http; |
139 | 901 | } |
140 | 10 | } |
141 | | |
142 | 911 | for (int i = 0; i < _num_workers; ++i) { |
143 | 901 | auto status = _workers->submit_func([this, i]() { |
144 | 898 | std::shared_ptr<event_base> base; |
145 | 898 | { |
146 | 898 | std::lock_guard lock(_event_bases_lock); |
147 | 898 | base = _event_bases[i]; |
148 | 898 | } |
149 | 898 | event_base_dispatch(base.get()); |
150 | 898 | }); |
151 | 901 | CHECK(status.ok()); |
152 | 901 | } |
153 | 10 | } |
154 | | |
155 | 6 | void EvHttpServer::stop() { |
156 | | // 1. Close server fd first to reject new connections |
157 | 6 | close(_server_fd); |
158 | 6 | _server_fd = -1; |
159 | | |
160 | | // 2. Break all event loops to make dispatch return |
161 | 6 | { |
162 | 6 | std::lock_guard<std::mutex> lock(_event_bases_lock); |
163 | 395 | for (int i = 0; i < _num_workers; ++i) { |
164 | 389 | if (_event_bases[i]) { |
165 | 389 | event_base_loopbreak(_event_bases[i].get()); |
166 | 389 | } |
167 | 389 | } |
168 | 6 | } |
169 | | |
170 | | // 3. Wait for all worker threads to finish event_base_dispatch |
171 | 6 | _workers->shutdown(); |
172 | | |
173 | | // 4. Now it's safe to cleanup - all worker threads have exited |
174 | | // Clear evhttp before event_base since evhttp depends on event_base |
175 | 6 | { |
176 | 6 | std::lock_guard<std::mutex> lock(_event_bases_lock); |
177 | 6 | _evhttp_servers.clear(); |
178 | 6 | _event_bases.clear(); |
179 | 6 | } |
180 | | |
181 | 6 | _started = false; |
182 | 6 | } |
183 | | |
184 | 0 | void EvHttpServer::join() {} |
185 | | |
186 | 7 | Status EvHttpServer::_bind() { |
187 | 7 | butil::EndPoint point; |
188 | 7 | auto res = butil::str2endpoint(_host.c_str(), _port, &point); |
189 | 7 | if (res < 0) { |
190 | 0 | return Status::InternalError("convert address failed, host={}, port={}", _host, _port); |
191 | 0 | } |
192 | 7 | _server_fd = butil::tcp_listen(point); |
193 | 7 | if (_server_fd < 0) { |
194 | 0 | char buf[64]; |
195 | 0 | std::stringstream ss; |
196 | 0 | ss << "tcp listen failed, errno=" << errno |
197 | 0 | << ", errmsg=" << strerror_r(errno, buf, sizeof(buf)); |
198 | 0 | return Status::InternalError(ss.str()); |
199 | 0 | } |
200 | 7 | if (_port == 0) { |
201 | 0 | struct sockaddr_in addr; |
202 | 0 | socklen_t socklen = sizeof(addr); |
203 | 0 | const int rc = getsockname(_server_fd, (struct sockaddr*)&addr, &socklen); |
204 | 0 | if (rc == 0) { |
205 | 0 | _real_port = ntohs(addr.sin_port); |
206 | 0 | } |
207 | 0 | } |
208 | 7 | res = butil::make_non_blocking(_server_fd); |
209 | 7 | if (res < 0) { |
210 | 0 | char buf[64]; |
211 | 0 | std::stringstream ss; |
212 | 0 | ss << "make socket to non_blocking failed, errno=" << errno |
213 | 0 | << ", errmsg=" << strerror_r(errno, buf, sizeof(buf)); |
214 | 0 | return Status::InternalError(ss.str()); |
215 | 0 | } |
216 | 7 | return Status::OK(); |
217 | 7 | } |
218 | | |
219 | | bool EvHttpServer::register_handler(const HttpMethod& method, const std::string& path, |
220 | 641 | HttpHandler* handler) { |
221 | 641 | if (handler == nullptr) { |
222 | 0 | LOG(WARNING) << "dummy handler for http method " << method << " with path " << path; |
223 | 0 | return false; |
224 | 0 | } |
225 | | |
226 | 641 | bool result = true; |
227 | 641 | std::lock_guard<std::mutex> lock(_handler_lock); |
228 | 641 | PathTrie<HttpHandler*>* root = nullptr; |
229 | 641 | switch (method) { |
230 | 447 | case GET: |
231 | 447 | root = &_get_handlers; |
232 | 447 | break; |
233 | 48 | case PUT: |
234 | 48 | root = &_put_handlers; |
235 | 48 | break; |
236 | 96 | case POST: |
237 | 96 | root = &_post_handlers; |
238 | 96 | break; |
239 | 0 | case DELETE: |
240 | 0 | root = &_delete_handlers; |
241 | 0 | break; |
242 | 50 | case HEAD: |
243 | 50 | root = &_head_handlers; |
244 | 50 | break; |
245 | 0 | case OPTIONS: |
246 | 0 | root = &_options_handlers; |
247 | 0 | break; |
248 | 0 | default: |
249 | 0 | LOG(WARNING) << "unknown HTTP method, method=" << method; |
250 | 0 | result = false; |
251 | 641 | } |
252 | 641 | if (result) { |
253 | 641 | result = root->insert(path, handler); |
254 | 641 | } |
255 | | |
256 | 641 | return result; |
257 | 641 | } |
258 | | |
259 | 8 | void EvHttpServer::register_static_file_handler(HttpHandler* handler) { |
260 | 8 | DCHECK(handler != nullptr); |
261 | 8 | DCHECK(_static_file_handler == nullptr); |
262 | 8 | std::lock_guard<std::mutex> lock(_handler_lock); |
263 | 8 | _static_file_handler = handler; |
264 | 8 | } |
265 | | |
266 | 6.99k | int EvHttpServer::on_header(struct evhttp_request* ev_req) { |
267 | 6.99k | std::unique_ptr<HttpRequest> request(new HttpRequest(ev_req)); |
268 | 6.99k | auto res = request->init_from_evhttp(); |
269 | 6.99k | if (res < 0) { |
270 | 0 | return -1; |
271 | 0 | } |
272 | 6.99k | auto handler = _find_handler(request.get()); |
273 | 6.99k | if (handler == nullptr) { |
274 | 3 | evhttp_remove_header(evhttp_request_get_input_headers(ev_req), HttpHeaders::EXPECT); |
275 | 3 | HttpChannel::send_reply(request.get(), HttpStatus::NOT_FOUND, "Not Found"); |
276 | 3 | return 0; |
277 | 3 | } |
278 | | // set handler before call on_header, because handler_ctx will set in on_header |
279 | 6.98k | request->set_handler(handler); |
280 | 6.98k | res = handler->on_header(request.get()); |
281 | 6.98k | if (res < 0) { |
282 | | // reply has already sent by handler's on_header |
283 | 202 | evhttp_remove_header(evhttp_request_get_input_headers(ev_req), HttpHeaders::EXPECT); |
284 | 202 | return 0; |
285 | 202 | } |
286 | | |
287 | | // If request body would be big(greater than 1GB), |
288 | | // it is better that request_will_be_read_progressively is set true, |
289 | | // this can make body read in chunk, not in total |
290 | 6.78k | if (handler->request_will_be_read_progressively()) { |
291 | 2.23k | evhttp_request_set_chunked_cb(ev_req, on_chunked); |
292 | 2.23k | } |
293 | | |
294 | 6.78k | evhttp_request_set_on_free_cb(ev_req, on_free, request.release()); |
295 | 6.78k | return 0; |
296 | 6.98k | } |
297 | | |
298 | 6.99k | HttpHandler* EvHttpServer::_find_handler(HttpRequest* req) { |
299 | 6.99k | auto& path = req->raw_path(); |
300 | | |
301 | 6.99k | HttpHandler* handler = nullptr; |
302 | | |
303 | 6.99k | std::lock_guard<std::mutex> lock(_handler_lock); |
304 | 6.99k | switch (req->method()) { |
305 | 3.86k | case GET: |
306 | 3.86k | _get_handlers.retrieve(path, &handler, req->params()); |
307 | | // Static file handler is a fallback handler |
308 | 3.86k | if (handler == nullptr) { |
309 | 1 | handler = _static_file_handler; |
310 | 1 | } |
311 | 3.86k | break; |
312 | 2.47k | case PUT: |
313 | 2.47k | _put_handlers.retrieve(path, &handler, req->params()); |
314 | 2.47k | break; |
315 | 645 | case POST: |
316 | 645 | _post_handlers.retrieve(path, &handler, req->params()); |
317 | 645 | break; |
318 | 0 | case DELETE: |
319 | 0 | _delete_handlers.retrieve(path, &handler, req->params()); |
320 | 0 | break; |
321 | 7 | case HEAD: |
322 | 7 | _head_handlers.retrieve(path, &handler, req->params()); |
323 | 7 | break; |
324 | 0 | case OPTIONS: |
325 | 0 | _options_handlers.retrieve(path, &handler, req->params()); |
326 | 0 | break; |
327 | 0 | default: |
328 | 0 | LOG(WARNING) << "unknown HTTP method, method=" << req->method(); |
329 | 0 | break; |
330 | 6.99k | } |
331 | 6.99k | return handler; |
332 | 6.99k | } |
333 | | |
334 | | } // namespace doris |