Coverage Report

Created: 2025-07-23 16:39

/root/doris/be/src/http/http_client.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "http/http_client.h"
19
20
#include <glog/logging.h>
21
#include <unistd.h>
22
23
#include <memory>
24
#include <ostream>
25
26
#include "common/config.h"
27
#include "common/status.h"
28
#include "http/http_headers.h"
29
#include "runtime/exec_env.h"
30
#include "util/security.h"
31
#include "util/stack_util.h"
32
33
namespace doris {
34
35
class MultiFileSplitter {
36
public:
37
    MultiFileSplitter(std::string local_dir, std::unordered_set<std::string> expected_files)
38
1
            : _local_dir_path(std::move(local_dir)), _expected_files(std::move(expected_files)) {}
39
1
    ~MultiFileSplitter() {
40
1
        if (_fd >= 0) {
  Branch (40:13): [True: 0, False: 1]
41
0
            close(_fd);
42
0
        }
43
44
1
        if (!_status.ok() && !downloaded_files.empty()) {
  Branch (44:13): [True: 0, False: 1]
  Branch (44:30): [True: 0, False: 0]
45
0
            LOG(WARNING) << "download files to " << _local_dir_path << " failed, try remove the "
46
0
                         << downloaded_files.size() << " downloaded files";
47
0
            for (const auto& file : downloaded_files) {
  Branch (47:35): [True: 0, False: 0]
48
0
                remove(file.c_str());
49
0
            }
50
0
        }
51
1
    }
52
53
661
    bool append(const char* data, size_t length) {
54
        // Already failed.
55
661
        if (!_status.ok()) {
  Branch (55:13): [True: 0, False: 661]
56
0
            return false;
57
0
        }
58
59
661
        std::string buf;
60
661
        if (!_buffer.empty()) {
  Branch (60:13): [True: 0, False: 661]
61
0
            buf.swap(_buffer);
62
0
            buf.append(data, length);
63
0
            data = buf.data();
64
0
            length = buf.size();
65
0
        }
66
661
        return append_inner(data, length);
67
661
    }
68
69
1
    Status finish() {
70
1
        if (_status.ok()) {
  Branch (70:13): [True: 1, False: 0]
71
1
            _status = finish_inner();
72
1
        }
73
74
1
        return _status;
75
1
    }
76
77
private:
78
661
    bool append_inner(const char* data, size_t length) {
79
1.39k
        while (length > 0) {
  Branch (79:16): [True: 729, False: 661]
80
729
            int consumed = 0;
81
729
            if (_is_reading_header) {
  Branch (81:17): [True: 35, False: 694]
82
35
                consumed = parse_header(data, length);
83
694
            } else {
84
694
                consumed = append_file(data, length);
85
694
            }
86
87
729
            if (consumed < 0) {
  Branch (87:17): [True: 0, False: 729]
88
0
                return false;
89
0
            }
90
91
729
            DCHECK(consumed <= length);
92
729
            data += consumed;
93
729
            length -= consumed;
94
729
        }
95
661
        return true;
96
661
    }
97
98
35
    int parse_header(const char* data, size_t length) {
99
35
        DCHECK(_fd < 0);
100
101
35
        std::string_view buf(data, length);
102
35
        size_t pos = buf.find("\r\n\r\n");
103
35
        if (pos == std::string::npos) {
  Branch (103:13): [True: 0, False: 35]
104
0
            _buffer.append(data, length);
105
0
            return static_cast<int>(length);
106
0
        }
107
108
        // header already read.
109
35
        _is_reading_header = false;
110
111
35
        bool has_file_name = false;
112
35
        bool has_file_size = false;
113
35
        std::string_view header = buf.substr(0, pos);
114
35
        std::vector<std::string> headers =
115
35
                strings::Split(header, "\r\n", strings::SkipWhitespace());
116
70
        for (auto& s : headers) {
  Branch (116:22): [True: 70, False: 35]
117
70
            size_t header_pos = s.find(':');
118
70
            if (header_pos == std::string::npos) {
  Branch (118:17): [True: 0, False: 70]
119
0
                continue;
120
0
            }
121
70
            std::string_view header_view(s);
122
70
            std::string_view key = header_view.substr(0, header_pos);
123
70
            std::string_view value = header_view.substr(header_pos + 1);
124
70
            if (value.starts_with(' ')) {
  Branch (124:17): [True: 70, False: 0]
125
70
                value.remove_prefix(std::min(value.find_first_not_of(' '), value.size()));
126
70
            }
127
70
            if (key == "File-Name") {
  Branch (127:17): [True: 35, False: 35]
128
35
                _file_name = value;
129
35
                has_file_name = true;
130
35
            } else if (key == "Content-Length") {
  Branch (130:24): [True: 35, False: 0]
131
35
                auto res = std::from_chars(value.data(), value.data() + value.size(), _file_size);
132
35
                if (res.ec != std::errc()) {
  Branch (132:21): [True: 0, False: 35]
133
0
                    std::string error_msg = fmt::format("invalid content length: {}", value);
134
0
                    LOG(WARNING) << "download files to " << _local_dir_path
135
0
                                 << "failed, err=" << error_msg;
136
0
                    _status = Status::HttpError(std::move(error_msg));
137
0
                    return -1;
138
0
                }
139
35
                has_file_size = true;
140
35
            }
141
70
        }
142
143
35
        if (!has_file_name || !has_file_size) {
  Branch (143:13): [True: 0, False: 35]
  Branch (143:31): [True: 0, False: 35]
144
0
            std::string error_msg =
145
0
                    fmt::format("invalid multi part header, has file name: {}, has file size: {}",
146
0
                                has_file_name, has_file_size);
147
0
            LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg;
148
0
            _status = Status::HttpError(std::move(error_msg));
149
0
            return -1;
150
0
        }
151
152
35
        if (!_expected_files.contains(_file_name)) {
  Branch (152:13): [True: 0, False: 35]
153
0
            std::string error_msg = fmt::format("unexpected file: {}", _file_name);
154
0
            LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg;
155
0
            _status = Status::HttpError(std::move(error_msg));
156
0
            return -1;
157
0
        }
158
159
35
        VLOG_DEBUG << "receive file " << _file_name << ", size " << _file_size;
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
160
161
35
        _written_size = 0;
162
35
        _local_file_path = fmt::format("{}/{}", _local_dir_path, _file_name);
163
35
        _fd = open(_local_file_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
164
35
        if (_fd < 0) {
  Branch (164:13): [True: 0, False: 35]
165
0
            std::string error_msg = "fail to open file to write: " + _local_file_path;
166
0
            LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg;
167
0
            _status = Status::IOError(std::move(error_msg));
168
0
            return -1;
169
0
        }
170
35
        downloaded_files.push_back(_local_file_path);
171
172
35
        return static_cast<int>(pos + 4);
173
35
    }
174
175
694
    int append_file(const char* data, size_t length) {
176
694
        DCHECK(_fd >= 0);
177
694
        DCHECK(_file_size >= _written_size);
178
179
694
        size_t write_size = std::min(length, _file_size - _written_size);
180
694
        if (write_size > 0 && write(_fd, data, write_size) < 0) {
  Branch (180:13): [True: 693, False: 1]
  Branch (180:31): [True: 0, False: 693]
181
0
            auto msg = fmt::format("write file failed, file={}, error={}", _local_file_path,
182
0
                                   strerror(errno));
183
0
            LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << msg;
184
0
            _status = Status::HttpError(std::move(msg));
185
0
            return -1;
186
0
        }
187
188
694
        _written_size += write_size;
189
694
        if (_written_size == _file_size) {
  Branch (189:13): [True: 34, False: 660]
190
            // This file has been downloaded, switch to the next one.
191
34
            switch_to_next_file();
192
34
        }
193
194
694
        return write_size;
195
694
    }
196
197
1
    Status finish_inner() {
198
1
        if (!_is_reading_header && _written_size == _file_size) {
  Branch (198:13): [True: 1, False: 0]
  Branch (198:36): [True: 1, False: 0]
199
1
            switch_to_next_file();
200
1
        }
201
202
1
        if (_fd >= 0) {
  Branch (202:13): [True: 0, False: 1]
203
            // This file is not completely downloaded.
204
0
            close(_fd);
205
0
            _fd = -1;
206
0
            auto error_msg = fmt::format("file {} is not completely downloaded", _local_file_path);
207
0
            LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg;
208
0
            return Status::HttpError(std::move(error_msg));
209
0
        }
210
211
1
        if (!_expected_files.empty()) {
  Branch (211:13): [True: 0, False: 1]
212
0
            auto error_msg = fmt::format("not all files are downloaded, {} missing files",
213
0
                                         _expected_files.size());
214
0
            LOG(WARNING) << "download files to " << _local_dir_path << "failed, err=" << error_msg;
215
0
            return Status::HttpError(std::move(error_msg));
216
0
        }
217
218
1
        downloaded_files.clear();
219
1
        return Status::OK();
220
1
    }
221
222
35
    void switch_to_next_file() {
223
35
        DCHECK(_fd >= 0);
224
35
        DCHECK(_written_size == _file_size);
225
226
35
        close(_fd);
227
35
        _fd = -1;
228
35
        _expected_files.erase(_file_name);
229
35
        _is_reading_header = true;
230
35
    }
231
232
    const std::string _local_dir_path;
233
    std::string _buffer;
234
    std::unordered_set<std::string> _expected_files;
235
    Status _status;
236
237
    bool _is_reading_header = true;
238
    int _fd = -1;
239
    std::string _local_file_path;
240
    std::string _file_name;
241
    size_t _file_size = 0;
242
    size_t _written_size = 0;
243
    std::vector<std::string> downloaded_files;
244
};
245
246
0
static const char* header_error_msg(CURLHcode code) {
247
0
    switch (code) {
248
0
    case CURLHE_OK:
  Branch (248:5): [True: 0, False: 0]
249
0
        return "OK";
250
0
    case CURLHE_BADINDEX:
  Branch (250:5): [True: 0, False: 0]
251
0
        return "header exists but not with this index ";
252
0
    case CURLHE_MISSING:
  Branch (252:5): [True: 0, False: 0]
253
0
        return "no such header exists";
254
0
    case CURLHE_NOHEADERS:
  Branch (254:5): [True: 0, False: 0]
255
0
        return "no headers at all exist (yet)";
256
0
    case CURLHE_NOREQUEST:
  Branch (256:5): [True: 0, False: 0]
257
0
        return "no request with this number was used";
258
0
    case CURLHE_OUT_OF_MEMORY:
  Branch (258:5): [True: 0, False: 0]
259
0
        return "out of memory while processing";
260
0
    case CURLHE_BAD_ARGUMENT:
  Branch (260:5): [True: 0, False: 0]
261
0
        return "a function argument was not okay";
262
0
    case CURLHE_NOT_BUILT_IN:
  Branch (262:5): [True: 0, False: 0]
263
0
        return "curl_easy_header() was disabled in the build";
264
0
    default:
  Branch (264:5): [True: 0, False: 0]
265
0
        return "unknown";
266
0
    }
267
0
}
268
269
55
HttpClient::HttpClient() = default;
270
271
55
HttpClient::~HttpClient() {
272
55
    if (_curl != nullptr) {
  Branch (272:9): [True: 54, False: 1]
273
54
        curl_easy_cleanup(_curl);
274
54
        _curl = nullptr;
275
54
    }
276
55
    if (_header_list != nullptr) {
  Branch (276:9): [True: 2, False: 53]
277
2
        curl_slist_free_all(_header_list);
278
2
        _header_list = nullptr;
279
2
    }
280
55
}
281
282
54
Status HttpClient::init(const std::string& url, bool set_fail_on_error) {
283
54
    if (_curl == nullptr) {
  Branch (283:9): [True: 53, False: 1]
284
53
        _curl = curl_easy_init();
285
53
        if (_curl == nullptr) {
  Branch (285:13): [True: 0, False: 53]
286
0
            return Status::InternalError("fail to initialize curl");
287
0
        }
288
53
    } else {
289
1
        curl_easy_reset(_curl);
290
1
    }
291
292
54
    if (_header_list != nullptr) {
  Branch (292:9): [True: 0, False: 54]
293
0
        curl_slist_free_all(_header_list);
294
0
        _header_list = nullptr;
295
0
    }
296
    // set error_buf
297
54
    _error_buf[0] = 0;
298
54
    auto code = curl_easy_setopt(_curl, CURLOPT_ERRORBUFFER, _error_buf);
299
54
    if (code != CURLE_OK) {
  Branch (299:9): [True: 0, False: 54]
300
0
        LOG(WARNING) << "fail to set CURLOPT_ERRORBUFFER, msg=" << _to_errmsg(code);
301
0
        return Status::InternalError("fail to set error buffer");
302
0
    }
303
    // forbid signals
304
54
    code = curl_easy_setopt(_curl, CURLOPT_NOSIGNAL, 1L);
305
54
    if (code != CURLE_OK) {
  Branch (305:9): [True: 0, False: 54]
306
0
        LOG(WARNING) << "fail to set CURLOPT_NOSIGNAL, msg=" << _to_errmsg(code);
307
0
        return Status::InternalError("fail to set CURLOPT_NOSIGNAL");
308
0
    }
309
    // set fail on error
310
    // When this option is set to `1L` (enabled), libcurl will return an error directly
311
    // when encountering HTTP error codes (>= 400), without reading the body of the error response.
312
54
    if (set_fail_on_error) {
  Branch (312:9): [True: 52, False: 2]
313
52
        code = curl_easy_setopt(_curl, CURLOPT_FAILONERROR, 1L);
314
52
        if (code != CURLE_OK) {
  Branch (314:13): [True: 0, False: 52]
315
0
            LOG(WARNING) << "fail to set CURLOPT_FAILONERROR, msg=" << _to_errmsg(code);
316
0
            return Status::InternalError("fail to set CURLOPT_FAILONERROR");
317
0
        }
318
52
    }
319
    // set redirect
320
54
    code = curl_easy_setopt(_curl, CURLOPT_FOLLOWLOCATION, 1L);
321
54
    if (code != CURLE_OK) {
  Branch (321:9): [True: 0, False: 54]
322
0
        LOG(WARNING) << "fail to set CURLOPT_FOLLOWLOCATION, msg=" << _to_errmsg(code);
323
0
        return Status::InternalError("fail to set CURLOPT_FOLLOWLOCATION");
324
0
    }
325
54
    code = curl_easy_setopt(_curl, CURLOPT_MAXREDIRS, 20);
326
54
    if (code != CURLE_OK) {
  Branch (326:9): [True: 0, False: 54]
327
0
        LOG(WARNING) << "fail to set CURLOPT_MAXREDIRS, msg=" << _to_errmsg(code);
328
0
        return Status::InternalError("fail to set CURLOPT_MAXREDIRS");
329
0
    }
330
331
683
    curl_write_callback callback = [](char* buffer, size_t size, size_t nmemb, void* param) {
332
683
        auto* client = (HttpClient*)param;
333
683
        return client->on_response_data(buffer, size * nmemb);
334
683
    };
335
336
    // set callback function
337
54
    code = curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, callback);
338
54
    if (code != CURLE_OK) {
  Branch (338:9): [True: 0, False: 54]
339
0
        LOG(WARNING) << "fail to set CURLOPT_WRITEFUNCTION, msg=" << _to_errmsg(code);
340
0
        return Status::InternalError("fail to set CURLOPT_WRITEFUNCTION");
341
0
    }
342
54
    code = curl_easy_setopt(_curl, CURLOPT_WRITEDATA, (void*)this);
343
54
    if (code != CURLE_OK) {
  Branch (343:9): [True: 0, False: 54]
344
0
        LOG(WARNING) << "fail to set CURLOPT_WRITEDATA, msg=" << _to_errmsg(code);
345
0
        return Status::InternalError("fail to set CURLOPT_WRITEDATA");
346
0
    }
347
348
54
    std::string escaped_url;
349
54
    RETURN_IF_ERROR(_escape_url(url, &escaped_url));
350
    // set url
351
54
    code = curl_easy_setopt(_curl, CURLOPT_URL, escaped_url.c_str());
352
54
    if (code != CURLE_OK) {
  Branch (352:9): [True: 0, False: 54]
353
0
        LOG(WARNING) << "failed to set CURLOPT_URL, errmsg=" << _to_errmsg(code);
354
0
        return Status::InternalError("fail to set CURLOPT_URL");
355
0
    }
356
357
#ifndef BE_TEST
358
    set_auth_token(ExecEnv::GetInstance()->cluster_info()->curr_auth_token);
359
#endif
360
54
    return Status::OK();
361
54
}
362
363
53
void HttpClient::set_method(HttpMethod method) {
364
53
    _method = method;
365
53
    switch (method) {
366
31
    case GET:
  Branch (366:5): [True: 31, False: 22]
367
31
        curl_easy_setopt(_curl, CURLOPT_HTTPGET, 1L);
368
31
        return;
369
0
    case PUT:
  Branch (369:5): [True: 0, False: 53]
370
0
        curl_easy_setopt(_curl, CURLOPT_UPLOAD, 1L);
371
0
        return;
372
15
    case POST:
  Branch (372:5): [True: 15, False: 38]
373
15
        curl_easy_setopt(_curl, CURLOPT_POST, 1L);
374
15
        return;
375
0
    case DELETE:
  Branch (375:5): [True: 0, False: 53]
376
0
        curl_easy_setopt(_curl, CURLOPT_CUSTOMREQUEST, "DELETE");
377
0
        return;
378
7
    case HEAD:
  Branch (378:5): [True: 7, False: 46]
379
7
        curl_easy_setopt(_curl, CURLOPT_NOBODY, 1L);
380
7
        return;
381
0
    case OPTIONS:
  Branch (381:5): [True: 0, False: 53]
382
0
        curl_easy_setopt(_curl, CURLOPT_CUSTOMREQUEST, "OPTIONS");
383
0
        return;
384
0
    default:
  Branch (384:5): [True: 0, False: 53]
385
0
        return;
386
53
    }
387
53
}
388
389
3
void HttpClient::set_speed_limit() {
390
3
    curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_LIMIT, config::download_low_speed_limit_kbps * 1024);
391
3
    curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_TIME, config::download_low_speed_time);
392
3
    curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE, config::max_download_speed_kbps * 1024);
393
3
}
394
395
683
size_t HttpClient::on_response_data(const void* data, size_t length) {
396
683
    if (*_callback != nullptr) {
  Branch (396:9): [True: 683, False: 0]
397
683
        bool is_continue = (*_callback)(data, length);
398
683
        if (!is_continue) {
  Branch (398:13): [True: 0, False: 683]
399
0
            return -1;
400
0
        }
401
683
    }
402
683
    return length;
403
683
}
404
405
11
Status HttpClient::execute_post_request(const std::string& payload, std::string* response) {
406
11
    set_method(POST);
407
11
    set_payload(payload);
408
11
    return execute(response);
409
11
}
410
411
0
Status HttpClient::execute_delete_request(const std::string& payload, std::string* response) {
412
0
    set_method(DELETE);
413
0
    set_payload(payload);
414
0
    return execute(response);
415
0
}
416
417
54
Status HttpClient::execute(const std::function<bool(const void* data, size_t length)>& callback) {
418
54
    if (VLOG_DEBUG_IS_ON) {
Line
Count
Source
51
54
#define VLOG_DEBUG_IS_ON VLOG_IS_ON(7)
419
0
        VLOG_DEBUG << "execute http " << to_method_desc(_method) << " request, url " << _get_url();
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
420
0
    }
421
54
    _callback = &callback;
422
54
    auto code = curl_easy_perform(_curl);
423
54
    if (code != CURLE_OK) {
  Branch (423:9): [True: 27, False: 27]
424
27
        std::string url = mask_token(_get_url());
425
27
        LOG(WARNING) << "fail to execute HTTP client, errmsg=" << _to_errmsg(code)
426
27
                     << ", trace=" << get_stack_trace() << ", url=" << url;
427
27
        std::string errmsg = fmt::format("{}, url={}", _to_errmsg(code), url);
428
27
        return Status::HttpError(std::move(errmsg));
429
27
    }
430
27
    if (VLOG_DEBUG_IS_ON) {
Line
Count
Source
51
27
#define VLOG_DEBUG_IS_ON VLOG_IS_ON(7)
431
0
        VLOG_DEBUG << "execute http " << to_method_desc(_method) << " request, url " << _get_url()
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
432
0
                   << " done";
433
0
    }
434
27
    return Status::OK();
435
54
}
436
437
3
Status HttpClient::get_content_md5(std::string* md5) const {
438
3
    struct curl_header* header_ptr;
439
3
    auto code = curl_easy_header(_curl, HttpHeaders::CONTENT_MD5, 0, CURLH_HEADER, 0, &header_ptr);
440
3
    if (code == CURLHE_MISSING || code == CURLHE_NOHEADERS) {
  Branch (440:9): [True: 1, False: 2]
  Branch (440:35): [True: 0, False: 2]
441
        // no such headers exists
442
1
        md5->clear();
443
1
        return Status::OK();
444
2
    } else if (code != CURLHE_OK) {
  Branch (444:16): [True: 0, False: 2]
445
0
        auto msg = fmt::format("failed to get http header {}: {} ({})", HttpHeaders::CONTENT_MD5,
446
0
                               header_error_msg(code), code);
447
0
        LOG(WARNING) << msg << ", trace=" << get_stack_trace();
448
0
        return Status::HttpError(std::move(msg));
449
0
    }
450
451
2
    *md5 = header_ptr->value;
452
2
    return Status::OK();
453
3
}
454
455
2
Status HttpClient::download(const std::string& local_path) {
456
2
    set_method(GET);
457
2
    set_speed_limit();
458
459
    // remove the file if it exists, to avoid change the linked files unexpectedly
460
2
    bool exist = false;
461
2
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(local_path, &exist));
462
2
    if (exist) {
  Branch (462:9): [True: 0, False: 2]
463
0
        remove(local_path.c_str());
464
0
    }
465
466
2
    auto fp_closer = [](FILE* fp) { fclose(fp); };
467
2
    std::unique_ptr<FILE, decltype(fp_closer)> fp(fopen(local_path.c_str(), "w"), fp_closer);
468
2
    if (fp == nullptr) {
  Branch (468:9): [True: 0, False: 2]
469
0
        LOG(WARNING) << "open file failed, file=" << local_path;
470
0
        return Status::InternalError("open file failed");
471
0
    }
472
2
    Status status;
473
2
    auto callback = [&status, &fp, &local_path](const void* data, size_t length) {
474
2
        auto res = fwrite(data, length, 1, fp.get());
475
2
        if (res != 1) {
  Branch (475:13): [True: 0, False: 2]
476
0
            LOG(WARNING) << "fail to write data to file, file=" << local_path
477
0
                         << ", error=" << ferror(fp.get());
478
0
            status = Status::InternalError("fail to write data when download");
479
0
            return false;
480
0
        }
481
2
        return true;
482
2
    };
483
484
2
    if (auto s = execute(callback); !s.ok()) {
  Branch (484:37): [True: 0, False: 2]
485
0
        status = s;
486
0
    }
487
2
    if (!status.ok()) {
  Branch (487:9): [True: 0, False: 2]
488
0
        remove(local_path.c_str());
489
0
    }
490
2
    return status;
491
2
}
492
493
Status HttpClient::download_multi_files(const std::string& local_dir,
494
1
                                        const std::unordered_set<std::string>& expected_files) {
495
1
    set_speed_limit();
496
497
1
    MultiFileSplitter splitter(local_dir, expected_files);
498
661
    auto callback = [&](const void* data, size_t length) {
499
661
        return splitter.append(reinterpret_cast<const char*>(data), length);
500
661
    };
501
1
    if (auto s = execute(callback); !s.ok()) {
  Branch (501:37): [True: 0, False: 1]
502
0
        return s;
503
0
    }
504
1
    return splitter.finish();
505
1
}
506
507
45
Status HttpClient::execute(std::string* response) {
508
45
    auto callback = [response](const void* data, size_t length) {
509
20
        response->append((char*)data, length);
510
20
        return true;
511
20
    };
512
45
    return execute(callback);
513
45
}
514
515
54
const char* HttpClient::_to_errmsg(CURLcode code) const {
516
54
    if (_error_buf[0] == 0) {
  Branch (516:9): [True: 0, False: 54]
517
0
        return curl_easy_strerror(code);
518
0
    }
519
54
    return _error_buf;
520
54
}
521
522
27
const char* HttpClient::_get_url() const {
523
27
    const char* url = nullptr;
524
27
    curl_easy_getinfo(_curl, CURLINFO_EFFECTIVE_URL, &url);
525
27
    if (!url) {
  Branch (525:9): [True: 0, False: 27]
526
0
        url = "<unknown>";
527
0
    }
528
27
    return url;
529
27
}
530
531
// execute remote call action with retry
532
Status HttpClient::execute(int retry_times, int sleep_time,
533
0
                           const std::function<Status(HttpClient*)>& callback) {
534
0
    Status status;
535
0
    for (int i = 0; i < retry_times; ++i) {
  Branch (535:21): [True: 0, False: 0]
536
0
        status = callback(this);
537
0
        if (status.ok()) {
  Branch (537:13): [True: 0, False: 0]
538
0
            auto http_status = get_http_status();
539
0
            if (http_status == 200) {
  Branch (539:17): [True: 0, False: 0]
540
0
                return status;
541
0
            } else {
542
0
                std::string url = mask_token(_get_url());
543
0
                auto error_msg = fmt::format("http status code is not 200, code={}, url={}",
544
0
                                             http_status, url);
545
0
                LOG(WARNING) << error_msg;
546
0
                return Status::HttpError(error_msg);
547
0
            }
548
0
        }
549
0
        sleep(sleep_time);
550
0
    }
551
0
    return status;
552
0
}
553
554
Status HttpClient::execute_with_retry(int retry_times, int sleep_time,
555
8
                                      const std::function<Status(HttpClient*)>& callback) {
556
8
    Status status;
557
11
    for (int i = 0; i < retry_times; ++i) {
  Branch (557:21): [True: 10, False: 1]
558
10
        HttpClient client;
559
10
        status = callback(&client);
560
10
        if (status.ok()) {
  Branch (560:13): [True: 7, False: 3]
561
7
            auto http_status = client.get_http_status();
562
7
            if (http_status == 200) {
  Branch (562:17): [True: 7, False: 0]
563
7
                return status;
564
7
            } else {
565
0
                std::string url = mask_token(client._get_url());
566
0
                auto error_msg = fmt::format("http status code is not 200, code={}, url={}",
567
0
                                             http_status, url);
568
0
                LOG(WARNING) << error_msg;
569
0
                return Status::HttpError(error_msg);
570
0
            }
571
7
        }
572
3
        sleep(sleep_time);
573
3
    }
574
1
    return status;
575
8
}
576
577
// http://example.com/page?param1=value1&param2=value+with+spaces#section
578
61
Status HttpClient::_escape_url(const std::string& url, std::string* escaped_url) {
579
61
    size_t query_pos = url.find('?');
580
61
    if (query_pos == std::string::npos) {
  Branch (580:9): [True: 43, False: 18]
581
43
        *escaped_url = url;
582
43
        return Status::OK();
583
43
    }
584
18
    size_t fragment_pos = url.find('#');
585
18
    std::string query;
586
18
    std::string fragment;
587
588
18
    if (fragment_pos == std::string::npos) {
  Branch (588:9): [True: 17, False: 1]
589
17
        query = url.substr(query_pos + 1, url.length() - query_pos - 1);
590
17
    } else {
591
1
        query = url.substr(query_pos + 1, fragment_pos - query_pos - 1);
592
1
        fragment = url.substr(fragment_pos, url.length() - fragment_pos);
593
1
    }
594
595
18
    std::string encoded_query;
596
18
    size_t ampersand_pos = query.find('&');
597
18
    size_t equal_pos;
598
599
18
    if (ampersand_pos == std::string::npos) {
  Branch (599:9): [True: 7, False: 11]
600
7
        ampersand_pos = query.length();
601
7
    }
602
603
37
    while (true) {
  Branch (603:12): [Folded - Ignored]
604
37
        equal_pos = query.find('=');
605
37
        if (equal_pos != std::string::npos) {
  Branch (605:13): [True: 34, False: 3]
606
34
            std::string key = query.substr(0, equal_pos);
607
34
            std::string value = query.substr(equal_pos + 1, ampersand_pos - equal_pos - 1);
608
609
34
            auto encoded_value = std::unique_ptr<char, decltype(&curl_free)>(
610
34
                    curl_easy_escape(_curl, value.c_str(), value.length()), &curl_free);
611
34
            if (encoded_value) {
  Branch (611:17): [True: 34, False: 0]
612
34
                encoded_query += key + "=" + std::string(encoded_value.get());
613
34
            } else {
614
0
                return Status::InternalError("escape url failed, url={}", url);
615
0
            }
616
34
        } else {
617
3
            encoded_query += query.substr(0, ampersand_pos);
618
3
        }
619
620
37
        if (ampersand_pos == query.length() || ampersand_pos == std::string::npos) {
  Branch (620:13): [True: 7, False: 30]
  Branch (620:48): [True: 11, False: 19]
621
18
            break;
622
18
        }
623
624
19
        encoded_query += "&";
625
19
        query = query.substr(ampersand_pos + 1);
626
19
        ampersand_pos = query.find('&');
627
19
    }
628
18
    *escaped_url = url.substr(0, query_pos + 1) + encoded_query + fragment;
629
18
    return Status::OK();
630
18
}
631
632
} // namespace doris