Coverage Report

Created: 2026-04-16 06:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/packed_file_system.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/packed_file_system.h"
19
20
#include <string_view>
21
#include <utility>
22
23
#include "common/status.h"
24
#include "io/fs/file_reader.h"
25
#include "io/fs/packed_file_reader.h"
26
#include "io/fs/packed_file_writer.h"
27
28
namespace doris::io {
29
30
namespace {
31
32
// Only keep packed-file aggregation for the first segment in a rowset.
33
// The path handled here is expected to look like:
34
//   local/remote segment file: .../{rowset_id}_{segment_id}.dat
35
//   local/remote index file:   .../{rowset_id}_{segment_id}.idx
36
// The .idx case here is V2 only. V1 inverted-index tablets are filtered before
37
// PackedFileSystem is enabled, so V1 names like
38
// {rowset_id}_{segment_id}_{index_id}@{suffix}.idx never reach this helper.
39
// Multi-segment rowsets usually come from large loads or memory-pressure flushes,
40
// and continuing to buffer later segments in packed files can amplify memory usage.
41
// Non-rowset file names keep the legacy behavior to avoid changing unrelated callers.
42
62.3k
bool should_use_packed_writer(std::string_view file_name) {
43
62.3k
    constexpr std::string_view kSegmentSuffix = ".dat";
44
62.3k
    constexpr std::string_view kIndexSuffix = ".idx";
45
46
62.3k
    size_t suffix_len = 0;
47
62.3k
    if (file_name.ends_with(kSegmentSuffix)) {
48
55.6k
        suffix_len = kSegmentSuffix.size();
49
55.6k
    } else if (file_name.ends_with(kIndexSuffix)) {
50
5.36k
        suffix_len = kIndexSuffix.size();
51
5.36k
    } else {
52
1.34k
        return true;
53
1.34k
    }
54
55
61.0k
    file_name.remove_suffix(suffix_len);
56
61.0k
    size_t pos = file_name.rfind('_');
57
61.2k
    if (pos == std::string_view::npos || pos + 1 >= file_name.size()) {
58
0
        return true;
59
0
    }
60
61
61.0k
    return file_name.substr(pos + 1) == "0";
62
61.0k
}
63
64
} // namespace
65
66
PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs, PackedAppendContext append_info)
67
57.1k
        : FileSystem(inner_fs->id(), inner_fs->type()),
68
57.1k
          _inner_fs(std::move(inner_fs)),
69
57.1k
          _append_info(std::move(append_info)) {
70
57.1k
    if (_append_info.resource_id.empty() && _inner_fs != nullptr) {
71
56.1k
        _append_info.resource_id = _inner_fs->id();
72
56.1k
    }
73
57.1k
}
74
75
PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs,
76
                                   std::unordered_map<std::string, PackedSliceLocation> index_map,
77
                                   PackedAppendContext append_info)
78
200k
        : FileSystem(inner_fs->id(), inner_fs->type()),
79
200k
          _inner_fs(std::move(inner_fs)),
80
200k
          _index_map(std::move(index_map)),
81
200k
          _append_info(std::move(append_info)) {
82
201k
    if (_append_info.resource_id.empty() && _inner_fs != nullptr) {
83
201k
        _append_info.resource_id = _inner_fs->id();
84
201k
    }
85
200k
    _index_map_initialized = true;
86
200k
}
87
88
Status PackedFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
89
62.3k
                                          const FileWriterOptions* opts) {
90
    // Create file using inner file system
91
62.3k
    FileWriterPtr inner_writer;
92
62.3k
    RETURN_IF_ERROR(_inner_fs->create_file(file, &inner_writer, opts));
93
94
62.3k
    if (!should_use_packed_writer(file.filename().native())) {
95
2
        *writer = std::move(inner_writer);
96
2
        return Status::OK();
97
2
    }
98
99
    // Wrap with PackedFileWriter
100
62.3k
    *writer = std::make_unique<PackedFileWriter>(std::move(inner_writer), file, _append_info);
101
62.3k
    return Status::OK();
102
62.3k
}
103
104
Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader,
105
209k
                                        const FileReaderOptions* opts) {
106
    // Check if this file is in a packed file
107
209k
    std::string file_path = file.native();
108
209k
    auto it = _index_map.find(file_path);
109
209k
    bool is_packed_file = (it != _index_map.end());
110
111
209k
    if (is_packed_file) {
112
        // File is in packed file, open packed file and wrap with PackedFileReader
113
203k
        const auto& index = it->second;
114
203k
        FileReaderSPtr inner_reader;
115
116
        // Create options for opening the packed file
117
        // Disable cache at this layer - we'll add cache wrapper around PackedFileReader instead
118
        // This ensures cache key is based on segment path, not packed file path
119
18.4E
        FileReaderOptions inner_opts = opts ? *opts : FileReaderOptions();
120
203k
        inner_opts.file_size = index.packed_file_size;
121
203k
        inner_opts.cache_type = FileCachePolicy::NO_CACHE;
122
123
203k
        VLOG_DEBUG << "open packed file: " << index.packed_file_path << ", file: " << file.native()
124
595
                   << ", offset: " << index.offset << ", size: " << index.size
125
595
                   << ", packed_file_size: " << index.packed_file_size;
126
203k
        RETURN_IF_ERROR(
127
203k
                _inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &inner_opts));
128
129
        // Create PackedFileReader with segment path
130
        // PackedFileReader.path() returns segment path, not packed file path
131
203k
        auto packed_reader = std::make_shared<PackedFileReader>(std::move(inner_reader), file,
132
203k
                                                                index.offset, index.size);
133
134
        // If cache is requested, wrap PackedFileReader with CachedRemoteFileReader
135
        // This ensures:
136
        // 1. Cache key = hash(segment_path.filename()) - matches cleanup key
137
        // 2. Cache size = segment size - correct boundary
138
        // 3. Each segment has independent cache entry - no interference during cleanup
139
203k
        if (opts && opts->cache_type != FileCachePolicy::NO_CACHE) {
140
203k
            FileReaderOptions cache_opts = *opts;
141
203k
            cache_opts.file_size = index.size; // Use segment size for cache
142
203k
            *reader = DORIS_TRY(create_cached_file_reader(packed_reader, cache_opts));
143
18.4E
        } else {
144
18.4E
            *reader = packed_reader;
145
18.4E
        }
146
203k
    } else {
147
5.18k
        RETURN_IF_ERROR(_inner_fs->open_file(file, reader, opts));
148
5.18k
    }
149
203k
    return Status::OK();
150
209k
}
151
152
4
Status PackedFileSystem::exists_impl(const Path& path, bool* res) const {
153
4
    VLOG_DEBUG << "packed file system exist, rowset id " << _append_info.rowset_id;
154
4
    if (!_index_map_initialized) {
155
1
        return Status::InternalError("PackedFileSystem index map is not initialized");
156
1
    }
157
3
    const auto it = _index_map.find(path.native());
158
3
    if (it != _index_map.end()) {
159
1
        return _inner_fs->exists(Path(it->second.packed_file_path), res);
160
1
    }
161
2
    return _inner_fs->exists(path, res);
162
3
}
163
164
67
Status PackedFileSystem::file_size_impl(const Path& file, int64_t* file_size) const {
165
67
    if (!_index_map_initialized) {
166
1
        return Status::InternalError("PackedFileSystem index map is not initialized");
167
1
    }
168
66
    const auto it = _index_map.find(file.native());
169
66
    if (it != _index_map.end()) {
170
65
        *file_size = it->second.size;
171
65
        return Status::OK();
172
65
    }
173
1
    return _inner_fs->file_size(file, file_size);
174
66
}
175
176
} // namespace doris::io