/root/doris/be/src/http/http_client.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "http/http_client.h" |
19 | | |
20 | | #include <absl/strings/str_split.h> |
21 | | #include <glog/logging.h> |
22 | | #include <unistd.h> |
23 | | |
24 | | #include <memory> |
25 | | #include <ostream> |
26 | | |
27 | | #include "common/cast_set.h" |
28 | | #include "common/config.h" |
29 | | #include "common/status.h" |
30 | | #include "http/http_headers.h" |
31 | | #include "runtime/exec_env.h" |
32 | | #include "util/security.h" |
33 | | #include "util/stack_util.h" |
34 | | |
35 | | namespace doris { |
36 | | #include "common/compile_check_begin.h" |
37 | | class MultiFileSplitter { |
38 | | public: |
39 | | MultiFileSplitter(std::string local_dir, std::unordered_set<std::string> expected_files) |
40 | 1 | : _local_dir_path(std::move(local_dir)), _expected_files(std::move(expected_files)) {} |
41 | 1 | ~MultiFileSplitter() { |
42 | 1 | if (_fd >= 0) { |
43 | 0 | close(_fd); |
44 | 0 | } |
45 | | |
46 | 1 | if (!_status.ok() && !downloaded_files.empty()) { |
47 | 0 | LOG(WARNING) << "download files to " << _local_dir_path << " failed, try remove the " |
48 | 0 | << downloaded_files.size() << " downloaded files"; |
49 | 0 | for (const auto& file : downloaded_files) { |
50 | 0 | remove(file.c_str()); |
51 | 0 | } |
52 | 0 | } |
53 | 1 | } |
54 | | |
55 | 661 | bool append(const char* data, size_t length) { |
56 | | // Already failed. |
57 | 661 | if (!_status.ok()) { |
58 | 0 | return false; |
59 | 0 | } |
60 | | |
61 | 661 | std::string buf; |
62 | 661 | if (!_buffer.empty()) { |
63 | 0 | buf.swap(_buffer); |
64 | 0 | buf.append(data, length); |
65 | 0 | data = buf.data(); |
66 | 0 | length = buf.size(); |
67 | 0 | } |
68 | 661 | return append_inner(data, length); |
69 | 661 | } |
70 | | |
71 | 1 | Status finish() { |
72 | 1 | if (_status.ok()) { |
73 | 1 | _status = finish_inner(); |
74 | 1 | } |
75 | | |
76 | 1 | return _status; |
77 | 1 | } |
78 | | |
79 | | private: |
80 | 661 | bool append_inner(const char* data, size_t length) { |
81 | 1.39k | while (length > 0) { |
82 | 729 | int consumed = 0; |
83 | 729 | if (_is_reading_header) { |
84 | 35 | consumed = parse_header(data, length); |
85 | 694 | } else { |
86 | 694 | consumed = append_file(data, length); |
87 | 694 | } |
88 | | |
89 | 729 | if (consumed < 0) { |
90 | 0 | return false; |
91 | 0 | } |
92 | | |
93 | 729 | DCHECK(consumed <= length); |
94 | 729 | data += consumed; |
95 | 729 | length -= consumed; |
96 | 729 | } |
97 | 661 | return true; |
98 | 661 | } |
99 | | |
100 | 35 | int parse_header(const char* data, size_t length) { |
101 | 35 | DCHECK(_fd < 0); |
102 | | |
103 | 35 | std::string_view buf(data, length); |
104 | 35 | size_t pos = buf.find("\r\n\r\n"); |
105 | 35 | if (pos == std::string::npos) { |
106 | 0 | _buffer.append(data, length); |
107 | 0 | return static_cast<int>(length); |
108 | 0 | } |
109 | | |
110 | | // header already read. |
111 | 35 | _is_reading_header = false; |
112 | | |
113 | 35 | bool has_file_name = false; |
114 | 35 | bool has_file_size = false; |
115 | 35 | std::string_view header = buf.substr(0, pos); |
116 | 35 | std::vector<std::string> headers = absl::StrSplit(header, "\r\n", absl::SkipWhitespace()); |
117 | 70 | for (auto& s : headers) { |
118 | 70 | size_t header_pos = s.find(':'); |
119 | 70 | if (header_pos == std::string::npos) { |
120 | 0 | continue; |
121 | 0 | } |
122 | 70 | std::string_view header_view(s); |
123 | 70 | std::string_view key = header_view.substr(0, header_pos); |
124 | 70 | std::string_view value = header_view.substr(header_pos + 1); |
125 | 70 | if (value.starts_with(' ')) { |
126 | 70 | value.remove_prefix(std::min(value.find_first_not_of(' '), value.size())); |
127 | 70 | } |
128 | 70 | if (key == "File-Name") { |
129 | 35 | _file_name = value; |
130 | 35 | has_file_name = true; |
131 | 35 | } else if (key == "Content-Length") { |
132 | 35 | auto res = std::from_chars(value.data(), value.data() + value.size(), _file_size); |
133 | 35 | if (res.ec != std::errc()) { |
134 | 0 | std::string error_msg = fmt::format("invalid content length: {}", value); |
135 | 0 | LOG(WARNING) << "download files to " << _local_dir_path |
136 | 0 | << "failed, err=" << error_msg; |
137 | 0 | _status = Status::HttpError(std::move(error_msg)); |
138 | 0 | return -1; |
139 | 0 | } |
140 | 35 | has_file_size = true; |
141 | 35 | } |
142 | 70 | } |
143 | | |
144 | 35 | if (!has_file_name || !has_file_size) { |
145 | 0 | std::string error_msg = |
146 | 0 | fmt::format("invalid multi part header, has file name: {}, has file size: {}", |
147 | 0 | has_file_name, has_file_size); |
148 | 0 | LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg; |
149 | 0 | _status = Status::HttpError(std::move(error_msg)); |
150 | 0 | return -1; |
151 | 0 | } |
152 | | |
153 | 35 | if (!_expected_files.contains(_file_name)) { |
154 | 0 | std::string error_msg = fmt::format("unexpected file: {}", _file_name); |
155 | 0 | LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg; |
156 | 0 | _status = Status::HttpError(std::move(error_msg)); |
157 | 0 | return -1; |
158 | 0 | } |
159 | | |
160 | 35 | VLOG_DEBUG << "receive file " << _file_name << ", size " << _file_size; |
161 | | |
162 | 35 | _written_size = 0; |
163 | 35 | _local_file_path = fmt::format("{}/{}", _local_dir_path, _file_name); |
164 | 35 | _fd = open(_local_file_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); |
165 | 35 | if (_fd < 0) { |
166 | 0 | std::string error_msg = "fail to open file to write: " + _local_file_path; |
167 | 0 | LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg; |
168 | 0 | _status = Status::IOError(std::move(error_msg)); |
169 | 0 | return -1; |
170 | 0 | } |
171 | 35 | downloaded_files.push_back(_local_file_path); |
172 | | |
173 | 35 | return static_cast<int>(pos + 4); |
174 | 35 | } |
175 | | |
176 | 694 | int append_file(const char* data, size_t length) { |
177 | 694 | DCHECK(_fd >= 0); |
178 | 694 | DCHECK(_file_size >= _written_size); |
179 | | |
180 | 694 | size_t write_size = std::min(length, _file_size - _written_size); |
181 | 694 | if (write_size > 0 && write(_fd, data, write_size) < 0) { |
182 | 0 | auto msg = fmt::format("write file failed, file={}, error={}", _local_file_path, |
183 | 0 | strerror(errno)); |
184 | 0 | LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << msg; |
185 | 0 | _status = Status::HttpError(std::move(msg)); |
186 | 0 | return -1; |
187 | 0 | } |
188 | | |
189 | 694 | _written_size += write_size; |
190 | 694 | if (_written_size == _file_size) { |
191 | | // This file has been downloaded, switch to the next one. |
192 | 34 | switch_to_next_file(); |
193 | 34 | } |
194 | | |
195 | 694 | return cast_set<int>(write_size); |
196 | 694 | } |
197 | | |
198 | 1 | Status finish_inner() { |
199 | 1 | if (!_is_reading_header && _written_size == _file_size) { |
200 | 1 | switch_to_next_file(); |
201 | 1 | } |
202 | | |
203 | 1 | if (_fd >= 0) { |
204 | | // This file is not completely downloaded. |
205 | 0 | close(_fd); |
206 | 0 | _fd = -1; |
207 | 0 | auto error_msg = fmt::format("file {} is not completely downloaded", _local_file_path); |
208 | 0 | LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg; |
209 | 0 | return Status::HttpError(std::move(error_msg)); |
210 | 0 | } |
211 | | |
212 | 1 | if (!_expected_files.empty()) { |
213 | 0 | auto error_msg = fmt::format("not all files are downloaded, {} missing files", |
214 | 0 | _expected_files.size()); |
215 | 0 | LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg; |
216 | 0 | return Status::HttpError(std::move(error_msg)); |
217 | 0 | } |
218 | | |
219 | 1 | downloaded_files.clear(); |
220 | 1 | return Status::OK(); |
221 | 1 | } |
222 | | |
223 | 35 | void switch_to_next_file() { |
224 | 35 | DCHECK(_fd >= 0); |
225 | 35 | DCHECK(_written_size == _file_size); |
226 | | |
227 | 35 | close(_fd); |
228 | 35 | _fd = -1; |
229 | 35 | _expected_files.erase(_file_name); |
230 | 35 | _is_reading_header = true; |
231 | 35 | } |
232 | | |
233 | | const std::string _local_dir_path; |
234 | | std::string _buffer; |
235 | | std::unordered_set<std::string> _expected_files; |
236 | | Status _status; |
237 | | |
238 | | bool _is_reading_header = true; |
239 | | int _fd = -1; |
240 | | std::string _local_file_path; |
241 | | std::string _file_name; |
242 | | size_t _file_size = 0; |
243 | | size_t _written_size = 0; |
244 | | std::vector<std::string> downloaded_files; |
245 | | }; |
246 | | |
247 | 0 | static const char* header_error_msg(CURLHcode code) { |
248 | 0 | switch (code) { |
249 | 0 | case CURLHE_OK: |
250 | 0 | return "OK"; |
251 | 0 | case CURLHE_BADINDEX: |
252 | 0 | return "header exists but not with this index "; |
253 | 0 | case CURLHE_MISSING: |
254 | 0 | return "no such header exists"; |
255 | 0 | case CURLHE_NOHEADERS: |
256 | 0 | return "no headers at all exist (yet)"; |
257 | 0 | case CURLHE_NOREQUEST: |
258 | 0 | return "no request with this number was used"; |
259 | 0 | case CURLHE_OUT_OF_MEMORY: |
260 | 0 | return "out of memory while processing"; |
261 | 0 | case CURLHE_BAD_ARGUMENT: |
262 | 0 | return "a function argument was not okay"; |
263 | 0 | case CURLHE_NOT_BUILT_IN: |
264 | 0 | return "curl_easy_header() was disabled in the build"; |
265 | 0 | default: |
266 | 0 | return "unknown"; |
267 | 0 | } |
268 | 0 | } |
269 | | |
270 | 55 | HttpClient::HttpClient() = default; |
271 | | |
272 | 55 | HttpClient::~HttpClient() { |
273 | 55 | if (_curl != nullptr) { |
274 | 54 | curl_easy_cleanup(_curl); |
275 | 54 | _curl = nullptr; |
276 | 54 | } |
277 | 55 | if (_header_list != nullptr) { |
278 | 2 | curl_slist_free_all(_header_list); |
279 | 2 | _header_list = nullptr; |
280 | 2 | } |
281 | 55 | } |
282 | | |
283 | 54 | Status HttpClient::init(const std::string& url, bool set_fail_on_error) { |
284 | 54 | if (_curl == nullptr) { |
285 | 53 | _curl = curl_easy_init(); |
286 | 53 | if (_curl == nullptr) { |
287 | 0 | return Status::InternalError("fail to initialize curl"); |
288 | 0 | } |
289 | 53 | } else { |
290 | 1 | curl_easy_reset(_curl); |
291 | 1 | } |
292 | | |
293 | 54 | if (_header_list != nullptr) { |
294 | 0 | curl_slist_free_all(_header_list); |
295 | 0 | _header_list = nullptr; |
296 | 0 | } |
297 | | // set error_buf |
298 | 54 | _error_buf[0] = 0; |
299 | 54 | auto code = curl_easy_setopt(_curl, CURLOPT_ERRORBUFFER, _error_buf); |
300 | 54 | if (code != CURLE_OK) { |
301 | 0 | LOG(WARNING) << "fail to set CURLOPT_ERRORBUFFER, msg=" << _to_errmsg(code); |
302 | 0 | return Status::InternalError("fail to set error buffer"); |
303 | 0 | } |
304 | | // forbid signals |
305 | 54 | code = curl_easy_setopt(_curl, CURLOPT_NOSIGNAL, 1L); |
306 | 54 | if (code != CURLE_OK) { |
307 | 0 | LOG(WARNING) << "fail to set CURLOPT_NOSIGNAL, msg=" << _to_errmsg(code); |
308 | 0 | return Status::InternalError("fail to set CURLOPT_NOSIGNAL"); |
309 | 0 | } |
310 | | // set fail on error |
311 | | // When this option is set to `1L` (enabled), libcurl will return an error directly |
312 | | // when encountering HTTP error codes (>= 400), without reading the body of the error response. |
313 | 54 | if (set_fail_on_error) { |
314 | 52 | code = curl_easy_setopt(_curl, CURLOPT_FAILONERROR, 1L); |
315 | 52 | if (code != CURLE_OK) { |
316 | 0 | LOG(WARNING) << "fail to set CURLOPT_FAILONERROR, msg=" << _to_errmsg(code); |
317 | 0 | return Status::InternalError("fail to set CURLOPT_FAILONERROR"); |
318 | 0 | } |
319 | 52 | } |
320 | | // set redirect |
321 | 54 | code = curl_easy_setopt(_curl, CURLOPT_FOLLOWLOCATION, 1L); |
322 | 54 | if (code != CURLE_OK) { |
323 | 0 | LOG(WARNING) << "fail to set CURLOPT_FOLLOWLOCATION, msg=" << _to_errmsg(code); |
324 | 0 | return Status::InternalError("fail to set CURLOPT_FOLLOWLOCATION"); |
325 | 0 | } |
326 | 54 | code = curl_easy_setopt(_curl, CURLOPT_MAXREDIRS, 20); |
327 | 54 | if (code != CURLE_OK) { |
328 | 0 | LOG(WARNING) << "fail to set CURLOPT_MAXREDIRS, msg=" << _to_errmsg(code); |
329 | 0 | return Status::InternalError("fail to set CURLOPT_MAXREDIRS"); |
330 | 0 | } |
331 | | |
332 | 683 | curl_write_callback callback = [](char* buffer, size_t size, size_t nmemb, void* param) { |
333 | 683 | auto* client = (HttpClient*)param; |
334 | 683 | return client->on_response_data(buffer, size * nmemb); |
335 | 683 | }; |
336 | | |
337 | | // set callback function |
338 | 54 | code = curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, callback); |
339 | 54 | if (code != CURLE_OK) { |
340 | 0 | LOG(WARNING) << "fail to set CURLOPT_WRITEFUNCTION, msg=" << _to_errmsg(code); |
341 | 0 | return Status::InternalError("fail to set CURLOPT_WRITEFUNCTION"); |
342 | 0 | } |
343 | 54 | code = curl_easy_setopt(_curl, CURLOPT_WRITEDATA, (void*)this); |
344 | 54 | if (code != CURLE_OK) { |
345 | 0 | LOG(WARNING) << "fail to set CURLOPT_WRITEDATA, msg=" << _to_errmsg(code); |
346 | 0 | return Status::InternalError("fail to set CURLOPT_WRITEDATA"); |
347 | 0 | } |
348 | | |
349 | 54 | std::string escaped_url; |
350 | 54 | RETURN_IF_ERROR(_escape_url(url, &escaped_url)); |
351 | | // set url |
352 | 54 | code = curl_easy_setopt(_curl, CURLOPT_URL, escaped_url.c_str()); |
353 | 54 | if (code != CURLE_OK) { |
354 | 0 | LOG(WARNING) << "failed to set CURLOPT_URL, errmsg=" << _to_errmsg(code); |
355 | 0 | return Status::InternalError("fail to set CURLOPT_URL"); |
356 | 0 | } |
357 | | |
358 | | #ifndef BE_TEST |
359 | | set_auth_token(ExecEnv::GetInstance()->cluster_info()->curr_auth_token); |
360 | | #endif |
361 | 54 | return Status::OK(); |
362 | 54 | } |
363 | | |
364 | 53 | void HttpClient::set_method(HttpMethod method) { |
365 | 53 | _method = method; |
366 | 53 | switch (method) { |
367 | 31 | case GET: |
368 | 31 | curl_easy_setopt(_curl, CURLOPT_HTTPGET, 1L); |
369 | 31 | return; |
370 | 0 | case PUT: |
371 | 0 | curl_easy_setopt(_curl, CURLOPT_UPLOAD, 1L); |
372 | 0 | return; |
373 | 15 | case POST: |
374 | 15 | curl_easy_setopt(_curl, CURLOPT_POST, 1L); |
375 | 15 | return; |
376 | 0 | case DELETE: |
377 | 0 | curl_easy_setopt(_curl, CURLOPT_CUSTOMREQUEST, "DELETE"); |
378 | 0 | return; |
379 | 7 | case HEAD: |
380 | 7 | curl_easy_setopt(_curl, CURLOPT_NOBODY, 1L); |
381 | 7 | return; |
382 | 0 | case OPTIONS: |
383 | 0 | curl_easy_setopt(_curl, CURLOPT_CUSTOMREQUEST, "OPTIONS"); |
384 | 0 | return; |
385 | 0 | default: |
386 | 0 | return; |
387 | 53 | } |
388 | 53 | } |
389 | | |
390 | 3 | void HttpClient::set_speed_limit() { |
391 | 3 | curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_LIMIT, config::download_low_speed_limit_kbps * 1024); |
392 | 3 | curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_TIME, config::download_low_speed_time); |
393 | 3 | curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE, config::max_download_speed_kbps * 1024); |
394 | 3 | } |
395 | | |
396 | 683 | size_t HttpClient::on_response_data(const void* data, size_t length) { |
397 | 683 | if (*_callback != nullptr) { |
398 | 683 | bool is_continue = (*_callback)(data, length); |
399 | 683 | if (!is_continue) { |
400 | 0 | return -1; |
401 | 0 | } |
402 | 683 | } |
403 | 683 | return length; |
404 | 683 | } |
405 | | |
406 | 11 | Status HttpClient::execute_post_request(const std::string& payload, std::string* response) { |
407 | 11 | set_method(POST); |
408 | 11 | set_payload(payload); |
409 | 11 | return execute(response); |
410 | 11 | } |
411 | | |
412 | 0 | Status HttpClient::execute_delete_request(const std::string& payload, std::string* response) { |
413 | 0 | set_method(DELETE); |
414 | 0 | set_payload(payload); |
415 | 0 | return execute(response); |
416 | 0 | } |
417 | | |
418 | 54 | Status HttpClient::execute(const std::function<bool(const void* data, size_t length)>& callback) { |
419 | 54 | if (VLOG_DEBUG_IS_ON) { |
420 | 0 | VLOG_DEBUG << "execute http " << to_method_desc(_method) << " request, url " << _get_url(); |
421 | 0 | } |
422 | 54 | _callback = &callback; |
423 | 54 | auto code = curl_easy_perform(_curl); |
424 | 54 | if (code != CURLE_OK) { |
425 | 27 | std::string url = mask_token(_get_url()); |
426 | 27 | LOG(WARNING) << "fail to execute HTTP client, errmsg=" << _to_errmsg(code) |
427 | 27 | << ", trace=" << get_stack_trace() << ", url=" << url; |
428 | 27 | std::string errmsg = fmt::format("{}, url={}", _to_errmsg(code), url); |
429 | 27 | return Status::HttpError(std::move(errmsg)); |
430 | 27 | } |
431 | 27 | if (VLOG_DEBUG_IS_ON) { |
432 | 0 | VLOG_DEBUG << "execute http " << to_method_desc(_method) << " request, url " << _get_url() |
433 | 0 | << " done"; |
434 | 0 | } |
435 | 27 | return Status::OK(); |
436 | 54 | } |
437 | | |
438 | 3 | Status HttpClient::get_content_md5(std::string* md5) const { |
439 | 3 | struct curl_header* header_ptr; |
440 | 3 | auto code = curl_easy_header(_curl, HttpHeaders::CONTENT_MD5, 0, CURLH_HEADER, 0, &header_ptr); |
441 | 3 | if (code == CURLHE_MISSING || code == CURLHE_NOHEADERS) { |
442 | | // no such headers exists |
443 | 1 | md5->clear(); |
444 | 1 | return Status::OK(); |
445 | 2 | } else if (code != CURLHE_OK) { |
446 | 0 | auto msg = fmt::format("failed to get http header {}: {} ({})", HttpHeaders::CONTENT_MD5, |
447 | 0 | header_error_msg(code), code); |
448 | 0 | LOG(WARNING) << msg << ", trace=" << get_stack_trace(); |
449 | 0 | return Status::HttpError(std::move(msg)); |
450 | 0 | } |
451 | | |
452 | 2 | *md5 = header_ptr->value; |
453 | 2 | return Status::OK(); |
454 | 3 | } |
455 | | |
456 | 2 | Status HttpClient::download(const std::string& local_path) { |
457 | 2 | set_method(GET); |
458 | 2 | set_speed_limit(); |
459 | | |
460 | | // remove the file if it exists, to avoid change the linked files unexpectedly |
461 | 2 | bool exist = false; |
462 | 2 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(local_path, &exist)); |
463 | 2 | if (exist) { |
464 | 0 | remove(local_path.c_str()); |
465 | 0 | } |
466 | | |
467 | 2 | auto fp_closer = [](FILE* fp) { fclose(fp); }; |
468 | 2 | std::unique_ptr<FILE, decltype(fp_closer)> fp(fopen(local_path.c_str(), "w"), fp_closer); |
469 | 2 | if (fp == nullptr) { |
470 | 0 | LOG(WARNING) << "open file failed, file=" << local_path; |
471 | 0 | return Status::InternalError("open file failed"); |
472 | 0 | } |
473 | 2 | Status status; |
474 | 2 | auto callback = [&status, &fp, &local_path](const void* data, size_t length) { |
475 | 2 | auto res = fwrite(data, length, 1, fp.get()); |
476 | 2 | if (res != 1) { |
477 | 0 | LOG(WARNING) << "fail to write data to file, file=" << local_path |
478 | 0 | << ", error=" << ferror(fp.get()); |
479 | 0 | status = Status::InternalError("fail to write data when download"); |
480 | 0 | return false; |
481 | 0 | } |
482 | 2 | return true; |
483 | 2 | }; |
484 | | |
485 | 2 | if (auto s = execute(callback); !s.ok()) { |
486 | 0 | status = s; |
487 | 0 | } |
488 | 2 | if (!status.ok()) { |
489 | 0 | remove(local_path.c_str()); |
490 | 0 | } |
491 | 2 | return status; |
492 | 2 | } |
493 | | |
494 | | Status HttpClient::download_multi_files(const std::string& local_dir, |
495 | 1 | const std::unordered_set<std::string>& expected_files) { |
496 | 1 | set_speed_limit(); |
497 | | |
498 | 1 | MultiFileSplitter splitter(local_dir, expected_files); |
499 | 661 | auto callback = [&](const void* data, size_t length) { |
500 | 661 | return splitter.append(reinterpret_cast<const char*>(data), length); |
501 | 661 | }; |
502 | 1 | if (auto s = execute(callback); !s.ok()) { |
503 | 0 | return s; |
504 | 0 | } |
505 | 1 | return splitter.finish(); |
506 | 1 | } |
507 | | |
508 | 45 | Status HttpClient::execute(std::string* response) { |
509 | 45 | auto callback = [response](const void* data, size_t length) { |
510 | 20 | response->append((char*)data, length); |
511 | 20 | return true; |
512 | 20 | }; |
513 | 45 | return execute(callback); |
514 | 45 | } |
515 | | |
516 | 54 | const char* HttpClient::_to_errmsg(CURLcode code) const { |
517 | 54 | if (_error_buf[0] == 0) { |
518 | 0 | return curl_easy_strerror(code); |
519 | 0 | } |
520 | 54 | return _error_buf; |
521 | 54 | } |
522 | | |
523 | 27 | const char* HttpClient::_get_url() const { |
524 | 27 | const char* url = nullptr; |
525 | 27 | curl_easy_getinfo(_curl, CURLINFO_EFFECTIVE_URL, &url); |
526 | 27 | if (!url) { |
527 | 0 | url = "<unknown>"; |
528 | 0 | } |
529 | 27 | return url; |
530 | 27 | } |
531 | | |
532 | | // execute remote call action with retry |
533 | | Status HttpClient::execute(int retry_times, int sleep_time, |
534 | 0 | const std::function<Status(HttpClient*)>& callback) { |
535 | 0 | Status status; |
536 | 0 | for (int i = 0; i < retry_times; ++i) { |
537 | 0 | status = callback(this); |
538 | 0 | if (status.ok()) { |
539 | 0 | auto http_status = get_http_status(); |
540 | 0 | if (http_status == 200) { |
541 | 0 | return status; |
542 | 0 | } else { |
543 | 0 | std::string url = mask_token(_get_url()); |
544 | 0 | auto error_msg = fmt::format("http status code is not 200, code={}, url={}", |
545 | 0 | http_status, url); |
546 | 0 | LOG(WARNING) << error_msg; |
547 | 0 | return Status::HttpError(error_msg); |
548 | 0 | } |
549 | 0 | } |
550 | 0 | sleep(sleep_time); |
551 | 0 | } |
552 | 0 | return status; |
553 | 0 | } |
554 | | |
555 | | Status HttpClient::execute_with_retry(int retry_times, int sleep_time, |
556 | 8 | const std::function<Status(HttpClient*)>& callback) { |
557 | 8 | Status status; |
558 | 11 | for (int i = 0; i < retry_times; ++i) { |
559 | 10 | HttpClient client; |
560 | 10 | status = callback(&client); |
561 | 10 | if (status.ok()) { |
562 | 7 | auto http_status = client.get_http_status(); |
563 | 7 | if (http_status == 200) { |
564 | 7 | return status; |
565 | 7 | } else { |
566 | 0 | std::string url = mask_token(client._get_url()); |
567 | 0 | auto error_msg = fmt::format("http status code is not 200, code={}, url={}", |
568 | 0 | http_status, url); |
569 | 0 | LOG(WARNING) << error_msg; |
570 | 0 | return Status::HttpError(error_msg); |
571 | 0 | } |
572 | 7 | } |
573 | 3 | sleep(sleep_time); |
574 | 3 | } |
575 | 1 | return status; |
576 | 8 | } |
577 | | |
578 | | // http://example.com/page?param1=value1¶m2=value+with+spaces#section |
579 | 61 | Status HttpClient::_escape_url(const std::string& url, std::string* escaped_url) { |
580 | 61 | size_t query_pos = url.find('?'); |
581 | 61 | if (query_pos == std::string::npos) { |
582 | 43 | *escaped_url = url; |
583 | 43 | return Status::OK(); |
584 | 43 | } |
585 | 18 | size_t fragment_pos = url.find('#'); |
586 | 18 | std::string query; |
587 | 18 | std::string fragment; |
588 | | |
589 | 18 | if (fragment_pos == std::string::npos) { |
590 | 17 | query = url.substr(query_pos + 1, url.length() - query_pos - 1); |
591 | 17 | } else { |
592 | 1 | query = url.substr(query_pos + 1, fragment_pos - query_pos - 1); |
593 | 1 | fragment = url.substr(fragment_pos, url.length() - fragment_pos); |
594 | 1 | } |
595 | | |
596 | 18 | std::string encoded_query; |
597 | 18 | size_t ampersand_pos = query.find('&'); |
598 | 18 | size_t equal_pos; |
599 | | |
600 | 18 | if (ampersand_pos == std::string::npos) { |
601 | 7 | ampersand_pos = query.length(); |
602 | 7 | } |
603 | | |
604 | 37 | while (true) { |
605 | 37 | equal_pos = query.find('='); |
606 | 37 | if (equal_pos != std::string::npos) { |
607 | 34 | std::string key = query.substr(0, equal_pos); |
608 | 34 | std::string value = query.substr(equal_pos + 1, ampersand_pos - equal_pos - 1); |
609 | | |
610 | 34 | auto encoded_value = std::unique_ptr<char, decltype(&curl_free)>( |
611 | 34 | curl_easy_escape(_curl, value.c_str(), cast_set<int>(value.length())), |
612 | 34 | &curl_free); |
613 | 34 | if (encoded_value) { |
614 | 34 | encoded_query += key + "=" + std::string(encoded_value.get()); |
615 | 34 | } else { |
616 | 0 | return Status::InternalError("escape url failed, url={}", url); |
617 | 0 | } |
618 | 34 | } else { |
619 | 3 | encoded_query += query.substr(0, ampersand_pos); |
620 | 3 | } |
621 | | |
622 | 37 | if (ampersand_pos == query.length() || ampersand_pos == std::string::npos) { |
623 | 18 | break; |
624 | 18 | } |
625 | | |
626 | 19 | encoded_query += "&"; |
627 | 19 | query = query.substr(ampersand_pos + 1); |
628 | 19 | ampersand_pos = query.find('&'); |
629 | 19 | } |
630 | 18 | *escaped_url = url.substr(0, query_pos + 1) + encoded_query + fragment; |
631 | 18 | return Status::OK(); |
632 | 18 | } |
633 | | #include "common/compile_check_end.h" |
634 | | } // namespace doris |