/root/doris/be/src/runtime/snapshot_loader.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/snapshot_loader.h" |
19 | | |
20 | | // IWYU pragma: no_include <bthread/errno.h> |
21 | | #include <errno.h> // IWYU pragma: keep |
22 | | #include <fmt/format.h> |
23 | | #include <gen_cpp/FrontendService.h> |
24 | | #include <gen_cpp/FrontendService_types.h> |
25 | | #include <gen_cpp/HeartbeatService_types.h> |
26 | | #include <gen_cpp/PlanNodes_types.h> |
27 | | #include <gen_cpp/Status_types.h> |
28 | | #include <gen_cpp/Types_types.h> |
29 | | |
30 | | #include <algorithm> |
31 | | #include <cstring> |
32 | | #include <filesystem> |
33 | | #include <istream> |
34 | | #include <unordered_map> |
35 | | #include <utility> |
36 | | |
37 | | #include "common/logging.h" |
38 | | #include "gutil/strings/split.h" |
39 | | #include "http/http_client.h" |
40 | | #include "io/fs/broker_file_system.h" |
41 | | #include "io/fs/file_system.h" |
42 | | #include "io/fs/hdfs_file_system.h" |
43 | | #include "io/fs/local_file_system.h" |
44 | | #include "io/fs/path.h" |
45 | | #include "io/fs/remote_file_system.h" |
46 | | #include "io/fs/s3_file_system.h" |
47 | | #include "io/hdfs_builder.h" |
48 | | #include "olap/data_dir.h" |
49 | | #include "olap/snapshot_manager.h" |
50 | | #include "olap/storage_engine.h" |
51 | | #include "olap/tablet.h" |
52 | | #include "olap/tablet_manager.h" |
53 | | #include "runtime/client_cache.h" |
54 | | #include "runtime/exec_env.h" |
55 | | #include "util/s3_uri.h" |
56 | | #include "util/s3_util.h" |
57 | | #include "util/thrift_rpc_helper.h" |
58 | | |
59 | | namespace doris { |
60 | | namespace { |
61 | | |
62 | | Status upload_with_checksum(io::RemoteFileSystem& fs, std::string_view local_path, |
63 | 0 | std::string_view remote_path, std::string_view checksum) { |
64 | 0 | auto full_remote_path = fmt::format("{}.{}", remote_path, checksum); |
65 | 0 | switch (fs.type()) { |
66 | 0 | case io::FileSystemType::HDFS: |
67 | 0 | case io::FileSystemType::BROKER: { |
68 | 0 | std::string temp = fmt::format("{}.part", remote_path); |
69 | 0 | RETURN_IF_ERROR(fs.upload(local_path, temp)); |
70 | 0 | RETURN_IF_ERROR(fs.rename(temp, full_remote_path)); |
71 | 0 | break; |
72 | 0 | } |
73 | 0 | case io::FileSystemType::S3: |
74 | 0 | RETURN_IF_ERROR(fs.upload(local_path, full_remote_path)); |
75 | 0 | break; |
76 | 0 | default: |
77 | 0 | LOG(FATAL) << "unknown fs type: " << static_cast<int>(fs.type()); |
78 | 0 | } |
79 | 0 | return Status::OK(); |
80 | 0 | } |
81 | | |
// Reports whether `str` terminates with the suffix `match`.
// An empty `match` is a suffix of every string; a `match` longer than `str` never is.
bool _end_with(std::string_view str, std::string_view match) {
    if (match.size() > str.size()) {
        return false;
    }
    const std::string_view tail = str.substr(str.size() - match.size());
    return tail == match;
}
86 | | |
87 | | } // namespace |
88 | | |
// Records the storage engine and exec env handles, the job/task identifiers used in
// logs and progress reports, the broker address and the storage properties.
// No remote file system is created here: init() must be called first, otherwise
// upload() and download() return InternalError("Storage backend not initialized.").
SnapshotLoader::SnapshotLoader(StorageEngine& engine, ExecEnv* env, int64_t job_id, int64_t task_id,
                               const TNetworkAddress& broker_addr,
                               const std::map<std::string, std::string>& prop)
        : _engine(engine),
          _env(env),
          _job_id(job_id),
          _task_id(task_id),
          _broker_addr(broker_addr),
          _prop(prop) {}
98 | | |
99 | 0 | Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& location) { |
100 | 0 | if (TStorageBackendType::type::S3 == type) { |
101 | 0 | S3Conf s3_conf; |
102 | 0 | S3URI s3_uri(location); |
103 | 0 | RETURN_IF_ERROR(s3_uri.parse()); |
104 | 0 | RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf(_prop, s3_uri, &s3_conf)); |
105 | 0 | _remote_fs = |
106 | 0 | DORIS_TRY(io::S3FileSystem::create(std::move(s3_conf), io::FileSystem::TMP_FS_ID)); |
107 | 0 | } else if (TStorageBackendType::type::HDFS == type) { |
108 | 0 | THdfsParams hdfs_params = parse_properties(_prop); |
109 | 0 | _remote_fs = DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, |
110 | 0 | io::FileSystem::TMP_FS_ID, nullptr)); |
111 | 0 | } else if (TStorageBackendType::type::BROKER == type) { |
112 | 0 | std::shared_ptr<io::BrokerFileSystem> fs; |
113 | 0 | _remote_fs = DORIS_TRY( |
114 | 0 | io::BrokerFileSystem::create(_broker_addr, _prop, io::FileSystem::TMP_FS_ID)); |
115 | 0 | } else { |
116 | 0 | return Status::InternalError("Unknown storage type: {}", type); |
117 | 0 | } |
118 | 0 | return Status::OK(); |
119 | 0 | } |
120 | | |
// Defaulted out of line, in this .cpp file rather than in the header; every member is
// released by its own destructor.
SnapshotLoader::~SnapshotLoader() = default;
122 | | |
123 | | Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_dest_path, |
124 | 0 | std::map<int64_t, std::vector<std::string>>* tablet_files) { |
125 | 0 | if (!_remote_fs) { |
126 | 0 | return Status::InternalError("Storage backend not initialized."); |
127 | 0 | } |
128 | 0 | LOG(INFO) << "begin to upload snapshot files. num: " << src_to_dest_path.size() |
129 | 0 | << ", broker addr: " << _broker_addr << ", job: " << _job_id << ", task" << _task_id; |
130 | | |
131 | | // check if job has already been cancelled |
132 | 0 | int tmp_counter = 1; |
133 | 0 | RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::UPLOAD)); |
134 | | |
135 | 0 | Status status = Status::OK(); |
136 | | // 1. validate local tablet snapshot paths |
137 | 0 | RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, true)); |
138 | | |
139 | | // 2. for each src path, upload it to remote storage |
140 | | // we report to frontend for every 10 files, and we will cancel the job if |
141 | | // the job has already been cancelled in frontend. |
142 | 0 | int report_counter = 0; |
143 | 0 | int total_num = src_to_dest_path.size(); |
144 | 0 | int finished_num = 0; |
145 | 0 | for (const auto& iter : src_to_dest_path) { |
146 | 0 | const std::string& src_path = iter.first; |
147 | 0 | const std::string& dest_path = iter.second; |
148 | |
|
149 | 0 | int64_t tablet_id = 0; |
150 | 0 | int32_t schema_hash = 0; |
151 | 0 | RETURN_IF_ERROR( |
152 | 0 | _get_tablet_id_and_schema_hash_from_file_path(src_path, &tablet_id, &schema_hash)); |
153 | | |
154 | | // 2.1 get existing files from remote path |
155 | 0 | std::map<std::string, FileStat> remote_files; |
156 | 0 | RETURN_IF_ERROR(_list_with_checksum(dest_path, &remote_files)); |
157 | | |
158 | 0 | for (auto& tmp : remote_files) { |
159 | 0 | VLOG_CRITICAL << "get remote file: " << tmp.first << ", checksum: " << tmp.second.md5; |
160 | 0 | } |
161 | | |
162 | | // 2.2 list local files |
163 | 0 | std::vector<std::string> local_files; |
164 | 0 | std::vector<std::string> local_files_with_checksum; |
165 | 0 | RETURN_IF_ERROR(_get_existing_files_from_local(src_path, &local_files)); |
166 | | |
167 | | // 2.3 iterate local files |
168 | 0 | for (auto& local_file : local_files) { |
169 | 0 | RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, |
170 | 0 | TTaskType::type::UPLOAD)); |
171 | | |
172 | | // calc md5sum of localfile |
173 | 0 | std::string md5sum; |
174 | 0 | RETURN_IF_ERROR( |
175 | 0 | io::global_local_filesystem()->md5sum(src_path + "/" + local_file, &md5sum)); |
176 | 0 | VLOG_CRITICAL << "get file checksum: " << local_file << ": " << md5sum; |
177 | 0 | local_files_with_checksum.push_back(local_file + "." + md5sum); |
178 | | |
179 | | // check if this local file need upload |
180 | 0 | bool need_upload = false; |
181 | 0 | auto find = remote_files.find(local_file); |
182 | 0 | if (find != remote_files.end()) { |
183 | 0 | if (md5sum != find->second.md5) { |
184 | | // remote storage file exist, but with different checksum |
185 | 0 | LOG(WARNING) << "remote file checksum is invalid. remote: " << find->first |
186 | 0 | << ", local: " << md5sum; |
187 | | // TODO(cmy): save these files and delete them later |
188 | 0 | need_upload = true; |
189 | 0 | } |
190 | 0 | } else { |
191 | 0 | need_upload = true; |
192 | 0 | } |
193 | |
|
194 | 0 | if (!need_upload) { |
195 | 0 | VLOG_CRITICAL << "file exist in remote path, no need to upload: " << local_file; |
196 | 0 | continue; |
197 | 0 | } |
198 | | |
199 | | // upload |
200 | 0 | std::string remote_path = dest_path + '/' + local_file; |
201 | 0 | std::string local_path = src_path + '/' + local_file; |
202 | 0 | RETURN_IF_ERROR(upload_with_checksum(*_remote_fs, local_path, remote_path, md5sum)); |
203 | 0 | } // end for each tablet's local files |
204 | | |
205 | 0 | tablet_files->emplace(tablet_id, local_files_with_checksum); |
206 | 0 | finished_num++; |
207 | 0 | LOG(INFO) << "finished to write tablet to remote. local path: " << src_path |
208 | 0 | << ", remote path: " << dest_path; |
209 | 0 | } // end for each tablet path |
210 | | |
211 | 0 | LOG(INFO) << "finished to upload snapshots. job: " << _job_id << ", task id: " << _task_id; |
212 | 0 | return status; |
213 | 0 | } |
214 | | |
215 | | /* |
216 | | * Download snapshot files from remote. |
217 | | * After downloaded, the local dir should contains all files existing in remote, |
218 | | * may also contains several useless files. |
219 | | */ |
220 | | Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to_dest_path, |
221 | 0 | std::vector<int64_t>* downloaded_tablet_ids) { |
222 | 0 | if (!_remote_fs) { |
223 | 0 | return Status::InternalError("Storage backend not initialized."); |
224 | 0 | } |
225 | 0 | LOG(INFO) << "begin to download snapshot files. num: " << src_to_dest_path.size() |
226 | 0 | << ", broker addr: " << _broker_addr << ", job: " << _job_id |
227 | 0 | << ", task id: " << _task_id; |
228 | | |
229 | | // check if job has already been cancelled |
230 | 0 | int tmp_counter = 1; |
231 | 0 | RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD)); |
232 | | |
233 | 0 | Status status = Status::OK(); |
234 | | // 1. validate local tablet snapshot paths |
235 | 0 | RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, false)); |
236 | | |
237 | | // 2. for each src path, download it to local storage |
238 | 0 | int report_counter = 0; |
239 | 0 | int total_num = src_to_dest_path.size(); |
240 | 0 | int finished_num = 0; |
241 | 0 | for (const auto& iter : src_to_dest_path) { |
242 | 0 | const std::string& remote_path = iter.first; |
243 | 0 | const std::string& local_path = iter.second; |
244 | |
|
245 | 0 | int64_t local_tablet_id = 0; |
246 | 0 | int32_t schema_hash = 0; |
247 | 0 | RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path(local_path, &local_tablet_id, |
248 | 0 | &schema_hash)); |
249 | 0 | downloaded_tablet_ids->push_back(local_tablet_id); |
250 | |
|
251 | 0 | int64_t remote_tablet_id; |
252 | 0 | RETURN_IF_ERROR(_get_tablet_id_from_remote_path(remote_path, &remote_tablet_id)); |
253 | 0 | VLOG_CRITICAL << "get local tablet id: " << local_tablet_id |
254 | 0 | << ", schema hash: " << schema_hash |
255 | 0 | << ", remote tablet id: " << remote_tablet_id; |
256 | | |
257 | | // 2.1. get local files |
258 | 0 | std::vector<std::string> local_files; |
259 | 0 | RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files)); |
260 | | |
261 | | // 2.2. get remote files |
262 | 0 | std::map<std::string, FileStat> remote_files; |
263 | 0 | RETURN_IF_ERROR(_list_with_checksum(remote_path, &remote_files)); |
264 | 0 | if (remote_files.empty()) { |
265 | 0 | std::stringstream ss; |
266 | 0 | ss << "get nothing from remote path: " << remote_path; |
267 | 0 | LOG(WARNING) << ss.str(); |
268 | 0 | return Status::InternalError(ss.str()); |
269 | 0 | } |
270 | | |
271 | 0 | TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(local_tablet_id); |
272 | 0 | if (tablet == nullptr) { |
273 | 0 | std::stringstream ss; |
274 | 0 | ss << "failed to get local tablet: " << local_tablet_id; |
275 | 0 | LOG(WARNING) << ss.str(); |
276 | 0 | return Status::InternalError(ss.str()); |
277 | 0 | } |
278 | 0 | DataDir* data_dir = tablet->data_dir(); |
279 | |
|
280 | 0 | for (auto& iter : remote_files) { |
281 | 0 | RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, |
282 | 0 | TTaskType::type::DOWNLOAD)); |
283 | | |
284 | 0 | bool need_download = false; |
285 | 0 | const std::string& remote_file = iter.first; |
286 | 0 | const FileStat& file_stat = iter.second; |
287 | 0 | auto find = std::find(local_files.begin(), local_files.end(), remote_file); |
288 | 0 | if (find == local_files.end()) { |
289 | | // remote file does not exist in local, download it |
290 | 0 | need_download = true; |
291 | 0 | } else { |
292 | 0 | if (_end_with(remote_file, ".hdr")) { |
293 | | // this is a header file, download it. |
294 | 0 | need_download = true; |
295 | 0 | } else { |
296 | | // check checksum |
297 | 0 | std::string local_md5sum; |
298 | 0 | Status st = io::global_local_filesystem()->md5sum( |
299 | 0 | local_path + "/" + remote_file, &local_md5sum); |
300 | 0 | if (!st.ok()) { |
301 | 0 | LOG(WARNING) << "failed to get md5sum of local file: " << remote_file |
302 | 0 | << ". msg: " << st << ". download it"; |
303 | 0 | need_download = true; |
304 | 0 | } else { |
305 | 0 | VLOG_CRITICAL << "get local file checksum: " << remote_file << ": " |
306 | 0 | << local_md5sum; |
307 | 0 | if (file_stat.md5 != local_md5sum) { |
308 | | // file's checksum does not equal, download it. |
309 | 0 | need_download = true; |
310 | 0 | } |
311 | 0 | } |
312 | 0 | } |
313 | 0 | } |
314 | |
|
315 | 0 | if (!need_download) { |
316 | 0 | LOG(INFO) << "remote file already exist in local, no need to download." |
317 | 0 | << ", file: " << remote_file; |
318 | 0 | continue; |
319 | 0 | } |
320 | | |
321 | | // begin to download |
322 | 0 | std::string full_remote_file = remote_path + "/" + remote_file + "." + file_stat.md5; |
323 | 0 | std::string local_file_name; |
324 | | // we need to replace the tablet_id in remote file name with local tablet id |
325 | 0 | RETURN_IF_ERROR(_replace_tablet_id(remote_file, local_tablet_id, &local_file_name)); |
326 | 0 | std::string full_local_file = local_path + "/" + local_file_name; |
327 | 0 | LOG(INFO) << "begin to download from " << full_remote_file << " to " << full_local_file; |
328 | 0 | size_t file_len = file_stat.size; |
329 | | |
330 | | // check disk capacity |
331 | 0 | if (data_dir->reach_capacity_limit(file_len)) { |
332 | 0 | return Status::Error<ErrorCode::EXCEEDED_LIMIT>( |
333 | 0 | "reach the capacity limit of path {}, file_size={}", data_dir->path(), |
334 | 0 | file_len); |
335 | 0 | } |
336 | | // remove file which will be downloaded now. |
337 | | // this file will be added to local_files if it be downloaded successfully. |
338 | 0 | if (find != local_files.end()) { |
339 | 0 | local_files.erase(find); |
340 | 0 | } |
341 | 0 | RETURN_IF_ERROR(_remote_fs->download(full_remote_file, full_local_file)); |
342 | | |
343 | | // 3. check md5 of the downloaded file |
344 | 0 | std::string downloaded_md5sum; |
345 | 0 | RETURN_IF_ERROR( |
346 | 0 | io::global_local_filesystem()->md5sum(full_local_file, &downloaded_md5sum)); |
347 | 0 | VLOG_CRITICAL << "get downloaded file checksum: " << full_local_file << ": " |
348 | 0 | << downloaded_md5sum; |
349 | 0 | if (downloaded_md5sum != file_stat.md5) { |
350 | 0 | std::stringstream ss; |
351 | 0 | ss << "invalid md5 of downloaded file: " << full_local_file |
352 | 0 | << ", expected: " << file_stat.md5 << ", get: " << downloaded_md5sum; |
353 | 0 | LOG(WARNING) << ss.str(); |
354 | 0 | return Status::InternalError(ss.str()); |
355 | 0 | } |
356 | | |
357 | | // local_files always keep the updated local files |
358 | 0 | local_files.push_back(local_file_name); |
359 | 0 | LOG(INFO) << "finished to download file via broker. file: " << full_local_file |
360 | 0 | << ", length: " << file_len; |
361 | 0 | } // end for all remote files |
362 | | |
363 | | // finally, delete local files which are not in remote |
364 | 0 | for (const auto& local_file : local_files) { |
365 | | // replace the tablet id in local file name with the remote tablet id, |
366 | | // in order to compare the file name. |
367 | 0 | std::string new_name; |
368 | 0 | Status st = _replace_tablet_id(local_file, remote_tablet_id, &new_name); |
369 | 0 | if (!st.ok()) { |
370 | 0 | LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st |
371 | 0 | << ". ignore it"; |
372 | 0 | continue; |
373 | 0 | } |
374 | 0 | VLOG_CRITICAL << "new file name after replace tablet id: " << new_name; |
375 | 0 | const auto& find = remote_files.find(new_name); |
376 | 0 | if (find != remote_files.end()) { |
377 | 0 | continue; |
378 | 0 | } |
379 | | |
380 | | // delete |
381 | 0 | std::string full_local_file = local_path + "/" + local_file; |
382 | 0 | VLOG_CRITICAL << "begin to delete local snapshot file: " << full_local_file |
383 | 0 | << ", it does not exist in remote"; |
384 | 0 | if (remove(full_local_file.c_str()) != 0) { |
385 | 0 | LOG(WARNING) << "failed to delete unknown local file: " << full_local_file |
386 | 0 | << ", ignore it"; |
387 | 0 | } |
388 | 0 | } |
389 | |
|
390 | 0 | finished_num++; |
391 | 0 | } // end for src_to_dest_path |
392 | | |
393 | 0 | LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id; |
394 | 0 | return status; |
395 | 0 | } |
396 | | |
397 | | Status SnapshotLoader::remote_http_download( |
398 | | const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots, |
399 | 0 | std::vector<int64_t>* downloaded_tablet_ids) { |
400 | 0 | LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, task id: {}", _job_id, |
401 | 0 | _task_id); |
402 | 0 | constexpr uint32_t kListRemoteFileTimeout = 15; |
403 | 0 | constexpr uint32_t kDownloadFileMaxRetry = 3; |
404 | 0 | constexpr uint32_t kGetLengthTimeout = 10; |
405 | | |
406 | | // check if job has already been cancelled |
407 | 0 | int tmp_counter = 1; |
408 | 0 | RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD)); |
409 | 0 | Status status = Status::OK(); |
410 | | |
411 | | // Step before, validate all remote |
412 | | |
413 | | // Step 1: Validate local tablet snapshot paths |
414 | 0 | for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { |
415 | 0 | const auto& path = remote_tablet_snapshot.local_snapshot_path; |
416 | 0 | bool res = true; |
417 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res)); |
418 | 0 | if (!res) { |
419 | 0 | std::stringstream ss; |
420 | 0 | auto err_msg = |
421 | 0 | fmt::format("snapshot path is not directory or does not exist: {}", path); |
422 | 0 | LOG(WARNING) << err_msg; |
423 | 0 | return Status::RuntimeError(err_msg); |
424 | 0 | } |
425 | 0 | } |
426 | | |
427 | | // Step 2: get all local files |
428 | 0 | struct LocalFileStat { |
429 | 0 | uint64_t size; |
430 | 0 | std::string md5; |
431 | 0 | }; |
432 | 0 | std::unordered_map<std::string, std::unordered_map<std::string, LocalFileStat>> local_files_map; |
433 | 0 | for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { |
434 | 0 | const auto& local_path = remote_tablet_snapshot.local_snapshot_path; |
435 | 0 | std::vector<std::string> local_files; |
436 | 0 | RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files)); |
437 | | |
438 | 0 | auto& local_filestat = local_files_map[local_path]; |
439 | 0 | for (auto& local_file : local_files) { |
440 | | // add file size |
441 | 0 | std::string local_file_path = local_path + "/" + local_file; |
442 | 0 | std::error_code ec; |
443 | 0 | uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec); |
444 | 0 | if (ec) { |
445 | 0 | LOG(WARNING) << "download file error" << ec.message(); |
446 | 0 | return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path, |
447 | 0 | ec.message()); |
448 | 0 | } |
449 | 0 | std::string md5; |
450 | 0 | auto status = io::global_local_filesystem()->md5sum(local_file_path, &md5); |
451 | 0 | if (!status.ok()) { |
452 | 0 | LOG(WARNING) << "download file error, local file " << local_file_path |
453 | 0 | << " md5sum: " << status.to_string(); |
454 | 0 | return status; |
455 | 0 | } |
456 | 0 | local_filestat[local_file] = {local_file_size, md5}; |
457 | 0 | } |
458 | 0 | } |
459 | | |
460 | | // Step 3: Validate remote tablet snapshot paths && remote files map |
461 | | // key is remote snapshot paths, value is filelist |
462 | | // get all these use http download action |
463 | | // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr |
464 | 0 | int report_counter = 0; |
465 | 0 | int total_num = remote_tablet_snapshots.size(); |
466 | 0 | int finished_num = 0; |
467 | 0 | struct RemoteFileStat { |
468 | 0 | std::string url; |
469 | 0 | std::string md5; |
470 | 0 | uint64_t size; |
471 | 0 | }; |
472 | 0 | std::unordered_map<std::string, std::unordered_map<std::string, RemoteFileStat>> |
473 | 0 | remote_files_map; |
474 | 0 | for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { |
475 | 0 | const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path; |
476 | 0 | auto& remote_files = remote_files_map[remote_path]; |
477 | 0 | const auto& token = remote_tablet_snapshot.remote_token; |
478 | 0 | const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr; |
479 | | |
480 | | // HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/ |
481 | 0 | std::string base_url = fmt::format("http://{}:{}/api/_tablet/_download?token={}", |
482 | 0 | remote_be_addr.hostname, remote_be_addr.port, token); |
483 | 0 | std::string remote_url_prefix = fmt::format("{}&file={}", base_url, remote_path); |
484 | |
|
485 | 0 | LOG(INFO) << "list remote files: " << remote_url_prefix << ", job: " << _job_id |
486 | 0 | << ", task id: " << _task_id << ", remote be: " << remote_be_addr; |
487 | 0 | string file_list_str; |
488 | 0 | auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* client) { |
489 | 0 | RETURN_IF_ERROR(client->init(remote_url_prefix)); |
490 | 0 | client->set_timeout_ms(kListRemoteFileTimeout * 1000); |
491 | 0 | return client->execute(&file_list_str); |
492 | 0 | }; |
493 | 0 | RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, list_files_cb)); |
494 | 0 | std::vector<string> filename_list = |
495 | 0 | strings::Split(file_list_str, "\n", strings::SkipWhitespace()); |
496 | |
|
497 | 0 | for (const auto& filename : filename_list) { |
498 | 0 | std::string remote_file_url = |
499 | 0 | fmt::format("{}&file={}/{}&channel=ingest_binlog", base_url, |
500 | 0 | remote_tablet_snapshot.remote_snapshot_path, filename); |
501 | | |
502 | | // get file length |
503 | 0 | uint64_t file_size = 0; |
504 | 0 | std::string file_md5; |
505 | 0 | auto get_file_stat_cb = [&remote_file_url, &file_size, &file_md5](HttpClient* client) { |
506 | 0 | std::string url = fmt::format("{}&acquire_md5=true", remote_file_url); |
507 | 0 | RETURN_IF_ERROR(client->init(url)); |
508 | 0 | client->set_timeout_ms(kGetLengthTimeout * 1000); |
509 | 0 | RETURN_IF_ERROR(client->head()); |
510 | 0 | RETURN_IF_ERROR(client->get_content_length(&file_size)); |
511 | 0 | RETURN_IF_ERROR(client->get_content_md5(&file_md5)); |
512 | 0 | return Status::OK(); |
513 | 0 | }; |
514 | 0 | RETURN_IF_ERROR( |
515 | 0 | HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_stat_cb)); |
516 | | |
517 | 0 | remote_files[filename] = RemoteFileStat {remote_file_url, file_md5, file_size}; |
518 | 0 | } |
519 | 0 | } |
520 | | |
521 | | // Step 4: Compare local and remote files && get all need download files |
522 | 0 | for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { |
523 | 0 | RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, |
524 | 0 | TTaskType::type::DOWNLOAD)); |
525 | | |
526 | 0 | const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path; |
527 | 0 | const auto& local_path = remote_tablet_snapshot.local_snapshot_path; |
528 | 0 | auto& remote_files = remote_files_map[remote_path]; |
529 | 0 | auto& local_files = local_files_map[local_path]; |
530 | 0 | auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id; |
531 | | |
532 | | // get all need download files |
533 | 0 | std::vector<std::string> need_download_files; |
534 | 0 | for (const auto& [remote_file, remote_filestat] : remote_files) { |
535 | 0 | LOG(INFO) << "remote file: " << remote_file << ", size: " << remote_filestat.size |
536 | 0 | << ", md5: " << remote_filestat.md5; |
537 | 0 | auto it = local_files.find(remote_file); |
538 | 0 | if (it == local_files.end()) { |
539 | 0 | need_download_files.emplace_back(remote_file); |
540 | 0 | continue; |
541 | 0 | } |
542 | 0 | if (_end_with(remote_file, ".hdr")) { |
543 | 0 | need_download_files.emplace_back(remote_file); |
544 | 0 | continue; |
545 | 0 | } |
546 | | |
547 | 0 | if (auto& local_filestat = it->second; local_filestat.size != remote_filestat.size) { |
548 | 0 | need_download_files.emplace_back(remote_file); |
549 | 0 | continue; |
550 | 0 | } |
551 | | |
552 | 0 | if (auto& local_filestat = it->second; local_filestat.md5 != remote_filestat.md5) { |
553 | 0 | need_download_files.emplace_back(remote_file); |
554 | 0 | continue; |
555 | 0 | } |
556 | | |
557 | 0 | LOG(INFO) << fmt::format("file {} already exists, skip download", remote_file); |
558 | 0 | } |
559 | |
|
560 | 0 | auto local_tablet_id = remote_tablet_snapshot.local_tablet_id; |
561 | 0 | TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(local_tablet_id); |
562 | 0 | if (tablet == nullptr) { |
563 | 0 | std::stringstream ss; |
564 | 0 | ss << "failed to get local tablet: " << local_tablet_id; |
565 | 0 | LOG(WARNING) << ss.str(); |
566 | 0 | return Status::InternalError(ss.str()); |
567 | 0 | } |
568 | 0 | DataDir* data_dir = tablet->data_dir(); |
569 | | |
570 | | // download all need download files |
571 | 0 | uint64_t total_file_size = 0; |
572 | 0 | MonotonicStopWatch watch; |
573 | 0 | watch.start(); |
574 | 0 | for (auto& filename : need_download_files) { |
575 | 0 | auto& remote_filestat = remote_files[filename]; |
576 | 0 | auto file_size = remote_filestat.size; |
577 | 0 | auto& remote_file_url = remote_filestat.url; |
578 | 0 | auto& remote_file_md5 = remote_filestat.md5; |
579 | | |
580 | | // check disk capacity |
581 | 0 | if (data_dir->reach_capacity_limit(file_size)) { |
582 | 0 | return Status::Error<ErrorCode::EXCEEDED_LIMIT>( |
583 | 0 | "reach the capacity limit of path {}, file_size={}", data_dir->path(), |
584 | 0 | file_size); |
585 | 0 | } |
586 | | |
587 | 0 | total_file_size += file_size; |
588 | 0 | uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024; |
589 | 0 | if (estimate_timeout < config::download_low_speed_time) { |
590 | 0 | estimate_timeout = config::download_low_speed_time; |
591 | 0 | } |
592 | |
|
593 | 0 | std::string local_filename; |
594 | 0 | RETURN_IF_ERROR(_replace_tablet_id(filename, local_tablet_id, &local_filename)); |
595 | 0 | std::string local_file_path = local_path + "/" + local_filename; |
596 | |
|
597 | 0 | LOG(INFO) << "clone begin to download file from: " << remote_file_url |
598 | 0 | << " to: " << local_file_path << ". size(B): " << file_size |
599 | 0 | << ", timeout(s): " << estimate_timeout; |
600 | |
|
601 | 0 | auto download_cb = [&remote_file_url, &remote_file_md5, estimate_timeout, |
602 | 0 | &local_file_path, file_size](HttpClient* client) { |
603 | 0 | RETURN_IF_ERROR(client->init(remote_file_url)); |
604 | 0 | client->set_timeout_ms(estimate_timeout * 1000); |
605 | 0 | RETURN_IF_ERROR(client->download(local_file_path)); |
606 | | |
607 | 0 | std::error_code ec; |
608 | | // Check file length |
609 | 0 | uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec); |
610 | 0 | if (ec) { |
611 | 0 | LOG(WARNING) << "download file error" << ec.message(); |
612 | 0 | return Status::IOError("can't retrive file_size of {}, due to {}", |
613 | 0 | local_file_path, ec.message()); |
614 | 0 | } |
615 | 0 | if (local_file_size != file_size) { |
616 | 0 | LOG(WARNING) << "download file length error" |
617 | 0 | << ", remote_path=" << remote_file_url |
618 | 0 | << ", file_size=" << file_size |
619 | 0 | << ", local_file_size=" << local_file_size; |
620 | 0 | return Status::InternalError("downloaded file size is not equal"); |
621 | 0 | } |
622 | | |
623 | 0 | if (!remote_file_md5.empty()) { // keep compatibility |
624 | 0 | std::string local_file_md5; |
625 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_file_path, |
626 | 0 | &local_file_md5)); |
627 | 0 | if (local_file_md5 != remote_file_md5) { |
628 | 0 | LOG(WARNING) << "download file md5 error" |
629 | 0 | << ", remote_file_url=" << remote_file_url |
630 | 0 | << ", local_file_path=" << local_file_path |
631 | 0 | << ", remote_file_md5=" << remote_file_md5 |
632 | 0 | << ", local_file_md5=" << local_file_md5; |
633 | 0 | return Status::RuntimeError( |
634 | 0 | "download file {} md5 is not equal, local={}, remote={}", |
635 | 0 | remote_file_url, local_file_md5, remote_file_md5); |
636 | 0 | } |
637 | 0 | } |
638 | | |
639 | 0 | return io::global_local_filesystem()->permission( |
640 | 0 | local_file_path, io::LocalFileSystem::PERMS_OWNER_RW); |
641 | 0 | }; |
642 | 0 | auto status = HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb); |
643 | 0 | if (!status.ok()) { |
644 | 0 | LOG(WARNING) << "failed to download file from " << remote_file_url |
645 | 0 | << ", status: " << status.to_string(); |
646 | 0 | return status; |
647 | 0 | } |
648 | | |
649 | | // local_files always keep the updated local files |
650 | 0 | local_files[filename] = LocalFileStat {file_size, remote_file_md5}; |
651 | 0 | } |
652 | | |
653 | 0 | uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000; |
654 | 0 | total_time_ms = total_time_ms > 0 ? total_time_ms : 0; |
655 | 0 | double copy_rate = 0.0; |
656 | 0 | if (total_time_ms > 0) { |
657 | 0 | copy_rate = total_file_size / ((double)total_time_ms) / 1000; |
658 | 0 | } |
659 | 0 | LOG(INFO) << fmt::format( |
660 | 0 | "succeed to copy remote tablet {} to local tablet {}, total file size: {} B, cost: " |
661 | 0 | "{} ms, rate: {} MB/s", |
662 | 0 | remote_tablet_id, local_tablet_id, total_file_size, total_time_ms, copy_rate); |
663 | | |
664 | | // local_files: contain all remote files and local files |
665 | | // finally, delete local files which are not in remote |
666 | 0 | for (const auto& [local_file, local_filestat] : local_files) { |
667 | | // replace the tablet id in local file name with the remote tablet id, |
668 | | // in order to compare the file name. |
669 | 0 | std::string new_name; |
670 | 0 | Status st = _replace_tablet_id(local_file, remote_tablet_id, &new_name); |
671 | 0 | if (!st.ok()) { |
672 | 0 | LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st |
673 | 0 | << ". ignore it"; |
674 | 0 | continue; |
675 | 0 | } |
676 | 0 | VLOG_CRITICAL << "new file name after replace tablet id: " << new_name; |
677 | 0 | const auto& find = remote_files.find(new_name); |
678 | 0 | if (find != remote_files.end()) { |
679 | 0 | continue; |
680 | 0 | } |
681 | | |
682 | | // delete |
683 | 0 | std::string full_local_file = local_path + "/" + local_file; |
684 | 0 | LOG(INFO) << "begin to delete local snapshot file: " << full_local_file |
685 | 0 | << ", it does not exist in remote"; |
686 | 0 | if (remove(full_local_file.c_str()) != 0) { |
687 | 0 | LOG(WARNING) << "failed to delete unknown local file: " << full_local_file |
688 | 0 | << ", error: " << strerror(errno) |
689 | 0 | << ", file size: " << local_filestat.size << ", ignore it"; |
690 | 0 | } |
691 | 0 | } |
692 | |
|
693 | 0 | ++finished_num; |
694 | 0 | } |
695 | | |
696 | 0 | LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id; |
697 | 0 | return status; |
698 | 0 | } |
699 | | |
700 | | // move the snapshot files in snapshot_path |
701 | | // to tablet_path |
702 | | // If overwrite, just replace the tablet_path with snapshot_path, |
703 | | // else: (TODO) |
704 | | // |
705 | | // MUST hold tablet's header lock, push lock, cumulative lock and base compaction lock |
706 | | Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr tablet, |
707 | 0 | bool overwrite) { |
708 | 0 | auto tablet_path = tablet->tablet_path(); |
709 | 0 | auto store_path = tablet->data_dir()->path(); |
710 | 0 | LOG(INFO) << "begin to move snapshot files. from: " << snapshot_path << ", to: " << tablet_path |
711 | 0 | << ", store: " << store_path << ", job: " << _job_id << ", task id: " << _task_id; |
712 | |
|
713 | 0 | Status status = Status::OK(); |
714 | | |
715 | | // validate snapshot_path and tablet_path |
716 | 0 | int64_t snapshot_tablet_id = 0; |
717 | 0 | int32_t snapshot_schema_hash = 0; |
718 | 0 | RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path( |
719 | 0 | snapshot_path, &snapshot_tablet_id, &snapshot_schema_hash)); |
720 | | |
721 | 0 | int64_t tablet_id = 0; |
722 | 0 | int32_t schema_hash = 0; |
723 | 0 | RETURN_IF_ERROR( |
724 | 0 | _get_tablet_id_and_schema_hash_from_file_path(tablet_path, &tablet_id, &schema_hash)); |
725 | | |
726 | 0 | if (tablet_id != snapshot_tablet_id || schema_hash != snapshot_schema_hash) { |
727 | 0 | std::stringstream ss; |
728 | 0 | ss << "path does not match. snapshot: " << snapshot_path |
729 | 0 | << ", tablet path: " << tablet_path; |
730 | 0 | LOG(WARNING) << ss.str(); |
731 | 0 | return Status::InternalError(ss.str()); |
732 | 0 | } |
733 | | |
734 | 0 | DataDir* store = _engine.get_store(store_path); |
735 | 0 | if (store == nullptr) { |
736 | 0 | std::stringstream ss; |
737 | 0 | ss << "failed to get store by path: " << store_path; |
738 | 0 | LOG(WARNING) << ss.str(); |
739 | 0 | return Status::InternalError(ss.str()); |
740 | 0 | } |
741 | | |
742 | 0 | if (!std::filesystem::exists(tablet_path)) { |
743 | 0 | std::stringstream ss; |
744 | 0 | ss << "tablet path does not exist: " << tablet_path; |
745 | 0 | LOG(WARNING) << ss.str(); |
746 | 0 | return Status::InternalError(ss.str()); |
747 | 0 | } |
748 | | |
749 | 0 | if (!std::filesystem::exists(snapshot_path)) { |
750 | 0 | std::stringstream ss; |
751 | 0 | ss << "snapshot path does not exist: " << snapshot_path; |
752 | 0 | LOG(WARNING) << ss.str(); |
753 | 0 | return Status::InternalError(ss.str()); |
754 | 0 | } |
755 | | |
756 | | // rename the rowset ids and tabletid info in rowset meta |
757 | 0 | auto res = _engine.snapshot_mgr()->convert_rowset_ids(snapshot_path, tablet_id, |
758 | 0 | tablet->replica_id(), tablet->table_id(), |
759 | 0 | tablet->partition_id(), schema_hash); |
760 | 0 | if (!res.has_value()) [[unlikely]] { |
761 | 0 | auto err_msg = |
762 | 0 | fmt::format("failed to convert rowsetids in snapshot: {}, tablet path: {}, err: {}", |
763 | 0 | snapshot_path, tablet_path, res.error()); |
764 | 0 | LOG(WARNING) << err_msg; |
765 | 0 | return Status::InternalError(err_msg); |
766 | 0 | } |
767 | | |
768 | 0 | if (overwrite) { |
769 | 0 | std::vector<std::string> snapshot_files; |
770 | 0 | RETURN_IF_ERROR(_get_existing_files_from_local(snapshot_path, &snapshot_files)); |
771 | | |
772 | | // 1. simply delete the old dir and replace it with the snapshot dir |
773 | 0 | try { |
774 | | // This remove seems soft enough, because we already get |
775 | | // tablet id and schema hash from this path, which |
776 | | // means this path is a valid path. |
777 | 0 | std::filesystem::remove_all(tablet_path); |
778 | 0 | VLOG_CRITICAL << "remove dir: " << tablet_path; |
779 | 0 | std::filesystem::create_directory(tablet_path); |
780 | 0 | VLOG_CRITICAL << "re-create dir: " << tablet_path; |
781 | 0 | } catch (const std::filesystem::filesystem_error& e) { |
782 | 0 | std::stringstream ss; |
783 | 0 | ss << "failed to move tablet path: " << tablet_path << ". err: " << e.what(); |
784 | 0 | LOG(WARNING) << ss.str(); |
785 | 0 | return Status::InternalError(ss.str()); |
786 | 0 | } |
787 | | |
788 | | // link files one by one |
789 | | // files in snapshot dir will be moved in snapshot clean process |
790 | 0 | std::vector<std::string> linked_files; |
791 | 0 | for (auto& file : snapshot_files) { |
792 | 0 | auto full_src_path = fmt::format("{}/{}", snapshot_path, file); |
793 | 0 | auto full_dest_path = fmt::format("{}/{}", tablet_path, file); |
794 | 0 | if (link(full_src_path.c_str(), full_dest_path.c_str()) != 0) { |
795 | 0 | LOG(WARNING) << "failed to link file from " << full_src_path << " to " |
796 | 0 | << full_dest_path << ", err: " << std::strerror(errno); |
797 | | |
798 | | // clean the already linked files |
799 | 0 | for (auto& linked_file : linked_files) { |
800 | 0 | remove(linked_file.c_str()); |
801 | 0 | } |
802 | |
|
803 | 0 | return Status::InternalError("move tablet failed"); |
804 | 0 | } |
805 | 0 | linked_files.push_back(full_dest_path); |
806 | 0 | VLOG_CRITICAL << "link file from " << full_src_path << " to " << full_dest_path; |
807 | 0 | } |
808 | |
|
809 | 0 | } else { |
810 | 0 | LOG(FATAL) << "only support overwrite now"; |
811 | 0 | __builtin_unreachable(); |
812 | 0 | } |
813 | | |
814 | | // snapshot loader not need to change tablet uid |
815 | | // fixme: there is no header now and can not call load_one_tablet here |
816 | | // reload header |
817 | 0 | Status ost = _engine.tablet_manager()->load_tablet_from_dir(store, tablet_id, schema_hash, |
818 | 0 | tablet_path, true); |
819 | 0 | if (!ost.ok()) { |
820 | 0 | std::stringstream ss; |
821 | 0 | ss << "failed to reload header of tablet: " << tablet_id; |
822 | 0 | LOG(WARNING) << ss.str(); |
823 | 0 | return Status::InternalError(ss.str()); |
824 | 0 | } |
825 | 0 | LOG(INFO) << "finished to reload header of tablet: " << tablet_id; |
826 | |
|
827 | 0 | return status; |
828 | 0 | } |
829 | | |
830 | | Status SnapshotLoader::_get_tablet_id_and_schema_hash_from_file_path(const std::string& src_path, |
831 | | int64_t* tablet_id, |
832 | 2 | int32_t* schema_hash) { |
833 | | // path should be like: /path/.../tablet_id/schema_hash |
834 | | // we try to extract tablet_id from path |
835 | 2 | size_t pos = src_path.find_last_of("/"); |
836 | 2 | if (pos == std::string::npos || pos == src_path.length() - 1) { |
837 | 1 | return Status::InternalError("failed to get tablet id from path: {}", src_path); |
838 | 1 | } |
839 | | |
840 | 1 | std::string schema_hash_str = src_path.substr(pos + 1); |
841 | 1 | std::stringstream ss1; |
842 | 1 | ss1 << schema_hash_str; |
843 | 1 | ss1 >> *schema_hash; |
844 | | |
845 | | // skip schema hash part |
846 | 1 | size_t pos2 = src_path.find_last_of("/", pos - 1); |
847 | 1 | if (pos2 == std::string::npos) { |
848 | 0 | return Status::InternalError("failed to get tablet id from path: {}", src_path); |
849 | 0 | } |
850 | | |
851 | 1 | std::string tablet_str = src_path.substr(pos2 + 1, pos - pos2); |
852 | 1 | std::stringstream ss2; |
853 | 1 | ss2 << tablet_str; |
854 | 1 | ss2 >> *tablet_id; |
855 | | |
856 | 1 | VLOG_CRITICAL << "get tablet id " << *tablet_id << ", schema hash: " << *schema_hash |
857 | 0 | << " from path: " << src_path; |
858 | 1 | return Status::OK(); |
859 | 1 | } |
860 | | |
861 | | Status SnapshotLoader::_check_local_snapshot_paths( |
862 | 4 | const std::map<std::string, std::string>& src_to_dest_path, bool check_src) { |
863 | 4 | bool res = true; |
864 | 4 | for (const auto& pair : src_to_dest_path) { |
865 | 4 | std::string path; |
866 | 4 | if (check_src) { |
867 | 2 | path = pair.first; |
868 | 2 | } else { |
869 | 2 | path = pair.second; |
870 | 2 | } |
871 | | |
872 | 4 | RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res)); |
873 | 2 | if (!res) { |
874 | 0 | std::stringstream ss; |
875 | 0 | ss << "snapshot path is not directory or does not exist: " << path; |
876 | 0 | LOG(WARNING) << ss.str(); |
877 | 0 | return Status::RuntimeError(ss.str()); |
878 | 0 | } |
879 | 2 | } |
880 | 2 | LOG(INFO) << "all local snapshot paths are existing. num: " << src_to_dest_path.size(); |
881 | 2 | return Status::OK(); |
882 | 4 | } |
883 | | |
884 | | Status SnapshotLoader::_get_existing_files_from_local(const std::string& local_path, |
885 | 1 | std::vector<std::string>* local_files) { |
886 | 1 | bool exists = true; |
887 | 1 | std::vector<io::FileInfo> files; |
888 | 1 | RETURN_IF_ERROR(io::global_local_filesystem()->list(local_path, true, &files, &exists)); |
889 | 1 | for (auto& file : files) { |
890 | 0 | local_files->push_back(file.file_name); |
891 | 0 | } |
892 | 1 | LOG(INFO) << "finished to list files in local path: " << local_path |
893 | 1 | << ", file num: " << local_files->size(); |
894 | 1 | return Status::OK(); |
895 | 1 | } |
896 | | |
897 | | Status SnapshotLoader::_replace_tablet_id(const std::string& file_name, int64_t tablet_id, |
898 | 4 | std::string* new_file_name) { |
899 | | // eg: |
900 | | // 10007.hdr |
901 | | // 10007_2_2_0_0.idx |
902 | | // 10007_2_2_0_0.dat |
903 | 4 | if (_end_with(file_name, ".hdr")) { |
904 | 1 | std::stringstream ss; |
905 | 1 | ss << tablet_id << ".hdr"; |
906 | 1 | *new_file_name = ss.str(); |
907 | 1 | return Status::OK(); |
908 | 3 | } else if (_end_with(file_name, ".idx") || _end_with(file_name, ".dat")) { |
909 | 2 | *new_file_name = file_name; |
910 | 2 | return Status::OK(); |
911 | 2 | } else { |
912 | 1 | return Status::InternalError("invalid tablet file name: {}", file_name); |
913 | 1 | } |
914 | 4 | } |
915 | | |
916 | | Status SnapshotLoader::_get_tablet_id_from_remote_path(const std::string& remote_path, |
917 | 1 | int64_t* tablet_id) { |
918 | | // eg: |
919 | | // bos://xxx/../__tbl_10004/__part_10003/__idx_10004/__10005 |
920 | 1 | size_t pos = remote_path.find_last_of("_"); |
921 | 1 | if (pos == std::string::npos) { |
922 | 0 | return Status::InternalError("invalid remove file path: {}", remote_path); |
923 | 0 | } |
924 | | |
925 | 1 | std::string tablet_id_str = remote_path.substr(pos + 1); |
926 | 1 | std::stringstream ss; |
927 | 1 | ss << tablet_id_str; |
928 | 1 | ss >> *tablet_id; |
929 | | |
930 | 1 | return Status::OK(); |
931 | 1 | } |
932 | | |
933 | | // only return CANCELLED if FE return that job is cancelled. |
934 | | // otherwise, return OK |
935 | | Status SnapshotLoader::_report_every(int report_threshold, int* counter, int32_t finished_num, |
936 | 0 | int32_t total_num, TTaskType::type type) { |
937 | 0 | ++*counter; |
938 | 0 | if (*counter <= report_threshold) { |
939 | 0 | return Status::OK(); |
940 | 0 | } |
941 | | |
942 | 0 | LOG(INFO) << "report to frontend. job id: " << _job_id << ", task id: " << _task_id |
943 | 0 | << ", finished num: " << finished_num << ", total num:" << total_num; |
944 | |
|
945 | 0 | TNetworkAddress master_addr = _env->cluster_info()->master_fe_addr; |
946 | |
|
947 | 0 | TSnapshotLoaderReportRequest request; |
948 | 0 | request.job_id = _job_id; |
949 | 0 | request.task_id = _task_id; |
950 | 0 | request.task_type = type; |
951 | 0 | request.__set_finished_num(finished_num); |
952 | 0 | request.__set_total_num(total_num); |
953 | 0 | TStatus report_st; |
954 | |
|
955 | 0 | Status rpcStatus = ThriftRpcHelper::rpc<FrontendServiceClient>( |
956 | 0 | master_addr.hostname, master_addr.port, |
957 | 0 | [&request, &report_st](FrontendServiceConnection& client) { |
958 | 0 | client->snapshotLoaderReport(report_st, request); |
959 | 0 | }, |
960 | 0 | 10000); |
961 | |
|
962 | 0 | if (!rpcStatus.ok()) { |
963 | | // rpc failed, ignore |
964 | 0 | return Status::OK(); |
965 | 0 | } |
966 | | |
967 | | // reset |
968 | 0 | *counter = 0; |
969 | 0 | if (report_st.status_code == TStatusCode::CANCELLED) { |
970 | 0 | LOG(INFO) << "job is cancelled. job id: " << _job_id << ", task id: " << _task_id; |
971 | 0 | return Status::Cancelled("Cancelled"); |
972 | 0 | } |
973 | 0 | return Status::OK(); |
974 | 0 | } |
975 | | |
976 | | Status SnapshotLoader::_list_with_checksum(const std::string& dir, |
977 | 0 | std::map<std::string, FileStat>* md5_files) { |
978 | 0 | bool exists = true; |
979 | 0 | std::vector<io::FileInfo> files; |
980 | 0 | RETURN_IF_ERROR(_remote_fs->list(dir, true, &files, &exists)); |
981 | 0 | for (auto& tmp_file : files) { |
982 | 0 | io::Path path(tmp_file.file_name); |
983 | 0 | std::string file_name = path.filename(); |
984 | 0 | size_t pos = file_name.find_last_of("."); |
985 | 0 | if (pos == std::string::npos || pos == file_name.size() - 1) { |
986 | | // Not found checksum separator, ignore this file |
987 | 0 | continue; |
988 | 0 | } |
989 | 0 | FileStat stat = {std::string(file_name, 0, pos), std::string(file_name, pos + 1), |
990 | 0 | tmp_file.file_size}; |
991 | 0 | md5_files->emplace(std::string(file_name, 0, pos), stat); |
992 | 0 | } |
993 | |
|
994 | 0 | return Status::OK(); |
995 | 0 | } |
996 | | |
997 | | } // end namespace doris |