Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/snapshot_loader.h" |
19 | | |
20 | | // IWYU pragma: no_include <bthread/errno.h> |
21 | | #include <errno.h> // IWYU pragma: keep |
22 | | #include <fmt/format.h> |
23 | | #include <gen_cpp/AgentService_types.h> |
24 | | #include <gen_cpp/FrontendService.h> |
25 | | #include <gen_cpp/FrontendService_types.h> |
26 | | #include <gen_cpp/HeartbeatService_types.h> |
27 | | #include <gen_cpp/PlanNodes_types.h> |
28 | | #include <gen_cpp/Status_types.h> |
29 | | #include <gen_cpp/Types_types.h> |
30 | | |
31 | | #include <algorithm> |
32 | | #include <cstring> |
33 | | #include <filesystem> |
34 | | #include <istream> |
35 | | #include <unordered_map> |
36 | | #include <utility> |
37 | | |
38 | | #include "common/config.h" |
39 | | #include "common/logging.h" |
40 | | #include "gutil/strings/split.h" |
41 | | #include "http/http_client.h" |
42 | | #include "io/fs/broker_file_system.h" |
43 | | #include "io/fs/file_system.h" |
44 | | #include "io/fs/hdfs_file_system.h" |
45 | | #include "io/fs/local_file_system.h" |
46 | | #include "io/fs/path.h" |
47 | | #include "io/fs/remote_file_system.h" |
48 | | #include "io/fs/s3_file_system.h" |
49 | | #include "io/hdfs_builder.h" |
50 | | #include "olap/data_dir.h" |
51 | | #include "olap/snapshot_manager.h" |
52 | | #include "olap/storage_engine.h" |
53 | | #include "olap/tablet.h" |
54 | | #include "olap/tablet_manager.h" |
55 | | #include "runtime/client_cache.h" |
56 | | #include "runtime/exec_env.h" |
57 | | #include "util/s3_uri.h" |
58 | | #include "util/s3_util.h" |
59 | | #include "util/thrift_rpc_helper.h" |
60 | | |
61 | | namespace doris { |
62 | | |
// Size and md5 of a file that is present in the local snapshot directory.
struct LocalFileStat {
    // Default-initialized so a default-constructed stat never carries an indeterminate size.
    uint64_t size = 0;
    // May be empty, e.g. when config::enable_download_md5sum_check is disabled.
    std::string md5;
};
67 | | |
// Download URL, md5 and size of a file in the remote snapshot, as reported by the remote BE.
struct RemoteFileStat {
    std::string url;
    // Empty unless the remote md5 was requested (config::enable_download_md5sum_check).
    std::string md5;
    // Default-initialized so a default-constructed stat never carries an indeterminate size.
    uint64_t size = 0;
};
73 | | |
74 | | class SnapshotHttpDownloader { |
75 | | public: |
76 | | SnapshotHttpDownloader(const TRemoteTabletSnapshot& remote_tablet_snapshot, |
77 | | TabletSharedPtr tablet, SnapshotLoader& snapshot_loader) |
78 | | : _tablet(std::move(tablet)), |
79 | | _snapshot_loader(snapshot_loader), |
80 | | _local_tablet_id(remote_tablet_snapshot.local_tablet_id), |
81 | | _remote_tablet_id(remote_tablet_snapshot.remote_tablet_id), |
82 | | _local_path(remote_tablet_snapshot.local_snapshot_path), |
83 | | _remote_path(remote_tablet_snapshot.remote_snapshot_path), |
84 | 0 | _remote_be_addr(remote_tablet_snapshot.remote_be_addr) { |
85 | 0 | auto& token = remote_tablet_snapshot.remote_token; |
86 | 0 | auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr; |
87 | | |
88 | | // HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/ |
89 | 0 | _base_url = fmt::format("http://{}:{}/api/_tablet/_download?token={}&channel=ingest_binlog", |
90 | 0 | remote_be_addr.hostname, remote_be_addr.port, token); |
91 | 0 | } |
92 | 0 | ~SnapshotHttpDownloader() = default; |
93 | | SnapshotHttpDownloader(const SnapshotHttpDownloader&) = delete; |
94 | | SnapshotHttpDownloader& operator=(const SnapshotHttpDownloader&) = delete; |
95 | | |
96 | 0 | void set_report_progress_callback(std::function<Status()> report_progress) { |
97 | 0 | _report_progress_callback = std::move(report_progress); |
98 | 0 | } |
99 | | |
100 | | Status download(); |
101 | | |
102 | | private: |
103 | | constexpr static int kDownloadFileMaxRetry = 3; |
104 | | |
105 | | // Load existing files from local snapshot path, compute the md5sum of the files |
106 | | // if enable_download_md5sum_check is true |
107 | | Status _load_existing_files(); |
108 | | |
109 | | // List remote files from remote be, and find the hdr file |
110 | | Status _list_remote_files(); |
111 | | |
112 | | // Download hdr file from remote be to a tmp file |
113 | | Status _download_hdr_file(); |
114 | | |
115 | | // Link same rowset files by compare local hdr file and remote hdr file |
116 | | // if the local files are copied from the remote rowset, link them as the |
117 | | // remote rowset files, to avoid the duplicated downloading. |
118 | | Status _link_same_rowset_files(); |
119 | | |
120 | | // Get all remote file stats, excluding the hdr file. |
121 | | Status _get_remote_file_stats(); |
122 | | |
123 | | // Compute the need download files according to the local files md5sum (if enable_download_md5sum_check is true) |
124 | | void _get_need_download_files(); |
125 | | |
126 | | // Download all need download files |
127 | | Status _download_files(); |
128 | | |
129 | | // Install remote hdr file to local snapshot path from the tmp file |
130 | | Status _install_remote_hdr_file(); |
131 | | |
132 | | // Delete orphan files, which are not in remote |
133 | | Status _delete_orphan_files(); |
134 | | |
135 | | // Download a file from remote be to local path with the file stat |
136 | | Status _download_http_file(DataDir* data_dir, const std::string& remote_file_url, |
137 | | const std::string& local_file_path, |
138 | | const RemoteFileStat& remote_filestat); |
139 | | |
140 | | // Get the file stat from remote be |
141 | | Status _get_http_file_stat(const std::string& remote_file_url, RemoteFileStat* file_stat); |
142 | | |
143 | | TabletSharedPtr _tablet; |
144 | | SnapshotLoader& _snapshot_loader; |
145 | | std::function<Status()> _report_progress_callback; |
146 | | |
147 | | std::string _base_url; |
148 | | int64_t _local_tablet_id; |
149 | | int64_t _remote_tablet_id; |
150 | | const std::string& _local_path; |
151 | | const std::string& _remote_path; |
152 | | const TNetworkAddress& _remote_be_addr; |
153 | | |
154 | | std::string _local_hdr_filename; |
155 | | std::string _remote_hdr_filename; |
156 | | std::vector<std::string> _remote_file_list; |
157 | | std::unordered_map<std::string, LocalFileStat> _local_files; |
158 | | std::unordered_map<std::string, RemoteFileStat> _remote_files; |
159 | | |
160 | | std::string _tmp_hdr_file; |
161 | | RemoteFileStat _remote_hdr_filestat; |
162 | | std::vector<std::string> _need_download_files; |
163 | | }; |
164 | | |
// Path of the "LOADED" marker file that lives directly inside `snapshot_path`.
static std::string get_loaded_tag_path(const std::string& snapshot_path) {
    std::string tag_path(snapshot_path);
    tag_path.append("/LOADED");
    return tag_path;
}
168 | | |
169 | 1 | static Status write_loaded_tag(const std::string& snapshot_path, int64_t tablet_id) { |
170 | 1 | std::unique_ptr<io::FileWriter> writer; |
171 | 1 | std::string file = get_loaded_tag_path(snapshot_path); |
172 | 1 | RETURN_IF_ERROR(io::global_local_filesystem()->create_file(file, &writer)); |
173 | 1 | return writer->close(); |
174 | 1 | } |
175 | | |
176 | | Status upload_with_checksum(io::RemoteFileSystem& fs, std::string_view local_path, |
177 | 0 | std::string_view remote_path, std::string_view checksum) { |
178 | 0 | auto full_remote_path = fmt::format("{}.{}", remote_path, checksum); |
179 | 0 | switch (fs.type()) { |
180 | 0 | case io::FileSystemType::HDFS: |
181 | 0 | case io::FileSystemType::BROKER: { |
182 | 0 | std::string temp = fmt::format("{}.part", remote_path); |
183 | 0 | RETURN_IF_ERROR(fs.upload(local_path, temp)); |
184 | 0 | RETURN_IF_ERROR(fs.rename(temp, full_remote_path)); |
185 | 0 | break; |
186 | 0 | } |
187 | 0 | case io::FileSystemType::S3: |
188 | 0 | RETURN_IF_ERROR(fs.upload(local_path, full_remote_path)); |
189 | 0 | break; |
190 | 0 | default: |
191 | 0 | throw Exception(Status::FatalError("unknown fs type: {}", static_cast<int>(fs.type()))); |
192 | 0 | } |
193 | 0 | return Status::OK(); |
194 | 0 | } |
195 | | |
// True when `str` ends with `match`; an empty `match` always matches.
bool _end_with(std::string_view str, std::string_view match) {
    if (match.size() > str.size()) {
        return false;
    }
    return str.substr(str.size() - match.size()) == match;
}
200 | | |
201 | | Status SnapshotHttpDownloader::_get_http_file_stat(const std::string& remote_file_url, |
202 | 0 | RemoteFileStat* file_stat) { |
203 | 0 | uint64_t file_size = 0; |
204 | 0 | std::string file_md5; |
205 | 0 | auto get_file_stat_cb = [&remote_file_url, &file_size, &file_md5](HttpClient* client) { |
206 | 0 | int64_t timeout_ms = config::download_binlog_meta_timeout_ms; |
207 | 0 | std::string url = remote_file_url; |
208 | 0 | if (config::enable_download_md5sum_check) { |
209 | | // compute md5sum is time-consuming, so we set a longer timeout |
210 | 0 | timeout_ms = config::download_binlog_meta_timeout_ms * 3; |
211 | 0 | url = fmt::format("{}&acquire_md5=true", remote_file_url); |
212 | 0 | } |
213 | 0 | RETURN_IF_ERROR(client->init(url)); |
214 | 0 | client->set_timeout_ms(timeout_ms); |
215 | 0 | RETURN_IF_ERROR(client->head()); |
216 | 0 | RETURN_IF_ERROR(client->get_content_length(&file_size)); |
217 | 0 | if (config::enable_download_md5sum_check) { |
218 | 0 | RETURN_IF_ERROR(client->get_content_md5(&file_md5)); |
219 | 0 | } |
220 | 0 | return Status::OK(); |
221 | 0 | }; |
222 | 0 | RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_stat_cb)); |
223 | 0 | file_stat->url = remote_file_url; |
224 | 0 | file_stat->size = file_size; |
225 | 0 | file_stat->md5 = std::move(file_md5); |
226 | 0 | return Status::OK(); |
227 | 0 | } |
228 | | |
229 | | Status SnapshotHttpDownloader::_download_http_file(DataDir* data_dir, |
230 | | const std::string& remote_file_url, |
231 | | const std::string& local_file_path, |
232 | 0 | const RemoteFileStat& remote_filestat) { |
233 | 0 | auto file_size = remote_filestat.size; |
234 | 0 | const auto& remote_file_md5 = remote_filestat.md5; |
235 | | |
236 | | // check disk capacity |
237 | 0 | if (data_dir->reach_capacity_limit(file_size)) { |
238 | 0 | return Status::Error<ErrorCode::EXCEEDED_LIMIT>( |
239 | 0 | "reach the capacity limit of path {}, file_size={}", data_dir->path(), file_size); |
240 | 0 | } |
241 | | |
242 | 0 | uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024; |
243 | 0 | if (estimate_timeout < config::download_low_speed_time) { |
244 | 0 | estimate_timeout = config::download_low_speed_time; |
245 | 0 | } |
246 | |
|
247 | 0 | LOG(INFO) << "clone begin to download file from: " << remote_file_url |
248 | 0 | << " to: " << local_file_path << ". size(B): " << file_size |
249 | 0 | << ", timeout(s): " << estimate_timeout; |
250 | |
|
251 | 0 | auto download_cb = [&remote_file_url, &remote_file_md5, estimate_timeout, &local_file_path, |
252 | 0 | file_size](HttpClient* client) { |
253 | 0 | RETURN_IF_ERROR(client->init(remote_file_url)); |
254 | 0 | client->set_timeout_ms(estimate_timeout * 1000); |
255 | 0 | RETURN_IF_ERROR(client->download(local_file_path)); |
256 | | |
257 | 0 | std::error_code ec; |
258 | | // Check file length |
259 | 0 | uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec); |
260 | 0 | if (ec) { |
261 | 0 | LOG(WARNING) << "download file error" << ec.message(); |
262 | 0 | return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path, |
263 | 0 | ec.message()); |
264 | 0 | } |
265 | 0 | if (local_file_size != file_size) { |
266 | 0 | LOG(WARNING) << "download file length error" |
267 | 0 | << ", remote_path=" << remote_file_url << ", file_size=" << file_size |
268 | 0 | << ", local_file_size=" << local_file_size; |
269 | 0 | return Status::InternalError("downloaded file size is not equal"); |
270 | 0 | } |
271 | | |
272 | 0 | if (!remote_file_md5.empty()) { // keep compatibility |
273 | 0 | std::string local_file_md5; |
274 | 0 | RETURN_IF_ERROR( |
275 | 0 | io::global_local_filesystem()->md5sum(local_file_path, &local_file_md5)); |
276 | 0 | if (local_file_md5 != remote_file_md5) { |
277 | 0 | LOG(WARNING) << "download file md5 error" |
278 | 0 | << ", remote_file_url=" << remote_file_url |
279 | 0 | << ", local_file_path=" << local_file_path |
280 | 0 | << ", remote_file_md5=" << remote_file_md5 |
281 | 0 | << ", local_file_md5=" << local_file_md5; |
282 | 0 | return Status::RuntimeError( |
283 | 0 | "download file {} md5 is not equal, local={}, remote={}", remote_file_url, |
284 | 0 | local_file_md5, remote_file_md5); |
285 | 0 | } |
286 | 0 | } |
287 | | |
288 | 0 | return io::global_local_filesystem()->permission(local_file_path, |
289 | 0 | io::LocalFileSystem::PERMS_OWNER_RW); |
290 | 0 | }; |
291 | 0 | auto status = HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb); |
292 | 0 | if (!status.ok()) { |
293 | 0 | LOG(WARNING) << "failed to download file from " << remote_file_url |
294 | 0 | << ", status: " << status.to_string(); |
295 | 0 | return status; |
296 | 0 | } |
297 | | |
298 | 0 | return Status::OK(); |
299 | 0 | } |
300 | | |
301 | 0 | Status SnapshotHttpDownloader::_load_existing_files() { |
302 | 0 | std::vector<std::string> existing_files; |
303 | 0 | RETURN_IF_ERROR(_snapshot_loader._get_existing_files_from_local(_local_path, &existing_files)); |
304 | 0 | for (auto& local_file : existing_files) { |
305 | | // add file size |
306 | 0 | std::string local_file_path = _local_path + "/" + local_file; |
307 | 0 | std::error_code ec; |
308 | 0 | uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec); |
309 | 0 | if (ec) { |
310 | 0 | LOG(WARNING) << "download file error, can't retrive file_size of " << local_file_path |
311 | 0 | << ", due to " << ec.message(); |
312 | 0 | return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path, |
313 | 0 | ec.message()); |
314 | 0 | } |
315 | | |
316 | | // get md5sum |
317 | 0 | std::string md5; |
318 | 0 | if (config::enable_download_md5sum_check) { |
319 | 0 | auto status = io::global_local_filesystem()->md5sum(local_file_path, &md5); |
320 | 0 | if (!status.ok()) { |
321 | 0 | LOG(WARNING) << "download file error, local file " << local_file_path |
322 | 0 | << " md5sum: " << status.to_string(); |
323 | 0 | return status; |
324 | 0 | } |
325 | 0 | } |
326 | 0 | _local_files[local_file] = {local_file_size, md5}; |
327 | | |
328 | | // get hdr file |
329 | 0 | if (local_file.ends_with(".hdr")) { |
330 | 0 | _local_hdr_filename = local_file; |
331 | 0 | } |
332 | 0 | } |
333 | | |
334 | 0 | return Status::OK(); |
335 | 0 | } |
336 | | |
337 | 0 | Status SnapshotHttpDownloader::_list_remote_files() { |
338 | | // get all these use http download action |
339 | | // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr |
340 | 0 | std::string remote_url_prefix = fmt::format("{}&file={}", _base_url, _remote_path); |
341 | |
|
342 | 0 | LOG(INFO) << "list remote files: " << remote_url_prefix << ", job: " << _snapshot_loader._job_id |
343 | 0 | << ", task id: " << _snapshot_loader._task_id << ", remote be: " << _remote_be_addr; |
344 | |
|
345 | 0 | std::string remote_file_list_str; |
346 | 0 | auto list_files_cb = [&remote_url_prefix, &remote_file_list_str](HttpClient* client) { |
347 | 0 | RETURN_IF_ERROR(client->init(remote_url_prefix)); |
348 | 0 | client->set_timeout_ms(config::download_binlog_meta_timeout_ms); |
349 | 0 | return client->execute(&remote_file_list_str); |
350 | 0 | }; |
351 | 0 | RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, list_files_cb)); |
352 | | |
353 | 0 | _remote_file_list = strings::Split(remote_file_list_str, "\n", strings::SkipWhitespace()); |
354 | | |
355 | | // find hdr file |
356 | 0 | auto hdr_file = |
357 | 0 | std::find_if(_remote_file_list.begin(), _remote_file_list.end(), |
358 | 0 | [](const std::string& filename) { return _end_with(filename, ".hdr"); }); |
359 | 0 | if (hdr_file == _remote_file_list.end()) { |
360 | 0 | std::string msg = |
361 | 0 | fmt::format("can't find hdr file in remote snapshot path: {}", _remote_path); |
362 | 0 | LOG(WARNING) << msg; |
363 | 0 | return Status::RuntimeError(std::move(msg)); |
364 | 0 | } |
365 | 0 | _remote_hdr_filename = *hdr_file; |
366 | 0 | _remote_file_list.erase(hdr_file); |
367 | |
|
368 | 0 | return Status::OK(); |
369 | 0 | } |
370 | | |
371 | 0 | Status SnapshotHttpDownloader::_download_hdr_file() { |
372 | 0 | RemoteFileStat remote_hdr_stat; |
373 | 0 | std::string remote_hdr_file_url = |
374 | 0 | fmt::format("{}&file={}/{}", _base_url, _remote_path, _remote_hdr_filename); |
375 | 0 | auto status = _get_http_file_stat(remote_hdr_file_url, &remote_hdr_stat); |
376 | 0 | if (!status.ok()) { |
377 | 0 | LOG(WARNING) << "failed to get remote hdr file stat: " << remote_hdr_file_url |
378 | 0 | << ", error: " << status.to_string(); |
379 | 0 | return status; |
380 | 0 | } |
381 | | |
382 | 0 | std::string hdr_filename = _remote_hdr_filename + ".tmp"; |
383 | 0 | std::string hdr_file = _local_path + "/" + hdr_filename; |
384 | 0 | status = _download_http_file(_tablet->data_dir(), remote_hdr_file_url, hdr_file, |
385 | 0 | remote_hdr_stat); |
386 | 0 | if (!status.ok()) { |
387 | 0 | LOG(WARNING) << "failed to download remote hdr file: " << remote_hdr_file_url |
388 | 0 | << ", error: " << status.to_string(); |
389 | 0 | return status; |
390 | 0 | } |
391 | 0 | _tmp_hdr_file = hdr_file; |
392 | 0 | _remote_hdr_filestat = remote_hdr_stat; |
393 | 0 | return Status::OK(); |
394 | 0 | } |
395 | | |
396 | 0 | Status SnapshotHttpDownloader::_link_same_rowset_files() { |
397 | 0 | std::string local_hdr_file_path = _local_path + "/" + _local_hdr_filename; |
398 | | |
399 | | // load local tablet meta |
400 | 0 | TabletMetaPB local_tablet_meta; |
401 | 0 | auto status = TabletMeta::load_from_file(local_hdr_file_path, &local_tablet_meta); |
402 | 0 | if (!status.ok()) { |
403 | | // This file might broken because of the partial downloading. |
404 | 0 | LOG(WARNING) << "failed to load local tablet meta: " << local_hdr_file_path |
405 | 0 | << ", skip link same rowset files, error: " << status.to_string(); |
406 | 0 | return Status::OK(); |
407 | 0 | } |
408 | | |
409 | | // load remote tablet meta |
410 | 0 | TabletMetaPB remote_tablet_meta; |
411 | 0 | status = TabletMeta::load_from_file(_tmp_hdr_file, &remote_tablet_meta); |
412 | 0 | if (!status.ok()) { |
413 | 0 | LOG(WARNING) << "failed to load remote tablet meta: " << _tmp_hdr_file |
414 | 0 | << ", error: " << status.to_string(); |
415 | 0 | return status; |
416 | 0 | } |
417 | | |
418 | 0 | LOG(INFO) << "link rowset files by compare " << _local_hdr_filename << " and " |
419 | 0 | << _remote_hdr_filename; |
420 | |
|
421 | 0 | std::unordered_map<std::string, const RowsetMetaPB&> remote_rowset_metas; |
422 | 0 | for (const auto& rowset_meta : remote_tablet_meta.rs_metas()) { |
423 | 0 | if (rowset_meta.has_resource_id()) { // skip remote rowset |
424 | 0 | continue; |
425 | 0 | } |
426 | 0 | remote_rowset_metas.insert({rowset_meta.rowset_id_v2(), rowset_meta}); |
427 | 0 | } |
428 | |
|
429 | 0 | for (const auto& local_rowset_meta : local_tablet_meta.rs_metas()) { |
430 | 0 | if (local_rowset_meta.has_resource_id() || !local_rowset_meta.has_source_rowset_id()) { |
431 | 0 | continue; |
432 | 0 | } |
433 | | |
434 | 0 | auto remote_rowset_meta = remote_rowset_metas.find(local_rowset_meta.source_rowset_id()); |
435 | 0 | if (remote_rowset_meta == remote_rowset_metas.end()) { |
436 | 0 | continue; |
437 | 0 | } |
438 | | |
439 | 0 | const auto& remote_rowset_id = remote_rowset_meta->first; |
440 | 0 | const auto& remote_rowset_meta_pb = remote_rowset_meta->second; |
441 | 0 | const auto& local_rowset_id = local_rowset_meta.rowset_id_v2(); |
442 | 0 | auto remote_tablet_id = remote_rowset_meta_pb.tablet_id(); |
443 | 0 | if (local_rowset_meta.start_version() != remote_rowset_meta_pb.start_version() || |
444 | 0 | local_rowset_meta.end_version() != remote_rowset_meta_pb.end_version()) { |
445 | 0 | continue; |
446 | 0 | } |
447 | | |
448 | 0 | LOG(INFO) << "rowset " << local_rowset_id << " was downloaded from remote tablet " |
449 | 0 | << remote_tablet_id << " rowset " << remote_rowset_id |
450 | 0 | << ", directly link files instead of downloading"; |
451 | | |
452 | | // Since the rowset meta are the same, we can link the local rowset files as |
453 | | // the downloaded remote rowset files. |
454 | 0 | for (const auto& [local_file, local_filestat] : _local_files) { |
455 | 0 | if (!local_file.starts_with(local_rowset_id)) { |
456 | 0 | continue; |
457 | 0 | } |
458 | | |
459 | 0 | std::string remote_file = local_file; |
460 | 0 | remote_file.replace(0, local_rowset_id.size(), remote_rowset_id); |
461 | 0 | std::string local_file_path = _local_path + "/" + local_file; |
462 | 0 | std::string remote_file_path = _local_path + "/" + remote_file; |
463 | |
|
464 | 0 | bool exist = true; |
465 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(remote_file_path, &exist)); |
466 | 0 | if (exist) { |
467 | 0 | continue; |
468 | 0 | } |
469 | | |
470 | 0 | LOG(INFO) << "link file from " << local_file_path << " to " << remote_file_path; |
471 | 0 | if (!io::global_local_filesystem()->link_file(local_file_path, remote_file_path)) { |
472 | 0 | std::string msg = fmt::format("link file failed from {} to {}, err: {}", |
473 | 0 | local_file_path, remote_file_path, strerror(errno)); |
474 | 0 | LOG(WARNING) << msg; |
475 | 0 | return Status::InternalError(std::move(msg)); |
476 | 0 | } |
477 | | |
478 | 0 | _local_files[remote_file] = local_filestat; |
479 | 0 | } |
480 | 0 | } |
481 | | |
482 | 0 | return Status::OK(); |
483 | 0 | } |
484 | | |
485 | 0 | Status SnapshotHttpDownloader::_get_remote_file_stats() { |
486 | 0 | for (const auto& filename : _remote_file_list) { |
487 | 0 | if (_report_progress_callback) { |
488 | 0 | RETURN_IF_ERROR(_report_progress_callback()); |
489 | 0 | } |
490 | | |
491 | 0 | std::string remote_file_url = |
492 | 0 | fmt::format("{}&file={}/{}", _base_url, _remote_path, filename); |
493 | |
|
494 | 0 | RemoteFileStat remote_filestat; |
495 | 0 | RETURN_IF_ERROR(_get_http_file_stat(remote_file_url, &remote_filestat)); |
496 | 0 | _remote_files[filename] = remote_filestat; |
497 | 0 | } |
498 | | |
499 | 0 | return Status::OK(); |
500 | 0 | } |
501 | | |
502 | 0 | void SnapshotHttpDownloader::_get_need_download_files() { |
503 | 0 | for (const auto& [remote_file, remote_filestat] : _remote_files) { |
504 | 0 | LOG(INFO) << "remote file: " << remote_file << ", size: " << remote_filestat.size |
505 | 0 | << ", md5: " << remote_filestat.md5; |
506 | 0 | auto it = _local_files.find(remote_file); |
507 | 0 | if (it == _local_files.end()) { |
508 | 0 | _need_download_files.emplace_back(remote_file); |
509 | 0 | continue; |
510 | 0 | } |
511 | | |
512 | 0 | if (auto& local_filestat = it->second; local_filestat.size != remote_filestat.size) { |
513 | 0 | _need_download_files.emplace_back(remote_file); |
514 | 0 | continue; |
515 | 0 | } |
516 | | |
517 | 0 | if (auto& local_filestat = it->second; local_filestat.md5 != remote_filestat.md5) { |
518 | 0 | _need_download_files.emplace_back(remote_file); |
519 | 0 | continue; |
520 | 0 | } |
521 | | |
522 | 0 | LOG(INFO) << fmt::format("file {} already exists, skip download url {}", remote_file, |
523 | 0 | remote_filestat.url); |
524 | 0 | } |
525 | 0 | } |
526 | | |
527 | 0 | Status SnapshotHttpDownloader::_download_files() { |
528 | 0 | DataDir* data_dir = _tablet->data_dir(); |
529 | |
|
530 | 0 | uint64_t total_file_size = 0; |
531 | 0 | MonotonicStopWatch watch(true); |
532 | 0 | for (auto& filename : _need_download_files) { |
533 | 0 | if (_report_progress_callback) { |
534 | 0 | RETURN_IF_ERROR(_report_progress_callback()); |
535 | 0 | } |
536 | | |
537 | 0 | auto& remote_filestat = _remote_files[filename]; |
538 | 0 | auto file_size = remote_filestat.size; |
539 | 0 | auto& remote_file_url = remote_filestat.url; |
540 | 0 | auto& remote_file_md5 = remote_filestat.md5; |
541 | |
|
542 | 0 | std::string local_filename; |
543 | 0 | RETURN_IF_ERROR( |
544 | 0 | _snapshot_loader._replace_tablet_id(filename, _local_tablet_id, &local_filename)); |
545 | 0 | std::string local_file_path = _local_path + "/" + local_filename; |
546 | |
|
547 | 0 | RETURN_IF_ERROR( |
548 | 0 | _download_http_file(data_dir, remote_file_url, local_file_path, remote_filestat)); |
549 | 0 | total_file_size += file_size; |
550 | | |
551 | | // local_files always keep the updated local files |
552 | 0 | _local_files[filename] = LocalFileStat {file_size, remote_file_md5}; |
553 | 0 | } |
554 | | |
555 | 0 | uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000; |
556 | 0 | total_time_ms = total_time_ms > 0 ? total_time_ms : 0; |
557 | 0 | double copy_rate = 0.0; |
558 | 0 | if (total_time_ms > 0) { |
559 | 0 | copy_rate = total_file_size / ((double)total_time_ms) / 1000; |
560 | 0 | } |
561 | 0 | LOG(INFO) << fmt::format( |
562 | 0 | "succeed to copy remote tablet {} to local tablet {}, total downloading {} files, " |
563 | 0 | "total file size: {} B, cost: {} ms, rate: {} MB/s", |
564 | 0 | _remote_tablet_id, _local_tablet_id, _need_download_files.size(), total_file_size, |
565 | 0 | total_time_ms, copy_rate); |
566 | |
|
567 | 0 | return Status::OK(); |
568 | 0 | } |
569 | | |
570 | 0 | Status SnapshotHttpDownloader::_install_remote_hdr_file() { |
571 | 0 | std::string local_hdr_filename; |
572 | 0 | RETURN_IF_ERROR(_snapshot_loader._replace_tablet_id(_remote_hdr_filename, _local_tablet_id, |
573 | 0 | &local_hdr_filename)); |
574 | 0 | std::string local_hdr_file_path = _local_path + "/" + local_hdr_filename; |
575 | |
|
576 | 0 | auto status = io::global_local_filesystem()->rename(_tmp_hdr_file, local_hdr_file_path); |
577 | 0 | if (!status.ok()) { |
578 | 0 | LOG(WARNING) << "failed to install remote hdr file from: " << _tmp_hdr_file << " to" |
579 | 0 | << local_hdr_file_path << ", error: " << status.to_string(); |
580 | 0 | return Status::RuntimeError("failed install remote hdr file {} from tmp {}, error: {}", |
581 | 0 | local_hdr_file_path, _tmp_hdr_file, status.to_string()); |
582 | 0 | } |
583 | | |
584 | | // also save the hdr file into remote files. |
585 | 0 | _remote_files[_remote_hdr_filename] = _remote_hdr_filestat; |
586 | |
|
587 | 0 | return Status::OK(); |
588 | 0 | } |
589 | | |
590 | 0 | Status SnapshotHttpDownloader::_delete_orphan_files() { |
591 | | // local_files: contain all remote files and local files |
592 | | // finally, delete local files which are not in remote |
593 | 0 | for (const auto& [local_file, local_filestat] : _local_files) { |
594 | | // replace the tablet id in local file name with the remote tablet id, |
595 | | // in order to compare the file name. |
596 | 0 | std::string new_name; |
597 | 0 | Status st = _snapshot_loader._replace_tablet_id(local_file, _remote_tablet_id, &new_name); |
598 | 0 | if (!st.ok()) { |
599 | 0 | LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st |
600 | 0 | << ". ignore it"; |
601 | 0 | continue; |
602 | 0 | } |
603 | 0 | VLOG_CRITICAL << "new file name after replace tablet id: " << new_name; |
604 | 0 | const auto& find = _remote_files.find(new_name); |
605 | 0 | if (find != _remote_files.end()) { |
606 | 0 | continue; |
607 | 0 | } |
608 | | |
609 | | // delete |
610 | 0 | std::string full_local_file = _local_path + "/" + local_file; |
611 | 0 | LOG(INFO) << "begin to delete local snapshot file: " << full_local_file |
612 | 0 | << ", it does not exist in remote"; |
613 | 0 | if (remove(full_local_file.c_str()) != 0) { |
614 | 0 | LOG(WARNING) << "failed to delete unknown local file: " << full_local_file |
615 | 0 | << ", error: " << strerror(errno) << ", file size: " << local_filestat.size |
616 | 0 | << ", ignore it"; |
617 | 0 | } |
618 | 0 | } |
619 | 0 | return Status::OK(); |
620 | 0 | } |
621 | | |
622 | 0 | Status SnapshotHttpDownloader::download() { |
623 | | // Take a lock to protect the local snapshot path. |
624 | 0 | auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(_local_path); |
625 | | |
626 | | // Step 1: Validate local tablet snapshot paths |
627 | 0 | bool res = true; |
628 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(_local_path, &res)); |
629 | 0 | if (!res) { |
630 | 0 | std::string msg = |
631 | 0 | fmt::format("snapshot path is not directory or does not exist: {}", _local_path); |
632 | 0 | LOG(WARNING) << msg; |
633 | 0 | return Status::RuntimeError(std::move(msg)); |
634 | 0 | } |
635 | | |
636 | | // Step 2: get all local files |
637 | 0 | RETURN_IF_ERROR(_load_existing_files()); |
638 | | |
639 | | // Step 3: Validate remote tablet snapshot paths && remote files map |
640 | 0 | RETURN_IF_ERROR(_list_remote_files()); |
641 | | |
642 | | // Step 4: download hdr file to a tmp file |
643 | 0 | RETURN_IF_ERROR(_download_hdr_file()); |
644 | | |
645 | | // Step 5: link same rowset files, if local tablet meta file exists |
646 | 0 | if (!_local_hdr_filename.empty()) { |
647 | 0 | RETURN_IF_ERROR(_link_same_rowset_files()); |
648 | 0 | } |
649 | | |
650 | | // Step 6: get all remote file stats |
651 | 0 | RETURN_IF_ERROR(_get_remote_file_stats()); |
652 | | |
653 | | // Step 7: get all need download files & download them |
654 | 0 | _get_need_download_files(); |
655 | 0 | if (!_need_download_files.empty()) { |
656 | 0 | RETURN_IF_ERROR(_download_files()); |
657 | 0 | } |
658 | | |
659 | | // Step 8: install remote hdr file from tmp file |
660 | 0 | RETURN_IF_ERROR(_install_remote_hdr_file()); |
661 | | |
662 | | // Step 9: delete orphan files |
663 | 0 | RETURN_IF_ERROR(_delete_orphan_files()); |
664 | | |
665 | 0 | return Status::OK(); |
666 | 0 | } |
667 | | |
668 | | SnapshotLoader::SnapshotLoader(StorageEngine& engine, ExecEnv* env, int64_t job_id, int64_t task_id, |
669 | | const TNetworkAddress& broker_addr, |
670 | | const std::map<std::string, std::string>& prop) |
671 | | : _engine(engine), |
672 | | _env(env), |
673 | | _job_id(job_id), |
674 | | _task_id(task_id), |
675 | | _broker_addr(broker_addr), |
676 | 3 | _prop(prop) {} |
677 | | |
678 | 0 | Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& location) { |
679 | 0 | if (TStorageBackendType::type::S3 == type) { |
680 | 0 | S3Conf s3_conf; |
681 | 0 | S3URI s3_uri(location); |
682 | 0 | RETURN_IF_ERROR(s3_uri.parse()); |
683 | 0 | RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf(_prop, s3_uri, &s3_conf)); |
684 | 0 | _remote_fs = |
685 | 0 | DORIS_TRY(io::S3FileSystem::create(std::move(s3_conf), io::FileSystem::TMP_FS_ID)); |
686 | 0 | } else if (TStorageBackendType::type::HDFS == type) { |
687 | 0 | THdfsParams hdfs_params = parse_properties(_prop); |
688 | 0 | _remote_fs = DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, |
689 | 0 | io::FileSystem::TMP_FS_ID, nullptr)); |
690 | 0 | } else if (TStorageBackendType::type::BROKER == type) { |
691 | 0 | std::shared_ptr<io::BrokerFileSystem> fs; |
692 | 0 | _remote_fs = DORIS_TRY( |
693 | 0 | io::BrokerFileSystem::create(_broker_addr, _prop, io::FileSystem::TMP_FS_ID)); |
694 | 0 | } else { |
695 | 0 | return Status::InternalError("Unknown storage type: {}", type); |
696 | 0 | } |
697 | 0 | return Status::OK(); |
698 | 0 | } |
699 | | |
700 | 3 | SnapshotLoader::~SnapshotLoader() = default; |
701 | | |
702 | | Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_dest_path, |
703 | 0 | std::map<int64_t, std::vector<std::string>>* tablet_files) { |
704 | 0 | if (!_remote_fs) { |
705 | 0 | return Status::InternalError("Storage backend not initialized."); |
706 | 0 | } |
707 | 0 | LOG(INFO) << "begin to upload snapshot files. num: " << src_to_dest_path.size() |
708 | 0 | << ", broker addr: " << _broker_addr << ", job: " << _job_id << ", task" << _task_id; |
709 | | |
710 | | // check if job has already been cancelled |
711 | 0 | int tmp_counter = 1; |
712 | 0 | RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::UPLOAD)); |
713 | | |
714 | 0 | Status status = Status::OK(); |
715 | | // 1. validate local tablet snapshot paths |
716 | 0 | RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, true)); |
717 | | |
718 | | // 2. for each src path, upload it to remote storage |
719 | | // we report to frontend for every 10 files, and we will cancel the job if |
720 | | // the job has already been cancelled in frontend. |
721 | 0 | int report_counter = 0; |
722 | 0 | int total_num = src_to_dest_path.size(); |
723 | 0 | int finished_num = 0; |
724 | 0 | for (const auto& iter : src_to_dest_path) { |
725 | 0 | const std::string& src_path = iter.first; |
726 | 0 | const std::string& dest_path = iter.second; |
727 | | |
728 | | // Take a lock to protect the local snapshot path. |
729 | 0 | auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(src_path); |
730 | |
|
731 | 0 | int64_t tablet_id = 0; |
732 | 0 | int32_t schema_hash = 0; |
733 | 0 | RETURN_IF_ERROR( |
734 | 0 | _get_tablet_id_and_schema_hash_from_file_path(src_path, &tablet_id, &schema_hash)); |
735 | | |
736 | | // 2.1 get existing files from remote path |
737 | 0 | std::map<std::string, FileStat> remote_files; |
738 | 0 | RETURN_IF_ERROR(_list_with_checksum(dest_path, &remote_files)); |
739 | | |
740 | 0 | for (auto& tmp : remote_files) { |
741 | 0 | VLOG_CRITICAL << "get remote file: " << tmp.first << ", checksum: " << tmp.second.md5; |
742 | 0 | } |
743 | | |
744 | | // 2.2 list local files |
745 | 0 | std::vector<std::string> local_files; |
746 | 0 | std::vector<std::string> local_files_with_checksum; |
747 | 0 | RETURN_IF_ERROR(_get_existing_files_from_local(src_path, &local_files)); |
748 | | |
749 | | // 2.3 iterate local files |
750 | 0 | for (auto& local_file : local_files) { |
751 | 0 | RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, |
752 | 0 | TTaskType::type::UPLOAD)); |
753 | | |
754 | | // calc md5sum of localfile |
755 | 0 | std::string md5sum; |
756 | 0 | RETURN_IF_ERROR( |
757 | 0 | io::global_local_filesystem()->md5sum(src_path + "/" + local_file, &md5sum)); |
758 | 0 | VLOG_CRITICAL << "get file checksum: " << local_file << ": " << md5sum; |
759 | 0 | local_files_with_checksum.push_back(local_file + "." + md5sum); |
760 | | |
761 | | // check if this local file need upload |
762 | 0 | bool need_upload = false; |
763 | 0 | auto find = remote_files.find(local_file); |
764 | 0 | if (find != remote_files.end()) { |
765 | 0 | if (md5sum != find->second.md5) { |
766 | | // remote storage file exist, but with different checksum |
767 | 0 | LOG(WARNING) << "remote file checksum is invalid. remote: " << find->first |
768 | 0 | << ", local: " << md5sum; |
769 | | // TODO(cmy): save these files and delete them later |
770 | 0 | need_upload = true; |
771 | 0 | } |
772 | 0 | } else { |
773 | 0 | need_upload = true; |
774 | 0 | } |
775 | |
|
776 | 0 | if (!need_upload) { |
777 | 0 | VLOG_CRITICAL << "file exist in remote path, no need to upload: " << local_file; |
778 | 0 | continue; |
779 | 0 | } |
780 | | |
781 | | // upload |
782 | 0 | std::string remote_path = dest_path + '/' + local_file; |
783 | 0 | std::string local_path = src_path + '/' + local_file; |
784 | 0 | RETURN_IF_ERROR(upload_with_checksum(*_remote_fs, local_path, remote_path, md5sum)); |
785 | 0 | } // end for each tablet's local files |
786 | | |
787 | 0 | tablet_files->emplace(tablet_id, local_files_with_checksum); |
788 | 0 | finished_num++; |
789 | 0 | LOG(INFO) << "finished to write tablet to remote. local path: " << src_path |
790 | 0 | << ", remote path: " << dest_path; |
791 | 0 | } // end for each tablet path |
792 | | |
793 | 0 | LOG(INFO) << "finished to upload snapshots. job: " << _job_id << ", task id: " << _task_id; |
794 | 0 | return status; |
795 | 0 | } |
796 | | |
797 | | /* |
798 | | * Download snapshot files from remote. |
799 | | * After downloaded, the local dir should contains all files existing in remote, |
800 | | * may also contains several useless files. |
801 | | */ |
802 | | Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to_dest_path, |
803 | 0 | std::vector<int64_t>* downloaded_tablet_ids) { |
804 | 0 | if (!_remote_fs) { |
805 | 0 | return Status::InternalError("Storage backend not initialized."); |
806 | 0 | } |
807 | 0 | LOG(INFO) << "begin to download snapshot files. num: " << src_to_dest_path.size() |
808 | 0 | << ", broker addr: " << _broker_addr << ", job: " << _job_id |
809 | 0 | << ", task id: " << _task_id; |
810 | | |
811 | | // check if job has already been cancelled |
812 | 0 | int tmp_counter = 1; |
813 | 0 | RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD)); |
814 | | |
815 | 0 | Status status = Status::OK(); |
816 | | // 1. validate local tablet snapshot paths |
817 | 0 | RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, false)); |
818 | | |
819 | | // 2. for each src path, download it to local storage |
820 | 0 | int report_counter = 0; |
821 | 0 | int total_num = src_to_dest_path.size(); |
822 | 0 | int finished_num = 0; |
823 | 0 | for (const auto& iter : src_to_dest_path) { |
824 | 0 | const std::string& remote_path = iter.first; |
825 | 0 | const std::string& local_path = iter.second; |
826 | | |
827 | | // Take a lock to protect the local snapshot path. |
828 | 0 | auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(local_path); |
829 | |
|
830 | 0 | int64_t local_tablet_id = 0; |
831 | 0 | int32_t schema_hash = 0; |
832 | 0 | RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path(local_path, &local_tablet_id, |
833 | 0 | &schema_hash)); |
834 | 0 | downloaded_tablet_ids->push_back(local_tablet_id); |
835 | |
|
836 | 0 | int64_t remote_tablet_id; |
837 | 0 | RETURN_IF_ERROR(_get_tablet_id_from_remote_path(remote_path, &remote_tablet_id)); |
838 | 0 | VLOG_CRITICAL << "get local tablet id: " << local_tablet_id |
839 | 0 | << ", schema hash: " << schema_hash |
840 | 0 | << ", remote tablet id: " << remote_tablet_id; |
841 | | |
842 | | // 2.1. get local files |
843 | 0 | std::vector<std::string> local_files; |
844 | 0 | RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files)); |
845 | | |
846 | | // 2.2. get remote files |
847 | 0 | std::map<std::string, FileStat> remote_files; |
848 | 0 | RETURN_IF_ERROR(_list_with_checksum(remote_path, &remote_files)); |
849 | 0 | if (remote_files.empty()) { |
850 | 0 | std::stringstream ss; |
851 | 0 | ss << "get nothing from remote path: " << remote_path; |
852 | 0 | LOG(WARNING) << ss.str(); |
853 | 0 | return Status::InternalError(ss.str()); |
854 | 0 | } |
855 | | |
856 | 0 | TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(local_tablet_id); |
857 | 0 | if (tablet == nullptr) { |
858 | 0 | std::stringstream ss; |
859 | 0 | ss << "failed to get local tablet: " << local_tablet_id; |
860 | 0 | LOG(WARNING) << ss.str(); |
861 | 0 | return Status::InternalError(ss.str()); |
862 | 0 | } |
863 | 0 | DataDir* data_dir = tablet->data_dir(); |
864 | |
|
865 | 0 | for (auto& iter : remote_files) { |
866 | 0 | RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, |
867 | 0 | TTaskType::type::DOWNLOAD)); |
868 | | |
869 | 0 | bool need_download = false; |
870 | 0 | const std::string& remote_file = iter.first; |
871 | 0 | const FileStat& file_stat = iter.second; |
872 | 0 | auto find = std::find(local_files.begin(), local_files.end(), remote_file); |
873 | 0 | if (find == local_files.end()) { |
874 | | // remote file does not exist in local, download it |
875 | 0 | need_download = true; |
876 | 0 | } else { |
877 | 0 | if (_end_with(remote_file, ".hdr")) { |
878 | | // this is a header file, download it. |
879 | 0 | need_download = true; |
880 | 0 | } else { |
881 | | // check checksum |
882 | 0 | std::string local_md5sum; |
883 | 0 | Status st = io::global_local_filesystem()->md5sum( |
884 | 0 | local_path + "/" + remote_file, &local_md5sum); |
885 | 0 | if (!st.ok()) { |
886 | 0 | LOG(WARNING) << "failed to get md5sum of local file: " << remote_file |
887 | 0 | << ". msg: " << st << ". download it"; |
888 | 0 | need_download = true; |
889 | 0 | } else { |
890 | 0 | VLOG_CRITICAL << "get local file checksum: " << remote_file << ": " |
891 | 0 | << local_md5sum; |
892 | 0 | if (file_stat.md5 != local_md5sum) { |
893 | | // file's checksum does not equal, download it. |
894 | 0 | need_download = true; |
895 | 0 | } |
896 | 0 | } |
897 | 0 | } |
898 | 0 | } |
899 | |
|
900 | 0 | if (!need_download) { |
901 | 0 | LOG(INFO) << "remote file already exist in local, no need to download." |
902 | 0 | << ", file: " << remote_file; |
903 | 0 | continue; |
904 | 0 | } |
905 | | |
906 | | // begin to download |
907 | 0 | std::string full_remote_file = remote_path + "/" + remote_file + "." + file_stat.md5; |
908 | 0 | std::string local_file_name; |
909 | | // we need to replace the tablet_id in remote file name with local tablet id |
910 | 0 | RETURN_IF_ERROR(_replace_tablet_id(remote_file, local_tablet_id, &local_file_name)); |
911 | 0 | std::string full_local_file = local_path + "/" + local_file_name; |
912 | 0 | LOG(INFO) << "begin to download from " << full_remote_file << " to " << full_local_file; |
913 | 0 | size_t file_len = file_stat.size; |
914 | | |
915 | | // check disk capacity |
916 | 0 | if (data_dir->reach_capacity_limit(file_len)) { |
917 | 0 | return Status::Error<ErrorCode::EXCEEDED_LIMIT>( |
918 | 0 | "reach the capacity limit of path {}, file_size={}", data_dir->path(), |
919 | 0 | file_len); |
920 | 0 | } |
921 | | // remove file which will be downloaded now. |
922 | | // this file will be added to local_files if it be downloaded successfully. |
923 | 0 | if (find != local_files.end()) { |
924 | 0 | local_files.erase(find); |
925 | 0 | } |
926 | 0 | RETURN_IF_ERROR(_remote_fs->download(full_remote_file, full_local_file)); |
927 | | |
928 | | // 3. check md5 of the downloaded file |
929 | 0 | std::string downloaded_md5sum; |
930 | 0 | RETURN_IF_ERROR( |
931 | 0 | io::global_local_filesystem()->md5sum(full_local_file, &downloaded_md5sum)); |
932 | 0 | VLOG_CRITICAL << "get downloaded file checksum: " << full_local_file << ": " |
933 | 0 | << downloaded_md5sum; |
934 | 0 | if (downloaded_md5sum != file_stat.md5) { |
935 | 0 | std::stringstream ss; |
936 | 0 | ss << "invalid md5 of downloaded file: " << full_local_file |
937 | 0 | << ", expected: " << file_stat.md5 << ", get: " << downloaded_md5sum; |
938 | 0 | LOG(WARNING) << ss.str(); |
939 | 0 | return Status::InternalError(ss.str()); |
940 | 0 | } |
941 | | |
942 | | // local_files always keep the updated local files |
943 | 0 | local_files.push_back(local_file_name); |
944 | 0 | LOG(INFO) << "finished to download file via broker. file: " << full_local_file |
945 | 0 | << ", length: " << file_len; |
946 | 0 | } // end for all remote files |
947 | | |
948 | | // finally, delete local files which are not in remote |
949 | 0 | for (const auto& local_file : local_files) { |
950 | | // replace the tablet id in local file name with the remote tablet id, |
951 | | // in order to compare the file name. |
952 | 0 | std::string new_name; |
953 | 0 | Status st = _replace_tablet_id(local_file, remote_tablet_id, &new_name); |
954 | 0 | if (!st.ok()) { |
955 | 0 | LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st |
956 | 0 | << ". ignore it"; |
957 | 0 | continue; |
958 | 0 | } |
959 | 0 | VLOG_CRITICAL << "new file name after replace tablet id: " << new_name; |
960 | 0 | const auto& find = remote_files.find(new_name); |
961 | 0 | if (find != remote_files.end()) { |
962 | 0 | continue; |
963 | 0 | } |
964 | | |
965 | | // delete |
966 | 0 | std::string full_local_file = local_path + "/" + local_file; |
967 | 0 | VLOG_CRITICAL << "begin to delete local snapshot file: " << full_local_file |
968 | 0 | << ", it does not exist in remote"; |
969 | 0 | if (remove(full_local_file.c_str()) != 0) { |
970 | 0 | LOG(WARNING) << "failed to delete unknown local file: " << full_local_file |
971 | 0 | << ", ignore it"; |
972 | 0 | } |
973 | 0 | } |
974 | |
|
975 | 0 | finished_num++; |
976 | 0 | } // end for src_to_dest_path |
977 | | |
978 | 0 | LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id; |
979 | 0 | return status; |
980 | 0 | } |
981 | | |
982 | | Status SnapshotLoader::remote_http_download( |
983 | | const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots, |
984 | 0 | std::vector<int64_t>* downloaded_tablet_ids) { |
985 | | // check if job has already been cancelled |
986 | 0 | int tmp_counter = 1; |
987 | 0 | RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD)); |
988 | 0 | Status status = Status::OK(); |
989 | |
|
990 | 0 | int report_counter = 0; |
991 | 0 | int finished_num = 0; |
992 | 0 | int total_num = remote_tablet_snapshots.size(); |
993 | 0 | for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { |
994 | 0 | auto local_tablet_id = remote_tablet_snapshot.local_tablet_id; |
995 | 0 | const auto& local_path = remote_tablet_snapshot.local_snapshot_path; |
996 | 0 | const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path; |
997 | |
|
998 | 0 | LOG(INFO) << fmt::format( |
999 | 0 | "download snapshots via http. job: {}, task id: {}, local dir: {}, remote dir: {}", |
1000 | 0 | _job_id, _task_id, local_path, remote_path); |
1001 | |
|
1002 | 0 | TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(local_tablet_id); |
1003 | 0 | if (tablet == nullptr) { |
1004 | 0 | std::string msg = fmt::format("failed to get local tablet: {}", local_tablet_id); |
1005 | 0 | LOG(WARNING) << msg; |
1006 | 0 | return Status::RuntimeError(std::move(msg)); |
1007 | 0 | } |
1008 | | |
1009 | 0 | SnapshotHttpDownloader downloader(remote_tablet_snapshot, std::move(tablet), *this); |
1010 | 0 | downloader.set_report_progress_callback( |
1011 | 0 | [this, &report_counter, &finished_num, &total_num]() { |
1012 | 0 | return _report_every(10, &report_counter, finished_num, total_num, |
1013 | 0 | TTaskType::type::DOWNLOAD); |
1014 | 0 | }); |
1015 | 0 | RETURN_IF_ERROR(downloader.download()); |
1016 | | |
1017 | 0 | ++finished_num; |
1018 | 0 | } |
1019 | | |
1020 | 0 | LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id; |
1021 | 0 | return status; |
1022 | 0 | } |
1023 | | |
1024 | | // move the snapshot files in snapshot_path |
1025 | | // to tablet_path |
1026 | | // If overwrite, just replace the tablet_path with snapshot_path, |
1027 | | // else: (TODO) |
1028 | | // |
1029 | | // MUST hold tablet's header lock, push lock, cumulative lock and base compaction lock |
1030 | | Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr tablet, |
1031 | 2 | bool overwrite) { |
1032 | | // Take a lock to protect the local snapshot path. |
1033 | 2 | auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(snapshot_path); |
1034 | | |
1035 | 2 | auto tablet_path = tablet->tablet_path(); |
1036 | 2 | auto store_path = tablet->data_dir()->path(); |
1037 | 2 | LOG(INFO) << "begin to move snapshot files. from: " << snapshot_path << ", to: " << tablet_path |
1038 | 2 | << ", store: " << store_path << ", job: " << _job_id << ", task id: " << _task_id; |
1039 | | |
1040 | 2 | Status status = Status::OK(); |
1041 | | |
1042 | | // validate snapshot_path and tablet_path |
1043 | 2 | int64_t snapshot_tablet_id = 0; |
1044 | 2 | int32_t snapshot_schema_hash = 0; |
1045 | 2 | RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path( |
1046 | 2 | snapshot_path, &snapshot_tablet_id, &snapshot_schema_hash)); |
1047 | | |
1048 | 2 | int64_t tablet_id = 0; |
1049 | 2 | int32_t schema_hash = 0; |
1050 | 2 | RETURN_IF_ERROR( |
1051 | 2 | _get_tablet_id_and_schema_hash_from_file_path(tablet_path, &tablet_id, &schema_hash)); |
1052 | | |
1053 | 2 | if (tablet_id != snapshot_tablet_id || schema_hash != snapshot_schema_hash) { |
1054 | 0 | std::stringstream ss; |
1055 | 0 | ss << "path does not match. snapshot: " << snapshot_path |
1056 | 0 | << ", tablet path: " << tablet_path; |
1057 | 0 | LOG(WARNING) << ss.str(); |
1058 | 0 | return Status::InternalError(ss.str()); |
1059 | 0 | } |
1060 | | |
1061 | 2 | DataDir* store = _engine.get_store(store_path); |
1062 | 2 | if (store == nullptr) { |
1063 | 0 | std::stringstream ss; |
1064 | 0 | ss << "failed to get store by path: " << store_path; |
1065 | 0 | LOG(WARNING) << ss.str(); |
1066 | 0 | return Status::InternalError(ss.str()); |
1067 | 0 | } |
1068 | | |
1069 | 2 | if (!std::filesystem::exists(tablet_path)) { |
1070 | 0 | std::stringstream ss; |
1071 | 0 | ss << "tablet path does not exist: " << tablet_path; |
1072 | 0 | LOG(WARNING) << ss.str(); |
1073 | 0 | return Status::InternalError(ss.str()); |
1074 | 0 | } |
1075 | | |
1076 | 2 | if (!std::filesystem::exists(snapshot_path)) { |
1077 | 0 | std::stringstream ss; |
1078 | 0 | ss << "snapshot path does not exist: " << snapshot_path; |
1079 | 0 | LOG(WARNING) << ss.str(); |
1080 | 0 | return Status::InternalError(ss.str()); |
1081 | 0 | } |
1082 | | |
1083 | 2 | std::string loaded_tag_path = get_loaded_tag_path(snapshot_path); |
1084 | 2 | bool already_loaded = false; |
1085 | 2 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(loaded_tag_path, &already_loaded)); |
1086 | 2 | if (already_loaded) { |
1087 | 1 | LOG(INFO) << "snapshot path already moved: " << snapshot_path; |
1088 | 1 | return Status::OK(); |
1089 | 1 | } |
1090 | | |
1091 | | // rename the rowset ids and tabletid info in rowset meta |
1092 | 1 | auto res = _engine.snapshot_mgr()->convert_rowset_ids(snapshot_path, tablet_id, |
1093 | 1 | tablet->replica_id(), tablet->table_id(), |
1094 | 1 | tablet->partition_id(), schema_hash); |
1095 | 1 | if (!res.has_value()) [[unlikely]] { |
1096 | 0 | auto err_msg = |
1097 | 0 | fmt::format("failed to convert rowsetids in snapshot: {}, tablet path: {}, err: {}", |
1098 | 0 | snapshot_path, tablet_path, res.error()); |
1099 | 0 | LOG(WARNING) << err_msg; |
1100 | 0 | return Status::InternalError(err_msg); |
1101 | 0 | } |
1102 | | |
1103 | 1 | if (!overwrite) { |
1104 | 0 | throw Exception(Status::FatalError("only support overwrite now")); |
1105 | 0 | } |
1106 | | |
1107 | | // Medium migration/clone/checkpoint/compaction may change or check the |
1108 | | // files and tablet meta, so we need to take these locks. |
1109 | 1 | std::unique_lock migration_lock(tablet->get_migration_lock(), std::try_to_lock); |
1110 | 1 | std::unique_lock base_compact_lock(tablet->get_base_compaction_lock(), std::try_to_lock); |
1111 | 1 | std::unique_lock cumu_compact_lock(tablet->get_cumulative_compaction_lock(), std::try_to_lock); |
1112 | 1 | std::unique_lock cold_compact_lock(tablet->get_cold_compaction_lock(), std::try_to_lock); |
1113 | 1 | std::unique_lock build_idx_lock(tablet->get_build_inverted_index_lock(), std::try_to_lock); |
1114 | 1 | std::unique_lock meta_store_lock(tablet->get_meta_store_lock(), std::try_to_lock); |
1115 | 1 | if (!migration_lock.owns_lock() || !base_compact_lock.owns_lock() || |
1116 | 1 | !cumu_compact_lock.owns_lock() || !cold_compact_lock.owns_lock() || |
1117 | 1 | !build_idx_lock.owns_lock() || !meta_store_lock.owns_lock()) { |
1118 | | // This error should be retryable |
1119 | 0 | auto status = Status::ObtainLockFailed("failed to get tablet locks, tablet: {}", tablet_id); |
1120 | 0 | LOG(WARNING) << status << ", snapshot path: " << snapshot_path |
1121 | 0 | << ", tablet path: " << tablet_path; |
1122 | 0 | return status; |
1123 | 0 | } |
1124 | | |
1125 | 1 | std::vector<std::string> snapshot_files; |
1126 | 1 | RETURN_IF_ERROR(_get_existing_files_from_local(snapshot_path, &snapshot_files)); |
1127 | | |
1128 | | // FIXME: the below logic will demage the tablet files if failed in the middle. |
1129 | | |
1130 | | // 1. simply delete the old dir and replace it with the snapshot dir |
1131 | 1 | try { |
1132 | | // This remove seems soft enough, because we already get |
1133 | | // tablet id and schema hash from this path, which |
1134 | | // means this path is a valid path. |
1135 | 1 | std::filesystem::remove_all(tablet_path); |
1136 | 1 | VLOG_CRITICAL << "remove dir: " << tablet_path; |
1137 | 1 | std::filesystem::create_directory(tablet_path); |
1138 | 1 | VLOG_CRITICAL << "re-create dir: " << tablet_path; |
1139 | 1 | } catch (const std::filesystem::filesystem_error& e) { |
1140 | 0 | std::stringstream ss; |
1141 | 0 | ss << "failed to move tablet path: " << tablet_path << ". err: " << e.what(); |
1142 | 0 | LOG(WARNING) << ss.str(); |
1143 | 0 | return Status::InternalError(ss.str()); |
1144 | 0 | } |
1145 | | |
1146 | | // link files one by one |
1147 | | // files in snapshot dir will be moved in snapshot clean process |
1148 | 1 | std::vector<std::string> linked_files; |
1149 | 2 | for (auto& file : snapshot_files) { |
1150 | 2 | auto full_src_path = fmt::format("{}/{}", snapshot_path, file); |
1151 | 2 | auto full_dest_path = fmt::format("{}/{}", tablet_path, file); |
1152 | 2 | if (link(full_src_path.c_str(), full_dest_path.c_str()) != 0) { |
1153 | 0 | LOG(WARNING) << "failed to link file from " << full_src_path << " to " << full_dest_path |
1154 | 0 | << ", err: " << std::strerror(errno); |
1155 | | |
1156 | | // clean the already linked files |
1157 | 0 | for (auto& linked_file : linked_files) { |
1158 | 0 | remove(linked_file.c_str()); |
1159 | 0 | } |
1160 | |
|
1161 | 0 | return Status::InternalError("move tablet failed"); |
1162 | 0 | } |
1163 | 2 | linked_files.push_back(full_dest_path); |
1164 | 2 | VLOG_CRITICAL << "link file from " << full_src_path << " to " << full_dest_path; |
1165 | 2 | } |
1166 | | |
1167 | | // snapshot loader not need to change tablet uid |
1168 | | // fixme: there is no header now and can not call load_one_tablet here |
1169 | | // reload header |
1170 | 1 | Status ost = _engine.tablet_manager()->load_tablet_from_dir(store, tablet_id, schema_hash, |
1171 | 1 | tablet_path, true); |
1172 | 1 | if (!ost.ok()) { |
1173 | 0 | std::stringstream ss; |
1174 | 0 | ss << "failed to reload header of tablet: " << tablet_id; |
1175 | 0 | LOG(WARNING) << ss.str(); |
1176 | 0 | return Status::InternalError(ss.str()); |
1177 | 0 | } |
1178 | | |
1179 | | // mark the snapshot path as loaded |
1180 | 1 | RETURN_IF_ERROR(write_loaded_tag(snapshot_path, tablet_id)); |
1181 | | |
1182 | 1 | LOG(INFO) << "finished to reload header of tablet: " << tablet_id; |
1183 | | |
1184 | 1 | return status; |
1185 | 1 | } |
1186 | | |
1187 | | Status SnapshotLoader::_replace_tablet_id(const std::string& file_name, int64_t tablet_id, |
1188 | 4 | std::string* new_file_name) { |
1189 | | // eg: |
1190 | | // 10007.hdr |
1191 | | // 10007_2_2_0_0.idx |
1192 | | // 10007_2_2_0_0.dat |
1193 | 4 | if (_end_with(file_name, ".hdr")) { |
1194 | 1 | std::stringstream ss; |
1195 | 1 | ss << tablet_id << ".hdr"; |
1196 | 1 | *new_file_name = ss.str(); |
1197 | 1 | return Status::OK(); |
1198 | 3 | } else if (_end_with(file_name, ".idx") || _end_with(file_name, ".dat")) { |
1199 | 2 | *new_file_name = file_name; |
1200 | 2 | return Status::OK(); |
1201 | 2 | } else { |
1202 | 1 | return Status::InternalError("invalid tablet file name: {}", file_name); |
1203 | 1 | } |
1204 | 4 | } |
1205 | | |
1206 | | Status SnapshotLoader::_get_tablet_id_and_schema_hash_from_file_path(const std::string& src_path, |
1207 | | int64_t* tablet_id, |
1208 | 6 | int32_t* schema_hash) { |
1209 | | // path should be like: /path/.../tablet_id/schema_hash |
1210 | | // we try to extract tablet_id from path |
1211 | 6 | size_t pos = src_path.find_last_of("/"); |
1212 | 6 | if (pos == std::string::npos || pos == src_path.length() - 1) { |
1213 | 1 | return Status::InternalError("failed to get tablet id from path: {}", src_path); |
1214 | 1 | } |
1215 | | |
1216 | 5 | std::string schema_hash_str = src_path.substr(pos + 1); |
1217 | 5 | std::stringstream ss1; |
1218 | 5 | ss1 << schema_hash_str; |
1219 | 5 | ss1 >> *schema_hash; |
1220 | | |
1221 | | // skip schema hash part |
1222 | 5 | size_t pos2 = src_path.find_last_of("/", pos - 1); |
1223 | 5 | if (pos2 == std::string::npos) { |
1224 | 0 | return Status::InternalError("failed to get tablet id from path: {}", src_path); |
1225 | 0 | } |
1226 | | |
1227 | 5 | std::string tablet_str = src_path.substr(pos2 + 1, pos - pos2); |
1228 | 5 | std::stringstream ss2; |
1229 | 5 | ss2 << tablet_str; |
1230 | 5 | ss2 >> *tablet_id; |
1231 | | |
1232 | 5 | VLOG_CRITICAL << "get tablet id " << *tablet_id << ", schema hash: " << *schema_hash |
1233 | 0 | << " from path: " << src_path; |
1234 | 5 | return Status::OK(); |
1235 | 5 | } |
1236 | | |
1237 | | Status SnapshotLoader::_check_local_snapshot_paths( |
1238 | 4 | const std::map<std::string, std::string>& src_to_dest_path, bool check_src) { |
1239 | 4 | bool res = true; |
1240 | 4 | for (const auto& pair : src_to_dest_path) { |
1241 | 4 | std::string path; |
1242 | 4 | if (check_src) { |
1243 | 2 | path = pair.first; |
1244 | 2 | } else { |
1245 | 2 | path = pair.second; |
1246 | 2 | } |
1247 | | |
1248 | 4 | RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res)); |
1249 | 2 | if (!res) { |
1250 | 0 | std::stringstream ss; |
1251 | 0 | ss << "snapshot path is not directory or does not exist: " << path; |
1252 | 0 | LOG(WARNING) << ss.str(); |
1253 | 0 | return Status::RuntimeError(ss.str()); |
1254 | 0 | } |
1255 | 2 | } |
1256 | 2 | LOG(INFO) << "all local snapshot paths are existing. num: " << src_to_dest_path.size(); |
1257 | 2 | return Status::OK(); |
1258 | 4 | } |
1259 | | |
1260 | | Status SnapshotLoader::_get_existing_files_from_local(const std::string& local_path, |
1261 | 2 | std::vector<std::string>* local_files) { |
1262 | 2 | bool exists = true; |
1263 | 2 | std::vector<io::FileInfo> files; |
1264 | 2 | RETURN_IF_ERROR(io::global_local_filesystem()->list(local_path, true, &files, &exists)); |
1265 | 2 | for (auto& file : files) { |
1266 | 2 | local_files->push_back(file.file_name); |
1267 | 2 | } |
1268 | 2 | LOG(INFO) << "finished to list files in local path: " << local_path |
1269 | 2 | << ", file num: " << local_files->size(); |
1270 | 2 | return Status::OK(); |
1271 | 2 | } |
1272 | | |
1273 | | Status SnapshotLoader::_get_tablet_id_from_remote_path(const std::string& remote_path, |
1274 | 1 | int64_t* tablet_id) { |
1275 | | // eg: |
1276 | | // bos://xxx/../__tbl_10004/__part_10003/__idx_10004/__10005 |
1277 | 1 | size_t pos = remote_path.find_last_of("_"); |
1278 | 1 | if (pos == std::string::npos) { |
1279 | 0 | return Status::InternalError("invalid remove file path: {}", remote_path); |
1280 | 0 | } |
1281 | | |
1282 | 1 | std::string tablet_id_str = remote_path.substr(pos + 1); |
1283 | 1 | std::stringstream ss; |
1284 | 1 | ss << tablet_id_str; |
1285 | 1 | ss >> *tablet_id; |
1286 | | |
1287 | 1 | return Status::OK(); |
1288 | 1 | } |
1289 | | |
1290 | | // only return CANCELLED if FE return that job is cancelled. |
1291 | | // otherwise, return OK |
1292 | | Status SnapshotLoader::_report_every(int report_threshold, int* counter, int32_t finished_num, |
1293 | 0 | int32_t total_num, TTaskType::type type) { |
1294 | 0 | ++*counter; |
1295 | 0 | if (*counter <= report_threshold) { |
1296 | 0 | return Status::OK(); |
1297 | 0 | } |
1298 | | |
1299 | 0 | LOG(INFO) << "report to frontend. job id: " << _job_id << ", task id: " << _task_id |
1300 | 0 | << ", finished num: " << finished_num << ", total num:" << total_num; |
1301 | |
|
1302 | 0 | TNetworkAddress master_addr = _env->cluster_info()->master_fe_addr; |
1303 | |
|
1304 | 0 | TSnapshotLoaderReportRequest request; |
1305 | 0 | request.job_id = _job_id; |
1306 | 0 | request.task_id = _task_id; |
1307 | 0 | request.task_type = type; |
1308 | 0 | request.__set_finished_num(finished_num); |
1309 | 0 | request.__set_total_num(total_num); |
1310 | 0 | TStatus report_st; |
1311 | |
|
1312 | 0 | Status rpcStatus = ThriftRpcHelper::rpc<FrontendServiceClient>( |
1313 | 0 | master_addr.hostname, master_addr.port, |
1314 | 0 | [&request, &report_st](FrontendServiceConnection& client) { |
1315 | 0 | client->snapshotLoaderReport(report_st, request); |
1316 | 0 | }, |
1317 | 0 | 10000); |
1318 | |
|
1319 | 0 | if (!rpcStatus.ok()) { |
1320 | | // rpc failed, ignore |
1321 | 0 | return Status::OK(); |
1322 | 0 | } |
1323 | | |
1324 | | // reset |
1325 | 0 | *counter = 0; |
1326 | 0 | if (report_st.status_code == TStatusCode::CANCELLED) { |
1327 | 0 | LOG(INFO) << "job is cancelled. job id: " << _job_id << ", task id: " << _task_id; |
1328 | 0 | return Status::Cancelled("Cancelled"); |
1329 | 0 | } |
1330 | 0 | return Status::OK(); |
1331 | 0 | } |
1332 | | |
1333 | | Status SnapshotLoader::_list_with_checksum(const std::string& dir, |
1334 | 0 | std::map<std::string, FileStat>* md5_files) { |
1335 | 0 | bool exists = true; |
1336 | 0 | std::vector<io::FileInfo> files; |
1337 | 0 | RETURN_IF_ERROR(_remote_fs->list(dir, true, &files, &exists)); |
1338 | 0 | for (auto& tmp_file : files) { |
1339 | 0 | io::Path path(tmp_file.file_name); |
1340 | 0 | std::string file_name = path.filename(); |
1341 | 0 | size_t pos = file_name.find_last_of("."); |
1342 | 0 | if (pos == std::string::npos || pos == file_name.size() - 1) { |
1343 | | // Not found checksum separator, ignore this file |
1344 | 0 | continue; |
1345 | 0 | } |
1346 | 0 | FileStat stat = {std::string(file_name, 0, pos), std::string(file_name, pos + 1), |
1347 | 0 | tmp_file.file_size}; |
1348 | 0 | md5_files->emplace(std::string(file_name, 0, pos), stat); |
1349 | 0 | } |
1350 | |
|
1351 | 0 | return Status::OK(); |
1352 | 0 | } |
1353 | | |
1354 | | } // end namespace doris |