be/src/io/fs/packed_file_system.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/packed_file_system.h" |
19 | | |
20 | | #include <utility> |
21 | | |
22 | | #include "common/status.h" |
23 | | #include "io/fs/file_reader.h" |
24 | | #include "io/fs/packed_file_reader.h" |
25 | | #include "io/fs/packed_file_writer.h" |
26 | | |
27 | | namespace doris::io { |
28 | | |
29 | | PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs, PackedAppendContext append_info) |
30 | 1.01k | : FileSystem(inner_fs->id(), inner_fs->type()), |
31 | 1.01k | _inner_fs(std::move(inner_fs)), |
32 | 1.01k | _append_info(std::move(append_info)) { |
33 | 1.01k | if (_append_info.resource_id.empty() && _inner_fs != nullptr) { |
34 | 0 | _append_info.resource_id = _inner_fs->id(); |
35 | 0 | } |
36 | 1.01k | } |
37 | | |
38 | | PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs, |
39 | | std::unordered_map<std::string, PackedSliceLocation> index_map, |
40 | | PackedAppendContext append_info) |
41 | 1.00k | : FileSystem(inner_fs->id(), inner_fs->type()), |
42 | 1.00k | _inner_fs(std::move(inner_fs)), |
43 | 1.00k | _index_map(std::move(index_map)), |
44 | 1.00k | _append_info(std::move(append_info)) { |
45 | 1.00k | if (_append_info.resource_id.empty() && _inner_fs != nullptr) { |
46 | 0 | _append_info.resource_id = _inner_fs->id(); |
47 | 0 | } |
48 | 1.00k | _index_map_initialized = true; |
49 | 1.00k | } |
50 | | |
51 | | Status PackedFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, |
52 | 1.00k | const FileWriterOptions* opts) { |
53 | | // Create file using inner file system |
54 | 1.00k | FileWriterPtr inner_writer; |
55 | 1.00k | RETURN_IF_ERROR(_inner_fs->create_file(file, &inner_writer, opts)); |
56 | | |
57 | | // Wrap with PackedFileWriter |
58 | 1.00k | *writer = std::make_unique<PackedFileWriter>(std::move(inner_writer), file, _append_info); |
59 | 1.00k | return Status::OK(); |
60 | 1.00k | } |
61 | | |
62 | | Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader, |
63 | 1.00k | const FileReaderOptions* opts) { |
64 | | // Check if this file is in a packed file |
65 | 1.00k | std::string file_path = file.native(); |
66 | 1.00k | auto it = _index_map.find(file_path); |
67 | 1.00k | bool is_packed_file = (it != _index_map.end()); |
68 | | |
69 | 1.00k | if (is_packed_file) { |
70 | | // File is in packed file, open packed file and wrap with PackedFileReader |
71 | 1.00k | const auto& index = it->second; |
72 | 1.00k | FileReaderSPtr inner_reader; |
73 | | |
74 | | // Create options for opening the packed file |
75 | | // Disable cache at this layer - we'll add cache wrapper around PackedFileReader instead |
76 | | // This ensures cache key is based on segment path, not packed file path |
77 | 1.00k | FileReaderOptions inner_opts = opts ? *opts : FileReaderOptions(); |
78 | 1.00k | inner_opts.file_size = index.packed_file_size; |
79 | 1.00k | inner_opts.cache_type = FileCachePolicy::NO_CACHE; |
80 | | |
81 | 18.4E | VLOG_DEBUG << "open packed file: " << index.packed_file_path << ", file: " << file.native() |
82 | 18.4E | << ", offset: " << index.offset << ", size: " << index.size |
83 | 18.4E | << ", packed_file_size: " << index.packed_file_size; |
84 | 1.00k | RETURN_IF_ERROR( |
85 | 1.00k | _inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &inner_opts)); |
86 | | |
87 | | // Create PackedFileReader with segment path |
88 | | // PackedFileReader.path() returns segment path, not packed file path |
89 | 1.00k | auto packed_reader = std::make_shared<PackedFileReader>(std::move(inner_reader), file, |
90 | 1.00k | index.offset, index.size); |
91 | | |
92 | | // If cache is requested, wrap PackedFileReader with CachedRemoteFileReader |
93 | | // This ensures: |
94 | | // 1. Cache key = hash(segment_path.filename()) - matches cleanup key |
95 | | // 2. Cache size = segment size - correct boundary |
96 | | // 3. Each segment has independent cache entry - no interference during cleanup |
97 | 1.00k | if (opts && opts->cache_type != FileCachePolicy::NO_CACHE) { |
98 | 1.00k | FileReaderOptions cache_opts = *opts; |
99 | 1.00k | cache_opts.file_size = index.size; // Use segment size for cache |
100 | 1.00k | *reader = DORIS_TRY(create_cached_file_reader(packed_reader, cache_opts)); |
101 | 1.00k | } else { |
102 | 0 | *reader = packed_reader; |
103 | 0 | } |
104 | 1.00k | } else { |
105 | 2 | RETURN_IF_ERROR(_inner_fs->open_file(file, reader, opts)); |
106 | 2 | } |
107 | 1.00k | return Status::OK(); |
108 | 1.00k | } |
109 | | |
110 | 4 | Status PackedFileSystem::exists_impl(const Path& path, bool* res) const { |
111 | 4 | VLOG_DEBUG << "packed file system exist, rowset id " << _append_info.rowset_id; |
112 | 4 | if (!_index_map_initialized) { |
113 | 1 | return Status::InternalError("PackedFileSystem index map is not initialized"); |
114 | 1 | } |
115 | 3 | const auto it = _index_map.find(path.native()); |
116 | 3 | if (it != _index_map.end()) { |
117 | 1 | return _inner_fs->exists(Path(it->second.packed_file_path), res); |
118 | 1 | } |
119 | 2 | return _inner_fs->exists(path, res); |
120 | 3 | } |
121 | | |
122 | 5 | Status PackedFileSystem::file_size_impl(const Path& file, int64_t* file_size) const { |
123 | 5 | if (!_index_map_initialized) { |
124 | 1 | return Status::InternalError("PackedFileSystem index map is not initialized"); |
125 | 1 | } |
126 | 4 | const auto it = _index_map.find(file.native()); |
127 | 4 | if (it != _index_map.end()) { |
128 | 3 | *file_size = it->second.size; |
129 | 3 | return Status::OK(); |
130 | 3 | } |
131 | 1 | return _inner_fs->file_size(file, file_size); |
132 | 4 | } |
133 | | |
134 | | } // namespace doris::io |