be/src/io/fs/packed_file_system.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/packed_file_system.h" |
19 | | |
20 | | #include <string_view> |
21 | | #include <utility> |
22 | | |
23 | | #include "common/status.h" |
24 | | #include "io/fs/file_reader.h" |
25 | | #include "io/fs/packed_file_reader.h" |
26 | | #include "io/fs/packed_file_writer.h" |
27 | | |
28 | | namespace doris::io { |
29 | | |
30 | | namespace { |
31 | | |
// Decides whether a newly created file should be aggregated into a packed file.
//
// Packed-file aggregation is kept only for the first segment of a rowset.
// Rowset file names handled here are expected to look like:
//   local/remote segment file: .../{rowset_id}_{segment_id}.dat
//   local/remote index file:   .../{rowset_id}_{segment_id}.idx
// The .idx case here is V2 only. V1 inverted-index tablets are filtered before
// PackedFileSystem is enabled, so V1 names like
// {rowset_id}_{segment_id}_{index_id}@{suffix}.idx never reach this helper.
// Multi-segment rowsets usually come from large loads or memory-pressure flushes,
// and continuing to buffer later segments in packed files can amplify memory usage.
//
// Non-rowset file names keep the legacy behavior (always packed) to avoid changing
// unrelated callers. A name counts as a rowset name only if it ends in ".dat" or
// ".idx" and the text after its last '_' is a non-empty run of decimal digits.
//
// @param file_name  file name component (no directory part is required).
// @return true if the file should be wrapped by the packed writer, false if it
//         should be written directly through the inner file system.
bool should_use_packed_writer(std::string_view file_name) {
    constexpr std::string_view kSegmentSuffix = ".dat";
    constexpr std::string_view kIndexSuffix = ".idx";
    constexpr std::string_view kDigits = "0123456789";

    // Neither suffix contains a second '.', so comparing the text from the last
    // '.' onwards is equivalent to an ends-with test on each suffix.
    const size_t dot_pos = file_name.rfind('.');
    if (dot_pos == std::string_view::npos) {
        return true;
    }
    const std::string_view extension = file_name.substr(dot_pos);
    if (extension != kSegmentSuffix && extension != kIndexSuffix) {
        return true;
    }

    const std::string_view stem = file_name.substr(0, dot_pos);
    const size_t sep_pos = stem.rfind('_');
    if (sep_pos == std::string_view::npos || sep_pos + 1 >= stem.size()) {
        return true;
    }

    const std::string_view segment_id = stem.substr(sep_pos + 1);
    // A non-numeric trailing component means this is not a
    // {rowset_id}_{segment_id} name; keep the legacy packed behavior for it.
    if (segment_id.find_first_not_of(kDigits) != std::string_view::npos) {
        return true;
    }

    return segment_id == "0";
}
63 | | |
64 | | } // namespace |
65 | | |
66 | | PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs, PackedAppendContext append_info) |
67 | 1.01k | : FileSystem(inner_fs->id(), inner_fs->type()), |
68 | 1.01k | _inner_fs(std::move(inner_fs)), |
69 | 1.01k | _append_info(std::move(append_info)) { |
70 | 1.01k | if (_append_info.resource_id.empty() && _inner_fs != nullptr) { |
71 | 0 | _append_info.resource_id = _inner_fs->id(); |
72 | 0 | } |
73 | 1.01k | } |
74 | | |
75 | | PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs, |
76 | | std::unordered_map<std::string, PackedSliceLocation> index_map, |
77 | | PackedAppendContext append_info) |
78 | 1.00k | : FileSystem(inner_fs->id(), inner_fs->type()), |
79 | 1.00k | _inner_fs(std::move(inner_fs)), |
80 | 1.00k | _index_map(std::move(index_map)), |
81 | 1.00k | _append_info(std::move(append_info)) { |
82 | 1.00k | if (_append_info.resource_id.empty() && _inner_fs != nullptr) { |
83 | 0 | _append_info.resource_id = _inner_fs->id(); |
84 | 0 | } |
85 | 1.00k | _index_map_initialized = true; |
86 | 1.00k | } |
87 | | |
88 | | Status PackedFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, |
89 | 1.00k | const FileWriterOptions* opts) { |
90 | | // Create file using inner file system |
91 | 1.00k | FileWriterPtr inner_writer; |
92 | 1.00k | RETURN_IF_ERROR(_inner_fs->create_file(file, &inner_writer, opts)); |
93 | | |
94 | 1.00k | if (!should_use_packed_writer(file.filename().native())) { |
95 | 2 | *writer = std::move(inner_writer); |
96 | 2 | return Status::OK(); |
97 | 2 | } |
98 | | |
99 | | // Wrap with PackedFileWriter |
100 | 1.00k | *writer = std::make_unique<PackedFileWriter>(std::move(inner_writer), file, _append_info); |
101 | 1.00k | return Status::OK(); |
102 | 1.00k | } |
103 | | |
104 | | Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader, |
105 | 1.00k | const FileReaderOptions* opts) { |
106 | | // Check if this file is in a packed file |
107 | 1.00k | std::string file_path = file.native(); |
108 | 1.00k | auto it = _index_map.find(file_path); |
109 | 1.00k | bool is_packed_file = (it != _index_map.end()); |
110 | | |
111 | 1.00k | if (is_packed_file) { |
112 | | // File is in packed file, open packed file and wrap with PackedFileReader |
113 | 1.00k | const auto& index = it->second; |
114 | 1.00k | FileReaderSPtr inner_reader; |
115 | | |
116 | | // Create options for opening the packed file |
117 | | // Disable cache at this layer - we'll add cache wrapper around PackedFileReader instead |
118 | | // This ensures cache key is based on segment path, not packed file path |
119 | 1.00k | FileReaderOptions inner_opts = opts ? *opts : FileReaderOptions(); |
120 | 1.00k | inner_opts.file_size = index.packed_file_size; |
121 | 1.00k | inner_opts.cache_type = FileCachePolicy::NO_CACHE; |
122 | | |
123 | 18.4E | VLOG_DEBUG << "open packed file: " << index.packed_file_path << ", file: " << file.native() |
124 | 18.4E | << ", offset: " << index.offset << ", size: " << index.size |
125 | 18.4E | << ", packed_file_size: " << index.packed_file_size; |
126 | 1.00k | RETURN_IF_ERROR( |
127 | 1.00k | _inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &inner_opts)); |
128 | | |
129 | | // Create PackedFileReader with segment path |
130 | | // PackedFileReader.path() returns segment path, not packed file path |
131 | 1.00k | auto packed_reader = std::make_shared<PackedFileReader>(std::move(inner_reader), file, |
132 | 1.00k | index.offset, index.size); |
133 | | |
134 | | // If cache is requested, wrap PackedFileReader with CachedRemoteFileReader |
135 | | // This ensures: |
136 | | // 1. Cache key = hash(segment_path.filename()) - matches cleanup key |
137 | | // 2. Cache size = segment size - correct boundary |
138 | | // 3. Each segment has independent cache entry - no interference during cleanup |
139 | 1.00k | if (opts && opts->cache_type != FileCachePolicy::NO_CACHE) { |
140 | 1.00k | FileReaderOptions cache_opts = *opts; |
141 | 1.00k | cache_opts.file_size = index.size; // Use segment size for cache |
142 | 1.00k | *reader = DORIS_TRY(create_cached_file_reader(packed_reader, cache_opts)); |
143 | 1.00k | } else { |
144 | 0 | *reader = packed_reader; |
145 | 0 | } |
146 | 1.00k | } else { |
147 | 2 | RETURN_IF_ERROR(_inner_fs->open_file(file, reader, opts)); |
148 | 2 | } |
149 | 1.00k | return Status::OK(); |
150 | 1.00k | } |
151 | | |
152 | 4 | Status PackedFileSystem::exists_impl(const Path& path, bool* res) const { |
153 | 4 | VLOG_DEBUG << "packed file system exist, rowset id " << _append_info.rowset_id; |
154 | 4 | if (!_index_map_initialized) { |
155 | 1 | return Status::InternalError("PackedFileSystem index map is not initialized"); |
156 | 1 | } |
157 | 3 | const auto it = _index_map.find(path.native()); |
158 | 3 | if (it != _index_map.end()) { |
159 | 1 | return _inner_fs->exists(Path(it->second.packed_file_path), res); |
160 | 1 | } |
161 | 2 | return _inner_fs->exists(path, res); |
162 | 3 | } |
163 | | |
164 | 5 | Status PackedFileSystem::file_size_impl(const Path& file, int64_t* file_size) const { |
165 | 5 | if (!_index_map_initialized) { |
166 | 1 | return Status::InternalError("PackedFileSystem index map is not initialized"); |
167 | 1 | } |
168 | 4 | const auto it = _index_map.find(file.native()); |
169 | 4 | if (it != _index_map.end()) { |
170 | 3 | *file_size = it->second.size; |
171 | 3 | return Status::OK(); |
172 | 3 | } |
173 | 1 | return _inner_fs->file_size(file, file_size); |
174 | 4 | } |
175 | | |
176 | | } // namespace doris::io |