/root/doris/be/src/runtime/snapshot_loader.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/snapshot_loader.h" |
19 | | |
20 | | // IWYU pragma: no_include <bthread/errno.h> |
21 | | #include <errno.h> // IWYU pragma: keep |
22 | | #include <fmt/format.h> |
23 | | #include <gen_cpp/AgentService_types.h> |
24 | | #include <gen_cpp/FrontendService.h> |
25 | | #include <gen_cpp/FrontendService_types.h> |
26 | | #include <gen_cpp/HeartbeatService_types.h> |
27 | | #include <gen_cpp/PlanNodes_types.h> |
28 | | #include <gen_cpp/Status_types.h> |
29 | | #include <gen_cpp/Types_types.h> |
30 | | |
31 | | #include <algorithm> |
32 | | #include <cstddef> |
33 | | #include <cstring> |
34 | | #include <filesystem> |
35 | | #include <istream> |
36 | | #include <unordered_map> |
37 | | #include <utility> |
38 | | |
39 | | #include "common/config.h" |
40 | | #include "common/logging.h" |
41 | | #include "gutil/strings/split.h" |
42 | | #include "http/http_client.h" |
43 | | #include "io/fs/broker_file_system.h" |
44 | | #include "io/fs/file_system.h" |
45 | | #include "io/fs/hdfs_file_system.h" |
46 | | #include "io/fs/local_file_system.h" |
47 | | #include "io/fs/path.h" |
48 | | #include "io/fs/remote_file_system.h" |
49 | | #include "io/fs/s3_file_system.h" |
50 | | #include "io/hdfs_builder.h" |
51 | | #include "olap/data_dir.h" |
52 | | #include "olap/snapshot_manager.h" |
53 | | #include "olap/storage_engine.h" |
54 | | #include "olap/tablet.h" |
55 | | #include "olap/tablet_manager.h" |
56 | | #include "runtime/client_cache.h" |
57 | | #include "runtime/exec_env.h" |
58 | | #include "util/s3_uri.h" |
59 | | #include "util/s3_util.h" |
60 | | #include "util/thrift_rpc_helper.h" |
61 | | |
62 | | namespace doris { |
63 | | |
// Size and checksum of a file found in the local snapshot directory.
struct LocalFileStat {
    // File size in bytes. Zero-initialized so a default-constructed stat never
    // carries an indeterminate value.
    uint64_t size = 0;
    // md5 of the file content; stays empty when no checksum was computed.
    std::string md5;
};
68 | | |
// Stat of a file served by the remote BE, as reported by an HTTP HEAD request.
struct RemoteFileStat {
    // Download url the stat was fetched from.
    std::string url;
    // md5 of the remote file; stays empty when the checksum was not requested.
    std::string md5;
    // File size in bytes. Zero-initialized so a default-constructed stat never
    // carries an indeterminate value.
    uint64_t size = 0;
};
74 | | |
75 | | class SnapshotHttpDownloader { |
76 | | public: |
77 | | SnapshotHttpDownloader(const TRemoteTabletSnapshot& remote_tablet_snapshot, |
78 | | TabletSharedPtr tablet, SnapshotLoader& snapshot_loader) |
79 | | : _tablet(std::move(tablet)), |
80 | | _snapshot_loader(snapshot_loader), |
81 | | _local_tablet_id(remote_tablet_snapshot.local_tablet_id), |
82 | | _remote_tablet_id(remote_tablet_snapshot.remote_tablet_id), |
83 | | _local_path(remote_tablet_snapshot.local_snapshot_path), |
84 | | _remote_path(remote_tablet_snapshot.remote_snapshot_path), |
85 | 1 | _remote_be_addr(remote_tablet_snapshot.remote_be_addr) { |
86 | 1 | auto& token = remote_tablet_snapshot.remote_token; |
87 | 1 | auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr; |
88 | | |
89 | | // HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/ |
90 | 1 | _base_url = fmt::format("http://{}:{}/api/_tablet/_download?token={}&channel=ingest_binlog", |
91 | 1 | remote_be_addr.hostname, remote_be_addr.port, token); |
92 | 1 | } |
93 | 1 | ~SnapshotHttpDownloader() = default; |
94 | | SnapshotHttpDownloader(const SnapshotHttpDownloader&) = delete; |
95 | | SnapshotHttpDownloader& operator=(const SnapshotHttpDownloader&) = delete; |
96 | | |
97 | 0 | void set_report_progress_callback(std::function<Status()> report_progress) { |
98 | 0 | _report_progress_callback = std::move(report_progress); |
99 | 0 | } |
100 | | |
101 | 1 | size_t get_download_file_num() { return _need_download_files.size(); } |
102 | | |
103 | | Status download(); |
104 | | |
105 | | private: |
106 | | constexpr static int kDownloadFileMaxRetry = 3; |
107 | | |
108 | | // Load existing files from local snapshot path, compute the md5sum of the files |
109 | | // if enable_download_md5sum_check is true |
110 | | Status _load_existing_files(); |
111 | | |
112 | | // List remote files from remote be, and find the hdr file |
113 | | Status _list_remote_files(); |
114 | | |
115 | | // Download hdr file from remote be to a tmp file |
116 | | Status _download_hdr_file(); |
117 | | |
118 | | // Link same rowset files by compare local hdr file and remote hdr file |
119 | | // if the local files are copied from the remote rowset, link them as the |
120 | | // remote rowset files, to avoid the duplicated downloading. |
121 | | Status _link_same_rowset_files(); |
122 | | |
123 | | // Get all remote file stats, excluding the hdr file. |
124 | | Status _get_remote_file_stats(); |
125 | | |
126 | | // Compute the need download files according to the local files md5sum (if enable_download_md5sum_check is true) |
127 | | void _get_need_download_files(); |
128 | | |
129 | | // Download all need download files |
130 | | Status _download_files(); |
131 | | |
132 | | // Install remote hdr file to local snapshot path from the tmp file |
133 | | Status _install_remote_hdr_file(); |
134 | | |
135 | | // Delete orphan files, which are not in remote |
136 | | Status _delete_orphan_files(); |
137 | | |
138 | | // Download a file from remote be to local path with the file stat |
139 | | Status _download_http_file(DataDir* data_dir, const std::string& remote_file_url, |
140 | | const std::string& local_file_path, |
141 | | const RemoteFileStat& remote_filestat); |
142 | | |
143 | | // Get the file stat from remote be |
144 | | Status _get_http_file_stat(const std::string& remote_file_url, RemoteFileStat* file_stat); |
145 | | |
146 | | TabletSharedPtr _tablet; |
147 | | SnapshotLoader& _snapshot_loader; |
148 | | std::function<Status()> _report_progress_callback; |
149 | | |
150 | | std::string _base_url; |
151 | | int64_t _local_tablet_id; |
152 | | int64_t _remote_tablet_id; |
153 | | const std::string& _local_path; |
154 | | const std::string& _remote_path; |
155 | | const TNetworkAddress& _remote_be_addr; |
156 | | |
157 | | std::string _local_hdr_filename; |
158 | | std::string _remote_hdr_filename; |
159 | | std::vector<std::string> _remote_file_list; |
160 | | std::unordered_map<std::string, LocalFileStat> _local_files; |
161 | | std::unordered_map<std::string, RemoteFileStat> _remote_files; |
162 | | |
163 | | std::string _tmp_hdr_file; |
164 | | RemoteFileStat _remote_hdr_filestat; |
165 | | std::vector<std::string> _need_download_files; |
166 | | }; |
167 | | |
// Return the path of the "LOADED" tag file located directly under `snapshot_path`.
static std::string get_loaded_tag_path(const std::string& snapshot_path) {
    std::string tag_path(snapshot_path);
    tag_path.append("/LOADED");
    return tag_path;
}
171 | | |
172 | 1 | static Status write_loaded_tag(const std::string& snapshot_path, int64_t tablet_id) { |
173 | 1 | std::unique_ptr<io::FileWriter> writer; |
174 | 1 | std::string file = get_loaded_tag_path(snapshot_path); |
175 | 1 | RETURN_IF_ERROR(io::global_local_filesystem()->create_file(file, &writer)); |
176 | 1 | return writer->close(); |
177 | 1 | } |
178 | | |
179 | | Status upload_with_checksum(io::RemoteFileSystem& fs, std::string_view local_path, |
180 | 0 | std::string_view remote_path, std::string_view checksum) { |
181 | 0 | auto full_remote_path = fmt::format("{}.{}", remote_path, checksum); |
182 | 0 | switch (fs.type()) { |
183 | 0 | case io::FileSystemType::HDFS: |
184 | 0 | case io::FileSystemType::BROKER: { |
185 | 0 | std::string temp = fmt::format("{}.part", remote_path); |
186 | 0 | RETURN_IF_ERROR(fs.upload(local_path, temp)); |
187 | 0 | RETURN_IF_ERROR(fs.rename(temp, full_remote_path)); |
188 | 0 | break; |
189 | 0 | } |
190 | 0 | case io::FileSystemType::S3: |
191 | 0 | RETURN_IF_ERROR(fs.upload(local_path, full_remote_path)); |
192 | 0 | break; |
193 | 0 | default: |
194 | 0 | throw Exception(Status::FatalError("unknown fs type: {}", static_cast<int>(fs.type()))); |
195 | 0 | } |
196 | 0 | return Status::OK(); |
197 | 0 | } |
198 | | |
// Return true when `str` ends with `match`. An empty `match` always matches.
bool _end_with(std::string_view str, std::string_view match) {
    if (match.size() > str.size()) {
        return false;
    }
    const std::string_view tail = str.substr(str.size() - match.size());
    return tail == match;
}
203 | | |
204 | | Status SnapshotHttpDownloader::_get_http_file_stat(const std::string& remote_file_url, |
205 | 2 | RemoteFileStat* file_stat) { |
206 | 2 | uint64_t file_size = 0; |
207 | 2 | std::string file_md5; |
208 | 2 | auto get_file_stat_cb = [&remote_file_url, &file_size, &file_md5](HttpClient* client) { |
209 | 2 | int64_t timeout_ms = config::download_binlog_meta_timeout_ms; |
210 | 2 | std::string url = remote_file_url; |
211 | 2 | if (config::enable_download_md5sum_check) { |
212 | | // compute md5sum is time-consuming, so we set a longer timeout |
213 | 0 | timeout_ms = config::download_binlog_meta_timeout_ms * 3; |
214 | 0 | url = fmt::format("{}&acquire_md5=true", remote_file_url); |
215 | 0 | } |
216 | 2 | RETURN_IF_ERROR(client->init(url)); |
217 | 2 | client->set_timeout_ms(timeout_ms); |
218 | 2 | RETURN_IF_ERROR(client->head()); |
219 | 2 | RETURN_IF_ERROR(client->get_content_length(&file_size)); |
220 | 2 | if (config::enable_download_md5sum_check) { |
221 | 0 | RETURN_IF_ERROR(client->get_content_md5(&file_md5)); |
222 | 0 | } |
223 | 2 | return Status::OK(); |
224 | 2 | }; |
225 | 2 | RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_stat_cb)); |
226 | 2 | file_stat->url = remote_file_url; |
227 | 2 | file_stat->size = file_size; |
228 | 2 | file_stat->md5 = std::move(file_md5); |
229 | 2 | return Status::OK(); |
230 | 2 | } |
231 | | |
232 | | Status SnapshotHttpDownloader::_download_http_file(DataDir* data_dir, |
233 | | const std::string& remote_file_url, |
234 | | const std::string& local_file_path, |
235 | 1 | const RemoteFileStat& remote_filestat) { |
236 | 1 | auto file_size = remote_filestat.size; |
237 | 1 | const auto& remote_file_md5 = remote_filestat.md5; |
238 | | |
239 | | // check disk capacity |
240 | 1 | if (data_dir->reach_capacity_limit(file_size)) { |
241 | 0 | return Status::Error<ErrorCode::EXCEEDED_LIMIT>( |
242 | 0 | "reach the capacity limit of path {}, file_size={}", data_dir->path(), file_size); |
243 | 0 | } |
244 | | |
245 | 1 | uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024; |
246 | 1 | if (estimate_timeout < config::download_low_speed_time) { |
247 | 1 | estimate_timeout = config::download_low_speed_time; |
248 | 1 | } |
249 | | |
250 | 1 | LOG(INFO) << "clone begin to download file from: " << remote_file_url |
251 | 1 | << " to: " << local_file_path << ". size(B): " << file_size |
252 | 1 | << ", timeout(s): " << estimate_timeout; |
253 | | |
254 | 1 | auto download_cb = [&remote_file_url, &remote_file_md5, estimate_timeout, &local_file_path, |
255 | 1 | file_size](HttpClient* client) { |
256 | 1 | RETURN_IF_ERROR(client->init(remote_file_url)); |
257 | 1 | client->set_timeout_ms(estimate_timeout * 1000); |
258 | 1 | RETURN_IF_ERROR(client->download(local_file_path)); |
259 | | |
260 | 1 | std::error_code ec; |
261 | | // Check file length |
262 | 1 | uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec); |
263 | 1 | if (ec) { |
264 | 0 | LOG(WARNING) << "download file error" << ec.message(); |
265 | 0 | return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path, |
266 | 0 | ec.message()); |
267 | 0 | } |
268 | 1 | if (local_file_size != file_size) { |
269 | 0 | LOG(WARNING) << "download file length error" |
270 | 0 | << ", remote_path=" << remote_file_url << ", file_size=" << file_size |
271 | 0 | << ", local_file_size=" << local_file_size; |
272 | 0 | return Status::InternalError("downloaded file size is not equal"); |
273 | 0 | } |
274 | | |
275 | 1 | if (!remote_file_md5.empty()) { // keep compatibility |
276 | 0 | std::string local_file_md5; |
277 | 0 | RETURN_IF_ERROR( |
278 | 0 | io::global_local_filesystem()->md5sum(local_file_path, &local_file_md5)); |
279 | 0 | if (local_file_md5 != remote_file_md5) { |
280 | 0 | LOG(WARNING) << "download file md5 error" |
281 | 0 | << ", remote_file_url=" << remote_file_url |
282 | 0 | << ", local_file_path=" << local_file_path |
283 | 0 | << ", remote_file_md5=" << remote_file_md5 |
284 | 0 | << ", local_file_md5=" << local_file_md5; |
285 | 0 | return Status::RuntimeError( |
286 | 0 | "download file {} md5 is not equal, local={}, remote={}", remote_file_url, |
287 | 0 | local_file_md5, remote_file_md5); |
288 | 0 | } |
289 | 0 | } |
290 | | |
291 | 1 | return io::global_local_filesystem()->permission(local_file_path, |
292 | 1 | io::LocalFileSystem::PERMS_OWNER_RW); |
293 | 1 | }; |
294 | 1 | auto status = HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb); |
295 | 1 | if (!status.ok()) { |
296 | 0 | LOG(WARNING) << "failed to download file from " << remote_file_url |
297 | 0 | << ", status: " << status.to_string(); |
298 | 0 | return status; |
299 | 0 | } |
300 | | |
301 | 1 | return Status::OK(); |
302 | 1 | } |
303 | | |
304 | 1 | Status SnapshotHttpDownloader::_load_existing_files() { |
305 | 1 | std::vector<std::string> existing_files; |
306 | 1 | RETURN_IF_ERROR(_snapshot_loader._get_existing_files_from_local(_local_path, &existing_files)); |
307 | 2 | for (auto& local_file : existing_files) { |
308 | | // add file size |
309 | 2 | std::string local_file_path = _local_path + "/" + local_file; |
310 | 2 | std::error_code ec; |
311 | 2 | uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec); |
312 | 2 | if (ec) { |
313 | 0 | LOG(WARNING) << "download file error, can't retrive file_size of " << local_file_path |
314 | 0 | << ", due to " << ec.message(); |
315 | 0 | return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path, |
316 | 0 | ec.message()); |
317 | 0 | } |
318 | | |
319 | | // get md5sum |
320 | 2 | std::string md5; |
321 | 2 | if (config::enable_download_md5sum_check) { |
322 | 0 | auto status = io::global_local_filesystem()->md5sum(local_file_path, &md5); |
323 | 0 | if (!status.ok()) { |
324 | 0 | LOG(WARNING) << "download file error, local file " << local_file_path |
325 | 0 | << " md5sum: " << status.to_string(); |
326 | 0 | return status; |
327 | 0 | } |
328 | 0 | } |
329 | 2 | _local_files[local_file] = {local_file_size, md5}; |
330 | | |
331 | | // get hdr file |
332 | 2 | if (local_file.ends_with(".hdr")) { |
333 | 1 | _local_hdr_filename = local_file; |
334 | 1 | } |
335 | 2 | } |
336 | | |
337 | 1 | return Status::OK(); |
338 | 1 | } |
339 | | |
340 | 1 | Status SnapshotHttpDownloader::_list_remote_files() { |
341 | | // get all these use http download action |
342 | | // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr |
343 | 1 | std::string remote_url_prefix = fmt::format("{}&file={}", _base_url, _remote_path); |
344 | | |
345 | 1 | LOG(INFO) << "list remote files: " << remote_url_prefix << ", job: " << _snapshot_loader._job_id |
346 | 1 | << ", task id: " << _snapshot_loader._task_id << ", remote be: " << _remote_be_addr; |
347 | | |
348 | 1 | std::string remote_file_list_str; |
349 | 1 | auto list_files_cb = [&remote_url_prefix, &remote_file_list_str](HttpClient* client) { |
350 | 1 | RETURN_IF_ERROR(client->init(remote_url_prefix)); |
351 | 1 | client->set_timeout_ms(config::download_binlog_meta_timeout_ms); |
352 | 1 | return client->execute(&remote_file_list_str); |
353 | 1 | }; |
354 | 1 | RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, list_files_cb)); |
355 | | |
356 | 1 | _remote_file_list = strings::Split(remote_file_list_str, "\n", strings::SkipWhitespace()); |
357 | | |
358 | | // find hdr file |
359 | 1 | auto hdr_file = |
360 | 1 | std::find_if(_remote_file_list.begin(), _remote_file_list.end(), |
361 | 1 | [](const std::string& filename) { return _end_with(filename, ".hdr"); }); |
362 | 1 | if (hdr_file == _remote_file_list.end()) { |
363 | 0 | std::string msg = |
364 | 0 | fmt::format("can't find hdr file in remote snapshot path: {}", _remote_path); |
365 | 0 | LOG(WARNING) << msg; |
366 | 0 | return Status::RuntimeError(std::move(msg)); |
367 | 0 | } |
368 | 1 | _remote_hdr_filename = *hdr_file; |
369 | 1 | _remote_file_list.erase(hdr_file); |
370 | | |
371 | 1 | return Status::OK(); |
372 | 1 | } |
373 | | |
374 | 1 | Status SnapshotHttpDownloader::_download_hdr_file() { |
375 | 1 | RemoteFileStat remote_hdr_stat; |
376 | 1 | std::string remote_hdr_file_url = |
377 | 1 | fmt::format("{}&file={}/{}", _base_url, _remote_path, _remote_hdr_filename); |
378 | 1 | auto status = _get_http_file_stat(remote_hdr_file_url, &remote_hdr_stat); |
379 | 1 | if (!status.ok()) { |
380 | 0 | LOG(WARNING) << "failed to get remote hdr file stat: " << remote_hdr_file_url |
381 | 0 | << ", error: " << status.to_string(); |
382 | 0 | return status; |
383 | 0 | } |
384 | | |
385 | 1 | std::string hdr_filename = _remote_hdr_filename + ".tmp"; |
386 | 1 | std::string hdr_file = _local_path + "/" + hdr_filename; |
387 | 1 | status = _download_http_file(_tablet->data_dir(), remote_hdr_file_url, hdr_file, |
388 | 1 | remote_hdr_stat); |
389 | 1 | if (!status.ok()) { |
390 | 0 | LOG(WARNING) << "failed to download remote hdr file: " << remote_hdr_file_url |
391 | 0 | << ", error: " << status.to_string(); |
392 | 0 | return status; |
393 | 0 | } |
394 | 1 | _tmp_hdr_file = hdr_file; |
395 | 1 | _remote_hdr_filestat = remote_hdr_stat; |
396 | 1 | return Status::OK(); |
397 | 1 | } |
398 | | |
399 | 1 | Status SnapshotHttpDownloader::_link_same_rowset_files() { |
400 | 1 | std::string local_hdr_file_path = _local_path + "/" + _local_hdr_filename; |
401 | | |
402 | | // load local tablet meta |
403 | 1 | TabletMetaPB local_tablet_meta; |
404 | 1 | auto status = TabletMeta::load_from_file(local_hdr_file_path, &local_tablet_meta); |
405 | 1 | if (!status.ok()) { |
406 | | // This file might broken because of the partial downloading. |
407 | 0 | LOG(WARNING) << "failed to load local tablet meta: " << local_hdr_file_path |
408 | 0 | << ", skip link same rowset files, error: " << status.to_string(); |
409 | 0 | return Status::OK(); |
410 | 0 | } |
411 | | |
412 | | // load remote tablet meta |
413 | 1 | TabletMetaPB remote_tablet_meta; |
414 | 1 | status = TabletMeta::load_from_file(_tmp_hdr_file, &remote_tablet_meta); |
415 | 1 | if (!status.ok()) { |
416 | 0 | LOG(WARNING) << "failed to load remote tablet meta: " << _tmp_hdr_file |
417 | 0 | << ", error: " << status.to_string(); |
418 | 0 | return status; |
419 | 0 | } |
420 | | |
421 | 1 | LOG(INFO) << "link rowset files by compare " << _local_hdr_filename << " and " |
422 | 1 | << _remote_hdr_filename; |
423 | | |
424 | 1 | std::unordered_map<std::string, const RowsetMetaPB&> remote_rowset_metas; |
425 | 2 | for (const auto& rowset_meta : remote_tablet_meta.rs_metas()) { |
426 | 2 | if (rowset_meta.has_resource_id()) { // skip remote rowset |
427 | 0 | continue; |
428 | 0 | } |
429 | 2 | remote_rowset_metas.insert({rowset_meta.rowset_id_v2(), rowset_meta}); |
430 | 2 | } |
431 | | |
432 | 1 | std::unordered_map<std::string, const RowsetMetaPB&> local_rowset_metas; |
433 | 2 | for (const auto& rowset_meta : local_tablet_meta.rs_metas()) { |
434 | 2 | if (rowset_meta.has_resource_id()) { |
435 | 0 | continue; |
436 | 0 | } |
437 | 2 | local_rowset_metas.insert({rowset_meta.rowset_id_v2(), rowset_meta}); |
438 | 2 | } |
439 | | |
440 | 2 | for (const auto& local_rowset_meta : local_tablet_meta.rs_metas()) { |
441 | 2 | if (local_rowset_meta.has_resource_id() || !local_rowset_meta.has_source_rowset_id()) { |
442 | 2 | continue; |
443 | 2 | } |
444 | | |
445 | 0 | auto remote_rowset_meta = remote_rowset_metas.find(local_rowset_meta.source_rowset_id()); |
446 | 0 | if (remote_rowset_meta == remote_rowset_metas.end()) { |
447 | 0 | continue; |
448 | 0 | } |
449 | | |
450 | 0 | const auto& remote_rowset_id = remote_rowset_meta->first; |
451 | 0 | const auto& remote_rowset_meta_pb = remote_rowset_meta->second; |
452 | 0 | const auto& local_rowset_id = local_rowset_meta.rowset_id_v2(); |
453 | 0 | auto remote_tablet_id = remote_rowset_meta_pb.tablet_id(); |
454 | 0 | if (local_rowset_meta.start_version() != remote_rowset_meta_pb.start_version() || |
455 | 0 | local_rowset_meta.end_version() != remote_rowset_meta_pb.end_version()) { |
456 | 0 | continue; |
457 | 0 | } |
458 | | |
459 | 0 | LOG(INFO) << "rowset " << local_rowset_id << " was downloaded from remote tablet " |
460 | 0 | << remote_tablet_id << " rowset " << remote_rowset_id |
461 | 0 | << ", directly link files instead of downloading"; |
462 | | |
463 | | // Since the rowset meta are the same, we can link the local rowset files as |
464 | | // the downloaded remote rowset files. |
465 | 0 | for (const auto& [local_file, local_filestat] : _local_files) { |
466 | 0 | if (!local_file.starts_with(local_rowset_id)) { |
467 | 0 | continue; |
468 | 0 | } |
469 | | |
470 | 0 | std::string remote_file = local_file; |
471 | 0 | remote_file.replace(0, local_rowset_id.size(), remote_rowset_id); |
472 | 0 | std::string local_file_path = _local_path + "/" + local_file; |
473 | 0 | std::string remote_file_path = _local_path + "/" + remote_file; |
474 | |
|
475 | 0 | bool exist = true; |
476 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(remote_file_path, &exist)); |
477 | 0 | if (exist) { |
478 | 0 | continue; |
479 | 0 | } |
480 | | |
481 | 0 | LOG(INFO) << "link file from " << local_file_path << " to " << remote_file_path; |
482 | 0 | if (!io::global_local_filesystem()->link_file(local_file_path, remote_file_path)) { |
483 | 0 | std::string msg = fmt::format("link file failed from {} to {}, err: {}", |
484 | 0 | local_file_path, remote_file_path, strerror(errno)); |
485 | 0 | LOG(WARNING) << msg; |
486 | 0 | return Status::InternalError(std::move(msg)); |
487 | 0 | } |
488 | | |
489 | 0 | _local_files[remote_file] = local_filestat; |
490 | 0 | } |
491 | 0 | } |
492 | | |
493 | 2 | for (const auto& remote_rowset_meta : remote_tablet_meta.rs_metas()) { |
494 | 2 | if (remote_rowset_meta.has_resource_id() || !remote_rowset_meta.has_source_rowset_id()) { |
495 | 0 | continue; |
496 | 0 | } |
497 | | |
498 | 2 | auto local_rowset_meta = local_rowset_metas.find(remote_rowset_meta.source_rowset_id()); |
499 | 2 | if (local_rowset_meta == local_rowset_metas.end()) { |
500 | 0 | continue; |
501 | 0 | } |
502 | | |
503 | 2 | const auto& local_rowset_id = local_rowset_meta->first; |
504 | 2 | const auto& local_rowset_meta_pb = local_rowset_meta->second; |
505 | 2 | const auto& remote_rowset_id = remote_rowset_meta.rowset_id_v2(); |
506 | 2 | auto local_tablet_id = local_rowset_meta_pb.tablet_id(); |
507 | | |
508 | 2 | if (remote_rowset_meta.start_version() != local_rowset_meta_pb.start_version() || |
509 | 2 | remote_rowset_meta.end_version() != local_rowset_meta_pb.end_version()) { |
510 | 0 | continue; |
511 | 0 | } |
512 | | |
513 | 2 | LOG(INFO) << "remote rowset " << remote_rowset_id << " was derived from local tablet " |
514 | 2 | << local_tablet_id << " rowset " << local_rowset_id |
515 | 2 | << ", skip downloading these files"; |
516 | | |
517 | 2 | for (const auto& remote_file : _remote_file_list) { |
518 | 2 | if (!remote_file.starts_with(remote_rowset_id)) { |
519 | 1 | continue; |
520 | 1 | } |
521 | | |
522 | 1 | std::string local_file = remote_file; |
523 | 1 | local_file.replace(0, remote_rowset_id.size(), local_rowset_id); |
524 | 1 | std::string local_file_path = _local_path + "/" + local_file; |
525 | 1 | std::string remote_file_path = _local_path + "/" + remote_file; |
526 | | |
527 | 1 | bool exist = false; |
528 | 1 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(remote_file_path, &exist)); |
529 | 1 | if (exist) { |
530 | 0 | continue; |
531 | 0 | } |
532 | | |
533 | 1 | LOG(INFO) << "link file from " << local_file_path << " to " << remote_file_path; |
534 | 1 | if (!io::global_local_filesystem()->link_file(local_file_path, remote_file_path)) { |
535 | 0 | std::string msg = fmt::format("link file failed from {} to {}, err: {}", |
536 | 0 | local_file_path, remote_file_path, strerror(errno)); |
537 | 0 | LOG(WARNING) << msg; |
538 | 0 | return Status::InternalError(std::move(msg)); |
539 | 1 | } else { |
540 | 1 | auto it = _local_files.find(local_file); |
541 | 1 | if (it != _local_files.end()) { |
542 | 1 | _local_files[remote_file] = it->second; |
543 | 1 | } else { |
544 | 0 | std::string msg = |
545 | 0 | fmt::format("local file {} don't exist in _local_files, err: {}", |
546 | 0 | local_file, strerror(errno)); |
547 | 0 | LOG(WARNING) << msg; |
548 | 0 | return Status::InternalError(std::move(msg)); |
549 | 0 | } |
550 | 1 | } |
551 | 1 | } |
552 | 2 | } |
553 | 1 | return Status::OK(); |
554 | 1 | } |
555 | | |
556 | 1 | Status SnapshotHttpDownloader::_get_remote_file_stats() { |
557 | 1 | for (const auto& filename : _remote_file_list) { |
558 | 1 | if (_report_progress_callback) { |
559 | 0 | RETURN_IF_ERROR(_report_progress_callback()); |
560 | 0 | } |
561 | | |
562 | 1 | std::string remote_file_url = |
563 | 1 | fmt::format("{}&file={}/{}", _base_url, _remote_path, filename); |
564 | | |
565 | 1 | RemoteFileStat remote_filestat; |
566 | 1 | RETURN_IF_ERROR(_get_http_file_stat(remote_file_url, &remote_filestat)); |
567 | 1 | _remote_files[filename] = remote_filestat; |
568 | 1 | } |
569 | | |
570 | 1 | return Status::OK(); |
571 | 1 | } |
572 | | |
573 | 1 | void SnapshotHttpDownloader::_get_need_download_files() { |
574 | 1 | for (const auto& [remote_file, remote_filestat] : _remote_files) { |
575 | 1 | LOG(INFO) << "remote file: " << remote_file << ", size: " << remote_filestat.size |
576 | 1 | << ", md5: " << remote_filestat.md5; |
577 | 1 | auto it = _local_files.find(remote_file); |
578 | 1 | if (it == _local_files.end()) { |
579 | 0 | _need_download_files.emplace_back(remote_file); |
580 | 0 | continue; |
581 | 0 | } |
582 | | |
583 | 1 | if (auto& local_filestat = it->second; local_filestat.size != remote_filestat.size) { |
584 | 0 | _need_download_files.emplace_back(remote_file); |
585 | 0 | continue; |
586 | 0 | } |
587 | | |
588 | 1 | if (auto& local_filestat = it->second; local_filestat.md5 != remote_filestat.md5) { |
589 | 0 | _need_download_files.emplace_back(remote_file); |
590 | 0 | continue; |
591 | 0 | } |
592 | | |
593 | 1 | LOG(INFO) << fmt::format("file {} already exists, skip download url {}", remote_file, |
594 | 1 | remote_filestat.url); |
595 | 1 | } |
596 | 1 | } |
597 | | |
598 | 0 | Status SnapshotHttpDownloader::_download_files() { |
599 | 0 | DataDir* data_dir = _tablet->data_dir(); |
600 | |
|
601 | 0 | uint64_t total_file_size = 0; |
602 | 0 | MonotonicStopWatch watch(true); |
603 | 0 | for (auto& filename : _need_download_files) { |
604 | 0 | if (_report_progress_callback) { |
605 | 0 | RETURN_IF_ERROR(_report_progress_callback()); |
606 | 0 | } |
607 | | |
608 | 0 | auto& remote_filestat = _remote_files[filename]; |
609 | 0 | auto file_size = remote_filestat.size; |
610 | 0 | auto& remote_file_url = remote_filestat.url; |
611 | 0 | auto& remote_file_md5 = remote_filestat.md5; |
612 | |
|
613 | 0 | std::string local_filename; |
614 | 0 | RETURN_IF_ERROR( |
615 | 0 | _snapshot_loader._replace_tablet_id(filename, _local_tablet_id, &local_filename)); |
616 | 0 | std::string local_file_path = _local_path + "/" + local_filename; |
617 | |
|
618 | 0 | RETURN_IF_ERROR( |
619 | 0 | _download_http_file(data_dir, remote_file_url, local_file_path, remote_filestat)); |
620 | 0 | total_file_size += file_size; |
621 | | |
622 | | // local_files always keep the updated local files |
623 | 0 | _local_files[filename] = LocalFileStat {file_size, remote_file_md5}; |
624 | 0 | } |
625 | | |
626 | 0 | uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000; |
627 | 0 | total_time_ms = total_time_ms > 0 ? total_time_ms : 0; |
628 | 0 | double copy_rate = 0.0; |
629 | 0 | if (total_time_ms > 0) { |
630 | 0 | copy_rate = total_file_size / ((double)total_time_ms) / 1000; |
631 | 0 | } |
632 | 0 | LOG(INFO) << fmt::format( |
633 | 0 | "succeed to copy remote tablet {} to local tablet {}, total downloading {} files, " |
634 | 0 | "total file size: {} B, cost: {} ms, rate: {} MB/s", |
635 | 0 | _remote_tablet_id, _local_tablet_id, _need_download_files.size(), total_file_size, |
636 | 0 | total_time_ms, copy_rate); |
637 | |
|
638 | 0 | return Status::OK(); |
639 | 0 | } |
640 | | |
641 | 1 | Status SnapshotHttpDownloader::_install_remote_hdr_file() { |
642 | 1 | std::string local_hdr_filename; |
643 | 1 | RETURN_IF_ERROR(_snapshot_loader._replace_tablet_id(_remote_hdr_filename, _local_tablet_id, |
644 | 1 | &local_hdr_filename)); |
645 | 1 | std::string local_hdr_file_path = _local_path + "/" + local_hdr_filename; |
646 | | |
647 | 1 | auto status = io::global_local_filesystem()->rename(_tmp_hdr_file, local_hdr_file_path); |
648 | 1 | if (!status.ok()) { |
649 | 0 | LOG(WARNING) << "failed to install remote hdr file from: " << _tmp_hdr_file << " to" |
650 | 0 | << local_hdr_file_path << ", error: " << status.to_string(); |
651 | 0 | return Status::RuntimeError("failed install remote hdr file {} from tmp {}, error: {}", |
652 | 0 | local_hdr_file_path, _tmp_hdr_file, status.to_string()); |
653 | 0 | } |
654 | | |
655 | | // also save the hdr file into remote files. |
656 | 1 | _remote_files[_remote_hdr_filename] = _remote_hdr_filestat; |
657 | | |
658 | 1 | return Status::OK(); |
659 | 1 | } |
660 | | |
661 | 1 | Status SnapshotHttpDownloader::_delete_orphan_files() { |
662 | | // local_files: contain all remote files and local files |
663 | | // finally, delete local files which are not in remote |
664 | 3 | for (const auto& [local_file, local_filestat] : _local_files) { |
665 | | // replace the tablet id in local file name with the remote tablet id, |
666 | | // in order to compare the file name. |
667 | 3 | std::string new_name; |
668 | 3 | Status st = _snapshot_loader._replace_tablet_id(local_file, _remote_tablet_id, &new_name); |
669 | 3 | if (!st.ok()) { |
670 | 0 | LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st |
671 | 0 | << ". ignore it"; |
672 | 0 | continue; |
673 | 0 | } |
674 | 3 | VLOG_CRITICAL << "new file name after replace tablet id: " << new_name; |
675 | 3 | const auto& find = _remote_files.find(new_name); |
676 | 3 | if (find != _remote_files.end()) { |
677 | 2 | continue; |
678 | 2 | } |
679 | | |
680 | | // delete |
681 | 1 | std::string full_local_file = _local_path + "/" + local_file; |
682 | 1 | LOG(INFO) << "begin to delete local snapshot file: " << full_local_file |
683 | 1 | << ", it does not exist in remote"; |
684 | 1 | if (remove(full_local_file.c_str()) != 0) { |
685 | 0 | LOG(WARNING) << "failed to delete unknown local file: " << full_local_file |
686 | 0 | << ", error: " << strerror(errno) << ", file size: " << local_filestat.size |
687 | 0 | << ", ignore it"; |
688 | 0 | } |
689 | 1 | } |
690 | 1 | return Status::OK(); |
691 | 1 | } |
692 | | |
693 | 1 | Status SnapshotHttpDownloader::download() { |
694 | | // Take a lock to protect the local snapshot path. |
695 | 1 | auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(_local_path); |
696 | | |
697 | | // Step 1: Validate local tablet snapshot paths |
698 | 1 | bool res = true; |
699 | 1 | RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(_local_path, &res)); |
700 | 1 | if (!res) { |
701 | 0 | std::string msg = |
702 | 0 | fmt::format("snapshot path is not directory or does not exist: {}", _local_path); |
703 | 0 | LOG(WARNING) << msg; |
704 | 0 | return Status::RuntimeError(std::move(msg)); |
705 | 0 | } |
706 | | |
707 | | // Step 2: get all local files |
708 | 1 | RETURN_IF_ERROR(_load_existing_files()); |
709 | | |
710 | | // Step 3: Validate remote tablet snapshot paths && remote files map |
711 | 1 | RETURN_IF_ERROR(_list_remote_files()); |
712 | | |
713 | | // Step 4: download hdr file to a tmp file |
714 | 1 | RETURN_IF_ERROR(_download_hdr_file()); |
715 | | |
716 | | // Step 5: link same rowset files, if local tablet meta file exists |
717 | 1 | if (!_local_hdr_filename.empty()) { |
718 | 1 | RETURN_IF_ERROR(_link_same_rowset_files()); |
719 | 1 | } |
720 | | |
721 | | // Step 6: get all remote file stats |
722 | 1 | RETURN_IF_ERROR(_get_remote_file_stats()); |
723 | | |
724 | | // Step 7: get all need download files & download them |
725 | 1 | _get_need_download_files(); |
726 | 1 | if (!_need_download_files.empty()) { |
727 | 0 | RETURN_IF_ERROR(_download_files()); |
728 | 0 | } |
729 | | |
730 | | // Step 8: install remote hdr file from tmp file |
731 | 1 | RETURN_IF_ERROR(_install_remote_hdr_file()); |
732 | | |
733 | | // Step 9: delete orphan files |
734 | 1 | RETURN_IF_ERROR(_delete_orphan_files()); |
735 | | |
736 | 1 | return Status::OK(); |
737 | 1 | } |
738 | | |
// Build a loader for one backup/restore task.
//
// The constructor only initializes members from its arguments. `_remote_fs`
// is not set here; init() assigns it, and upload()/download() reject calls
// made while it is still unset.
SnapshotLoader::SnapshotLoader(StorageEngine& engine, ExecEnv* env, int64_t job_id, int64_t task_id,
                               const TNetworkAddress& broker_addr,
                               const std::map<std::string, std::string>& prop)
        : _engine(engine),
          _env(env),
          _job_id(job_id),
          _task_id(task_id),
          _broker_addr(broker_addr),
          _prop(prop) {}
748 | | |
749 | 0 | Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& location) { |
750 | 0 | if (TStorageBackendType::type::S3 == type) { |
751 | 0 | S3Conf s3_conf; |
752 | 0 | S3URI s3_uri(location); |
753 | 0 | RETURN_IF_ERROR(s3_uri.parse()); |
754 | 0 | RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf(_prop, s3_uri, &s3_conf)); |
755 | 0 | _remote_fs = |
756 | 0 | DORIS_TRY(io::S3FileSystem::create(std::move(s3_conf), io::FileSystem::TMP_FS_ID)); |
757 | 0 | } else if (TStorageBackendType::type::HDFS == type) { |
758 | 0 | THdfsParams hdfs_params = parse_properties(_prop); |
759 | 0 | _remote_fs = DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, |
760 | 0 | io::FileSystem::TMP_FS_ID, nullptr)); |
761 | 0 | } else if (TStorageBackendType::type::BROKER == type) { |
762 | 0 | std::shared_ptr<io::BrokerFileSystem> fs; |
763 | 0 | _remote_fs = DORIS_TRY( |
764 | 0 | io::BrokerFileSystem::create(_broker_addr, _prop, io::FileSystem::TMP_FS_ID)); |
765 | 0 | } else { |
766 | 0 | return Status::InternalError("Unknown storage type: {}", type); |
767 | 0 | } |
768 | 0 | return Status::OK(); |
769 | 0 | } |
770 | | |
// Defaulted destructor: no explicit cleanup is performed here.
SnapshotLoader::~SnapshotLoader() = default;
772 | | |
773 | | Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_dest_path, |
774 | 0 | std::map<int64_t, std::vector<std::string>>* tablet_files) { |
775 | 0 | if (!_remote_fs) { |
776 | 0 | return Status::InternalError("Storage backend not initialized."); |
777 | 0 | } |
778 | 0 | LOG(INFO) << "begin to upload snapshot files. num: " << src_to_dest_path.size() |
779 | 0 | << ", broker addr: " << _broker_addr << ", job: " << _job_id << ", task" << _task_id; |
780 | | |
781 | | // check if job has already been cancelled |
782 | 0 | int tmp_counter = 1; |
783 | 0 | RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::UPLOAD)); |
784 | | |
785 | 0 | Status status = Status::OK(); |
786 | | // 1. validate local tablet snapshot paths |
787 | 0 | RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, true)); |
788 | | |
789 | | // 2. for each src path, upload it to remote storage |
790 | | // we report to frontend for every 10 files, and we will cancel the job if |
791 | | // the job has already been cancelled in frontend. |
792 | 0 | int report_counter = 0; |
793 | 0 | int total_num = src_to_dest_path.size(); |
794 | 0 | int finished_num = 0; |
795 | 0 | for (const auto& iter : src_to_dest_path) { |
796 | 0 | const std::string& src_path = iter.first; |
797 | 0 | const std::string& dest_path = iter.second; |
798 | | |
799 | | // Take a lock to protect the local snapshot path. |
800 | 0 | auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(src_path); |
801 | |
|
802 | 0 | int64_t tablet_id = 0; |
803 | 0 | int32_t schema_hash = 0; |
804 | 0 | RETURN_IF_ERROR( |
805 | 0 | _get_tablet_id_and_schema_hash_from_file_path(src_path, &tablet_id, &schema_hash)); |
806 | | |
807 | | // 2.1 get existing files from remote path |
808 | 0 | std::map<std::string, FileStat> remote_files; |
809 | 0 | RETURN_IF_ERROR(_list_with_checksum(dest_path, &remote_files)); |
810 | | |
811 | 0 | for (auto& tmp : remote_files) { |
812 | 0 | VLOG_CRITICAL << "get remote file: " << tmp.first << ", checksum: " << tmp.second.md5; |
813 | 0 | } |
814 | | |
815 | | // 2.2 list local files |
816 | 0 | std::vector<std::string> local_files; |
817 | 0 | std::vector<std::string> local_files_with_checksum; |
818 | 0 | RETURN_IF_ERROR(_get_existing_files_from_local(src_path, &local_files)); |
819 | | |
820 | | // 2.3 iterate local files |
821 | 0 | for (auto& local_file : local_files) { |
822 | 0 | RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, |
823 | 0 | TTaskType::type::UPLOAD)); |
824 | | |
825 | | // calc md5sum of localfile |
826 | 0 | std::string md5sum; |
827 | 0 | RETURN_IF_ERROR( |
828 | 0 | io::global_local_filesystem()->md5sum(src_path + "/" + local_file, &md5sum)); |
829 | 0 | VLOG_CRITICAL << "get file checksum: " << local_file << ": " << md5sum; |
830 | 0 | local_files_with_checksum.push_back(local_file + "." + md5sum); |
831 | | |
832 | | // check if this local file need upload |
833 | 0 | bool need_upload = false; |
834 | 0 | auto find = remote_files.find(local_file); |
835 | 0 | if (find != remote_files.end()) { |
836 | 0 | if (md5sum != find->second.md5) { |
837 | | // remote storage file exist, but with different checksum |
838 | 0 | LOG(WARNING) << "remote file checksum is invalid. remote: " << find->first |
839 | 0 | << ", local: " << md5sum; |
840 | | // TODO(cmy): save these files and delete them later |
841 | 0 | need_upload = true; |
842 | 0 | } |
843 | 0 | } else { |
844 | 0 | need_upload = true; |
845 | 0 | } |
846 | |
|
847 | 0 | if (!need_upload) { |
848 | 0 | VLOG_CRITICAL << "file exist in remote path, no need to upload: " << local_file; |
849 | 0 | continue; |
850 | 0 | } |
851 | | |
852 | | // upload |
853 | 0 | std::string remote_path = dest_path + '/' + local_file; |
854 | 0 | std::string local_path = src_path + '/' + local_file; |
855 | 0 | RETURN_IF_ERROR(upload_with_checksum(*_remote_fs, local_path, remote_path, md5sum)); |
856 | 0 | } // end for each tablet's local files |
857 | | |
858 | 0 | tablet_files->emplace(tablet_id, local_files_with_checksum); |
859 | 0 | finished_num++; |
860 | 0 | LOG(INFO) << "finished to write tablet to remote. local path: " << src_path |
861 | 0 | << ", remote path: " << dest_path; |
862 | 0 | } // end for each tablet path |
863 | | |
864 | 0 | LOG(INFO) << "finished to upload snapshots. job: " << _job_id << ", task id: " << _task_id; |
865 | 0 | return status; |
866 | 0 | } |
867 | | |
868 | | /* |
869 | | * Download snapshot files from remote. |
870 | | * After downloaded, the local dir should contains all files existing in remote, |
871 | | * may also contains several useless files. |
872 | | */ |
873 | | Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to_dest_path, |
874 | 0 | std::vector<int64_t>* downloaded_tablet_ids) { |
875 | 0 | if (!_remote_fs) { |
876 | 0 | return Status::InternalError("Storage backend not initialized."); |
877 | 0 | } |
878 | 0 | LOG(INFO) << "begin to download snapshot files. num: " << src_to_dest_path.size() |
879 | 0 | << ", broker addr: " << _broker_addr << ", job: " << _job_id |
880 | 0 | << ", task id: " << _task_id; |
881 | | |
882 | | // check if job has already been cancelled |
883 | 0 | int tmp_counter = 1; |
884 | 0 | RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD)); |
885 | | |
886 | 0 | Status status = Status::OK(); |
887 | | // 1. validate local tablet snapshot paths |
888 | 0 | RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, false)); |
889 | | |
890 | | // 2. for each src path, download it to local storage |
891 | 0 | int report_counter = 0; |
892 | 0 | int total_num = src_to_dest_path.size(); |
893 | 0 | int finished_num = 0; |
894 | 0 | for (const auto& iter : src_to_dest_path) { |
895 | 0 | const std::string& remote_path = iter.first; |
896 | 0 | const std::string& local_path = iter.second; |
897 | | |
898 | | // Take a lock to protect the local snapshot path. |
899 | 0 | auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(local_path); |
900 | |
|
901 | 0 | int64_t local_tablet_id = 0; |
902 | 0 | int32_t schema_hash = 0; |
903 | 0 | RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path(local_path, &local_tablet_id, |
904 | 0 | &schema_hash)); |
905 | 0 | if (downloaded_tablet_ids != nullptr) { |
906 | 0 | downloaded_tablet_ids->push_back(local_tablet_id); |
907 | 0 | } |
908 | |
|
909 | 0 | int64_t remote_tablet_id; |
910 | 0 | RETURN_IF_ERROR(_get_tablet_id_from_remote_path(remote_path, &remote_tablet_id)); |
911 | 0 | VLOG_CRITICAL << "get local tablet id: " << local_tablet_id |
912 | 0 | << ", schema hash: " << schema_hash |
913 | 0 | << ", remote tablet id: " << remote_tablet_id; |
914 | | |
915 | | // 2.1. get local files |
916 | 0 | std::vector<std::string> local_files; |
917 | 0 | RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files)); |
918 | | |
919 | | // 2.2. get remote files |
920 | 0 | std::map<std::string, FileStat> remote_files; |
921 | 0 | RETURN_IF_ERROR(_list_with_checksum(remote_path, &remote_files)); |
922 | 0 | if (remote_files.empty()) { |
923 | 0 | std::stringstream ss; |
924 | 0 | ss << "get nothing from remote path: " << remote_path; |
925 | 0 | LOG(WARNING) << ss.str(); |
926 | 0 | return Status::InternalError(ss.str()); |
927 | 0 | } |
928 | | |
929 | 0 | TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(local_tablet_id); |
930 | 0 | if (tablet == nullptr) { |
931 | 0 | std::stringstream ss; |
932 | 0 | ss << "failed to get local tablet: " << local_tablet_id; |
933 | 0 | LOG(WARNING) << ss.str(); |
934 | 0 | return Status::InternalError(ss.str()); |
935 | 0 | } |
936 | 0 | DataDir* data_dir = tablet->data_dir(); |
937 | |
|
938 | 0 | for (auto& iter : remote_files) { |
939 | 0 | RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, |
940 | 0 | TTaskType::type::DOWNLOAD)); |
941 | | |
942 | 0 | bool need_download = false; |
943 | 0 | const std::string& remote_file = iter.first; |
944 | 0 | const FileStat& file_stat = iter.second; |
945 | 0 | auto find = std::find(local_files.begin(), local_files.end(), remote_file); |
946 | 0 | if (find == local_files.end()) { |
947 | | // remote file does not exist in local, download it |
948 | 0 | need_download = true; |
949 | 0 | } else { |
950 | 0 | if (_end_with(remote_file, ".hdr")) { |
951 | | // this is a header file, download it. |
952 | 0 | need_download = true; |
953 | 0 | } else { |
954 | | // check checksum |
955 | 0 | std::string local_md5sum; |
956 | 0 | Status st = io::global_local_filesystem()->md5sum( |
957 | 0 | local_path + "/" + remote_file, &local_md5sum); |
958 | 0 | if (!st.ok()) { |
959 | 0 | LOG(WARNING) << "failed to get md5sum of local file: " << remote_file |
960 | 0 | << ". msg: " << st << ". download it"; |
961 | 0 | need_download = true; |
962 | 0 | } else { |
963 | 0 | VLOG_CRITICAL << "get local file checksum: " << remote_file << ": " |
964 | 0 | << local_md5sum; |
965 | 0 | if (file_stat.md5 != local_md5sum) { |
966 | | // file's checksum does not equal, download it. |
967 | 0 | need_download = true; |
968 | 0 | } |
969 | 0 | } |
970 | 0 | } |
971 | 0 | } |
972 | |
|
973 | 0 | if (!need_download) { |
974 | 0 | LOG(INFO) << "remote file already exist in local, no need to download." |
975 | 0 | << ", file: " << remote_file; |
976 | 0 | continue; |
977 | 0 | } |
978 | | |
979 | | // begin to download |
980 | 0 | std::string full_remote_file = remote_path + "/" + remote_file + "." + file_stat.md5; |
981 | 0 | std::string local_file_name; |
982 | | // we need to replace the tablet_id in remote file name with local tablet id |
983 | 0 | RETURN_IF_ERROR(_replace_tablet_id(remote_file, local_tablet_id, &local_file_name)); |
984 | 0 | std::string full_local_file = local_path + "/" + local_file_name; |
985 | 0 | LOG(INFO) << "begin to download from " << full_remote_file << " to " << full_local_file; |
986 | 0 | size_t file_len = file_stat.size; |
987 | | |
988 | | // check disk capacity |
989 | 0 | if (data_dir->reach_capacity_limit(file_len)) { |
990 | 0 | return Status::Error<ErrorCode::EXCEEDED_LIMIT>( |
991 | 0 | "reach the capacity limit of path {}, file_size={}", data_dir->path(), |
992 | 0 | file_len); |
993 | 0 | } |
994 | | // remove file which will be downloaded now. |
995 | | // this file will be added to local_files if it be downloaded successfully. |
996 | 0 | if (find != local_files.end()) { |
997 | 0 | local_files.erase(find); |
998 | 0 | } |
999 | 0 | RETURN_IF_ERROR(_remote_fs->download(full_remote_file, full_local_file)); |
1000 | | |
1001 | | // 3. check md5 of the downloaded file |
1002 | 0 | std::string downloaded_md5sum; |
1003 | 0 | RETURN_IF_ERROR( |
1004 | 0 | io::global_local_filesystem()->md5sum(full_local_file, &downloaded_md5sum)); |
1005 | 0 | VLOG_CRITICAL << "get downloaded file checksum: " << full_local_file << ": " |
1006 | 0 | << downloaded_md5sum; |
1007 | 0 | if (downloaded_md5sum != file_stat.md5) { |
1008 | 0 | std::stringstream ss; |
1009 | 0 | ss << "invalid md5 of downloaded file: " << full_local_file |
1010 | 0 | << ", expected: " << file_stat.md5 << ", get: " << downloaded_md5sum; |
1011 | 0 | LOG(WARNING) << ss.str(); |
1012 | 0 | return Status::InternalError(ss.str()); |
1013 | 0 | } |
1014 | | |
1015 | | // local_files always keep the updated local files |
1016 | 0 | local_files.push_back(local_file_name); |
1017 | 0 | LOG(INFO) << "finished to download file via broker. file: " << full_local_file |
1018 | 0 | << ", length: " << file_len; |
1019 | 0 | } // end for all remote files |
1020 | | |
1021 | | // finally, delete local files which are not in remote |
1022 | 0 | for (const auto& local_file : local_files) { |
1023 | | // replace the tablet id in local file name with the remote tablet id, |
1024 | | // in order to compare the file name. |
1025 | 0 | std::string new_name; |
1026 | 0 | Status st = _replace_tablet_id(local_file, remote_tablet_id, &new_name); |
1027 | 0 | if (!st.ok()) { |
1028 | 0 | LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st |
1029 | 0 | << ". ignore it"; |
1030 | 0 | continue; |
1031 | 0 | } |
1032 | 0 | VLOG_CRITICAL << "new file name after replace tablet id: " << new_name; |
1033 | 0 | const auto& find = remote_files.find(new_name); |
1034 | 0 | if (find != remote_files.end()) { |
1035 | 0 | continue; |
1036 | 0 | } |
1037 | | |
1038 | | // delete |
1039 | 0 | std::string full_local_file = local_path + "/" + local_file; |
1040 | 0 | VLOG_CRITICAL << "begin to delete local snapshot file: " << full_local_file |
1041 | 0 | << ", it does not exist in remote"; |
1042 | 0 | if (remove(full_local_file.c_str()) != 0) { |
1043 | 0 | LOG(WARNING) << "failed to delete unknown local file: " << full_local_file |
1044 | 0 | << ", ignore it"; |
1045 | 0 | } |
1046 | 0 | } |
1047 | |
|
1048 | 0 | finished_num++; |
1049 | 0 | } // end for src_to_dest_path |
1050 | | |
1051 | 0 | LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id; |
1052 | 0 | return status; |
1053 | 0 | } |
1054 | | |
1055 | | Status SnapshotLoader::remote_http_download( |
1056 | | const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots, |
1057 | 1 | std::vector<int64_t>* downloaded_tablet_ids) { |
1058 | | // check if job has already been cancelled |
1059 | | |
1060 | | #ifndef BE_TEST |
1061 | | int tmp_counter = 1; |
1062 | | RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD)); |
1063 | | #endif |
1064 | 1 | Status status = Status::OK(); |
1065 | | |
1066 | 1 | for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { |
1067 | 1 | auto local_tablet_id = remote_tablet_snapshot.local_tablet_id; |
1068 | 1 | const auto& local_path = remote_tablet_snapshot.local_snapshot_path; |
1069 | 1 | const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path; |
1070 | | |
1071 | 1 | LOG(INFO) << fmt::format( |
1072 | 1 | "download snapshots via http. job: {}, task id: {}, local dir: {}, remote dir: {}", |
1073 | 1 | _job_id, _task_id, local_path, remote_path); |
1074 | | |
1075 | 1 | TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(local_tablet_id); |
1076 | 1 | if (tablet == nullptr) { |
1077 | 0 | std::string msg = fmt::format("failed to get local tablet: {}", local_tablet_id); |
1078 | 0 | LOG(WARNING) << msg; |
1079 | 0 | return Status::RuntimeError(std::move(msg)); |
1080 | 0 | } |
1081 | | |
1082 | 1 | if (downloaded_tablet_ids != nullptr) { |
1083 | 1 | downloaded_tablet_ids->push_back(local_tablet_id); |
1084 | 1 | } |
1085 | | |
1086 | 1 | SnapshotHttpDownloader downloader(remote_tablet_snapshot, std::move(tablet), *this); |
1087 | | #ifndef BE_TEST |
1088 | | int report_counter = 0; |
1089 | | int finished_num = 0; |
1090 | | int total_num = remote_tablet_snapshots.size(); |
1091 | | downloader.set_report_progress_callback( |
1092 | | [this, &report_counter, &finished_num, &total_num]() { |
1093 | | return _report_every(10, &report_counter, finished_num, total_num, |
1094 | | TTaskType::type::DOWNLOAD); |
1095 | | }); |
1096 | | #endif |
1097 | | |
1098 | 1 | RETURN_IF_ERROR(downloader.download()); |
1099 | 1 | _set_http_download_files_num(downloader.get_download_file_num()); |
1100 | | |
1101 | | #ifndef BE_TEST |
1102 | | ++finished_num; |
1103 | | #endif |
1104 | 1 | } |
1105 | | |
1106 | 1 | LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id; |
1107 | 1 | return status; |
1108 | 1 | } |
1109 | | |
1110 | | // move the snapshot files in snapshot_path |
1111 | | // to tablet_path |
1112 | | // If overwrite, just replace the tablet_path with snapshot_path, |
1113 | | // else: (TODO) |
1114 | | // |
1115 | | // MUST hold tablet's header lock, push lock, cumulative lock and base compaction lock |
1116 | | Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr tablet, |
1117 | 2 | bool overwrite) { |
1118 | | // Take a lock to protect the local snapshot path. |
1119 | 2 | auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(snapshot_path); |
1120 | | |
1121 | 2 | auto tablet_path = tablet->tablet_path(); |
1122 | 2 | auto store_path = tablet->data_dir()->path(); |
1123 | 2 | LOG(INFO) << "begin to move snapshot files. from: " << snapshot_path << ", to: " << tablet_path |
1124 | 2 | << ", store: " << store_path << ", job: " << _job_id << ", task id: " << _task_id; |
1125 | | |
1126 | 2 | Status status = Status::OK(); |
1127 | | |
1128 | | // validate snapshot_path and tablet_path |
1129 | 2 | int64_t snapshot_tablet_id = 0; |
1130 | 2 | int32_t snapshot_schema_hash = 0; |
1131 | 2 | RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path( |
1132 | 2 | snapshot_path, &snapshot_tablet_id, &snapshot_schema_hash)); |
1133 | | |
1134 | 2 | int64_t tablet_id = 0; |
1135 | 2 | int32_t schema_hash = 0; |
1136 | 2 | RETURN_IF_ERROR( |
1137 | 2 | _get_tablet_id_and_schema_hash_from_file_path(tablet_path, &tablet_id, &schema_hash)); |
1138 | | |
1139 | 2 | if (tablet_id != snapshot_tablet_id || schema_hash != snapshot_schema_hash) { |
1140 | 0 | std::stringstream ss; |
1141 | 0 | ss << "path does not match. snapshot: " << snapshot_path |
1142 | 0 | << ", tablet path: " << tablet_path; |
1143 | 0 | LOG(WARNING) << ss.str(); |
1144 | 0 | return Status::InternalError(ss.str()); |
1145 | 0 | } |
1146 | | |
1147 | 2 | DataDir* store = _engine.get_store(store_path); |
1148 | 2 | if (store == nullptr) { |
1149 | 0 | std::stringstream ss; |
1150 | 0 | ss << "failed to get store by path: " << store_path; |
1151 | 0 | LOG(WARNING) << ss.str(); |
1152 | 0 | return Status::InternalError(ss.str()); |
1153 | 0 | } |
1154 | | |
1155 | 2 | if (!std::filesystem::exists(tablet_path)) { |
1156 | 0 | std::stringstream ss; |
1157 | 0 | ss << "tablet path does not exist: " << tablet_path; |
1158 | 0 | LOG(WARNING) << ss.str(); |
1159 | 0 | return Status::InternalError(ss.str()); |
1160 | 0 | } |
1161 | | |
1162 | 2 | if (!std::filesystem::exists(snapshot_path)) { |
1163 | 0 | std::stringstream ss; |
1164 | 0 | ss << "snapshot path does not exist: " << snapshot_path; |
1165 | 0 | LOG(WARNING) << ss.str(); |
1166 | 0 | return Status::InternalError(ss.str()); |
1167 | 0 | } |
1168 | | |
1169 | 2 | std::string loaded_tag_path = get_loaded_tag_path(snapshot_path); |
1170 | 2 | bool already_loaded = false; |
1171 | 2 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(loaded_tag_path, &already_loaded)); |
1172 | 2 | if (already_loaded) { |
1173 | 1 | LOG(INFO) << "snapshot path already moved: " << snapshot_path; |
1174 | 1 | return Status::OK(); |
1175 | 1 | } |
1176 | | |
1177 | | // rename the rowset ids and tabletid info in rowset meta |
1178 | 1 | auto res = _engine.snapshot_mgr()->convert_rowset_ids(snapshot_path, tablet_id, |
1179 | 1 | tablet->replica_id(), tablet->table_id(), |
1180 | 1 | tablet->partition_id(), schema_hash); |
1181 | 1 | if (!res.has_value()) [[unlikely]] { |
1182 | 0 | auto err_msg = |
1183 | 0 | fmt::format("failed to convert rowsetids in snapshot: {}, tablet path: {}, err: {}", |
1184 | 0 | snapshot_path, tablet_path, res.error()); |
1185 | 0 | LOG(WARNING) << err_msg; |
1186 | 0 | return Status::InternalError(err_msg); |
1187 | 0 | } |
1188 | | |
1189 | 1 | if (!overwrite) { |
1190 | 0 | throw Exception(Status::FatalError("only support overwrite now")); |
1191 | 0 | } |
1192 | | |
1193 | | // Medium migration/clone/checkpoint/compaction may change or check the |
1194 | | // files and tablet meta, so we need to take these locks. |
1195 | 1 | std::unique_lock migration_lock(tablet->get_migration_lock(), std::try_to_lock); |
1196 | 1 | std::unique_lock base_compact_lock(tablet->get_base_compaction_lock(), std::try_to_lock); |
1197 | 1 | std::unique_lock cumu_compact_lock(tablet->get_cumulative_compaction_lock(), std::try_to_lock); |
1198 | 1 | std::unique_lock cold_compact_lock(tablet->get_cold_compaction_lock(), std::try_to_lock); |
1199 | 1 | std::unique_lock build_idx_lock(tablet->get_build_inverted_index_lock(), std::try_to_lock); |
1200 | 1 | std::unique_lock meta_store_lock(tablet->get_meta_store_lock(), std::try_to_lock); |
1201 | 1 | if (!migration_lock.owns_lock() || !base_compact_lock.owns_lock() || |
1202 | 1 | !cumu_compact_lock.owns_lock() || !cold_compact_lock.owns_lock() || |
1203 | 1 | !build_idx_lock.owns_lock() || !meta_store_lock.owns_lock()) { |
1204 | | // This error should be retryable |
1205 | 0 | auto status = Status::ObtainLockFailed("failed to get tablet locks, tablet: {}", tablet_id); |
1206 | 0 | LOG(WARNING) << status << ", snapshot path: " << snapshot_path |
1207 | 0 | << ", tablet path: " << tablet_path; |
1208 | 0 | return status; |
1209 | 0 | } |
1210 | | |
1211 | 1 | std::vector<std::string> snapshot_files; |
1212 | 1 | RETURN_IF_ERROR(_get_existing_files_from_local(snapshot_path, &snapshot_files)); |
1213 | | |
1214 | | // FIXME: the below logic will demage the tablet files if failed in the middle. |
1215 | | |
1216 | | // 1. simply delete the old dir and replace it with the snapshot dir |
1217 | 1 | try { |
1218 | | // This remove seems soft enough, because we already get |
1219 | | // tablet id and schema hash from this path, which |
1220 | | // means this path is a valid path. |
1221 | 1 | std::filesystem::remove_all(tablet_path); |
1222 | 1 | VLOG_CRITICAL << "remove dir: " << tablet_path; |
1223 | 1 | std::filesystem::create_directory(tablet_path); |
1224 | 1 | VLOG_CRITICAL << "re-create dir: " << tablet_path; |
1225 | 1 | } catch (const std::filesystem::filesystem_error& e) { |
1226 | 0 | std::stringstream ss; |
1227 | 0 | ss << "failed to move tablet path: " << tablet_path << ". err: " << e.what(); |
1228 | 0 | LOG(WARNING) << ss.str(); |
1229 | 0 | return Status::InternalError(ss.str()); |
1230 | 0 | } |
1231 | | |
1232 | | // link files one by one |
1233 | | // files in snapshot dir will be moved in snapshot clean process |
1234 | 1 | std::vector<std::string> linked_files; |
1235 | 2 | for (auto& file : snapshot_files) { |
1236 | 2 | auto full_src_path = fmt::format("{}/{}", snapshot_path, file); |
1237 | 2 | auto full_dest_path = fmt::format("{}/{}", tablet_path, file); |
1238 | 2 | if (link(full_src_path.c_str(), full_dest_path.c_str()) != 0) { |
1239 | 0 | LOG(WARNING) << "failed to link file from " << full_src_path << " to " << full_dest_path |
1240 | 0 | << ", err: " << std::strerror(errno); |
1241 | | |
1242 | | // clean the already linked files |
1243 | 0 | for (auto& linked_file : linked_files) { |
1244 | 0 | remove(linked_file.c_str()); |
1245 | 0 | } |
1246 | |
|
1247 | 0 | return Status::InternalError("move tablet failed"); |
1248 | 0 | } |
1249 | 2 | linked_files.push_back(full_dest_path); |
1250 | 2 | VLOG_CRITICAL << "link file from " << full_src_path << " to " << full_dest_path; |
1251 | 2 | } |
1252 | | |
1253 | | // snapshot loader not need to change tablet uid |
1254 | | // fixme: there is no header now and can not call load_one_tablet here |
1255 | | // reload header |
1256 | 1 | Status ost = _engine.tablet_manager()->load_tablet_from_dir(store, tablet_id, schema_hash, |
1257 | 1 | tablet_path, true); |
1258 | 1 | if (!ost.ok()) { |
1259 | 0 | std::stringstream ss; |
1260 | 0 | ss << "failed to reload header of tablet: " << tablet_id; |
1261 | 0 | LOG(WARNING) << ss.str(); |
1262 | 0 | return Status::InternalError(ss.str()); |
1263 | 0 | } |
1264 | | |
1265 | | // mark the snapshot path as loaded |
1266 | 1 | RETURN_IF_ERROR(write_loaded_tag(snapshot_path, tablet_id)); |
1267 | | |
1268 | 1 | LOG(INFO) << "finished to reload header of tablet: " << tablet_id; |
1269 | | |
1270 | 1 | return status; |
1271 | 1 | } |
1272 | | |
1273 | | Status SnapshotLoader::_replace_tablet_id(const std::string& file_name, int64_t tablet_id, |
1274 | 8 | std::string* new_file_name) { |
1275 | | // eg: |
1276 | | // 10007.hdr |
1277 | | // 10007_2_2_0_0.idx |
1278 | | // 10007_2_2_0_0.dat |
1279 | 8 | if (_end_with(file_name, ".hdr")) { |
1280 | 3 | std::stringstream ss; |
1281 | 3 | ss << tablet_id << ".hdr"; |
1282 | 3 | *new_file_name = ss.str(); |
1283 | 3 | return Status::OK(); |
1284 | 5 | } else if (_end_with(file_name, ".idx") || _end_with(file_name, ".dat")) { |
1285 | 4 | *new_file_name = file_name; |
1286 | 4 | return Status::OK(); |
1287 | 4 | } else { |
1288 | 1 | return Status::InternalError("invalid tablet file name: {}", file_name); |
1289 | 1 | } |
1290 | 8 | } |
1291 | | |
1292 | | Status SnapshotLoader::_get_tablet_id_and_schema_hash_from_file_path(const std::string& src_path, |
1293 | | int64_t* tablet_id, |
1294 | 6 | int32_t* schema_hash) { |
1295 | | // path should be like: /path/.../tablet_id/schema_hash |
1296 | | // we try to extract tablet_id from path |
1297 | 6 | size_t pos = src_path.find_last_of("/"); |
1298 | 6 | if (pos == std::string::npos || pos == src_path.length() - 1) { |
1299 | 1 | return Status::InternalError("failed to get tablet id from path: {}", src_path); |
1300 | 1 | } |
1301 | | |
1302 | 5 | std::string schema_hash_str = src_path.substr(pos + 1); |
1303 | 5 | std::stringstream ss1; |
1304 | 5 | ss1 << schema_hash_str; |
1305 | 5 | ss1 >> *schema_hash; |
1306 | | |
1307 | | // skip schema hash part |
1308 | 5 | size_t pos2 = src_path.find_last_of("/", pos - 1); |
1309 | 5 | if (pos2 == std::string::npos) { |
1310 | 0 | return Status::InternalError("failed to get tablet id from path: {}", src_path); |
1311 | 0 | } |
1312 | | |
1313 | 5 | std::string tablet_str = src_path.substr(pos2 + 1, pos - pos2); |
1314 | 5 | std::stringstream ss2; |
1315 | 5 | ss2 << tablet_str; |
1316 | 5 | ss2 >> *tablet_id; |
1317 | | |
1318 | 5 | VLOG_CRITICAL << "get tablet id " << *tablet_id << ", schema hash: " << *schema_hash |
1319 | 0 | << " from path: " << src_path; |
1320 | 5 | return Status::OK(); |
1321 | 5 | } |
1322 | | |
1323 | | Status SnapshotLoader::_check_local_snapshot_paths( |
1324 | 4 | const std::map<std::string, std::string>& src_to_dest_path, bool check_src) { |
1325 | 4 | bool res = true; |
1326 | 4 | for (const auto& pair : src_to_dest_path) { |
1327 | 4 | std::string path; |
1328 | 4 | if (check_src) { |
1329 | 2 | path = pair.first; |
1330 | 2 | } else { |
1331 | 2 | path = pair.second; |
1332 | 2 | } |
1333 | | |
1334 | 4 | RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res)); |
1335 | 2 | if (!res) { |
1336 | 0 | std::stringstream ss; |
1337 | 0 | ss << "snapshot path is not directory or does not exist: " << path; |
1338 | 0 | LOG(WARNING) << ss.str(); |
1339 | 0 | return Status::RuntimeError(ss.str()); |
1340 | 0 | } |
1341 | 2 | } |
1342 | 2 | LOG(INFO) << "all local snapshot paths are existing. num: " << src_to_dest_path.size(); |
1343 | 2 | return Status::OK(); |
1344 | 4 | } |
1345 | | |
1346 | | Status SnapshotLoader::_get_existing_files_from_local(const std::string& local_path, |
1347 | 3 | std::vector<std::string>* local_files) { |
1348 | 3 | bool exists = true; |
1349 | 3 | std::vector<io::FileInfo> files; |
1350 | 3 | RETURN_IF_ERROR(io::global_local_filesystem()->list(local_path, true, &files, &exists)); |
1351 | 4 | for (auto& file : files) { |
1352 | 4 | local_files->push_back(file.file_name); |
1353 | 4 | } |
1354 | 3 | LOG(INFO) << "finished to list files in local path: " << local_path |
1355 | 3 | << ", file num: " << local_files->size(); |
1356 | 3 | return Status::OK(); |
1357 | 3 | } |
1358 | | |
1359 | | Status SnapshotLoader::_get_tablet_id_from_remote_path(const std::string& remote_path, |
1360 | 1 | int64_t* tablet_id) { |
1361 | | // eg: |
1362 | | // bos://xxx/../__tbl_10004/__part_10003/__idx_10004/__10005 |
1363 | 1 | size_t pos = remote_path.find_last_of("_"); |
1364 | 1 | if (pos == std::string::npos) { |
1365 | 0 | return Status::InternalError("invalid remove file path: {}", remote_path); |
1366 | 0 | } |
1367 | | |
1368 | 1 | std::string tablet_id_str = remote_path.substr(pos + 1); |
1369 | 1 | std::stringstream ss; |
1370 | 1 | ss << tablet_id_str; |
1371 | 1 | ss >> *tablet_id; |
1372 | | |
1373 | 1 | return Status::OK(); |
1374 | 1 | } |
1375 | | |
1376 | | // only return CANCELLED if FE return that job is cancelled. |
1377 | | // otherwise, return OK |
1378 | | Status SnapshotLoader::_report_every(int report_threshold, int* counter, int32_t finished_num, |
1379 | 0 | int32_t total_num, TTaskType::type type) { |
1380 | 0 | ++*counter; |
1381 | 0 | if (*counter <= report_threshold) { |
1382 | 0 | return Status::OK(); |
1383 | 0 | } |
1384 | | |
1385 | 0 | LOG(INFO) << "report to frontend. job id: " << _job_id << ", task id: " << _task_id |
1386 | 0 | << ", finished num: " << finished_num << ", total num:" << total_num; |
1387 | |
|
1388 | 0 | TNetworkAddress master_addr = _env->cluster_info()->master_fe_addr; |
1389 | |
|
1390 | 0 | TSnapshotLoaderReportRequest request; |
1391 | 0 | request.job_id = _job_id; |
1392 | 0 | request.task_id = _task_id; |
1393 | 0 | request.task_type = type; |
1394 | 0 | request.__set_finished_num(finished_num); |
1395 | 0 | request.__set_total_num(total_num); |
1396 | 0 | TStatus report_st; |
1397 | |
|
1398 | 0 | Status rpcStatus = ThriftRpcHelper::rpc<FrontendServiceClient>( |
1399 | 0 | master_addr.hostname, master_addr.port, |
1400 | 0 | [&request, &report_st](FrontendServiceConnection& client) { |
1401 | 0 | client->snapshotLoaderReport(report_st, request); |
1402 | 0 | }, |
1403 | 0 | 10000); |
1404 | |
|
1405 | 0 | if (!rpcStatus.ok()) { |
1406 | | // rpc failed, ignore |
1407 | 0 | return Status::OK(); |
1408 | 0 | } |
1409 | | |
1410 | | // reset |
1411 | 0 | *counter = 0; |
1412 | 0 | if (report_st.status_code == TStatusCode::CANCELLED) { |
1413 | 0 | LOG(INFO) << "job is cancelled. job id: " << _job_id << ", task id: " << _task_id; |
1414 | 0 | return Status::Cancelled("Cancelled"); |
1415 | 0 | } |
1416 | 0 | return Status::OK(); |
1417 | 0 | } |
1418 | | |
1419 | | Status SnapshotLoader::_list_with_checksum(const std::string& dir, |
1420 | 0 | std::map<std::string, FileStat>* md5_files) { |
1421 | 0 | bool exists = true; |
1422 | 0 | std::vector<io::FileInfo> files; |
1423 | 0 | RETURN_IF_ERROR(_remote_fs->list(dir, true, &files, &exists)); |
1424 | 0 | for (auto& tmp_file : files) { |
1425 | 0 | io::Path path(tmp_file.file_name); |
1426 | 0 | std::string file_name = path.filename(); |
1427 | 0 | size_t pos = file_name.find_last_of("."); |
1428 | 0 | if (pos == std::string::npos || pos == file_name.size() - 1) { |
1429 | | // Not found checksum separator, ignore this file |
1430 | 0 | continue; |
1431 | 0 | } |
1432 | 0 | FileStat stat = {std::string(file_name, 0, pos), std::string(file_name, pos + 1), |
1433 | 0 | tmp_file.file_size}; |
1434 | 0 | md5_files->emplace(std::string(file_name, 0, pos), stat); |
1435 | 0 | } |
1436 | |
|
1437 | 0 | return Status::OK(); |
1438 | 0 | } |
1439 | | |
1440 | | } // end namespace doris |