/root/doris/be/src/runtime/snapshot_loader.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/snapshot_loader.h" |
19 | | |
20 | | // IWYU pragma: no_include <bthread/errno.h> |
21 | | #include <errno.h> // IWYU pragma: keep |
22 | | #include <fmt/format.h> |
23 | | #include <gen_cpp/FrontendService.h> |
24 | | #include <gen_cpp/FrontendService_types.h> |
25 | | #include <gen_cpp/HeartbeatService_types.h> |
26 | | #include <gen_cpp/PlanNodes_types.h> |
27 | | #include <gen_cpp/Status_types.h> |
28 | | #include <gen_cpp/Types_types.h> |
29 | | #include <stdint.h> |
30 | | #include <stdio.h> |
31 | | #include <unistd.h> |
32 | | |
33 | | #include <algorithm> |
34 | | #include <cstring> |
35 | | #include <filesystem> |
36 | | #include <istream> |
37 | | #include <unordered_map> |
38 | | #include <utility> |
39 | | |
40 | | #include "common/logging.h" |
41 | | #include "gutil/strings/split.h" |
42 | | #include "http/http_client.h" |
43 | | #include "io/fs/broker_file_system.h" |
44 | | #include "io/fs/file_system.h" |
45 | | #include "io/fs/hdfs_file_system.h" |
46 | | #include "io/fs/local_file_system.h" |
47 | | #include "io/fs/path.h" |
48 | | #include "io/fs/remote_file_system.h" |
49 | | #include "io/fs/s3_file_system.h" |
50 | | #include "io/hdfs_builder.h" |
51 | | #include "olap/data_dir.h" |
52 | | #include "olap/snapshot_manager.h" |
53 | | #include "olap/storage_engine.h" |
54 | | #include "olap/tablet.h" |
55 | | #include "olap/tablet_manager.h" |
56 | | #include "runtime/client_cache.h" |
57 | | #include "runtime/exec_env.h" |
58 | | #include "util/s3_uri.h" |
59 | | #include "util/s3_util.h" |
60 | | #include "util/thrift_rpc_helper.h" |
61 | | |
62 | | namespace doris { |
63 | | namespace { |
64 | | |
65 | | Status upload_with_checksum(io::RemoteFileSystem& fs, std::string_view local_path, |
66 | 0 | std::string_view remote_path, std::string_view checksum) { |
67 | 0 | auto full_remote_path = fmt::format("{}.{}", remote_path, checksum); |
68 | 0 | switch (fs.type()) { |
69 | 0 | case io::FileSystemType::HDFS: |
70 | 0 | case io::FileSystemType::BROKER: { |
71 | 0 | std::string temp = fmt::format("{}.part", remote_path); |
72 | 0 | RETURN_IF_ERROR(fs.upload(local_path, temp)); |
73 | 0 | RETURN_IF_ERROR(fs.rename(temp, full_remote_path)); |
74 | 0 | break; |
75 | 0 | } |
76 | 0 | case io::FileSystemType::S3: |
77 | 0 | RETURN_IF_ERROR(fs.upload(local_path, full_remote_path)); |
78 | 0 | break; |
79 | 0 | default: |
80 | 0 | LOG(FATAL) << "unknown fs type: " << static_cast<int>(fs.type()); |
81 | 0 | } |
82 | 0 | return Status::OK(); |
83 | 0 | } |
84 | | |
85 | | } // namespace |
86 | | |
87 | | SnapshotLoader::SnapshotLoader(ExecEnv* env, int64_t job_id, int64_t task_id) |
88 | | : _env(env), |
89 | | _job_id(job_id), |
90 | | _task_id(task_id), |
91 | | _broker_addr(TNetworkAddress()), |
92 | | _prop(std::map<std::string, std::string>()), |
93 | 1 | _remote_fs(nullptr) {} |
94 | | |
95 | | SnapshotLoader::SnapshotLoader(ExecEnv* env, int64_t job_id, int64_t task_id, |
96 | | const TNetworkAddress& broker_addr, |
97 | | const std::map<std::string, std::string>& prop) |
98 | 0 | : _env(env), _job_id(job_id), _task_id(task_id), _broker_addr(broker_addr), _prop(prop) {} |
99 | | |
100 | 0 | Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& location) { |
101 | 0 | if (TStorageBackendType::type::S3 == type) { |
102 | 0 | S3Conf s3_conf; |
103 | 0 | S3URI s3_uri(location); |
104 | 0 | RETURN_IF_ERROR(s3_uri.parse()); |
105 | 0 | RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf(_prop, s3_uri, &s3_conf)); |
106 | 0 | std::shared_ptr<io::S3FileSystem> fs; |
107 | 0 | RETURN_IF_ERROR(io::S3FileSystem::create(std::move(s3_conf), "", nullptr, &fs)); |
108 | 0 | _remote_fs = std::move(fs); |
109 | 0 | } else if (TStorageBackendType::type::HDFS == type) { |
110 | 0 | THdfsParams hdfs_params = parse_properties(_prop); |
111 | 0 | std::shared_ptr<io::HdfsFileSystem> fs; |
112 | 0 | RETURN_IF_ERROR( |
113 | 0 | io::HdfsFileSystem::create(hdfs_params, "", hdfs_params.fs_name, nullptr, &fs)); |
114 | 0 | _remote_fs = std::move(fs); |
115 | 0 | } else if (TStorageBackendType::type::BROKER == type) { |
116 | 0 | std::shared_ptr<io::BrokerFileSystem> fs; |
117 | 0 | RETURN_IF_ERROR(io::BrokerFileSystem::create(_broker_addr, _prop, &fs)); |
118 | 0 | _remote_fs = std::move(fs); |
119 | 0 | } else { |
120 | 0 | return Status::InternalError("Unknown storage type: {}", type); |
121 | 0 | } |
122 | 0 | return Status::OK(); |
123 | 0 | } |
124 | | |
125 | 1 | SnapshotLoader::~SnapshotLoader() = default; |
126 | | |
127 | | Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_dest_path, |
128 | 0 | std::map<int64_t, std::vector<std::string>>* tablet_files) { |
129 | 0 | if (!_remote_fs) { |
130 | 0 | return Status::InternalError("Storage backend not initialized."); |
131 | 0 | } |
132 | 0 | LOG(INFO) << "begin to upload snapshot files. num: " << src_to_dest_path.size() |
133 | 0 | << ", broker addr: " << _broker_addr << ", job: " << _job_id << ", task" << _task_id; |
134 | | |
135 | | // check if job has already been cancelled |
136 | 0 | int tmp_counter = 1; |
137 | 0 | RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::UPLOAD)); |
138 | | |
139 | 0 | Status status = Status::OK(); |
140 | | // 1. validate local tablet snapshot paths |
141 | 0 | RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, true)); |
142 | | |
143 | | // 2. for each src path, upload it to remote storage |
144 | | // we report to frontend for every 10 files, and we will cancel the job if |
145 | | // the job has already been cancelled in frontend. |
146 | 0 | int report_counter = 0; |
147 | 0 | int total_num = src_to_dest_path.size(); |
148 | 0 | int finished_num = 0; |
149 | 0 | for (auto iter = src_to_dest_path.begin(); iter != src_to_dest_path.end(); iter++) { |
150 | 0 | const std::string& src_path = iter->first; |
151 | 0 | const std::string& dest_path = iter->second; |
152 | |
|
153 | 0 | int64_t tablet_id = 0; |
154 | 0 | int32_t schema_hash = 0; |
155 | 0 | RETURN_IF_ERROR( |
156 | 0 | _get_tablet_id_and_schema_hash_from_file_path(src_path, &tablet_id, &schema_hash)); |
157 | | |
158 | | // 2.1 get existing files from remote path |
159 | 0 | std::map<std::string, FileStat> remote_files; |
160 | 0 | RETURN_IF_ERROR(_list_with_checksum(dest_path, &remote_files)); |
161 | | |
162 | 0 | for (auto& tmp : remote_files) { |
163 | 0 | VLOG_CRITICAL << "get remote file: " << tmp.first << ", checksum: " << tmp.second.md5; |
164 | 0 | } |
165 | | |
166 | | // 2.2 list local files |
167 | 0 | std::vector<std::string> local_files; |
168 | 0 | std::vector<std::string> local_files_with_checksum; |
169 | 0 | RETURN_IF_ERROR(_get_existing_files_from_local(src_path, &local_files)); |
170 | | |
171 | | // 2.3 iterate local files |
172 | 0 | for (auto it = local_files.begin(); it != local_files.end(); it++) { |
173 | 0 | RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, |
174 | 0 | TTaskType::type::UPLOAD)); |
175 | | |
176 | 0 | const std::string& local_file = *it; |
177 | | // calc md5sum of localfile |
178 | 0 | std::string md5sum; |
179 | 0 | RETURN_IF_ERROR( |
180 | 0 | io::global_local_filesystem()->md5sum(src_path + "/" + local_file, &md5sum)); |
181 | 0 | VLOG_CRITICAL << "get file checksum: " << local_file << ": " << md5sum; |
182 | 0 | local_files_with_checksum.push_back(local_file + "." + md5sum); |
183 | | |
184 | | // check if this local file need upload |
185 | 0 | bool need_upload = false; |
186 | 0 | auto find = remote_files.find(local_file); |
187 | 0 | if (find != remote_files.end()) { |
188 | 0 | if (md5sum != find->second.md5) { |
189 | | // remote storage file exist, but with different checksum |
190 | 0 | LOG(WARNING) << "remote file checksum is invalid. remote: " << find->first |
191 | 0 | << ", local: " << md5sum; |
192 | | // TODO(cmy): save these files and delete them later |
193 | 0 | need_upload = true; |
194 | 0 | } |
195 | 0 | } else { |
196 | 0 | need_upload = true; |
197 | 0 | } |
198 | |
|
199 | 0 | if (!need_upload) { |
200 | 0 | VLOG_CRITICAL << "file exist in remote path, no need to upload: " << local_file; |
201 | 0 | continue; |
202 | 0 | } |
203 | | |
204 | | // upload |
205 | 0 | std::string remote_path = dest_path + '/' + local_file; |
206 | 0 | std::string local_path = src_path + '/' + local_file; |
207 | 0 | RETURN_IF_ERROR(upload_with_checksum(*_remote_fs, local_path, remote_path, md5sum)); |
208 | 0 | } // end for each tablet's local files |
209 | | |
210 | 0 | tablet_files->emplace(tablet_id, local_files_with_checksum); |
211 | 0 | finished_num++; |
212 | 0 | LOG(INFO) << "finished to write tablet to remote. local path: " << src_path |
213 | 0 | << ", remote path: " << dest_path; |
214 | 0 | } // end for each tablet path |
215 | | |
216 | 0 | LOG(INFO) << "finished to upload snapshots. job: " << _job_id << ", task id: " << _task_id; |
217 | 0 | return status; |
218 | 0 | } |
219 | | |
220 | | /* |
221 | | * Download snapshot files from remote. |
222 | | * After downloaded, the local dir should contains all files existing in remote, |
223 | | * may also contains several useless files. |
224 | | */ |
225 | | Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to_dest_path, |
226 | 0 | std::vector<int64_t>* downloaded_tablet_ids) { |
227 | 0 | if (!_remote_fs) { |
228 | 0 | return Status::InternalError("Storage backend not initialized."); |
229 | 0 | } |
230 | 0 | LOG(INFO) << "begin to download snapshot files. num: " << src_to_dest_path.size() |
231 | 0 | << ", broker addr: " << _broker_addr << ", job: " << _job_id |
232 | 0 | << ", task id: " << _task_id; |
233 | | |
234 | | // check if job has already been cancelled |
235 | 0 | int tmp_counter = 1; |
236 | 0 | RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD)); |
237 | | |
238 | 0 | Status status = Status::OK(); |
239 | | // 1. validate local tablet snapshot paths |
240 | 0 | RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, false)); |
241 | | |
242 | | // 2. for each src path, download it to local storage |
243 | 0 | int report_counter = 0; |
244 | 0 | int total_num = src_to_dest_path.size(); |
245 | 0 | int finished_num = 0; |
246 | 0 | for (auto iter = src_to_dest_path.begin(); iter != src_to_dest_path.end(); iter++) { |
247 | 0 | const std::string& remote_path = iter->first; |
248 | 0 | const std::string& local_path = iter->second; |
249 | |
|
250 | 0 | int64_t local_tablet_id = 0; |
251 | 0 | int32_t schema_hash = 0; |
252 | 0 | RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path(local_path, &local_tablet_id, |
253 | 0 | &schema_hash)); |
254 | 0 | downloaded_tablet_ids->push_back(local_tablet_id); |
255 | |
|
256 | 0 | int64_t remote_tablet_id; |
257 | 0 | RETURN_IF_ERROR(_get_tablet_id_from_remote_path(remote_path, &remote_tablet_id)); |
258 | 0 | VLOG_CRITICAL << "get local tablet id: " << local_tablet_id |
259 | 0 | << ", schema hash: " << schema_hash |
260 | 0 | << ", remote tablet id: " << remote_tablet_id; |
261 | | |
262 | | // 2.1. get local files |
263 | 0 | std::vector<std::string> local_files; |
264 | 0 | RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files)); |
265 | | |
266 | | // 2.2. get remote files |
267 | 0 | std::map<std::string, FileStat> remote_files; |
268 | 0 | RETURN_IF_ERROR(_list_with_checksum(remote_path, &remote_files)); |
269 | 0 | if (remote_files.empty()) { |
270 | 0 | std::stringstream ss; |
271 | 0 | ss << "get nothing from remote path: " << remote_path; |
272 | 0 | LOG(WARNING) << ss.str(); |
273 | 0 | return Status::InternalError(ss.str()); |
274 | 0 | } |
275 | | |
276 | 0 | TabletSharedPtr tablet = |
277 | 0 | StorageEngine::instance()->tablet_manager()->get_tablet(local_tablet_id); |
278 | 0 | if (tablet == nullptr) { |
279 | 0 | std::stringstream ss; |
280 | 0 | ss << "failed to get local tablet: " << local_tablet_id; |
281 | 0 | LOG(WARNING) << ss.str(); |
282 | 0 | return Status::InternalError(ss.str()); |
283 | 0 | } |
284 | 0 | DataDir* data_dir = tablet->data_dir(); |
285 | |
|
286 | 0 | for (auto& iter : remote_files) { |
287 | 0 | RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, |
288 | 0 | TTaskType::type::DOWNLOAD)); |
289 | | |
290 | 0 | bool need_download = false; |
291 | 0 | const std::string& remote_file = iter.first; |
292 | 0 | const FileStat& file_stat = iter.second; |
293 | 0 | auto find = std::find(local_files.begin(), local_files.end(), remote_file); |
294 | 0 | if (find == local_files.end()) { |
295 | | // remote file does not exist in local, download it |
296 | 0 | need_download = true; |
297 | 0 | } else { |
298 | 0 | if (_end_with(remote_file, ".hdr")) { |
299 | | // this is a header file, download it. |
300 | 0 | need_download = true; |
301 | 0 | } else { |
302 | | // check checksum |
303 | 0 | std::string local_md5sum; |
304 | 0 | Status st = io::global_local_filesystem()->md5sum( |
305 | 0 | local_path + "/" + remote_file, &local_md5sum); |
306 | 0 | if (!st.ok()) { |
307 | 0 | LOG(WARNING) << "failed to get md5sum of local file: " << remote_file |
308 | 0 | << ". msg: " << st << ". download it"; |
309 | 0 | need_download = true; |
310 | 0 | } else { |
311 | 0 | VLOG_CRITICAL << "get local file checksum: " << remote_file << ": " |
312 | 0 | << local_md5sum; |
313 | 0 | if (file_stat.md5 != local_md5sum) { |
314 | | // file's checksum does not equal, download it. |
315 | 0 | need_download = true; |
316 | 0 | } |
317 | 0 | } |
318 | 0 | } |
319 | 0 | } |
320 | |
|
321 | 0 | if (!need_download) { |
322 | 0 | LOG(INFO) << "remote file already exist in local, no need to download." |
323 | 0 | << ", file: " << remote_file; |
324 | 0 | continue; |
325 | 0 | } |
326 | | |
327 | | // begin to download |
328 | 0 | std::string full_remote_file = remote_path + "/" + remote_file + "." + file_stat.md5; |
329 | 0 | std::string local_file_name; |
330 | | // we need to replace the tablet_id in remote file name with local tablet id |
331 | 0 | RETURN_IF_ERROR(_replace_tablet_id(remote_file, local_tablet_id, &local_file_name)); |
332 | 0 | std::string full_local_file = local_path + "/" + local_file_name; |
333 | 0 | LOG(INFO) << "begin to download from " << full_remote_file << " to " << full_local_file; |
334 | 0 | size_t file_len = file_stat.size; |
335 | | |
336 | | // check disk capacity |
337 | 0 | if (data_dir->reach_capacity_limit(file_len)) { |
338 | 0 | return Status::Error<ErrorCode::EXCEEDED_LIMIT>( |
339 | 0 | "reach the capacity limit of path {}, file_size={}", data_dir->path(), |
340 | 0 | file_len); |
341 | 0 | } |
342 | | // remove file which will be downloaded now. |
343 | | // this file will be added to local_files if it be downloaded successfully. |
344 | 0 | if (find != local_files.end()) { |
345 | 0 | local_files.erase(find); |
346 | 0 | } |
347 | 0 | RETURN_IF_ERROR(_remote_fs->download(full_remote_file, full_local_file)); |
348 | | |
349 | | // 3. check md5 of the downloaded file |
350 | 0 | std::string downloaded_md5sum; |
351 | 0 | RETURN_IF_ERROR( |
352 | 0 | io::global_local_filesystem()->md5sum(full_local_file, &downloaded_md5sum)); |
353 | 0 | VLOG_CRITICAL << "get downloaded file checksum: " << full_local_file << ": " |
354 | 0 | << downloaded_md5sum; |
355 | 0 | if (downloaded_md5sum != file_stat.md5) { |
356 | 0 | std::stringstream ss; |
357 | 0 | ss << "invalid md5 of downloaded file: " << full_local_file |
358 | 0 | << ", expected: " << file_stat.md5 << ", get: " << downloaded_md5sum; |
359 | 0 | LOG(WARNING) << ss.str(); |
360 | 0 | return Status::InternalError(ss.str()); |
361 | 0 | } |
362 | | |
363 | | // local_files always keep the updated local files |
364 | 0 | local_files.push_back(local_file_name); |
365 | 0 | LOG(INFO) << "finished to download file via broker. file: " << full_local_file |
366 | 0 | << ", length: " << file_len; |
367 | 0 | } // end for all remote files |
368 | | |
369 | | // finally, delete local files which are not in remote |
370 | 0 | for (const auto& local_file : local_files) { |
371 | | // replace the tablet id in local file name with the remote tablet id, |
372 | | // in order to compare the file name. |
373 | 0 | std::string new_name; |
374 | 0 | Status st = _replace_tablet_id(local_file, remote_tablet_id, &new_name); |
375 | 0 | if (!st.ok()) { |
376 | 0 | LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st |
377 | 0 | << ". ignore it"; |
378 | 0 | continue; |
379 | 0 | } |
380 | 0 | VLOG_CRITICAL << "new file name after replace tablet id: " << new_name; |
381 | 0 | const auto& find = remote_files.find(new_name); |
382 | 0 | if (find != remote_files.end()) { |
383 | 0 | continue; |
384 | 0 | } |
385 | | |
386 | | // delete |
387 | 0 | std::string full_local_file = local_path + "/" + local_file; |
388 | 0 | VLOG_CRITICAL << "begin to delete local snapshot file: " << full_local_file |
389 | 0 | << ", it does not exist in remote"; |
390 | 0 | if (remove(full_local_file.c_str()) != 0) { |
391 | 0 | LOG(WARNING) << "failed to delete unknown local file: " << full_local_file |
392 | 0 | << ", ignore it"; |
393 | 0 | } |
394 | 0 | } |
395 | |
|
396 | 0 | finished_num++; |
397 | 0 | } // end for src_to_dest_path |
398 | | |
399 | 0 | LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id; |
400 | 0 | return status; |
401 | 0 | } |
402 | | |
403 | | Status SnapshotLoader::remote_http_download( |
404 | | const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots, |
405 | 0 | std::vector<int64_t>* downloaded_tablet_ids) { |
406 | 0 | LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, task id: {}", _job_id, |
407 | 0 | _task_id); |
408 | 0 | constexpr uint32_t kListRemoteFileTimeout = 15; |
409 | 0 | constexpr uint32_t kDownloadFileMaxRetry = 3; |
410 | 0 | constexpr uint32_t kGetLengthTimeout = 10; |
411 | | |
412 | | // check if job has already been cancelled |
413 | 0 | int tmp_counter = 1; |
414 | 0 | RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD)); |
415 | 0 | Status status = Status::OK(); |
416 | | |
417 | | // Step before, validate all remote |
418 | | |
419 | | // Step 1: Validate local tablet snapshot paths |
420 | 0 | for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { |
421 | 0 | const auto& path = remote_tablet_snapshot.local_snapshot_path; |
422 | 0 | bool res = true; |
423 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res)); |
424 | 0 | if (!res) { |
425 | 0 | std::stringstream ss; |
426 | 0 | auto err_msg = |
427 | 0 | fmt::format("snapshot path is not directory or does not exist: {}", path); |
428 | 0 | LOG(WARNING) << err_msg; |
429 | 0 | return Status::RuntimeError(err_msg); |
430 | 0 | } |
431 | 0 | } |
432 | | |
433 | | // Step 2: get all local files |
434 | 0 | struct LocalFileStat { |
435 | 0 | uint64_t size; |
436 | 0 | std::string md5; |
437 | 0 | }; |
438 | 0 | std::unordered_map<std::string, std::unordered_map<std::string, LocalFileStat>> local_files_map; |
439 | 0 | for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { |
440 | 0 | const auto& local_path = remote_tablet_snapshot.local_snapshot_path; |
441 | 0 | std::vector<std::string> local_files; |
442 | 0 | RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files)); |
443 | | |
444 | 0 | auto& local_filestat = local_files_map[local_path]; |
445 | 0 | for (auto& local_file : local_files) { |
446 | | // add file size |
447 | 0 | std::string local_file_path = local_path + "/" + local_file; |
448 | 0 | std::error_code ec; |
449 | 0 | uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec); |
450 | 0 | if (ec) { |
451 | 0 | LOG(WARNING) << "download file error" << ec.message(); |
452 | 0 | return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path, |
453 | 0 | ec.message()); |
454 | 0 | } |
455 | 0 | std::string md5; |
456 | 0 | auto status = io::global_local_filesystem()->md5sum(local_file_path, &md5); |
457 | 0 | if (!status.ok()) { |
458 | 0 | LOG(WARNING) << "download file error, local file " << local_file_path |
459 | 0 | << " md5sum: " << status.to_string(); |
460 | 0 | return status; |
461 | 0 | } |
462 | 0 | local_filestat[local_file] = {local_file_size, md5}; |
463 | 0 | } |
464 | 0 | } |
465 | | |
466 | | // Step 3: Validate remote tablet snapshot paths && remote files map |
467 | | // key is remote snapshot paths, value is filelist |
468 | | // get all these use http download action |
469 | | // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr |
470 | 0 | int report_counter = 0; |
471 | 0 | int total_num = remote_tablet_snapshots.size(); |
472 | 0 | int finished_num = 0; |
473 | 0 | struct RemoteFileStat { |
474 | 0 | std::string url; |
475 | 0 | std::string md5; |
476 | 0 | uint64_t size; |
477 | 0 | }; |
478 | 0 | std::unordered_map<std::string, std::unordered_map<std::string, RemoteFileStat>> |
479 | 0 | remote_files_map; |
480 | 0 | for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { |
481 | 0 | const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path; |
482 | 0 | auto& remote_files = remote_files_map[remote_path]; |
483 | 0 | const auto& token = remote_tablet_snapshot.remote_token; |
484 | 0 | const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr; |
485 | | |
486 | | // HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/ |
487 | 0 | std::string base_url = fmt::format("http://{}:{}/api/_tablet/_download?token={}", |
488 | 0 | remote_be_addr.hostname, remote_be_addr.port, token); |
489 | 0 | std::string remote_url_prefix = fmt::format("{}&file={}", base_url, remote_path); |
490 | |
|
491 | 0 | string file_list_str; |
492 | 0 | auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* client) { |
493 | 0 | RETURN_IF_ERROR(client->init(remote_url_prefix)); |
494 | 0 | client->set_timeout_ms(kListRemoteFileTimeout * 1000); |
495 | 0 | return client->execute(&file_list_str); |
496 | 0 | }; |
497 | 0 | RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, list_files_cb)); |
498 | 0 | std::vector<string> filename_list = |
499 | 0 | strings::Split(file_list_str, "\n", strings::SkipWhitespace()); |
500 | |
|
501 | 0 | for (const auto& filename : filename_list) { |
502 | 0 | std::string remote_file_url = |
503 | 0 | fmt::format("{}&file={}/{}&channel=ingest_binlog", base_url, |
504 | 0 | remote_tablet_snapshot.remote_snapshot_path, filename); |
505 | | |
506 | | // get file length |
507 | 0 | uint64_t file_size = 0; |
508 | 0 | std::string file_md5; |
509 | 0 | auto get_file_stat_cb = [&remote_file_url, &file_size, &file_md5](HttpClient* client) { |
510 | 0 | std::string url = fmt::format("{}&acquire_md5=true", remote_file_url); |
511 | 0 | RETURN_IF_ERROR(client->init(url)); |
512 | 0 | client->set_timeout_ms(kGetLengthTimeout * 1000); |
513 | 0 | RETURN_IF_ERROR(client->head()); |
514 | 0 | RETURN_IF_ERROR(client->get_content_length(&file_size)); |
515 | 0 | RETURN_IF_ERROR(client->get_content_md5(&file_md5)); |
516 | 0 | return Status::OK(); |
517 | 0 | }; |
518 | 0 | RETURN_IF_ERROR( |
519 | 0 | HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_stat_cb)); |
520 | | |
521 | 0 | remote_files[filename] = RemoteFileStat {remote_file_url, file_md5, file_size}; |
522 | 0 | } |
523 | 0 | } |
524 | | |
525 | | // Step 4: Compare local and remote files && get all need download files |
526 | 0 | for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { |
527 | 0 | RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, |
528 | 0 | TTaskType::type::DOWNLOAD)); |
529 | | |
530 | 0 | const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path; |
531 | 0 | const auto& local_path = remote_tablet_snapshot.local_snapshot_path; |
532 | 0 | auto& remote_files = remote_files_map[remote_path]; |
533 | 0 | auto& local_files = local_files_map[local_path]; |
534 | 0 | auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id; |
535 | | |
536 | | // get all need download files |
537 | 0 | std::vector<std::string> need_download_files; |
538 | 0 | for (const auto& [remote_file, remote_filestat] : remote_files) { |
539 | 0 | LOG(INFO) << "remote file: " << remote_file << ", size: " << remote_filestat.size |
540 | 0 | << ", md5: " << remote_filestat.md5; |
541 | 0 | auto it = local_files.find(remote_file); |
542 | 0 | if (it == local_files.end()) { |
543 | 0 | need_download_files.emplace_back(remote_file); |
544 | 0 | continue; |
545 | 0 | } |
546 | 0 | if (_end_with(remote_file, ".hdr")) { |
547 | 0 | need_download_files.emplace_back(remote_file); |
548 | 0 | continue; |
549 | 0 | } |
550 | | |
551 | 0 | if (auto& local_filestat = it->second; local_filestat.size != remote_filestat.size) { |
552 | 0 | need_download_files.emplace_back(remote_file); |
553 | 0 | continue; |
554 | 0 | } |
555 | | |
556 | 0 | if (auto& local_filestat = it->second; local_filestat.md5 != remote_filestat.md5) { |
557 | 0 | need_download_files.emplace_back(remote_file); |
558 | 0 | continue; |
559 | 0 | } |
560 | | |
561 | 0 | LOG(INFO) << fmt::format("file {} already exists, skip download", remote_file); |
562 | 0 | } |
563 | |
|
564 | 0 | auto local_tablet_id = remote_tablet_snapshot.local_tablet_id; |
565 | 0 | TabletSharedPtr tablet = |
566 | 0 | StorageEngine::instance()->tablet_manager()->get_tablet(local_tablet_id); |
567 | 0 | if (tablet == nullptr) { |
568 | 0 | std::stringstream ss; |
569 | 0 | ss << "failed to get local tablet: " << local_tablet_id; |
570 | 0 | LOG(WARNING) << ss.str(); |
571 | 0 | return Status::InternalError(ss.str()); |
572 | 0 | } |
573 | 0 | DataDir* data_dir = tablet->data_dir(); |
574 | | |
575 | | // download all need download files |
576 | 0 | uint64_t total_file_size = 0; |
577 | 0 | MonotonicStopWatch watch; |
578 | 0 | watch.start(); |
579 | 0 | for (auto& filename : need_download_files) { |
580 | 0 | auto& remote_filestat = remote_files[filename]; |
581 | 0 | auto file_size = remote_filestat.size; |
582 | 0 | auto& remote_file_url = remote_filestat.url; |
583 | 0 | auto& remote_file_md5 = remote_filestat.md5; |
584 | | |
585 | | // check disk capacity |
586 | 0 | if (data_dir->reach_capacity_limit(file_size)) { |
587 | 0 | return Status::Error<ErrorCode::EXCEEDED_LIMIT>( |
588 | 0 | "reach the capacity limit of path {}, file_size={}", data_dir->path(), |
589 | 0 | file_size); |
590 | 0 | } |
591 | | |
592 | 0 | total_file_size += file_size; |
593 | 0 | uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024; |
594 | 0 | if (estimate_timeout < config::download_low_speed_time) { |
595 | 0 | estimate_timeout = config::download_low_speed_time; |
596 | 0 | } |
597 | |
|
598 | 0 | std::string local_filename; |
599 | 0 | RETURN_IF_ERROR(_replace_tablet_id(filename, local_tablet_id, &local_filename)); |
600 | 0 | std::string local_file_path = local_path + "/" + local_filename; |
601 | |
|
602 | 0 | LOG(INFO) << "clone begin to download file from: " << remote_file_url |
603 | 0 | << " to: " << local_file_path << ". size(B): " << file_size |
604 | 0 | << ", timeout(s): " << estimate_timeout; |
605 | |
|
606 | 0 | auto download_cb = [&remote_file_url, &remote_file_md5, estimate_timeout, |
607 | 0 | &local_file_path, file_size](HttpClient* client) { |
608 | 0 | RETURN_IF_ERROR(client->init(remote_file_url)); |
609 | 0 | client->set_timeout_ms(estimate_timeout * 1000); |
610 | 0 | RETURN_IF_ERROR(client->download(local_file_path)); |
611 | | |
612 | 0 | std::error_code ec; |
613 | | // Check file length |
614 | 0 | uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec); |
615 | 0 | if (ec) { |
616 | 0 | LOG(WARNING) << "download file error" << ec.message(); |
617 | 0 | return Status::IOError("can't retrive file_size of {}, due to {}", |
618 | 0 | local_file_path, ec.message()); |
619 | 0 | } |
620 | 0 | if (local_file_size != file_size) { |
621 | 0 | LOG(WARNING) << "download file length error" |
622 | 0 | << ", remote_path=" << remote_file_url |
623 | 0 | << ", file_size=" << file_size |
624 | 0 | << ", local_file_size=" << local_file_size; |
625 | 0 | return Status::InternalError("downloaded file size is not equal"); |
626 | 0 | } |
627 | | |
628 | 0 | if (!remote_file_md5.empty()) { // keep compatibility |
629 | 0 | std::string local_file_md5; |
630 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_file_path, |
631 | 0 | &local_file_md5)); |
632 | 0 | if (local_file_md5 != remote_file_md5) { |
633 | 0 | LOG(WARNING) << "download file md5 error" |
634 | 0 | << ", remote_file_url=" << remote_file_url |
635 | 0 | << ", local_file_path=" << local_file_path |
636 | 0 | << ", remote_file_md5=" << remote_file_md5 |
637 | 0 | << ", local_file_md5=" << local_file_md5; |
638 | 0 | return Status::RuntimeError( |
639 | 0 | "download file {} md5 is not equal, local={}, remote={}", |
640 | 0 | remote_file_url, local_file_md5, remote_file_md5); |
641 | 0 | } |
642 | 0 | } |
643 | | |
644 | 0 | return io::global_local_filesystem()->permission( |
645 | 0 | local_file_path, io::LocalFileSystem::PERMS_OWNER_RW); |
646 | 0 | }; |
647 | 0 | auto status = HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb); |
648 | 0 | if (!status.ok()) { |
649 | 0 | LOG(WARNING) << "failed to download file from " << remote_file_url |
650 | 0 | << ", status: " << status.to_string(); |
651 | 0 | return status; |
652 | 0 | } |
653 | | |
654 | | // local_files always keep the updated local files |
655 | 0 | local_files[filename] = LocalFileStat {file_size, remote_file_md5}; |
656 | 0 | } |
657 | | |
658 | 0 | uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000; |
659 | 0 | total_time_ms = total_time_ms > 0 ? total_time_ms : 0; |
660 | 0 | double copy_rate = 0.0; |
661 | 0 | if (total_time_ms > 0) { |
662 | 0 | copy_rate = total_file_size / ((double)total_time_ms) / 1000; |
663 | 0 | } |
664 | 0 | LOG(INFO) << fmt::format( |
665 | 0 | "succeed to copy remote tablet {} to local tablet {}, total file size: {} B, cost: " |
666 | 0 | "{} ms, rate: {} MB/s", |
667 | 0 | remote_tablet_id, local_tablet_id, total_file_size, total_time_ms, copy_rate); |
668 | | |
669 | | // local_files: contain all remote files and local files |
670 | | // finally, delete local files which are not in remote |
671 | 0 | for (const auto& [local_file, local_filestat] : local_files) { |
672 | | // replace the tablet id in local file name with the remote tablet id, |
673 | | // in order to compare the file name. |
674 | 0 | std::string new_name; |
675 | 0 | Status st = _replace_tablet_id(local_file, remote_tablet_id, &new_name); |
676 | 0 | if (!st.ok()) { |
677 | 0 | LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st |
678 | 0 | << ". ignore it"; |
679 | 0 | continue; |
680 | 0 | } |
681 | 0 | VLOG_CRITICAL << "new file name after replace tablet id: " << new_name; |
682 | 0 | const auto& find = remote_files.find(new_name); |
683 | 0 | if (find != remote_files.end()) { |
684 | 0 | continue; |
685 | 0 | } |
686 | | |
687 | | // delete |
688 | 0 | std::string full_local_file = local_path + "/" + local_file; |
689 | 0 | LOG(INFO) << "begin to delete local snapshot file: " << full_local_file |
690 | 0 | << ", it does not exist in remote"; |
691 | 0 | if (remove(full_local_file.c_str()) != 0) { |
692 | 0 | LOG(WARNING) << "failed to delete unknown local file: " << full_local_file |
693 | 0 | << ", error: " << strerror(errno) |
694 | 0 | << ", file size: " << local_filestat.size << ", ignore it"; |
695 | 0 | } |
696 | 0 | } |
697 | |
|
698 | 0 | ++finished_num; |
699 | 0 | } |
700 | | |
701 | 0 | LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id; |
702 | 0 | return status; |
703 | 0 | } |
704 | | |
705 | | // move the snapshot files in snapshot_path |
706 | | // to tablet_path |
707 | | // If overwrite, just replace the tablet_path with snapshot_path, |
708 | | // else: (TODO) |
709 | | // |
710 | | // MUST hold tablet's header lock, push lock, cumulative lock and base compaction lock |
711 | | Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr tablet, |
712 | 0 | bool overwrite) { |
713 | 0 | auto tablet_path = tablet->tablet_path(); |
714 | 0 | auto store_path = tablet->data_dir()->path(); |
715 | 0 | LOG(INFO) << "begin to move snapshot files. from: " << snapshot_path << ", to: " << tablet_path |
716 | 0 | << ", store: " << store_path << ", job: " << _job_id << ", task id: " << _task_id; |
717 | |
|
718 | 0 | Status status = Status::OK(); |
719 | | |
720 | | // validate snapshot_path and tablet_path |
721 | 0 | int64_t snapshot_tablet_id = 0; |
722 | 0 | int32_t snapshot_schema_hash = 0; |
723 | 0 | RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path( |
724 | 0 | snapshot_path, &snapshot_tablet_id, &snapshot_schema_hash)); |
725 | | |
726 | 0 | int64_t tablet_id = 0; |
727 | 0 | int32_t schema_hash = 0; |
728 | 0 | RETURN_IF_ERROR( |
729 | 0 | _get_tablet_id_and_schema_hash_from_file_path(tablet_path, &tablet_id, &schema_hash)); |
730 | | |
731 | 0 | if (tablet_id != snapshot_tablet_id || schema_hash != snapshot_schema_hash) { |
732 | 0 | std::stringstream ss; |
733 | 0 | ss << "path does not match. snapshot: " << snapshot_path |
734 | 0 | << ", tablet path: " << tablet_path; |
735 | 0 | LOG(WARNING) << ss.str(); |
736 | 0 | return Status::InternalError(ss.str()); |
737 | 0 | } |
738 | | |
739 | 0 | DataDir* store = StorageEngine::instance()->get_store(store_path); |
740 | 0 | if (store == nullptr) { |
741 | 0 | std::stringstream ss; |
742 | 0 | ss << "failed to get store by path: " << store_path; |
743 | 0 | LOG(WARNING) << ss.str(); |
744 | 0 | return Status::InternalError(ss.str()); |
745 | 0 | } |
746 | | |
747 | 0 | if (!std::filesystem::exists(tablet_path)) { |
748 | 0 | std::stringstream ss; |
749 | 0 | ss << "tablet path does not exist: " << tablet_path; |
750 | 0 | LOG(WARNING) << ss.str(); |
751 | 0 | return Status::InternalError(ss.str()); |
752 | 0 | } |
753 | | |
754 | 0 | if (!std::filesystem::exists(snapshot_path)) { |
755 | 0 | std::stringstream ss; |
756 | 0 | ss << "snapshot path does not exist: " << snapshot_path; |
757 | 0 | LOG(WARNING) << ss.str(); |
758 | 0 | return Status::InternalError(ss.str()); |
759 | 0 | } |
760 | | |
761 | | // rename the rowset ids and tabletid info in rowset meta |
762 | 0 | auto res = SnapshotManager::instance()->convert_rowset_ids( |
763 | 0 | snapshot_path, tablet_id, tablet->replica_id(), tablet->partition_id(), schema_hash); |
764 | 0 | if (!res.has_value()) [[unlikely]] { |
765 | 0 | auto err_msg = |
766 | 0 | fmt::format("failed to convert rowsetids in snapshot: {}, tablet path: {}, err: {}", |
767 | 0 | snapshot_path, tablet_path, res.error()); |
768 | 0 | LOG(WARNING) << err_msg; |
769 | 0 | return Status::InternalError(err_msg); |
770 | 0 | } |
771 | | |
772 | 0 | if (overwrite) { |
773 | 0 | std::vector<std::string> snapshot_files; |
774 | 0 | RETURN_IF_ERROR(_get_existing_files_from_local(snapshot_path, &snapshot_files)); |
775 | | |
776 | | // 1. simply delete the old dir and replace it with the snapshot dir |
777 | 0 | try { |
778 | | // This remove seems soft enough, because we already get |
779 | | // tablet id and schema hash from this path, which |
780 | | // means this path is a valid path. |
781 | 0 | std::filesystem::remove_all(tablet_path); |
782 | 0 | VLOG_CRITICAL << "remove dir: " << tablet_path; |
783 | 0 | std::filesystem::create_directory(tablet_path); |
784 | 0 | VLOG_CRITICAL << "re-create dir: " << tablet_path; |
785 | 0 | } catch (const std::filesystem::filesystem_error& e) { |
786 | 0 | std::stringstream ss; |
787 | 0 | ss << "failed to move tablet path: " << tablet_path << ". err: " << e.what(); |
788 | 0 | LOG(WARNING) << ss.str(); |
789 | 0 | return Status::InternalError(ss.str()); |
790 | 0 | } |
791 | | |
792 | | // link files one by one |
793 | | // files in snapshot dir will be moved in snapshot clean process |
794 | 0 | std::vector<std::string> linked_files; |
795 | 0 | for (auto& file : snapshot_files) { |
796 | 0 | auto full_src_path = fmt::format("{}/{}", snapshot_path, file); |
797 | 0 | auto full_dest_path = fmt::format("{}/{}", tablet_path, file); |
798 | 0 | if (link(full_src_path.c_str(), full_dest_path.c_str()) != 0) { |
799 | 0 | LOG(WARNING) << "failed to link file from " << full_src_path << " to " |
800 | 0 | << full_dest_path << ", err: " << std::strerror(errno); |
801 | | |
802 | | // clean the already linked files |
803 | 0 | for (auto& linked_file : linked_files) { |
804 | 0 | remove(linked_file.c_str()); |
805 | 0 | } |
806 | |
|
807 | 0 | return Status::InternalError("move tablet failed"); |
808 | 0 | } |
809 | 0 | linked_files.push_back(full_dest_path); |
810 | 0 | VLOG_CRITICAL << "link file from " << full_src_path << " to " << full_dest_path; |
811 | 0 | } |
812 | |
|
813 | 0 | } else { |
814 | 0 | LOG(FATAL) << "only support overwrite now"; |
815 | 0 | __builtin_unreachable(); |
816 | 0 | } |
817 | | |
818 | | // snapshot loader not need to change tablet uid |
819 | | // fixme: there is no header now and can not call load_one_tablet here |
820 | | // reload header |
821 | 0 | Status ost = StorageEngine::instance()->tablet_manager()->load_tablet_from_dir( |
822 | 0 | store, tablet_id, schema_hash, tablet_path, true); |
823 | 0 | if (!ost.ok()) { |
824 | 0 | std::stringstream ss; |
825 | 0 | ss << "failed to reload header of tablet: " << tablet_id; |
826 | 0 | LOG(WARNING) << ss.str(); |
827 | 0 | return Status::InternalError(ss.str()); |
828 | 0 | } |
829 | 0 | LOG(INFO) << "finished to reload header of tablet: " << tablet_id; |
830 | |
|
831 | 0 | return status; |
832 | 0 | } |
833 | | |
834 | 11 | bool SnapshotLoader::_end_with(const std::string& str, const std::string& match) { |
835 | 11 | if (str.size() >= match.size() && |
836 | 11 | str.compare(str.size() - match.size(), match.size(), match) == 0) { |
837 | 4 | return true; |
838 | 4 | } |
839 | 7 | return false; |
840 | 11 | } |
841 | | |
842 | | Status SnapshotLoader::_get_tablet_id_and_schema_hash_from_file_path(const std::string& src_path, |
843 | | int64_t* tablet_id, |
844 | 2 | int32_t* schema_hash) { |
845 | | // path should be like: /path/.../tablet_id/schema_hash |
846 | | // we try to extract tablet_id from path |
847 | 2 | size_t pos = src_path.find_last_of("/"); |
848 | 2 | if (pos == std::string::npos || pos == src_path.length() - 1) { |
849 | 1 | return Status::InternalError("failed to get tablet id from path: {}", src_path); |
850 | 1 | } |
851 | | |
852 | 1 | std::string schema_hash_str = src_path.substr(pos + 1); |
853 | 1 | std::stringstream ss1; |
854 | 1 | ss1 << schema_hash_str; |
855 | 1 | ss1 >> *schema_hash; |
856 | | |
857 | | // skip schema hash part |
858 | 1 | size_t pos2 = src_path.find_last_of("/", pos - 1); |
859 | 1 | if (pos2 == std::string::npos) { |
860 | 0 | return Status::InternalError("failed to get tablet id from path: {}", src_path); |
861 | 0 | } |
862 | | |
863 | 1 | std::string tablet_str = src_path.substr(pos2 + 1, pos - pos2); |
864 | 1 | std::stringstream ss2; |
865 | 1 | ss2 << tablet_str; |
866 | 1 | ss2 >> *tablet_id; |
867 | | |
868 | 1 | VLOG_CRITICAL << "get tablet id " << *tablet_id << ", schema hash: " << *schema_hash |
869 | 0 | << " from path: " << src_path; |
870 | 1 | return Status::OK(); |
871 | 1 | } |
872 | | |
873 | | Status SnapshotLoader::_check_local_snapshot_paths( |
874 | 4 | const std::map<std::string, std::string>& src_to_dest_path, bool check_src) { |
875 | 4 | bool res = true; |
876 | 4 | for (const auto& pair : src_to_dest_path) { |
877 | 4 | std::string path; |
878 | 4 | if (check_src) { |
879 | 2 | path = pair.first; |
880 | 2 | } else { |
881 | 2 | path = pair.second; |
882 | 2 | } |
883 | | |
884 | 4 | RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res)); |
885 | 2 | if (!res) { |
886 | 0 | std::stringstream ss; |
887 | 0 | ss << "snapshot path is not directory or does not exist: " << path; |
888 | 0 | LOG(WARNING) << ss.str(); |
889 | 0 | return Status::RuntimeError(ss.str()); |
890 | 0 | } |
891 | 2 | } |
892 | 2 | LOG(INFO) << "all local snapshot paths are existing. num: " << src_to_dest_path.size(); |
893 | 2 | return Status::OK(); |
894 | 4 | } |
895 | | |
896 | | Status SnapshotLoader::_get_existing_files_from_local(const std::string& local_path, |
897 | 1 | std::vector<std::string>* local_files) { |
898 | 1 | bool exists = true; |
899 | 1 | std::vector<io::FileInfo> files; |
900 | 1 | RETURN_IF_ERROR(io::global_local_filesystem()->list(local_path, true, &files, &exists)); |
901 | 1 | for (auto& file : files) { |
902 | 0 | local_files->push_back(file.file_name); |
903 | 0 | } |
904 | 1 | LOG(INFO) << "finished to list files in local path: " << local_path |
905 | 1 | << ", file num: " << local_files->size(); |
906 | 1 | return Status::OK(); |
907 | 1 | } |
908 | | |
909 | | Status SnapshotLoader::_replace_tablet_id(const std::string& file_name, int64_t tablet_id, |
910 | 4 | std::string* new_file_name) { |
911 | | // eg: |
912 | | // 10007.hdr |
913 | | // 10007_2_2_0_0.idx |
914 | | // 10007_2_2_0_0.dat |
915 | 4 | if (_end_with(file_name, ".hdr")) { |
916 | 1 | std::stringstream ss; |
917 | 1 | ss << tablet_id << ".hdr"; |
918 | 1 | *new_file_name = ss.str(); |
919 | 1 | return Status::OK(); |
920 | 3 | } else if (_end_with(file_name, ".idx") || _end_with(file_name, ".dat")) { |
921 | 2 | *new_file_name = file_name; |
922 | 2 | return Status::OK(); |
923 | 2 | } else { |
924 | 1 | return Status::InternalError("invalid tablet file name: {}", file_name); |
925 | 1 | } |
926 | 4 | } |
927 | | |
928 | | Status SnapshotLoader::_get_tablet_id_from_remote_path(const std::string& remote_path, |
929 | 1 | int64_t* tablet_id) { |
930 | | // eg: |
931 | | // bos://xxx/../__tbl_10004/__part_10003/__idx_10004/__10005 |
932 | 1 | size_t pos = remote_path.find_last_of("_"); |
933 | 1 | if (pos == std::string::npos) { |
934 | 0 | return Status::InternalError("invalid remove file path: {}", remote_path); |
935 | 0 | } |
936 | | |
937 | 1 | std::string tablet_id_str = remote_path.substr(pos + 1); |
938 | 1 | std::stringstream ss; |
939 | 1 | ss << tablet_id_str; |
940 | 1 | ss >> *tablet_id; |
941 | | |
942 | 1 | return Status::OK(); |
943 | 1 | } |
944 | | |
945 | | // only return CANCELLED if FE return that job is cancelled. |
946 | | // otherwise, return OK |
947 | | Status SnapshotLoader::_report_every(int report_threshold, int* counter, int32_t finished_num, |
948 | 0 | int32_t total_num, TTaskType::type type) { |
949 | 0 | ++*counter; |
950 | 0 | if (*counter <= report_threshold) { |
951 | 0 | return Status::OK(); |
952 | 0 | } |
953 | | |
954 | 0 | LOG(INFO) << "report to frontend. job id: " << _job_id << ", task id: " << _task_id |
955 | 0 | << ", finished num: " << finished_num << ", total num:" << total_num; |
956 | |
|
957 | 0 | TNetworkAddress master_addr = _env->master_info()->network_address; |
958 | |
|
959 | 0 | TSnapshotLoaderReportRequest request; |
960 | 0 | request.job_id = _job_id; |
961 | 0 | request.task_id = _task_id; |
962 | 0 | request.task_type = type; |
963 | 0 | request.__set_finished_num(finished_num); |
964 | 0 | request.__set_total_num(total_num); |
965 | 0 | TStatus report_st; |
966 | |
|
967 | 0 | Status rpcStatus = ThriftRpcHelper::rpc<FrontendServiceClient>( |
968 | 0 | master_addr.hostname, master_addr.port, |
969 | 0 | [&request, &report_st](FrontendServiceConnection& client) { |
970 | 0 | client->snapshotLoaderReport(report_st, request); |
971 | 0 | }, |
972 | 0 | 10000); |
973 | |
|
974 | 0 | if (!rpcStatus.ok()) { |
975 | | // rpc failed, ignore |
976 | 0 | return Status::OK(); |
977 | 0 | } |
978 | | |
979 | | // reset |
980 | 0 | *counter = 0; |
981 | 0 | if (report_st.status_code == TStatusCode::CANCELLED) { |
982 | 0 | LOG(INFO) << "job is cancelled. job id: " << _job_id << ", task id: " << _task_id; |
983 | 0 | return Status::Cancelled("Cancelled"); |
984 | 0 | } |
985 | 0 | return Status::OK(); |
986 | 0 | } |
987 | | |
988 | | Status SnapshotLoader::_list_with_checksum(const std::string& dir, |
989 | 0 | std::map<std::string, FileStat>* md5_files) { |
990 | 0 | bool exists = true; |
991 | 0 | std::vector<io::FileInfo> files; |
992 | 0 | RETURN_IF_ERROR(_remote_fs->list(dir, true, &files, &exists)); |
993 | 0 | for (auto& tmp_file : files) { |
994 | 0 | io::Path path(tmp_file.file_name); |
995 | 0 | std::string file_name = path.filename(); |
996 | 0 | size_t pos = file_name.find_last_of("."); |
997 | 0 | if (pos == std::string::npos || pos == file_name.size() - 1) { |
998 | | // Not found checksum separator, ignore this file |
999 | 0 | continue; |
1000 | 0 | } |
1001 | 0 | FileStat stat = {std::string(file_name, 0, pos), std::string(file_name, pos + 1), |
1002 | 0 | tmp_file.file_size}; |
1003 | 0 | md5_files->emplace(std::string(file_name, 0, pos), stat); |
1004 | 0 | } |
1005 | |
|
1006 | 0 | return Status::OK(); |
1007 | 0 | } |
1008 | | |
1009 | | } // end namespace doris |