Coverage Report

Created: 2026-04-15 18:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/http/utils.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/http/utils.h"
19
20
#include <absl/strings/str_split.h>
21
#include <fcntl.h>
22
#include <stdint.h>
23
#include <sys/stat.h>
24
#include <unistd.h>
25
26
#include <ostream>
27
#include <string>
28
#include <unordered_map>
29
#include <vector>
30
31
#include "common/config.h"
32
#include "common/logging.h"
33
#include "common/status.h"
34
#include "common/utils.h"
35
#include "io/fs/file_system.h"
36
#include "io/fs/local_file_system.h"
37
#include "load/group_commit/wal/wal_manager.h"
38
#include "runtime/exec_env.h"
39
#include "service/http/http_channel.h"
40
#include "service/http/http_client.h"
41
#include "service/http/http_common.h"
42
#include "service/http/http_headers.h"
43
#include "service/http/http_method.h"
44
#include "service/http/http_request.h"
45
#include "service/http/http_status.h"
46
#include "util/md5.h"
47
#include "util/path_util.h"
48
#include "util/security.h"
49
#include "util/url_coding.h"
50
51
namespace doris {
52
53
// Timeout for the batch-download support probe; multiplied by 1000 before being handed to
// HttpClient::set_timeout_ms, i.e. expressed in seconds.
const uint32_t CHECK_SUPPORT_TIMEOUT = 3;
54
// First argument passed to HttpClient::execute_with_retry by the helpers in this file
// (presumably the maximum number of attempts -- confirm against HttpClient).
const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
55
// Timeout for listing remote files; multiplied by 1000 before being handed to
// HttpClient::set_timeout_ms, i.e. expressed in seconds.
const uint32_t LIST_REMOTE_FILE_TIMEOUT = 15;
56
57
2
std::string encode_basic_auth(const std::string& user, const std::string& passwd) {
58
2
    std::string auth = user + ":" + passwd;
59
2
    std::string encoded_auth;
60
2
    base64_encode(auth, &encoded_auth);
61
2
    static std::string s_prefix = "Basic ";
62
2
    return s_prefix + encoded_auth;
63
2
}
64
65
90
bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* passwd) {
66
    // const auto& token = req.header(HttpHeaders::AUTH_TOKEN);
67
68
90
    const char k_basic[] = "Basic ";
69
90
    const auto& auth = req.header(HttpHeaders::AUTHORIZATION);
70
90
    if (auth.compare(0, sizeof(k_basic) - 1, k_basic, sizeof(k_basic) - 1) != 0) {
71
38
        return false;
72
38
    }
73
52
    std::string encoded_str = auth.substr(sizeof(k_basic) - 1);
74
52
    std::string decoded_auth;
75
52
    if (!base64_decode(encoded_str, &decoded_auth)) {
76
2
        return false;
77
2
    }
78
50
    auto pos = decoded_auth.find(':');
79
50
    if (pos == std::string::npos) {
80
2
        return false;
81
2
    }
82
48
    user->assign(decoded_auth.c_str(), pos);
83
48
    passwd->assign(decoded_auth.c_str() + pos + 1);
84
85
48
    return true;
86
50
}
87
88
68
bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) {
89
    // deprecated, removed in 3.1, use AUTH_TOKEN
90
68
    const auto& token = req.header("token");
91
    // deprecated, removed in 3.1, use AUTH_TOKEN
92
68
    const auto& auth_code = req.header(HTTP_AUTH_CODE);
93
68
    const auto& auth_token = req.header(HttpHeaders::AUTH_TOKEN);
94
95
68
    std::tuple<std::string, std::string, std::string> tmp;
96
68
    auto& [user, pass, cluster] = tmp;
97
68
    bool valid_basic_auth = parse_basic_auth(req, &user, &pass);
98
68
    if (valid_basic_auth) { // always set the basic auth, the user may be useful
99
34
        auto pos = user.find('@');
100
34
        if (pos != std::string::npos) {
101
0
            cluster.assign(user.c_str() + pos + 1);
102
0
            user.assign(user.c_str(), pos); // user is updated
103
0
        }
104
34
        auth->user = user;
105
34
        auth->passwd = pass;
106
34
        auth->cluster = cluster;
107
34
    }
108
109
68
    if (!token.empty()) {
110
0
        auth->token = token; // deprecated
111
68
    } else if (!auth_token.empty()) {
112
4
        auth->token = auth_token;
113
64
    } else if (!auth_code.empty()) {
114
0
        auth->auth_code = std::stoll(auth_code); // deprecated
115
64
    } else if (!valid_basic_auth) {
116
30
        return false;
117
30
    }
118
119
    // set user ip
120
38
    auth->user_ip.assign(req.remote_host() != nullptr ? req.remote_host() : "");
121
122
38
    return true;
123
68
}
124
125
// Do a simple decision, only deal a few type
126
8
std::string get_content_type(const std::string& file_name) {
127
8
    std::string file_ext = path_util::file_extension(file_name);
128
8
    VLOG_TRACE << "file_name: " << file_name << "; file extension: [" << file_ext << "]";
129
8
    if (file_ext == std::string(".html") || file_ext == std::string(".htm")) {
130
0
        return "text/html; charset=utf-8";
131
8
    } else if (file_ext == std::string(".js")) {
132
0
        return "application/javascript; charset=utf-8";
133
8
    } else if (file_ext == std::string(".css")) {
134
0
        return "text/css; charset=utf-8";
135
8
    } else if (file_ext == std::string(".txt")) {
136
0
        return "text/plain; charset=utf-8";
137
8
    } else if (file_ext == std::string(".png")) {
138
0
        return "image/png";
139
8
    } else if (file_ext == std::string(".ico")) {
140
0
        return "image/x-icon";
141
8
    } else {
142
8
        return "text/plain; charset=utf-8";
143
8
    }
144
8
}
145
146
void do_file_response(const std::string& file_path, HttpRequest* req,
147
8
                      bufferevent_rate_limit_group* rate_limit_group, bool is_acquire_md5) {
148
8
    if (file_path.find("..") != std::string::npos) {
149
0
        LOG(WARNING) << "Not allowed to read relative path: " << file_path;
150
0
        HttpChannel::send_error(req, HttpStatus::FORBIDDEN);
151
0
        return;
152
0
    }
153
154
    // read file content and send response
155
8
    int fd = open(file_path.c_str(), O_RDONLY);
156
8
    if (fd < 0) {
157
0
        LOG(WARNING) << "Failed to open file: " << file_path;
158
0
        HttpChannel::send_error(req, HttpStatus::NOT_FOUND);
159
0
        return;
160
0
    }
161
8
    struct stat st;
162
8
    auto res = fstat(fd, &st);
163
8
    if (res < 0) {
164
0
        close(fd);
165
0
        LOG(WARNING) << "Failed to open file: " << file_path;
166
0
        HttpChannel::send_error(req, HttpStatus::NOT_FOUND);
167
0
        return;
168
0
    }
169
170
8
    int64_t file_size = st.st_size;
171
172
    // TODO(lingbin): process "IF_MODIFIED_SINCE" header
173
    // TODO(lingbin): process "RANGE" header
174
8
    const std::string& range_header = req->header(HttpHeaders::RANGE);
175
8
    if (!range_header.empty()) {
176
        // analyse range header
177
0
    }
178
179
8
    req->add_output_header(HttpHeaders::CONTENT_TYPE, get_content_type(file_path).c_str());
180
181
8
    if (is_acquire_md5) {
182
2
        Md5Digest md5;
183
184
2
        void* buf = mmap(nullptr, file_size, PROT_READ, MAP_SHARED, fd, 0);
185
2
        md5.update(buf, file_size);
186
2
        md5.digest();
187
2
        munmap(buf, file_size);
188
189
2
        req->add_output_header(HttpHeaders::CONTENT_MD5, md5.hex().c_str());
190
2
    }
191
192
8
    if (req->method() == HttpMethod::HEAD) {
193
6
        close(fd);
194
6
        req->add_output_header(HttpHeaders::CONTENT_LENGTH, std::to_string(file_size).c_str());
195
6
        HttpChannel::send_reply(req);
196
6
        return;
197
6
    }
198
199
2
    HttpChannel::send_file(req, fd, 0, file_size, rate_limit_group);
200
2
}
201
202
4
void do_dir_response(const std::string& dir_path, HttpRequest* req, bool is_acquire_filesize) {
203
4
    bool exists = true;
204
4
    std::vector<io::FileInfo> files;
205
4
    Status st = io::global_local_filesystem()->list(dir_path, true, &files, &exists);
206
4
    if (!st.ok()) {
207
0
        LOG(WARNING) << "Failed to scan dir. " << st;
208
0
        HttpChannel::send_error(req, HttpStatus::INTERNAL_SERVER_ERROR);
209
0
        return;
210
0
    }
211
212
4
    VLOG_DEBUG << "list dir: " << dir_path << ", exists: " << exists
213
0
               << ", file count: " << files.size();
214
215
4
    if (!exists) {
216
0
        HttpChannel::send_error(req, HttpStatus::NOT_FOUND);
217
0
        return;
218
0
    }
219
220
4
    const std::string FILE_DELIMITER_IN_DIR_RESPONSE = "\n";
221
222
4
    std::stringstream result;
223
74
    for (auto& file : files) {
224
74
        result << file.file_name << FILE_DELIMITER_IN_DIR_RESPONSE;
225
74
        if (is_acquire_filesize) {
226
70
            result << file.file_size << FILE_DELIMITER_IN_DIR_RESPONSE;
227
70
        }
228
74
    }
229
230
4
    std::string result_str = result.str();
231
4
    HttpChannel::send_reply(req, result_str);
232
4
}
233
234
4
bool load_size_smaller_than_wal_limit(int64_t content_length) {
235
4
    DBUG_EXECUTE_IF("StreamLoad.load_size_smaller_than_wal_limit.always_false", { return false; });
236
    // 1. req->header(HttpHeaders::CONTENT_LENGTH) will return streamload content length. If it is empty or equals to 0, it means this streamload
237
    // is a chunked streamload and we are not sure its size.
238
    // 2. if streamload content length is too large, like larger than 80% of the WAL constrain.
239
    //
240
    // This two cases, we are not certain that the Write-Ahead Logging (WAL) constraints allow for writing down
241
    // these blocks within the limited space. So we need to set group_commit = false to avoid dead lock.
242
4
    size_t max_available_size = ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
243
4
    return (content_length < 0.8 * max_available_size);
244
4
}
245
246
2
// Probes `endpoint` for the batch-download API by issuing a HEAD request to
// "/api/_tablet/_batch_download?check=true" with a CHECK_SUPPORT_TIMEOUT-second timeout.
// Returns the Status produced by HttpClient::execute_with_retry.
Status is_support_batch_download(const std::string& endpoint) {
    std::string probe_url =
            fmt::format("http://{}/api/_tablet/_batch_download?check=true", endpoint);

    auto send_probe = [&probe_url](HttpClient* http) -> Status {
        RETURN_IF_ERROR(http->init(probe_url));
        http->set_timeout_ms(CHECK_SUPPORT_TIMEOUT * 1000);
        http->set_method(HttpMethod::HEAD);
        // The response body is not inspected; only the resulting Status matters.
        std::string discarded_body;
        return http->execute(&discarded_body);
    };

    return HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, send_probe);
}
257
258
// Lists the files under `remote_dir` on `address` through the batch-download API and
// appends one (file name, file size) pair per file to `*file_info_list`.
//
// The response is expected to be newline-delimited with alternating lines
// "<file name>" / "<file size>"; whitespace-only lines are skipped.
//
// Returns:
//   - the failing Status if the HTTP request does not succeed after retries;
//   - InternalError if the number of response lines is odd or a size is not a valid
//     unsigned integer;
//   - OK otherwise.
Status list_remote_files_v2(const std::string& address, const std::string& token,
                            const std::string& remote_dir,
                            std::vector<std::pair<std::string, size_t>>* file_info_list) {
    std::string remote_url =
            fmt::format("http://{}/api/_tablet/_batch_download?token={}&dir={}&list=true", address,
                        token, remote_dir);

    std::string file_list_str;
    auto list_files_cb = [&](HttpClient* client) {
        // Drop anything left over from a previous failed attempt.
        file_list_str.clear();
        RETURN_IF_ERROR(client->init(remote_url, false));
        client->set_method(HttpMethod::GET);
        client->set_timeout_ms(LIST_REMOTE_FILE_TIMEOUT * 1000);
        return client->execute(&file_list_str);
    };
    Status status = HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, list_files_cb);
    if (!status.ok()) {
        // The URL embeds the access token, so it is masked before being logged.
        LOG(WARNING) << "failed to list remote files from " << mask_token(remote_url)
                     << ", status: " << status.to_string() << ", response: " << file_list_str;
        return status;
    }

    std::vector<std::string> file_list =
            absl::StrSplit(file_list_str, "\n", absl::SkipWhitespace());
    if (file_list.size() % 2 != 0) {
        return Status::InternalError("batch download files: invalid file list, size is not even");
    }

    VLOG_DEBUG << "list remote files from " << mask_token(remote_url)
               << ", file count: " << file_list.size() / 2;

    file_info_list->reserve(file_info_list->size() + file_list.size() / 2);
    // Entries come in pairs: [i] is the file name, [i + 1] its size. The even-size check
    // above guarantees that [i + 1] is always in range.
    for (size_t i = 0; i < file_list.size(); i += 2) {
        uint64_t file_size = 0;
        try {
            file_size = std::stoull(file_list[i + 1]);
        } catch (std::exception&) {
            return Status::InternalError("batch download files: invalid file size format: " +
                                         file_list[i + 1]);
        }
        file_info_list->emplace_back(std::move(file_list[i]), file_size);
    }

    return Status::OK();
}
302
303
// Downloads the files named in `file_info_list` from `remote_dir` on `address` into
// `local_dir` with a single batch-download POST request, retried through
// HttpClient::execute_with_retry.
//
// Each element of `file_info_list` is (file name, expected size). After every download
// attempt, each local file's size is compared with the expected size and its permission
// is set to io::LocalFileSystem::PERMS_OWNER_RW.
//
// Returns IOError if a local file's size cannot be read, InternalError on a size mismatch,
// any error produced by the HTTP client or the permission change, and OK otherwise.
Status download_files_v2(const std::string& address, const std::string& token,
                         const std::string& remote_dir, const std::string& local_dir,
                         const std::vector<std::pair<std::string, size_t>>& file_info_list) {
    std::string remote_url = fmt::format("http://{}/api/_tablet/_batch_download?dir={}&token={}",
                                         address, remote_dir, token);

    // The request payload is the newline-terminated list of file names to fetch.
    size_t batch_file_size = 0;
    std::unordered_set<std::string> expected_files;
    std::stringstream ss;
    for (const auto& file_info : file_info_list) {
        ss << file_info.first << "\n";
        batch_file_size += file_info.second;
        expected_files.insert(file_info.first);
    }
    std::string payload = ss.str();

    // Scale the timeout with the total size, but never go below the configured minimum.
    // NOTE(review): divides by config::download_low_speed_limit_kbps -- confirm that the
    // config value can never be 0.
    uint64_t estimate_timeout = batch_file_size / config::download_low_speed_limit_kbps / 1024;
    if (estimate_timeout < config::download_low_speed_time) {
        estimate_timeout = config::download_low_speed_time;
    }

    // The URL embeds the access token, so it is masked before being logged.
    LOG(INFO) << "begin to download files from " << mask_token(remote_url) << " to " << local_dir
              << ", file count: " << file_info_list.size() << ", total size: " << batch_file_size
              << ", timeout: " << estimate_timeout;

    auto callback = [&](HttpClient* client) -> Status {
        RETURN_IF_ERROR(client->init(remote_url, false));
        client->set_method(HttpMethod::POST);
        client->set_payload(payload);
        client->set_timeout_ms(estimate_timeout * 1000);
        RETURN_IF_ERROR(client->download_multi_files(local_dir, expected_files));
        for (auto&& [file_name, file_size] : file_info_list) {
            std::string local_file_path = local_dir + "/" + file_name;

            std::error_code ec;
            // Check file length
            uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
            if (ec) {
                LOG(WARNING) << "download file error: " << ec.message();
                return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path,
                                       ec.message());
            }
            if (local_file_size != file_size) {
                LOG(WARNING) << "download file length error"
                             << ", remote_path=" << mask_token(remote_url)
                             << ", file_name=" << file_name << ", file_size=" << file_size
                             << ", local_file_size=" << local_file_size;
                return Status::InternalError("downloaded file size is not equal");
            }
            RETURN_IF_ERROR(io::global_local_filesystem()->permission(
                    local_file_path, io::LocalFileSystem::PERMS_OWNER_RW));
        }

        return Status::OK();
    };
    return HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, callback);
}
360
361
} // namespace doris