Coverage Report

Created: 2026-04-11 14:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_snapshot_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_snapshot_mgr.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/olap_file.pb.h>
22
23
#include <map>
24
#include <unordered_map>
25
26
#include "cloud/cloud_meta_mgr.h"
27
#include "cloud/cloud_storage_engine.h"
28
#include "cloud/cloud_tablet_mgr.h"
29
#include "common/cast_set.h"
30
#include "common/config.h"
31
#include "common/logging.h"
32
#include "common/status.h"
33
#include "io/fs/local_file_system.h"
34
#include "runtime/memory/mem_tracker_limiter.h"
35
#include "runtime/thread_context.h"
36
#include "storage/data_dir.h"
37
#include "storage/olap_common.h"
38
#include "storage/olap_define.h"
39
#include "storage/pb_helper.h"
40
#include "storage/rowset/rowset.h"
41
#include "storage/rowset/rowset_factory.h"
42
#include "storage/rowset/rowset_meta.h"
43
#include "storage/rowset/rowset_writer.h"
44
#include "storage/rowset/rowset_writer_context.h"
45
#include "storage/storage_policy.h"
46
#include "storage/tablet/tablet_meta.h"
47
#include "storage/tablet/tablet_schema.h"
48
#include "storage/tablet/tablet_schema_cache.h"
49
#include "storage/utils.h"
50
#include "util/slice.h"
51
#include "util/uid_util.h"
52
53
namespace doris {
54
using namespace ErrorCode;
55
56
3
// Binds the manager to `engine` and creates the memory tracker (type OTHER,
// labelled "CloudSnapshotMgr") that the snapshot entry points below attach to.
CloudSnapshotMgr::CloudSnapshotMgr(CloudStorageEngine& engine) : _engine(engine) {
    _mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
                                                    "CloudSnapshotMgr");
}
60
61
// Makes a snapshot for the tablet identified by `target_tablet_id`.
//
// Only the restore path is implemented. When `is_restore` is true:
//   1. the tablet meta serialized in `slice` is deserialized,
//   2. its rowsets are converted for the target tablet via convert_rowsets(), which
//      also fills `file_mapping` (source file name -> destination file name),
//   3. the converted tablet meta is handed to the meta manager through
//      prepare_restore_job().
// When `is_restore` is false nothing is done besides logging, and OK is returned.
//
// Returns INVALID_ARGUMENT if `is_restore` is set but `slice` is null,
// TABLE_NOT_FOUND if the tablet manager yields a null tablet, or the first error
// reported by any of the steps above.
Status CloudSnapshotMgr::make_snapshot(int64_t target_tablet_id, StorageResource& storage_resource,
                                       std::unordered_map<std::string, std::string>& file_mapping,
                                       bool is_restore, const Slice* slice) {
    SCOPED_ATTACH_TASK(_mem_tracker);
    if (is_restore && slice == nullptr) {
        return Status::Error<INVALID_ARGUMENT>("slice cannot be null in restore.");
    }

    CloudTabletSPtr target_tablet = DORIS_TRY(_engine.tablet_mgr().get_tablet(target_tablet_id));
    if (target_tablet == nullptr) {
        return Status::Error<TABLE_NOT_FOUND>("failed to get tablet. tablet={}", target_tablet_id);
    }

    if (is_restore) {
        // 1. deserialize tablet meta from memory
        TabletMeta tablet_meta;
        RETURN_IF_ERROR(tablet_meta.create_from_buffer(
                reinterpret_cast<const uint8_t*>(slice->data), slice->size));
        TabletMetaPB tablet_meta_pb;
        tablet_meta.to_meta_pb(&tablet_meta_pb, false);

        // Rebuild rs_metas from every rowset meta known to the tablet meta.
        tablet_meta_pb.clear_rs_metas();
        const auto& all_rs_metas = tablet_meta.all_rs_metas();
        if (all_rs_metas.size() > 0) {
            // Pre-size the field that is actually appended to below (rs_metas).
            tablet_meta_pb.mutable_rs_metas()->Reserve(cast_set<int>(all_rs_metas.size()));
            for (const auto& [_, rs] : all_rs_metas) {
                rs->to_rowset_pb(tablet_meta_pb.add_rs_metas());
            }
        }
        tablet_meta_pb.clear_stale_rs_metas(); // strip off the stale rs meta

        // 2. convert rowsets
        TabletMetaPB new_tablet_meta_pb;
        RETURN_IF_ERROR(convert_rowsets(&new_tablet_meta_pb, tablet_meta_pb, target_tablet_id,
                                        target_tablet, storage_resource, file_mapping));

        // 3. send make snapshot request
        RETURN_IF_ERROR(_engine.meta_mgr().prepare_restore_job(new_tablet_meta_pb));
        return Status::OK();
    }

    // backup not implemented
    // NOTE(review): this path still logs success and returns OK although no snapshot
    // is produced; kept as-is because callers may rely on the OK status.

    LOG(INFO) << "success to make snapshot. [tablet_id=" << target_tablet_id << "]";
    return Status::OK();
}
106
107
0
// Commits the restore job of `tablet_id` through the meta manager and then clears
// the tablet's cache.
//
// Returns TABLE_NOT_FOUND if the tablet manager yields a null tablet, or the error
// reported by the tablet lookup / commit_restore_job().
Status CloudSnapshotMgr::commit_snapshot(int64_t tablet_id) {
    SCOPED_ATTACH_TASK(_mem_tracker);

    CloudTabletSPtr restored_tablet = DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id));
    if (!restored_tablet) {
        return Status::Error<TABLE_NOT_FOUND>("failed to get tablet. tablet={}", tablet_id);
    }

    auto&& meta_mgr = _engine.meta_mgr();
    RETURN_IF_ERROR(meta_mgr.commit_restore_job(tablet_id));

    // The cache is cleared only after the commit succeeded.
    restored_tablet->clear_cache();

    LOG(INFO) << "success to commit snapshot. [tablet_id=" << tablet_id << "]";
    return Status::OK();
}
118
119
0
// Finishes the restore job of `tablet_id` through the meta manager, forwarding
// `is_completed` unchanged. Returns the error of finish_restore_job() if it fails.
Status CloudSnapshotMgr::release_snapshot(int64_t tablet_id, bool is_completed) {
    SCOPED_ATTACH_TASK(_mem_tracker);

    auto&& meta_mgr = _engine.meta_mgr();
    RETURN_IF_ERROR(meta_mgr.finish_restore_job(tablet_id, is_completed));

    LOG(INFO) << "success to release snapshot. [tablet_id=" << tablet_id << "]";
    return Status::OK();
}
125
126
// Builds in `out` a tablet meta for the target tablet from the source tablet meta `in`.
//
// `out` starts as a deep copy of `in`; then:
//   - all rowset meta lists are cleared and rs_metas is rebuilt with one freshly
//     created rowset meta per entry of in.rs_metas() (see _create_rowset_meta()),
//   - tablet id / table id / partition id / index id are taken from `tablet_id` and
//     `target_tablet`, a new tablet uid is generated and cooldown_meta_id is zeroed,
//   - index ids in the tablet schema and in each rowset schema are rewritten to the
//     ids used by the target tablet's schema (see _rename_index_ids()),
//   - rowset ids referenced by the delete bitmap are translated to the new rowset ids.
//
// `file_mapping` receives the source -> destination file names produced while the
// rowset metas are created. `target_tablet` is dereferenced without a null check.
//
// Returns the first error reported by a helper; aborts via CHECK if the delete
// bitmap references a rowset id that was not converted.
Status CloudSnapshotMgr::convert_rowsets(
        TabletMetaPB* out, const TabletMetaPB& in, int64_t tablet_id,
        CloudTabletSPtr& target_tablet, StorageResource& storage_resource,
        std::unordered_map<std::string, std::string>& file_mapping) {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
    // deep copy
    *out = in;

    out->clear_rs_metas();
    out->clear_inc_rs_metas();
    out->clear_stale_rs_metas();
    // modify id
    out->set_tablet_id(tablet_id);
    *out->mutable_tablet_uid() = TabletUid::gen_uid().to_proto();
    out->set_table_id(target_tablet->table_id());
    out->set_partition_id(target_tablet->partition_id());
    out->set_index_id(target_tablet->index_id());
    PUniqueId* cooldown_meta_id = out->mutable_cooldown_meta_id();
    cooldown_meta_id->set_hi(0);
    cooldown_meta_id->set_lo(0);

    // Private copy of the target tablet's schema, used as the reference for index ids.
    TabletSchemaSPtr target_tablet_schema = std::make_shared<TabletSchema>();
    target_tablet_schema->copy_from(*target_tablet->tablet_schema());

    // Schema of the source tablet; handed to _create_rowset_meta() for rowsets that
    // do not carry a schema of their own.
    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
    tablet_schema->init_from_pb(in.schema());

    // source rowset id -> newly allocated rowset id
    std::unordered_map<RowsetId, RowsetId> rowset_id_mapping;
    for (const auto& rowset_meta_pb : in.rs_metas()) {
        RowsetMetaPB* new_rowset_meta_pb = out->add_rs_metas();
        RETURN_IF_ERROR(_create_rowset_meta(new_rowset_meta_pb, rowset_meta_pb, tablet_id,
                                            target_tablet, storage_resource, tablet_schema,
                                            file_mapping, rowset_id_mapping));
        if (new_rowset_meta_pb->has_tablet_schema() &&
            new_rowset_meta_pb->tablet_schema().index_size() > 0) {
            RETURN_IF_ERROR(_rename_index_ids(*new_rowset_meta_pb->mutable_tablet_schema(),
                                              target_tablet_schema));
        }
    }

    if (out->schema().index_size() > 0) {
        RETURN_IF_ERROR(_rename_index_ids(*out->mutable_schema(), target_tablet_schema));
    }

    if (!rowset_id_mapping.empty() && in.has_delete_bitmap()) {
        const auto& old_del_bitmap_pb = in.delete_bitmap();
        // `out` is a deep copy of `in`, so its delete bitmap already holds one
        // rowset_ids entry per source entry; they are overwritten in place below.
        DeleteBitmapPB* new_del_bitmap_pb = out->mutable_delete_bitmap();
        const int rst_ids_size = old_del_bitmap_pb.rowset_ids_size();
        LOG(INFO) << "convert delete bitmap rowset_ids. [rowset_ids_size=" << rst_ids_size << "]";
        for (int i = 0; i < rst_ids_size; ++i) {
            RowsetId rst_id;
            rst_id.init(old_del_bitmap_pb.rowset_ids(i));
            auto it = rowset_id_mapping.find(rst_id);
            // It should not happen: if a rowset id in the delete bitmap cannot be
            // converted, the data might be inconsistent.
            CHECK(it != rowset_id_mapping.end())
                    << "can't find rowset_id " << rst_id.to_string() << " in convert_rowset_ids";
            new_del_bitmap_pb->set_rowset_ids(i, it->second.to_string());
        }
    }

    return Status::OK();
}
195
196
// Fills `new_rowset_meta_pb` with a rowset meta for the target tablet that mirrors
// `source_meta_pb` under a newly allocated rowset id.
//
// Side effects on the output parameters:
//   - `rowset_id_mapping[source rowset id] = new rowset id`
//   - `file_mapping` gets one "<rowset_id>_<segment>.dat" entry per segment, plus the
//     matching index file entries (one per inverted index for storage format V1,
//     otherwise a single index file per segment when the schema has an inverted or
//     ANN index).
//
// The rowset schema comes from `source_meta_pb` when it carries one, otherwise
// `tablet_schema` is used. `target_tablet` is dereferenced without a null check.
//
// Returns the error of RowsetFactory::create_rowset_writer() if it fails.
Status CloudSnapshotMgr::_create_rowset_meta(
        RowsetMetaPB* new_rowset_meta_pb, const RowsetMetaPB& source_meta_pb,
        int64_t target_tablet_id, CloudTabletSPtr& target_tablet, StorageResource& storage_resource,
        TabletSchemaSPtr tablet_schema, std::unordered_map<std::string, std::string>& file_mapping,
        std::unordered_map<RowsetId, RowsetId>& rowset_id_mapping) {
    RowsetId dst_rs_id = _engine.next_rowset_id();
    RowsetWriterContext context;
    context.rowset_id = dst_rs_id;
    context.tablet_id = target_tablet_id;
    context.partition_id = target_tablet->partition_id();
    context.index_id = target_tablet->index_id();
    // Note: use origin txn id
    context.txn_id = source_meta_pb.txn_id();
    context.txn_expiration = 0;
    context.rowset_state = source_meta_pb.rowset_state();
    context.storage_resource = storage_resource;
    context.tablet = target_tablet;
    context.version = {source_meta_pb.start_version(), source_meta_pb.end_version()};
    context.segments_overlap = source_meta_pb.segments_overlap_pb();
    context.tablet_schema_hash = source_meta_pb.tablet_schema_hash();
    if (source_meta_pb.has_tablet_schema()) {
        context.tablet_schema = std::make_shared<TabletSchema>();
        context.tablet_schema->init_from_pb(source_meta_pb.tablet_schema());
    } else {
        context.tablet_schema = tablet_schema;
    }
    context.newest_write_timestamp = source_meta_pb.newest_write_timestamp();

    // The writer is only used to obtain an initialized rowset meta for the new id.
    auto rs_writer = DORIS_TRY(RowsetFactory::create_rowset_writer(_engine, context, false));
    rs_writer->rowset_meta()->to_rowset_pb(new_rowset_meta_pb);

    // build file mapping
    RowsetId src_rs_id;
    if (source_meta_pb.rowset_id() > 0) {
        src_rs_id.init(source_meta_pb.rowset_id());
    } else {
        src_rs_id.init(source_meta_pb.rowset_id_v2());
    }
    rowset_id_mapping[src_rs_id] = dst_rs_id;

    // The textual rowset ids do not change per segment; format them once.
    const std::string src_rs_id_str = src_rs_id.to_string();
    const std::string dst_rs_id_str = dst_rs_id.to_string();

    for (int i = 0; i < source_meta_pb.num_segments(); ++i) {
        std::string src_segment_file = fmt::format("{}_{}.dat", src_rs_id_str, i);
        std::string dst_segment_file = fmt::format("{}_{}.dat", dst_rs_id_str, i);
        file_mapping[src_segment_file] = dst_segment_file;
        if (context.tablet_schema->get_inverted_index_storage_format() ==
            InvertedIndexStorageFormatPB::V1) {
            // V1: one index file per inverted index and segment.
            const auto src_prefix =
                    InvertedIndexDescriptor::get_index_file_path_prefix(src_segment_file);
            const auto dst_prefix =
                    InvertedIndexDescriptor::get_index_file_path_prefix(dst_segment_file);
            for (const auto& index : context.tablet_schema->inverted_indexes()) {
                auto index_id = index->index_id();
                const auto& index_suffix = index->get_index_suffix();
                std::string src_index_file = InvertedIndexDescriptor::get_index_file_path_v1(
                        src_prefix, index_id, index_suffix);
                std::string dst_index_file = InvertedIndexDescriptor::get_index_file_path_v1(
                        dst_prefix, index_id, index_suffix);
                file_mapping[src_index_file] = dst_index_file;
            }
        } else {
            // Other formats: a single index file per segment, present only when the
            // schema declares an inverted or ANN index.
            if (context.tablet_schema->has_inverted_index() ||
                context.tablet_schema->has_ann_index()) {
                std::string src_index_file = InvertedIndexDescriptor::get_index_file_path_v2(
                        InvertedIndexDescriptor::get_index_file_path_prefix(src_segment_file));
                std::string dst_index_file = InvertedIndexDescriptor::get_index_file_path_v2(
                        InvertedIndexDescriptor::get_index_file_path_prefix(dst_segment_file));
                file_mapping[src_index_file] = dst_index_file;
            }
        }
    }

    // build rowset meta
    new_rowset_meta_pb->set_num_rows(source_meta_pb.num_rows());
    new_rowset_meta_pb->set_total_disk_size(source_meta_pb.total_disk_size());
    new_rowset_meta_pb->set_data_disk_size(source_meta_pb.data_disk_size());
    new_rowset_meta_pb->set_index_disk_size(source_meta_pb.index_disk_size());
    new_rowset_meta_pb->set_empty(source_meta_pb.num_rows() == 0);
    new_rowset_meta_pb->set_creation_time(time(nullptr));
    new_rowset_meta_pb->set_num_segments(source_meta_pb.num_segments());
    new_rowset_meta_pb->set_rowset_state(source_meta_pb.rowset_state());

    new_rowset_meta_pb->clear_segments_key_bounds();
    for (const auto& key_bound : source_meta_pb.segments_key_bounds()) {
        *new_rowset_meta_pb->add_segments_key_bounds() = key_bound;
    }
    if (source_meta_pb.has_delete_predicate()) {
        DeletePredicatePB* new_delete_condition = new_rowset_meta_pb->mutable_delete_predicate();
        *new_delete_condition = source_meta_pb.delete_predicate();
    }

    return Status::OK();
}
285
286
// Rewrites the index ids stored in `schema_pb` so that they match `tablet_schema`.
//
// For every index of `schema_pb`, its column unique ids are probed in order; the
// first one for which `tablet_schema` has an index with the same index type and
// suffix name decides the id, and the remaining columns are not examined. Indexes
// without any match keep their id. A null `tablet_schema` makes this a no-op.
// Always returns OK.
Status CloudSnapshotMgr::_rename_index_ids(TabletSchemaPB& schema_pb,
                                           const TabletSchemaSPtr& tablet_schema) const {
    if (!tablet_schema) {
        return Status::OK();
    }

    const int index_count = schema_pb.index_size();
    for (int pos = 0; pos < index_count; ++pos) {
        TabletIndexPB& index_pb = *schema_pb.mutable_index(pos);
        for (const int32_t col_unique_id : index_pb.col_unique_id()) {
            const auto local_index = tablet_schema->get_index(
                    col_unique_id, index_pb.index_type(), index_pb.index_suffix_name());
            if (!local_index) {
                continue;
            }
            // Only touch the field when the id really differs.
            if (local_index->index_id() != index_pb.index_id()) {
                index_pb.set_index_id(local_index->index_id());
            }
            break;
        }
    }
    return Status::OK();
}
307
308
} // namespace doris