be/src/service/http/action/download_binlog_action.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "service/http/action/download_binlog_action.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <fmt/ranges.h> |
22 | | |
23 | | #include <cstdint> |
24 | | #include <stdexcept> |
25 | | #include <string_view> |
26 | | #include <utility> |
27 | | |
28 | | #include "common/config.h" |
29 | | #include "common/logging.h" |
30 | | #include "io/fs/local_file_system.h" |
31 | | #include "runtime/exec_env.h" |
32 | | #include "service/http/http_channel.h" |
33 | | #include "service/http/http_request.h" |
34 | | #include "service/http/utils.h" |
35 | | #include "storage/storage_engine.h" |
36 | | #include "storage/tablet/tablet_manager.h" |
37 | | #include "util/stopwatch.hpp" |
38 | | |
39 | | namespace doris { |
40 | | |
41 | | namespace { |
// Names of the URL query parameters understood by the download binlog action.
const std::string kMethodParameter{"method"};
const std::string kTokenParameter{"token"};
const std::string kTabletIdParameter{"tablet_id"};
const std::string kBinlogVersionParameter{"binlog_version"};
const std::string kRowsetIdParameter{"rowset_id"};
const std::string kSegmentIndexParameter{"segment_index"};
const std::string kSegmentIndexIdParameter{"segment_index_id"};
// Only checked for presence: a non-empty value turns the md5 option on.
const std::string kAcquireMD5Parameter{"acquire_md5"};
50 | | |
51 | | bvar::LatencyRecorder g_get_binlog_info_latency("doris_download_binlog", "get_binlog_info"); |
52 | | bvar::LatencyRecorder g_get_segment_file_latency("doris_download_binlog", "get_segment_file"); |
53 | | bvar::LatencyRecorder g_get_segment_index_file_latency("doris_download_binlog", |
54 | | "get_segment_index_file"); |
55 | | bvar::LatencyRecorder g_get_rowset_meta_latency("doris_download_binlog", "get_rowset_meta"); |
56 | | |
57 | | // get http param, if no value throw exception |
58 | 0 | const auto& get_http_param(HttpRequest* req, const std::string& param_name) { |
59 | 0 | const auto& param = req->param(param_name); |
60 | 0 | if (param.empty()) { |
61 | 0 | auto error_msg = fmt::format("parameter {} not specified in url.", param_name); |
62 | 0 | throw std::runtime_error(error_msg); |
63 | 0 | } |
64 | 0 | return param; |
65 | 0 | } |
66 | | |
67 | 0 | auto get_tablet(StorageEngine& engine, const std::string& tablet_id_str) { |
68 | 0 | int64_t tablet_id = std::atoll(tablet_id_str.data()); |
69 | |
|
70 | 0 | TabletSharedPtr tablet = engine.tablet_manager()->get_tablet(tablet_id); |
71 | 0 | if (tablet == nullptr) { |
72 | 0 | auto error = fmt::format("tablet is not exist, tablet_id={}", tablet_id); |
73 | 0 | LOG(WARNING) << error; |
74 | 0 | throw std::runtime_error(error); |
75 | 0 | } |
76 | | |
77 | 0 | return tablet; |
78 | 0 | } |
79 | | |
80 | | // need binlog_version, tablet_id |
81 | 0 | void handle_get_binlog_info(StorageEngine& engine, HttpRequest* req) { |
82 | 0 | try { |
83 | 0 | const auto& binlog_version = get_http_param(req, kBinlogVersionParameter); |
84 | 0 | const auto& tablet_id = get_http_param(req, kTabletIdParameter); |
85 | 0 | auto tablet = get_tablet(engine, tablet_id); |
86 | |
|
87 | 0 | const auto& [rowset_id, num_segments] = tablet->get_binlog_info(binlog_version); |
88 | 0 | if (rowset_id.empty()) { |
89 | 0 | HttpChannel::send_reply( |
90 | 0 | req, HttpStatus::NOT_FOUND, |
91 | 0 | fmt::format("get binlog info failed, binlog_version={}", binlog_version)); |
92 | 0 | } else if (num_segments < 0) { |
93 | 0 | HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, |
94 | 0 | fmt::format("invalid num_segments: {}", num_segments)); |
95 | 0 | } else { |
96 | 0 | auto binlog_info_msg = fmt::format("{}:{}", rowset_id, num_segments); |
97 | 0 | HttpChannel::send_reply(req, binlog_info_msg); |
98 | 0 | } |
99 | 0 | } catch (const std::exception& e) { |
100 | 0 | HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what()); |
101 | 0 | LOG(WARNING) << "get binlog info failed, error: " << e.what(); |
102 | 0 | return; |
103 | 0 | } |
104 | 0 | } |
105 | | |
106 | | /// handle get segment file, need tablet_id, rowset_id && index |
107 | | void handle_get_segment_file(StorageEngine& engine, HttpRequest* req, |
108 | 0 | bufferevent_rate_limit_group* rate_limit_group) { |
109 | | // Step 1: get download file path |
110 | 0 | std::string segment_file_path; |
111 | 0 | bool is_acquire_md5 = false; |
112 | 0 | try { |
113 | 0 | const auto& tablet_id = get_http_param(req, kTabletIdParameter); |
114 | 0 | auto tablet = get_tablet(engine, tablet_id); |
115 | 0 | const auto& rowset_id = get_http_param(req, kRowsetIdParameter); |
116 | 0 | const auto& segment_index = get_http_param(req, kSegmentIndexParameter); |
117 | 0 | segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index); |
118 | 0 | is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty(); |
119 | 0 | } catch (const std::exception& e) { |
120 | 0 | HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what()); |
121 | 0 | LOG(WARNING) << "get download file path failed, error: " << e.what(); |
122 | 0 | return; |
123 | 0 | } |
124 | | |
125 | | // Step 2: handle download |
126 | | // check file exists |
127 | 0 | bool exists = false; |
128 | 0 | Status status = io::global_local_filesystem()->exists(segment_file_path, &exists); |
129 | 0 | if (!status.ok()) { |
130 | 0 | HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_string()); |
131 | 0 | LOG(WARNING) << "check file exists failed, error: " << status.to_string(); |
132 | 0 | return; |
133 | 0 | } |
134 | 0 | if (!exists) { |
135 | 0 | HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, "file not exist."); |
136 | 0 | LOG(WARNING) << "file not exist, file path: " << segment_file_path; |
137 | 0 | return; |
138 | 0 | } |
139 | 0 | do_file_response(segment_file_path, req, rate_limit_group, is_acquire_md5); |
140 | 0 | } |
141 | | |
142 | | /// handle get segment index file, need tablet_id, rowset_id, segment_index && segment_index_id |
143 | | void handle_get_segment_index_file(StorageEngine& engine, HttpRequest* req, |
144 | 0 | bufferevent_rate_limit_group* rate_limit_group) { |
145 | | // Step 1: get download file path |
146 | 0 | std::string segment_index_file_path; |
147 | 0 | bool is_acquire_md5 = false; |
148 | 0 | try { |
149 | 0 | const auto& tablet_id = get_http_param(req, kTabletIdParameter); |
150 | 0 | auto tablet = get_tablet(engine, tablet_id); |
151 | 0 | const auto& rowset_id = get_http_param(req, kRowsetIdParameter); |
152 | 0 | const auto& segment_index = get_http_param(req, kSegmentIndexParameter); |
153 | 0 | const auto& segment_index_id = req->param(kSegmentIndexIdParameter); |
154 | 0 | auto segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index); |
155 | 0 | if (tablet->tablet_schema()->get_inverted_index_storage_format() == |
156 | 0 | InvertedIndexStorageFormatPB::V1) { |
157 | | // now CCR not support for variant + index v1 |
158 | 0 | constexpr std::string_view index_suffix = ""; |
159 | 0 | segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v1( |
160 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path), |
161 | 0 | std::stoll(segment_index_id), index_suffix); |
162 | 0 | } else { |
163 | 0 | DCHECK(segment_index_id == "-1"); |
164 | 0 | segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2( |
165 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path)); |
166 | 0 | } |
167 | 0 | is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty(); |
168 | 0 | } catch (const std::exception& e) { |
169 | 0 | HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what()); |
170 | 0 | LOG(WARNING) << "get download file path failed, error: " << e.what(); |
171 | 0 | return; |
172 | 0 | } |
173 | | |
174 | | // Step 2: handle download |
175 | | // check file exists |
176 | 0 | bool exists = false; |
177 | 0 | Status status = io::global_local_filesystem()->exists(segment_index_file_path, &exists); |
178 | 0 | if (!status.ok()) { |
179 | 0 | HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_string()); |
180 | 0 | LOG(WARNING) << "check file exists failed, error: " << status.to_string(); |
181 | 0 | return; |
182 | 0 | } |
183 | 0 | if (!exists) { |
184 | 0 | HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, "file not exist."); |
185 | 0 | LOG(WARNING) << "file not exist, file path: " << segment_index_file_path; |
186 | 0 | return; |
187 | 0 | } |
188 | 0 | do_file_response(segment_index_file_path, req, rate_limit_group, is_acquire_md5); |
189 | 0 | } |
190 | | |
191 | 0 | void handle_get_rowset_meta(StorageEngine& engine, HttpRequest* req) { |
192 | 0 | try { |
193 | 0 | const auto& tablet_id = get_http_param(req, kTabletIdParameter); |
194 | 0 | auto tablet = get_tablet(engine, tablet_id); |
195 | 0 | const auto& rowset_id = get_http_param(req, kRowsetIdParameter); |
196 | 0 | const auto& binlog_version = get_http_param(req, kBinlogVersionParameter); |
197 | 0 | auto rowset_meta = tablet->get_rowset_binlog_meta(binlog_version, rowset_id); |
198 | 0 | if (rowset_meta.empty()) { |
199 | 0 | HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, |
200 | 0 | fmt::format("get rowset meta failed, rowset_id={}", rowset_id)); |
201 | 0 | } else { |
202 | 0 | HttpChannel::send_reply(req, rowset_meta); |
203 | 0 | } |
204 | 0 | } catch (const std::exception& e) { |
205 | 0 | HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what()); |
206 | 0 | LOG(WARNING) << "get download file path failed, error: " << e.what(); |
207 | 0 | } |
208 | 0 | } |
209 | | |
210 | | } // namespace |
211 | | |
212 | | DownloadBinlogAction::DownloadBinlogAction( |
213 | | ExecEnv* exec_env, StorageEngine& engine, |
214 | | std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group) |
215 | 0 | : HttpHandlerWithAuth(exec_env), |
216 | 0 | _engine(engine), |
217 | 0 | _rate_limit_group(std::move(rate_limit_group)) {} |
218 | | |
219 | 0 | void DownloadBinlogAction::handle(HttpRequest* req) { |
220 | 0 | VLOG_CRITICAL << "accept one download binlog request " << req->debug_string(); |
221 | |
|
222 | 0 | if (!config::enable_feature_binlog) { |
223 | 0 | HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, |
224 | 0 | "binlog feature is not enabled."); |
225 | 0 | return; |
226 | 0 | } |
227 | | |
228 | | // Step 1: check token |
229 | 0 | Status status; |
230 | 0 | if (config::enable_token_check) { |
231 | | // FIXME(Drogon): support check token |
232 | | // status = _check_token(req); |
233 | 0 | if (!status.ok()) { |
234 | 0 | HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, status.to_string()); |
235 | 0 | return; |
236 | 0 | } |
237 | 0 | } |
238 | | |
239 | | // Step 2: get method |
240 | 0 | const std::string& method = req->param(kMethodParameter); |
241 | | |
242 | | // Step 3: dispatch |
243 | 0 | MonotonicStopWatch watch; |
244 | 0 | watch.start(); |
245 | 0 | if (method == "get_binlog_info") { |
246 | 0 | handle_get_binlog_info(_engine, req); |
247 | 0 | g_get_binlog_info_latency << watch.elapsed_time_microseconds(); |
248 | 0 | } else if (method == "get_segment_file") { |
249 | 0 | handle_get_segment_file(_engine, req, _rate_limit_group.get()); |
250 | 0 | g_get_segment_file_latency << watch.elapsed_time_microseconds(); |
251 | 0 | } else if (method == "get_segment_index_file") { |
252 | 0 | handle_get_segment_index_file(_engine, req, _rate_limit_group.get()); |
253 | 0 | g_get_segment_index_file_latency << watch.elapsed_time_microseconds(); |
254 | 0 | } else if (method == "get_rowset_meta") { |
255 | 0 | handle_get_rowset_meta(_engine, req); |
256 | 0 | g_get_rowset_meta_latency << watch.elapsed_time_microseconds(); |
257 | 0 | } else { |
258 | 0 | auto error_msg = fmt::format("invalid method: {}", method); |
259 | 0 | LOG(WARNING) << error_msg; |
260 | 0 | HttpChannel::send_reply(req, HttpStatus::NOT_IMPLEMENTED, error_msg); |
261 | 0 | } |
262 | 0 | } |
263 | | |
264 | 0 | Status DownloadBinlogAction::_check_token(HttpRequest* req) { |
265 | 0 | const std::string& token_str = req->param(kTokenParameter); |
266 | 0 | if (token_str.empty()) { |
267 | 0 | return Status::InternalError("token is not specified."); |
268 | 0 | } |
269 | | |
270 | 0 | const std::string& local_token = _exec_env->token(); |
271 | 0 | if (token_str != local_token) { |
272 | 0 | LOG(WARNING) << "invalid download token: " << token_str << ", local token: " << local_token; |
273 | 0 | return Status::NotAuthorized("invalid token {}", token_str); |
274 | 0 | } |
275 | | |
276 | 0 | return Status::OK(); |
277 | 0 | } |
278 | | |
279 | | } // end namespace doris |