Coverage Report

Created: 2026-05-12 18:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/packed_file_manager.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/packed_file_manager.h"
19
20
#include <bvar/bvar.h>
21
#include <bvar/recorder.h>
22
#include <bvar/window.h>
23
24
#include <algorithm>
25
#include <chrono>
26
#include <cstdint>
27
#include <ctime>
28
#include <functional>
29
#include <limits>
30
#include <optional>
31
#include <random>
32
#include <sstream>
33
#include <unordered_set>
34
35
#ifdef BE_TEST
36
#include "cpp/sync_point.h"
37
#endif
38
39
#include <gen_cpp/cloud.pb.h>
40
41
#include "cloud/cloud_meta_mgr.h"
42
#include "cloud/cloud_storage_engine.h"
43
#include "cloud/config.h"
44
#include "common/config.h"
45
#include "io/cache/block_file_cache.h"
46
#include "io/cache/block_file_cache_factory.h"
47
#include "io/cache/file_block.h"
48
#include "io/cache/file_cache_common.h"
49
#include "io/fs/packed_file_trailer.h"
50
#include "runtime/exec_env.h"
51
#include "storage/storage_engine.h"
52
#include "util/coding.h"
53
#include "util/slice.h"
54
#include "util/uid_util.h"
55
56
namespace doris::io {
57
58
namespace {
59
60
bvar::Adder<int64_t> g_packed_file_total_count("packed_file", "total_count");
61
bvar::Adder<int64_t> g_packed_file_total_small_file_count("packed_file", "total_small_file_num");
62
bvar::Adder<int64_t> g_packed_file_total_size_bytes("packed_file", "total_size_bytes");
63
bvar::IntRecorder g_packed_file_small_file_num_recorder;
64
bvar::IntRecorder g_packed_file_file_size_recorder;
65
bvar::Window<bvar::IntRecorder> g_packed_file_avg_small_file_num(
66
        "packed_file_avg_small_file_num", &g_packed_file_small_file_num_recorder,
67
        /*window_size=*/10);
68
bvar::Window<bvar::IntRecorder> g_packed_file_avg_file_size_bytes("packed_file_avg_file_size_bytes",
69
                                                                  &g_packed_file_file_size_recorder,
70
                                                                  /*window_size=*/10);
71
bvar::IntRecorder g_packed_file_active_to_ready_ms_recorder;
72
bvar::IntRecorder g_packed_file_ready_to_upload_ms_recorder;
73
bvar::IntRecorder g_packed_file_uploading_to_uploaded_ms_recorder;
74
bvar::Window<bvar::IntRecorder> g_packed_file_active_to_ready_ms_window(
75
        "packed_file_active_to_ready_ms", &g_packed_file_active_to_ready_ms_recorder,
76
        /*window_size=*/10);
77
bvar::Window<bvar::IntRecorder> g_packed_file_ready_to_upload_ms_window(
78
        "packed_file_ready_to_upload_ms", &g_packed_file_ready_to_upload_ms_recorder,
79
        /*window_size=*/10);
80
bvar::Window<bvar::IntRecorder> g_packed_file_uploading_to_uploaded_ms_window(
81
        "packed_file_uploading_to_uploaded_ms", &g_packed_file_uploading_to_uploaded_ms_recorder,
82
        /*window_size=*/10);
83
84
// Metrics for async small file cache write
85
bvar::Adder<int64_t> g_packed_file_cache_async_write_count("packed_file_cache",
86
                                                           "async_write_count");
87
bvar::Adder<int64_t> g_packed_file_cache_async_write_bytes("packed_file_cache",
88
                                                           "async_write_bytes");
89
90
Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_file_path,
91
14.1k
                                  const cloud::PackedFileInfoPB& packed_file_info) {
92
14.1k
    if (writer == nullptr) {
93
0
        return Status::InternalError("File writer is null for packed file: {}", packed_file_path);
94
0
    }
95
14.1k
    if (writer->state() == FileWriter::State::CLOSED) {
96
0
        return Status::InternalError("File writer already closed for packed file: {}",
97
0
                                     packed_file_path);
98
0
    }
99
100
14.1k
    cloud::PackedFileFooterPB footer_pb;
101
14.1k
    footer_pb.mutable_packed_file_info()->CopyFrom(packed_file_info);
102
103
14.1k
    std::string serialized_footer;
104
14.1k
    if (!footer_pb.SerializeToString(&serialized_footer)) {
105
0
        return Status::InternalError("Failed to serialize packed file footer info for {}",
106
0
                                     packed_file_path);
107
0
    }
108
109
14.1k
    if (serialized_footer.size() >
110
14.1k
        std::numeric_limits<uint32_t>::max() - kPackedFileTrailerSuffixSize) {
111
0
        return Status::InternalError("PackedFileFooterPB too large for {}", packed_file_path);
112
0
    }
113
114
14.1k
    std::string trailer;
115
14.1k
    trailer.reserve(serialized_footer.size() + kPackedFileTrailerSuffixSize);
116
14.1k
    trailer.append(serialized_footer);
117
14.1k
    put_fixed32_le(&trailer, static_cast<uint32_t>(serialized_footer.size()));
118
14.1k
    put_fixed32_le(&trailer, kPackedFileTrailerVersion);
119
120
14.1k
    return writer->append(Slice(trailer));
121
14.1k
}
122
123
// write small file data to file cache
124
void do_write_to_file_cache(const std::string& small_file_path, const std::string& data,
125
62.3k
                            int64_t tablet_id, uint64_t expiration_time) {
126
62.3k
    if (data.empty()) {
127
0
        return;
128
0
    }
129
130
    // Generate cache key from small file path (e.g., "rowset_id_seg_id.dat")
131
62.3k
    Path path(small_file_path);
132
62.3k
    UInt128Wrapper cache_hash = BlockFileCache::hash(path.filename().native());
133
134
62.3k
    VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path
135
1
               << " filename=" << path.filename().native() << " hash=" << cache_hash.to_string()
136
1
               << " size=" << data.size() << " tablet_id=" << tablet_id
137
1
               << " expiration_time=" << expiration_time;
138
139
62.3k
    BlockFileCache* file_cache = FileCacheFactory::instance()->get_by_path(cache_hash);
140
62.3k
    if (file_cache == nullptr) {
141
0
        return; // Cache not available, skip
142
0
    }
143
144
    // Allocate cache blocks
145
62.3k
    CacheContext ctx;
146
62.3k
    ctx.cache_type = expiration_time > 0 ? FileCacheType::TTL : FileCacheType::NORMAL;
147
62.3k
    ctx.expiration_time = expiration_time;
148
62.3k
    ctx.tablet_id = tablet_id;
149
62.3k
    ReadStatistics stats;
150
62.3k
    ctx.stats = &stats;
151
152
62.3k
    FileBlocksHolder holder = file_cache->get_or_set(cache_hash, 0, data.size(), ctx);
153
154
    // Write data to cache blocks
155
62.3k
    size_t data_offset = 0;
156
62.3k
    for (auto& block : holder.file_blocks) {
157
62.3k
        if (data_offset >= data.size()) {
158
0
            break;
159
0
        }
160
62.3k
        size_t block_size = block->range().size();
161
62.3k
        size_t write_size = std::min(block_size, data.size() - data_offset);
162
163
62.3k
        if (block->state() == FileBlock::State::EMPTY) {
164
62.2k
            block->get_or_set_downloader();
165
62.2k
            if (block->is_downloader()) {
166
62.2k
                Slice s(data.data() + data_offset, write_size);
167
62.2k
                Status st = block->append(s);
168
62.2k
                if (st.ok()) {
169
62.2k
                    st = block->finalize();
170
62.2k
                }
171
62.2k
                if (!st.ok()) {
172
0
                    LOG(WARNING) << "Write small file to cache failed: " << st.msg();
173
0
                }
174
62.2k
            }
175
62.2k
        }
176
62.3k
        data_offset += write_size;
177
62.3k
    }
178
62.3k
}
179
180
// Async wrapper: submit cache write task to thread pool
181
// The data is copied into the lambda capture to ensure its lifetime extends beyond
182
// the async task execution. The original Slice may reference a buffer that gets
183
// reused or freed before the async task runs.
184
void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data,
185
62.4k
                                     int64_t tablet_id, uint64_t expiration_time) {
186
62.4k
    if (!config::enable_file_cache || data.size == 0) {
187
97
        return;
188
97
    }
189
190
    // Copy data since original buffer may be reused before async task executes
191
    // For small files (< 1MB), copy overhead is acceptable
192
62.3k
    std::string data_copy(data.data, data.size);
193
62.3k
    size_t data_size = data.size;
194
195
62.3k
    auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool();
196
62.3k
    if (thread_pool == nullptr) {
197
        // Fallback to sync write if thread pool not available
198
0
        do_write_to_file_cache(small_file_path, data_copy, tablet_id, expiration_time);
199
0
        return;
200
0
    }
201
202
    // Track async write count and bytes
203
62.3k
    g_packed_file_cache_async_write_count << 1;
204
62.3k
    g_packed_file_cache_async_write_bytes << static_cast<int64_t>(data_size);
205
206
62.3k
    Status st = thread_pool->submit_func([path = small_file_path, data = std::move(data_copy),
207
62.3k
                                          tablet_id, data_size, expiration_time]() {
208
62.3k
        do_write_to_file_cache(path, data, tablet_id, expiration_time);
209
        // Decrement async write count after completion
210
62.3k
        g_packed_file_cache_async_write_count << -1;
211
62.3k
        g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
212
62.3k
    });
213
214
62.3k
    if (!st.ok()) {
215
        // Revert metrics since task was not submitted
216
0
        g_packed_file_cache_async_write_count << -1;
217
0
        g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
218
0
        LOG(WARNING) << "Failed to submit cache write task for " << small_file_path << ": "
219
0
                     << st.msg();
220
        // Don't block on failure, cache write is best-effort
221
0
    }
222
62.3k
}
223
224
} // namespace
225
226
203k
PackedFileManager* PackedFileManager::instance() {
    // Process-wide singleton, lazily constructed on first call (function-local static
    // initialization is thread-safe since C++11).
    static PackedFileManager s_manager;
    return &s_manager;
}
230
231
35
PackedFileManager::~PackedFileManager() {
    // The background thread captures `this`; make sure it has exited before teardown.
    this->stop_background_manager();
}
234
235
39
Status PackedFileManager::init() {
    // No eager setup: file systems are resolved lazily per resource in
    // ensure_file_system(), and the background thread is started separately.
    Status ok = Status::OK();
    return ok;
}
238
239
// Creates a fresh packed-file context for `resource_id`:
//  - resolves (and caches) the resource's file system,
//  - picks the object path "data/packed_file/<bucket>/<uuid>.bin", where bucket is in
//    [1, 4096] and derived from the UUID's hash,
//  - opens a writer for that path with write_file_cache disabled.
// The new context starts in INIT state. `packed_file_ctx` is overwritten only after every
// step has succeeded; on error the caller's slot keeps its previous value instead of
// receiving a half-initialized, writer-less context.
Status PackedFileManager::create_new_packed_file_context(
        const std::string& resource_id, std::unique_ptr<PackedFileContext>& packed_file_ctx) {
    FileSystemSPtr file_system;
    RETURN_IF_ERROR(ensure_file_system(resource_id, &file_system));
    if (file_system == nullptr) {
        return Status::InternalError("File system is not available for packed file creation: " +
                                     resource_id);
    }

    auto uuid = generate_uuid_string();
    const auto path_bucket = static_cast<uint16_t>(std::hash<std::string> {}(uuid) % 4096 + 1);
    std::stringstream path_stream;
    path_stream << "data/packed_file/" << path_bucket << "/" << uuid << ".bin";
    const std::string relative_path = path_stream.str();

    // Build the context locally; it is published to the caller only on success.
    auto ctx = std::make_unique<PackedFileContext>();
    ctx->packed_file_path = relative_path;
    ctx->create_time = std::time(nullptr);
    ctx->create_timestamp = std::chrono::steady_clock::now();
    ctx->state = PackedFileState::INIT;
    ctx->resource_id = resource_id;
    ctx->file_system = std::move(file_system);

    // Disable write_file_cache for packed file itself.
    // We write file cache for each small file separately in append_small_file()
    // using the small file path as cache key, ensuring cache entries can be
    // properly cleaned up when stale rowsets are removed.
    FileWriterOptions opts;
    opts.write_file_cache = false;
    FileWriterPtr new_writer;
    RETURN_IF_ERROR(ctx->file_system->create_file(Path(relative_path), &new_writer, &opts));
    ctx->writer = std::move(new_writer);

    packed_file_ctx = std::move(ctx);
    return Status::OK();
}
277
278
// Resolves the FileSystem used for `resource_id` and returns it through `*file_system`.
// An empty `resource_id` selects the cloud storage engine's latest file system.
// Results are cached under _file_system_mutex. On a cache miss, the file system is
// resolved from the cloud storage engine without holding the mutex, then stored.
// Errors: null output pointer, not in cloud mode, ExecEnv missing, or the file system
// (for the given resource) not being ready.
Status PackedFileManager::ensure_file_system(const std::string& resource_id,
                                             FileSystemSPtr* file_system) {
    if (file_system == nullptr) {
        return Status::InvalidArgument("file_system output parameter is null");
    }

    const bool use_default = resource_id.empty();

    // Fast path: serve from the cache.
    {
        std::lock_guard<std::mutex> guard(_file_system_mutex);
        if (use_default) {
            if (_default_file_system != nullptr) {
                *file_system = _default_file_system;
                return Status::OK();
            }
        } else if (auto cached = _file_systems.find(resource_id); cached != _file_systems.end()) {
            *file_system = cached->second;
            return Status::OK();
        }
    }

    // Slow path: resolve from the cloud storage engine.
    if (!config::is_cloud_mode()) {
        return Status::InternalError("Cloud file system is not available in local mode");
    }
    auto* exec_env = ExecEnv::GetInstance();
    if (exec_env == nullptr) {
        return Status::InternalError("ExecEnv instance is not initialized");
    }

    FileSystemSPtr resolved_fs;
    if (use_default) {
        resolved_fs = exec_env->storage_engine().to_cloud().latest_fs();
        if (resolved_fs == nullptr) {
            return Status::InternalError("Cloud file system is not ready");
        }
    } else {
        auto storage_resource =
                exec_env->storage_engine().to_cloud().get_storage_resource(resource_id);
        const bool ready = storage_resource.has_value() && storage_resource->fs != nullptr;
        if (!ready) {
            return Status::InternalError("Cloud file system is not ready for resource: " +
                                         resource_id);
        }
        resolved_fs = storage_resource->fs;
    }

    // Publish into the cache and hand the result back.
    std::lock_guard<std::mutex> guard(_file_system_mutex);
    if (use_default) {
        _default_file_system = resolved_fs;
    } else {
        _file_systems[resource_id] = resolved_fs;
    }
    *file_system = std::move(resolved_fs);
    return Status::OK();
}
338
339
// Appends `data` (the full contents of small file `path`) to the current packed file of
// `info.resource_id`, records its location in both the per-packed-file index and the
// global index, and optionally schedules an async file-cache write keyed by `path`.
//
// - Files larger than config::small_file_threshold_bytes are not packed: returns OK
//   without writing anything.
// - `info.txn_id` must be positive.
// - Before appending, a non-empty packed file is rotated (handed to the uploader and
//   replaced by a fresh one) if this slice would reach packed_file_size_threshold_bytes.
// - After appending, the packed file is rotated once it holds
//   packed_file_small_file_count_threshold slices (when that threshold is > 0).
Status PackedFileManager::append_small_file(const std::string& path, const Slice& data,
                                            const PackedAppendContext& info) {
    // Check if file is too large to be merged
    if (data.get_size() > config::small_file_threshold_bytes) {
        return Status::OK(); // Skip merging for large files
    }

    if (info.txn_id <= 0) {
        return Status::InvalidArgument("Missing valid txn id for packed file append: " + path);
    }

    std::lock_guard<std::timed_mutex> lock(_current_packed_file_mutex);

    // `current_state` refers to the map slot itself. mark_current_packed_file_for_upload_locked()
    // refills that same slot with the new packed file, so it always names the current one.
    auto& current_state = _current_packed_files[info.resource_id];
    if (!current_state || !current_state->writer) {
        RETURN_IF_ERROR(create_new_packed_file_context(info.resource_id, current_state));
    }

    // Rotate before writing if this slice would push the file past the size threshold.
    // An empty packed file is never rotated: the slice would land alone in the next file
    // anyway, and rotating would only upload an empty packed file.
    if (current_state->total_size > 0 &&
        current_state->total_size + data.get_size() >= config::packed_file_size_threshold_bytes) {
        RETURN_IF_ERROR(mark_current_packed_file_for_upload_locked(info.resource_id));
        if (!current_state || !current_state->writer) {
            RETURN_IF_ERROR(create_new_packed_file_context(info.resource_id, current_state));
        }
    }

    PackedFileContext* active_state = current_state.get();

    // Write data to current packed file
    RETURN_IF_ERROR(active_state->writer->append(data));

    // Async write data to file cache using small file path as cache key.
    // This ensures cache key matches the cleanup key in Rowset::clear_cache(),
    // allowing proper cache cleanup when stale rowsets are removed.
    if (info.write_file_cache) {
        write_small_file_to_cache_async(path, data, info.tablet_id, info.expiration_time);
    }

    // Update index
    PackedSliceLocation location;
    location.packed_file_path = active_state->packed_file_path;
    location.offset = active_state->current_offset;
    location.size = data.get_size();
    location.create_time = std::time(nullptr);
    location.tablet_id = info.tablet_id;
    location.rowset_id = info.rowset_id;
    location.resource_id = info.resource_id;
    location.txn_id = info.txn_id;

    active_state->slice_locations[path] = location;
    active_state->current_offset += data.get_size();
    active_state->total_size += data.get_size();

    // Mark as active if this is the first write. This must happen before any rotation
    // below: once the context is moved to the uploading set, the background thread may
    // access it concurrently, and the ACTIVE->READY latency needs first_append_timestamp.
    if (!active_state->first_append_timestamp.has_value()) {
        active_state->first_append_timestamp = std::chrono::steady_clock::now();
    }
    if (active_state->state == PackedFileState::INIT) {
        active_state->state = PackedFileState::ACTIVE;
    }

    // Update global index (before any rotation, so the slice is always findable).
    {
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
        _global_slice_locations[path] = location;
    }

    // Rotate packed file when small file count reaches threshold. Done last so that all
    // bookkeeping for this slice is complete before ownership moves to the upload queue.
    if (config::packed_file_small_file_count_threshold > 0 &&
        static_cast<int64_t>(active_state->slice_locations.size()) >=
                config::packed_file_small_file_count_threshold) {
        RETURN_IF_ERROR(mark_current_packed_file_for_upload_locked(info.resource_id));
    }

    return Status::OK();
}
417
418
42.2k
// Blocks on the context's upload_cv until its state becomes UPLOADED or FAILED.
// Returns OK for UPLOADED. For FAILED, returns InternalError with the recorded
// last_error (read under upload_mutex), or a generic message if none was recorded.
// The caller must keep `packed_file_ptr` alive for the duration of the call.
Status PackedFileManager::wait_for_packed_file_upload(PackedFileContext* packed_file_ptr) {
    auto reached_terminal_state = [packed_file_ptr]() {
        const auto s = packed_file_ptr->state.load(std::memory_order_acquire);
        return s == PackedFileState::UPLOADED || s == PackedFileState::FAILED;
    };

    std::unique_lock<std::mutex> guard(packed_file_ptr->upload_mutex);
    packed_file_ptr->upload_cv.wait(guard, reached_terminal_state);

    if (packed_file_ptr->state != PackedFileState::FAILED) {
        return Status::OK();
    }

    const std::string& recorded_error = packed_file_ptr->last_error;
    return Status::InternalError(recorded_error.empty() ? std::string("Packed file upload failed")
                                                        : recorded_error);
}
433
434
62.3k
// Blocks until the packed file that contains small file `path` has finished uploading.
// Returns OK when it is UPLOADED, and InternalError when `path` is unknown to the global
// index, when the packed file cannot be located, or when its upload FAILED (carrying the
// recorded error). Lookup order: global index -> uploaded set -> current packed files ->
// uploading set -> uploaded set again (the file may move between sets while no lock is held).
Status PackedFileManager::wait_upload_done(const std::string& path) {
    std::string packed_file_path;
    {
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
        const auto indexed = _global_slice_locations.find(path);
        if (indexed == _global_slice_locations.end()) {
            return Status::InternalError("File not found in global index: " + path);
        }
        packed_file_path = indexed->second.packed_file_path;
    }

    // Shared owners keep contexts from the uploading/uploaded sets alive while we use them.
    std::shared_ptr<PackedFileContext> managed_packed_file;
    std::shared_ptr<PackedFileContext> failed_packed_file;
    {
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        const auto done = _uploaded_packed_files.find(packed_file_path);
        if (done != _uploaded_packed_files.end()) {
            const auto state = done->second->state.load(std::memory_order_acquire);
            if (state == PackedFileState::UPLOADED) {
                return Status::OK(); // Already uploaded, no need to wait
            }
            if (state != PackedFileState::FAILED) {
                managed_packed_file = done->second;
            } else {
                failed_packed_file = done->second;
            }
        }
    }

    if (failed_packed_file != nullptr) {
        std::lock_guard<std::mutex> upload_lock(failed_packed_file->upload_mutex);
        const std::string& recorded_error = failed_packed_file->last_error;
        return Status::InternalError(recorded_error.empty()
                                             ? std::string("Merge file upload failed")
                                             : recorded_error);
    }

    // Current packed files are unique_ptr-owned, so only a raw pointer can be taken here.
    // NOTE(review): this relies on the context not being destroyed before the wait below
    // returns. Rotation moves it into a shared_ptr rather than freeing it, but confirm that
    // cleanup of uploaded files cannot erase it while a waiter is still blocked.
    PackedFileContext* packed_file_ptr = nullptr;
    {
        std::lock_guard<std::timed_mutex> current_lock(_current_packed_file_mutex);
        const auto match = std::find_if(
                _current_packed_files.begin(), _current_packed_files.end(),
                [&packed_file_path](const auto& entry) {
                    return entry.second && entry.second->packed_file_path == packed_file_path;
                });
        if (match != _current_packed_files.end()) {
            packed_file_ptr = match->second.get();
        }
    }

    if (packed_file_ptr == nullptr) {
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        if (auto uploading = _uploading_packed_files.find(packed_file_path);
            uploading != _uploading_packed_files.end()) {
            managed_packed_file = uploading->second;
            packed_file_ptr = managed_packed_file.get();
        } else if (auto uploaded = _uploaded_packed_files.find(packed_file_path);
                   uploaded != _uploaded_packed_files.end()) {
            managed_packed_file = uploaded->second;
            packed_file_ptr = managed_packed_file.get();
        }
    }

    if (packed_file_ptr == nullptr) {
        // Packed file not found in any location, this is unexpected
        return Status::InternalError("Packed file not found for path: " + path);
    }

    // `managed_packed_file` stays in scope until after this call returns, keeping shared
    // ownership alive during the wait.
    return wait_for_packed_file_upload(packed_file_ptr);
}
509
510
// Copies the location recorded for small file `path` from the global index into
// `*location`. Returns NotFound if `path` has no entry in the global index.
Status PackedFileManager::get_packed_slice_location(const std::string& path,
                                                    PackedSliceLocation* location) {
    std::lock_guard<std::mutex> guard(_global_index_mutex);
    const auto entry = _global_slice_locations.find(path);
    if (entry != _global_slice_locations.end()) {
        *location = entry->second;
        return Status::OK();
    }
    return Status::NotFound("File not found in global packed index: {}", path);
}
521
522
3
void PackedFileManager::start_background_manager() {
523
3
    if (_background_thread) {
524
0
        return; // Already started
525
0
    }
526
527
3
    _stop_background_thread = false;
528
3
    _background_thread = std::make_unique<std::thread>([this] { background_manager(); });
529
3
}
530
531
41
void PackedFileManager::stop_background_manager() {
532
41
    _stop_background_thread = true;
533
41
    if (_background_thread && _background_thread->joinable()) {
534
2
        _background_thread->join();
535
2
    }
536
41
    _background_thread.reset();
537
41
}
538
539
// Caller must hold _current_packed_file_mutex.
// If `resource_id` has a current packed file with an open writer, this function:
//  - sets it to READY_TO_UPLOAD,
//  - records the ACTIVE->READY latency once,
//  - moves it into the uploading set (shared ownership),
//  - installs a freshly created packed file in the same slot.
// Returns OK without doing anything when there is no such file. Otherwise it returns
// the status of creating the replacement.
Status PackedFileManager::mark_current_packed_file_for_upload_locked(
        const std::string& resource_id) {
    auto slot = _current_packed_files.find(resource_id);
    const bool has_open_file =
            slot != _current_packed_files.end() && slot->second && slot->second->writer;
    if (!has_open_file) {
        return Status::OK(); // Nothing to mark for upload
    }

    auto& current = slot->second;
    current->state = PackedFileState::READY_TO_UPLOAD;

    if (!current->ready_to_upload_timestamp.has_value()) {
        const auto ready_at = std::chrono::steady_clock::now();
        current->ready_to_upload_timestamp = ready_at;

        // Stays -1 when the file never received an append.
        int64_t active_to_ready_ms = -1;
        if (const auto& first_append = current->first_append_timestamp;
            first_append.has_value()) {
            active_to_ready_ms =
                    std::chrono::duration_cast<std::chrono::milliseconds>(ready_at - *first_append)
                            .count();
            g_packed_file_active_to_ready_ms_recorder << active_to_ready_ms;
            if (auto* sampler = g_packed_file_active_to_ready_ms_recorder.get_sampler()) {
                sampler->take_sample();
            }
        }
        VLOG_DEBUG << "Packed file " << current->packed_file_path
                   << " transition ACTIVE->READY_TO_UPLOAD; active_to_ready_ms="
                   << active_to_ready_ms;
    }

    // Hand the context to the uploader. Moving out of the unique_ptr leaves the slot
    // empty; it is refilled with a new packed file below.
    {
        std::shared_ptr<PackedFileContext> uploading_ptr(std::move(current));
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        _uploading_packed_files[uploading_ptr->packed_file_path] = uploading_ptr;
    }

    return create_new_packed_file_context(resource_id, slot->second);
}
579
580
14.1k
// Locking entry point: acquires _current_packed_file_mutex and delegates to
// mark_current_packed_file_for_upload_locked().
Status PackedFileManager::mark_current_packed_file_for_upload(const std::string& resource_id) {
    std::lock_guard<std::timed_mutex> guard(_current_packed_file_mutex);
    Status result = mark_current_packed_file_for_upload_locked(resource_id);
    return result;
}
584
585
14.1k
void PackedFileManager::record_packed_file_metrics(const PackedFileContext& packed_file) {
586
14.1k
    g_packed_file_total_count << 1;
587
14.1k
    g_packed_file_total_small_file_count
588
14.1k
            << static_cast<int64_t>(packed_file.slice_locations.size());
589
14.1k
    g_packed_file_total_size_bytes << packed_file.total_size;
590
14.1k
    g_packed_file_small_file_num_recorder
591
14.1k
            << static_cast<int64_t>(packed_file.slice_locations.size());
592
14.1k
    g_packed_file_file_size_recorder << packed_file.total_size;
593
    // Flush samplers immediately so the window bvar reflects the latest packed file.
594
14.1k
    if (auto* sampler = g_packed_file_small_file_num_recorder.get_sampler()) {
595
14.1k
        sampler->take_sample();
596
14.1k
    }
597
14.1k
    if (auto* sampler = g_packed_file_file_size_recorder.get_sampler()) {
598
14.1k
        sampler->take_sample();
599
14.1k
    }
600
14.1k
}
601
602
3
void PackedFileManager::background_manager() {
603
3
    auto last_cleanup_time = std::chrono::steady_clock::now();
604
605
378k
    while (!_stop_background_thread.load()) {
606
378k
        int64_t check_interval_ms =
607
378k
                std::max<int64_t>(1, config::packed_file_time_threshold_ms / 10);
608
378k
        std::this_thread::sleep_for(std::chrono::milliseconds(check_interval_ms));
609
610
        // Check if current packed file should be closed due to time threshold
611
378k
        std::vector<std::string> resources_to_mark;
612
378k
        {
613
378k
            std::unique_lock<std::timed_mutex> current_lock(_current_packed_file_mutex,
614
378k
                                                            std::defer_lock);
615
378k
            int64_t lock_wait_ms = std::max<int64_t>(0, config::packed_file_try_lock_timeout_ms);
616
378k
            if (current_lock.try_lock_for(std::chrono::milliseconds(lock_wait_ms))) {
617
384k
                for (auto& [resource_id, state] : _current_packed_files) {
618
384k
                    if (!state || state->state != PackedFileState::ACTIVE) {
619
241k
                        continue;
620
241k
                    }
621
142k
                    if (!state->first_append_timestamp.has_value()) {
622
0
                        continue;
623
0
                    }
624
142k
                    auto current_time = std::chrono::steady_clock::now();
625
142k
                    auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
626
142k
                                              current_time - *(state->first_append_timestamp))
627
142k
                                              .count();
628
142k
                    if (elapsed_ms >= config::packed_file_time_threshold_ms) {
629
14.1k
                        resources_to_mark.push_back(resource_id);
630
14.1k
                    }
631
142k
                }
632
376k
            }
633
378k
        }
634
378k
        for (const auto& resource_id : resources_to_mark) {
635
14.1k
            Status st = mark_current_packed_file_for_upload(resource_id);
636
14.1k
            if (!st.ok()) {
637
0
                LOG(WARNING) << "Failed to close current packed file for resource " << resource_id
638
0
                             << ": " << st.to_string();
639
0
            }
640
14.1k
        }
641
642
        // Process uploading files
643
378k
        process_uploading_packed_files();
644
645
378k
        auto now = std::chrono::steady_clock::now();
646
378k
        int64_t cleanup_interval_sec =
647
378k
                std::max<int64_t>(1, config::packed_file_cleanup_interval_seconds);
648
378k
        auto cleanup_interval = std::chrono::seconds(cleanup_interval_sec);
649
378k
        if (now - last_cleanup_time >= cleanup_interval) {
650
65
            cleanup_expired_data();
651
65
            last_cleanup_time = now;
652
65
        }
653
378k
    }
654
3
}
655
656
378k
void PackedFileManager::process_uploading_packed_files() {
657
378k
    std::vector<std::shared_ptr<PackedFileContext>> files_ready;
658
378k
    std::vector<std::shared_ptr<PackedFileContext>> files_uploading;
659
378k
    auto record_ready_to_upload = [&](const std::shared_ptr<PackedFileContext>& packed_file) {
660
13.1k
        if (!packed_file->uploading_timestamp.has_value()) {
661
13.1k
            packed_file->uploading_timestamp = std::chrono::steady_clock::now();
662
13.1k
            int64_t duration_ms = -1;
663
13.1k
            if (packed_file->ready_to_upload_timestamp.has_value()) {
664
13.1k
                duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
665
13.1k
                                      *packed_file->uploading_timestamp -
666
13.1k
                                      *packed_file->ready_to_upload_timestamp)
667
13.1k
                                      .count();
668
13.1k
                g_packed_file_ready_to_upload_ms_recorder << duration_ms;
669
13.1k
                if (auto* sampler = g_packed_file_ready_to_upload_ms_recorder.get_sampler()) {
670
13.1k
                    sampler->take_sample();
671
13.1k
                }
672
13.1k
            }
673
13.1k
            VLOG_DEBUG << "Packed file " << packed_file->packed_file_path
674
0
                       << " transition READY_TO_UPLOAD->UPLOADING; "
675
0
                          "ready_to_upload_ms="
676
0
                       << duration_ms;
677
13.1k
        }
678
13.1k
    };
679
680
378k
    {
681
378k
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
682
378k
        for (auto& [packed_file_path, packed_file] : _uploading_packed_files) {
683
66.0k
            auto state = packed_file->state.load(std::memory_order_acquire);
684
66.0k
            if (state != PackedFileState::READY_TO_UPLOAD && state != PackedFileState::UPLOADING) {
685
0
                continue;
686
0
            }
687
66.0k
            if (state == PackedFileState::READY_TO_UPLOAD) {
688
14.1k
                files_ready.emplace_back(packed_file);
689
51.9k
            } else {
690
51.9k
                files_uploading.emplace_back(packed_file);
691
51.9k
            }
692
66.0k
        }
693
378k
    }
694
695
378k
    auto handle_success = [&](const std::shared_ptr<PackedFileContext>& packed_file) {
696
13.1k
        auto now = std::chrono::steady_clock::now();
697
13.1k
        int64_t active_to_ready_ms = -1;
698
13.1k
        int64_t ready_to_upload_ms = -1;
699
13.1k
        int64_t uploading_to_uploaded_ms = -1;
700
13.1k
        if (packed_file->first_append_timestamp.has_value() &&
701
13.1k
            packed_file->ready_to_upload_timestamp.has_value()) {
702
13.1k
            active_to_ready_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
703
13.1k
                                         *packed_file->ready_to_upload_timestamp -
704
13.1k
                                         *packed_file->first_append_timestamp)
705
13.1k
                                         .count();
706
13.1k
        }
707
13.1k
        if (packed_file->ready_to_upload_timestamp.has_value() &&
708
13.1k
            packed_file->uploading_timestamp.has_value()) {
709
13.1k
            ready_to_upload_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
710
13.1k
                                         *packed_file->uploading_timestamp -
711
13.1k
                                         *packed_file->ready_to_upload_timestamp)
712
13.1k
                                         .count();
713
13.1k
        }
714
13.1k
        if (packed_file->uploading_timestamp.has_value()) {
715
13.1k
            uploading_to_uploaded_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
716
13.1k
                                               now - *packed_file->uploading_timestamp)
717
13.1k
                                               .count();
718
13.1k
            g_packed_file_uploading_to_uploaded_ms_recorder << uploading_to_uploaded_ms;
719
13.1k
            if (auto* sampler = g_packed_file_uploading_to_uploaded_ms_recorder.get_sampler()) {
720
13.1k
                sampler->take_sample();
721
13.1k
            }
722
13.1k
        }
723
13.1k
        int64_t total_ms = -1;
724
13.1k
        if (packed_file->first_append_timestamp.has_value()) {
725
13.1k
            total_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
726
13.1k
                               now - *packed_file->first_append_timestamp)
727
13.1k
                               .count();
728
13.1k
        }
729
13.1k
        std::ostringstream slices_stream;
730
13.1k
        bool first_slice = true;
731
61.4k
        for (const auto& [small_file_path, index] : packed_file->slice_locations) {
732
61.4k
            if (!first_slice) {
733
48.2k
                slices_stream << "; ";
734
48.2k
            }
735
61.4k
            first_slice = false;
736
61.4k
            slices_stream << small_file_path << "(txn=" << index.txn_id
737
61.4k
                          << ", offset=" << index.offset << ", size=" << index.size << ")";
738
739
            // Update packed_file_size in global index
740
61.4k
            {
741
61.4k
                std::lock_guard<std::mutex> global_lock(_global_index_mutex);
742
61.4k
                auto it = _global_slice_locations.find(small_file_path);
743
61.4k
                if (it != _global_slice_locations.end()) {
744
61.4k
                    it->second.packed_file_size = packed_file->total_size;
745
61.4k
                }
746
61.4k
            }
747
61.4k
        }
748
13.1k
        LOG(INFO) << "Packed file " << packed_file->packed_file_path
749
13.1k
                  << " uploaded; slices=" << packed_file->slice_locations.size()
750
13.1k
                  << ", total_bytes=" << packed_file->total_size << ", slice_detail=["
751
13.1k
                  << slices_stream.str() << "]"
752
13.1k
                  << ", active_to_ready_ms=" << active_to_ready_ms
753
13.1k
                  << ", ready_to_upload_ms=" << ready_to_upload_ms
754
13.1k
                  << ", uploading_to_uploaded_ms=" << uploading_to_uploaded_ms
755
13.1k
                  << ", total_ms=" << total_ms;
756
13.1k
        {
757
13.1k
            std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex);
758
13.1k
            packed_file->state = PackedFileState::UPLOADED;
759
13.1k
            packed_file->upload_time = std::time(nullptr);
760
13.1k
        }
761
13.1k
        packed_file->upload_cv.notify_all();
762
13.1k
        {
763
13.1k
            std::lock_guard<std::mutex> lock(_packed_files_mutex);
764
13.1k
            _uploading_packed_files.erase(packed_file->packed_file_path);
765
13.1k
            _uploaded_packed_files[packed_file->packed_file_path] = packed_file;
766
13.1k
        }
767
13.1k
    };
768
769
378k
    auto handle_failure = [&](const std::shared_ptr<PackedFileContext>& packed_file,
770
378k
                              const Status& status) {
771
0
        LOG(WARNING) << "Failed to upload packed file: " << packed_file->packed_file_path
772
0
                     << ", error: " << status.to_string();
773
0
        {
774
0
            std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex);
775
0
            packed_file->state = PackedFileState::FAILED;
776
0
            packed_file->last_error = status.to_string();
777
0
            packed_file->upload_time = std::time(nullptr);
778
0
        }
779
0
        packed_file->upload_cv.notify_all();
780
0
        {
781
0
            std::lock_guard<std::mutex> lock(_packed_files_mutex);
782
0
            _uploading_packed_files.erase(packed_file->packed_file_path);
783
0
            _uploaded_packed_files[packed_file->packed_file_path] = packed_file;
784
0
        }
785
0
    };
786
787
378k
    for (auto& packed_file : files_ready) {
788
14.1k
        const std::string& packed_file_path = packed_file->packed_file_path;
789
14.1k
        cloud::PackedFileInfoPB packed_file_info;
790
14.1k
        packed_file_info.set_ref_cnt(packed_file->slice_locations.size());
791
14.1k
        packed_file_info.set_total_slice_num(packed_file->slice_locations.size());
792
14.1k
        packed_file_info.set_total_slice_bytes(packed_file->total_size);
793
14.1k
        packed_file_info.set_remaining_slice_bytes(packed_file->total_size);
794
14.1k
        packed_file_info.set_created_at_sec(packed_file->create_time);
795
14.1k
        packed_file_info.set_corrected(false);
796
14.1k
        packed_file_info.set_state(doris::cloud::PackedFileInfoPB::NORMAL);
797
14.1k
        packed_file_info.set_resource_id(packed_file->resource_id);
798
799
62.4k
        for (const auto& [small_file_path, index] : packed_file->slice_locations) {
800
62.4k
            auto* small_file = packed_file_info.add_slices();
801
62.4k
            small_file->set_path(small_file_path);
802
62.4k
            small_file->set_offset(index.offset);
803
62.4k
            small_file->set_size(index.size);
804
62.4k
            small_file->set_deleted(false);
805
62.4k
            if (index.tablet_id != 0) {
806
62.4k
                small_file->set_tablet_id(index.tablet_id);
807
62.4k
            }
808
62.4k
            if (!index.rowset_id.empty()) {
809
62.4k
                small_file->set_rowset_id(index.rowset_id);
810
62.4k
            }
811
62.4k
            if (index.txn_id != 0) {
812
62.4k
                small_file->set_txn_id(index.txn_id);
813
62.4k
            }
814
62.4k
        }
815
816
14.1k
        Status meta_status = update_meta_service(packed_file->packed_file_path, packed_file_info);
817
14.1k
        if (!meta_status.ok()) {
818
2
            LOG(WARNING) << "Failed to update meta service for packed file: "
819
2
                         << packed_file->packed_file_path << ", error: " << meta_status.to_string();
820
2
            handle_failure(packed_file, meta_status);
821
2
            continue;
822
2
        }
823
824
        // Record stats once the packed file metadata is persisted.
825
14.1k
        record_packed_file_metrics(*packed_file);
826
827
14.1k
        Status trailer_status = append_packed_info_trailer(
828
14.1k
                packed_file->writer.get(), packed_file->packed_file_path, packed_file_info);
829
14.1k
        if (!trailer_status.ok()) {
830
0
            handle_failure(packed_file, trailer_status);
831
0
            continue;
832
0
        }
833
834
        // Now upload the file
835
14.1k
        if (!packed_file->slice_locations.empty()) {
836
14.1k
            std::ostringstream oss;
837
14.1k
            oss << "Uploading packed file " << packed_file_path << " with "
838
14.1k
                << packed_file->slice_locations.size() << " small files: ";
839
14.1k
            bool first = true;
840
62.4k
            for (const auto& [small_file_path, index] : packed_file->slice_locations) {
841
62.4k
                if (!first) {
842
48.2k
                    oss << ", ";
843
48.2k
                }
844
62.4k
                first = false;
845
62.4k
                oss << "[" << small_file_path << ", offset=" << index.offset
846
62.4k
                    << ", size=" << index.size << "]";
847
62.4k
            }
848
14.1k
            VLOG_DEBUG << oss.str();
849
14.1k
        } else {
850
0
            VLOG_DEBUG << "Uploading packed file " << packed_file_path << " with no small files";
851
0
        }
852
853
14.1k
        Status upload_status = finalize_packed_file_upload(packed_file->packed_file_path,
854
14.1k
                                                           packed_file->writer.get());
855
856
14.1k
        if (!upload_status.ok()) {
857
0
            handle_failure(packed_file, upload_status);
858
0
            continue;
859
0
        }
860
861
14.1k
        record_ready_to_upload(packed_file);
862
14.1k
        packed_file->state = PackedFileState::UPLOADING;
863
14.1k
    }
864
865
378k
    for (auto& packed_file : files_uploading) {
866
51.9k
        if (!packed_file->writer) {
867
0
            handle_failure(packed_file,
868
0
                           Status::InternalError("File writer is null for packed file: {}",
869
0
                                                 packed_file->packed_file_path));
870
0
            continue;
871
0
        }
872
873
51.9k
        Status status = packed_file->writer->try_finish_close();
874
51.9k
        if (status.is<ErrorCode::NEED_SEND_AGAIN>()) {
875
37.7k
            continue;
876
37.7k
        }
877
878
14.1k
        if (status.ok()) {
879
14.1k
            handle_success(packed_file);
880
14.1k
            continue;
881
14.1k
        }
882
883
2
        handle_failure(packed_file, status);
884
2
    }
885
378k
}
886
887
// Starts closing the packed file's writer so its data is uploaded.
// A null writer is reported as an InternalError naming the packed file.
// NOTE(review): the `true` argument to close() presumably requests a non-blocking close.
// process_uploading_packed_files later polls try_finish_close() for completion; confirm
// against the FileWriter::close contract.
Status PackedFileManager::finalize_packed_file_upload(const std::string& packed_file_path,
                                                      FileWriter* writer) {
    if (writer != nullptr) {
        return writer->close(true);
    }
    return Status::InternalError("File writer is null for packed file: {}", packed_file_path);
}
895
896
// Sends the packed file's PackedFileInfoPB to the cloud meta service through the storage
// engine's CloudMetaMgr. Outside cloud mode it fails with an InternalError.
// In BE_TEST builds the sync point may short-circuit the call and return a test-provided
// status.
Status PackedFileManager::update_meta_service(const std::string& packed_file_path,
                                              const cloud::PackedFileInfoPB& packed_file_info,
                                              int64_t table_id) {
#ifdef BE_TEST
    TEST_SYNC_POINT_RETURN_WITH_VALUE("PackedFileManager::update_meta_service", Status::OK(),
                                      packed_file_path, &packed_file_info);
#endif
    VLOG_DEBUG << "Updating meta service for packed file: " << packed_file_path << " with "
               << packed_file_info.total_slice_num() << " small files"
               << ", total bytes: " << packed_file_info.total_slice_bytes();

    if (config::is_cloud_mode()) {
        // The CloudMetaMgr is reached through the process-wide (cloud) storage engine.
        auto& meta_mgr = ExecEnv::GetInstance()->storage_engine().to_cloud().meta_mgr();
        return meta_mgr.update_packed_file_info(packed_file_path, packed_file_info, table_id);
    }
    return Status::InternalError("Storage engine is not cloud mode");
}
916
917
66
void PackedFileManager::cleanup_expired_data() {
918
66
    auto current_time = std::time(nullptr);
919
920
    // Clean up expired uploaded files
921
66
    {
922
66
        std::lock_guard<std::mutex> uploaded_lock(_packed_files_mutex);
923
66
        auto it = _uploaded_packed_files.begin();
924
328k
        while (it != _uploaded_packed_files.end()) {
925
328k
            if (it->second->state == PackedFileState::UPLOADED &&
926
328k
                current_time - it->second->upload_time > config::uploaded_file_retention_seconds) {
927
7.10k
                it = _uploaded_packed_files.erase(it);
928
321k
            } else if (it->second->state == PackedFileState::FAILED &&
929
321k
                       current_time - it->second->upload_time >
930
0
                               config::uploaded_file_retention_seconds) {
931
0
                it = _uploaded_packed_files.erase(it);
932
321k
            } else {
933
321k
                ++it;
934
321k
            }
935
328k
        }
936
66
    }
937
938
    // Clean up expired global index entries
939
66
    {
940
66
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
941
66
        auto it = _global_slice_locations.begin();
942
1.49M
        while (it != _global_slice_locations.end()) {
943
1.49M
            const auto& index = it->second;
944
1.49M
            if (index.create_time > 0 &&
945
1.49M
                current_time - index.create_time > config::uploaded_file_retention_seconds) {
946
32.2k
                it = _global_slice_locations.erase(it);
947
1.46M
            } else {
948
1.46M
                ++it;
949
1.46M
            }
950
1.49M
        }
951
66
    }
952
66
}
953
954
#ifdef BE_TEST
955
namespace {
956
void reset_adder(bvar::Adder<int64_t>& adder) {
957
    auto current = adder.get_value();
958
    if (current != 0) {
959
        adder << (-current);
960
    }
961
}
962
} // namespace
963
964
void PackedFileManager::reset_packed_file_bvars_for_test() const {
965
    reset_adder(g_packed_file_total_count);
966
    reset_adder(g_packed_file_total_small_file_count);
967
    reset_adder(g_packed_file_total_size_bytes);
968
    g_packed_file_small_file_num_recorder.reset();
969
    g_packed_file_file_size_recorder.reset();
970
    g_packed_file_active_to_ready_ms_recorder.reset();
971
    g_packed_file_ready_to_upload_ms_recorder.reset();
972
    g_packed_file_uploading_to_uploaded_ms_recorder.reset();
973
    if (auto* sampler = g_packed_file_small_file_num_recorder.get_sampler()) {
974
        sampler->take_sample();
975
    }
976
    if (auto* sampler = g_packed_file_file_size_recorder.get_sampler()) {
977
        sampler->take_sample();
978
    }
979
    if (auto* sampler = g_packed_file_active_to_ready_ms_recorder.get_sampler()) {
980
        sampler->take_sample();
981
    }
982
    if (auto* sampler = g_packed_file_ready_to_upload_ms_recorder.get_sampler()) {
983
        sampler->take_sample();
984
    }
985
    if (auto* sampler = g_packed_file_uploading_to_uploaded_ms_recorder.get_sampler()) {
986
        sampler->take_sample();
987
    }
988
}
989
990
int64_t PackedFileManager::packed_file_total_count_for_test() const {
991
    return g_packed_file_total_count.get_value();
992
}
993
994
int64_t PackedFileManager::packed_file_total_small_file_num_for_test() const {
995
    return g_packed_file_total_small_file_count.get_value();
996
}
997
998
int64_t PackedFileManager::packed_file_total_size_bytes_for_test() const {
999
    return g_packed_file_total_size_bytes.get_value();
1000
}
1001
1002
double PackedFileManager::packed_file_avg_small_file_num_for_test() const {
1003
    return g_packed_file_avg_small_file_num.get_value().get_average_double();
1004
}
1005
1006
double PackedFileManager::packed_file_avg_file_size_for_test() const {
1007
    return g_packed_file_avg_file_size_bytes.get_value().get_average_double();
1008
}
1009
1010
void PackedFileManager::record_packed_file_metrics_for_test(
1011
        const PackedFileManager::PackedFileContext* packed_file) {
1012
    DCHECK(packed_file != nullptr);
1013
    record_packed_file_metrics(*packed_file);
1014
}
1015
1016
void PackedFileManager::clear_state_for_test() {
1017
    std::lock_guard<std::timed_mutex> cur_lock(_current_packed_file_mutex);
1018
    _current_packed_files.clear();
1019
    {
1020
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
1021
        _uploading_packed_files.clear();
1022
        _uploaded_packed_files.clear();
1023
    }
1024
    {
1025
        std::lock_guard<std::mutex> lock(_global_index_mutex);
1026
        _global_slice_locations.clear();
1027
    }
1028
}
1029
#endif
1030
1031
} // namespace doris::io