Coverage Report

Created: 2026-07-01 07:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_snapshot_loader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_snapshot_loader.h"
19
20
#include <gen_cpp/Types_types.h>
21
22
#include <unordered_map>
23
24
#include "cloud/cloud_snapshot_mgr.h"
25
#include "cloud/cloud_storage_engine.h"
26
#include "common/logging.h"
27
#include "io/fs/broker_file_system.h"
28
#include "io/fs/file_system.h"
29
#include "io/fs/hdfs_file_system.h"
30
#include "io/fs/path.h"
31
#include "io/fs/remote_file_system.h"
32
#include "io/fs/s3_file_system.h"
33
#include "storage/olap_common.h"
34
#include "storage/olap_define.h"
35
#include "storage/rowset/rowset_factory.h"
36
#include "storage/rowset/rowset_meta.h"
37
#include "storage/rowset/rowset_writer.h"
38
#include "storage/rowset/rowset_writer_context.h"
39
#include "storage/tablet/tablet.h"
40
#include "util/slice.h"
41
42
namespace doris {
43
#include "common/compile_check_avoid_begin.h"
44
namespace {
45
0
/// Returns true when `str` ends with `match`.
///
/// An empty `match` is a suffix of every string (including the empty one).
/// The function is `constexpr` and `noexcept`: the length guard guarantees the
/// `substr` offset is never past the end of `str`, so nothing can throw.
constexpr bool _end_with(std::string_view str, std::string_view match) noexcept {
    // A suffix can never be longer than the string it is supposed to terminate.
    if (match.size() > str.size()) {
        return false;
    }
    // Compare the trailing `match.size()` characters of `str` against `match`.
    return str.substr(str.size() - match.size()) == match;
}
49
} // namespace
50
51
/// Constructs a snapshot loader for the cloud storage engine.
///
/// `env`, `job_id`, `task_id`, `broker_addr` and `broker_prop` are forwarded
/// unchanged to `BaseSnapshotLoader`; `engine` is stored by reference in
/// `_engine`, so the engine must outlive this loader.
CloudSnapshotLoader::CloudSnapshotLoader(CloudStorageEngine& engine, ExecEnv* env, int64_t job_id,
                                         int64_t task_id, const TNetworkAddress& broker_addr,
                                         const std::map<std::string, std::string>& broker_prop)
        : BaseSnapshotLoader(env, job_id, task_id, broker_addr, broker_prop), _engine(engine) {}
55
56
/// Initializes the loader for the given backend `type` and `location`, then
/// resolves the storage resource identified by `vault_id`.
///
/// Returns the error from `BaseSnapshotLoader::init` if that step fails, an
/// InternalError if the engine has no storage resource for `vault_id`, and
/// OK otherwise.
Status CloudSnapshotLoader::init(TStorageBackendType::type type, const std::string& location,
                                 std::string vault_id) {
    // The base class performs the initialization driven by `type` and `location`.
    RETURN_IF_ERROR(BaseSnapshotLoader::init(type, location));

    // Look up the storage resource the engine has registered under this vault id.
    _storage_resource = _engine.get_storage_resource(vault_id);
    if (_storage_resource) {
        return Status::OK();
    }
    return Status::InternalError("vault id not found, vault id {}", vault_id);
}
65
66
0
/// Returns the file system (`fs`) of the storage resource set by `init()`.
///
/// `_storage_resource` is dereferenced without a check, so callers must make
/// sure it has been set before calling this.
io::RemoteFileSystemSPtr CloudSnapshotLoader::storage_fs() {
    const auto& resource = *_storage_resource;
    return resource.fs;
}
69
70
/// Upload is not implemented by this loader.
///
/// Both arguments are ignored and a NotSupported status is always returned.
Status CloudSnapshotLoader::upload(const std::map<std::string, std::string>& src_to_dest_path,
                                   std::map<int64_t, std::vector<std::string>>* tablet_files) {
    // Explicitly discard the parameters: nothing is read from or written to them.
    static_cast<void>(src_to_dest_path);
    static_cast<void>(tablet_files);
    return Status::NotSupported("upload not supported");
}
74
75
/// Downloads snapshot files from the remote repository into the storage vault.
///
/// `src_to_dest_path` maps a remote snapshot directory to the decimal string of
/// the target tablet id. For every entry the function:
///   1. parses the target tablet id and derives the target path from
///      `_storage_resource`;
///   2. fails if the target path already exists (or if the existence check
///      itself fails);
///   3. appends the target tablet id to `*downloaded_tablet_ids`;
///   4. lists the remote files, locates the `.hdr` file and reads it fully;
///   5. calls `CloudSnapshotMgr::make_snapshot` with the header content, which
///      fills a remote-file-name -> target-file-name mapping;
///   6. copies every remote file that appears in that mapping to the target
///      path; remote files without a mapping entry are skipped.
///
/// Progress is reported through `_report_every`; any error it returns aborts
/// the download. Note that a tablet id is appended to `*downloaded_tablet_ids`
/// before its files are transferred, so on failure the vector may contain a
/// tablet whose download did not complete.
///
/// Returns OK when every entry was processed, otherwise the first error.
Status CloudSnapshotLoader::download(const std::map<std::string, std::string>& src_to_dest_path,
                                     std::vector<int64_t>* downloaded_tablet_ids) {
    if (!_remote_fs || !_storage_resource) {
        return Status::InternalError("Storage backend not initialized.");
    }

    LOG(INFO) << "begin to transfer snapshot files. num: " << src_to_dest_path.size()
              << ", broker addr: " << _broker_addr << ", job: " << _job_id
              << ", task id: " << _task_id;

    // check if job has already been cancelled
    int tmp_counter = 1;
    RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD));

    // One transfer buffer is shared by every file copy below instead of being
    // reallocated per file.
    const size_t buf_size = config::s3_file_system_local_upload_buffer_size;
    std::unique_ptr<char[]> transfer_buffer = std::make_unique_for_overwrite<char[]>(buf_size);

    // 1. for each src path, transfer files to target path
    int report_counter = 0;
    const int total_num = static_cast<int>(src_to_dest_path.size());
    int finished_num = 0;
    for (const auto& path_entry : src_to_dest_path) {
        // remote_path eg:
        // cos://xxx/__palo_repository_xxx/_ss_xxx/_ss_content/__db_10000/
        // __tbl_10001/__part_10002/_idx_10001/__10003
        const std::string& remote_path = path_entry.first;
        const std::string& tablet_str = path_entry.second;
        int64_t target_tablet_id = -1;
        try {
            target_tablet_id = std::stoll(tablet_str);
        } catch (const std::exception& e) {
            return Status::InternalError("failed to parse target tablet id {}, {}", tablet_str,
                                         e.what());
        }
        const std::string target_path = _storage_resource->remote_tablet_path(target_tablet_id);

        // 1.1. check target path not exists
        // A failed existence check is reported as such instead of being
        // mislabelled as "target path already exists".
        bool target_path_exist = false;
        Status exists_status = storage_fs()->exists(target_path, &target_path_exist);
        if (!exists_status.ok()) {
            LOG(WARNING) << "failed to check whether target path exists: " << target_path
                         << ", error: " << exists_status.to_string();
            return exists_status;
        }
        if (target_path_exist) {
            const std::string msg =
                    "failed to download snapshot files, target path already exists: " +
                    target_path;
            LOG(WARNING) << msg;
            return Status::InternalError(msg);
        }

        downloaded_tablet_ids->push_back(target_tablet_id);

        int64_t remote_tablet_id = -1;
        RETURN_IF_ERROR(_get_tablet_id_from_remote_path(remote_path, &remote_tablet_id));
        VLOG_CRITICAL << "get target tablet id: " << target_tablet_id
                      << ", remote tablet id: " << remote_tablet_id;

        // 1.2. get remote files
        std::map<std::string, FileStat> remote_files;
        RETURN_IF_ERROR(_list_with_checksum(remote_path, &remote_files));
        if (remote_files.empty()) {
            const std::string msg = "get nothing from remote path: " + remote_path;
            LOG(WARNING) << msg;
            return Status::InternalError(msg);
        }

        // Locate the first ".hdr" entry, remember its size and full remote path
        // (<dir>/<name>.<md5>), and drop it from `remote_files` so that the
        // data-file loop below does not see it.
        size_t hdr_file_len = 0;
        std::string full_remote_hdr_path;
        bool hdr_found = false;
        for (auto file_it = remote_files.begin(); file_it != remote_files.end(); ++file_it) {
            if (_end_with(file_it->first, ".hdr")) {
                hdr_file_len = file_it->second.size;
                full_remote_hdr_path =
                        remote_path + "/" + file_it->first + "." + file_it->second.md5;
                remote_files.erase(file_it);
                hdr_found = true;
                break;
            }
        }
        if (!hdr_found) {
            const std::string msg = "failed to find hdr file from remote_path: " + remote_path;
            LOG(WARNING) << msg;
            return Status::InternalError(msg);
        }

        // 1.3. download hdr file
        io::FileReaderOptions reader_options;
        reader_options.file_size = static_cast<int64_t>(hdr_file_len);
        LOG(INFO) << "download hdr file: " << full_remote_hdr_path;
        io::FileReaderSPtr hdr_reader = nullptr;
        RETURN_IF_ERROR(_remote_fs->open_file(full_remote_hdr_path, &hdr_reader, &reader_options));
        std::unique_ptr<char[]> read_buf = std::make_unique_for_overwrite<char[]>(hdr_file_len);
        size_t hdr_read_len = 0;
        Slice hdr_slice(read_buf.get(), hdr_file_len);
        RETURN_IF_ERROR(hdr_reader->read_at(0, hdr_slice, &hdr_read_len));
        // The header must be read completely in one call; a short read is an error.
        if (hdr_read_len != hdr_file_len) {
            const std::string msg = "failed to read hdr file: " + full_remote_hdr_path;
            LOG(WARNING) << msg;
            return Status::InternalError(msg);
        }

        RETURN_IF_ERROR(
                _report_every(0, &tmp_counter, finished_num, total_num, TTaskType::type::DOWNLOAD));

        // 1.4. make snapshot
        // `file_mapping` is filled by make_snapshot: remote file name -> target file name.
        std::unordered_map<std::string, std::string> file_mapping;
        RETURN_IF_ERROR(_engine.cloud_snapshot_mgr().make_snapshot(
                target_tablet_id, *_storage_resource, file_mapping, true, &hdr_slice));

        LOG(INFO) << "finish to make snapshot for tablet: " << target_tablet_id;

        // 1.5. download files
        for (const auto& file_entry : remote_files) {
            RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
                                          TTaskType::type::DOWNLOAD));
            const std::string& remote_file = file_entry.first;
            const FileStat& file_stat = file_entry.second;
            const auto mapped = file_mapping.find(remote_file);
            if (mapped == file_mapping.end()) {
                // Files that make_snapshot did not map are not part of the target tablet.
                continue;
            }
            const std::string& target_file = mapped->second;
            const std::string full_remote_file =
                    remote_path + "/" + remote_file + "." + file_stat.md5;
            const std::string full_target_file = target_path + "/" + target_file;
            LOG(INFO) << "begin to download from " << full_remote_file << " to "
                      << full_target_file;
            io::FileReaderOptions nested_reader_options;
            nested_reader_options.file_size = static_cast<int64_t>(file_stat.size);
            io::FileReaderSPtr file_reader = nullptr;
            RETURN_IF_ERROR(
                    _remote_fs->open_file(full_remote_file, &file_reader, &nested_reader_options));
            io::FileWriterPtr file_writer = nullptr;
            RETURN_IF_ERROR(storage_fs()->create_file(full_target_file, &file_writer));
            size_t cur_offset = 0;
            // (TODO) Add Verification that the length of the source file is consistent
            // with the length of the target file
            // Copy in `buf_size` chunks until the reader returns zero bytes.
            while (true) {
                size_t chunk_len = 0;
                RETURN_IF_ERROR(file_reader->read_at(
                        cur_offset, Slice {transfer_buffer.get(), buf_size}, &chunk_len));
                if (chunk_len == 0) {
                    break;
                }
                cur_offset += chunk_len;
                RETURN_IF_ERROR(file_writer->append({transfer_buffer.get(), chunk_len}));
            }
            RETURN_IF_ERROR(file_writer->close());
        }

        ++finished_num;

        // (TODO) Add bvar metrics to track download time
    } // end for src_to_dest_path

    LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id;
    return Status::OK();
}
237
238
#include "common/compile_check_avoid_end.h"
239
} // end namespace doris