Coverage Report

Created: 2024-11-21 15:53

/root/doris/be/src/http/http_client.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "http/http_client.h"
19
20
#include <glog/logging.h>
21
#include <unistd.h>
22
23
#include <memory>
24
#include <ostream>
25
26
#include "common/config.h"
27
#include "http/http_headers.h"
28
#include "http/http_status.h"
29
#include "runtime/exec_env.h"
30
#include "util/security.h"
31
#include "util/stack_util.h"
32
33
namespace doris {
34
35
0
// Maps a CURLHcode (the result type of curl_easy_header()) to a short
// human-readable description for use in log and error messages.
// Codes without a dedicated description yield "unknown".
static const char* header_error_msg(CURLHcode code) {
    const char* description = "unknown";
    switch (code) {
    case CURLHE_OK:
        description = "OK";
        break;
    case CURLHE_BADINDEX:
        description = "header exists but not with this index ";
        break;
    case CURLHE_MISSING:
        description = "no such header exists";
        break;
    case CURLHE_NOHEADERS:
        description = "no headers at all exist (yet)";
        break;
    case CURLHE_NOREQUEST:
        description = "no request with this number was used";
        break;
    case CURLHE_OUT_OF_MEMORY:
        description = "out of memory while processing";
        break;
    case CURLHE_BAD_ARGUMENT:
        description = "a function argument was not okay";
        break;
    case CURLHE_NOT_BUILT_IN:
        description = "curl_easy_header() was disabled in the build";
        break;
    default:
        break;
    }
    return description;
}
57
58
48
// Defaulted constructor: no work is done here. The curl handle is created
// lazily by init(), which must be called before a request is executed.
HttpClient::HttpClient() = default;
59
60
48
// Releases the libcurl resources owned by this client: the easy handle and
// the request header list. Either may be absent, in which case it is skipped.
HttpClient::~HttpClient() {
    if (_curl) {
        curl_easy_cleanup(_curl);
        _curl = nullptr;
    }

    if (_header_list) {
        curl_slist_free_all(_header_list);
        _header_list = nullptr;
    }
}
70
71
47
// Prepares this client for a request to `url`.
//
// Creates the curl easy handle on first use, or resets it when the client is
// reused, drops any previously accumulated header list, and then installs the
// baseline options: error buffer, no signals, optional fail-on-HTTP-error,
// redirect following (at most 20 hops), the write callback that forwards the
// body to on_response_data(), and finally the (query-escaped) URL.
//
// @param url                target URL; values of its query string are escaped
//                           by _escape_url() before being handed to libcurl
// @param set_fail_on_error  when true, CURLOPT_FAILONERROR is enabled
// @return Status::OK() on success, InternalError if the handle cannot be
//         created or any option cannot be set, or the error from _escape_url()
Status HttpClient::init(const std::string& url, bool set_fail_on_error) {
    if (_curl == nullptr) {
        _curl = curl_easy_init();
        if (_curl == nullptr) {
            return Status::InternalError("fail to initialize curl");
        }
    } else {
        // Reused client: wipe every option left over from the previous request.
        curl_easy_reset(_curl);
    }

    if (_header_list != nullptr) {
        curl_slist_free_all(_header_list);
        _header_list = nullptr;
    }

    // set error_buf
    _error_buf[0] = 0;
    CURLcode code = curl_easy_setopt(_curl, CURLOPT_ERRORBUFFER, _error_buf);
    if (code != CURLE_OK) {
        LOG(WARNING) << "fail to set CURLOPT_ERRORBUFFER, msg=" << _to_errmsg(code);
        return Status::InternalError("fail to set error buffer");
    }

    // forbid signals
    code = curl_easy_setopt(_curl, CURLOPT_NOSIGNAL, 1L);
    if (code != CURLE_OK) {
        LOG(WARNING) << "fail to set CURLOPT_NOSIGNAL, msg=" << _to_errmsg(code);
        return Status::InternalError("fail to set CURLOPT_NOSIGNAL");
    }

    // set fail on error
    // When this option is set to `1L` (enabled), libcurl will return an error directly
    // when encountering HTTP error codes (>= 400), without reading the body of the error response.
    if (set_fail_on_error) {
        code = curl_easy_setopt(_curl, CURLOPT_FAILONERROR, 1L);
        if (code != CURLE_OK) {
            LOG(WARNING) << "fail to set CURLOPT_FAILONERROR, msg=" << _to_errmsg(code);
            return Status::InternalError("fail to set CURLOPT_FAILONERROR");
        }
    }

    // set redirect
    code = curl_easy_setopt(_curl, CURLOPT_FOLLOWLOCATION, 1L);
    if (code != CURLE_OK) {
        LOG(WARNING) << "fail to set CURLOPT_FOLLOWLOCATION, msg=" << _to_errmsg(code);
        return Status::InternalError("fail to set CURLOPT_FOLLOWLOCATION");
    }
    // libcurl reads this option through varargs as a `long`, so the literal
    // must be a long as well; a plain `int` is a type mismatch.
    code = curl_easy_setopt(_curl, CURLOPT_MAXREDIRS, 20L);
    if (code != CURLE_OK) {
        LOG(WARNING) << "fail to set CURLOPT_MAXREDIRS, msg=" << _to_errmsg(code);
        return Status::InternalError("fail to set CURLOPT_MAXREDIRS");
    }

    // Trampoline from libcurl's C callback to the member function; `param` is
    // the `this` pointer registered through CURLOPT_WRITEDATA below.
    curl_write_callback callback = [](char* buffer, size_t size, size_t nmemb, void* param) {
        auto* client = static_cast<HttpClient*>(param);
        return client->on_response_data(buffer, size * nmemb);
    };

    // set callback function
    code = curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, callback);
    if (code != CURLE_OK) {
        LOG(WARNING) << "fail to set CURLOPT_WRITEFUNCTION, msg=" << _to_errmsg(code);
        return Status::InternalError("fail to set CURLOPT_WRITEFUNCTION");
    }
    code = curl_easy_setopt(_curl, CURLOPT_WRITEDATA, static_cast<void*>(this));
    if (code != CURLE_OK) {
        LOG(WARNING) << "fail to set CURLOPT_WRITEDATA, msg=" << _to_errmsg(code);
        return Status::InternalError("fail to set CURLOPT_WRITEDATA");
    }

    std::string escaped_url;
    RETURN_IF_ERROR(_escape_url(url, &escaped_url));
    // set url
    code = curl_easy_setopt(_curl, CURLOPT_URL, escaped_url.c_str());
    if (code != CURLE_OK) {
        LOG(WARNING) << "failed to set CURLOPT_URL, errmsg=" << _to_errmsg(code);
        return Status::InternalError("fail to set CURLOPT_URL");
    }

#ifndef BE_TEST
    set_auth_token(ExecEnv::GetInstance()->cluster_info()->curr_auth_token);
#endif
    return Status::OK();
}
151
152
47
// Selects the HTTP verb for the next request by setting the matching libcurl
// option on the handle. Methods not listed below leave the handle untouched.
// The result of curl_easy_setopt() is not checked.
void HttpClient::set_method(HttpMethod method) {
    switch (method) {
    case GET:
        curl_easy_setopt(_curl, CURLOPT_HTTPGET, 1L);
        break;
    case PUT:
        curl_easy_setopt(_curl, CURLOPT_UPLOAD, 1L);
        break;
    case POST:
        curl_easy_setopt(_curl, CURLOPT_POST, 1L);
        break;
    case DELETE:
        curl_easy_setopt(_curl, CURLOPT_CUSTOMREQUEST, "DELETE");
        break;
    case HEAD:
        curl_easy_setopt(_curl, CURLOPT_NOBODY, 1L);
        break;
    case OPTIONS:
        curl_easy_setopt(_curl, CURLOPT_CUSTOMREQUEST, "OPTIONS");
        break;
    default:
        break;
    }
}
176
177
16
size_t HttpClient::on_response_data(const void* data, size_t length) {
178
16
    if (*_callback != nullptr) {
179
16
        bool is_continue = (*_callback)(data, length);
180
16
        if (!is_continue) {
181
0
            return -1;
182
0
        }
183
16
    }
184
16
    return length;
185
16
}
186
187
// Status HttpClient::execute_post_request(const std::string& post_data, const std::function<bool(const void* data, size_t length)>& callback = {}) {
188
//     _callback = &callback;
189
//     set_post_body(post_data);
190
//     return execute(callback);
191
// }
192
193
11
// Convenience wrapper: switches the handle to POST, attaches `payload` as the
// request body and runs the request, collecting the body into `response`.
Status HttpClient::execute_post_request(const std::string& payload, std::string* response) {
    set_method(POST);
    set_payload(payload);

    Status result = execute(response);
    return result;
}
198
199
0
// Convenience wrapper: switches the handle to DELETE, attaches `payload` as
// the request body and runs the request, collecting the body into `response`.
Status HttpClient::execute_delete_request(const std::string& payload, std::string* response) {
    set_method(DELETE);
    set_payload(payload);

    Status result = execute(response);
    return result;
}
204
205
47
// Performs the request configured on the handle, streaming every chunk of the
// response body to `callback`.
//
// @param callback  invoked per chunk via on_response_data(); returning false
//                  aborts the transfer
// @return Status::OK() when curl_easy_perform() succeeds; otherwise HttpError
//         carrying the curl error text and the (token-masked) effective URL.
Status HttpClient::execute(const std::function<bool(const void* data, size_t length)>& callback) {
    _callback = &callback;
    const CURLcode code = curl_easy_perform(_curl);
    // `callback` is owned by the caller and is frequently a temporary (see
    // execute(std::string*)); drop the pointer so it cannot dangle once this
    // call returns.
    _callback = nullptr;

    if (code != CURLE_OK) {
        std::string url = mask_token(_get_url());
        LOG(WARNING) << "fail to execute HTTP client, errmsg=" << _to_errmsg(code)
                     << ", trace=" << get_stack_trace() << ", url=" << url;
        std::string errmsg = fmt::format("{}, url={}", _to_errmsg(code), url);
        return Status::HttpError(std::move(errmsg));
    }
    return Status::OK();
}
217
218
3
// Reads the Content-MD5 response header from the handle.
//
// @param md5  receives the header value; cleared when the response carries no
//             such header (or no headers at all)
// @return Status::OK() in both of the cases above; HttpError for any other
//         curl_easy_header() failure.
Status HttpClient::get_content_md5(std::string* md5) const {
    struct curl_header* header = nullptr;
    auto rc = curl_easy_header(_curl, HttpHeaders::CONTENT_MD5, 0, CURLH_HEADER, 0, &header);

    if (rc == CURLHE_OK) {
        *md5 = header->value;
        return Status::OK();
    }

    if (rc == CURLHE_MISSING || rc == CURLHE_NOHEADERS) {
        // no such headers exists
        md5->clear();
        return Status::OK();
    }

    auto msg = fmt::format("failed to get http header {}: {} ({})", HttpHeaders::CONTENT_MD5,
                           header_error_msg(rc), rc);
    LOG(WARNING) << msg << ", trace=" << get_stack_trace();
    return Status::HttpError(std::move(msg));
}
235
236
1
// Downloads the resource configured by init() into `local_path` with a GET
// request, applying the low-speed abort and max-speed limits from config.
//
// The file is created/truncated up front. If the transfer, a write, or the
// final close fails, the partially written file is removed.
//
// @param local_path  destination file path
// @return Status::OK() on success; InternalError when the file cannot be
//         opened, written or closed; otherwise the error returned by execute().
Status HttpClient::download(const std::string& local_path) {
    // set method to GET
    set_method(GET);

    // TODO(zc) Move this download speed limit outside to limit download speed
    // at system level
    // libcurl reads option values through varargs: the two LOW_SPEED options
    // are fetched as `long` and MAX_RECV_SPEED_LARGE as `curl_off_t`, so the
    // arguments are converted explicitly instead of being passed in whatever
    // integer type the config variables happen to have.
    curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_LIMIT,
                     static_cast<long>(config::download_low_speed_limit_kbps) * 1024);
    curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_TIME,
                     static_cast<long>(config::download_low_speed_time));
    curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE,
                     static_cast<curl_off_t>(config::max_download_speed_kbps) * 1024);

    auto fp_closer = [](FILE* fp) { fclose(fp); };
    std::unique_ptr<FILE, decltype(fp_closer)> fp(fopen(local_path.c_str(), "w"), fp_closer);
    if (fp == nullptr) {
        LOG(WARNING) << "open file failed, file=" << local_path;
        return Status::InternalError("open file failed");
    }

    Status status;
    auto callback = [&status, &fp, &local_path](const void* data, size_t length) {
        // Write `length` items of one byte so the return value is the byte
        // count. libcurl may deliver a zero-length chunk (empty body); that
        // writes 0 bytes and is a success, not an error.
        const size_t written = fwrite(data, 1, length, fp.get());
        if (written != length) {
            LOG(WARNING) << "fail to write data to file, file=" << local_path
                         << ", error=" << ferror(fp.get());
            status = Status::InternalError("fail to write data when download");
            return false;
        }
        return true;
    };

    if (auto s = execute(callback); !s.ok()) {
        status = s;
    }

    // Close explicitly so buffered data is flushed now and a flush/close
    // failure (e.g. disk full) is reported instead of being silently dropped
    // by the unique_ptr deleter.
    if (fclose(fp.release()) != 0 && status.ok()) {
        LOG(WARNING) << "fail to close file, file=" << local_path;
        status = Status::InternalError("fail to close file when download");
    }

    if (!status.ok()) {
        remove(local_path.c_str());
    }
    return status;
}
272
273
42
// Performs the configured request and appends every received body chunk to
// `*response`. Existing content of `*response` is kept; data is only appended.
Status HttpClient::execute(std::string* response) {
    return execute([response](const void* data, size_t length) {
        response->append(static_cast<const char*>(data), length);
        return true;
    });
}
280
281
54
// Returns the text to report for a failed curl call: the contents of the
// per-handle error buffer when it is non-empty, otherwise libcurl's generic
// description of `code`.
const char* HttpClient::_to_errmsg(CURLcode code) const {
    const bool buffer_is_empty = (_error_buf[0] == 0);
    return buffer_is_empty ? curl_easy_strerror(code) : _error_buf;
}
287
288
27
const char* HttpClient::_get_url() const {
289
27
    const char* url = nullptr;
290
27
    curl_easy_getinfo(_curl, CURLINFO_EFFECTIVE_URL, &url);
291
27
    if (!url) {
292
0
        url = "<unknown>";
293
0
    }
294
27
    return url;
295
27
}
296
297
// Runs `callback` with a fresh HttpClient up to `retry_times` times.
//
// An attempt whose callback returns OK ends the loop immediately: with HTTP
// status 200 the OK status is returned, with any other status an HttpError is
// returned (no further retry). An attempt whose callback fails is retried
// after `sleep_time` seconds.
//
// @param retry_times  maximum number of attempts; when <= 0 the callback is
//                     never invoked and a default-constructed Status is returned
// @param sleep_time   pause between two attempts, in seconds
// @param callback     performs one attempt using the supplied client
// @return see above; after exhausting all attempts, the last callback error.
Status HttpClient::execute_with_retry(int retry_times, int sleep_time,
                                      const std::function<Status(HttpClient*)>& callback) {
    Status status;
    for (int attempt = 0; attempt < retry_times; ++attempt) {
        HttpClient client;
        status = callback(&client);
        if (status.ok()) {
            auto http_status = client.get_http_status();
            if (http_status == 200) {
                return status;
            } else {
                std::string url = mask_token(client._get_url());
                auto error_msg = fmt::format("http status code is not 200, code={}, url={}",
                                             http_status, url);
                LOG(WARNING) << error_msg;
                return Status::HttpError(error_msg);
            }
        }

        // Only pause when another attempt follows; sleeping after the final
        // failure would merely delay reporting the error. The `> 0` check also
        // keeps a negative value from turning into a huge unsigned duration.
        const bool has_next_attempt = attempt + 1 < retry_times;
        if (has_next_attempt && sleep_time > 0) {
            sleep(sleep_time);
        }
    }
    return status;
}
319
320
// http://example.com/page?param1=value1&param2=value+with+spaces#section
321
54
// Percent-escapes the values of the query string of `url`.
//
// The URL is split into prefix (up to and including '?'), query and fragment
// (from '#'). The query is processed one '&'-separated segment at a time: for
// a segment containing '=', everything after the first '=' is escaped with
// curl_easy_escape() while the key is copied verbatim; a segment without '='
// is copied verbatim. Prefix and fragment are never modified.
//
// @param url          input URL
// @param escaped_url  receives the result; equals `url` when there is no query
// @return Status::OK(), or InternalError when curl_easy_escape() fails.
Status HttpClient::_escape_url(const std::string& url, std::string* escaped_url) {
    const size_t query_pos = url.find('?');
    const size_t fragment_pos = url.find('#');
    // A '?' located after '#' is part of the fragment, not a query delimiter.
    const bool question_mark_in_fragment =
            fragment_pos != std::string::npos && fragment_pos < query_pos;
    if (query_pos == std::string::npos || question_mark_in_fragment) {
        *escaped_url = url;
        return Status::OK();
    }

    // The query occupies [query_begin, query_end) inside `url`.
    const size_t query_begin = query_pos + 1;
    const size_t query_end = (fragment_pos == std::string::npos) ? url.length() : fragment_pos;

    std::string encoded_query;
    size_t segment_begin = query_begin;
    while (true) {
        size_t segment_end = url.find('&', segment_begin);
        if (segment_end == std::string::npos || segment_end > query_end) {
            segment_end = query_end;
        }

        // Only an '=' inside the current segment splits key and value; looking
        // further ahead would merge this segment with a later one.
        const size_t equal_pos = url.find('=', segment_begin);
        if (equal_pos != std::string::npos && equal_pos < segment_end) {
            const std::string value = url.substr(equal_pos + 1, segment_end - equal_pos - 1);
            auto encoded_value = std::unique_ptr<char, decltype(&curl_free)>(
                    curl_easy_escape(_curl, value.c_str(), static_cast<int>(value.length())),
                    &curl_free);
            if (!encoded_value) {
                return Status::InternalError("escape url failed, url={}", url);
            }
            // Copy "key=" untouched, then the escaped value.
            encoded_query.append(url, segment_begin, equal_pos - segment_begin + 1);
            encoded_query += encoded_value.get();
        } else {
            encoded_query.append(url, segment_begin, segment_end - segment_begin);
        }

        if (segment_end == query_end) {
            break;
        }
        encoded_query += '&';
        segment_begin = segment_end + 1;
    }

    // substr(query_end) is the fragment (including '#'), or empty if none.
    *escaped_url = url.substr(0, query_begin) + encoded_query + url.substr(query_end);
    return Status::OK();
}
374
375
} // namespace doris