Coverage Report

Created: 2025-07-28 21:04

/root/doris/be/src/runtime/snapshot_loader.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/snapshot_loader.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <errno.h> // IWYU pragma: keep
22
#include <fmt/format.h>
23
#include <gen_cpp/AgentService_types.h>
24
#include <gen_cpp/FrontendService.h>
25
#include <gen_cpp/FrontendService_types.h>
26
#include <gen_cpp/HeartbeatService_types.h>
27
#include <gen_cpp/PlanNodes_types.h>
28
#include <gen_cpp/Status_types.h>
29
#include <gen_cpp/Types_types.h>
30
31
#include <algorithm>
32
#include <condition_variable>
33
#include <cstddef>
34
#include <cstring>
35
#include <filesystem>
36
#include <istream>
37
#include <unordered_map>
38
#include <utility>
39
40
#include "common/config.h"
41
#include "common/logging.h"
42
#include "gutil/strings/split.h"
43
#include "http/http_client.h"
44
#include "io/fs/broker_file_system.h"
45
#include "io/fs/file_system.h"
46
#include "io/fs/hdfs_file_system.h"
47
#include "io/fs/local_file_system.h"
48
#include "io/fs/path.h"
49
#include "io/fs/remote_file_system.h"
50
#include "io/fs/s3_file_system.h"
51
#include "io/hdfs_builder.h"
52
#include "olap/data_dir.h"
53
#include "olap/snapshot_manager.h"
54
#include "olap/storage_engine.h"
55
#include "olap/tablet.h"
56
#include "olap/tablet_manager.h"
57
#include "runtime/client_cache.h"
58
#include "runtime/exec_env.h"
59
#include "util/s3_uri.h"
60
#include "util/s3_util.h"
61
#include "util/thrift_rpc_helper.h"
62
63
namespace doris {
64
65
// Size and checksum of a file that already exists in the local snapshot directory.
struct LocalFileStat {
    // File size in bytes; zero until populated (avoids reading an indeterminate value
    // from a default-constructed instance).
    uint64_t size = 0;
    // md5 checksum string; may be empty (e.g. when md5 checking is disabled).
    std::string md5;
};
69
70
// Location, checksum and size of a file on the remote BE, as reported by its HTTP API.
struct RemoteFileStat {
    // Full download URL of the file.
    std::string url;
    // md5 checksum string; may be empty when the remote was not asked for it.
    std::string md5;
    // File size in bytes; zero until populated.
    uint64_t size = 0;
};
75
76
class SnapshotHttpDownloader {
77
public:
78
    SnapshotHttpDownloader(const TRemoteTabletSnapshot& remote_tablet_snapshot,
79
                           TabletSharedPtr tablet, SnapshotLoader& snapshot_loader)
80
            : _tablet(std::move(tablet)),
81
              _snapshot_loader(snapshot_loader),
82
              _local_tablet_id(remote_tablet_snapshot.local_tablet_id),
83
              _remote_tablet_id(remote_tablet_snapshot.remote_tablet_id),
84
              _local_path(remote_tablet_snapshot.local_snapshot_path),
85
              _remote_path(remote_tablet_snapshot.remote_snapshot_path),
86
1
              _remote_be_addr(remote_tablet_snapshot.remote_be_addr) {
87
1
        auto& token = remote_tablet_snapshot.remote_token;
88
1
        auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
89
90
        // HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/
91
1
        _base_url = fmt::format("http://{}:{}/api/_tablet/_download?token={}&channel=ingest_binlog",
92
1
                                remote_be_addr.hostname, remote_be_addr.port, token);
93
1
    }
94
1
    ~SnapshotHttpDownloader() = default;
95
    SnapshotHttpDownloader(const SnapshotHttpDownloader&) = delete;
96
    SnapshotHttpDownloader& operator=(const SnapshotHttpDownloader&) = delete;
97
98
0
    void set_report_progress_callback(std::function<Status()> report_progress) {
99
0
        _report_progress_callback = std::move(report_progress);
100
0
    }
101
102
1
    size_t get_download_file_num() { return _need_download_files.size(); }
103
104
    Status download();
105
106
private:
107
    constexpr static int kDownloadFileMaxRetry = 3;
108
109
    // Load existing files from local snapshot path, compute the md5sum of the files
110
    // if enable_download_md5sum_check is true
111
    Status _load_existing_files();
112
113
    // List remote files from remote be, and find the hdr file
114
    Status _list_remote_files();
115
116
    // Download hdr file from remote be to a tmp file
117
    Status _download_hdr_file();
118
119
    // Link same rowset files by compare local hdr file and remote hdr file
120
    // if the local files are copied from the remote rowset, link them as the
121
    // remote rowset files, to avoid the duplicated downloading.
122
    Status _link_same_rowset_files();
123
124
    // Get all remote file stats, excluding the hdr file.
125
    Status _get_remote_file_stats();
126
127
    // Compute the need download files according to the local files md5sum (if enable_download_md5sum_check is true)
128
    void _get_need_download_files();
129
130
    // Download all need download files
131
    Status _download_files();
132
133
    // Install remote hdr file to local snapshot path from the tmp file
134
    Status _install_remote_hdr_file();
135
136
    // Delete orphan files, which are not in remote
137
    Status _delete_orphan_files();
138
139
    // Download a file from remote be to local path with the file stat
140
    Status _download_http_file(DataDir* data_dir, const std::string& remote_file_url,
141
                               const std::string& local_file_path,
142
                               const RemoteFileStat& remote_filestat);
143
144
    // Get the file stat from remote be
145
    Status _get_http_file_stat(const std::string& remote_file_url, RemoteFileStat* file_stat);
146
147
    TabletSharedPtr _tablet;
148
    SnapshotLoader& _snapshot_loader;
149
    std::function<Status()> _report_progress_callback;
150
151
    std::string _base_url;
152
    int64_t _local_tablet_id;
153
    int64_t _remote_tablet_id;
154
    const std::string& _local_path;
155
    const std::string& _remote_path;
156
    const TNetworkAddress& _remote_be_addr;
157
158
    std::string _local_hdr_filename;
159
    std::string _remote_hdr_filename;
160
    std::vector<std::string> _remote_file_list;
161
    std::unordered_map<std::string, LocalFileStat> _local_files;
162
    std::unordered_map<std::string, RemoteFileStat> _remote_files;
163
164
    std::string _tmp_hdr_file;
165
    RemoteFileStat _remote_hdr_filestat;
166
    std::vector<std::string> _need_download_files;
167
};
168
169
3
// Path of the "LOADED" marker file inside `snapshot_path`.
static std::string get_loaded_tag_path(const std::string& snapshot_path) {
    std::string tag_path(snapshot_path);
    tag_path.append("/LOADED");
    return tag_path;
}
172
173
1
// Creates the "LOADED" marker file in `snapshot_path` and closes it without writing
// any content. `tablet_id` is not referenced by this function.
static Status write_loaded_tag(const std::string& snapshot_path, int64_t tablet_id) {
    const std::string tag_file = get_loaded_tag_path(snapshot_path);
    std::unique_ptr<io::FileWriter> tag_writer;
    RETURN_IF_ERROR(io::global_local_filesystem()->create_file(tag_file, &tag_writer));
    return tag_writer->close();
}
179
180
// Uploads `local_path` to the remote object "<remote_path>.<checksum>".
// HDFS and broker file systems first upload to "<remote_path>.part" and then rename it to
// the final name; S3 uploads straight to the final name. Any other file system type is
// reported with LOG(FATAL).
Status upload_with_checksum(io::RemoteFileSystem& fs, std::string_view local_path,
                            std::string_view remote_path, std::string_view checksum) {
    auto full_remote_path = fmt::format("{}.{}", remote_path, checksum);
    const auto fs_type = fs.type();

    if (fs_type == io::FileSystemType::HDFS || fs_type == io::FileSystemType::BROKER) {
        std::string part_path = fmt::format("{}.part", remote_path);
        RETURN_IF_ERROR(fs.upload(local_path, part_path));
        RETURN_IF_ERROR(fs.rename(part_path, full_remote_path));
    } else if (fs_type == io::FileSystemType::S3) {
        RETURN_IF_ERROR(fs.upload(local_path, full_remote_path));
    } else {
        LOG(FATAL) << "unknown fs type: " << static_cast<int>(fs_type);
    }
    return Status::OK();
}
199
200
18
// Returns true when `str` ends with `match` (an empty `match` always matches).
bool _end_with(std::string_view str, std::string_view match) {
    // A suffix longer than the whole string can never match.
    if (match.size() > str.size()) {
        return false;
    }
    const std::string_view tail = str.substr(str.size() - match.size());
    return tail == match;
}
204
205
// Sends a HEAD request (retried up to kDownloadFileMaxRetry times) for `remote_file_url`
// and fills `file_stat` with the URL, the reported content length and, when
// config::enable_download_md5sum_check is on, the md5 reported by the remote BE.
Status SnapshotHttpDownloader::_get_http_file_stat(const std::string& remote_file_url,
                                                   RemoteFileStat* file_stat) {
    uint64_t content_length = 0;
    std::string content_md5;
    auto head_request = [&remote_file_url, &content_length, &content_md5](HttpClient* client) {
        std::string request_url;
        int64_t request_timeout_ms = 0;
        if (config::enable_download_md5sum_check) {
            // The remote side computes the md5 on demand, which is slow: allow 3x the time.
            request_timeout_ms = config::download_binlog_meta_timeout_ms * 3;
            request_url = fmt::format("{}&acquire_md5=true", remote_file_url);
        } else {
            request_timeout_ms = config::download_binlog_meta_timeout_ms;
            request_url = remote_file_url;
        }

        RETURN_IF_ERROR(client->init(request_url));
        client->set_timeout_ms(request_timeout_ms);
        RETURN_IF_ERROR(client->head());
        RETURN_IF_ERROR(client->get_content_length(&content_length));
        if (!config::enable_download_md5sum_check) {
            return Status::OK();
        }
        RETURN_IF_ERROR(client->get_content_md5(&content_md5));
        return Status::OK();
    };
    RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, head_request));

    file_stat->url = remote_file_url;
    file_stat->size = content_length;
    file_stat->md5 = std::move(content_md5);
    return Status::OK();
}
232
233
// Downloads `remote_file_url` into `local_file_path` (retried up to kDownloadFileMaxRetry
// times). After each download, the local file size is checked against
// `remote_filestat.size`. When `remote_filestat.md5` is non-empty, the local md5 is also
// compared with it. On success the file permissions are set to owner read/write.
//
// Returns EXCEEDED_LIMIT up front when `data_dir` reports that `file_size` more bytes
// would reach its capacity limit.
Status SnapshotHttpDownloader::_download_http_file(DataDir* data_dir,
                                                   const std::string& remote_file_url,
                                                   const std::string& local_file_path,
                                                   const RemoteFileStat& remote_filestat) {
    auto file_size = remote_filestat.size;
    const auto& remote_file_md5 = remote_filestat.md5;

    // check disk capacity
    if (data_dir->reach_capacity_limit(file_size)) {
        return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
                "reach the capacity limit of path {}, file_size={}", data_dir->path(), file_size);
    }

    // Estimate a timeout in seconds from the configured minimum transfer speed, with
    // download_low_speed_time as the floor. The speed limit is a runtime config: read it
    // once, and skip the estimate when it is 0 instead of dividing by zero.
    const auto speed_limit_kbps = config::download_low_speed_limit_kbps;
    uint64_t estimate_timeout = 0;
    if (speed_limit_kbps != 0) {
        estimate_timeout = file_size / speed_limit_kbps / 1024;
    }
    if (estimate_timeout < config::download_low_speed_time) {
        estimate_timeout = config::download_low_speed_time;
    }

    LOG(INFO) << "clone begin to download file from: " << remote_file_url
              << " to: " << local_file_path << ". size(B): " << file_size
              << ", timeout(s): " << estimate_timeout;

    auto download_cb = [&remote_file_url, &remote_file_md5, estimate_timeout, &local_file_path,
                        file_size](HttpClient* client) {
        RETURN_IF_ERROR(client->init(remote_file_url));
        client->set_timeout_ms(estimate_timeout * 1000);
        RETURN_IF_ERROR(client->download(local_file_path));

        std::error_code ec;
        // Check file length
        uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
        if (ec) {
            LOG(WARNING) << "download file error" << ec.message();
            return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path,
                                   ec.message());
        }
        if (local_file_size != file_size) {
            LOG(WARNING) << "download file length error"
                         << ", remote_path=" << remote_file_url << ", file_size=" << file_size
                         << ", local_file_size=" << local_file_size;
            return Status::InternalError("downloaded file size is not equal");
        }

        // An empty remote md5 means the checksum was not requested; skip the comparison.
        if (!remote_file_md5.empty()) { // keep compatibility
            std::string local_file_md5;
            RETURN_IF_ERROR(
                    io::global_local_filesystem()->md5sum(local_file_path, &local_file_md5));
            if (local_file_md5 != remote_file_md5) {
                LOG(WARNING) << "download file md5 error"
                             << ", remote_file_url=" << remote_file_url
                             << ", local_file_path=" << local_file_path
                             << ", remote_file_md5=" << remote_file_md5
                             << ", local_file_md5=" << local_file_md5;
                return Status::RuntimeError(
                        "download file {} md5 is not equal, local={}, remote={}", remote_file_url,
                        local_file_md5, remote_file_md5);
            }
        }

        return io::global_local_filesystem()->permission(local_file_path,
                                                         io::LocalFileSystem::PERMS_OWNER_RW);
    };
    auto status = HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb);
    if (!status.ok()) {
        LOG(WARNING) << "failed to download file from " << remote_file_url
                     << ", status: " << status.to_string();
        return status;
    }

    return Status::OK();
}
304
305
1
// Lists the files already present in the local snapshot directory and records each one's
// size (and md5, when config::enable_download_md5sum_check is set) in `_local_files`.
// The last listed file whose name ends in ".hdr" becomes `_local_hdr_filename`.
Status SnapshotHttpDownloader::_load_existing_files() {
    std::vector<std::string> file_names;
    RETURN_IF_ERROR(_snapshot_loader._get_existing_files_from_local(_local_path, &file_names));

    for (const auto& file_name : file_names) {
        const std::string file_path = _local_path + "/" + file_name;

        std::error_code size_ec;
        const uint64_t file_size = std::filesystem::file_size(file_path, size_ec);
        if (size_ec) {
            LOG(WARNING) << "download file error, can't retrive file_size of " << file_path
                         << ", due to " << size_ec.message();
            return Status::IOError("can't retrive file_size of {}, due to {}", file_path,
                                   size_ec.message());
        }

        std::string checksum;
        if (config::enable_download_md5sum_check) {
            Status md5_status = io::global_local_filesystem()->md5sum(file_path, &checksum);
            if (!md5_status.ok()) {
                LOG(WARNING) << "download file error, local file " << file_path
                             << " md5sum: " << md5_status.to_string();
                return md5_status;
            }
        }
        _local_files[file_name] = {file_size, checksum};

        if (file_name.ends_with(".hdr")) {
            _local_hdr_filename = file_name;
        }
    }

    return Status::OK();
}
340
341
1
// Fetches the newline-separated file listing of the remote snapshot directory into
// `_remote_file_list`. The first entry ending in ".hdr" is moved out of the list into
// `_remote_hdr_filename`; a listing without any ".hdr" entry is an error.
Status SnapshotHttpDownloader::_list_remote_files() {
    // get all these use http download action
    // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
    const std::string list_url = fmt::format("{}&file={}", _base_url, _remote_path);

    LOG(INFO) << "list remote files: " << list_url << ", job: " << _snapshot_loader._job_id
              << ", task id: " << _snapshot_loader._task_id << ", remote be: " << _remote_be_addr;

    std::string listing;
    auto fetch_listing = [&list_url, &listing](HttpClient* client) {
        RETURN_IF_ERROR(client->init(list_url));
        client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
        return client->execute(&listing);
    };
    RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, fetch_listing));

    _remote_file_list = strings::Split(listing, "\n", strings::SkipWhitespace());

    auto is_hdr_file = [](const std::string& name) { return _end_with(name, ".hdr"); };
    auto hdr_it = std::find_if(_remote_file_list.begin(), _remote_file_list.end(), is_hdr_file);
    if (hdr_it == _remote_file_list.end()) {
        std::string msg =
                fmt::format("can't find hdr file in remote snapshot path: {}", _remote_path);
        LOG(WARNING) << msg;
        return Status::RuntimeError(std::move(msg));
    }

    _remote_hdr_filename = *hdr_it;
    _remote_file_list.erase(hdr_it);
    return Status::OK();
}
374
375
1
// Stats the remote header file and downloads it to "<local path>/<remote hdr name>.tmp".
// On success the temporary path is stored in `_tmp_hdr_file` and the remote stat in
// `_remote_hdr_filestat`.
Status SnapshotHttpDownloader::_download_hdr_file() {
    const std::string hdr_url =
            fmt::format("{}&file={}/{}", _base_url, _remote_path, _remote_hdr_filename);

    RemoteFileStat hdr_stat;
    if (Status st = _get_http_file_stat(hdr_url, &hdr_stat); !st.ok()) {
        LOG(WARNING) << "failed to get remote hdr file stat: " << hdr_url
                     << ", error: " << st.to_string();
        return st;
    }

    const std::string tmp_hdr_path = _local_path + "/" + _remote_hdr_filename + ".tmp";
    if (Status st = _download_http_file(_tablet->data_dir(), hdr_url, tmp_hdr_path, hdr_stat);
        !st.ok()) {
        LOG(WARNING) << "failed to download remote hdr file: " << hdr_url
                     << ", error: " << st.to_string();
        return st;
    }

    _tmp_hdr_file = tmp_hdr_path;
    _remote_hdr_filestat = hdr_stat;
    return Status::OK();
}
399
400
1
// Avoids downloading rowset files whose contents already exist locally under another
// rowset id. It compares the local tablet meta (`_local_hdr_filename`) with the downloaded
// remote meta (`_tmp_hdr_file`) and hard-links files inside `_local_path`:
//  1. a local rowset whose `source_rowset_id` is a remote rowset with the same version
//     range has its files linked under the remote rowset id;
//  2. a remote rowset whose `source_rowset_id` is a local rowset with the same version
//     range gets the local rowset's files linked under the remote rowset id.
// Rowsets that carry a resource id are skipped. Each new link is recorded in `_local_files`
// under its new name with the stat of the file it was linked from. Targets that already
// exist on disk are left untouched.
//
// A local meta that fails to load is tolerated (returns OK without linking). A remote meta
// that fails to load, or a failed link, is returned as an error.
Status SnapshotHttpDownloader::_link_same_rowset_files() {
    std::string local_hdr_file_path = _local_path + "/" + _local_hdr_filename;

    // load local tablet meta
    TabletMetaPB local_tablet_meta;
    auto status = TabletMeta::load_from_file(local_hdr_file_path, &local_tablet_meta);
    if (!status.ok()) {
        // This file might broken because of the partial downloading.
        LOG(WARNING) << "failed to load local tablet meta: " << local_hdr_file_path
                     << ", skip link same rowset files, error: " << status.to_string();
        return Status::OK();
    }

    // load remote tablet meta
    TabletMetaPB remote_tablet_meta;
    status = TabletMeta::load_from_file(_tmp_hdr_file, &remote_tablet_meta);
    if (!status.ok()) {
        LOG(WARNING) << "failed to load remote tablet meta: " << _tmp_hdr_file
                     << ", error: " << status.to_string();
        return status;
    }

    LOG(INFO) << "link rowset files by compare " << _local_hdr_filename << " and "
              << _remote_hdr_filename;

    std::unordered_map<std::string, const RowsetMetaPB&> remote_rowset_metas;
    for (const auto& rowset_meta : remote_tablet_meta.rs_metas()) {
        if (rowset_meta.has_resource_id()) { // skip remote rowset
            continue;
        }
        remote_rowset_metas.insert({rowset_meta.rowset_id_v2(), rowset_meta});
    }

    std::unordered_map<std::string, const RowsetMetaPB&> local_rowset_metas;
    for (const auto& rowset_meta : local_tablet_meta.rs_metas()) {
        if (rowset_meta.has_resource_id()) {
            continue;
        }
        local_rowset_metas.insert({rowset_meta.rowset_id_v2(), rowset_meta});
    }

    // Case 1: local rowsets that were copied from a remote rowset.
    for (const auto& local_rowset_meta : local_tablet_meta.rs_metas()) {
        if (local_rowset_meta.has_resource_id() || !local_rowset_meta.has_source_rowset_id()) {
            continue;
        }

        auto remote_rowset_meta = remote_rowset_metas.find(local_rowset_meta.source_rowset_id());
        if (remote_rowset_meta == remote_rowset_metas.end()) {
            continue;
        }

        const auto& remote_rowset_id = remote_rowset_meta->first;
        const auto& remote_rowset_meta_pb = remote_rowset_meta->second;
        const auto& local_rowset_id = local_rowset_meta.rowset_id_v2();
        auto remote_tablet_id = remote_rowset_meta_pb.tablet_id();
        if (local_rowset_meta.start_version() != remote_rowset_meta_pb.start_version() ||
            local_rowset_meta.end_version() != remote_rowset_meta_pb.end_version()) {
            continue;
        }

        LOG(INFO) << "rowset " << local_rowset_id << " was downloaded from remote tablet "
                  << remote_tablet_id << " rowset " << remote_rowset_id
                  << ", directly link files instead of downloading";

        // Inserting into _local_files while range-iterating over it can rehash the map and
        // invalidate the loop iterator (undefined behavior). Collect the new entries here
        // and add them after the scan.
        std::vector<std::pair<std::string, LocalFileStat>> linked_files;

        // Since the rowset meta are the same, we can link the local rowset files as
        // the downloaded remote rowset files.
        for (const auto& [local_file, local_filestat] : _local_files) {
            if (!local_file.starts_with(local_rowset_id)) {
                continue;
            }

            std::string remote_file = local_file;
            remote_file.replace(0, local_rowset_id.size(), remote_rowset_id);
            std::string local_file_path = _local_path + "/" + local_file;
            std::string remote_file_path = _local_path + "/" + remote_file;

            bool exist = true;
            RETURN_IF_ERROR(io::global_local_filesystem()->exists(remote_file_path, &exist));
            if (exist) {
                continue;
            }

            LOG(INFO) << "link file from " << local_file_path << " to " << remote_file_path;
            if (!io::global_local_filesystem()->link_file(local_file_path, remote_file_path)) {
                std::string msg = fmt::format("link file failed from {} to {}, err: {}",
                                              local_file_path, remote_file_path, strerror(errno));
                LOG(WARNING) << msg;
                return Status::InternalError(std::move(msg));
            }

            linked_files.emplace_back(std::move(remote_file), local_filestat);
        }

        for (auto& [linked_file, linked_filestat] : linked_files) {
            _local_files[linked_file] = std::move(linked_filestat);
        }
    }

    // Case 2: remote rowsets that were derived from a local rowset.
    for (const auto& remote_rowset_meta : remote_tablet_meta.rs_metas()) {
        if (remote_rowset_meta.has_resource_id() || !remote_rowset_meta.has_source_rowset_id()) {
            continue;
        }

        auto local_rowset_meta = local_rowset_metas.find(remote_rowset_meta.source_rowset_id());
        if (local_rowset_meta == local_rowset_metas.end()) {
            continue;
        }

        const auto& local_rowset_id = local_rowset_meta->first;
        const auto& local_rowset_meta_pb = local_rowset_meta->second;
        const auto& remote_rowset_id = remote_rowset_meta.rowset_id_v2();
        auto local_tablet_id = local_rowset_meta_pb.tablet_id();

        if (remote_rowset_meta.start_version() != local_rowset_meta_pb.start_version() ||
            remote_rowset_meta.end_version() != local_rowset_meta_pb.end_version()) {
            continue;
        }

        LOG(INFO) << "remote rowset " << remote_rowset_id << " was derived from local tablet "
                  << local_tablet_id << " rowset " << local_rowset_id
                  << ", skip downloading these files";

        // Iterates _remote_file_list (a separate container), so updating _local_files
        // inside this loop is safe.
        for (const auto& remote_file : _remote_file_list) {
            if (!remote_file.starts_with(remote_rowset_id)) {
                continue;
            }

            std::string local_file = remote_file;
            local_file.replace(0, remote_rowset_id.size(), local_rowset_id);
            std::string local_file_path = _local_path + "/" + local_file;
            std::string remote_file_path = _local_path + "/" + remote_file;

            bool exist = false;
            RETURN_IF_ERROR(io::global_local_filesystem()->exists(remote_file_path, &exist));
            if (exist) {
                continue;
            }

            LOG(INFO) << "link file from " << local_file_path << " to " << remote_file_path;
            if (!io::global_local_filesystem()->link_file(local_file_path, remote_file_path)) {
                std::string msg = fmt::format("link file failed from {} to {}, err: {}",
                                              local_file_path, remote_file_path, strerror(errno));
                LOG(WARNING) << msg;
                return Status::InternalError(std::move(msg));
            } else {
                auto it = _local_files.find(local_file);
                if (it != _local_files.end()) {
                    _local_files[remote_file] = it->second;
                } else {
                    std::string msg =
                            fmt::format("local file {} don't exist in _local_files, err: {}",
                                        local_file, strerror(errno));
                    LOG(WARNING) << msg;
                    return Status::InternalError(std::move(msg));
                }
            }
        }
    }
    return Status::OK();
}
556
557
1
// Fetches the remote stat of every entry in `_remote_file_list` (the header file was
// already removed from that list) into `_remote_files`. The progress callback, if set, is
// invoked before each request, and its error aborts the loop.
Status SnapshotHttpDownloader::_get_remote_file_stats() {
    for (const std::string& name : _remote_file_list) {
        if (_report_progress_callback) {
            RETURN_IF_ERROR(_report_progress_callback());
        }

        const std::string file_url = fmt::format("{}&file={}/{}", _base_url, _remote_path, name);
        RemoteFileStat stat;
        RETURN_IF_ERROR(_get_http_file_stat(file_url, &stat));
        _remote_files[name] = stat;
    }
    return Status::OK();
}
573
574
1
void SnapshotHttpDownloader::_get_need_download_files() {
575
1
    for (const auto& [remote_file, remote_filestat] : _remote_files) {
  Branch (575:53): [True: 1, False: 1]
576
1
        LOG(INFO) << "remote file: " << remote_file << ", size: " << remote_filestat.size
577
1
                  << ", md5: " << remote_filestat.md5;
578
1
        auto it = _local_files.find(remote_file);
579
1
        if (it == _local_files.end()) {
  Branch (579:13): [True: 0, False: 1]
580
0
            _need_download_files.emplace_back(remote_file);
581
0
            continue;
582
0
        }
583
584
1
        if (auto& local_filestat = it->second; local_filestat.size != remote_filestat.size) {
  Branch (584:48): [True: 0, False: 1]
585
0
            _need_download_files.emplace_back(remote_file);
586
0
            continue;
587
0
        }
588
589
1
        if (auto& local_filestat = it->second; local_filestat.md5 != remote_filestat.md5) {
  Branch (589:48): [True: 0, False: 1]
590
0
            _need_download_files.emplace_back(remote_file);
591
0
            continue;
592
0
        }
593
594
1
        LOG(INFO) << fmt::format("file {} already exists, skip download url {}", remote_file,
595
1
                                 remote_filestat.url);
596
1
    }
597
1
}
598
599
0
// Downloads every file in `_need_download_files` into `_local_path`. The local name is
// produced by `_replace_tablet_id` with the local tablet id. Each downloaded file is
// recorded in `_local_files` with the remote size and md5. The progress callback, if set,
// runs before each file, and its error aborts the loop. Logs the total size and rate at the end.
Status SnapshotHttpDownloader::_download_files() {
    DataDir* data_dir = _tablet->data_dir();

    uint64_t total_file_size = 0;
    MonotonicStopWatch watch(true);
    for (const auto& filename : _need_download_files) {
        if (_report_progress_callback) {
            RETURN_IF_ERROR(_report_progress_callback());
        }

        // Entries of _need_download_files are expected to come from _remote_files. Use
        // find() so a missing entry is reported instead of silently default-inserted by
        // operator[].
        auto remote_it = _remote_files.find(filename);
        if (remote_it == _remote_files.end()) {
            return Status::InternalError("remote file stat of {} not found", filename);
        }
        const auto& remote_filestat = remote_it->second;
        const auto file_size = remote_filestat.size;

        std::string local_filename;
        RETURN_IF_ERROR(
                _snapshot_loader._replace_tablet_id(filename, _local_tablet_id, &local_filename));
        std::string local_file_path = _local_path + "/" + local_filename;

        RETURN_IF_ERROR(_download_http_file(data_dir, remote_filestat.url, local_file_path,
                                            remote_filestat));
        total_file_size += file_size;

        // local_files always keep the updated local files
        _local_files[filename] = LocalFileStat {file_size, remote_filestat.md5};
    }

    const uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
    double copy_rate = 0.0;
    if (total_time_ms > 0) {
        // bytes / ms / 1000 == 10^6 bytes per second, reported as MB/s.
        copy_rate = total_file_size / static_cast<double>(total_time_ms) / 1000;
    }
    LOG(INFO) << fmt::format(
            "succeed to copy remote tablet {} to local tablet {}, total downloading {} files, "
            "total file size: {} B, cost: {} ms, rate: {} MB/s",
            _remote_tablet_id, _local_tablet_id, _need_download_files.size(), total_file_size,
            total_time_ms, copy_rate);

    return Status::OK();
}
641
642
1
// Renames the temporary downloaded header (`_tmp_hdr_file`) to its final name in
// `_local_path`. The final name comes from `_replace_tablet_id` with the local tablet id.
// Afterwards the header stat is added to `_remote_files` so it counts as a remote file.
Status SnapshotHttpDownloader::_install_remote_hdr_file() {
    std::string installed_hdr_name;
    RETURN_IF_ERROR(_snapshot_loader._replace_tablet_id(_remote_hdr_filename, _local_tablet_id,
                                                        &installed_hdr_name));
    const std::string installed_hdr_path = _local_path + "/" + installed_hdr_name;

    if (Status st = io::global_local_filesystem()->rename(_tmp_hdr_file, installed_hdr_path);
        !st.ok()) {
        LOG(WARNING) << "failed to install remote hdr file from: " << _tmp_hdr_file << " to"
                     << installed_hdr_path << ", error: " << st.to_string();
        return Status::RuntimeError("failed install remote hdr file {} from tmp {}, error: {}",
                                    installed_hdr_path, _tmp_hdr_file, st.to_string());
    }

    _remote_files[_remote_hdr_filename] = _remote_hdr_filestat;
    return Status::OK();
}
661
662
1
// Deletes every file tracked in `_local_files` that has no counterpart in `_remote_files`.
// Names are compared after mapping through `_replace_tablet_id` with the remote tablet id.
// Names that cannot be mapped, and files that fail to delete, are logged and skipped;
// this function always returns OK.
Status SnapshotHttpDownloader::_delete_orphan_files() {
    for (const auto& [file_name, file_stat] : _local_files) {
        // Rewrite the tablet id to the remote one so the name can be looked up in _remote_files.
        std::string name_in_remote;
        if (Status st = _snapshot_loader._replace_tablet_id(file_name, _remote_tablet_id,
                                                            &name_in_remote);
            !st.ok()) {
            LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st
                         << ". ignore it";
            continue;
        }
        VLOG_CRITICAL << "new file name after replace tablet id: " << name_in_remote;

        const bool exists_in_remote = _remote_files.find(name_in_remote) != _remote_files.end();
        if (exists_in_remote) {
            continue;
        }

        const std::string full_path = _local_path + "/" + file_name;
        LOG(INFO) << "begin to delete local snapshot file: " << full_path
                  << ", it does not exist in remote";
        if (remove(full_path.c_str()) != 0) {
            LOG(WARNING) << "failed to delete unknown local file: " << full_path
                         << ", error: " << strerror(errno) << ", file size: " << file_stat.size
                         << ", ignore it";
        }
    }
    return Status::OK();
}
693
694
1
// Brings the local snapshot directory `_local_path` in line with the remote snapshot over HTTP.
// The whole run holds the LocalSnapshotLock for `_local_path`. In order, it:
//   1. checks that `_local_path` is an existing directory,
//   2. loads the local file listing,
//   3. lists the remote files,
//   4. downloads the remote header into a temporary file,
//   5. links identical rowset files, only when a local header file was found,
//   6. fetches remote file stats,
//   7. downloads the files that still need downloading,
//   8. installs the new header,
//   9. deletes local files that are absent remotely.
// The first failing step's status is returned.
Status SnapshotHttpDownloader::download() {
    auto path_guard = LocalSnapshotLock::instance().acquire(_local_path);

    bool is_dir = true;
    RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(_local_path, &is_dir));
    if (!is_dir) {
        std::string err_msg =
                fmt::format("snapshot path is not directory or does not exist: {}", _local_path);
        LOG(WARNING) << err_msg;
        return Status::RuntimeError(std::move(err_msg));
    }

    RETURN_IF_ERROR(_load_existing_files());
    RETURN_IF_ERROR(_list_remote_files());
    RETURN_IF_ERROR(_download_hdr_file());

    const bool has_local_hdr = !_local_hdr_filename.empty();
    if (has_local_hdr) {
        RETURN_IF_ERROR(_link_same_rowset_files());
    }

    RETURN_IF_ERROR(_get_remote_file_stats());

    _get_need_download_files();
    const bool nothing_to_download = _need_download_files.empty();
    if (!nothing_to_download) {
        RETURN_IF_ERROR(_download_files());
    }

    RETURN_IF_ERROR(_install_remote_hdr_file());
    return _delete_orphan_files();
}
739
740
// Records the execution environment, the job/task identifiers, the broker address and a copy of
// the storage properties. The remote file system itself is created afterwards by init().
BaseSnapshotLoader::BaseSnapshotLoader(ExecEnv* env, int64_t job_id, int64_t task_id,
                                       const TNetworkAddress& broker_addr,
                                       const std::map<std::string, std::string>& prop)
        : _env(env),
          _job_id(job_id),
          _task_id(task_id),
          _broker_addr(broker_addr),
          _prop(prop) {}
744
745
0
// Creates `_remote_fs`, the remote file system used by upload()/download(), for the given
// storage backend `type`.
//   - S3: `location` is parsed as an S3 URI and combined with `_prop` into an S3 config.
//   - HDFS: configured from `_prop` only.
//   - BROKER: configured from `_broker_addr` and `_prop`.
// Returns InternalError for an unknown backend type, or the first error raised while
// parsing/creating the file system.
Status BaseSnapshotLoader::init(TStorageBackendType::type type, const std::string& location) {
    if (TStorageBackendType::type::S3 == type) {
        S3Conf s3_conf;
        S3URI s3_uri(location);
        RETURN_IF_ERROR(s3_uri.parse());
        RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf(_prop, s3_uri, &s3_conf));
        _remote_fs =
                DORIS_TRY(io::S3FileSystem::create(std::move(s3_conf), io::FileSystem::TMP_FS_ID));
    } else if (TStorageBackendType::type::HDFS == type) {
        THdfsParams hdfs_params = parse_properties(_prop);
        _remote_fs = DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name,
                                                          io::FileSystem::TMP_FS_ID, nullptr));
    } else if (TStorageBackendType::type::BROKER == type) {
        // The created broker FS is assigned directly; no intermediate local is needed.
        _remote_fs = DORIS_TRY(
                io::BrokerFileSystem::create(_broker_addr, _prop, io::FileSystem::TMP_FS_ID));
    } else {
        return Status::InternalError("Unknown storage type: {}", type);
    }
    return Status::OK();
}
766
767
// Passes the shared job state to BaseSnapshotLoader and keeps a reference to the storage engine,
// which this loader uses to look up tablets and data directories.
SnapshotLoader::SnapshotLoader(StorageEngine& engine, ExecEnv* env, int64_t job_id, int64_t task_id,
                               const TNetworkAddress& broker_addr,
                               const std::map<std::string, std::string>& prop)
        : BaseSnapshotLoader(env, job_id, task_id, broker_addr, prop),
          _engine(engine) {}
771
772
// Uploads local tablet snapshot directories to the remote storage backend.
//
// `src_to_dest_path` maps each local snapshot directory (parsed as ".../tablet_id/schema_hash")
// to its remote destination directory. For every local file the MD5 is computed, and the file is
// uploaded with upload_with_checksum() unless the remote listing already holds a file with the
// same name and the same checksum. For each tablet, `tablet_files` (when non-null) receives the
// "<file>.<md5>" names of all of that tablet's local files.
//
// The frontend is contacted once before starting and then through _report_every() with an
// interval of 10 files; per the original notes, this is where a job that was cancelled in the
// frontend makes the upload stop.
//
// Returns InternalError if init() has not created the remote file system; otherwise OK or the
// first error raised while validating, listing, checksumming, reporting or uploading.
Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_dest_path,
                              std::map<int64_t, std::vector<std::string>>* tablet_files) {
    if (!_remote_fs) {
        return Status::InternalError("Storage backend not initialized.");
    }
    LOG(INFO) << "begin to upload snapshot files. num: " << src_to_dest_path.size()
              << ", broker addr: " << _broker_addr << ", job: " << _job_id
              << ", task id: " << _task_id;

    // check if job has already been cancelled
    int tmp_counter = 1;
    RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::UPLOAD));

    // 1. validate local tablet snapshot paths
    RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, true));

    // 2. for each src path, upload it to remote storage
    // we report to frontend for every 10 files, and we will cancel the job if
    // the job has already been cancelled in frontend.
    int report_counter = 0;
    const int total_num = static_cast<int>(src_to_dest_path.size());
    int finished_num = 0;
    for (const auto& [src_path, dest_path] : src_to_dest_path) {
        // Take a lock to protect the local snapshot path.
        auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(src_path);

        int64_t tablet_id = 0;
        int32_t schema_hash = 0;
        RETURN_IF_ERROR(
                _get_tablet_id_and_schema_hash_from_file_path(src_path, &tablet_id, &schema_hash));

        // 2.1 get existing files from remote path
        std::map<std::string, FileStat> remote_files;
        RETURN_IF_ERROR(_list_with_checksum(dest_path, &remote_files));
        for (const auto& [remote_file, remote_stat] : remote_files) {
            VLOG_CRITICAL << "get remote file: " << remote_file
                          << ", checksum: " << remote_stat.md5;
        }

        // 2.2 list local files
        std::vector<std::string> local_files;
        RETURN_IF_ERROR(_get_existing_files_from_local(src_path, &local_files));
        std::vector<std::string> local_files_with_checksum;
        local_files_with_checksum.reserve(local_files.size());

        // 2.3 iterate local files
        for (const auto& local_file : local_files) {
            RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
                                          TTaskType::type::UPLOAD));

            // calc md5sum of localfile
            const std::string local_path = src_path + '/' + local_file;
            std::string md5sum;
            RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_path, &md5sum));
            VLOG_CRITICAL << "get file checksum: " << local_file << ": " << md5sum;
            local_files_with_checksum.push_back(local_file + "." + md5sum);

            // check if this local file need upload: skip it only when the remote copy exists
            // with an identical checksum.
            bool need_upload = true;
            auto find = remote_files.find(local_file);
            if (find != remote_files.end()) {
                if (md5sum == find->second.md5) {
                    need_upload = false;
                } else {
                    // remote storage file exist, but with different checksum
                    LOG(WARNING) << "remote file checksum is invalid. remote: " << find->first
                                 << ", local: " << md5sum;
                    // TODO(cmy): save these files and delete them later
                }
            }

            if (!need_upload) {
                VLOG_CRITICAL << "file exist in remote path, no need to upload: " << local_file;
                continue;
            }

            // upload
            const std::string remote_path = dest_path + '/' + local_file;
            RETURN_IF_ERROR(upload_with_checksum(*_remote_fs, local_path, remote_path, md5sum));
        } // end for each tablet's local files

        // The out-param is optional; the vector is no longer needed here, so move it.
        if (tablet_files != nullptr) {
            tablet_files->emplace(tablet_id, std::move(local_files_with_checksum));
        }
        finished_num++;
        LOG(INFO) << "finished to write tablet to remote. local path: " << src_path
                  << ", remote path: " << dest_path;
    } // end for each tablet path

    LOG(INFO) << "finished to upload snapshots. job: " << _job_id << ", task id: " << _task_id;
    return Status::OK();
}
866
867
/*
868
 * Download snapshot files from remote.
869
 * After downloaded, the local dir should contains all files existing in remote,
870
 * may also contains several useless files.
871
 */
872
Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to_dest_path,
873
0
                                std::vector<int64_t>* downloaded_tablet_ids) {
874
0
    if (!_remote_fs) {
  Branch (874:9): [True: 0, False: 0]
875
0
        return Status::InternalError("Storage backend not initialized.");
876
0
    }
877
0
    LOG(INFO) << "begin to download snapshot files. num: " << src_to_dest_path.size()
878
0
              << ", broker addr: " << _broker_addr << ", job: " << _job_id
879
0
              << ", task id: " << _task_id;
880
881
    // check if job has already been cancelled
882
0
    int tmp_counter = 1;
883
0
    RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD));
884
885
0
    Status status = Status::OK();
886
    // 1. validate local tablet snapshot paths
887
0
    RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, false));
888
889
    // 2. for each src path, download it to local storage
890
0
    int report_counter = 0;
891
0
    int total_num = src_to_dest_path.size();
892
0
    int finished_num = 0;
893
0
    for (const auto& iter : src_to_dest_path) {
  Branch (893:27): [True: 0, False: 0]
894
0
        const std::string& remote_path = iter.first;
895
0
        const std::string& local_path = iter.second;
896
897
        // Take a lock to protect the local snapshot path.
898
0
        auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(local_path);
899
900
0
        int64_t local_tablet_id = 0;
901
0
        int32_t schema_hash = 0;
902
0
        RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path(local_path, &local_tablet_id,
903
0
                                                                      &schema_hash));
904
0
        if (downloaded_tablet_ids != nullptr) {
  Branch (904:13): [True: 0, False: 0]
905
0
            downloaded_tablet_ids->push_back(local_tablet_id);
906
0
        }
907
908
0
        int64_t remote_tablet_id;
909
0
        RETURN_IF_ERROR(_get_tablet_id_from_remote_path(remote_path, &remote_tablet_id));
910
0
        VLOG_CRITICAL << "get local tablet id: " << local_tablet_id
Line
Count
Source
43
0
#define VLOG_CRITICAL VLOG(1)
911
0
                      << ", schema hash: " << schema_hash
912
0
                      << ", remote tablet id: " << remote_tablet_id;
913
914
        // 2.1. get local files
915
0
        std::vector<std::string> local_files;
916
0
        RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files));
917
918
        // 2.2. get remote files
919
0
        std::map<std::string, FileStat> remote_files;
920
0
        RETURN_IF_ERROR(_list_with_checksum(remote_path, &remote_files));
921
0
        if (remote_files.empty()) {
  Branch (921:13): [True: 0, False: 0]
922
0
            std::stringstream ss;
923
0
            ss << "get nothing from remote path: " << remote_path;
924
0
            LOG(WARNING) << ss.str();
925
0
            return Status::InternalError(ss.str());
926
0
        }
927
928
0
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(local_tablet_id);
929
0
        if (tablet == nullptr) {
  Branch (929:13): [True: 0, False: 0]
930
0
            std::stringstream ss;
931
0
            ss << "failed to get local tablet: " << local_tablet_id;
932
0
            LOG(WARNING) << ss.str();
933
0
            return Status::InternalError(ss.str());
934
0
        }
935
0
        DataDir* data_dir = tablet->data_dir();
936
937
0
        for (auto& iter : remote_files) {
  Branch (937:25): [True: 0, False: 0]
938
0
            RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
939
0
                                          TTaskType::type::DOWNLOAD));
940
941
0
            bool need_download = false;
942
0
            const std::string& remote_file = iter.first;
943
0
            const FileStat& file_stat = iter.second;
944
0
            auto find = std::find(local_files.begin(), local_files.end(), remote_file);
945
0
            if (find == local_files.end()) {
  Branch (945:17): [True: 0, False: 0]
946
                // remote file does not exist in local, download it
947
0
                need_download = true;
948
0
            } else {
949
0
                if (_end_with(remote_file, ".hdr")) {
  Branch (949:21): [True: 0, False: 0]
950
                    // this is a header file, download it.
951
0
                    need_download = true;
952
0
                } else {
953
                    // check checksum
954
0
                    std::string local_md5sum;
955
0
                    Status st = io::global_local_filesystem()->md5sum(
956
0
                            local_path + "/" + remote_file, &local_md5sum);
957
0
                    if (!st.ok()) {
  Branch (957:25): [True: 0, False: 0]
958
0
                        LOG(WARNING) << "failed to get md5sum of local file: " << remote_file
959
0
                                     << ". msg: " << st << ". download it";
960
0
                        need_download = true;
961
0
                    } else {
962
0
                        VLOG_CRITICAL << "get local file checksum: " << remote_file << ": "
Line
Count
Source
43
0
#define VLOG_CRITICAL VLOG(1)
963
0
                                      << local_md5sum;
964
0
                        if (file_stat.md5 != local_md5sum) {
  Branch (964:29): [True: 0, False: 0]
965
                            // file's checksum does not equal, download it.
966
0
                            need_download = true;
967
0
                        }
968
0
                    }
969
0
                }
970
0
            }
971
972
0
            if (!need_download) {
  Branch (972:17): [True: 0, False: 0]
973
0
                LOG(INFO) << "remote file already exist in local, no need to download."
974
0
                          << ", file: " << remote_file;
975
0
                continue;
976
0
            }
977
978
            // begin to download
979
0
            std::string full_remote_file = remote_path + "/" + remote_file + "." + file_stat.md5;
980
0
            std::string local_file_name;
981
            // we need to replace the tablet_id in remote file name with local tablet id
982
0
            RETURN_IF_ERROR(_replace_tablet_id(remote_file, local_tablet_id, &local_file_name));
983
0
            std::string full_local_file = local_path + "/" + local_file_name;
984
0
            LOG(INFO) << "begin to download from " << full_remote_file << " to " << full_local_file;
985
0
            size_t file_len = file_stat.size;
986
987
            // check disk capacity
988
0
            if (data_dir->reach_capacity_limit(file_len)) {
  Branch (988:17): [True: 0, False: 0]
989
0
                return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
990
0
                        "reach the capacity limit of path {}, file_size={}", data_dir->path(),
991
0
                        file_len);
992
0
            }
993
            // remove file which will be downloaded now.
994
            // this file will be added to local_files if it be downloaded successfully.
995
0
            if (find != local_files.end()) {
  Branch (995:17): [True: 0, False: 0]
996
0
                local_files.erase(find);
997
0
            }
998
0
            RETURN_IF_ERROR(_remote_fs->download(full_remote_file, full_local_file));
999
1000
            // 3. check md5 of the downloaded file
1001
0
            std::string downloaded_md5sum;
1002
0
            RETURN_IF_ERROR(
1003
0
                    io::global_local_filesystem()->md5sum(full_local_file, &downloaded_md5sum));
1004
0
            VLOG_CRITICAL << "get downloaded file checksum: " << full_local_file << ": "
Line
Count
Source
43
0
#define VLOG_CRITICAL VLOG(1)
1005
0
                          << downloaded_md5sum;
1006
0
            if (downloaded_md5sum != file_stat.md5) {
  Branch (1006:17): [True: 0, False: 0]
1007
0
                std::stringstream ss;
1008
0
                ss << "invalid md5 of downloaded file: " << full_local_file
1009
0
                   << ", expected: " << file_stat.md5 << ", get: " << downloaded_md5sum;
1010
0
                LOG(WARNING) << ss.str();
1011
0
                return Status::InternalError(ss.str());
1012
0
            }
1013
1014
            // local_files always keep the updated local files
1015
0
            local_files.push_back(local_file_name);
1016
0
            LOG(INFO) << "finished to download file via broker. file: " << full_local_file
1017
0
                      << ", length: " << file_len;
1018
0
        } // end for all remote files
1019
1020
        // finally, delete local files which are not in remote
1021
0
        for (const auto& local_file : local_files) {
  Branch (1021:37): [True: 0, False: 0]
1022
            // replace the tablet id in local file name with the remote tablet id,
1023
            // in order to compare the file name.
1024
0
            std::string new_name;
1025
0
            Status st = _replace_tablet_id(local_file, remote_tablet_id, &new_name);
1026
0
            if (!st.ok()) {
  Branch (1026:17): [True: 0, False: 0]
1027
0
                LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st
1028
0
                             << ". ignore it";
1029
0
                continue;
1030
0
            }
1031
0
            VLOG_CRITICAL << "new file name after replace tablet id: " << new_name;
Line
Count
Source
43
0
#define VLOG_CRITICAL VLOG(1)
1032
0
            const auto& find = remote_files.find(new_name);
1033
0
            if (find != remote_files.end()) {
  Branch (1033:17): [True: 0, False: 0]
1034
0
                continue;
1035
0
            }
1036
1037
            // delete
1038
0
            std::string full_local_file = local_path + "/" + local_file;
1039
0
            VLOG_CRITICAL << "begin to delete local snapshot file: " << full_local_file
Line
Count
Source
43
0
#define VLOG_CRITICAL VLOG(1)
1040
0
                          << ", it does not exist in remote";
1041
0
            if (remove(full_local_file.c_str()) != 0) {
  Branch (1041:17): [True: 0, False: 0]
1042
0
                LOG(WARNING) << "failed to delete unknown local file: " << full_local_file
1043
0
                             << ", ignore it";
1044
0
            }
1045
0
        }
1046
1047
0
        finished_num++;
1048
0
    } // end for src_to_dest_path
1049
1050
0
    LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id;
1051
0
    return status;
1052
0
}
1053
1054
// Downloads every snapshot in `remote_tablet_snapshots` over HTTP, one tablet after another.
// Each snapshot is written into its `local_snapshot_path` by a SnapshotHttpDownloader.
//
// `downloaded_tablet_ids`, when non-null, receives each local tablet id once the tablet has been
// found locally, before its download runs. Outside BE_TEST builds:
//   - the frontend is contacted once up front (job-cancel check);
//   - each downloader gets a progress callback that reports through _report_every() with an
//     interval of 10.
// Returns RuntimeError if a local tablet is missing, or the first error from a downloader.
Status SnapshotLoader::remote_http_download(
        const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
        std::vector<int64_t>* downloaded_tablet_ids) {
#ifndef BE_TEST
    // check if job has already been cancelled
    int tmp_counter = 1;
    RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD));

    // Progress state shared by all tablets of this task, as in upload()/download(). It must live
    // outside the loop: when declared inside the loop body, `finished_num` was reset to 0 before
    // every tablet (so progress was always reported as 0 of N and the increment below was dead)
    // and `report_counter` restarted for each tablet.
    int report_counter = 0;
    int finished_num = 0;
    const int total_num = static_cast<int>(remote_tablet_snapshots.size());
#endif

    for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
        auto local_tablet_id = remote_tablet_snapshot.local_tablet_id;
        const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
        const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;

        LOG(INFO) << fmt::format(
                "download snapshots via http. job: {}, task id: {}, local dir: {}, remote dir: {}",
                _job_id, _task_id, local_path, remote_path);

        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(local_tablet_id);
        if (tablet == nullptr) {
            std::string msg = fmt::format("failed to get local tablet: {}", local_tablet_id);
            LOG(WARNING) << msg;
            return Status::RuntimeError(std::move(msg));
        }

        if (downloaded_tablet_ids != nullptr) {
            downloaded_tablet_ids->push_back(local_tablet_id);
        }

        SnapshotHttpDownloader downloader(remote_tablet_snapshot, std::move(tablet), *this);
#ifndef BE_TEST
        // The callback only lives as long as `downloader` (this iteration), so referencing the
        // function-scope counters is safe.
        downloader.set_report_progress_callback(
                [this, &report_counter, &finished_num, total_num]() {
                    return _report_every(10, &report_counter, finished_num, total_num,
                                         TTaskType::type::DOWNLOAD);
                });
#endif

        RETURN_IF_ERROR(downloader.download());
        _set_http_download_files_num(downloader.get_download_file_num());

#ifndef BE_TEST
        ++finished_num;
#endif
    }

    LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id;
    return Status::OK();
}
1108
1109
// move the snapshot files in snapshot_path
1110
// to tablet_path
1111
// If overwrite, just replace the tablet_path with snapshot_path,
1112
// else: (TODO)
1113
//
1114
// MUST hold tablet's header lock, push lock, cumulative lock and base compaction lock
1115
Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr tablet,
1116
2
                            bool overwrite) {
1117
    // Take a lock to protect the local snapshot path.
1118
2
    auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(snapshot_path);
1119
1120
2
    auto tablet_path = tablet->tablet_path();
1121
2
    auto store_path = tablet->data_dir()->path();
1122
2
    LOG(INFO) << "begin to move snapshot files. from: " << snapshot_path << ", to: " << tablet_path
1123
2
              << ", store: " << store_path << ", job: " << _job_id << ", task id: " << _task_id;
1124
1125
2
    Status status = Status::OK();
1126
1127
    // validate snapshot_path and tablet_path
1128
2
    int64_t snapshot_tablet_id = 0;
1129
2
    int32_t snapshot_schema_hash = 0;
1130
2
    RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path(
1131
2
            snapshot_path, &snapshot_tablet_id, &snapshot_schema_hash));
1132
1133
2
    int64_t tablet_id = 0;
1134
2
    int32_t schema_hash = 0;
1135
2
    RETURN_IF_ERROR(
1136
2
            _get_tablet_id_and_schema_hash_from_file_path(tablet_path, &tablet_id, &schema_hash));
1137
1138
2
    if (tablet_id != snapshot_tablet_id || schema_hash != snapshot_schema_hash) {
  Branch (1138:9): [True: 0, False: 2]
  Branch (1138:44): [True: 0, False: 2]
1139
0
        std::stringstream ss;
1140
0
        ss << "path does not match. snapshot: " << snapshot_path
1141
0
           << ", tablet path: " << tablet_path;
1142
0
        LOG(WARNING) << ss.str();
1143
0
        return Status::InternalError(ss.str());
1144
0
    }
1145
1146
2
    DataDir* store = _engine.get_store(store_path);
1147
2
    if (store == nullptr) {
  Branch (1147:9): [True: 0, False: 2]
1148
0
        std::stringstream ss;
1149
0
        ss << "failed to get store by path: " << store_path;
1150
0
        LOG(WARNING) << ss.str();
1151
0
        return Status::InternalError(ss.str());
1152
0
    }
1153
1154
2
    if (!std::filesystem::exists(tablet_path)) {
  Branch (1154:9): [True: 0, False: 2]
1155
0
        std::stringstream ss;
1156
0
        ss << "tablet path does not exist: " << tablet_path;
1157
0
        LOG(WARNING) << ss.str();
1158
0
        return Status::InternalError(ss.str());
1159
0
    }
1160
1161
2
    if (!std::filesystem::exists(snapshot_path)) {
  Branch (1161:9): [True: 0, False: 2]
1162
0
        std::stringstream ss;
1163
0
        ss << "snapshot path does not exist: " << snapshot_path;
1164
0
        LOG(WARNING) << ss.str();
1165
0
        return Status::InternalError(ss.str());
1166
0
    }
1167
1168
2
    std::string loaded_tag_path = get_loaded_tag_path(snapshot_path);
1169
2
    bool already_loaded = false;
1170
2
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(loaded_tag_path, &already_loaded));
1171
2
    if (already_loaded) {
  Branch (1171:9): [True: 1, False: 1]
1172
1
        LOG(INFO) << "snapshot path already moved: " << snapshot_path;
1173
1
        return Status::OK();
1174
1
    }
1175
1176
    // rename the rowset ids and tabletid info in rowset meta
1177
1
    auto res = _engine.snapshot_mgr()->convert_rowset_ids(snapshot_path, tablet_id,
1178
1
                                                          tablet->replica_id(), tablet->table_id(),
1179
1
                                                          tablet->partition_id(), schema_hash);
1180
1
    if (!res.has_value()) [[unlikely]] {
  Branch (1180:9): [True: 0, False: 1]
1181
0
        auto err_msg =
1182
0
                fmt::format("failed to convert rowsetids in snapshot: {}, tablet path: {}, err: {}",
1183
0
                            snapshot_path, tablet_path, res.error());
1184
0
        LOG(WARNING) << err_msg;
1185
0
        return Status::InternalError(err_msg);
1186
0
    }
1187
1188
1
    if (!overwrite) {
  Branch (1188:9): [True: 0, False: 1]
1189
0
        LOG(FATAL) << "only support overwrite now";
1190
0
        __builtin_unreachable();
1191
0
    }
1192
1193
    // Medium migration/clone/checkpoint/compaction may change or check the
1194
    // files and tablet meta, so we need to take these locks.
1195
1
    std::unique_lock migration_lock(tablet->get_migration_lock(), std::try_to_lock);
1196
1
    std::unique_lock base_compact_lock(tablet->get_base_compaction_lock(), std::try_to_lock);
1197
1
    std::unique_lock cumu_compact_lock(tablet->get_cumulative_compaction_lock(), std::try_to_lock);
1198
1
    std::unique_lock cold_compact_lock(tablet->get_cold_compaction_lock(), std::try_to_lock);
1199
1
    std::unique_lock build_idx_lock(tablet->get_build_inverted_index_lock(), std::try_to_lock);
1200
1
    std::unique_lock meta_store_lock(tablet->get_meta_store_lock(), std::try_to_lock);
1201
1
    if (!migration_lock.owns_lock() || !base_compact_lock.owns_lock() ||
  Branch (1201:9): [True: 0, False: 1]
  Branch (1201:40): [True: 0, False: 1]
1202
1
        !cumu_compact_lock.owns_lock() || !cold_compact_lock.owns_lock() ||
  Branch (1202:9): [True: 0, False: 1]
  Branch (1202:43): [True: 0, False: 1]
1203
1
        !build_idx_lock.owns_lock() || !meta_store_lock.owns_lock()) {
  Branch (1203:9): [True: 0, False: 1]
  Branch (1203:40): [True: 0, False: 1]
1204
        // This error should be retryable
1205
0
        auto status = Status::ObtainLockFailed("failed to get tablet locks, tablet: {}", tablet_id);
1206
0
        LOG(WARNING) << status << ", snapshot path: " << snapshot_path
1207
0
                     << ", tablet path: " << tablet_path;
1208
0
        return status;
1209
0
    }
1210
1211
1
    std::vector<std::string> snapshot_files;
1212
1
    RETURN_IF_ERROR(_get_existing_files_from_local(snapshot_path, &snapshot_files));
1213
1214
    // FIXME: the below logic will demage the tablet files if failed in the middle.
1215
1216
    // 1. simply delete the old dir and replace it with the snapshot dir
1217
1
    try {
1218
        // This remove seems soft enough, because we already get
1219
        // tablet id and schema hash from this path, which
1220
        // means this path is a valid path.
1221
1
        std::filesystem::remove_all(tablet_path);
1222
1
        VLOG_CRITICAL << "remove dir: " << tablet_path;
Line
Count
Source
43
0
#define VLOG_CRITICAL VLOG(1)
1223
1
        std::filesystem::create_directory(tablet_path);
1224
1
        VLOG_CRITICAL << "re-create dir: " << tablet_path;
Line
Count
Source
43
0
#define VLOG_CRITICAL VLOG(1)
1225
1
    } catch (const std::filesystem::filesystem_error& e) {
1226
0
        std::stringstream ss;
1227
0
        ss << "failed to move tablet path: " << tablet_path << ". err: " << e.what();
1228
0
        LOG(WARNING) << ss.str();
1229
0
        return Status::InternalError(ss.str());
1230
0
    }
1231
1232
    // link files one by one
1233
    // files in snapshot dir will be moved in snapshot clean process
1234
1
    std::vector<std::string> linked_files;
1235
2
    for (auto& file : snapshot_files) {
  Branch (1235:21): [True: 2, False: 1]
1236
2
        auto full_src_path = fmt::format("{}/{}", snapshot_path, file);
1237
2
        auto full_dest_path = fmt::format("{}/{}", tablet_path, file);
1238
2
        if (link(full_src_path.c_str(), full_dest_path.c_str()) != 0) {
  Branch (1238:13): [True: 0, False: 2]
1239
0
            LOG(WARNING) << "failed to link file from " << full_src_path << " to " << full_dest_path
1240
0
                         << ", err: " << std::strerror(errno);
1241
1242
            // clean the already linked files
1243
0
            for (auto& linked_file : linked_files) {
  Branch (1243:36): [True: 0, False: 0]
1244
0
                remove(linked_file.c_str());
1245
0
            }
1246
1247
0
            return Status::InternalError("move tablet failed");
1248
0
        }
1249
2
        linked_files.push_back(full_dest_path);
1250
2
        VLOG_CRITICAL << "link file from " << full_src_path << " to " << full_dest_path;
Line
Count
Source
43
0
#define VLOG_CRITICAL VLOG(1)
1251
2
    }
1252
1253
    // snapshot loader not need to change tablet uid
1254
    // fixme: there is no header now and can not call load_one_tablet here
1255
    // reload header
1256
1
    Status ost = _engine.tablet_manager()->load_tablet_from_dir(store, tablet_id, schema_hash,
1257
1
                                                                tablet_path, true);
1258
1
    if (!ost.ok()) {
  Branch (1258:9): [True: 0, False: 1]
1259
0
        std::stringstream ss;
1260
0
        ss << "failed to reload header of tablet: " << tablet_id;
1261
0
        LOG(WARNING) << ss.str();
1262
0
        return Status::InternalError(ss.str());
1263
0
    }
1264
1265
    // mark the snapshot path as loaded
1266
1
    RETURN_IF_ERROR(write_loaded_tag(snapshot_path, tablet_id));
1267
1268
1
    LOG(INFO) << "finished to reload header of tablet: " << tablet_id;
1269
1270
1
    return status;
1271
1
}
1272
1273
Status SnapshotLoader::_get_tablet_id_and_schema_hash_from_file_path(const std::string& src_path,
                                                                     int64_t* tablet_id,
                                                                     int32_t* schema_hash) {
    // Parses a tablet directory path of the form:
    //     /path/.../<tablet_id>/<schema_hash>
    // The last path component is read as the schema hash and the component
    // before it as the tablet id. Returns InternalError if either component is
    // missing or cannot be parsed as a leading integer of the target type.
    size_t pos = src_path.find_last_of('/');
    // pos == 0 means the path is "/<schema_hash>": there is no tablet id
    // component in front of it (and `pos - 1` below would wrap to npos).
    if (pos == std::string::npos || pos == 0 || pos == src_path.length() - 1) {
        return Status::InternalError("failed to get tablet id from path: {}", src_path);
    }

    std::stringstream schema_hash_ss(src_path.substr(pos + 1));
    // Reject non-numeric / out-of-range values instead of silently using 0.
    if (!(schema_hash_ss >> *schema_hash)) {
        return Status::InternalError("failed to get schema hash from path: {}", src_path);
    }

    // skip schema hash part
    size_t pos2 = src_path.find_last_of('/', pos - 1);
    if (pos2 == std::string::npos) {
        return Status::InternalError("failed to get tablet id from path: {}", src_path);
    }

    // The tablet id lies strictly between the two separators; the '/' at
    // `pos` is excluded from the substring.
    std::stringstream tablet_id_ss(src_path.substr(pos2 + 1, pos - pos2 - 1));
    if (!(tablet_id_ss >> *tablet_id)) {
        return Status::InternalError("failed to get tablet id from path: {}", src_path);
    }

    VLOG_CRITICAL << "get tablet id " << *tablet_id << ", schema hash: " << *schema_hash
                  << " from path: " << src_path;
    return Status::OK();
}
1303
1304
Status SnapshotLoader::_check_local_snapshot_paths(
        const std::map<std::string, std::string>& src_to_dest_path, bool check_src) {
    // Verifies that every selected path is a local directory. If check_src is
    // true, the map keys (source paths) are checked; otherwise the mapped
    // values (destination paths) are. An error from the filesystem probe is
    // propagated as-is. A path that is not a directory yields RuntimeError.
    bool is_dir = true;
    for (const auto& [src, dest] : src_to_dest_path) {
        const std::string& path = check_src ? src : dest;
        RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &is_dir));
        if (is_dir) {
            continue;
        }
        const std::string msg = "snapshot path is not directory or does not exist: " + path;
        LOG(WARNING) << msg;
        return Status::RuntimeError(msg);
    }
    LOG(INFO) << "all local snapshot paths are existing. num: " << src_to_dest_path.size();
    return Status::OK();
}
1326
1327
Status SnapshotLoader::_get_existing_files_from_local(const std::string& local_path,
                                                      std::vector<std::string>* local_files) {
    // Appends the name of each entry returned by the local filesystem's
    // list(local_path, true, ...) to *local_files. Existing contents of
    // *local_files are kept. A listing error is propagated unchanged.
    std::vector<io::FileInfo> listed;
    bool exists = true;
    RETURN_IF_ERROR(io::global_local_filesystem()->list(local_path, true, &listed, &exists));
    // `listed` is a local scratch vector, so its names can be moved out.
    for (auto& info : listed) {
        local_files->push_back(std::move(info.file_name));
    }
    LOG(INFO) << "finished to list files in local path: " << local_path
              << ", file num: " << local_files->size();
    return Status::OK();
}
1339
1340
Status SnapshotLoader::_replace_tablet_id(const std::string& file_name, int64_t tablet_id,
                                          std::string* new_file_name) {
    // Computes the name a snapshot file should have under the target tablet:
    //   10007.hdr          -> "<tablet_id>.hdr"
    //   10007_2_2_0_0.idx  -> unchanged
    //   10007_2_2_0_0.dat  -> unchanged
    // Any other suffix is rejected with InternalError.
    if (_end_with(file_name, ".hdr")) {
        *new_file_name = std::to_string(tablet_id) + ".hdr";
        return Status::OK();
    }
    const bool keeps_original_name = _end_with(file_name, ".idx") || _end_with(file_name, ".dat");
    if (!keeps_original_name) {
        return Status::InternalError("invalid tablet file name: {}", file_name);
    }
    *new_file_name = file_name;
    return Status::OK();
}
1358
1359
Status BaseSnapshotLoader::_get_tablet_id_from_remote_path(const std::string& remote_path,
                                                           int64_t* tablet_id) {
    // Extracts the tablet id from a remote snapshot path: it is whatever
    // follows the last '_' in the path, e.g.
    //     bos://xxx/../__tbl_10004/__part_10003/__idx_10004/__10005  -> 10005
    // Returns InternalError if there is no '_', nothing after it, or the text
    // after it cannot be parsed as a leading int64 (previously such inputs
    // silently produced tablet_id == 0).
    size_t pos = remote_path.find_last_of('_');
    if (pos == std::string::npos || pos == remote_path.size() - 1) {
        return Status::InternalError("invalid remote file path: {}", remote_path);
    }

    std::stringstream ss(remote_path.substr(pos + 1));
    if (!(ss >> *tablet_id)) {
        return Status::InternalError("invalid remote file path: {}", remote_path);
    }

    return Status::OK();
}
1375
1376
// only return CANCELLED if FE return that job is cancelled.
1377
// otherwise, return OK
1378
Status BaseSnapshotLoader::_report_every(int report_threshold, int* counter, int32_t finished_num,
1379
0
                                         int32_t total_num, TTaskType::type type) {
1380
0
    ++*counter;
1381
0
    if (*counter <= report_threshold) {
  Branch (1381:9): [True: 0, False: 0]
1382
0
        return Status::OK();
1383
0
    }
1384
1385
0
    LOG(INFO) << "report to frontend. job id: " << _job_id << ", task id: " << _task_id
1386
0
              << ", finished num: " << finished_num << ", total num:" << total_num;
1387
1388
0
    TNetworkAddress master_addr = _env->cluster_info()->master_fe_addr;
1389
1390
0
    TSnapshotLoaderReportRequest request;
1391
0
    request.job_id = _job_id;
1392
0
    request.task_id = _task_id;
1393
0
    request.task_type = type;
1394
0
    request.__set_finished_num(finished_num);
1395
0
    request.__set_total_num(total_num);
1396
0
    TStatus report_st;
1397
1398
0
    Status rpcStatus = ThriftRpcHelper::rpc<FrontendServiceClient>(
1399
0
            master_addr.hostname, master_addr.port,
1400
0
            [&request, &report_st](FrontendServiceConnection& client) {
1401
0
                client->snapshotLoaderReport(report_st, request);
1402
0
            },
1403
0
            10000);
1404
1405
0
    if (!rpcStatus.ok()) {
  Branch (1405:9): [True: 0, False: 0]
1406
        // rpc failed, ignore
1407
0
        return Status::OK();
1408
0
    }
1409
1410
    // reset
1411
0
    *counter = 0;
1412
0
    if (report_st.status_code == TStatusCode::CANCELLED) {
  Branch (1412:9): [True: 0, False: 0]
1413
0
        LOG(INFO) << "job is cancelled. job id: " << _job_id << ", task id: " << _task_id;
1414
0
        return Status::Cancelled("Cancelled");
1415
0
    }
1416
0
    return Status::OK();
1417
0
}
1418
1419
Status BaseSnapshotLoader::_list_with_checksum(const std::string& dir,
                                               std::map<std::string, FileStat>* md5_files) {
    // Lists `dir` on the remote filesystem. Each entry's file name is expected
    // to look like "<name>.<checksum>". Every such entry is recorded in
    // *md5_files under key <name> as {name, checksum, file size}. Entries
    // without a non-empty suffix after the last '.' are skipped. If a key
    // is already present, the existing entry is kept (std::map::emplace).
    bool exists = true;
    std::vector<io::FileInfo> entries;
    RETURN_IF_ERROR(_remote_fs->list(dir, true, &entries, &exists));
    for (const auto& entry : entries) {
        const std::string file_name = io::Path(entry.file_name).filename();
        const size_t dot = file_name.rfind('.');
        if (dot == std::string::npos || dot + 1 == file_name.size()) {
            // Not found checksum separator, ignore this file
            continue;
        }
        std::string name = file_name.substr(0, dot);
        FileStat stat = {name, file_name.substr(dot + 1), entry.file_size};
        md5_files->emplace(std::move(name), stat);
    }

    return Status::OK();
}
1439
1440
} // end namespace doris