Coverage Report

Created: 2026-03-17 00:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_snapshot_loader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_snapshot_loader.h"
19
20
#include <gen_cpp/Types_types.h>
21
22
#include <unordered_map>
23
24
#include "cloud/cloud_snapshot_mgr.h"
25
#include "cloud/cloud_storage_engine.h"
26
#include "common/logging.h"
27
#include "io/fs/broker_file_system.h"
28
#include "io/fs/file_system.h"
29
#include "io/fs/hdfs_file_system.h"
30
#include "io/fs/path.h"
31
#include "io/fs/remote_file_system.h"
32
#include "io/fs/s3_file_system.h"
33
#include "storage/olap_common.h"
34
#include "storage/olap_define.h"
35
#include "storage/rowset/rowset_factory.h"
36
#include "storage/rowset/rowset_meta.h"
37
#include "storage/rowset/rowset_writer.h"
38
#include "storage/rowset/rowset_writer_context.h"
39
#include "storage/tablet/tablet.h"
40
#include "util/slice.h"
41
42
namespace doris {
43
#include "common/compile_check_avoid_begin.h"
44
namespace {
// Reports whether `str` terminates with the character sequence `match`.
// An empty `match` is a suffix of every string (including the empty string).
bool _end_with(std::string_view str, std::string_view match) {
    if (match.size() > str.size()) {
        return false;
    }
    const std::string_view tail = str.substr(str.size() - match.size());
    return tail == match;
}
} // namespace
50
51
// Builds a snapshot loader bound to `engine`; job/task ids and broker settings are
// forwarded unchanged to BaseSnapshotLoader.
CloudSnapshotLoader::CloudSnapshotLoader(CloudStorageEngine& engine, ExecEnv* env, int64_t job_id,
                                         int64_t task_id, const TNetworkAddress& broker_addr,
                                         const std::map<std::string, std::string>& broker_prop)
        : BaseSnapshotLoader(env, job_id, task_id, broker_addr, broker_prop),
          _engine(engine) {}
55
56
// Initializes the remote (repository) side via BaseSnapshotLoader::init, then resolves the
// storage resource for `vault_id` from the engine.
// Returns InternalError when no storage resource exists for `vault_id`.
Status CloudSnapshotLoader::init(TStorageBackendType::type type, const std::string& location,
                                 std::string vault_id) {
    RETURN_IF_ERROR(BaseSnapshotLoader::init(type, location));

    _storage_resource = _engine.get_storage_resource(vault_id);
    if (_storage_resource) {
        return Status::OK();
    }
    return Status::InternalError("vault id not found, vault id {}", vault_id);
}
65
66
0
// Returns the filesystem of the storage resource resolved by init(); download() creates the
// target files through it.
// NOTE(review): _storage_resource is dereferenced without a check, so this must only be
// called after a successful init().
io::RemoteFileSystemSPtr CloudSnapshotLoader::storage_fs() {
    const auto& resource = *_storage_resource;
    return resource.fs;
}
69
70
// Uploading snapshots is not implemented by the cloud snapshot loader; every call
// returns NotSupported and leaves `tablet_files` untouched.
Status CloudSnapshotLoader::upload(
        const std::map<std::string, std::string>& /*src_to_dest_path*/,
        std::map<int64_t, std::vector<std::string>>* /*tablet_files*/) {
    return Status::NotSupported("upload not supported");
}
74
75
// Downloads snapshot files from the remote repository into the storage vault.
//
// `src_to_dest_path` maps a remote snapshot directory, e.g.
//   cos://xxx/__palo_repository_xxx/_ss_xxx/_ss_content/__db_10000/
//   __tbl_10001/__part_10002/_idx_10001/__10003
// to the decimal id of the target tablet. For every entry this:
//   1.1 checks that the target tablet path does not exist yet in the storage vault,
//   1.2 lists the remote files (with checksums),
//   1.3 downloads the tablet header (the first listed file ending in ".hdr"),
//   1.4 calls the cloud snapshot manager's make_snapshot() with that header, which fills a
//       mapping from remote file names to target file names,
//   1.5 copies every remote file present in that mapping into the target tablet path
//       (remote files absent from the mapping are skipped).
//
// Each target tablet id is appended to `downloaded_tablet_ids` right after its target path
// has been checked, i.e. before its files are copied. Progress is reported through
// _report_every(), whose error (e.g. a cancelled job) aborts the download.
Status CloudSnapshotLoader::download(const std::map<std::string, std::string>& src_to_dest_path,
                                     std::vector<int64_t>* downloaded_tablet_ids) {
    if (!_remote_fs || !_storage_resource) {
        return Status::InternalError("Storage backend not initialized.");
    }

    LOG(INFO) << "begin to transfer snapshot files. num: " << src_to_dest_path.size()
              << ", broker addr: " << _broker_addr << ", job: " << _job_id
              << ", task id: " << _task_id;

    // check if job has already been cancelled
    int tmp_counter = 1;
    RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD));

    int report_counter = 0;
    const int total_num = static_cast<int>(src_to_dest_path.size());
    int finished_num = 0;

    // The copy buffer is allocated once and reused for every file of every tablet,
    // instead of being reallocated for each file.
    const size_t buf_size = config::s3_file_system_local_upload_buffer_size;
    std::unique_ptr<char[]> transfer_buffer = std::make_unique_for_overwrite<char[]>(buf_size);

    // 1. for each src path, transfer files to target path
    for (const auto& entry : src_to_dest_path) {
        const std::string& remote_path = entry.first;
        const std::string& tablet_str = entry.second;

        // Parse the whole string: std::stoll alone silently ignores trailing characters
        // (e.g. "10003abc" would be accepted as 10003).
        int64_t target_tablet_id = -1;
        size_t parsed_len = 0;
        try {
            target_tablet_id = std::stoll(tablet_str, &parsed_len);
        } catch (const std::exception& e) {
            return Status::InternalError("failed to parse target tablet id {}, {}", tablet_str,
                                         e.what());
        }
        if (parsed_len != tablet_str.size()) {
            return Status::InternalError("failed to parse target tablet id {}, {}", tablet_str,
                                         "unexpected trailing characters");
        }
        const std::string target_path = _storage_resource->remote_tablet_path(target_tablet_id);

        // 1.1. check target path not exists. A failing existence check is reported as such
        // (with its cause) rather than being mistaken for "already exists".
        bool target_path_exist = false;
        Status exist_st = storage_fs()->exists(target_path, &target_path_exist);
        if (!exist_st.ok()) {
            LOG(WARNING) << "failed to check whether target path exists: " << target_path
                         << ", " << exist_st.to_string();
            return Status::InternalError("failed to check whether target path {} exists: {}",
                                         target_path, exist_st.to_string());
        }
        if (target_path_exist) {
            std::stringstream ss;
            ss << "failed to download snapshot files, target path already exists: " << target_path;
            LOG(WARNING) << ss.str();
            return Status::InternalError(ss.str());
        }

        downloaded_tablet_ids->push_back(target_tablet_id);

        int64_t remote_tablet_id;
        RETURN_IF_ERROR(_get_tablet_id_from_remote_path(remote_path, &remote_tablet_id));
        VLOG_CRITICAL << "get target tablet id: " << target_tablet_id
                      << ", remote tablet id: " << remote_tablet_id;

        // 1.2. get remote files
        std::map<std::string, FileStat> remote_files;
        RETURN_IF_ERROR(_list_with_checksum(remote_path, &remote_files));
        if (remote_files.empty()) {
            std::stringstream ss;
            ss << "get nothing from remote path: " << remote_path;
            LOG(WARNING) << ss.str();
            return Status::InternalError(ss.str());
        }

        // Finds the first ".hdr" entry, reports its full remote path (name + "." + md5) and
        // size, and removes it from `remote_files` so it is not copied again in step 1.5.
        auto remote_hdr_file_path = [&remote_files, &remote_path](std::string& full_hdr_path,
                                                                  size_t* hdr_file_len) {
            for (auto iter = remote_files.begin(); iter != remote_files.end(); ++iter) {
                if (_end_with(iter->first, ".hdr")) {
                    *hdr_file_len = iter->second.size;
                    full_hdr_path = remote_path + "/" + iter->first + "." + iter->second.md5;
                    remote_files.erase(iter);
                    return true;
                }
            }
            return false;
        };

        size_t hdr_file_len;
        std::string full_remote_hdr_path;
        if (!remote_hdr_file_path(full_remote_hdr_path, &hdr_file_len)) {
            std::stringstream ss;
            ss << "failed to find hdr file from remote_path: " << remote_path;
            LOG(WARNING) << ss.str();
            return Status::InternalError(ss.str());
        }

        // 1.3. download hdr file
        io::FileReaderOptions reader_options {
                .cache_type = io::FileCachePolicy::NO_CACHE,
                .is_doris_table = false,
                .cache_base_path = "",
                .file_size = static_cast<int64_t>(hdr_file_len),
        };
        LOG(INFO) << "download hdr file: " << full_remote_hdr_path;
        io::FileReaderSPtr hdr_reader = nullptr;
        RETURN_IF_ERROR(_remote_fs->open_file(full_remote_hdr_path, &hdr_reader, &reader_options));
        // `read_buf` backs `hdr_slice`, which make_snapshot() reads below; keep it alive
        // for the rest of this iteration.
        std::unique_ptr<char[]> read_buf = std::make_unique_for_overwrite<char[]>(hdr_file_len);
        size_t hdr_read_len = 0;
        Slice hdr_slice(read_buf.get(), hdr_file_len);
        RETURN_IF_ERROR(hdr_reader->read_at(0, hdr_slice, &hdr_read_len));
        if (hdr_read_len != hdr_file_len) {
            std::stringstream ss;
            ss << "failed to read hdr file: " << full_remote_hdr_path;
            LOG(WARNING) << ss.str();
            return Status::InternalError(ss.str());
        }

        RETURN_IF_ERROR(
                _report_every(0, &tmp_counter, finished_num, total_num, TTaskType::type::DOWNLOAD));

        // 1.4. make snapshot
        std::unordered_map<std::string, std::string> file_mapping;
        RETURN_IF_ERROR(_engine.cloud_snapshot_mgr().make_snapshot(
                target_tablet_id, *_storage_resource, file_mapping, true, &hdr_slice));

        LOG(INFO) << "finish to make snapshot for tablet: " << target_tablet_id;

        // 1.5. download files
        for (const auto& nested_iter : remote_files) {
            RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
                                          TTaskType::type::DOWNLOAD));
            const std::string& remote_file = nested_iter.first;
            const FileStat& file_stat = nested_iter.second;
            auto find = file_mapping.find(remote_file);
            if (find == file_mapping.end()) {
                continue;
            }
            const std::string& target_file = find->second;
            std::string full_remote_file = remote_path + "/" + remote_file + "." + file_stat.md5;
            std::string full_target_file = target_path + "/" + target_file;
            LOG(INFO) << "begin to download from " << full_remote_file << " to "
                      << full_target_file;
            io::FileReaderOptions nested_reader_options {
                    .cache_type = io::FileCachePolicy::NO_CACHE,
                    .is_doris_table = false,
                    .cache_base_path = "",
                    .file_size = static_cast<int64_t>(file_stat.size),
            };
            io::FileReaderSPtr file_reader = nullptr;
            RETURN_IF_ERROR(
                    _remote_fs->open_file(full_remote_file, &file_reader, &nested_reader_options));
            io::FileWriterPtr file_writer = nullptr;
            RETURN_IF_ERROR(storage_fs()->create_file(full_target_file, &file_writer));
            size_t cur_offset = 0;
            // (TODO) Add Verification that the length of the source file is consistent
            // with the length of the target file
            while (true) {
                size_t read_len = 0;
                RETURN_IF_ERROR(file_reader->read_at(
                        cur_offset, Slice {transfer_buffer.get(), buf_size}, &read_len));
                if (read_len == 0) {
                    break;
                }
                cur_offset += read_len;
                RETURN_IF_ERROR(file_writer->append({transfer_buffer.get(), read_len}));
            }
            RETURN_IF_ERROR(file_writer->close());
        }

        finished_num++;

        // (TODO) Add bvar metrics to track download time
    } // end for src_to_dest_path

    LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id;
    return Status::OK();
}
245
246
#include "common/compile_check_avoid_end.h"
247
} // end namespace doris