Coverage Report

Created: 2025-07-27 00:54

/root/doris/be/src/runtime/snapshot_loader.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/snapshot_loader.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <errno.h> // IWYU pragma: keep
22
#include <fmt/format.h>
23
#include <gen_cpp/FrontendService.h>
24
#include <gen_cpp/FrontendService_types.h>
25
#include <gen_cpp/HeartbeatService_types.h>
26
#include <gen_cpp/PlanNodes_types.h>
27
#include <gen_cpp/Status_types.h>
28
#include <gen_cpp/Types_types.h>
29
#include <stdint.h>
30
#include <stdio.h>
31
#include <unistd.h>
32
33
#include <algorithm>
34
#include <condition_variable>
35
#include <cstring>
36
#include <filesystem>
37
#include <istream>
38
#include <unordered_map>
39
#include <utility>
40
41
#include "common/config.h"
42
#include "common/logging.h"
43
#include "gutil/strings/split.h"
44
#include "http/http_client.h"
45
#include "io/fs/broker_file_system.h"
46
#include "io/fs/file_system.h"
47
#include "io/fs/file_writer.h"
48
#include "io/fs/hdfs_file_system.h"
49
#include "io/fs/local_file_system.h"
50
#include "io/fs/path.h"
51
#include "io/fs/remote_file_system.h"
52
#include "io/fs/s3_file_system.h"
53
#include "io/hdfs_builder.h"
54
#include "olap/data_dir.h"
55
#include "olap/snapshot_manager.h"
56
#include "olap/storage_engine.h"
57
#include "olap/tablet.h"
58
#include "olap/tablet_manager.h"
59
#include "runtime/client_cache.h"
60
#include "runtime/exec_env.h"
61
#include "util/s3_uri.h"
62
#include "util/s3_util.h"
63
#include "util/thrift_rpc_helper.h"
64
65
namespace doris {
66
67
0
// Builds the path of the "LOADED" tag file that lives directly under
// `snapshot_path`, i.e. "<snapshot_path>/LOADED".
static std::string get_loaded_tag_path(const std::string& snapshot_path) {
    std::string tag_path(snapshot_path);
    tag_path.append("/LOADED");
    return tag_path;
}
70
71
0
// Creates the LOADED tag file under `snapshot_path` on the local file system
// and closes it right away without writing any content.
//
// `tablet_id` is accepted but not used by this function.
// Returns the error from file creation, otherwise the result of closing the writer.
static Status write_loaded_tag(const std::string& snapshot_path, int64_t tablet_id) {
    std::string tag_file = get_loaded_tag_path(snapshot_path);
    std::unique_ptr<io::FileWriter> tag_writer;
    RETURN_IF_ERROR(io::global_local_filesystem()->create_file(tag_file, &tag_writer));
    return tag_writer->close();
}
77
78
// Uploads `local_path` to the remote file system under the name
// "<remote_path>.<checksum>".
//
// - HDFS and BROKER: the file is first uploaded as "<remote_path>.part" and
//   then renamed to the final name.
// - S3: the file is uploaded directly to the final name.
// - Any other file system type is reported through LOG(FATAL).
//
// Returns the first error from upload/rename, or Status::OK().
Status upload_with_checksum(io::RemoteFileSystem& fs, std::string_view local_path,
                            std::string_view remote_path, std::string_view checksum) {
    auto final_remote_path = fmt::format("{}.{}", remote_path, checksum);
    const auto fs_type = fs.type();
    if (fs_type == io::FileSystemType::HDFS || fs_type == io::FileSystemType::BROKER) {
        // Two-step upload: stage as ".part", then rename to the checksum-suffixed name.
        std::string staging_path = fmt::format("{}.part", remote_path);
        RETURN_IF_ERROR(fs.upload(local_path, staging_path));
        RETURN_IF_ERROR(fs.rename(staging_path, final_remote_path));
    } else if (fs_type == io::FileSystemType::S3) {
        RETURN_IF_ERROR(fs.upload(local_path, final_remote_path));
    } else {
        LOG(FATAL) << "unknown fs type: " << static_cast<int>(fs.type());
    }
    return Status::OK();
}
97
98
// Constructs a loader that has no broker address and no properties.
// The remote file system starts out null; upload() and download() return an
// error while it is null.
SnapshotLoader::SnapshotLoader(ExecEnv* env, int64_t job_id, int64_t task_id)
        : _env(env),
          _job_id(job_id),
          _task_id(task_id),
          _broker_addr(),
          _prop(),
          _remote_fs() {}
105
106
// Constructs a loader with a broker address and a property map, both of which
// are copied into the loader. The remote file system is left null here.
SnapshotLoader::SnapshotLoader(ExecEnv* env, int64_t job_id, int64_t task_id,
                               const TNetworkAddress& broker_addr,
                               const std::map<std::string, std::string>& prop)
        : _env(env),
          _job_id(job_id),
          _task_id(task_id),
          _broker_addr(broker_addr),
          _prop(prop),
          _remote_fs(nullptr) {}
110
111
0
// Creates the remote file system that matches `type` and stores it in
// `_remote_fs`.
//
// - S3: `location` is parsed as an S3 URI and combined with `_prop` to build
//   the S3 configuration.
// - HDFS: the HDFS parameters are parsed from `_prop`.
// - BROKER: the file system is created from `_broker_addr` and `_prop`.
//
// `location` is only read in the S3 case.
// Returns InternalError for any other storage type, or the first error
// reported while creating the file system.
Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& location) {
    switch (type) {
    case TStorageBackendType::type::S3: {
        S3Conf s3_conf;
        S3URI s3_uri(location);
        RETURN_IF_ERROR(s3_uri.parse());
        RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf(_prop, s3_uri, &s3_conf));
        std::shared_ptr<io::S3FileSystem> s3_fs;
        RETURN_IF_ERROR(io::S3FileSystem::create(std::move(s3_conf), "", nullptr, &s3_fs));
        _remote_fs = std::move(s3_fs);
        break;
    }
    case TStorageBackendType::type::HDFS: {
        THdfsParams hdfs_params = parse_properties(_prop);
        std::shared_ptr<io::HdfsFileSystem> hdfs_fs;
        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", hdfs_params.fs_name, nullptr,
                                                   &hdfs_fs));
        _remote_fs = std::move(hdfs_fs);
        break;
    }
    case TStorageBackendType::type::BROKER: {
        std::shared_ptr<io::BrokerFileSystem> broker_fs;
        RETURN_IF_ERROR(io::BrokerFileSystem::create(_broker_addr, _prop, &broker_fs));
        _remote_fs = std::move(broker_fs);
        break;
    }
    default:
        return Status::InternalError("Unknown storage type: {}", type);
    }
    return Status::OK();
}
135
136
1
// Defaulted destructor, defined out of line in this translation unit.
SnapshotLoader::~SnapshotLoader() = default;
137
138
// Uploads local tablet snapshot directories to the remote file system.
//
// `src_to_dest_path` maps each local snapshot directory to the remote
// directory it is uploaded to.
// `tablet_files` is an output map. For every processed source path an entry
// keyed by the tablet id is emplaced, holding each local file name suffixed
// with ".<md5>". This includes files that were skipped because they already
// exist remotely with the same md5.
//
// A local file is uploaded when no remote file of that name was listed, or
// when the listed md5 differs from the local md5.
//
// Returns InternalError when the remote file system has not been initialized
// (see init()); any error from the helpers called here is returned as is and
// stops the upload.
Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_dest_path,
                              std::map<int64_t, std::vector<std::string>>* tablet_files) {
    if (!_remote_fs) {
        return Status::InternalError("Storage backend not initialized.");
    }
    LOG(INFO) << "begin to upload snapshot files. num: " << src_to_dest_path.size()
              << ", broker addr: " << _broker_addr << ", job: " << _job_id
              << ", task id: " << _task_id;

    // check if job has already been cancelled
    int tmp_counter = 1;
    RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::UPLOAD));

    // 1. validate local tablet snapshot paths
    RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, true));

    // 2. for each src path, upload it to remote storage
    // we report to frontend for every 10 files, and we will cancel the job if
    // the job has already been cancelled in frontend.
    int report_counter = 0;
    const int total_num = static_cast<int>(src_to_dest_path.size());
    int finished_num = 0;
    for (const auto& [src_path, dest_path] : src_to_dest_path) {
        // Take a lock to protect the local snapshot path.
        auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(src_path);

        int64_t tablet_id = 0;
        int32_t schema_hash = 0;
        RETURN_IF_ERROR(
                _get_tablet_id_and_schema_hash_from_file_path(src_path, &tablet_id, &schema_hash));

        // 2.1 get existing files from remote path
        std::map<std::string, FileStat> remote_files;
        RETURN_IF_ERROR(_list_with_checksum(dest_path, &remote_files));

        for (const auto& [remote_name, remote_stat] : remote_files) {
            VLOG_CRITICAL << "get remote file: " << remote_name
                          << ", checksum: " << remote_stat.md5;
        }

        // 2.2 list local files
        std::vector<std::string> local_files;
        RETURN_IF_ERROR(_get_existing_files_from_local(src_path, &local_files));

        std::vector<std::string> local_files_with_checksum;
        local_files_with_checksum.reserve(local_files.size());

        // 2.3 iterate local files
        for (const std::string& local_file : local_files) {
            RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
                                          TTaskType::type::UPLOAD));

            // calc md5sum of localfile
            const std::string local_path = src_path + "/" + local_file;
            std::string md5sum;
            RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_path, &md5sum));
            VLOG_CRITICAL << "get file checksum: " << local_file << ": " << md5sum;
            // Recorded for every local file, whether or not it is uploaded below.
            local_files_with_checksum.push_back(local_file + "." + md5sum);

            // check if this local file need upload
            auto remote_it = remote_files.find(local_file);
            if (remote_it != remote_files.end()) {
                if (md5sum == remote_it->second.md5) {
                    VLOG_CRITICAL << "file exist in remote path, no need to upload: "
                                  << local_file;
                    continue;
                }
                // remote storage file exist, but with different checksum
                LOG(WARNING) << "remote file checksum is invalid. remote: " << remote_it->first
                             << ", local: " << md5sum;
                // TODO(cmy): save these files and delete them later
            }

            // upload
            std::string remote_path = dest_path + '/' + local_file;
            RETURN_IF_ERROR(upload_with_checksum(*_remote_fs, local_path, remote_path, md5sum));
        } // end for each tablet's local files

        tablet_files->emplace(tablet_id, std::move(local_files_with_checksum));
        finished_num++;
        LOG(INFO) << "finished to write tablet to remote. local path: " << src_path
                  << ", remote path: " << dest_path;
    } // end for each tablet path

    LOG(INFO) << "finished to upload snapshots. job: " << _job_id << ", task id: " << _task_id;
    return Status::OK();
}
233
234
/*
235
 * Download snapshot files from remote.
236
 * After downloaded, the local dir should contains all files existing in remote,
237
 * may also contains several useless files.
238
 */
239
Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to_dest_path,
240
0
                                std::vector<int64_t>* downloaded_tablet_ids) {
241
0
    if (!_remote_fs) {
242
0
        return Status::InternalError("Storage backend not initialized.");
243
0
    }
244
0
    LOG(INFO) << "begin to download snapshot files. num: " << src_to_dest_path.size()
245
0
              << ", broker addr: " << _broker_addr << ", job: " << _job_id
246
0
              << ", task id: " << _task_id;
247
248
    // check if job has already been cancelled
249
0
    int tmp_counter = 1;
250
0
    RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD));
251
252
0
    Status status = Status::OK();
253
    // 1. validate local tablet snapshot paths
254
0
    RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, false));
255
256
    // 2. for each src path, download it to local storage
257
0
    int report_counter = 0;
258
0
    int total_num = src_to_dest_path.size();
259
0
    int finished_num = 0;
260
0
    for (auto iter = src_to_dest_path.begin(); iter != src_to_dest_path.end(); iter++) {
261
0
        const std::string& remote_path = iter->first;
262
0
        const std::string& local_path = iter->second;
263
264
        // Take a lock to protect the local snapshot path.
265
0
        auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(local_path);
266
267
0
        int64_t local_tablet_id = 0;
268
0
        int32_t schema_hash = 0;
269
0
        RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path(local_path, &local_tablet_id,
270
0
                                                                      &schema_hash));
271
0
        downloaded_tablet_ids->push_back(local_tablet_id);
272
273
0
        int64_t remote_tablet_id;
274
0
        RETURN_IF_ERROR(_get_tablet_id_from_remote_path(remote_path, &remote_tablet_id));
275
0
        VLOG_CRITICAL << "get local tablet id: " << local_tablet_id
276
0
                      << ", schema hash: " << schema_hash
277
0
                      << ", remote tablet id: " << remote_tablet_id;
278
279
        // 2.1. get local files
280
0
        std::vector<std::string> local_files;
281
0
        RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files));
282
283
        // 2.2. get remote files
284
0
        std::map<std::string, FileStat> remote_files;
285
0
        RETURN_IF_ERROR(_list_with_checksum(remote_path, &remote_files));
286
0
        if (remote_files.empty()) {
287
0
            std::stringstream ss;
288
0
            ss << "get nothing from remote path: " << remote_path;
289
0
            LOG(WARNING) << ss.str();
290
0
            return Status::InternalError(ss.str());
291
0
        }
292
293
0
        TabletSharedPtr tablet =
294
0
                StorageEngine::instance()->tablet_manager()->get_tablet(local_tablet_id);
295
0
        if (tablet == nullptr) {
296
0
            std::stringstream ss;
297
0
            ss << "failed to get local tablet: " << local_tablet_id;
298
0
            LOG(WARNING) << ss.str();
299
0
            return Status::InternalError(ss.str());
300
0
        }
301
0
        DataDir* data_dir = tablet->data_dir();
302
303
0
        for (auto& iter : remote_files) {
304
0
            RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
305
0
                                          TTaskType::type::DOWNLOAD));
306
307
0
            bool need_download = false;
308
0
            const std::string& remote_file = iter.first;
309
0
            const FileStat& file_stat = iter.second;
310
0
            auto find = std::find(local_files.begin(), local_files.end(), remote_file);
311
0
            if (find == local_files.end()) {
312
                // remote file does not exist in local, download it
313
0
                need_download = true;
314
0
            } else {
315
0
                if (_end_with(remote_file, ".hdr")) {
316
                    // this is a header file, download it.
317
0
                    need_download = true;
318
0
                } else {
319
                    // check checksum
320
0
                    std::string local_md5sum;
321
0
                    Status st = io::global_local_filesystem()->md5sum(
322
0
                            local_path + "/" + remote_file, &local_md5sum);
323
0
                    if (!st.ok()) {
324
0
                        LOG(WARNING) << "failed to get md5sum of local file: " << remote_file
325
0
                                     << ". msg: " << st << ". download it";
326
0
                        need_download = true;
327
0
                    } else {
328
0
                        VLOG_CRITICAL << "get local file checksum: " << remote_file << ": "
329
0
                                      << local_md5sum;
330
0
                        if (file_stat.md5 != local_md5sum) {
331
                            // file's checksum does not equal, download it.
332
0
                            need_download = true;
333
0
                        }
334
0
                    }
335
0
                }
336
0
            }
337
338
0
            if (!need_download) {
339
0
                LOG(INFO) << "remote file already exist in local, no need to download."
340
0
                          << ", file: " << remote_file;
341
0
                continue;
342
0
            }
343
344
            // begin to download
345
0
            std::string full_remote_file = remote_path + "/" + remote_file + "." + file_stat.md5;
346
0
            std::string local_file_name;
347
            // we need to replace the tablet_id in remote file name with local tablet id
348
0
            RETURN_IF_ERROR(_replace_tablet_id(remote_file, local_tablet_id, &local_file_name));
349
0
            std::string full_local_file = local_path + "/" + local_file_name;
350
0
            LOG(INFO) << "begin to download from " << full_remote_file << " to " << full_local_file;
351
0
            size_t file_len = file_stat.size;
352
353
            // check disk capacity
354
0
            if (data_dir->reach_capacity_limit(file_len)) {
355
0
                return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
356
0
                        "reach the capacity limit of path {}, file_size={}", data_dir->path(),
357
0
                        file_len);
358
0
            }
359
            // remove file which will be downloaded now.
360
            // this file will be added to local_files if it be downloaded successfully.
361
0
            if (find != local_files.end()) {
362
0
                local_files.erase(find);
363
0
            }
364
0
            RETURN_IF_ERROR(_remote_fs->download(full_remote_file, full_local_file));
365
366
            // 3. check md5 of the downloaded file
367
0
            std::string downloaded_md5sum;
368
0
            RETURN_IF_ERROR(
369
0
                    io::global_local_filesystem()->md5sum(full_local_file, &downloaded_md5sum));
370
0
            VLOG_CRITICAL << "get downloaded file checksum: " << full_local_file << ": "
371
0
                          << downloaded_md5sum;
372
0
            if (downloaded_md5sum != file_stat.md5) {
373
0
                std::stringstream ss;
374
0
                ss << "invalid md5 of downloaded file: " << full_local_file
375
0
                   << ", expected: " << file_stat.md5 << ", get: " << downloaded_md5sum;
376
0
                LOG(WARNING) << ss.str();
377
0
                return Status::InternalError(ss.str());
378
0
            }
379
380
            // local_files always keep the updated local files
381
0
            local_files.push_back(local_file_name);
382
0
            LOG(INFO) << "finished to download file via broker. file: " << full_local_file
383
0
                      << ", length: " << file_len;
384
0
        } // end for all remote files
385
386
        // finally, delete local files which are not in remote
387
0
        for (const auto& local_file : local_files) {
388
            // replace the tablet id in local file name with the remote tablet id,
389
            // in order to compare the file name.
390
0
            std::string new_name;
391
0
            Status st = _replace_tablet_id(local_file, remote_tablet_id, &new_name);
392
0
            if (!st.ok()) {
393
0
                LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st
394
0
                             << ". ignore it";
395
0
                continue;
396
0
            }
397
0
            VLOG_CRITICAL << "new file name after replace tablet id: " << new_name;
398
0
            const auto& find = remote_files.find(new_name);
399
0
            if (find != remote_files.end()) {
400
0
                continue;
401
0
            }
402
403
            // delete
404
0
            std::string full_local_file = local_path + "/" + local_file;
405
0
            VLOG_CRITICAL << "begin to delete local snapshot file: " << full_local_file
406
0
                          << ", it does not exist in remote";
407
0
            if (remove(full_local_file.c_str()) != 0) {
408
0
                LOG(WARNING) << "failed to delete unknown local file: " << full_local_file
409
0
                             << ", ignore it";
410
0
            }
411
0
        }
412
413
0
        finished_num++;
414
0
    } // end for src_to_dest_path
415
416
0
    LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id;
417
0
    return status;
418
0
}
419
420
Status SnapshotLoader::remote_http_download(
421
        const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
422
0
        std::vector<int64_t>* downloaded_tablet_ids) {
423
0
    LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, task id: {}", _job_id,
424
0
                             _task_id);
425
0
    constexpr uint32_t kDownloadFileMaxRetry = 3;
426
427
    // check if job has already been cancelled
428
0
    int tmp_counter = 1;
429
0
    RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD));
430
0
    Status status = Status::OK();
431
432
0
    int report_counter = 0;
433
0
    int finished_num = 0;
434
0
    int total_num = remote_tablet_snapshots.size();
435
0
    for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
436
0
        const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
437
0
        const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
438
0
        LOG(INFO) << fmt::format(
439
0
                "download snapshots via http. job: {}, task id: {}, local dir: {}, remote dir: {}",
440
0
                _job_id, _task_id, local_path, remote_path);
441
442
        // Take a lock to protect the local snapshot path.
443
0
        auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(local_path);
444
445
        // Step 1: Validate local tablet snapshot paths
446
0
        bool res = true;
447
0
        RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(local_path, &res));
448
0
        if (!res) {
449
0
            std::stringstream ss;
450
0
            auto err_msg =
451
0
                    fmt::format("snapshot path is not directory or does not exist: {}", local_path);
452
0
            LOG(WARNING) << err_msg;
453
0
            return Status::RuntimeError(err_msg);
454
0
        }
455
456
        // Step 2: get all local files
457
0
        struct LocalFileStat {
458
0
            uint64_t size;
459
0
            std::string md5;
460
0
        };
461
0
        std::unordered_map<std::string, LocalFileStat> local_files;
462
0
        std::vector<std::string> existing_files;
463
0
        RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &existing_files));
464
0
        for (auto& local_file : existing_files) {
465
            // add file size
466
0
            std::string local_file_path = local_path + "/" + local_file;
467
0
            std::error_code ec;
468
0
            uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
469
0
            if (ec) {
470
0
                LOG(WARNING) << "download file error" << ec.message();
471
0
                return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path,
472
0
                                       ec.message());
473
0
            }
474
0
            std::string md5;
475
0
            if (config::enable_download_md5sum_check) {
476
0
                auto status = io::global_local_filesystem()->md5sum(local_file_path, &md5);
477
0
                if (!status.ok()) {
478
0
                    LOG(WARNING) << "download file error, local file " << local_file_path
479
0
                                 << " md5sum: " << status.to_string();
480
0
                    return status;
481
0
                }
482
0
            }
483
0
            local_files[local_file] = {local_file_size, md5};
484
0
        }
485
0
        existing_files.clear();
486
487
        // Step 3: Validate remote tablet snapshot paths && remote files map
488
        // key is remote snapshot paths, value is filelist
489
        // get all these use http download action
490
        // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
491
0
        struct RemoteFileStat {
492
0
            std::string url;
493
0
            std::string md5;
494
0
            uint64_t size;
495
0
        };
496
0
        std::unordered_map<std::string, RemoteFileStat> remote_files;
497
0
        const auto& token = remote_tablet_snapshot.remote_token;
498
0
        const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
499
500
        // HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/
501
0
        std::string base_url = fmt::format("http://{}:{}/api/_tablet/_download?token={}",
502
0
                                           remote_be_addr.hostname, remote_be_addr.port, token);
503
0
        std::string remote_url_prefix = fmt::format("{}&file={}", base_url, remote_path);
504
505
0
        string file_list_str;
506
0
        auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* client) {
507
0
            RETURN_IF_ERROR(client->init(remote_url_prefix));
508
0
            client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
509
0
            return client->execute(&file_list_str);
510
0
        };
511
0
        RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, list_files_cb));
512
0
        std::vector<string> filename_list =
513
0
                strings::Split(file_list_str, "\n", strings::SkipWhitespace());
514
515
0
        for (const auto& filename : filename_list) {
516
0
            std::string remote_file_url =
517
0
                    fmt::format("{}&file={}/{}&channel=ingest_binlog", base_url,
518
0
                                remote_tablet_snapshot.remote_snapshot_path, filename);
519
520
            // get file length
521
0
            uint64_t file_size = 0;
522
0
            std::string file_md5;
523
0
            auto get_file_stat_cb = [&remote_file_url, &file_size, &file_md5](HttpClient* client) {
524
0
                int64_t timeout_ms = config::download_binlog_meta_timeout_ms;
525
0
                std::string url = remote_file_url;
526
0
                if (config::enable_download_md5sum_check) {
527
                    // compute md5sum is time-consuming, so we set a longer timeout
528
0
                    timeout_ms = config::download_binlog_meta_timeout_ms * 3;
529
0
                    url = fmt::format("{}&acquire_md5=true", remote_file_url);
530
0
                }
531
0
                RETURN_IF_ERROR(client->init(url));
532
0
                client->set_timeout_ms(timeout_ms);
533
0
                RETURN_IF_ERROR(client->head());
534
0
                RETURN_IF_ERROR(client->get_content_length(&file_size));
535
0
                if (config::enable_download_md5sum_check) {
536
0
                    RETURN_IF_ERROR(client->get_content_md5(&file_md5));
537
0
                }
538
0
                return Status::OK();
539
0
            };
540
0
            RETURN_IF_ERROR(
541
0
                    HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_stat_cb));
542
543
0
            remote_files[filename] = RemoteFileStat {remote_file_url, file_md5, file_size};
544
0
        }
545
546
        // Step 4: Compare local and remote files && get all need download files
547
0
        RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
548
0
                                      TTaskType::type::DOWNLOAD));
549
550
        // get all need download files
551
0
        std::vector<std::string> need_download_files;
552
0
        for (const auto& [remote_file, remote_filestat] : remote_files) {
553
0
            LOG(INFO) << "remote file: " << remote_file << ", size: " << remote_filestat.size
554
0
                      << ", md5: " << remote_filestat.md5;
555
0
            auto it = local_files.find(remote_file);
556
0
            if (it == local_files.end()) {
557
0
                need_download_files.emplace_back(remote_file);
558
0
                continue;
559
0
            }
560
0
            if (_end_with(remote_file, ".hdr")) {
561
0
                need_download_files.emplace_back(remote_file);
562
0
                continue;
563
0
            }
564
565
0
            if (auto& local_filestat = it->second; local_filestat.size != remote_filestat.size) {
566
0
                need_download_files.emplace_back(remote_file);
567
0
                continue;
568
0
            }
569
570
0
            if (auto& local_filestat = it->second;
571
0
                !remote_filestat.md5.empty() && // compatible with old version
572
0
                local_filestat.md5 != remote_filestat.md5) {
573
0
                need_download_files.emplace_back(remote_file);
574
0
                continue;
575
0
            }
576
577
0
            LOG(INFO) << fmt::format("file {} already exists, skip download", remote_file);
578
0
        }
579
580
0
        auto local_tablet_id = remote_tablet_snapshot.local_tablet_id;
581
0
        TabletSharedPtr tablet =
582
0
                StorageEngine::instance()->tablet_manager()->get_tablet(local_tablet_id);
583
0
        if (tablet == nullptr) {
584
0
            std::stringstream ss;
585
0
            ss << "failed to get local tablet: " << local_tablet_id;
586
0
            LOG(WARNING) << ss.str();
587
0
            return Status::InternalError(ss.str());
588
0
        }
589
0
        DataDir* data_dir = tablet->data_dir();
590
591
        // download all need download files
592
0
        uint64_t total_file_size = 0;
593
0
        MonotonicStopWatch watch;
594
0
        watch.start();
595
0
        for (auto& filename : need_download_files) {
596
0
            auto& remote_filestat = remote_files[filename];
597
0
            auto file_size = remote_filestat.size;
598
0
            auto& remote_file_url = remote_filestat.url;
599
0
            auto& remote_file_md5 = remote_filestat.md5;
600
601
            // check disk capacity
602
0
            if (data_dir->reach_capacity_limit(file_size)) {
603
0
                return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
604
0
                        "reach the capacity limit of path {}, file_size={}", data_dir->path(),
605
0
                        file_size);
606
0
            }
607
608
0
            total_file_size += file_size;
609
0
            uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
610
0
            if (estimate_timeout < config::download_low_speed_time) {
611
0
                estimate_timeout = config::download_low_speed_time;
612
0
            }
613
614
0
            std::string local_filename;
615
0
            RETURN_IF_ERROR(_replace_tablet_id(filename, local_tablet_id, &local_filename));
616
0
            std::string local_file_path = local_path + "/" + local_filename;
617
618
0
            LOG(INFO) << "clone begin to download file from: " << remote_file_url
619
0
                      << " to: " << local_file_path << ". size(B): " << file_size
620
0
                      << ", timeout(s): " << estimate_timeout;
621
622
0
            auto download_cb = [&remote_file_url, &remote_file_md5, estimate_timeout,
623
0
                                &local_file_path, file_size](HttpClient* client) {
624
0
                RETURN_IF_ERROR(client->init(remote_file_url));
625
0
                client->set_timeout_ms(estimate_timeout * 1000);
626
0
                RETURN_IF_ERROR(client->download(local_file_path));
627
628
0
                std::error_code ec;
629
                // Check file length
630
0
                uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
631
0
                if (ec) {
632
0
                    LOG(WARNING) << "download file error" << ec.message();
633
0
                    return Status::IOError("can't retrive file_size of {}, due to {}",
634
0
                                           local_file_path, ec.message());
635
0
                }
636
0
                if (local_file_size != file_size) {
637
0
                    LOG(WARNING) << "download file length error"
638
0
                                 << ", remote_path=" << remote_file_url
639
0
                                 << ", file_size=" << file_size
640
0
                                 << ", local_file_size=" << local_file_size;
641
0
                    return Status::InternalError("downloaded file size is not equal");
642
0
                }
643
644
0
                if (!remote_file_md5.empty()) { // keep compatibility
645
0
                    std::string local_file_md5;
646
0
                    RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_file_path,
647
0
                                                                          &local_file_md5));
648
0
                    if (local_file_md5 != remote_file_md5) {
649
0
                        LOG(WARNING) << "download file md5 error"
650
0
                                     << ", remote_file_url=" << remote_file_url
651
0
                                     << ", local_file_path=" << local_file_path
652
0
                                     << ", remote_file_md5=" << remote_file_md5
653
0
                                     << ", local_file_md5=" << local_file_md5;
654
0
                        return Status::RuntimeError(
655
0
                                "download file {} md5 is not equal, local={}, remote={}",
656
0
                                remote_file_url, local_file_md5, remote_file_md5);
657
0
                    }
658
0
                }
659
660
0
                return io::global_local_filesystem()->permission(
661
0
                        local_file_path, io::LocalFileSystem::PERMS_OWNER_RW);
662
0
            };
663
0
            auto status = HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb);
664
0
            if (!status.ok()) {
665
0
                LOG(WARNING) << "failed to download file from " << remote_file_url
666
0
                             << ", status: " << status.to_string();
667
0
                return status;
668
0
            }
669
670
            // local_files always keep the updated local files
671
0
            local_files[filename] = LocalFileStat {file_size, remote_file_md5};
672
0
        }
673
674
0
        uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
675
0
        total_time_ms = total_time_ms > 0 ? total_time_ms : 0;
676
0
        double copy_rate = 0.0;
677
0
        if (total_time_ms > 0) {
678
0
            copy_rate = total_file_size / ((double)total_time_ms) / 1000;
679
0
        }
680
0
        auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;
681
0
        LOG(INFO) << fmt::format(
682
0
                "succeed to copy remote tablet {} to local tablet {}, total file size: {} B, cost: "
683
0
                "{} ms, rate: {} MB/s",
684
0
                remote_tablet_id, local_tablet_id, total_file_size, total_time_ms, copy_rate);
685
686
        // local_files: contain all remote files and local files
687
        // finally, delete local files which are not in remote
688
0
        for (const auto& [local_file, local_filestat] : local_files) {
689
            // replace the tablet id in local file name with the remote tablet id,
690
            // in order to compare the file name.
691
0
            std::string new_name;
692
0
            Status st = _replace_tablet_id(local_file, remote_tablet_id, &new_name);
693
0
            if (!st.ok()) {
694
0
                LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st
695
0
                             << ". ignore it";
696
0
                continue;
697
0
            }
698
0
            VLOG_CRITICAL << "new file name after replace tablet id: " << new_name;
699
0
            const auto& find = remote_files.find(new_name);
700
0
            if (find != remote_files.end()) {
701
0
                continue;
702
0
            }
703
704
            // delete
705
0
            std::string full_local_file = local_path + "/" + local_file;
706
0
            LOG(INFO) << "begin to delete local snapshot file: " << full_local_file
707
0
                      << ", it does not exist in remote";
708
0
            if (remove(full_local_file.c_str()) != 0) {
709
0
                LOG(WARNING) << "failed to delete unknown local file: " << full_local_file
710
0
                             << ", error: " << strerror(errno)
711
0
                             << ", file size: " << local_filestat.size << ", ignore it";
712
0
            }
713
0
        }
714
715
0
        ++finished_num;
716
0
    }
717
718
0
    LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id;
719
0
    return status;
720
0
}
721
722
// move the snapshot files in snapshot_path
723
// to tablet_path
724
// If overwrite, just replace the tablet_path with snapshot_path,
725
// else: (TODO)
726
//
727
// MUST hold tablet's header lock, push lock, cumulative lock and base compaction lock
728
Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr tablet,
729
0
                            bool overwrite) {
730
    // Take a lock to protect the local snapshot path.
731
0
    auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(snapshot_path);
732
733
0
    auto tablet_path = tablet->tablet_path();
734
0
    auto store_path = tablet->data_dir()->path();
735
0
    LOG(INFO) << "begin to move snapshot files. from: " << snapshot_path << ", to: " << tablet_path
736
0
              << ", store: " << store_path << ", job: " << _job_id << ", task id: " << _task_id;
737
738
0
    Status status = Status::OK();
739
740
    // validate snapshot_path and tablet_path
741
0
    int64_t snapshot_tablet_id = 0;
742
0
    int32_t snapshot_schema_hash = 0;
743
0
    RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path(
744
0
            snapshot_path, &snapshot_tablet_id, &snapshot_schema_hash));
745
746
0
    int64_t tablet_id = 0;
747
0
    int32_t schema_hash = 0;
748
0
    RETURN_IF_ERROR(
749
0
            _get_tablet_id_and_schema_hash_from_file_path(tablet_path, &tablet_id, &schema_hash));
750
751
0
    if (tablet_id != snapshot_tablet_id || schema_hash != snapshot_schema_hash) {
752
0
        std::stringstream ss;
753
0
        ss << "path does not match. snapshot: " << snapshot_path
754
0
           << ", tablet path: " << tablet_path;
755
0
        LOG(WARNING) << ss.str();
756
0
        return Status::InternalError(ss.str());
757
0
    }
758
759
0
    DataDir* store = StorageEngine::instance()->get_store(store_path);
760
0
    if (store == nullptr) {
761
0
        std::stringstream ss;
762
0
        ss << "failed to get store by path: " << store_path;
763
0
        LOG(WARNING) << ss.str();
764
0
        return Status::InternalError(ss.str());
765
0
    }
766
767
0
    if (!std::filesystem::exists(tablet_path)) {
768
0
        std::stringstream ss;
769
0
        ss << "tablet path does not exist: " << tablet_path;
770
0
        LOG(WARNING) << ss.str();
771
0
        return Status::InternalError(ss.str());
772
0
    }
773
774
0
    if (!std::filesystem::exists(snapshot_path)) {
775
0
        std::stringstream ss;
776
0
        ss << "snapshot path does not exist: " << snapshot_path;
777
0
        LOG(WARNING) << ss.str();
778
0
        return Status::InternalError(ss.str());
779
0
    }
780
781
0
    std::string loaded_tag_path = get_loaded_tag_path(snapshot_path);
782
0
    bool already_loaded = false;
783
0
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(loaded_tag_path, &already_loaded));
784
0
    if (already_loaded) {
785
0
        LOG(INFO) << "snapshot path already moved: " << snapshot_path;
786
0
        return Status::OK();
787
0
    }
788
789
    // rename the rowset ids and tabletid info in rowset meta
790
0
    auto res = SnapshotManager::instance()->convert_rowset_ids(
791
0
            snapshot_path, tablet_id, tablet->replica_id(), tablet->partition_id(), schema_hash);
792
0
    if (!res.has_value()) [[unlikely]] {
793
0
        auto err_msg =
794
0
                fmt::format("failed to convert rowsetids in snapshot: {}, tablet path: {}, err: {}",
795
0
                            snapshot_path, tablet_path, res.error());
796
0
        LOG(WARNING) << err_msg;
797
0
        return Status::InternalError(err_msg);
798
0
    }
799
800
0
    if (!overwrite) {
801
0
        LOG(FATAL) << "only support overwrite now";
802
0
        __builtin_unreachable();
803
0
    }
804
805
    // Medium migration/clone/checkpoint/compaction may change or check the
806
    // files and tablet meta, so we need to take these locks.
807
0
    std::unique_lock migration_lock(tablet->get_migration_lock(), std::try_to_lock);
808
0
    std::unique_lock base_compact_lock(tablet->get_base_compaction_lock(), std::try_to_lock);
809
0
    std::unique_lock cumu_compact_lock(tablet->get_cumulative_compaction_lock(), std::try_to_lock);
810
0
    std::unique_lock cold_compact_lock(tablet->get_cold_compaction_lock(), std::try_to_lock);
811
0
    std::unique_lock build_idx_lock(tablet->get_build_inverted_index_lock(), std::try_to_lock);
812
0
    std::unique_lock meta_store_lock(tablet->get_meta_store_lock(), std::try_to_lock);
813
0
    if (!migration_lock.owns_lock() || !base_compact_lock.owns_lock() ||
814
0
        !cumu_compact_lock.owns_lock() || !cold_compact_lock.owns_lock() ||
815
0
        !build_idx_lock.owns_lock() || !meta_store_lock.owns_lock()) {
816
        // This error should be retryable
817
0
        auto status = Status::ObtainLockFailed("failed to get tablet locks, tablet: {}", tablet_id);
818
0
        LOG(WARNING) << status << ", snapshot path: " << snapshot_path
819
0
                     << ", tablet path: " << tablet_path;
820
0
        return status;
821
0
    }
822
823
0
    std::vector<std::string> snapshot_files;
824
0
    RETURN_IF_ERROR(_get_existing_files_from_local(snapshot_path, &snapshot_files));
825
826
    // FIXME: the below logic will demage the tablet files if failed in the middle.
827
828
    // 1. simply delete the old dir and replace it with the snapshot dir
829
0
    try {
830
        // This remove seems soft enough, because we already get
831
        // tablet id and schema hash from this path, which
832
        // means this path is a valid path.
833
0
        std::filesystem::remove_all(tablet_path);
834
0
        VLOG_CRITICAL << "remove dir: " << tablet_path;
835
0
        std::filesystem::create_directory(tablet_path);
836
0
        VLOG_CRITICAL << "re-create dir: " << tablet_path;
837
0
    } catch (const std::filesystem::filesystem_error& e) {
838
0
        std::stringstream ss;
839
0
        ss << "failed to move tablet path: " << tablet_path << ". err: " << e.what();
840
0
        LOG(WARNING) << ss.str();
841
0
        return Status::InternalError(ss.str());
842
0
    }
843
844
    // link files one by one
845
    // files in snapshot dir will be moved in snapshot clean process
846
0
    std::vector<std::string> linked_files;
847
0
    for (auto& file : snapshot_files) {
848
0
        auto full_src_path = fmt::format("{}/{}", snapshot_path, file);
849
0
        auto full_dest_path = fmt::format("{}/{}", tablet_path, file);
850
0
        if (link(full_src_path.c_str(), full_dest_path.c_str()) != 0) {
851
0
            LOG(WARNING) << "failed to link file from " << full_src_path << " to " << full_dest_path
852
0
                         << ", err: " << std::strerror(errno);
853
854
            // clean the already linked files
855
0
            for (auto& linked_file : linked_files) {
856
0
                remove(linked_file.c_str());
857
0
            }
858
859
0
            return Status::InternalError("move tablet failed");
860
0
        }
861
0
        linked_files.push_back(full_dest_path);
862
0
        VLOG_CRITICAL << "link file from " << full_src_path << " to " << full_dest_path;
863
0
    }
864
865
    // snapshot loader not need to change tablet uid
866
    // fixme: there is no header now and can not call load_one_tablet here
867
    // reload header
868
0
    Status ost = StorageEngine::instance()->tablet_manager()->load_tablet_from_dir(
869
0
            store, tablet_id, schema_hash, tablet_path, true);
870
0
    if (!ost.ok()) {
871
0
        std::stringstream ss;
872
0
        ss << "failed to reload header of tablet: " << tablet_id;
873
0
        LOG(WARNING) << ss.str();
874
0
        return Status::InternalError(ss.str());
875
0
    }
876
877
    // mark the snapshot path as loaded
878
0
    RETURN_IF_ERROR(write_loaded_tag(snapshot_path, tablet_id));
879
880
0
    LOG(INFO) << "finished to reload header of tablet: " << tablet_id;
881
882
0
    return status;
883
0
}
884
885
11
// Returns true when `str` ends with the suffix `match`.
// An empty `match` is a suffix of every string.
bool SnapshotLoader::_end_with(const std::string& str, const std::string& match) {
    // A string shorter than the suffix can never end with it.
    if (str.size() < match.size()) {
        return false;
    }
    const size_t suffix_start = str.size() - match.size();
    return str.compare(suffix_start, match.size(), match) == 0;
}
892
893
// Extracts the tablet id and the schema hash from the last two components of
// `src_path`, which should look like: /path/.../tablet_id/schema_hash
//
// @param src_path     path whose last component is the schema hash and whose
//                     second to last component is the tablet id.
// @param tablet_id    output, the parsed tablet id.
// @param schema_hash  output, the parsed schema hash.
// @return OK on success. InternalError when the path has no '/', ends with
//         '/', does not have two components after a '/', or when one of the
//         two components does not start with a number.
Status SnapshotLoader::_get_tablet_id_and_schema_hash_from_file_path(const std::string& src_path,
                                                                     int64_t* tablet_id,
                                                                     int32_t* schema_hash) {
    // Locate the schema hash: everything after the last '/'.
    // pos == 0 means there is no room for a tablet id component before it
    // (and `pos - 1` below would wrap around).
    size_t pos = src_path.find_last_of("/");
    if (pos == std::string::npos || pos == 0 || pos == src_path.length() - 1) {
        return Status::InternalError("failed to get tablet id from path: {}", src_path);
    }

    std::string schema_hash_str = src_path.substr(pos + 1);
    std::stringstream ss1;
    ss1 << schema_hash_str;
    ss1 >> *schema_hash;
    if (ss1.fail()) {
        return Status::InternalError("failed to get schema hash from path: {}", src_path);
    }

    // skip schema hash part
    size_t pos2 = src_path.find_last_of("/", pos - 1);
    if (pos2 == std::string::npos) {
        return Status::InternalError("failed to get tablet id from path: {}", src_path);
    }

    // The tablet id sits strictly between the two separators.
    std::string tablet_str = src_path.substr(pos2 + 1, pos - pos2 - 1);
    std::stringstream ss2;
    ss2 << tablet_str;
    ss2 >> *tablet_id;
    if (ss2.fail()) {
        return Status::InternalError("failed to get tablet id from path: {}", src_path);
    }

    VLOG_CRITICAL << "get tablet id " << *tablet_id << ", schema hash: " << *schema_hash
                  << " from path: " << src_path;
    return Status::OK();
}
923
924
// Verifies that every local snapshot path in `src_to_dest_path` is a directory.
//
// @param src_to_dest_path  map from source path to destination path.
// @param check_src         when true the map keys are checked, otherwise the
//                          map values are checked.
// @return OK when all checked paths are directories; the error of the
//         is_directory() call when that call fails; RuntimeError for the
//         first path reported as not being a directory.
Status SnapshotLoader::_check_local_snapshot_paths(
        const std::map<std::string, std::string>& src_to_dest_path, bool check_src) {
    bool is_dir = true;
    for (const auto& [src, dest] : src_to_dest_path) {
        // Pick the side of the mapping that lives on the local disk.
        const std::string& path = check_src ? src : dest;

        RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &is_dir));
        if (is_dir) {
            continue;
        }

        const std::string msg = "snapshot path is not directory or does not exist: " + path;
        LOG(WARNING) << msg;
        return Status::RuntimeError(msg);
    }
    LOG(INFO) << "all local snapshot paths are existing. num: " << src_to_dest_path.size();
    return Status::OK();
}
946
947
// Lists `local_path` through the global local filesystem and appends the
// file name of every returned entry to `local_files`.
//
// @param local_path   path to list.
// @param local_files  output, the listed file names are appended to it.
// @return the error of the list() call when it fails, OK otherwise.
Status SnapshotLoader::_get_existing_files_from_local(const std::string& local_path,
                                                      std::vector<std::string>* local_files) {
    std::vector<io::FileInfo> listed;
    bool exists = true;
    RETURN_IF_ERROR(io::global_local_filesystem()->list(local_path, true, &listed, &exists));

    for (const auto& info : listed) {
        local_files->push_back(info.file_name);
    }

    LOG(INFO) << "finished to list files in local path: " << local_path
              << ", file num: " << local_files->size();
    return Status::OK();
}
959
960
// Computes the file name to use for `file_name` under tablet `tablet_id`.
//
// eg:
// 10007.hdr
// 10007_2_2_0_0.idx
// 10007_2_2_0_0.dat
//
// A ".hdr" file becomes "<tablet_id>.hdr"; ".idx" and ".dat" files keep their
// name unchanged; any other name is rejected with InternalError.
Status SnapshotLoader::_replace_tablet_id(const std::string& file_name, int64_t tablet_id,
                                          std::string* new_file_name) {
    // Header file: its whole stem is replaced by the tablet id.
    if (_end_with(file_name, ".hdr")) {
        *new_file_name = std::to_string(tablet_id) + ".hdr";
        return Status::OK();
    }

    // Index and data files are passed through as they are.
    const bool is_index_or_data = _end_with(file_name, ".idx") || _end_with(file_name, ".dat");
    if (is_index_or_data) {
        *new_file_name = file_name;
        return Status::OK();
    }

    return Status::InternalError("invalid tablet file name: {}", file_name);
}
978
979
// Extracts the tablet id from a remote path: the number following the last
// '_' of the path.
//
// eg:
// bos://xxx/../__tbl_10004/__part_10003/__idx_10004/__10005
//
// @param remote_path  remote path ending with "_<tablet id>".
// @param tablet_id    output, the parsed tablet id.
// @return OK on success; InternalError when the path contains no '_' or when
//         no number follows the last '_'.
Status SnapshotLoader::_get_tablet_id_from_remote_path(const std::string& remote_path,
                                                       int64_t* tablet_id) {
    size_t pos = remote_path.find_last_of("_");
    if (pos == std::string::npos) {
        return Status::InternalError("invalid remote file path: {}", remote_path);
    }

    std::string tablet_id_str = remote_path.substr(pos + 1);
    std::stringstream ss;
    ss << tablet_id_str;
    ss >> *tablet_id;
    if (ss.fail()) {
        // Nothing numeric after the last '_': do not report a bogus tablet id of 0.
        return Status::InternalError("invalid remote file path: {}", remote_path);
    }

    return Status::OK();
}
995
996
// only return CANCELLED if FE return that job is cancelled.
997
// otherwise, return OK
998
Status SnapshotLoader::_report_every(int report_threshold, int* counter, int32_t finished_num,
999
0
                                     int32_t total_num, TTaskType::type type) {
1000
0
    ++*counter;
1001
0
    if (*counter <= report_threshold) {
1002
0
        return Status::OK();
1003
0
    }
1004
1005
0
    LOG(INFO) << "report to frontend. job id: " << _job_id << ", task id: " << _task_id
1006
0
              << ", finished num: " << finished_num << ", total num:" << total_num;
1007
1008
0
    TNetworkAddress master_addr = _env->master_info()->network_address;
1009
1010
0
    TSnapshotLoaderReportRequest request;
1011
0
    request.job_id = _job_id;
1012
0
    request.task_id = _task_id;
1013
0
    request.task_type = type;
1014
0
    request.__set_finished_num(finished_num);
1015
0
    request.__set_total_num(total_num);
1016
0
    TStatus report_st;
1017
1018
0
    Status rpcStatus = ThriftRpcHelper::rpc<FrontendServiceClient>(
1019
0
            master_addr.hostname, master_addr.port,
1020
0
            [&request, &report_st](FrontendServiceConnection& client) {
1021
0
                client->snapshotLoaderReport(report_st, request);
1022
0
            },
1023
0
            10000);
1024
1025
0
    if (!rpcStatus.ok()) {
1026
        // rpc failed, ignore
1027
0
        return Status::OK();
1028
0
    }
1029
1030
    // reset
1031
0
    *counter = 0;
1032
0
    if (report_st.status_code == TStatusCode::CANCELLED) {
1033
0
        LOG(INFO) << "job is cancelled. job id: " << _job_id << ", task id: " << _task_id;
1034
0
        return Status::Cancelled("Cancelled");
1035
0
    }
1036
0
    return Status::OK();
1037
0
}
1038
1039
// Lists `dir` on the remote filesystem and collects the files whose name has
// the form "<name>.<checksum>".
//
// The file name is split at its last '.': the part before it becomes the map
// key and the FileStat name, the part after it the FileStat checksum, and the
// listed file size the FileStat size. Files without a '.' or ending with a
// '.' are ignored. An already present key is left untouched.
//
// @param dir        remote directory to list.
// @param md5_files  output, map from file name (without checksum) to FileStat.
// @return the error of the list() call when it fails, OK otherwise.
Status SnapshotLoader::_list_with_checksum(const std::string& dir,
                                           std::map<std::string, FileStat>* md5_files) {
    std::vector<io::FileInfo> listed;
    bool exists = true;
    RETURN_IF_ERROR(_remote_fs->list(dir, true, &listed, &exists));

    for (auto& entry : listed) {
        io::Path entry_path(entry.file_name);
        std::string base_name = entry_path.filename();

        const size_t dot = base_name.find_last_of(".");
        const bool has_checksum = dot != std::string::npos && dot != base_name.size() - 1;
        if (!has_checksum) {
            // Not found checksum separator, ignore this file
            continue;
        }

        FileStat stat = {base_name.substr(0, dot), base_name.substr(dot + 1), entry.file_size};
        md5_files->emplace(base_name.substr(0, dot), stat);
    }

    return Status::OK();
}
1059
1060
} // end namespace doris