Coverage Report

Created: 2026-03-12 14:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/storage_policy.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/storage_policy.h"
19
20
#include <gen_cpp/cloud.pb.h>
21
#include <glog/logging.h>
22
23
#include <algorithm>
24
#include <cstdlib>
25
#include <cstring>
26
#include <mutex>
27
#include <ranges>
28
#include <unordered_map>
29
30
#include "storage/olap_define.h"
31
#include "storage/rowset/rowset_meta.h"
32
#include "util/hash_util.hpp"
33
34
namespace doris {
35
36
// File-local registry of storage policies, keyed by storage policy id.
struct StoragePolicyMgr {
37
    // Taken by the *_storage_policy functions in this file before they touch `map`.
    std::mutex mtx;
38
    // storage_policy_id -> storage policy
    std::unordered_map<int64_t, StoragePolicyPtr> map;
39
};
40
41
// The single registry instance backing get/put/delete_storage_policy and get_storage_policy_ids.
static StoragePolicyMgr s_storage_policy_mgr;
42
43
127k
// Resolves a storage policy id to the StorageResource registered under that policy's
// resource id. Returns a NotFound error when either the policy or its resource is missing.
Result<StorageResource> get_resource_by_storage_policy_id(int64_t storage_policy_id) {
    const auto policy = get_storage_policy(storage_policy_id);
    if (!policy) {
        return ResultError(Status::NotFound<false>(
                "could not find storage_policy, storage_policy_id={}", storage_policy_id));
    }

    auto found = get_storage_resource(policy->resource_id);
    if (!found.has_value()) {
        return ResultError(Status::NotFound<false>("could not find resource, resource_id={}",
                                                   policy->resource_id));
    }
    // The registry entry is (resource, version); callers only want the resource.
    return found->first;
}
57
58
127k
// Looks up the storage policy registered under `id`; returns nullptr when there is none.
StoragePolicyPtr get_storage_policy(int64_t id) {
    std::lock_guard guard(s_storage_policy_mgr.mtx);
    const auto& policies = s_storage_policy_mgr.map;
    const auto entry = policies.find(id);
    if (entry == policies.end()) {
        return nullptr;
    }
    return entry->second;
}
65
66
29
void put_storage_policy(int64_t id, StoragePolicyPtr policy) {
67
29
    std::lock_guard lock(s_storage_policy_mgr.mtx);
68
29
    s_storage_policy_mgr.map[id] = std::move(policy);
69
29
}
70
71
0
void delete_storage_policy(int64_t id) {
72
0
    std::lock_guard lock(s_storage_policy_mgr.mtx);
73
0
    s_storage_policy_mgr.map.erase(id);
74
0
}
75
76
27
std::vector<std::pair<int64_t, int64_t>> get_storage_policy_ids() {
77
27
    std::vector<std::pair<int64_t, int64_t>> res;
78
27
    res.reserve(s_storage_policy_mgr.map.size());
79
27
    std::lock_guard lock(s_storage_policy_mgr.mtx);
80
60
    for (auto& [id, policy] : s_storage_policy_mgr.map) {
81
60
        res.emplace_back(id, policy->version);
82
60
    }
83
27
    return res;
84
27
}
85
86
// File-local registry of storage resources, keyed by the resource id in string form.
struct StorageResourceMgr {
87
    // Taken by the *_storage_resource functions and get_filesystem before they touch `map`.
    std::mutex mtx;
88
    // resource_id -> storage_resource, resource_version
    // (int64 resource ids are stored under their std::to_string() representation)
89
    std::unordered_map<std::string, std::pair<StorageResource, int64_t>> map;
90
};
91
92
// The single registry instance backing the storage resource accessors in this file.
static StorageResourceMgr s_storage_resource_mgr;
93
94
121
// Returns the `fs` member of the storage resource registered under `resource_id`,
// or nullptr when no resource is registered under that id.
io::RemoteFileSystemSPtr get_filesystem(const std::string& resource_id) {
    std::lock_guard guard(s_storage_resource_mgr.mtx);
    const auto& resources = s_storage_resource_mgr.map;
    const auto entry = resources.find(resource_id);
    if (entry == resources.end()) {
        return nullptr;
    }
    return entry->second.first.fs;
}
102
103
88
std::optional<std::pair<StorageResource, int64_t>> get_storage_resource(int64_t resource_id) {
104
88
    return get_storage_resource(std::to_string(resource_id));
105
88
}
106
107
std::optional<std::pair<StorageResource, int64_t>> get_storage_resource(
108
88.8k
        const std::string& resource_id) {
109
88.8k
    std::lock_guard lock(s_storage_resource_mgr.mtx);
110
88.8k
    if (auto it = s_storage_resource_mgr.map.find(resource_id);
111
88.8k
        it != s_storage_resource_mgr.map.end()) {
112
88.8k
        return it->second;
113
88.8k
    }
114
4
    return std::nullopt;
115
88.8k
}
116
117
50
void put_storage_resource(std::string resource_id, StorageResource resource, int64_t version) {
118
50
    std::lock_guard lock(s_storage_resource_mgr.mtx);
119
50
    s_storage_resource_mgr.map[resource_id] = std::make_pair(std::move(resource), version);
120
50
}
121
122
49
void put_storage_resource(int64_t resource_id, StorageResource resource, int64_t version) {
123
49
    auto id_str = std::to_string(resource_id);
124
49
    put_storage_resource(id_str, std::move(resource), version);
125
49
}
126
127
0
void delete_storage_resource(int64_t resource_id) {
128
0
    auto id_str = std::to_string(resource_id);
129
0
    std::lock_guard lock(s_storage_resource_mgr.mtx);
130
0
    s_storage_resource_mgr.map.erase(id_str);
131
0
}
132
133
7
void clear_storage_resource() {
134
7
    std::lock_guard lock(s_storage_resource_mgr.mtx);
135
7
    s_storage_resource_mgr.map.clear();
136
7
}
137
138
27
std::vector<std::pair<std::string, int64_t>> get_storage_resource_ids() {
139
27
    std::vector<std::pair<std::string, int64_t>> res;
140
27
    res.reserve(s_storage_resource_mgr.map.size());
141
27
    std::lock_guard lock(s_storage_resource_mgr.mtx);
142
122
    for (auto& [id, resource] : s_storage_resource_mgr.map) {
143
122
        res.emplace_back(id, resource.second);
144
122
    }
145
27
    return res;
146
27
}
147
148
namespace {
149
150
0
[[noreturn]] void exit_at_unknown_path_version(std::string_view resource_id, int64_t path_version) {
151
0
    throw Exception(
152
0
            Status::FatalError("unknown path version, please upgrade BE or drop this storage "
153
0
                               "vault. resource_id={} path_version={}",
154
0
                               resource_id, path_version));
155
0
}
156
157
} // namespace
158
159
// Builds a storage resource on top of `fs_`, using the path layout described by `path_format`.
// Path version 0 needs no extra state; version 1 installs `shard_fn`, which maps a tablet id
// to a shard number. Any other version is rejected via exit_at_unknown_path_version.
StorageResource::StorageResource(io::RemoteFileSystemSPtr fs_,
                                 const cloud::StorageVaultPB_PathFormat& path_format)
        : fs(std::move(fs_)), path_version(path_format.path_version()) {
    if (path_version == 0) {
        // Version 0 paths carry no shard component, so `shard_fn` is left untouched.
        return;
    }
    if (path_version != 1) {
        exit_at_unknown_path_version(fs->id(), path_version);
    }

    // Version 1: shard = murmur64(tablet_id bytes) % shard_num.
    // NOTE(review): a shard_num of 0 would make the modulo below undefined - confirm the
    // producer of `path_format` never emits that for version 1.
    shard_fn = [shard_num = path_format.shard_num()](int64_t tablet_id) {
        return HashUtil::murmur_hash64A(static_cast<void*>(&tablet_id), sizeof(tablet_id),
                                        HashUtil::MURMUR_SEED) %
               shard_num;
    };
}
176
177
std::string StorageResource::remote_segment_path(int64_t tablet_id, std::string_view rowset_id,
178
1.89M
                                                 int64_t seg_id) const {
179
1.89M
    switch (path_version) {
180
1.89M
    case 0:
181
1.89M
        return fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id, seg_id);
182
1
    case 1:
183
1
        return fmt::format("{}/{}/{}/{}/{}.dat", DATA_PREFIX, shard_fn(tablet_id), tablet_id,
184
1
                           rowset_id, seg_id);
185
0
    default:
186
0
        exit_at_unknown_path_version(fs->id(), path_version);
187
1.89M
    }
188
1.89M
}
189
190
3
std::string StorageResource::remote_segment_path(const RowsetMeta& rowset, int64_t seg_id) const {
191
3
    switch (path_version) {
192
2
    case 0:
193
2
        return fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, rowset.tablet_id(),
194
2
                           rowset.rowset_id().to_string(), seg_id);
195
1
    case 1:
196
1
        return fmt::format("{}/{}/{}/{}/{}.dat", DATA_PREFIX, shard_fn(rowset.tablet_id()),
197
1
                           rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id);
198
0
    default:
199
0
        exit_at_unknown_path_version(fs->id(), path_version);
200
3
    }
201
3
}
202
203
// TODO(dx)
204
// fix this, it is a tricky function. Pass the upper layer's tablet ID to the io layer instead of using this tricky method
205
// Tricky, It is used to parse tablet_id from remote segment path, and it is used in tablet manager to parse tablet_id from remote segment path.
206
// Static function to parse tablet_id from remote segment path
207
52.2k
std::optional<int64_t> StorageResource::parse_tablet_id_from_path(const std::string& path) {
208
    // Expected path formats:
209
    // support both .dat and .idx file extensions
210
    // support formate see ut. storage_resource_test:StorageResourceTest.ParseTabletIdFromPath
211
212
52.2k
    if (path.empty()) {
213
1
        return std::nullopt;
214
1
    }
215
216
    // Find the position of "data/" in the path
217
52.2k
    std::string_view path_view = path;
218
52.2k
    std::string_view data_prefix = DATA_PREFIX;
219
52.2k
    size_t data_pos = path_view.find(data_prefix);
220
52.2k
    if (data_pos == std::string_view::npos) {
221
3
        return std::nullopt;
222
3
    }
223
224
    // Extract the part after "data/"
225
52.2k
    path_view = path_view.substr(data_pos + data_prefix.length() + 1);
226
227
    // Check if path ends with .dat or .idx
228
52.2k
    if (!path_view.ends_with(".dat") && !path_view.ends_with(".idx")) {
229
3
        return std::nullopt;
230
3
    }
231
232
    // Count slashes in the remaining path
233
52.2k
    size_t slash_count = 0;
234
3.55M
    for (char c : path_view) {
235
3.55M
        if (c == '/') {
236
52.3k
            slash_count++;
237
52.3k
        }
238
3.55M
    }
239
240
    // Split path by '/'
241
52.2k
    std::vector<std::string_view> parts;
242
52.2k
    size_t start = 0;
243
52.2k
    size_t pos = 0;
244
104k
    while ((pos = path_view.find('/', start)) != std::string_view::npos) {
245
52.3k
        if (pos > start) {
246
52.3k
            parts.push_back(path_view.substr(start, pos - start));
247
52.3k
        }
248
52.3k
        start = pos + 1;
249
52.3k
    }
250
52.2k
    if (start < path_view.length()) {
251
52.2k
        parts.push_back(path_view.substr(start));
252
52.2k
    }
253
254
52.2k
    if (parts.empty()) {
255
0
        return std::nullopt;
256
0
    }
257
258
    // Determine path version based on slash count and extract tablet_id
259
    // Version 0: {tablet_id}/{rowset_id}_{seg_id}.dat (1 slash)
260
    // Version 1: {shard}/{tablet_id}/{rowset_id}/{seg_id}.dat (3 slashes)
261
262
52.2k
    if (slash_count == 1) {
263
        // Version 0 format: parts[0] should be tablet_id
264
52.2k
        if (parts.size() >= 1) {
265
52.2k
            try {
266
52.2k
                int64_t tablet_id = std::stoll(std::string(parts[0]));
267
52.2k
                return tablet_id;
268
52.2k
            } catch (const std::exception&) {
269
                // Not a valid number, return nullopt at last
270
1
            }
271
52.2k
        }
272
52.2k
    } else if (slash_count == 3) {
273
        // Version 1 format: parts[1] should be tablet_id (parts[0] is shard)
274
14
        if (parts.size() >= 2) {
275
14
            try {
276
14
                int64_t tablet_id = std::stoll(std::string(parts[1]));
277
14
                return tablet_id;
278
14
            } catch (const std::exception&) {
279
                // Not a valid number, return nullopt at last
280
4
            }
281
14
        }
282
14
    }
283
284
8
    return std::nullopt;
285
52.2k
}
286
287
std::string StorageResource::remote_idx_v1_path(const RowsetMeta& rowset, int64_t seg_id,
288
                                                int64_t index_id,
289
0
                                                std::string_view index_path_suffix) const {
290
0
    std::string suffix =
291
0
            index_path_suffix.empty() ? "" : std::string {"@"} + index_path_suffix.data();
292
0
    switch (path_version) {
293
0
    case 0:
294
0
        return fmt::format("{}/{}/{}_{}_{}{}.idx", DATA_PREFIX, rowset.tablet_id(),
295
0
                           rowset.rowset_id().to_string(), seg_id, index_id, suffix);
296
0
    case 1:
297
0
        return fmt::format("{}/{}/{}/{}/{}_{}{}.idx", DATA_PREFIX, shard_fn(rowset.tablet_id()),
298
0
                           rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id, index_id,
299
0
                           suffix);
300
0
    default:
301
0
        exit_at_unknown_path_version(fs->id(), path_version);
302
0
    }
303
0
}
304
305
0
std::string StorageResource::remote_idx_v2_path(const RowsetMeta& rowset, int64_t seg_id) const {
306
0
    switch (path_version) {
307
0
    case 0:
308
0
        return fmt::format("{}/{}/{}_{}.idx", DATA_PREFIX, rowset.tablet_id(),
309
0
                           rowset.rowset_id().to_string(), seg_id);
310
0
    case 1:
311
0
        return fmt::format("{}/{}/{}/{}/{}.idx", DATA_PREFIX, shard_fn(rowset.tablet_id()),
312
0
                           rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id);
313
0
    default:
314
0
        exit_at_unknown_path_version(fs->id(), path_version);
315
0
    }
316
0
}
317
318
19
std::string StorageResource::remote_tablet_path(int64_t tablet_id) const {
319
19
    switch (path_version) {
320
17
    case 0:
321
17
        return fmt::format("{}/{}", DATA_PREFIX, tablet_id);
322
2
    case 1:
323
2
        return fmt::format("{}/{}/{}", DATA_PREFIX, shard_fn(tablet_id), tablet_id);
324
0
    default:
325
0
        exit_at_unknown_path_version(fs->id(), path_version);
326
19
    }
327
19
}
328
329
std::string StorageResource::remote_delete_bitmap_path(int64_t tablet_id,
330
0
                                                       std::string_view rowset_id) const {
331
0
    switch (path_version) {
332
0
    case 0:
333
0
        return fmt::format("{}/{}/{}_delete_bitmap.db", DATA_PREFIX, tablet_id, rowset_id);
334
0
    case 1:
335
0
        return fmt::format("{}/{}/{}/{}_delete_bitmap.db", DATA_PREFIX, shard_fn(tablet_id),
336
0
                           tablet_id, rowset_id);
337
0
    default:
338
0
        exit_at_unknown_path_version(fs->id(), path_version);
339
0
    }
340
0
}
341
342
std::string StorageResource::cooldown_tablet_meta_path(int64_t tablet_id, int64_t replica_id,
343
9
                                                       int64_t cooldown_term) const {
344
9
    return remote_tablet_path(tablet_id) + '/' +
345
9
           cooldown_tablet_meta_filename(replica_id, cooldown_term);
346
9
}
347
348
} // namespace doris