Coverage Report

Created: 2026-01-02 23:19

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/http/http_request.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "http/http_request.h"
19
20
#include <event2/buffer.h>
21
#include <event2/http.h>
22
#include <event2/http_struct.h>
23
#include <event2/keyvalq_struct.h>
24
25
#include <memory>
26
#include <sstream>
27
#include <string>
28
#include <unordered_map>
29
#include <utility>
30
31
#include "http/http_handler.h"
32
#include "runtime/stream_load/stream_load_context.h"
33
#include "util/stack_util.h"
34
35
namespace doris {
36
37
static std::string s_empty = "";
38
39
70
HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) {}
40
41
70
HttpRequest::~HttpRequest() {
42
70
    if (_handler_ctx != nullptr) {
43
0
        DCHECK(_handler != nullptr);
44
0
        _handler->free_handler_ctx(_handler_ctx);
45
0
    }
46
70
}
47
48
57
int HttpRequest::init_from_evhttp() {
49
57
    _method = to_http_method(evhttp_request_get_command(_ev_req));
50
57
    if (_method == HttpMethod::UNKNOWN) {
51
0
        LOG(WARNING) << "unknown method of HTTP request, method="
52
0
                     << evhttp_request_get_command(_ev_req);
53
0
        return -1;
54
0
    }
55
57
    _uri = evhttp_request_get_uri(_ev_req);
56
    // conver header
57
57
    auto headers = evhttp_request_get_input_headers(_ev_req);
58
217
    for (auto header = headers->tqh_first; header != nullptr; header = header->next.tqe_next) {
59
160
        _headers.emplace(header->key, header->value);
60
160
    }
61
    // parse
62
57
    auto ev_uri = evhttp_request_get_evhttp_uri(_ev_req);
63
57
    _raw_path = evhttp_uri_get_path(ev_uri);
64
57
    auto query = evhttp_uri_get_query(ev_uri);
65
57
    if (query == nullptr || *query == '\0') {
66
45
        return 0;
67
45
    }
68
12
    struct evkeyvalq params;
69
12
    auto res = evhttp_parse_query_str(query, &params);
70
12
    if (res < 0) {
71
0
        LOG(WARNING) << "parse query str failed, query=" << query;
72
0
        return res;
73
0
    }
74
39
    for (auto param = params.tqh_first; param != nullptr; param = param->next.tqe_next) {
75
27
        _query_params.emplace(param->key, param->value);
76
27
    }
77
12
    _params.insert(_query_params.begin(), _query_params.end());
78
12
    evhttp_clear_headers(&params);
79
12
    return 0;
80
12
}
81
82
25
std::string HttpRequest::debug_string() const {
83
25
    std::stringstream ss;
84
25
    ss << "HttpRequest: \n"
85
25
       << "method:" << _method << "\n"
86
25
       << "uri:" << _uri << "\n"
87
25
       << "raw_path:" << _raw_path << "\n"
88
25
       << "headers: \n";
89
64
    for (auto& iter : _headers) {
90
64
        ss << "key=" << iter.first << ", value=" << iter.second << "\n";
91
64
    }
92
25
    ss << "params: \n";
93
30
    for (auto& iter : _params) {
94
30
        ss << "key=" << iter.first << ", value=" << iter.second << "\n";
95
30
    }
96
97
25
    return ss.str();
98
25
}
99
100
155
const std::string& HttpRequest::header(const std::string& key) const {
101
155
    auto iter = _headers.find(key);
102
155
    if (iter == _headers.end()) {
103
126
        return s_empty;
104
126
    }
105
29
    return iter->second;
106
155
}
107
108
60
const std::string& HttpRequest::param(const std::string& key) const {
109
60
    auto iter = _params.find(key);
110
60
    if (iter == _params.end()) {
111
28
        return s_empty;
112
28
    }
113
32
    return iter->second;
114
60
}
115
116
0
std::string HttpRequest::get_all_headers() const {
117
0
    std::stringstream headers;
118
0
    for (const auto& header : _headers) {
119
0
        headers << header.first << ":" << header.second + ", ";
120
0
    }
121
0
    return headers.str();
122
0
}
123
124
27
void HttpRequest::add_output_header(const char* key, const char* value) {
125
27
    evhttp_add_header(evhttp_request_get_output_headers(_ev_req), key, value);
126
27
}
127
128
2
std::string HttpRequest::get_request_body() {
129
2
    if (!_request_body.empty()) {
130
0
        return _request_body;
131
0
    }
132
    // read buf
133
2
    auto evbuf = evhttp_request_get_input_buffer(_ev_req);
134
2
    if (evbuf == nullptr) {
135
0
        return _request_body;
136
0
    }
137
2
    auto length = evbuffer_get_length(evbuf);
138
2
    _request_body.resize(length);
139
2
    evbuffer_remove(evbuf, (char*)_request_body.data(), length);
140
2
    return _request_body;
141
2
}
142
143
32
const char* HttpRequest::remote_host() const {
144
32
    return _ev_req->remote_host;
145
32
}
146
147
53
void HttpRequest::finish_send_reply() {
148
53
    if (_send_reply_type == REPLY_SYNC) {
149
53
        return;
150
53
    }
151
152
0
    std::string infos;
153
0
    if (_handler_ctx != nullptr) {
154
0
        infos = reinterpret_cast<StreamLoadContext*>(_handler_ctx.get())->brief();
155
0
    }
156
0
    VLOG_NOTICE << "finish send reply, infos=" << infos
157
0
                << ", stack=" << get_stack_trace(); // temp locate problem
158
0
    _http_reply_promise.set_value(true);
159
0
}
160
161
31
void HttpRequest::wait_finish_send_reply() {
162
31
    if (_send_reply_type == REPLY_SYNC) {
163
31
        return;
164
31
    }
165
166
0
    std::string infos;
167
0
    StreamLoadContext* ctx = nullptr;
168
0
    if (_handler_ctx != nullptr) {
169
0
        ctx = reinterpret_cast<StreamLoadContext*>(_handler_ctx.get());
170
0
        infos = ctx->brief();
171
0
        _handler->free_handler_ctx(_handler_ctx);
172
0
    }
173
174
0
    VLOG_NOTICE << "start to wait send reply, infos=" << infos;
175
0
    auto status = _http_reply_futrue.wait_for(std::chrono::seconds(3));
176
    // if request is timeout and can't cancel fragment in time, it will cause some new request block
177
    // so we will free cancelled request in time.
178
0
    if (status != std::future_status::ready) {
179
0
        LOG(WARNING) << "wait for send reply timeout, " << this->debug_string();
180
0
        std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
181
        // do not send_reply after free current request
182
0
        ctx->_can_send_reply = false;
183
0
        ctx->_finish_send_reply = true;
184
0
        ctx->_can_send_reply_cv.notify_all();
185
0
    } else {
186
0
        VLOG_NOTICE << "wait send reply finished";
187
0
    }
188
189
    // delete _handler_ctx at the end, in case that finish_send_reply can't get detailed info
190
0
    _handler_ctx = nullptr;
191
0
}
192
193
} // namespace doris