Coverage Report

Created: 2026-03-12 17:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/packed_file_manager.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
21
#include <gen_cpp/cloud.pb.h>
22
#include <glog/logging.h>
23
24
#include <atomic>
25
#include <chrono>
26
#include <condition_variable>
27
#include <map>
28
#include <memory>
29
#include <mutex>
30
#include <optional>
31
#include <string>
32
#include <thread>
33
#include <unordered_map>
34
#include <vector>
35
36
#include "common/status.h"
37
#include "io/fs/file_system.h"
38
#include "io/fs/file_writer.h"
39
#include "io/fs/path.h"
40
#include "util/slice.h"
41
42
namespace doris::io {
43
44
// Location record for one small file stored inside a packed file, together
// with the identifiers kept alongside it.
struct PackedSliceLocation {
    // Path of the packed file that contains this slice.
    std::string packed_file_path;
    // Position and length of the slice (presumably in bytes within the packed
    // file; confirm against the writer code). Both are zero-initialized so a
    // default-constructed location never carries indeterminate values, matching
    // the other scalar members below.
    int64_t offset = 0;
    int64_t size = 0;
    int64_t create_time = 0;
    int64_t tablet_id = 0;
    std::string rowset_id;
    std::string resource_id;
    int64_t txn_id = 0;
    int64_t packed_file_size = -1; // Total size of the packed file, -1 means not set
};
55
56
// Caller-supplied metadata that accompanies a small file handed to
// PackedFileManager::append_small_file.
struct PackedAppendContext {
    std::string resource_id;
    int64_t tablet_id {0};
    std::string rowset_id;
    int64_t txn_id {0};
    // TTL expiration time in seconds since epoch, 0 means no TTL
    uint64_t expiration_time {0};
    // Whether to write data to file cache
    bool write_file_cache {true};
};
64
65
// Global object that manages packing small files into larger files for S3 optimization
66
class PackedFileManager {
67
    struct PackedFileContext;
68
69
public:
70
    static PackedFileManager* instance();
71
72
    // Initialize manager state; file system will be resolved lazily
73
    Status init();
74
75
    // Write a small file to the current packed file
76
    Status append_small_file(const std::string& path, const Slice& data,
77
                             const PackedAppendContext& info);
78
79
    // Block until the small file's packed file is uploaded to S3
80
    Status wait_upload_done(const std::string& path);
81
82
    // Get packed file index information for a small file
83
    Status get_packed_slice_location(const std::string& path, PackedSliceLocation* location);
84
85
    // Start the background management thread
86
    void start_background_manager();
87
88
    // Stop the background management thread
89
    void stop_background_manager();
90
91
    // Mark current packed file for upload and create new one
92
    Status mark_current_packed_file_for_upload(const std::string& resource_id);
93
94
    // Internal helper; expects caller holds _current_packed_file_mutex
95
    Status mark_current_packed_file_for_upload_locked(const std::string& resource_id);
96
97
    void record_packed_file_metrics(const PackedFileContext& packed_file);
98
99
private:
100
37
    PackedFileManager() = default;
101
    ~PackedFileManager();
102
103
    DISALLOW_COPY_AND_ASSIGN(PackedFileManager);
104
105
    // Background thread function for managing packed file lifecycle
106
    void background_manager();
107
108
    // Upload packed file to S3 and update meta service
109
    Status finalize_packed_file_upload(const std::string& packed_file_path, FileWriter* writer);
110
111
    // Update meta service with packed file information
112
    Status update_meta_service(const std::string& packed_file_path,
113
                               const cloud::PackedFileInfoPB& packed_file_info);
114
115
    // Process uploading files
116
    void process_uploading_packed_files();
117
118
    // Clean up expired data
119
    void cleanup_expired_data();
120
121
    // Internal structure to track packed file state
122
    enum class PackedFileState {
123
        INIT,            // Initial state, no files written yet
124
        ACTIVE,          // Has files but doesn't meet upload conditions
125
        READY_TO_UPLOAD, // Ready for upload, metadata still being prepared
126
        UPLOADING,       // Upload triggered, waiting for writer close to finish
127
        UPLOADED,        // Upload completed
128
        FAILED           // Upload failed
129
    };
130
131
    struct PackedFileContext {
132
        std::string packed_file_path;
133
        std::unique_ptr<FileWriter> writer;
134
        std::unordered_map<std::string, PackedSliceLocation> slice_locations;
135
        int64_t current_offset = 0;
136
        int64_t total_size = 0;
137
        int64_t create_time;
138
        int64_t upload_time = 0;
139
        std::chrono::steady_clock::time_point create_timestamp;
140
        std::optional<std::chrono::steady_clock::time_point> first_append_timestamp;
141
        std::optional<std::chrono::steady_clock::time_point> ready_to_upload_timestamp;
142
        std::optional<std::chrono::steady_clock::time_point> uploading_timestamp;
143
        std::atomic<PackedFileState> state {PackedFileState::INIT};
144
        std::condition_variable upload_cv;
145
        std::mutex upload_mutex;
146
        std::string last_error;
147
        std::string resource_id;
148
        FileSystemSPtr file_system;
149
    };
150
151
    // Create a new packed file state with file writer
152
    Status create_new_packed_file_context(const std::string& resource_id,
153
                                          std::unique_ptr<PackedFileContext>& packed_file_ctx);
154
155
    Status ensure_file_system(const std::string& resource_id, FileSystemSPtr* file_system);
156
157
    // Helper function to wait for packed file upload completion
158
    Status wait_for_packed_file_upload(PackedFileContext* packed_file_ptr);
159
160
    // Thread management
161
    std::atomic<bool> _stop_background_thread {false};
162
    std::unique_ptr<std::thread> _background_thread;
163
164
    // File system
165
    FileSystemSPtr _default_file_system;
166
    std::unordered_map<std::string, FileSystemSPtr> _file_systems;
167
    std::mutex _file_system_mutex;
168
169
    // Current active packed file
170
    std::unordered_map<std::string, std::unique_ptr<PackedFileContext>> _current_packed_files;
171
    std::timed_mutex _current_packed_file_mutex;
172
173
    // Merge files ready for upload or being processed
174
    std::unordered_map<std::string, std::shared_ptr<PackedFileContext>> _uploading_packed_files;
175
176
    // Uploaded packed files (kept for some time for wait_write_done)
177
    std::unordered_map<std::string, std::shared_ptr<PackedFileContext>> _uploaded_packed_files;
178
    std::mutex _packed_files_mutex;
179
180
    // Global index mapping small file path to packed file index
181
    std::unordered_map<std::string, PackedSliceLocation> _global_slice_locations;
182
    std::mutex _global_index_mutex;
183
184
#ifdef BE_TEST
185
public:
186
    void reset_packed_file_bvars_for_test() const;
187
    int64_t packed_file_total_count_for_test() const;
188
    int64_t packed_file_total_small_file_num_for_test() const;
189
    int64_t packed_file_total_size_bytes_for_test() const;
190
    double packed_file_avg_small_file_num_for_test() const;
191
    double packed_file_avg_file_size_for_test() const;
192
    void record_packed_file_metrics_for_test(const PackedFileContext* packed_file);
193
194
    // Test-only helpers to introspect/clear internal state
195
    void clear_state_for_test();
196
    auto& current_packed_files_for_test() { return _current_packed_files; }
197
    auto& uploading_packed_files_for_test() { return _uploading_packed_files; }
198
    auto& uploaded_packed_files_for_test() { return _uploaded_packed_files; }
199
    auto& global_slice_locations_for_test() { return _global_slice_locations; }
200
    auto& file_systems_for_test() { return _file_systems; }
201
    FileSystemSPtr& default_file_system_for_test() { return _default_file_system; }
202
    Status create_new_packed_file_state_for_test(const std::string& resource_id,
203
                                                 std::unique_ptr<PackedFileContext>& ctx) {
204
        return create_new_packed_file_context(resource_id, ctx);
205
    }
206
#endif
207
};
208
209
} // namespace doris::io