/root/doris/be/src/http/http_request.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "http/http_request.h" |
19 | | |
20 | | #include <event2/buffer.h> |
21 | | #include <event2/http.h> |
22 | | #include <event2/http_struct.h> |
23 | | #include <event2/keyvalq_struct.h> |
24 | | |
25 | | #include <memory> |
26 | | #include <sstream> |
27 | | #include <string> |
28 | | #include <unordered_map> |
29 | | #include <utility> |
30 | | |
31 | | #include "http/http_handler.h" |
32 | | #include "runtime/stream_load/stream_load_context.h" |
33 | | #include "util/stack_util.h" |
34 | | |
35 | | namespace doris { |
36 | | |
37 | | static std::string s_empty = ""; |
38 | | |
39 | 70 | HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) {} |
40 | | |
41 | 70 | HttpRequest::~HttpRequest() { |
42 | 70 | if (_handler_ctx != nullptr) { |
43 | 0 | DCHECK(_handler != nullptr); |
44 | 0 | _handler->free_handler_ctx(_handler_ctx); |
45 | 0 | } |
46 | 70 | } |
47 | | |
48 | 57 | int HttpRequest::init_from_evhttp() { |
49 | 57 | _method = to_http_method(evhttp_request_get_command(_ev_req)); |
50 | 57 | if (_method == HttpMethod::UNKNOWN) { |
51 | 0 | LOG(WARNING) << "unknown method of HTTP request, method=" |
52 | 0 | << evhttp_request_get_command(_ev_req); |
53 | 0 | return -1; |
54 | 0 | } |
55 | 57 | _uri = evhttp_request_get_uri(_ev_req); |
56 | | // conver header |
57 | 57 | auto headers = evhttp_request_get_input_headers(_ev_req); |
58 | 217 | for (auto header = headers->tqh_first; header != nullptr; header = header->next.tqe_next) { |
59 | 160 | _headers.emplace(header->key, header->value); |
60 | 160 | } |
61 | | // parse |
62 | 57 | auto ev_uri = evhttp_request_get_evhttp_uri(_ev_req); |
63 | 57 | _raw_path = evhttp_uri_get_path(ev_uri); |
64 | 57 | auto query = evhttp_uri_get_query(ev_uri); |
65 | 57 | if (query == nullptr || *query == '\0') { |
66 | 45 | return 0; |
67 | 45 | } |
68 | 12 | struct evkeyvalq params; |
69 | 12 | auto res = evhttp_parse_query_str(query, ¶ms); |
70 | 12 | if (res < 0) { |
71 | 0 | LOG(WARNING) << "parse query str failed, query=" << query; |
72 | 0 | return res; |
73 | 0 | } |
74 | 39 | for (auto param = params.tqh_first; param != nullptr; param = param->next.tqe_next) { |
75 | 27 | _query_params.emplace(param->key, param->value); |
76 | 27 | } |
77 | 12 | _params.insert(_query_params.begin(), _query_params.end()); |
78 | 12 | evhttp_clear_headers(¶ms); |
79 | 12 | return 0; |
80 | 12 | } |
81 | | |
82 | 25 | std::string HttpRequest::debug_string() const { |
83 | 25 | std::stringstream ss; |
84 | 25 | ss << "HttpRequest: \n" |
85 | 25 | << "method:" << _method << "\n" |
86 | 25 | << "uri:" << _uri << "\n" |
87 | 25 | << "raw_path:" << _raw_path << "\n" |
88 | 25 | << "headers: \n"; |
89 | 64 | for (auto& iter : _headers) { |
90 | 64 | ss << "key=" << iter.first << ", value=" << iter.second << "\n"; |
91 | 64 | } |
92 | 25 | ss << "params: \n"; |
93 | 30 | for (auto& iter : _params) { |
94 | 30 | ss << "key=" << iter.first << ", value=" << iter.second << "\n"; |
95 | 30 | } |
96 | | |
97 | 25 | return ss.str(); |
98 | 25 | } |
99 | | |
100 | 155 | const std::string& HttpRequest::header(const std::string& key) const { |
101 | 155 | auto iter = _headers.find(key); |
102 | 155 | if (iter == _headers.end()) { |
103 | 126 | return s_empty; |
104 | 126 | } |
105 | 29 | return iter->second; |
106 | 155 | } |
107 | | |
108 | 60 | const std::string& HttpRequest::param(const std::string& key) const { |
109 | 60 | auto iter = _params.find(key); |
110 | 60 | if (iter == _params.end()) { |
111 | 28 | return s_empty; |
112 | 28 | } |
113 | 32 | return iter->second; |
114 | 60 | } |
115 | | |
116 | 0 | std::string HttpRequest::get_all_headers() const { |
117 | 0 | std::stringstream headers; |
118 | 0 | for (const auto& header : _headers) { |
119 | 0 | headers << header.first << ":" << header.second + ", "; |
120 | 0 | } |
121 | 0 | return headers.str(); |
122 | 0 | } |
123 | | |
124 | 27 | void HttpRequest::add_output_header(const char* key, const char* value) { |
125 | 27 | evhttp_add_header(evhttp_request_get_output_headers(_ev_req), key, value); |
126 | 27 | } |
127 | | |
128 | 2 | std::string HttpRequest::get_request_body() { |
129 | 2 | if (!_request_body.empty()) { |
130 | 0 | return _request_body; |
131 | 0 | } |
132 | | // read buf |
133 | 2 | auto evbuf = evhttp_request_get_input_buffer(_ev_req); |
134 | 2 | if (evbuf == nullptr) { |
135 | 0 | return _request_body; |
136 | 0 | } |
137 | 2 | auto length = evbuffer_get_length(evbuf); |
138 | 2 | _request_body.resize(length); |
139 | 2 | evbuffer_remove(evbuf, (char*)_request_body.data(), length); |
140 | 2 | return _request_body; |
141 | 2 | } |
142 | | |
143 | 32 | const char* HttpRequest::remote_host() const { |
144 | 32 | return _ev_req->remote_host; |
145 | 32 | } |
146 | | |
147 | 53 | void HttpRequest::finish_send_reply() { |
148 | 53 | if (_send_reply_type == REPLY_SYNC) { |
149 | 53 | return; |
150 | 53 | } |
151 | | |
152 | 0 | std::string infos; |
153 | 0 | if (_handler_ctx != nullptr) { |
154 | 0 | infos = reinterpret_cast<StreamLoadContext*>(_handler_ctx.get())->brief(); |
155 | 0 | } |
156 | 0 | VLOG_NOTICE << "finish send reply, infos=" << infos |
157 | 0 | << ", stack=" << get_stack_trace(); // temp locate problem |
158 | 0 | _http_reply_promise.set_value(true); |
159 | 0 | } |
160 | | |
161 | 31 | void HttpRequest::wait_finish_send_reply() { |
162 | 31 | if (_send_reply_type == REPLY_SYNC) { |
163 | 31 | return; |
164 | 31 | } |
165 | | |
166 | 0 | std::string infos; |
167 | 0 | StreamLoadContext* ctx = nullptr; |
168 | 0 | if (_handler_ctx != nullptr) { |
169 | 0 | ctx = reinterpret_cast<StreamLoadContext*>(_handler_ctx.get()); |
170 | 0 | infos = ctx->brief(); |
171 | 0 | _handler->free_handler_ctx(_handler_ctx); |
172 | 0 | } |
173 | |
|
174 | 0 | VLOG_NOTICE << "start to wait send reply, infos=" << infos; |
175 | 0 | auto status = _http_reply_futrue.wait_for(std::chrono::seconds(3)); |
176 | | // if request is timeout and can't cancel fragment in time, it will cause some new request block |
177 | | // so we will free cancelled request in time. |
178 | 0 | if (status != std::future_status::ready) { |
179 | 0 | LOG(WARNING) << "wait for send reply timeout, " << this->debug_string(); |
180 | 0 | std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock); |
181 | | // do not send_reply after free current request |
182 | 0 | ctx->_can_send_reply = false; |
183 | 0 | ctx->_finish_send_reply = true; |
184 | 0 | ctx->_can_send_reply_cv.notify_all(); |
185 | 0 | } else { |
186 | 0 | VLOG_NOTICE << "wait send reply finished"; |
187 | 0 | } |
188 | | |
189 | | // delete _handler_ctx at the end, in case that finish_send_reply can't get detailed info |
190 | 0 | _handler_ctx = nullptr; |
191 | 0 | } |
192 | | |
193 | | } // namespace doris |