Coverage Report

Created: 2026-07-02 23:26

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/packed_file_system.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/packed_file_system.h"
19
20
#include <string_view>
21
#include <utility>
22
23
#include "cloud/config.h"
24
#include "common/status.h"
25
#include "io/fs/file_reader.h"
26
#include "io/fs/packed_file_reader.h"
27
#include "io/fs/packed_file_writer.h"
28
29
namespace doris::io {
30
31
namespace {
32
33
// Only keep packed-file aggregation for the first segment in a rowset.
34
// The path handled here is expected to look like:
35
//   local/remote segment file: .../{rowset_id}_{segment_id}.dat
36
//   local/remote index file:   .../{rowset_id}_{segment_id}.idx
37
// The .idx case here is V2 only. V1 inverted-index tablets are filtered before
38
// PackedFileSystem is enabled, so V1 names like
39
// {rowset_id}_{segment_id}_{index_id}@{suffix}.idx never reach this helper.
40
// Multi-segment rowsets usually come from large loads or memory-pressure flushes,
41
// and continuing to buffer later segments in packed files can amplify memory usage.
42
// Non-rowset file names keep the legacy behavior to avoid changing unrelated callers.
43
1.00k
// Returns true when a newly created file should be wrapped in a PackedFileWriter.
// Only a .dat/.idx name whose trailing "_<digits>" token is a non-zero segment id
// returns false; any other name (including .dat/.idx files whose trailing token is
// missing or not numeric) keeps the legacy behavior and returns true.
bool should_use_packed_writer(std::string_view file_name) {
    constexpr std::string_view kSegmentSuffix = ".dat";
    constexpr std::string_view kIndexSuffix = ".idx";
    constexpr std::string_view kDigits = "0123456789";

    // Split the extension off at the last '.'; only .dat and .idx are rowset files.
    const size_t dot = file_name.rfind('.');
    if (dot == std::string_view::npos) {
        return true;
    }
    const std::string_view extension = file_name.substr(dot);
    if (extension != kSegmentSuffix && extension != kIndexSuffix) {
        return true;
    }

    // The segment id is the token after the last '_' in the stem.
    const std::string_view stem = file_name.substr(0, dot);
    const size_t underscore = stem.rfind('_');
    if (underscore == std::string_view::npos) {
        return true;
    }
    const std::string_view segment_id = stem.substr(underscore + 1);

    // A missing or non-numeric token means this is not a {rowset_id}_{segment_id}
    // name, so the legacy behavior applies instead of silently disabling packing.
    if (segment_id.empty() || segment_id.find_first_not_of(kDigits) != std::string_view::npos) {
        return true;
    }
    return segment_id == "0";
}
64
65
} // namespace
66
67
// Wraps `inner_fs` without a preloaded slice index; this constructor does not mark
// the index map as initialized. If the append context has no resource id, the inner
// file system's id is used in its place.
PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs, PackedAppendContext append_info)
        : FileSystem(inner_fs->id(), inner_fs->type()),
          _inner_fs(std::move(inner_fs)),
          _append_info(std::move(append_info)) {
    const bool needs_default_resource =
            _inner_fs != nullptr && _append_info.resource_id.empty();
    if (needs_default_resource) {
        _append_info.resource_id = _inner_fs->id();
    }
}
75
76
// Wraps `inner_fs` with a preloaded map from file path to packed-slice location and
// marks the index map as initialized. Shared setup (base class, inner fs, append
// context and resource-id defaulting) is done by the two-argument constructor.
PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs,
                                   std::unordered_map<std::string, PackedSliceLocation> index_map,
                                   PackedAppendContext append_info)
        : PackedFileSystem(std::move(inner_fs), std::move(append_info)) {
    _index_map = std::move(index_map);
    _index_map_initialized = true;
}
88
89
// Creates `file` on the inner file system. Names rejected by should_use_packed_writer
// get the inner writer as-is; all others are wrapped in a PackedFileWriter built from
// a copy of this file system's append context.
Status PackedFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
                                          const FileWriterOptions* opts) {
    FileWriterPtr inner_writer;
    RETURN_IF_ERROR(_inner_fs->create_file(file, &inner_writer, opts));

    const bool use_packed = should_use_packed_writer(file.filename().native());
    if (use_packed) {
        auto append_info = _append_info;
        // With enable_file_cache_write_index_file_only set, the caller's
        // write_file_cache flag overrides the one from the append context.
        if (opts != nullptr && config::enable_file_cache_write_index_file_only) {
            append_info.write_file_cache = opts->write_file_cache;
        }
        *writer = std::make_unique<PackedFileWriter>(std::move(inner_writer), file, append_info);
    } else {
        *writer = std::move(inner_writer);
    }
    return Status::OK();
}
109
110
// Opens `file` for reading.
//
// The slice location is looked up first in the index map and then in
// PackedFileManager. When a location is found:
// - The containing packed file is opened on the inner file system with caching disabled.
// - It is wrapped in a PackedFileReader that reports the original `file` path.
// - If the caller requested caching, that reader is further wrapped by
//   create_cached_file_reader() with file_size set to the slice size, so the cache is
//   keyed on the segment path rather than the packed file path.
// Files with no known location are opened directly on the inner file system.
Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader,
                                        const FileReaderOptions* opts) {
    const std::string& file_path = file.native();

    PackedSliceLocation manager_location;
    const PackedSliceLocation* location = nullptr;
    if (auto found = _index_map.find(file_path); found != _index_map.end()) {
        location = &found->second;
    } else if (PackedFileManager::instance()
                       ->get_packed_slice_location(file_path, &manager_location)
                       .ok()) {
        location = &manager_location;
    }

    if (location == nullptr) {
        // Not stored inside a packed file: delegate unchanged.
        RETURN_IF_ERROR(_inner_fs->open_file(file, reader, opts));
        return Status::OK();
    }

    const PackedSliceLocation& slice = *location;
    VLOG_DEBUG << "open packed file: " << slice.packed_file_path << ", file: " << file.native()
               << ", offset: " << slice.offset << ", size: " << slice.size
               << ", packed_file_size: " << slice.packed_file_size;

    // The packed file itself is read uncached; any cache sits on top of the slice reader.
    FileReaderOptions packed_opts = (opts != nullptr) ? *opts : FileReaderOptions();
    packed_opts.file_size = slice.packed_file_size;
    packed_opts.cache_type = FileCachePolicy::NO_CACHE;

    FileReaderSPtr packed_file_reader;
    RETURN_IF_ERROR(_inner_fs->open_file(Path(slice.packed_file_path), &packed_file_reader,
                                         &packed_opts));

    auto slice_reader = std::make_shared<PackedFileReader>(std::move(packed_file_reader), file,
                                                           slice.offset, slice.size);

    const bool wants_cache = opts != nullptr && opts->cache_type != FileCachePolicy::NO_CACHE;
    if (!wants_cache) {
        *reader = slice_reader;
        return Status::OK();
    }

    FileReaderOptions cache_opts = *opts;
    cache_opts.file_size = slice.size; // size the cache entry to the slice, not the packed file
    *reader = DORIS_TRY(create_cached_file_reader(slice_reader, cache_opts));
    return Status::OK();
}
165
166
4
// Reports in `*res` whether `path` exists. For a path recorded in the index map, the
// existence of its packed file is checked; other paths are checked on the inner file
// system. Returns InternalError if the index map was never initialized.
Status PackedFileSystem::exists_impl(const Path& path, bool* res) const {
    VLOG_DEBUG << "packed file system exist, rowset id " << _append_info.rowset_id;
    if (_index_map_initialized) {
        auto entry = _index_map.find(path.native());
        const bool is_packed = entry != _index_map.end();
        return is_packed ? _inner_fs->exists(Path(entry->second.packed_file_path), res)
                         : _inner_fs->exists(path, res);
    }
    return Status::InternalError("PackedFileSystem index map is not initialized");
}
177
178
5
// Writes the logical size of `file` to `*file_size`. For a path recorded in the index
// map, this is the slice size; other paths are sized by the inner file system.
// Returns InternalError if the index map was never initialized.
Status PackedFileSystem::file_size_impl(const Path& file, int64_t* file_size) const {
    if (_index_map_initialized) {
        if (auto entry = _index_map.find(file.native()); entry != _index_map.end()) {
            // Report the slice length, not the size of the containing packed file.
            *file_size = entry->second.size;
            return Status::OK();
        }
        return _inner_fs->file_size(file, file_size);
    }
    return Status::InternalError("PackedFileSystem index map is not initialized");
}
189
190
} // namespace doris::io