Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/packed_file_system.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/packed_file_system.h"
19
20
#include <utility>
21
22
#include "common/status.h"
23
#include "io/fs/file_reader.h"
24
#include "io/fs/packed_file_reader.h"
25
#include "io/fs/packed_file_writer.h"
26
27
namespace doris::io {
28
29
PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs, PackedAppendContext append_info)
30
1.01k
        : FileSystem(inner_fs->id(), inner_fs->type()),
31
1.01k
          _inner_fs(std::move(inner_fs)),
32
1.01k
          _append_info(std::move(append_info)) {
33
1.01k
    if (_append_info.resource_id.empty() && _inner_fs != nullptr) {
34
0
        _append_info.resource_id = _inner_fs->id();
35
0
    }
36
1.01k
}
37
38
PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs,
39
                                   std::unordered_map<std::string, PackedSliceLocation> index_map,
40
                                   PackedAppendContext append_info)
41
1.00k
        : FileSystem(inner_fs->id(), inner_fs->type()),
42
1.00k
          _inner_fs(std::move(inner_fs)),
43
1.00k
          _index_map(std::move(index_map)),
44
1.00k
          _append_info(std::move(append_info)) {
45
1.00k
    if (_append_info.resource_id.empty() && _inner_fs != nullptr) {
46
0
        _append_info.resource_id = _inner_fs->id();
47
0
    }
48
1.00k
    _index_map_initialized = true;
49
1.00k
}
50
51
Status PackedFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
52
1.00k
                                          const FileWriterOptions* opts) {
53
    // Create file using inner file system
54
1.00k
    FileWriterPtr inner_writer;
55
1.00k
    RETURN_IF_ERROR(_inner_fs->create_file(file, &inner_writer, opts));
56
57
    // Wrap with PackedFileWriter
58
1.00k
    *writer = std::make_unique<PackedFileWriter>(std::move(inner_writer), file, _append_info);
59
1.00k
    return Status::OK();
60
1.00k
}
61
62
Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader,
63
1.00k
                                        const FileReaderOptions* opts) {
64
    // Check if this file is in a packed file
65
1.00k
    std::string file_path = file.native();
66
1.00k
    auto it = _index_map.find(file_path);
67
1.00k
    bool is_packed_file = (it != _index_map.end());
68
69
1.00k
    if (is_packed_file) {
70
        // File is in packed file, open packed file and wrap with PackedFileReader
71
1.00k
        const auto& index = it->second;
72
1.00k
        FileReaderSPtr inner_reader;
73
74
        // Create options for opening the packed file
75
        // Disable cache at this layer - we'll add cache wrapper around PackedFileReader instead
76
        // This ensures cache key is based on segment path, not packed file path
77
1.00k
        FileReaderOptions inner_opts = opts ? *opts : FileReaderOptions();
78
1.00k
        inner_opts.file_size = index.packed_file_size;
79
1.00k
        inner_opts.cache_type = FileCachePolicy::NO_CACHE;
80
81
18.4E
        VLOG_DEBUG << "open packed file: " << index.packed_file_path << ", file: " << file.native()
82
18.4E
                   << ", offset: " << index.offset << ", size: " << index.size
83
18.4E
                   << ", packed_file_size: " << index.packed_file_size;
84
1.00k
        RETURN_IF_ERROR(
85
1.00k
                _inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &inner_opts));
86
87
        // Create PackedFileReader with segment path
88
        // PackedFileReader.path() returns segment path, not packed file path
89
1.00k
        auto packed_reader = std::make_shared<PackedFileReader>(std::move(inner_reader), file,
90
1.00k
                                                                index.offset, index.size);
91
92
        // If cache is requested, wrap PackedFileReader with CachedRemoteFileReader
93
        // This ensures:
94
        // 1. Cache key = hash(segment_path.filename()) - matches cleanup key
95
        // 2. Cache size = segment size - correct boundary
96
        // 3. Each segment has independent cache entry - no interference during cleanup
97
1.00k
        if (opts && opts->cache_type != FileCachePolicy::NO_CACHE) {
98
1.00k
            FileReaderOptions cache_opts = *opts;
99
1.00k
            cache_opts.file_size = index.size; // Use segment size for cache
100
1.00k
            *reader = DORIS_TRY(create_cached_file_reader(packed_reader, cache_opts));
101
1.00k
        } else {
102
0
            *reader = packed_reader;
103
0
        }
104
1.00k
    } else {
105
2
        RETURN_IF_ERROR(_inner_fs->open_file(file, reader, opts));
106
2
    }
107
1.00k
    return Status::OK();
108
1.00k
}
109
110
4
Status PackedFileSystem::exists_impl(const Path& path, bool* res) const {
111
4
    VLOG_DEBUG << "packed file system exist, rowset id " << _append_info.rowset_id;
112
4
    if (!_index_map_initialized) {
113
1
        return Status::InternalError("PackedFileSystem index map is not initialized");
114
1
    }
115
3
    const auto it = _index_map.find(path.native());
116
3
    if (it != _index_map.end()) {
117
1
        return _inner_fs->exists(Path(it->second.packed_file_path), res);
118
1
    }
119
2
    return _inner_fs->exists(path, res);
120
3
}
121
122
5
Status PackedFileSystem::file_size_impl(const Path& file, int64_t* file_size) const {
123
5
    if (!_index_map_initialized) {
124
1
        return Status::InternalError("PackedFileSystem index map is not initialized");
125
1
    }
126
4
    const auto it = _index_map.find(file.native());
127
4
    if (it != _index_map.end()) {
128
3
        *file_size = it->second.size;
129
3
        return Status::OK();
130
3
    }
131
1
    return _inner_fs->file_size(file, file_size);
132
4
}
133
134
} // namespace doris::io