be/src/io/fs/packed_file_manager.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/packed_file_manager.h" |
19 | | |
20 | | #include <bvar/bvar.h> |
21 | | #include <bvar/recorder.h> |
22 | | #include <bvar/window.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <chrono> |
26 | | #include <cstdint> |
27 | | #include <ctime> |
28 | | #include <functional> |
29 | | #include <limits> |
30 | | #include <optional> |
31 | | #include <random> |
32 | | #include <sstream> |
33 | | #include <unordered_set> |
34 | | |
35 | | #ifdef BE_TEST |
36 | | #include "cpp/sync_point.h" |
37 | | #endif |
38 | | |
39 | | #include <gen_cpp/cloud.pb.h> |
40 | | |
41 | | #include "cloud/cloud_meta_mgr.h" |
42 | | #include "cloud/cloud_storage_engine.h" |
43 | | #include "cloud/config.h" |
44 | | #include "common/config.h" |
45 | | #include "io/cache/block_file_cache.h" |
46 | | #include "io/cache/block_file_cache_factory.h" |
47 | | #include "io/cache/file_block.h" |
48 | | #include "io/cache/file_cache_common.h" |
49 | | #include "io/fs/packed_file_trailer.h" |
50 | | #include "runtime/exec_env.h" |
51 | | #include "storage/storage_engine.h" |
52 | | #include "util/coding.h" |
53 | | #include "util/slice.h" |
54 | | #include "util/uid_util.h" |
55 | | |
56 | | namespace doris::io { |
57 | | |
58 | | namespace { |
59 | | |
// Monotonic lifetime totals for packed files.
bvar::Adder<int64_t> g_packed_file_total_count("packed_file", "total_count");
bvar::Adder<int64_t> g_packed_file_total_small_file_count("packed_file", "total_small_file_num");
bvar::Adder<int64_t> g_packed_file_total_size_bytes("packed_file", "total_size_bytes");
// Windowed (window_size=10) averages of per-packed-file small-file count and size.
bvar::IntRecorder g_packed_file_small_file_num_recorder;
bvar::IntRecorder g_packed_file_file_size_recorder;
bvar::Window<bvar::IntRecorder> g_packed_file_avg_small_file_num(
        "packed_file_avg_small_file_num", &g_packed_file_small_file_num_recorder,
        /*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_avg_file_size_bytes("packed_file_avg_file_size_bytes",
                                                                  &g_packed_file_file_size_recorder,
                                                                  /*window_size=*/10);
// Windowed latencies of the packed-file state transitions:
// ACTIVE -> READY_TO_UPLOAD -> UPLOADING -> UPLOADED.
bvar::IntRecorder g_packed_file_active_to_ready_ms_recorder;
bvar::IntRecorder g_packed_file_ready_to_upload_ms_recorder;
bvar::IntRecorder g_packed_file_uploading_to_uploaded_ms_recorder;
bvar::Window<bvar::IntRecorder> g_packed_file_active_to_ready_ms_window(
        "packed_file_active_to_ready_ms", &g_packed_file_active_to_ready_ms_recorder,
        /*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_ready_to_upload_ms_window(
        "packed_file_ready_to_upload_ms", &g_packed_file_ready_to_upload_ms_recorder,
        /*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_uploading_to_uploaded_ms_window(
        "packed_file_uploading_to_uploaded_ms", &g_packed_file_uploading_to_uploaded_ms_recorder,
        /*window_size=*/10);

// Metrics for async small file cache write
// (incremented on submit, decremented on completion — see
// write_small_file_to_cache_async, so they track in-flight work).
bvar::Adder<int64_t> g_packed_file_cache_async_write_count("packed_file_cache",
                                                           "async_write_count");
bvar::Adder<int64_t> g_packed_file_cache_async_write_bytes("packed_file_cache",
                                                           "async_write_bytes");
89 | | |
90 | | Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_file_path, |
91 | 14.7k | const cloud::PackedFileInfoPB& packed_file_info) { |
92 | 14.7k | if (writer == nullptr) { |
93 | 0 | return Status::InternalError("File writer is null for packed file: {}", packed_file_path); |
94 | 0 | } |
95 | 14.7k | if (writer->state() == FileWriter::State::CLOSED) { |
96 | 0 | return Status::InternalError("File writer already closed for packed file: {}", |
97 | 0 | packed_file_path); |
98 | 0 | } |
99 | | |
100 | 14.7k | cloud::PackedFileFooterPB footer_pb; |
101 | 14.7k | footer_pb.mutable_packed_file_info()->CopyFrom(packed_file_info); |
102 | | |
103 | 14.7k | std::string serialized_footer; |
104 | 14.7k | if (!footer_pb.SerializeToString(&serialized_footer)) { |
105 | 0 | return Status::InternalError("Failed to serialize packed file footer info for {}", |
106 | 0 | packed_file_path); |
107 | 0 | } |
108 | | |
109 | 14.7k | if (serialized_footer.size() > |
110 | 14.7k | std::numeric_limits<uint32_t>::max() - kPackedFileTrailerSuffixSize) { |
111 | 0 | return Status::InternalError("PackedFileFooterPB too large for {}", packed_file_path); |
112 | 0 | } |
113 | | |
114 | 14.7k | std::string trailer; |
115 | 14.7k | trailer.reserve(serialized_footer.size() + kPackedFileTrailerSuffixSize); |
116 | 14.7k | trailer.append(serialized_footer); |
117 | 14.7k | put_fixed32_le(&trailer, static_cast<uint32_t>(serialized_footer.size())); |
118 | 14.7k | put_fixed32_le(&trailer, kPackedFileTrailerVersion); |
119 | | |
120 | 14.7k | return writer->append(Slice(trailer)); |
121 | 14.7k | } |
122 | | |
123 | | // write small file data to file cache |
124 | | void do_write_to_file_cache(const std::string& small_file_path, const std::string& data, |
125 | 63.2k | int64_t tablet_id, uint64_t expiration_time) { |
126 | 63.2k | if (data.empty()) { |
127 | 0 | return; |
128 | 0 | } |
129 | | |
130 | | // Generate cache key from small file path (e.g., "rowset_id_seg_id.dat") |
131 | 63.2k | Path path(small_file_path); |
132 | 63.2k | UInt128Wrapper cache_hash = BlockFileCache::hash(path.filename().native()); |
133 | | |
134 | 63.2k | VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path |
135 | 2 | << " filename=" << path.filename().native() << " hash=" << cache_hash.to_string() |
136 | 2 | << " size=" << data.size() << " tablet_id=" << tablet_id |
137 | 2 | << " expiration_time=" << expiration_time; |
138 | | |
139 | 63.2k | BlockFileCache* file_cache = FileCacheFactory::instance()->get_by_path(cache_hash); |
140 | 63.2k | if (file_cache == nullptr) { |
141 | 0 | return; // Cache not available, skip |
142 | 0 | } |
143 | | |
144 | | // Allocate cache blocks |
145 | 63.2k | CacheContext ctx; |
146 | 63.2k | ctx.cache_type = expiration_time > 0 ? FileCacheType::TTL : FileCacheType::NORMAL; |
147 | 63.2k | ctx.expiration_time = expiration_time; |
148 | 63.2k | ctx.tablet_id = tablet_id; |
149 | 63.2k | ReadStatistics stats; |
150 | 63.2k | ctx.stats = &stats; |
151 | | |
152 | 63.2k | FileBlocksHolder holder = file_cache->get_or_set(cache_hash, 0, data.size(), ctx); |
153 | | |
154 | | // Write data to cache blocks |
155 | 63.2k | size_t data_offset = 0; |
156 | 63.2k | for (auto& block : holder.file_blocks) { |
157 | 63.2k | if (data_offset >= data.size()) { |
158 | 0 | break; |
159 | 0 | } |
160 | 63.2k | size_t block_size = block->range().size(); |
161 | 63.2k | size_t write_size = std::min(block_size, data.size() - data_offset); |
162 | | |
163 | 63.2k | if (block->state() == FileBlock::State::EMPTY) { |
164 | 63.2k | block->get_or_set_downloader(); |
165 | 63.2k | if (block->is_downloader()) { |
166 | 63.2k | Slice s(data.data() + data_offset, write_size); |
167 | 63.2k | Status st = block->append(s); |
168 | 63.2k | if (st.ok()) { |
169 | 63.2k | st = block->finalize(); |
170 | 63.2k | } |
171 | 63.2k | if (!st.ok()) { |
172 | 0 | LOG(WARNING) << "Write small file to cache failed: " << st.msg(); |
173 | 0 | } |
174 | 63.2k | } |
175 | 63.2k | } |
176 | 63.2k | data_offset += write_size; |
177 | 63.2k | } |
178 | 63.2k | } |
179 | | |
180 | | // Async wrapper: submit cache write task to thread pool |
181 | | // The data is copied into the lambda capture to ensure its lifetime extends beyond |
182 | | // the async task execution. The original Slice may reference a buffer that gets |
183 | | // reused or freed before the async task runs. |
184 | | void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data, |
185 | 63.3k | int64_t tablet_id, uint64_t expiration_time) { |
186 | 63.3k | if (!config::enable_file_cache || data.size == 0) { |
187 | 80 | return; |
188 | 80 | } |
189 | | |
190 | | // Copy data since original buffer may be reused before async task executes |
191 | | // For small files (< 1MB), copy overhead is acceptable |
192 | 63.2k | std::string data_copy(data.data, data.size); |
193 | 63.2k | size_t data_size = data.size; |
194 | | |
195 | 63.2k | auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool(); |
196 | 63.2k | if (thread_pool == nullptr) { |
197 | | // Fallback to sync write if thread pool not available |
198 | 0 | do_write_to_file_cache(small_file_path, data_copy, tablet_id, expiration_time); |
199 | 0 | return; |
200 | 0 | } |
201 | | |
202 | | // Track async write count and bytes |
203 | 63.2k | g_packed_file_cache_async_write_count << 1; |
204 | 63.2k | g_packed_file_cache_async_write_bytes << static_cast<int64_t>(data_size); |
205 | | |
206 | 63.2k | Status st = thread_pool->submit_func([path = small_file_path, data = std::move(data_copy), |
207 | 63.2k | tablet_id, data_size, expiration_time]() { |
208 | 63.2k | do_write_to_file_cache(path, data, tablet_id, expiration_time); |
209 | | // Decrement async write count after completion |
210 | 63.2k | g_packed_file_cache_async_write_count << -1; |
211 | 63.2k | g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size); |
212 | 63.2k | }); |
213 | | |
214 | 63.2k | if (!st.ok()) { |
215 | | // Revert metrics since task was not submitted |
216 | 0 | g_packed_file_cache_async_write_count << -1; |
217 | 0 | g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size); |
218 | 0 | LOG(WARNING) << "Failed to submit cache write task for " << small_file_path << ": " |
219 | 0 | << st.msg(); |
220 | | // Don't block on failure, cache write is best-effort |
221 | 0 | } |
222 | 63.2k | } |
223 | | |
224 | | } // namespace |
225 | | |
226 | 207k | PackedFileManager* PackedFileManager::instance() { |
227 | 207k | static PackedFileManager instance; |
228 | 207k | return &instance; |
229 | 207k | } |
230 | | |
231 | 34 | PackedFileManager::~PackedFileManager() { |
232 | 34 | stop_background_manager(); |
233 | 34 | } |
234 | | |
235 | 38 | Status PackedFileManager::init() { |
236 | 38 | return Status::OK(); |
237 | 38 | } |
238 | | |
239 | | Status PackedFileManager::create_new_packed_file_context( |
240 | 14.8k | const std::string& resource_id, std::unique_ptr<PackedFileContext>& packed_file_ctx) { |
241 | 14.8k | FileSystemSPtr file_system; |
242 | 14.8k | RETURN_IF_ERROR(ensure_file_system(resource_id, &file_system)); |
243 | 14.8k | if (file_system == nullptr) { |
244 | 0 | return Status::InternalError("File system is not available for packed file creation: " + |
245 | 0 | resource_id); |
246 | 0 | } |
247 | | |
248 | 14.8k | auto uuid = generate_uuid_string(); |
249 | 14.8k | auto hash_val = std::hash<std::string> {}(uuid); |
250 | 14.8k | uint16_t path_bucket = hash_val % 4096 + 1; |
251 | 14.8k | std::stringstream path_stream; |
252 | 14.8k | path_stream << "data/packed_file/" << path_bucket << "/" << uuid << ".bin"; |
253 | | |
254 | 14.8k | packed_file_ctx = std::make_unique<PackedFileContext>(); |
255 | 14.8k | const std::string relative_path = path_stream.str(); |
256 | 14.8k | packed_file_ctx->packed_file_path = relative_path; |
257 | 14.8k | packed_file_ctx->create_time = std::time(nullptr); |
258 | 14.8k | packed_file_ctx->create_timestamp = std::chrono::steady_clock::now(); |
259 | 14.8k | packed_file_ctx->state = PackedFileState::INIT; |
260 | 14.8k | packed_file_ctx->resource_id = resource_id; |
261 | 14.8k | packed_file_ctx->file_system = std::move(file_system); |
262 | | |
263 | | // Create file writer for the packed file |
264 | 14.8k | FileWriterPtr new_writer; |
265 | 14.8k | FileWriterOptions opts; |
266 | | // Disable write_file_cache for packed file itself. |
267 | | // We write file cache for each small file separately in append_small_file() |
268 | | // using the small file path as cache key, ensuring cache entries can be |
269 | | // properly cleaned up when stale rowsets are removed. |
270 | 14.8k | opts.write_file_cache = false; |
271 | 14.8k | RETURN_IF_ERROR( |
272 | 14.8k | packed_file_ctx->file_system->create_file(Path(relative_path), &new_writer, &opts)); |
273 | 14.7k | packed_file_ctx->writer = std::move(new_writer); |
274 | | |
275 | 14.7k | return Status::OK(); |
276 | 14.8k | } |
277 | | |
278 | | Status PackedFileManager::ensure_file_system(const std::string& resource_id, |
279 | 14.8k | FileSystemSPtr* file_system) { |
280 | 14.8k | if (file_system == nullptr) { |
281 | 0 | return Status::InvalidArgument("file_system output parameter is null"); |
282 | 0 | } |
283 | | |
284 | 14.8k | { |
285 | 14.8k | std::lock_guard<std::mutex> lock(_file_system_mutex); |
286 | 14.8k | if (resource_id.empty()) { |
287 | 0 | if (_default_file_system != nullptr) { |
288 | 0 | *file_system = _default_file_system; |
289 | 0 | return Status::OK(); |
290 | 0 | } |
291 | 14.8k | } else { |
292 | 14.8k | auto it = _file_systems.find(resource_id); |
293 | 14.8k | if (it != _file_systems.end()) { |
294 | 14.7k | *file_system = it->second; |
295 | 14.7k | return Status::OK(); |
296 | 14.7k | } |
297 | 14.8k | } |
298 | 14.8k | } |
299 | | |
300 | 3 | if (!config::is_cloud_mode()) { |
301 | 2 | return Status::InternalError("Cloud file system is not available in local mode"); |
302 | 2 | } |
303 | | |
304 | 1 | auto* exec_env = ExecEnv::GetInstance(); |
305 | 1 | if (exec_env == nullptr) { |
306 | 0 | return Status::InternalError("ExecEnv instance is not initialized"); |
307 | 0 | } |
308 | | |
309 | 1 | FileSystemSPtr resolved_fs; |
310 | 1 | if (resource_id.empty()) { |
311 | 0 | resolved_fs = exec_env->storage_engine().to_cloud().latest_fs(); |
312 | 0 | if (resolved_fs == nullptr) { |
313 | 0 | return Status::InternalError("Cloud file system is not ready"); |
314 | 0 | } |
315 | 1 | } else { |
316 | 1 | auto storage_resource = |
317 | 1 | exec_env->storage_engine().to_cloud().get_storage_resource(resource_id); |
318 | 1 | if (!storage_resource.has_value() || storage_resource->fs == nullptr) { |
319 | 0 | return Status::InternalError("Cloud file system is not ready for resource: " + |
320 | 0 | resource_id); |
321 | 0 | } |
322 | 1 | resolved_fs = storage_resource->fs; |
323 | 1 | } |
324 | | |
325 | 1 | { |
326 | 1 | std::lock_guard<std::mutex> lock(_file_system_mutex); |
327 | 1 | if (resource_id.empty()) { |
328 | 0 | _default_file_system = resolved_fs; |
329 | 0 | *file_system = _default_file_system; |
330 | 1 | } else { |
331 | 1 | _file_systems[resource_id] = resolved_fs; |
332 | 1 | *file_system = resolved_fs; |
333 | 1 | } |
334 | 1 | } |
335 | | |
336 | 1 | return Status::OK(); |
337 | 1 | } |
338 | | |
// Appends one small file's bytes to the resource's current packed file and
// records its location in both the per-context and global indexes.
//
// Files larger than `small_file_threshold_bytes` are skipped with OK (the
// caller writes those as standalone objects). The current packed file is
// rotated into the upload queue when the byte-size threshold would be
// exceeded, or after this append when the small-file-count threshold is hit.
Status PackedFileManager::append_small_file(const std::string& path, const Slice& data,
                                            const PackedAppendContext& info) {
    // Check if file is too large to be merged
    if (data.get_size() > config::small_file_threshold_bytes) {
        return Status::OK(); // Skip merging for large files
    }

    if (info.txn_id <= 0) {
        return Status::InvalidArgument("Missing valid txn id for packed file append: " + path);
    }

    std::lock_guard<std::timed_mutex> lock(_current_packed_file_mutex);

    // `current_state` references the map slot itself, so it observes any
    // context replacement done by the rotation below.
    auto& current_state = _current_packed_files[info.resource_id];
    if (!current_state || !current_state->writer) {
        RETURN_IF_ERROR(create_new_packed_file_context(info.resource_id, current_state));
    }

    // Check if we need to create a new packed file
    if (current_state->total_size + data.get_size() >= config::packed_file_size_threshold_bytes) {
        RETURN_IF_ERROR(mark_current_packed_file_for_upload_locked(info.resource_id));
        // Defensive: rotation normally installs a fresh context in the same
        // slot; only recreate if that somehow did not happen.
        auto it = _current_packed_files.find(info.resource_id);
        if (it == _current_packed_files.end() || !it->second || !it->second->writer) {
            RETURN_IF_ERROR(create_new_packed_file_context(
                    info.resource_id, _current_packed_files[info.resource_id]));
        }
    }

    PackedFileContext* active_state = current_state.get();

    // Write data to current packed file
    RETURN_IF_ERROR(active_state->writer->append(data));

    // Async write data to file cache using small file path as cache key.
    // This ensures cache key matches the cleanup key in Rowset::clear_cache(),
    // allowing proper cache cleanup when stale rowsets are removed.
    if (info.write_file_cache) {
        write_small_file_to_cache_async(path, data, info.tablet_id, info.expiration_time);
    }

    // Update index: record where this small file landed inside the packed file.
    PackedSliceLocation location;
    location.packed_file_path = active_state->packed_file_path;
    location.offset = active_state->current_offset;
    location.size = data.get_size();
    location.create_time = std::time(nullptr);
    location.tablet_id = info.tablet_id;
    location.rowset_id = info.rowset_id;
    location.resource_id = info.resource_id;
    location.txn_id = info.txn_id;

    active_state->slice_locations[path] = location;
    active_state->current_offset += data.get_size();
    active_state->total_size += data.get_size();

    // Rotate packed file when small file count reaches threshold
    if (config::packed_file_small_file_count_threshold > 0 &&
        static_cast<int64_t>(active_state->slice_locations.size()) >=
                config::packed_file_small_file_count_threshold) {
        RETURN_IF_ERROR(mark_current_packed_file_for_upload_locked(info.resource_id));
    }

    // Mark as active if this is the first write.
    // NOTE: after a count-threshold rotation above, `active_state` still
    // points at the rotated context (its ownership moved into the uploading
    // set, the object itself survives), so these updates land on the file
    // that actually received the data.
    if (!active_state->first_append_timestamp.has_value()) {
        active_state->first_append_timestamp = std::chrono::steady_clock::now();
    }
    if (active_state->state == PackedFileState::INIT) {
        active_state->state = PackedFileState::ACTIVE;
    }

    // Update global index
    {
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
        _global_slice_locations[path] = location;
    }

    return Status::OK();
}
417 | | |
418 | 44.6k | Status PackedFileManager::wait_for_packed_file_upload(PackedFileContext* packed_file_ptr) { |
419 | 44.6k | std::unique_lock<std::mutex> upload_lock(packed_file_ptr->upload_mutex); |
420 | 89.3k | packed_file_ptr->upload_cv.wait(upload_lock, [packed_file_ptr] { |
421 | 89.3k | auto state = packed_file_ptr->state.load(std::memory_order_acquire); |
422 | 89.3k | return state == PackedFileState::UPLOADED || state == PackedFileState::FAILED; |
423 | 89.3k | }); |
424 | 44.6k | if (packed_file_ptr->state == PackedFileState::FAILED) { |
425 | 0 | std::string err = packed_file_ptr->last_error; |
426 | 0 | if (err.empty()) { |
427 | 0 | err = "Packed file upload failed"; |
428 | 0 | } |
429 | 0 | return Status::InternalError(err); |
430 | 0 | } |
431 | 44.6k | return Status::OK(); |
432 | 44.6k | } |
433 | | |
// Blocks until the packed file that contains `path` has finished uploading.
// Fast-returns OK when the file is already uploaded; surfaces the stored
// error when the upload failed.
Status PackedFileManager::wait_upload_done(const std::string& path) {
    std::string packed_file_path;
    {
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
        auto it = _global_slice_locations.find(path);
        if (it == _global_slice_locations.end()) {
            return Status::InternalError("File not found in global index: " + path);
        }
        packed_file_path = it->second.packed_file_path;
    }

    // Find the packed file in uploaded files first - if already uploaded, no need to wait
    std::shared_ptr<PackedFileContext> managed_packed_file;
    std::shared_ptr<PackedFileContext> failed_packed_file;
    {
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        auto uploaded_it = _uploaded_packed_files.find(packed_file_path);
        if (uploaded_it != _uploaded_packed_files.end()) {
            auto state = uploaded_it->second->state.load(std::memory_order_acquire);
            if (state == PackedFileState::UPLOADED) {
                return Status::OK(); // Already uploaded, no need to wait
            }
            if (state == PackedFileState::FAILED) {
                failed_packed_file = uploaded_it->second;
            } else {
                managed_packed_file = uploaded_it->second;
            }
        }
    }

    if (failed_packed_file) {
        // Take upload_mutex so last_error is read only after the uploader
        // finished writing it.
        std::lock_guard<std::mutex> upload_lock(failed_packed_file->upload_mutex);
        std::string err = failed_packed_file->last_error;
        if (err.empty()) {
            err = "Merge file upload failed";
        }
        return Status::InternalError(err);
    }

    // Find the packed file in either current or uploading files.
    // NOTE(review): when found in _current_packed_files we keep only a raw
    // pointer (no shared ownership). The object survives rotation (its
    // unique_ptr is moved into a shared_ptr in the uploading set), but this
    // relies on cleanup not discarding the uploaded entry while we wait —
    // confirm cleanup_expired_data() cannot free it concurrently.
    PackedFileContext* packed_file_ptr = nullptr;
    {
        std::lock_guard<std::timed_mutex> current_lock(_current_packed_file_mutex);
        for (auto& [resource_id, state] : _current_packed_files) {
            if (state && state->packed_file_path == packed_file_path) {
                packed_file_ptr = state.get();
                break;
            }
        }
    }

    if (!packed_file_ptr) {
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        auto uploading_it = _uploading_packed_files.find(packed_file_path);
        if (uploading_it != _uploading_packed_files.end()) {
            managed_packed_file = uploading_it->second;
            packed_file_ptr = managed_packed_file.get();
        } else {
            auto uploaded_it = _uploaded_packed_files.find(packed_file_path);
            if (uploaded_it != _uploaded_packed_files.end()) {
                managed_packed_file = uploaded_it->second;
                packed_file_ptr = managed_packed_file.get();
            }
        }
    }

    if (!packed_file_ptr) {
        // Packed file not found in any location, this is unexpected
        return Status::InternalError("Packed file not found for path: " + path);
    }

    Status wait_status = wait_for_packed_file_upload(packed_file_ptr);
    (void)managed_packed_file; // keep shared ownership alive during wait
    return wait_status;
}
509 | | |
510 | | Status PackedFileManager::get_packed_slice_location(const std::string& path, |
511 | 145k | PackedSliceLocation* location) { |
512 | 145k | std::lock_guard<std::mutex> lock(_global_index_mutex); |
513 | 145k | auto it = _global_slice_locations.find(path); |
514 | 145k | if (it == _global_slice_locations.end()) { |
515 | 1 | return Status::NotFound("File not found in global packed index: {}", path); |
516 | 1 | } |
517 | | |
518 | 145k | *location = it->second; |
519 | 145k | return Status::OK(); |
520 | 145k | } |
521 | | |
522 | 3 | void PackedFileManager::start_background_manager() { |
523 | 3 | if (_background_thread) { |
524 | 0 | return; // Already started |
525 | 0 | } |
526 | | |
527 | 3 | _stop_background_thread = false; |
528 | 3 | _background_thread = std::make_unique<std::thread>([this] { background_manager(); }); |
529 | 3 | } |
530 | | |
531 | 40 | void PackedFileManager::stop_background_manager() { |
532 | 40 | _stop_background_thread = true; |
533 | 40 | if (_background_thread && _background_thread->joinable()) { |
534 | 2 | _background_thread->join(); |
535 | 2 | } |
536 | 40 | _background_thread.reset(); |
537 | 40 | } |
538 | | |
// Rotates the resource's current packed file into the uploading set and
// installs a fresh context in its place. Caller must hold
// _current_packed_file_mutex.
Status PackedFileManager::mark_current_packed_file_for_upload_locked(
        const std::string& resource_id) {
    auto it = _current_packed_files.find(resource_id);
    if (it == _current_packed_files.end() || !it->second || !it->second->writer) {
        return Status::OK(); // Nothing to mark for upload
    }

    auto& current = it->second;

    // Mark as ready for upload
    current->state = PackedFileState::READY_TO_UPLOAD;
    if (!current->ready_to_upload_timestamp.has_value()) {
        auto now = std::chrono::steady_clock::now();
        current->ready_to_upload_timestamp = now;
        // Record how long the file stayed ACTIVE before rotation; -1 means it
        // never received an append.
        int64_t active_to_ready_ms = -1;
        if (current->first_append_timestamp.has_value()) {
            active_to_ready_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                         now - *current->first_append_timestamp)
                                         .count();
            g_packed_file_active_to_ready_ms_recorder << active_to_ready_ms;
            // Flush the sampler so the window bvar reflects this file immediately.
            if (auto* sampler = g_packed_file_active_to_ready_ms_recorder.get_sampler()) {
                sampler->take_sample();
            }
        }
        VLOG_DEBUG << "Packed file " << current->packed_file_path
                   << " transition ACTIVE->READY_TO_UPLOAD; active_to_ready_ms="
                   << active_to_ready_ms;
    }

    // Move to uploading files list.
    // Ownership transfer: the unique_ptr in the map slot is moved into a
    // shared_ptr (same underlying object, so raw pointers held by callers
    // remain valid), leaving `it->second` empty until refilled below.
    {
        std::shared_ptr<PackedFileContext> uploading_ptr =
                std::shared_ptr<PackedFileContext>(std::move(current));
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        _uploading_packed_files[uploading_ptr->packed_file_path] = uploading_ptr;
    }

    // Create new packed file
    return create_new_packed_file_context(resource_id, it->second);
}
579 | | |
580 | 14.7k | Status PackedFileManager::mark_current_packed_file_for_upload(const std::string& resource_id) { |
581 | 14.7k | std::lock_guard<std::timed_mutex> lock(_current_packed_file_mutex); |
582 | 14.7k | return mark_current_packed_file_for_upload_locked(resource_id); |
583 | 14.7k | } |
584 | | |
585 | 14.7k | void PackedFileManager::record_packed_file_metrics(const PackedFileContext& packed_file) { |
586 | 14.7k | g_packed_file_total_count << 1; |
587 | 14.7k | g_packed_file_total_small_file_count |
588 | 14.7k | << static_cast<int64_t>(packed_file.slice_locations.size()); |
589 | 14.7k | g_packed_file_total_size_bytes << packed_file.total_size; |
590 | 14.7k | g_packed_file_small_file_num_recorder |
591 | 14.7k | << static_cast<int64_t>(packed_file.slice_locations.size()); |
592 | 14.7k | g_packed_file_file_size_recorder << packed_file.total_size; |
593 | | // Flush samplers immediately so the window bvar reflects the latest packed file. |
594 | 14.7k | if (auto* sampler = g_packed_file_small_file_num_recorder.get_sampler()) { |
595 | 14.7k | sampler->take_sample(); |
596 | 14.7k | } |
597 | 14.7k | if (auto* sampler = g_packed_file_file_size_recorder.get_sampler()) { |
598 | 14.7k | sampler->take_sample(); |
599 | 14.7k | } |
600 | 14.7k | } |
601 | | |
602 | 3 | void PackedFileManager::background_manager() { |
603 | 3 | auto last_cleanup_time = std::chrono::steady_clock::now(); |
604 | | |
605 | 414k | while (!_stop_background_thread.load()) { |
606 | 414k | int64_t check_interval_ms = |
607 | 414k | std::max<int64_t>(1, config::packed_file_time_threshold_ms / 10); |
608 | 414k | std::this_thread::sleep_for(std::chrono::milliseconds(check_interval_ms)); |
609 | | |
610 | | // Check if current packed file should be closed due to time threshold |
611 | 414k | std::vector<std::string> resources_to_mark; |
612 | 414k | { |
613 | 414k | std::unique_lock<std::timed_mutex> current_lock(_current_packed_file_mutex, |
614 | 414k | std::defer_lock); |
615 | 414k | int64_t lock_wait_ms = std::max<int64_t>(0, config::packed_file_try_lock_timeout_ms); |
616 | 414k | if (current_lock.try_lock_for(std::chrono::milliseconds(lock_wait_ms))) { |
617 | 414k | for (auto& [resource_id, state] : _current_packed_files) { |
618 | 413k | if (!state || state->state != PackedFileState::ACTIVE) { |
619 | 264k | continue; |
620 | 264k | } |
621 | 149k | if (!state->first_append_timestamp.has_value()) { |
622 | 0 | continue; |
623 | 0 | } |
624 | 149k | auto current_time = std::chrono::steady_clock::now(); |
625 | 149k | auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
626 | 149k | current_time - *(state->first_append_timestamp)) |
627 | 149k | .count(); |
628 | 149k | if (elapsed_ms >= config::packed_file_time_threshold_ms) { |
629 | 14.7k | resources_to_mark.push_back(resource_id); |
630 | 14.7k | } |
631 | 149k | } |
632 | 414k | } |
633 | 414k | } |
634 | 414k | for (const auto& resource_id : resources_to_mark) { |
635 | 14.7k | Status st = mark_current_packed_file_for_upload(resource_id); |
636 | 14.7k | if (!st.ok()) { |
637 | 0 | LOG(WARNING) << "Failed to close current packed file for resource " << resource_id |
638 | 0 | << ": " << st.to_string(); |
639 | 0 | } |
640 | 14.7k | } |
641 | | |
642 | | // Process uploading files |
643 | 414k | process_uploading_packed_files(); |
644 | | |
645 | 414k | auto now = std::chrono::steady_clock::now(); |
646 | 414k | int64_t cleanup_interval_sec = |
647 | 414k | std::max<int64_t>(1, config::packed_file_cleanup_interval_seconds); |
648 | 414k | auto cleanup_interval = std::chrono::seconds(cleanup_interval_sec); |
649 | 414k | if (now - last_cleanup_time >= cleanup_interval) { |
650 | 72 | cleanup_expired_data(); |
651 | 72 | last_cleanup_time = now; |
652 | 72 | } |
653 | 414k | } |
654 | 3 | } |
655 | | |
656 | 414k | void PackedFileManager::process_uploading_packed_files() { |
657 | 414k | std::vector<std::shared_ptr<PackedFileContext>> files_ready; |
658 | 414k | std::vector<std::shared_ptr<PackedFileContext>> files_uploading; |
659 | 414k | auto record_ready_to_upload = [&](const std::shared_ptr<PackedFileContext>& packed_file) { |
660 | 13.7k | if (!packed_file->uploading_timestamp.has_value()) { |
661 | 13.7k | packed_file->uploading_timestamp = std::chrono::steady_clock::now(); |
662 | 13.7k | int64_t duration_ms = -1; |
663 | 13.7k | if (packed_file->ready_to_upload_timestamp.has_value()) { |
664 | 13.7k | duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
665 | 13.7k | *packed_file->uploading_timestamp - |
666 | 13.7k | *packed_file->ready_to_upload_timestamp) |
667 | 13.7k | .count(); |
668 | 13.7k | g_packed_file_ready_to_upload_ms_recorder << duration_ms; |
669 | 13.7k | if (auto* sampler = g_packed_file_ready_to_upload_ms_recorder.get_sampler()) { |
670 | 13.7k | sampler->take_sample(); |
671 | 13.7k | } |
672 | 13.7k | } |
673 | 13.7k | VLOG_DEBUG << "Packed file " << packed_file->packed_file_path |
674 | 0 | << " transition READY_TO_UPLOAD->UPLOADING; " |
675 | 0 | "ready_to_upload_ms=" |
676 | 0 | << duration_ms; |
677 | 13.7k | } |
678 | 13.7k | }; |
679 | | |
680 | 414k | { |
681 | 414k | std::lock_guard<std::mutex> lock(_packed_files_mutex); |
682 | 414k | for (auto& [packed_file_path, packed_file] : _uploading_packed_files) { |
683 | 78.1k | auto state = packed_file->state.load(std::memory_order_acquire); |
684 | 78.1k | if (state != PackedFileState::READY_TO_UPLOAD && state != PackedFileState::UPLOADING) { |
685 | 0 | continue; |
686 | 0 | } |
687 | 78.1k | if (state == PackedFileState::READY_TO_UPLOAD) { |
688 | 14.7k | files_ready.emplace_back(packed_file); |
689 | 63.4k | } else { |
690 | 63.4k | files_uploading.emplace_back(packed_file); |
691 | 63.4k | } |
692 | 78.1k | } |
693 | 414k | } |
694 | | |
695 | 414k | auto handle_success = [&](const std::shared_ptr<PackedFileContext>& packed_file) { |
696 | 13.7k | auto now = std::chrono::steady_clock::now(); |
697 | 13.7k | int64_t active_to_ready_ms = -1; |
698 | 13.7k | int64_t ready_to_upload_ms = -1; |
699 | 13.7k | int64_t uploading_to_uploaded_ms = -1; |
700 | 13.7k | if (packed_file->first_append_timestamp.has_value() && |
701 | 13.7k | packed_file->ready_to_upload_timestamp.has_value()) { |
702 | 13.7k | active_to_ready_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
703 | 13.7k | *packed_file->ready_to_upload_timestamp - |
704 | 13.7k | *packed_file->first_append_timestamp) |
705 | 13.7k | .count(); |
706 | 13.7k | } |
707 | 13.7k | if (packed_file->ready_to_upload_timestamp.has_value() && |
708 | 13.7k | packed_file->uploading_timestamp.has_value()) { |
709 | 13.7k | ready_to_upload_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
710 | 13.7k | *packed_file->uploading_timestamp - |
711 | 13.7k | *packed_file->ready_to_upload_timestamp) |
712 | 13.7k | .count(); |
713 | 13.7k | } |
714 | 13.7k | if (packed_file->uploading_timestamp.has_value()) { |
715 | 13.7k | uploading_to_uploaded_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
716 | 13.7k | now - *packed_file->uploading_timestamp) |
717 | 13.7k | .count(); |
718 | 13.7k | g_packed_file_uploading_to_uploaded_ms_recorder << uploading_to_uploaded_ms; |
719 | 13.7k | if (auto* sampler = g_packed_file_uploading_to_uploaded_ms_recorder.get_sampler()) { |
720 | 13.7k | sampler->take_sample(); |
721 | 13.7k | } |
722 | 13.7k | } |
723 | 13.7k | int64_t total_ms = -1; |
724 | 13.7k | if (packed_file->first_append_timestamp.has_value()) { |
725 | 13.7k | total_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
726 | 13.7k | now - *packed_file->first_append_timestamp) |
727 | 13.7k | .count(); |
728 | 13.7k | } |
729 | 13.7k | std::ostringstream slices_stream; |
730 | 13.7k | bool first_slice = true; |
731 | 62.2k | for (const auto& [small_file_path, index] : packed_file->slice_locations) { |
732 | 62.2k | if (!first_slice) { |
733 | 48.5k | slices_stream << "; "; |
734 | 48.5k | } |
735 | 62.2k | first_slice = false; |
736 | 62.2k | slices_stream << small_file_path << "(txn=" << index.txn_id |
737 | 62.2k | << ", offset=" << index.offset << ", size=" << index.size << ")"; |
738 | | |
739 | | // Update packed_file_size in global index |
740 | 62.2k | { |
741 | 62.2k | std::lock_guard<std::mutex> global_lock(_global_index_mutex); |
742 | 62.2k | auto it = _global_slice_locations.find(small_file_path); |
743 | 62.2k | if (it != _global_slice_locations.end()) { |
744 | 62.2k | it->second.packed_file_size = packed_file->total_size; |
745 | 62.2k | } |
746 | 62.2k | } |
747 | 62.2k | } |
748 | 13.7k | LOG(INFO) << "Packed file " << packed_file->packed_file_path |
749 | 13.7k | << " uploaded; slices=" << packed_file->slice_locations.size() |
750 | 13.7k | << ", total_bytes=" << packed_file->total_size << ", slice_detail=[" |
751 | 13.7k | << slices_stream.str() << "]" |
752 | 13.7k | << ", active_to_ready_ms=" << active_to_ready_ms |
753 | 13.7k | << ", ready_to_upload_ms=" << ready_to_upload_ms |
754 | 13.7k | << ", uploading_to_uploaded_ms=" << uploading_to_uploaded_ms |
755 | 13.7k | << ", total_ms=" << total_ms; |
756 | 13.7k | { |
757 | 13.7k | std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex); |
758 | 13.7k | packed_file->state = PackedFileState::UPLOADED; |
759 | 13.7k | packed_file->upload_time = std::time(nullptr); |
760 | 13.7k | } |
761 | 13.7k | packed_file->upload_cv.notify_all(); |
762 | 13.7k | { |
763 | 13.7k | std::lock_guard<std::mutex> lock(_packed_files_mutex); |
764 | 13.7k | _uploading_packed_files.erase(packed_file->packed_file_path); |
765 | 13.7k | _uploaded_packed_files[packed_file->packed_file_path] = packed_file; |
766 | 13.7k | } |
767 | 13.7k | }; |
768 | | |
769 | 414k | auto handle_failure = [&](const std::shared_ptr<PackedFileContext>& packed_file, |
770 | 414k | const Status& status) { |
771 | 0 | LOG(WARNING) << "Failed to upload packed file: " << packed_file->packed_file_path |
772 | 0 | << ", error: " << status.to_string(); |
773 | 0 | { |
774 | 0 | std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex); |
775 | 0 | packed_file->state = PackedFileState::FAILED; |
776 | 0 | packed_file->last_error = status.to_string(); |
777 | 0 | packed_file->upload_time = std::time(nullptr); |
778 | 0 | } |
779 | 0 | packed_file->upload_cv.notify_all(); |
780 | 0 | { |
781 | 0 | std::lock_guard<std::mutex> lock(_packed_files_mutex); |
782 | 0 | _uploading_packed_files.erase(packed_file->packed_file_path); |
783 | 0 | _uploaded_packed_files[packed_file->packed_file_path] = packed_file; |
784 | 0 | } |
785 | 0 | }; |
786 | | |
787 | 414k | for (auto& packed_file : files_ready) { |
788 | 14.7k | const std::string& packed_file_path = packed_file->packed_file_path; |
789 | 14.7k | cloud::PackedFileInfoPB packed_file_info; |
790 | 14.7k | packed_file_info.set_ref_cnt(packed_file->slice_locations.size()); |
791 | 14.7k | packed_file_info.set_total_slice_num(packed_file->slice_locations.size()); |
792 | 14.7k | packed_file_info.set_total_slice_bytes(packed_file->total_size); |
793 | 14.7k | packed_file_info.set_remaining_slice_bytes(packed_file->total_size); |
794 | 14.7k | packed_file_info.set_created_at_sec(packed_file->create_time); |
795 | 14.7k | packed_file_info.set_corrected(false); |
796 | 14.7k | packed_file_info.set_state(doris::cloud::PackedFileInfoPB::NORMAL); |
797 | 14.7k | packed_file_info.set_resource_id(packed_file->resource_id); |
798 | | |
799 | 63.2k | for (const auto& [small_file_path, index] : packed_file->slice_locations) { |
800 | 63.2k | auto* small_file = packed_file_info.add_slices(); |
801 | 63.2k | small_file->set_path(small_file_path); |
802 | 63.2k | small_file->set_offset(index.offset); |
803 | 63.2k | small_file->set_size(index.size); |
804 | 63.2k | small_file->set_deleted(false); |
805 | 63.2k | if (index.tablet_id != 0) { |
806 | 63.2k | small_file->set_tablet_id(index.tablet_id); |
807 | 63.2k | } |
808 | 63.2k | if (!index.rowset_id.empty()) { |
809 | 63.2k | small_file->set_rowset_id(index.rowset_id); |
810 | 63.2k | } |
811 | 63.2k | if (index.txn_id != 0) { |
812 | 63.2k | small_file->set_txn_id(index.txn_id); |
813 | 63.2k | } |
814 | 63.2k | } |
815 | | |
816 | 14.7k | Status meta_status = update_meta_service(packed_file->packed_file_path, packed_file_info); |
817 | 14.7k | if (!meta_status.ok()) { |
818 | 2 | LOG(WARNING) << "Failed to update meta service for packed file: " |
819 | 2 | << packed_file->packed_file_path << ", error: " << meta_status.to_string(); |
820 | 2 | handle_failure(packed_file, meta_status); |
821 | 2 | continue; |
822 | 2 | } |
823 | | |
824 | | // Record stats once the packed file metadata is persisted. |
825 | 14.7k | record_packed_file_metrics(*packed_file); |
826 | | |
827 | 14.7k | Status trailer_status = append_packed_info_trailer( |
828 | 14.7k | packed_file->writer.get(), packed_file->packed_file_path, packed_file_info); |
829 | 14.7k | if (!trailer_status.ok()) { |
830 | 0 | handle_failure(packed_file, trailer_status); |
831 | 0 | continue; |
832 | 0 | } |
833 | | |
834 | | // Now upload the file |
835 | 14.7k | if (!packed_file->slice_locations.empty()) { |
836 | 14.7k | std::ostringstream oss; |
837 | 14.7k | oss << "Uploading packed file " << packed_file_path << " with " |
838 | 14.7k | << packed_file->slice_locations.size() << " small files: "; |
839 | 14.7k | bool first = true; |
840 | 63.2k | for (const auto& [small_file_path, index] : packed_file->slice_locations) { |
841 | 63.2k | if (!first) { |
842 | 48.5k | oss << ", "; |
843 | 48.5k | } |
844 | 63.2k | first = false; |
845 | 63.2k | oss << "[" << small_file_path << ", offset=" << index.offset |
846 | 63.2k | << ", size=" << index.size << "]"; |
847 | 63.2k | } |
848 | 14.7k | VLOG_DEBUG << oss.str(); |
849 | 14.7k | } else { |
850 | 0 | VLOG_DEBUG << "Uploading packed file " << packed_file_path << " with no small files"; |
851 | 0 | } |
852 | | |
853 | 14.7k | Status upload_status = finalize_packed_file_upload(packed_file->packed_file_path, |
854 | 14.7k | packed_file->writer.get()); |
855 | | |
856 | 14.7k | if (upload_status.is<ErrorCode::ALREADY_CLOSED>()) { |
857 | 0 | record_ready_to_upload(packed_file); |
858 | 0 | handle_success(packed_file); |
859 | 0 | continue; |
860 | 0 | } |
861 | 14.7k | if (!upload_status.ok()) { |
862 | 0 | handle_failure(packed_file, upload_status); |
863 | 0 | continue; |
864 | 0 | } |
865 | | |
866 | 14.7k | record_ready_to_upload(packed_file); |
867 | 14.7k | packed_file->state = PackedFileState::UPLOADING; |
868 | 14.7k | } |
869 | | |
870 | 414k | for (auto& packed_file : files_uploading) { |
871 | 63.4k | if (!packed_file->writer) { |
872 | 0 | handle_failure(packed_file, |
873 | 0 | Status::InternalError("File writer is null for packed file: {}", |
874 | 0 | packed_file->packed_file_path)); |
875 | 0 | continue; |
876 | 0 | } |
877 | | |
878 | 63.4k | if (packed_file->writer->state() != FileWriter::State::CLOSED) { |
879 | 33.9k | continue; |
880 | 33.9k | } |
881 | | |
882 | 29.4k | Status status = packed_file->writer->close(true); |
883 | 29.4k | if (status.is<ErrorCode::ALREADY_CLOSED>()) { |
884 | 14.7k | handle_success(packed_file); |
885 | 14.7k | continue; |
886 | 14.7k | } |
887 | 14.7k | if (status.ok()) { |
888 | 14.7k | continue; |
889 | 14.7k | } |
890 | | |
891 | 1 | handle_failure(packed_file, status); |
892 | 1 | } |
893 | 414k | } |
894 | | |
895 | | Status PackedFileManager::finalize_packed_file_upload(const std::string& packed_file_path, |
896 | 14.7k | FileWriter* writer) { |
897 | 14.7k | if (writer == nullptr) { |
898 | 1 | return Status::InternalError("File writer is null for packed file: " + packed_file_path); |
899 | 1 | } |
900 | | |
901 | 14.7k | return writer->close(true); |
902 | 14.7k | } |
903 | | |
// Persists the packed file's slice index to the cloud meta service.
//
// @param packed_file_path key under which the info is stored.
// @param packed_file_info slice index and accounting for the packed file.
// @return the meta service RPC status; InternalError when not in cloud mode.
Status PackedFileManager::update_meta_service(const std::string& packed_file_path,
                                              const cloud::PackedFileInfoPB& packed_file_info) {
#ifdef BE_TEST
    // Unit tests short-circuit the RPC via this sync point.
    TEST_SYNC_POINT_RETURN_WITH_VALUE("PackedFileManager::update_meta_service", Status::OK(),
                                      packed_file_path, &packed_file_info);
#endif
    VLOG_DEBUG << "Updating meta service for packed file: " << packed_file_path << " with "
               << packed_file_info.total_slice_num() << " small files"
               << ", total bytes: " << packed_file_info.total_slice_bytes();

    // Get CloudMetaMgr through StorageEngine
    if (!config::is_cloud_mode()) {
        return Status::InternalError("Storage engine is not cloud mode");
    }

    auto& storage_engine = ExecEnv::GetInstance()->storage_engine();
    auto& cloud_meta_mgr = storage_engine.to_cloud().meta_mgr();
    return cloud_meta_mgr.update_packed_file_info(packed_file_path, packed_file_info);
}
923 | | |
924 | 73 | void PackedFileManager::cleanup_expired_data() { |
925 | 73 | auto current_time = std::time(nullptr); |
926 | | |
927 | | // Clean up expired uploaded files |
928 | 73 | { |
929 | 73 | std::lock_guard<std::mutex> uploaded_lock(_packed_files_mutex); |
930 | 73 | auto it = _uploaded_packed_files.begin(); |
931 | 383k | while (it != _uploaded_packed_files.end()) { |
932 | 383k | if (it->second->state == PackedFileState::UPLOADED && |
933 | 383k | current_time - it->second->upload_time > config::uploaded_file_retention_seconds) { |
934 | 9.28k | it = _uploaded_packed_files.erase(it); |
935 | 374k | } else if (it->second->state == PackedFileState::FAILED && |
936 | 374k | current_time - it->second->upload_time > |
937 | 0 | config::uploaded_file_retention_seconds) { |
938 | 0 | it = _uploaded_packed_files.erase(it); |
939 | 374k | } else { |
940 | 374k | ++it; |
941 | 374k | } |
942 | 383k | } |
943 | 73 | } |
944 | | |
945 | | // Clean up expired global index entries |
946 | 73 | { |
947 | 73 | std::lock_guard<std::mutex> global_lock(_global_index_mutex); |
948 | 73 | auto it = _global_slice_locations.begin(); |
949 | 1.72M | while (it != _global_slice_locations.end()) { |
950 | 1.72M | const auto& index = it->second; |
951 | 1.72M | if (index.create_time > 0 && |
952 | 1.72M | current_time - index.create_time > config::uploaded_file_retention_seconds) { |
953 | 41.4k | it = _global_slice_locations.erase(it); |
954 | 1.68M | } else { |
955 | 1.68M | ++it; |
956 | 1.68M | } |
957 | 1.72M | } |
958 | 73 | } |
959 | 73 | } |
960 | | |
961 | | #ifdef BE_TEST |
962 | | namespace { |
963 | | void reset_adder(bvar::Adder<int64_t>& adder) { |
964 | | auto current = adder.get_value(); |
965 | | if (current != 0) { |
966 | | adder << (-current); |
967 | | } |
968 | | } |
969 | | } // namespace |
970 | | |
971 | | void PackedFileManager::reset_packed_file_bvars_for_test() const { |
972 | | reset_adder(g_packed_file_total_count); |
973 | | reset_adder(g_packed_file_total_small_file_count); |
974 | | reset_adder(g_packed_file_total_size_bytes); |
975 | | g_packed_file_small_file_num_recorder.reset(); |
976 | | g_packed_file_file_size_recorder.reset(); |
977 | | g_packed_file_active_to_ready_ms_recorder.reset(); |
978 | | g_packed_file_ready_to_upload_ms_recorder.reset(); |
979 | | g_packed_file_uploading_to_uploaded_ms_recorder.reset(); |
980 | | if (auto* sampler = g_packed_file_small_file_num_recorder.get_sampler()) { |
981 | | sampler->take_sample(); |
982 | | } |
983 | | if (auto* sampler = g_packed_file_file_size_recorder.get_sampler()) { |
984 | | sampler->take_sample(); |
985 | | } |
986 | | if (auto* sampler = g_packed_file_active_to_ready_ms_recorder.get_sampler()) { |
987 | | sampler->take_sample(); |
988 | | } |
989 | | if (auto* sampler = g_packed_file_ready_to_upload_ms_recorder.get_sampler()) { |
990 | | sampler->take_sample(); |
991 | | } |
992 | | if (auto* sampler = g_packed_file_uploading_to_uploaded_ms_recorder.get_sampler()) { |
993 | | sampler->take_sample(); |
994 | | } |
995 | | } |
996 | | |
997 | | int64_t PackedFileManager::packed_file_total_count_for_test() const { |
998 | | return g_packed_file_total_count.get_value(); |
999 | | } |
1000 | | |
1001 | | int64_t PackedFileManager::packed_file_total_small_file_num_for_test() const { |
1002 | | return g_packed_file_total_small_file_count.get_value(); |
1003 | | } |
1004 | | |
1005 | | int64_t PackedFileManager::packed_file_total_size_bytes_for_test() const { |
1006 | | return g_packed_file_total_size_bytes.get_value(); |
1007 | | } |
1008 | | |
1009 | | double PackedFileManager::packed_file_avg_small_file_num_for_test() const { |
1010 | | return g_packed_file_avg_small_file_num.get_value().get_average_double(); |
1011 | | } |
1012 | | |
1013 | | double PackedFileManager::packed_file_avg_file_size_for_test() const { |
1014 | | return g_packed_file_avg_file_size_bytes.get_value().get_average_double(); |
1015 | | } |
1016 | | |
1017 | | void PackedFileManager::record_packed_file_metrics_for_test( |
1018 | | const PackedFileManager::PackedFileContext* packed_file) { |
1019 | | DCHECK(packed_file != nullptr); |
1020 | | record_packed_file_metrics(*packed_file); |
1021 | | } |
1022 | | |
1023 | | void PackedFileManager::clear_state_for_test() { |
1024 | | std::lock_guard<std::timed_mutex> cur_lock(_current_packed_file_mutex); |
1025 | | _current_packed_files.clear(); |
1026 | | { |
1027 | | std::lock_guard<std::mutex> lock(_packed_files_mutex); |
1028 | | _uploading_packed_files.clear(); |
1029 | | _uploaded_packed_files.clear(); |
1030 | | } |
1031 | | { |
1032 | | std::lock_guard<std::mutex> lock(_global_index_mutex); |
1033 | | _global_slice_locations.clear(); |
1034 | | } |
1035 | | } |
1036 | | #endif |
1037 | | |
1038 | | } // namespace doris::io |