Coverage Report

Created: 2026-04-16 17:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/packed_file_system.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/packed_file_system.h"
19
20
#include <string_view>
21
#include <utility>
22
23
#include "common/status.h"
24
#include "io/fs/file_reader.h"
25
#include "io/fs/packed_file_reader.h"
26
#include "io/fs/packed_file_writer.h"
27
28
namespace doris::io {
29
30
namespace {
31
32
// Only keep packed-file aggregation for the first segment in a rowset.
33
// The path handled here is expected to look like:
34
//   local/remote segment file: .../{rowset_id}_{segment_id}.dat
35
//   local/remote index file:   .../{rowset_id}_{segment_id}.idx
36
// The .idx case here is V2 only. V1 inverted-index tablets are filtered before
37
// PackedFileSystem is enabled, so V1 names like
38
// {rowset_id}_{segment_id}_{index_id}@{suffix}.idx never reach this helper.
39
// Multi-segment rowsets usually come from large loads or memory-pressure flushes,
40
// and continuing to buffer later segments in packed files can amplify memory usage.
41
// Non-rowset file names keep the legacy behavior to avoid changing unrelated callers.
42
1.00k
bool should_use_packed_writer(std::string_view file_name) {
43
1.00k
    constexpr std::string_view kSegmentSuffix = ".dat";
44
1.00k
    constexpr std::string_view kIndexSuffix = ".idx";
45
46
1.00k
    size_t suffix_len = 0;
47
1.00k
    if (file_name.ends_with(kSegmentSuffix)) {
48
2
        suffix_len = kSegmentSuffix.size();
49
1.00k
    } else if (file_name.ends_with(kIndexSuffix)) {
50
1
        suffix_len = kIndexSuffix.size();
51
1.00k
    } else {
52
1.00k
        return true;
53
1.00k
    }
54
55
3
    file_name.remove_suffix(suffix_len);
56
3
    size_t pos = file_name.rfind('_');
57
3
    if (pos == std::string_view::npos || pos + 1 >= file_name.size()) {
58
0
        return true;
59
0
    }
60
61
3
    return file_name.substr(pos + 1) == "0";
62
3
}
63
64
} // namespace
65
66
PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs, PackedAppendContext append_info)
67
1.01k
        : FileSystem(inner_fs->id(), inner_fs->type()),
68
1.01k
          _inner_fs(std::move(inner_fs)),
69
1.01k
          _append_info(std::move(append_info)) {
70
1.01k
    if (_append_info.resource_id.empty() && _inner_fs != nullptr) {
71
0
        _append_info.resource_id = _inner_fs->id();
72
0
    }
73
1.01k
}
74
75
PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs,
76
                                   std::unordered_map<std::string, PackedSliceLocation> index_map,
77
                                   PackedAppendContext append_info)
78
1.00k
        : FileSystem(inner_fs->id(), inner_fs->type()),
79
1.00k
          _inner_fs(std::move(inner_fs)),
80
1.00k
          _index_map(std::move(index_map)),
81
1.00k
          _append_info(std::move(append_info)) {
82
1.00k
    if (_append_info.resource_id.empty() && _inner_fs != nullptr) {
83
0
        _append_info.resource_id = _inner_fs->id();
84
0
    }
85
1.00k
    _index_map_initialized = true;
86
1.00k
}
87
88
Status PackedFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
89
1.00k
                                          const FileWriterOptions* opts) {
90
    // Create file using inner file system
91
1.00k
    FileWriterPtr inner_writer;
92
1.00k
    RETURN_IF_ERROR(_inner_fs->create_file(file, &inner_writer, opts));
93
94
1.00k
    if (!should_use_packed_writer(file.filename().native())) {
95
2
        *writer = std::move(inner_writer);
96
2
        return Status::OK();
97
2
    }
98
99
    // Wrap with PackedFileWriter
100
1.00k
    *writer = std::make_unique<PackedFileWriter>(std::move(inner_writer), file, _append_info);
101
1.00k
    return Status::OK();
102
1.00k
}
103
104
Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader,
105
1.00k
                                        const FileReaderOptions* opts) {
106
    // Check if this file is in a packed file
107
1.00k
    std::string file_path = file.native();
108
1.00k
    auto it = _index_map.find(file_path);
109
1.00k
    bool is_packed_file = (it != _index_map.end());
110
111
1.00k
    if (is_packed_file) {
112
        // File is in packed file, open packed file and wrap with PackedFileReader
113
1.00k
        const auto& index = it->second;
114
1.00k
        FileReaderSPtr inner_reader;
115
116
        // Create options for opening the packed file
117
        // Disable cache at this layer - we'll add cache wrapper around PackedFileReader instead
118
        // This ensures cache key is based on segment path, not packed file path
119
1.00k
        FileReaderOptions inner_opts = opts ? *opts : FileReaderOptions();
120
1.00k
        inner_opts.file_size = index.packed_file_size;
121
1.00k
        inner_opts.cache_type = FileCachePolicy::NO_CACHE;
122
123
18.4E
        VLOG_DEBUG << "open packed file: " << index.packed_file_path << ", file: " << file.native()
124
18.4E
                   << ", offset: " << index.offset << ", size: " << index.size
125
18.4E
                   << ", packed_file_size: " << index.packed_file_size;
126
1.00k
        RETURN_IF_ERROR(
127
1.00k
                _inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &inner_opts));
128
129
        // Create PackedFileReader with segment path
130
        // PackedFileReader.path() returns segment path, not packed file path
131
1.00k
        auto packed_reader = std::make_shared<PackedFileReader>(std::move(inner_reader), file,
132
1.00k
                                                                index.offset, index.size);
133
134
        // If cache is requested, wrap PackedFileReader with CachedRemoteFileReader
135
        // This ensures:
136
        // 1. Cache key = hash(segment_path.filename()) - matches cleanup key
137
        // 2. Cache size = segment size - correct boundary
138
        // 3. Each segment has independent cache entry - no interference during cleanup
139
1.00k
        if (opts && opts->cache_type != FileCachePolicy::NO_CACHE) {
140
1.00k
            FileReaderOptions cache_opts = *opts;
141
1.00k
            cache_opts.file_size = index.size; // Use segment size for cache
142
1.00k
            *reader = DORIS_TRY(create_cached_file_reader(packed_reader, cache_opts));
143
1.00k
        } else {
144
0
            *reader = packed_reader;
145
0
        }
146
1.00k
    } else {
147
2
        RETURN_IF_ERROR(_inner_fs->open_file(file, reader, opts));
148
2
    }
149
1.00k
    return Status::OK();
150
1.00k
}
151
152
4
Status PackedFileSystem::exists_impl(const Path& path, bool* res) const {
153
4
    VLOG_DEBUG << "packed file system exist, rowset id " << _append_info.rowset_id;
154
4
    if (!_index_map_initialized) {
155
1
        return Status::InternalError("PackedFileSystem index map is not initialized");
156
1
    }
157
3
    const auto it = _index_map.find(path.native());
158
3
    if (it != _index_map.end()) {
159
1
        return _inner_fs->exists(Path(it->second.packed_file_path), res);
160
1
    }
161
2
    return _inner_fs->exists(path, res);
162
3
}
163
164
5
Status PackedFileSystem::file_size_impl(const Path& file, int64_t* file_size) const {
165
5
    if (!_index_map_initialized) {
166
1
        return Status::InternalError("PackedFileSystem index map is not initialized");
167
1
    }
168
4
    const auto it = _index_map.find(file.native());
169
4
    if (it != _index_map.end()) {
170
3
        *file_size = it->second.size;
171
3
        return Status::OK();
172
3
    }
173
1
    return _inner_fs->file_size(file, file_size);
174
4
}
175
176
} // namespace doris::io