be/src/io/fs/packed_file_manager.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <butil/macros.h> |
21 | | #include <gen_cpp/cloud.pb.h> |
22 | | #include <glog/logging.h> |
23 | | |
24 | | #include <atomic> |
25 | | #include <chrono> |
26 | | #include <condition_variable> |
27 | | #include <map> |
28 | | #include <memory> |
29 | | #include <mutex> |
30 | | #include <optional> |
31 | | #include <string> |
32 | | #include <thread> |
33 | | #include <unordered_map> |
34 | | #include <vector> |
35 | | |
36 | | #include "common/status.h" |
37 | | #include "io/fs/file_system.h" |
38 | | #include "io/fs/file_writer.h" |
39 | | #include "io/fs/path.h" |
40 | | #include "util/slice.h" |
41 | | |
42 | | namespace doris::io { |
43 | | |
// Describes where one small file ("slice") lives inside a packed file.
//
// All scalar fields carry default member initializers so that a
// default-constructed instance never exposes indeterminate values.
struct PackedSliceLocation {
    // Path of the packed file that contains this slice.
    std::string packed_file_path;
    // Offset of the slice inside the packed file (presumably in bytes).
    int64_t offset = 0;
    // Size of the slice (presumably in bytes).
    int64_t size = 0;
    int64_t create_time = 0;
    int64_t tablet_id = 0;
    std::string rowset_id;
    std::string resource_id;
    int64_t txn_id = 0;
    int64_t packed_file_size = -1; // Total size of the packed file, -1 means not set
};
55 | | |
// Per-append metadata that accompanies a small file handed to
// PackedFileManager::append_small_file().
struct PackedAppendContext {
    std::string resource_id;
    int64_t tablet_id {0};
    std::string rowset_id;
    int64_t txn_id {0};
    // TTL expiration time in seconds since epoch; 0 means no TTL.
    uint64_t expiration_time {0};
    // Whether to write data to file cache.
    bool write_file_cache {true};
};
64 | | |
65 | | // Global object that manages packing small files into larger files for S3 optimization |
66 | | class PackedFileManager { |
67 | | struct PackedFileContext; |
68 | | |
69 | | public: |
70 | | static PackedFileManager* instance(); |
71 | | |
72 | | // Initialize manager state; file system will be resolved lazily |
73 | | Status init(); |
74 | | |
75 | | // Write a small file to the current packed file |
76 | | Status append_small_file(const std::string& path, const Slice& data, |
77 | | const PackedAppendContext& info); |
78 | | |
79 | | // Block until the small file's packed file is uploaded to S3 |
80 | | Status wait_upload_done(const std::string& path); |
81 | | |
82 | | // Get packed file index information for a small file |
83 | | Status get_packed_slice_location(const std::string& path, PackedSliceLocation* location); |
84 | | |
85 | | // Start the background management thread |
86 | | void start_background_manager(); |
87 | | |
88 | | // Stop the background management thread |
89 | | void stop_background_manager(); |
90 | | |
91 | | // Mark current packed file for upload and create new one |
92 | | Status mark_current_packed_file_for_upload(const std::string& resource_id); |
93 | | |
94 | | // Internal helper; expects caller holds _current_packed_file_mutex |
95 | | Status mark_current_packed_file_for_upload_locked(const std::string& resource_id); |
96 | | |
97 | | void record_packed_file_metrics(const PackedFileContext& packed_file); |
98 | | |
99 | | private: |
100 | 37 | PackedFileManager() = default; |
101 | | ~PackedFileManager(); |
102 | | |
103 | | DISALLOW_COPY_AND_ASSIGN(PackedFileManager); |
104 | | |
105 | | // Background thread function for managing packed file lifecycle |
106 | | void background_manager(); |
107 | | |
108 | | // Upload packed file to S3 and update meta service |
109 | | Status finalize_packed_file_upload(const std::string& packed_file_path, FileWriter* writer); |
110 | | |
111 | | // Update meta service with packed file information |
112 | | Status update_meta_service(const std::string& packed_file_path, |
113 | | const cloud::PackedFileInfoPB& packed_file_info); |
114 | | |
115 | | // Process uploading files |
116 | | void process_uploading_packed_files(); |
117 | | |
118 | | // Clean up expired data |
119 | | void cleanup_expired_data(); |
120 | | |
121 | | // Internal structure to track packed file state |
122 | | enum class PackedFileState { |
123 | | INIT, // Initial state, no files written yet |
124 | | ACTIVE, // Has files but doesn't meet upload conditions |
125 | | READY_TO_UPLOAD, // Ready for upload, metadata still being prepared |
126 | | UPLOADING, // Upload triggered, waiting for writer close to finish |
127 | | UPLOADED, // Upload completed |
128 | | FAILED // Upload failed |
129 | | }; |
130 | | |
131 | | struct PackedFileContext { |
132 | | std::string packed_file_path; |
133 | | std::unique_ptr<FileWriter> writer; |
134 | | std::unordered_map<std::string, PackedSliceLocation> slice_locations; |
135 | | int64_t current_offset = 0; |
136 | | int64_t total_size = 0; |
137 | | int64_t create_time; |
138 | | int64_t upload_time = 0; |
139 | | std::chrono::steady_clock::time_point create_timestamp; |
140 | | std::optional<std::chrono::steady_clock::time_point> first_append_timestamp; |
141 | | std::optional<std::chrono::steady_clock::time_point> ready_to_upload_timestamp; |
142 | | std::optional<std::chrono::steady_clock::time_point> uploading_timestamp; |
143 | | std::atomic<PackedFileState> state {PackedFileState::INIT}; |
144 | | std::condition_variable upload_cv; |
145 | | std::mutex upload_mutex; |
146 | | std::string last_error; |
147 | | std::string resource_id; |
148 | | FileSystemSPtr file_system; |
149 | | }; |
150 | | |
151 | | // Create a new packed file state with file writer |
152 | | Status create_new_packed_file_context(const std::string& resource_id, |
153 | | std::unique_ptr<PackedFileContext>& packed_file_ctx); |
154 | | |
155 | | Status ensure_file_system(const std::string& resource_id, FileSystemSPtr* file_system); |
156 | | |
157 | | // Helper function to wait for packed file upload completion |
158 | | Status wait_for_packed_file_upload(PackedFileContext* packed_file_ptr); |
159 | | |
160 | | // Thread management |
161 | | std::atomic<bool> _stop_background_thread {false}; |
162 | | std::unique_ptr<std::thread> _background_thread; |
163 | | |
164 | | // File system |
165 | | FileSystemSPtr _default_file_system; |
166 | | std::unordered_map<std::string, FileSystemSPtr> _file_systems; |
167 | | std::mutex _file_system_mutex; |
168 | | |
169 | | // Current active packed file |
170 | | std::unordered_map<std::string, std::unique_ptr<PackedFileContext>> _current_packed_files; |
171 | | std::timed_mutex _current_packed_file_mutex; |
172 | | |
173 | | // Merge files ready for upload or being processed |
174 | | std::unordered_map<std::string, std::shared_ptr<PackedFileContext>> _uploading_packed_files; |
175 | | |
176 | | // Uploaded packed files (kept for some time for wait_write_done) |
177 | | std::unordered_map<std::string, std::shared_ptr<PackedFileContext>> _uploaded_packed_files; |
178 | | std::mutex _packed_files_mutex; |
179 | | |
180 | | // Global index mapping small file path to packed file index |
181 | | std::unordered_map<std::string, PackedSliceLocation> _global_slice_locations; |
182 | | std::mutex _global_index_mutex; |
183 | | |
184 | | #ifdef BE_TEST |
185 | | public: |
186 | | void reset_packed_file_bvars_for_test() const; |
187 | | int64_t packed_file_total_count_for_test() const; |
188 | | int64_t packed_file_total_small_file_num_for_test() const; |
189 | | int64_t packed_file_total_size_bytes_for_test() const; |
190 | | double packed_file_avg_small_file_num_for_test() const; |
191 | | double packed_file_avg_file_size_for_test() const; |
192 | | void record_packed_file_metrics_for_test(const PackedFileContext* packed_file); |
193 | | |
194 | | // Test-only helpers to introspect/clear internal state |
195 | | void clear_state_for_test(); |
196 | | auto& current_packed_files_for_test() { return _current_packed_files; } |
197 | | auto& uploading_packed_files_for_test() { return _uploading_packed_files; } |
198 | | auto& uploaded_packed_files_for_test() { return _uploaded_packed_files; } |
199 | | auto& global_slice_locations_for_test() { return _global_slice_locations; } |
200 | | auto& file_systems_for_test() { return _file_systems; } |
201 | | FileSystemSPtr& default_file_system_for_test() { return _default_file_system; } |
202 | | Status create_new_packed_file_state_for_test(const std::string& resource_id, |
203 | | std::unique_ptr<PackedFileContext>& ctx) { |
204 | | return create_new_packed_file_context(resource_id, ctx); |
205 | | } |
206 | | #endif |
207 | | }; |
208 | | |
209 | | } // namespace doris::io |