be/src/service/http/action/download_action.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "service/http/action/download_action.h" |
19 | | |
20 | | #include <memory> |
21 | | #include <string> |
22 | | #include <utility> |
23 | | |
24 | | #include "common/config.h" |
25 | | #include "common/logging.h" |
26 | | #include "common/status.h" |
27 | | #include "io/fs/local_file_system.h" |
28 | | #include "runtime/exec_env.h" |
29 | | #include "service/http/http_channel.h" |
30 | | #include "service/http/http_request.h" |
31 | | #include "service/http/utils.h" |
32 | | |
33 | | namespace doris { |
34 | | namespace { |
35 | | const std::string FILE_PARAMETER = "file"; |
36 | | const std::string TOKEN_PARAMETER = "token"; |
37 | | const std::string CHANNEL_PARAMETER = "channel"; |
38 | | const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog"; |
39 | | const std::string ACQUIRE_MD5_PARAMETER = "acquire_md5"; |
40 | | } // namespace |
41 | | |
42 | | DownloadAction::DownloadAction(ExecEnv* exec_env, |
43 | | std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group, |
44 | | const std::vector<std::string>& allow_dirs, int32_t num_workers) |
45 | 22 | : HttpHandlerWithAuth(exec_env), |
46 | 22 | _download_type(NORMAL), |
47 | 22 | _num_workers(num_workers), |
48 | 22 | _rate_limit_group(std::move(rate_limit_group)) { |
49 | 34 | for (const auto& dir : allow_dirs) { |
50 | 34 | std::string p; |
51 | 34 | Status st = io::global_local_filesystem()->canonicalize(dir, &p); |
52 | 34 | if (!st.ok()) { |
53 | 0 | continue; |
54 | 0 | } |
55 | 34 | _allow_paths.emplace_back(std::move(p)); |
56 | 34 | } |
57 | 22 | if (_num_workers > 0) { |
58 | | // for single-replica-load |
59 | 6 | static_cast<void>(ThreadPoolBuilder("DownloadThreadPool") |
60 | 6 | .set_min_threads(num_workers) |
61 | 6 | .set_max_threads(num_workers) |
62 | 6 | .build(&_download_workers)); |
63 | 6 | } |
64 | 22 | } |
65 | | |
66 | | DownloadAction::DownloadAction(ExecEnv* exec_env, const std::string& error_log_root_dir) |
67 | 8 | : HttpHandlerWithAuth(exec_env), _download_type(ERROR_LOG), _num_workers(0) { |
68 | 8 | #ifndef BE_TEST |
69 | 8 | static_cast<void>( |
70 | 8 | io::global_local_filesystem()->canonicalize(error_log_root_dir, &_error_log_root_dir)); |
71 | 8 | #endif |
72 | 8 | } |
73 | | |
74 | 4 | void DownloadAction::handle_normal(HttpRequest* req, const std::string& file_param) { |
75 | | // check token |
76 | 4 | Status status; |
77 | 4 | if (config::enable_token_check) { |
78 | 4 | status = check_token(req); |
79 | 4 | if (!status.ok()) { |
80 | 0 | std::string error_msg = status.to_string(); |
81 | 0 | if (status.is<ErrorCode::NOT_AUTHORIZED>()) { |
82 | 0 | HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg); |
83 | 0 | return; |
84 | 0 | } else { |
85 | 0 | HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg); |
86 | 0 | return; |
87 | 0 | } |
88 | 0 | } |
89 | 4 | } |
90 | | |
91 | 4 | status = check_path_is_allowed(file_param); |
92 | 4 | if (!status.ok()) { |
93 | 0 | std::string error_msg = status.to_string(); |
94 | 0 | if (status.is<ErrorCode::NOT_FOUND>() || status.is<ErrorCode::IO_ERROR>()) { |
95 | 0 | HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, error_msg); |
96 | 0 | return; |
97 | 0 | } else if (status.is<ErrorCode::NOT_AUTHORIZED>()) { |
98 | 0 | HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg); |
99 | 0 | return; |
100 | 0 | } else { |
101 | 0 | HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg); |
102 | 0 | return; |
103 | 0 | } |
104 | 0 | } |
105 | | |
106 | 4 | bool is_dir = false; |
107 | 4 | status = io::global_local_filesystem()->is_directory(file_param, &is_dir); |
108 | 4 | if (!status.ok()) { |
109 | 0 | HttpChannel::send_reply(req, status.to_string()); |
110 | 0 | return; |
111 | 0 | } |
112 | | |
113 | 4 | if (is_dir) { |
114 | 1 | do_dir_response(file_param, req); |
115 | 3 | } else { |
116 | 3 | const auto& channel = req->param(CHANNEL_PARAMETER); |
117 | 3 | bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE); |
118 | 3 | bool is_acquire_md5 = !req->param(ACQUIRE_MD5_PARAMETER).empty(); |
119 | 3 | auto* rate_limit_group = ingest_binlog ? _rate_limit_group.get() : nullptr; |
120 | 3 | do_file_response(file_param, req, rate_limit_group, is_acquire_md5); |
121 | 3 | } |
122 | 4 | } |
123 | | |
124 | 0 | void DownloadAction::handle_error_log(HttpRequest* req, const std::string& file_param) { |
125 | 0 | const std::string absolute_path = _error_log_root_dir + "/" + file_param; |
126 | |
|
127 | 0 | Status status = check_log_path_is_allowed(absolute_path); |
128 | 0 | if (!status.ok()) { |
129 | 0 | std::string error_msg = status.to_string(); |
130 | 0 | if (status.is<ErrorCode::NOT_AUTHORIZED>()) { |
131 | 0 | HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg); |
132 | 0 | return; |
133 | 0 | } else { |
134 | 0 | HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg); |
135 | 0 | return; |
136 | 0 | } |
137 | 0 | } |
138 | | |
139 | 0 | bool is_dir = false; |
140 | 0 | status = io::global_local_filesystem()->is_directory(absolute_path, &is_dir); |
141 | 0 | if (!status.ok()) { |
142 | 0 | std::string error_msg = status.to_string(); |
143 | 0 | if (status.is<ErrorCode::NOT_FOUND>() || status.is<ErrorCode::IO_ERROR>()) { |
144 | 0 | HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, error_msg); |
145 | 0 | return; |
146 | 0 | } else if (status.is<ErrorCode::NOT_AUTHORIZED>()) { |
147 | 0 | HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg); |
148 | 0 | return; |
149 | 0 | } else { |
150 | 0 | HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg); |
151 | 0 | return; |
152 | 0 | } |
153 | 0 | } |
154 | 0 | if (is_dir) { |
155 | 0 | std::string error_msg = "error log can only be file."; |
156 | 0 | HttpChannel::send_reply(req, error_msg); |
157 | 0 | return; |
158 | 0 | } |
159 | | |
160 | 0 | do_file_response(absolute_path, req); |
161 | 0 | } |
162 | | |
163 | 4 | void DownloadAction::handle(HttpRequest* req) { |
164 | 4 | if (_num_workers > 0) { |
165 | | // async for heavy download job, currently mainly for single-replica-load |
166 | 0 | auto status = _download_workers->submit_func([this, req]() { _handle(req); }); |
167 | 0 | if (!status.ok()) { |
168 | 0 | HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_string()); |
169 | 0 | } |
170 | 4 | } else { |
171 | 4 | _handle(req); |
172 | 4 | } |
173 | 4 | } |
174 | | |
175 | 4 | void DownloadAction::_handle(HttpRequest* req) { |
176 | 4 | VLOG_CRITICAL << "accept one download request " << req->debug_string(); |
177 | | |
178 | | // Get 'file' parameter, then assembly file absolute path |
179 | 4 | const std::string& file_path = req->param(FILE_PARAMETER); |
180 | 4 | if (file_path.empty()) { |
181 | 0 | std::string error_msg = |
182 | 0 | std::string("parameter " + FILE_PARAMETER + " not specified in url."); |
183 | 0 | HttpChannel::send_reply(req, error_msg); |
184 | 0 | return; |
185 | 0 | } |
186 | | |
187 | 4 | if (_download_type == ERROR_LOG) { |
188 | 0 | handle_error_log(req, file_path); |
189 | 4 | } else if (_download_type == NORMAL) { |
190 | 4 | handle_normal(req, file_path); |
191 | 4 | } |
192 | | |
193 | 4 | VLOG_CRITICAL << "deal with download request finished! "; |
194 | 4 | } |
195 | | |
196 | 4 | Status DownloadAction::check_token(HttpRequest* req) { |
197 | 4 | const std::string& token_str = req->param(TOKEN_PARAMETER); |
198 | 4 | if (token_str.empty()) { |
199 | 0 | return Status::NotAuthorized("token is not specified."); |
200 | 0 | } |
201 | | |
202 | 4 | const std::string& local_token = _exec_env->token(); |
203 | 4 | if (token_str != local_token) { |
204 | 0 | LOG(WARNING) << "invalid download token: " << token_str << ", local token: " << local_token; |
205 | 0 | return Status::NotAuthorized("invalid token {}", token_str); |
206 | 0 | } |
207 | | |
208 | 4 | return Status::OK(); |
209 | 4 | } |
210 | | |
211 | 4 | Status DownloadAction::check_path_is_allowed(const std::string& file_path) { |
212 | 4 | DCHECK_EQ(_download_type, NORMAL); |
213 | | |
214 | 4 | std::string canonical_file_path; |
215 | 4 | RETURN_IF_ERROR(io::global_local_filesystem()->canonicalize(file_path, &canonical_file_path)); |
216 | 4 | for (auto& allow_path : _allow_paths) { |
217 | 4 | if (io::LocalFileSystem::contain_path(allow_path, canonical_file_path)) { |
218 | 4 | return Status::OK(); |
219 | 4 | } |
220 | 4 | } |
221 | | |
222 | 0 | return Status::NotAuthorized("file path is not allowed: {}", canonical_file_path); |
223 | 4 | } |
224 | | |
225 | 0 | Status DownloadAction::check_log_path_is_allowed(const std::string& file_path) { |
226 | 0 | DCHECK_EQ(_download_type, ERROR_LOG); |
227 | |
|
228 | 0 | std::string canonical_file_path; |
229 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->canonicalize(file_path, &canonical_file_path)); |
230 | 0 | if (io::LocalFileSystem::contain_path(_error_log_root_dir, canonical_file_path)) { |
231 | 0 | return Status::OK(); |
232 | 0 | } |
233 | | |
234 | 0 | return Status::NotAuthorized("file path is not allowed: {}", file_path); |
235 | 0 | } |
236 | | |
237 | | } // end namespace doris |