be/src/io/fs/packed_file_system.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/packed_file_system.h" |
19 | | |
20 | | #include <string_view> |
21 | | #include <utility> |
22 | | |
23 | | #include "cloud/config.h" |
24 | | #include "common/status.h" |
25 | | #include "io/fs/file_reader.h" |
26 | | #include "io/fs/packed_file_reader.h" |
27 | | #include "io/fs/packed_file_writer.h" |
28 | | |
29 | | namespace doris::io { |
30 | | |
31 | | namespace { |
32 | | |
33 | | // Only keep packed-file aggregation for the first segment in a rowset. |
34 | | // The path handled here is expected to look like: |
35 | | // local/remote segment file: .../{rowset_id}_{segment_id}.dat |
36 | | // local/remote index file: .../{rowset_id}_{segment_id}.idx |
37 | | // The .idx case here is V2 only. V1 inverted-index tablets are filtered before |
38 | | // PackedFileSystem is enabled, so V1 names like |
39 | | // {rowset_id}_{segment_id}_{index_id}@{suffix}.idx never reach this helper. |
40 | | // Multi-segment rowsets usually come from large loads or memory-pressure flushes, |
41 | | // and continuing to buffer later segments in packed files can amplify memory usage. |
42 | | // Non-rowset file names keep the legacy behavior to avoid changing unrelated callers. |
43 | 1.00k | bool should_use_packed_writer(std::string_view file_name) { |
44 | 1.00k | constexpr std::string_view kSegmentSuffix = ".dat"; |
45 | 1.00k | constexpr std::string_view kIndexSuffix = ".idx"; |
46 | | |
47 | 1.00k | size_t suffix_len = 0; |
48 | 1.00k | if (file_name.ends_with(kSegmentSuffix)) { |
49 | 2 | suffix_len = kSegmentSuffix.size(); |
50 | 1.00k | } else if (file_name.ends_with(kIndexSuffix)) { |
51 | 1 | suffix_len = kIndexSuffix.size(); |
52 | 1.00k | } else { |
53 | 1.00k | return true; |
54 | 1.00k | } |
55 | | |
56 | 3 | file_name.remove_suffix(suffix_len); |
57 | 3 | size_t pos = file_name.rfind('_'); |
58 | 3 | if (pos == std::string_view::npos || pos + 1 >= file_name.size()) { |
59 | 0 | return true; |
60 | 0 | } |
61 | | |
62 | 3 | return file_name.substr(pos + 1) == "0"; |
63 | 3 | } |
64 | | |
65 | | } // namespace |
66 | | |
67 | | PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs, PackedAppendContext append_info) |
68 | 1.01k | : FileSystem(inner_fs->id(), inner_fs->type()), |
69 | 1.01k | _inner_fs(std::move(inner_fs)), |
70 | 1.01k | _append_info(std::move(append_info)) { |
71 | 1.01k | if (_append_info.resource_id.empty() && _inner_fs != nullptr) { |
72 | 0 | _append_info.resource_id = _inner_fs->id(); |
73 | 0 | } |
74 | 1.01k | } |
75 | | |
76 | | PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs, |
77 | | std::unordered_map<std::string, PackedSliceLocation> index_map, |
78 | | PackedAppendContext append_info) |
79 | 1.00k | : FileSystem(inner_fs->id(), inner_fs->type()), |
80 | 1.00k | _inner_fs(std::move(inner_fs)), |
81 | 1.00k | _index_map(std::move(index_map)), |
82 | 1.00k | _append_info(std::move(append_info)) { |
83 | 1.00k | if (_append_info.resource_id.empty() && _inner_fs != nullptr) { |
84 | 0 | _append_info.resource_id = _inner_fs->id(); |
85 | 0 | } |
86 | 1.00k | _index_map_initialized = true; |
87 | 1.00k | } |
88 | | |
89 | | Status PackedFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, |
90 | 1.00k | const FileWriterOptions* opts) { |
91 | | // Create file using inner file system |
92 | 1.00k | FileWriterPtr inner_writer; |
93 | 1.00k | RETURN_IF_ERROR(_inner_fs->create_file(file, &inner_writer, opts)); |
94 | | |
95 | 1.00k | if (!should_use_packed_writer(file.filename().native())) { |
96 | 2 | *writer = std::move(inner_writer); |
97 | 2 | return Status::OK(); |
98 | 2 | } |
99 | | |
100 | 1.00k | auto append_info = _append_info; |
101 | 1.00k | if (config::enable_file_cache_write_index_file_only && opts != nullptr) { |
102 | 0 | append_info.write_file_cache = opts->write_file_cache; |
103 | 0 | } |
104 | | |
105 | | // Wrap with PackedFileWriter |
106 | 1.00k | *writer = std::make_unique<PackedFileWriter>(std::move(inner_writer), file, append_info); |
107 | 1.00k | return Status::OK(); |
108 | 1.00k | } |
109 | | |
110 | | Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader, |
111 | 1.00k | const FileReaderOptions* opts) { |
112 | | // Check if this file is in a packed file |
113 | 1.00k | std::string file_path = file.native(); |
114 | 1.00k | auto it = _index_map.find(file_path); |
115 | 1.00k | PackedSliceLocation manager_location; |
116 | 1.00k | const PackedSliceLocation* packed_location = nullptr; |
117 | 1.00k | if (it != _index_map.end()) { |
118 | 1.00k | packed_location = &it->second; |
119 | 1.00k | } else if (PackedFileManager::instance() |
120 | 1 | ->get_packed_slice_location(file_path, &manager_location) |
121 | 1 | .ok()) { |
122 | 0 | packed_location = &manager_location; |
123 | 0 | } |
124 | | |
125 | 1.00k | if (packed_location != nullptr) { |
126 | | // File is in packed file, open packed file and wrap with PackedFileReader |
127 | 1.00k | const auto& index = *packed_location; |
128 | 1.00k | FileReaderSPtr inner_reader; |
129 | | |
130 | | // Create options for opening the packed file |
131 | | // Disable cache at this layer - we'll add cache wrapper around PackedFileReader instead |
132 | | // This ensures cache key is based on segment path, not packed file path |
133 | 1.00k | FileReaderOptions inner_opts = opts ? *opts : FileReaderOptions(); |
134 | 1.00k | inner_opts.file_size = index.packed_file_size; |
135 | 1.00k | inner_opts.cache_type = FileCachePolicy::NO_CACHE; |
136 | | |
137 | 1.00k | VLOG_DEBUG << "open packed file: " << index.packed_file_path << ", file: " << file.native() |
138 | 0 | << ", offset: " << index.offset << ", size: " << index.size |
139 | 0 | << ", packed_file_size: " << index.packed_file_size; |
140 | 1.00k | RETURN_IF_ERROR( |
141 | 1.00k | _inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &inner_opts)); |
142 | | |
143 | | // Create PackedFileReader with segment path |
144 | | // PackedFileReader.path() returns segment path, not packed file path |
145 | 1.00k | auto packed_reader = std::make_shared<PackedFileReader>(std::move(inner_reader), file, |
146 | 1.00k | index.offset, index.size); |
147 | | |
148 | | // If cache is requested, wrap PackedFileReader with CachedRemoteFileReader |
149 | | // This ensures: |
150 | | // 1. Cache key = hash(segment_path.filename()) - matches cleanup key |
151 | | // 2. Cache size = segment size - correct boundary |
152 | | // 3. Each segment has independent cache entry - no interference during cleanup |
153 | 1.00k | if (opts && opts->cache_type != FileCachePolicy::NO_CACHE) { |
154 | 1.00k | FileReaderOptions cache_opts = *opts; |
155 | 1.00k | cache_opts.file_size = index.size; // Use segment size for cache |
156 | 1.00k | *reader = DORIS_TRY(create_cached_file_reader(packed_reader, cache_opts)); |
157 | 1.00k | } else { |
158 | 1 | *reader = packed_reader; |
159 | 1 | } |
160 | 1.00k | } else { |
161 | 1 | RETURN_IF_ERROR(_inner_fs->open_file(file, reader, opts)); |
162 | 1 | } |
163 | 1.00k | return Status::OK(); |
164 | 1.00k | } |
165 | | |
166 | 4 | Status PackedFileSystem::exists_impl(const Path& path, bool* res) const { |
167 | 4 | VLOG_DEBUG << "packed file system exist, rowset id " << _append_info.rowset_id; |
168 | 4 | if (!_index_map_initialized) { |
169 | 1 | return Status::InternalError("PackedFileSystem index map is not initialized"); |
170 | 1 | } |
171 | 3 | const auto it = _index_map.find(path.native()); |
172 | 3 | if (it != _index_map.end()) { |
173 | 1 | return _inner_fs->exists(Path(it->second.packed_file_path), res); |
174 | 1 | } |
175 | 2 | return _inner_fs->exists(path, res); |
176 | 3 | } |
177 | | |
178 | 5 | Status PackedFileSystem::file_size_impl(const Path& file, int64_t* file_size) const { |
179 | 5 | if (!_index_map_initialized) { |
180 | 1 | return Status::InternalError("PackedFileSystem index map is not initialized"); |
181 | 1 | } |
182 | 4 | const auto it = _index_map.find(file.native()); |
183 | 4 | if (it != _index_map.end()) { |
184 | 3 | *file_size = it->second.size; |
185 | 3 | return Status::OK(); |
186 | 3 | } |
187 | 1 | return _inner_fs->file_size(file, file_size); |
188 | 4 | } |
189 | | |
190 | | } // namespace doris::io |