be/src/service/http/http_request.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "service/http/http_request.h" |
19 | | |
20 | | #include <event2/buffer.h> |
21 | | #include <event2/http.h> |
22 | | #include <event2/http_struct.h> |
23 | | #include <event2/keyvalq_struct.h> |
24 | | |
25 | | #include <memory> |
26 | | #include <sstream> |
27 | | #include <string> |
28 | | #include <unordered_map> |
29 | | #include <utility> |
30 | | |
31 | | #include "load/stream_load/stream_load_context.h" |
32 | | #include "service/http/http_handler.h" |
33 | | #include "service/http/http_headers.h" |
34 | | #include "util/stack_util.h" |
35 | | |
36 | | namespace doris { |
37 | | |
38 | | static std::string s_empty = ""; |
39 | | |
40 | | // Helper function to check if a header should be masked in logs |
41 | 25.8k | static bool is_sensitive_header(const std::string& header_name) { |
42 | 25.8k | return iequal(header_name, HttpHeaders::AUTHORIZATION) || |
43 | 25.8k | iequal(header_name, HttpHeaders::PROXY_AUTHORIZATION) || iequal(header_name, "token") || |
44 | 25.8k | iequal(header_name, HttpHeaders::AUTH_TOKEN); |
45 | 25.8k | } |
46 | | |
47 | 7.00k | HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) {} |
48 | | |
49 | 7.00k | HttpRequest::~HttpRequest() { |
50 | 7.00k | if (_handler_ctx != nullptr) { |
51 | 314 | DCHECK(_handler != nullptr); |
52 | 314 | _handler->free_handler_ctx(_handler_ctx); |
53 | 314 | } |
54 | 7.00k | } |
55 | | |
56 | 6.99k | int HttpRequest::init_from_evhttp() { |
57 | 6.99k | _method = to_http_method(evhttp_request_get_command(_ev_req)); |
58 | 6.99k | if (_method == HttpMethod::UNKNOWN) { |
59 | 0 | LOG(WARNING) << "unknown method of HTTP request, method=" |
60 | 0 | << evhttp_request_get_command(_ev_req); |
61 | 0 | return -1; |
62 | 0 | } |
63 | 6.99k | _uri = evhttp_request_get_uri(_ev_req); |
64 | | // conver header |
65 | 6.99k | auto headers = evhttp_request_get_input_headers(_ev_req); |
66 | 48.2k | for (auto header = headers->tqh_first; header != nullptr; header = header->next.tqe_next) { |
67 | 41.2k | _headers.emplace(header->key, header->value); |
68 | 41.2k | } |
69 | | // parse |
70 | 6.99k | auto ev_uri = evhttp_request_get_evhttp_uri(_ev_req); |
71 | 6.99k | _raw_path = evhttp_uri_get_path(ev_uri); |
72 | 6.99k | auto query = evhttp_uri_get_query(ev_uri); |
73 | 6.99k | if (query == nullptr || *query == '\0') { |
74 | 2.73k | return 0; |
75 | 2.73k | } |
76 | 4.25k | struct evkeyvalq params; |
77 | 4.25k | auto res = evhttp_parse_query_str(query, ¶ms); |
78 | 4.25k | if (res < 0) { |
79 | 0 | LOG(WARNING) << "parse query str failed, query=" << query; |
80 | 0 | return res; |
81 | 0 | } |
82 | 9.12k | for (auto param = params.tqh_first; param != nullptr; param = param->next.tqe_next) { |
83 | 4.86k | _query_params.emplace(param->key, param->value); |
84 | 4.86k | } |
85 | 4.25k | _params.insert(_query_params.begin(), _query_params.end()); |
86 | 4.25k | evhttp_clear_headers(¶ms); |
87 | 4.25k | return 0; |
88 | 4.25k | } |
89 | | |
90 | 56 | std::string HttpRequest::debug_string() const { |
91 | 56 | std::stringstream ss; |
92 | 56 | ss << "HttpRequest: \n" |
93 | 56 | << "method:" << _method << "\n" |
94 | 56 | << "uri:" << _uri << "\n" |
95 | 56 | << "raw_path:" << _raw_path << "\n" |
96 | 56 | << "headers: \n"; |
97 | 161 | for (auto& iter : _headers) { |
98 | 161 | if (is_sensitive_header(iter.first)) { |
99 | 2 | ss << "key=" << iter.first << ", value=***MASKED***\n"; |
100 | 159 | } else { |
101 | 159 | ss << "key=" << iter.first << ", value=" << iter.second << "\n"; |
102 | 159 | } |
103 | 161 | } |
104 | 56 | ss << "params: \n"; |
105 | 56 | for (auto& iter : _params) { |
106 | 54 | ss << "key=" << iter.first << ", value=" << iter.second << "\n"; |
107 | 54 | } |
108 | | |
109 | 56 | return ss.str(); |
110 | 56 | } |
111 | | |
112 | 165k | const std::string& HttpRequest::header(const std::string& key) const { |
113 | 165k | auto iter = _headers.find(key); |
114 | 165k | if (iter == _headers.end()) { |
115 | 129k | return s_empty; |
116 | 129k | } |
117 | 35.7k | return iter->second; |
118 | 165k | } |
119 | | |
120 | 10.3k | const std::string& HttpRequest::param(const std::string& key) const { |
121 | 10.3k | auto iter = _params.find(key); |
122 | 10.3k | if (iter == _params.end()) { |
123 | 705 | return s_empty; |
124 | 705 | } |
125 | 9.64k | return iter->second; |
126 | 10.3k | } |
127 | | |
128 | 2.27k | std::string HttpRequest::get_all_headers() const { |
129 | 2.27k | std::stringstream headers; |
130 | 25.7k | for (const auto& header : _headers) { |
131 | | // Mask sensitive headers |
132 | 25.7k | if (is_sensitive_header(header.first)) { |
133 | 2.57k | headers << header.first << ":***MASKED***, "; |
134 | 23.1k | } else { |
135 | 23.1k | headers << header.first << ":" << header.second + ", "; |
136 | 23.1k | } |
137 | 25.7k | } |
138 | 2.27k | return headers.str(); |
139 | 2.27k | } |
140 | | |
141 | 6.70k | void HttpRequest::add_output_header(const char* key, const char* value) { |
142 | 6.70k | evhttp_add_header(evhttp_request_get_output_headers(_ev_req), key, value); |
143 | 6.70k | } |
144 | | |
145 | 2 | std::string HttpRequest::get_request_body() { |
146 | 2 | if (!_request_body.empty()) { |
147 | 0 | return _request_body; |
148 | 0 | } |
149 | | // read buf |
150 | 2 | auto evbuf = evhttp_request_get_input_buffer(_ev_req); |
151 | 2 | if (evbuf == nullptr) { |
152 | 0 | return _request_body; |
153 | 0 | } |
154 | 2 | auto length = evbuffer_get_length(evbuf); |
155 | 2 | _request_body.resize(length); |
156 | 2 | evbuffer_remove(evbuf, (char*)_request_body.data(), length); |
157 | 2 | return _request_body; |
158 | 2 | } |
159 | | |
160 | 4.97k | const char* HttpRequest::remote_host() const { |
161 | 4.97k | return _ev_req->remote_host; |
162 | 4.97k | } |
163 | | |
164 | 6.99k | void HttpRequest::finish_send_reply() { |
165 | 6.99k | if (_send_reply_type == REPLY_SYNC) { |
166 | 4.71k | return; |
167 | 4.71k | } |
168 | | |
169 | 2.27k | std::string infos; |
170 | 2.27k | if (_handler_ctx != nullptr) { |
171 | 2.27k | infos = reinterpret_cast<StreamLoadContext*>(_handler_ctx.get())->brief(); |
172 | 2.27k | } |
173 | 2.27k | _http_reply_promise.set_value(true); |
174 | 2.27k | } |
175 | | |
176 | 6.78k | void HttpRequest::wait_finish_send_reply() { |
177 | 6.78k | if (_send_reply_type == REPLY_SYNC) { |
178 | 4.69k | return; |
179 | 4.69k | } |
180 | | |
181 | 2.09k | std::string infos; |
182 | 2.09k | StreamLoadContext* ctx = nullptr; |
183 | 2.09k | if (_handler_ctx != nullptr) { |
184 | 2.09k | ctx = reinterpret_cast<StreamLoadContext*>(_handler_ctx.get()); |
185 | 2.09k | infos = ctx->brief(); |
186 | 2.09k | _handler->free_handler_ctx(_handler_ctx); |
187 | 2.09k | } |
188 | | |
189 | 2.09k | VLOG_NOTICE << "start to wait send reply, infos=" << infos; |
190 | 2.09k | auto status = _http_reply_future.wait_for(std::chrono::seconds(config::async_reply_timeout_s)); |
191 | | // if request is timeout and can't cancel fragment in time, it will cause some new request block |
192 | | // so we will free cancelled request in time. |
193 | 2.09k | if (status != std::future_status::ready) { |
194 | 0 | LOG(WARNING) << "wait for send reply timeout, " << this->debug_string(); |
195 | 0 | std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock); |
196 | | // do not send_reply after free current request |
197 | 0 | ctx->_can_send_reply = false; |
198 | 0 | ctx->_finish_send_reply = true; |
199 | 0 | ctx->_can_send_reply_cv.notify_all(); |
200 | 2.09k | } else { |
201 | 2.09k | VLOG_NOTICE << "wait send reply finished"; |
202 | 2.09k | } |
203 | | |
204 | | // delete _handler_ctx at the end, in case that finish_send_reply can't get detailed info |
205 | 2.09k | _handler_ctx = nullptr; |
206 | 2.09k | } |
207 | | |
208 | | } // namespace doris |