Coverage Report

Created: 2026-03-12 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/packed_file_manager.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/packed_file_manager.h"
19
20
#include <bvar/bvar.h>
21
#include <bvar/recorder.h>
22
#include <bvar/window.h>
23
24
#include <algorithm>
25
#include <chrono>
26
#include <cstdint>
27
#include <ctime>
28
#include <functional>
29
#include <limits>
30
#include <optional>
31
#include <random>
32
#include <sstream>
33
#include <unordered_set>
34
35
#ifdef BE_TEST
36
#include "cpp/sync_point.h"
37
#endif
38
39
#include <gen_cpp/cloud.pb.h>
40
41
#include "cloud/cloud_meta_mgr.h"
42
#include "cloud/cloud_storage_engine.h"
43
#include "cloud/config.h"
44
#include "common/config.h"
45
#include "io/cache/block_file_cache.h"
46
#include "io/cache/block_file_cache_factory.h"
47
#include "io/cache/file_block.h"
48
#include "io/cache/file_cache_common.h"
49
#include "io/fs/packed_file_trailer.h"
50
#include "runtime/exec_env.h"
51
#include "storage/storage_engine.h"
52
#include "util/coding.h"
53
#include "util/slice.h"
54
#include "util/uid_util.h"
55
56
namespace doris::io {
57
58
namespace {
59
60
// ---- Cumulative counters for packed files ----
bvar::Adder<int64_t> g_packed_file_total_count("packed_file", "total_count");
bvar::Adder<int64_t> g_packed_file_total_small_file_count("packed_file", "total_small_file_num");
bvar::Adder<int64_t> g_packed_file_total_size_bytes("packed_file", "total_size_bytes");

// ---- Per-packed-file samples (small file count / byte size) and their windows ----
// Each recorder is declared before the window that points at it.
bvar::IntRecorder g_packed_file_small_file_num_recorder;
bvar::IntRecorder g_packed_file_file_size_recorder;
bvar::Window<bvar::IntRecorder> g_packed_file_avg_small_file_num(
        "packed_file_avg_small_file_num", &g_packed_file_small_file_num_recorder,
        /*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_avg_file_size_bytes(
        "packed_file_avg_file_size_bytes", &g_packed_file_file_size_recorder,
        /*window_size=*/10);

// ---- Duration samples (milliseconds) for state transitions and their windows ----
bvar::IntRecorder g_packed_file_active_to_ready_ms_recorder;
bvar::IntRecorder g_packed_file_ready_to_upload_ms_recorder;
bvar::IntRecorder g_packed_file_uploading_to_uploaded_ms_recorder;
bvar::Window<bvar::IntRecorder> g_packed_file_active_to_ready_ms_window(
        "packed_file_active_to_ready_ms", &g_packed_file_active_to_ready_ms_recorder,
        /*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_ready_to_upload_ms_window(
        "packed_file_ready_to_upload_ms", &g_packed_file_ready_to_upload_ms_recorder,
        /*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_uploading_to_uploaded_ms_window(
        "packed_file_uploading_to_uploaded_ms", &g_packed_file_uploading_to_uploaded_ms_recorder,
        /*window_size=*/10);

// ---- Async small-file cache writes: incremented on submit, decremented when done ----
bvar::Adder<int64_t> g_packed_file_cache_async_write_count("packed_file_cache",
                                                           "async_write_count");
bvar::Adder<int64_t> g_packed_file_cache_async_write_bytes("packed_file_cache",
                                                           "async_write_bytes");
89
90
// Serializes `packed_file_info` into a PackedFileFooterPB and appends it to `writer`,
// followed by two little-endian fixed32 fields: the serialized footer length and
// kPackedFileTrailerVersion.
//
// Returns InternalError when the writer is null or already closed, when protobuf
// serialization fails, or when the footer is too large for its length to fit the
// 32-bit length field; otherwise returns the status of the final append.
Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_file_path,
                                  const cloud::PackedFileInfoPB& packed_file_info) {
    if (writer == nullptr) {
        return Status::InternalError("File writer is null for packed file: {}", packed_file_path);
    }
    if (writer->state() == FileWriter::State::CLOSED) {
        return Status::InternalError("File writer already closed for packed file: {}",
                                     packed_file_path);
    }

    cloud::PackedFileFooterPB footer_pb;
    footer_pb.mutable_packed_file_info()->CopyFrom(packed_file_info);

    std::string serialized_footer;
    const bool serialized = footer_pb.SerializeToString(&serialized_footer);
    if (!serialized) {
        return Status::InternalError("Failed to serialize packed file footer info for {}",
                                     packed_file_path);
    }

    // Largest footer size that still leaves room for the fixed-size suffix.
    const auto max_footer_size =
            std::numeric_limits<uint32_t>::max() - kPackedFileTrailerSuffixSize;
    const auto footer_size = serialized_footer.size();
    if (footer_size > max_footer_size) {
        return Status::InternalError("PackedFileFooterPB too large for {}", packed_file_path);
    }

    // Layout: [footer bytes][fixed32 footer size][fixed32 trailer version].
    std::string trailer;
    trailer.reserve(footer_size + kPackedFileTrailerSuffixSize);
    trailer += serialized_footer;
    put_fixed32_le(&trailer, static_cast<uint32_t>(footer_size));
    put_fixed32_le(&trailer, kPackedFileTrailerVersion);

    return writer->append(Slice(trailer));
}
122
123
// write small file data to file cache
124
void do_write_to_file_cache(const std::string& small_file_path, const std::string& data,
125
63.2k
                            int64_t tablet_id, uint64_t expiration_time) {
126
63.2k
    if (data.empty()) {
127
0
        return;
128
0
    }
129
130
    // Generate cache key from small file path (e.g., "rowset_id_seg_id.dat")
131
63.2k
    Path path(small_file_path);
132
63.2k
    UInt128Wrapper cache_hash = BlockFileCache::hash(path.filename().native());
133
134
63.2k
    VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path
135
2
               << " filename=" << path.filename().native() << " hash=" << cache_hash.to_string()
136
2
               << " size=" << data.size() << " tablet_id=" << tablet_id
137
2
               << " expiration_time=" << expiration_time;
138
139
63.2k
    BlockFileCache* file_cache = FileCacheFactory::instance()->get_by_path(cache_hash);
140
63.2k
    if (file_cache == nullptr) {
141
0
        return; // Cache not available, skip
142
0
    }
143
144
    // Allocate cache blocks
145
63.2k
    CacheContext ctx;
146
63.2k
    ctx.cache_type = expiration_time > 0 ? FileCacheType::TTL : FileCacheType::NORMAL;
147
63.2k
    ctx.expiration_time = expiration_time;
148
63.2k
    ctx.tablet_id = tablet_id;
149
63.2k
    ReadStatistics stats;
150
63.2k
    ctx.stats = &stats;
151
152
63.2k
    FileBlocksHolder holder = file_cache->get_or_set(cache_hash, 0, data.size(), ctx);
153
154
    // Write data to cache blocks
155
63.2k
    size_t data_offset = 0;
156
63.2k
    for (auto& block : holder.file_blocks) {
157
63.2k
        if (data_offset >= data.size()) {
158
0
            break;
159
0
        }
160
63.2k
        size_t block_size = block->range().size();
161
63.2k
        size_t write_size = std::min(block_size, data.size() - data_offset);
162
163
63.2k
        if (block->state() == FileBlock::State::EMPTY) {
164
63.2k
            block->get_or_set_downloader();
165
63.2k
            if (block->is_downloader()) {
166
63.2k
                Slice s(data.data() + data_offset, write_size);
167
63.2k
                Status st = block->append(s);
168
63.2k
                if (st.ok()) {
169
63.2k
                    st = block->finalize();
170
63.2k
                }
171
63.2k
                if (!st.ok()) {
172
0
                    LOG(WARNING) << "Write small file to cache failed: " << st.msg();
173
0
                }
174
63.2k
            }
175
63.2k
        }
176
63.2k
        data_offset += write_size;
177
63.2k
    }
178
63.2k
}
179
180
// Async wrapper: submit cache write task to thread pool
181
// The data is copied into the lambda capture to ensure its lifetime extends beyond
182
// the async task execution. The original Slice may reference a buffer that gets
183
// reused or freed before the async task runs.
184
void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data,
185
63.3k
                                     int64_t tablet_id, uint64_t expiration_time) {
186
63.3k
    if (!config::enable_file_cache || data.size == 0) {
187
80
        return;
188
80
    }
189
190
    // Copy data since original buffer may be reused before async task executes
191
    // For small files (< 1MB), copy overhead is acceptable
192
63.2k
    std::string data_copy(data.data, data.size);
193
63.2k
    size_t data_size = data.size;
194
195
63.2k
    auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool();
196
63.2k
    if (thread_pool == nullptr) {
197
        // Fallback to sync write if thread pool not available
198
0
        do_write_to_file_cache(small_file_path, data_copy, tablet_id, expiration_time);
199
0
        return;
200
0
    }
201
202
    // Track async write count and bytes
203
63.2k
    g_packed_file_cache_async_write_count << 1;
204
63.2k
    g_packed_file_cache_async_write_bytes << static_cast<int64_t>(data_size);
205
206
63.2k
    Status st = thread_pool->submit_func([path = small_file_path, data = std::move(data_copy),
207
63.2k
                                          tablet_id, data_size, expiration_time]() {
208
63.2k
        do_write_to_file_cache(path, data, tablet_id, expiration_time);
209
        // Decrement async write count after completion
210
63.2k
        g_packed_file_cache_async_write_count << -1;
211
63.2k
        g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
212
63.2k
    });
213
214
63.2k
    if (!st.ok()) {
215
        // Revert metrics since task was not submitted
216
0
        g_packed_file_cache_async_write_count << -1;
217
0
        g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
218
0
        LOG(WARNING) << "Failed to submit cache write task for " << small_file_path << ": "
219
0
                     << st.msg();
220
        // Don't block on failure, cache write is best-effort
221
0
    }
222
63.2k
}
223
224
} // namespace
225
226
207k
// Returns the process-wide manager, constructed on first call as a function-local static.
PackedFileManager* PackedFileManager::instance() {
    static PackedFileManager singleton;
    return &singleton;
}
230
231
34
// Stops and joins the background thread (if one is running) before members are destroyed.
PackedFileManager::~PackedFileManager() {
    stop_background_manager();
}
234
235
38
// Currently performs no work and always succeeds.
Status PackedFileManager::init() {
    return Status::OK();
}
238
239
// Replaces `packed_file_ctx` with a fresh context in state INIT and opens a writer
// for a new packed file at "data/packed_file/<bucket>/<uuid>.bin", where <bucket>
// is in [1, 4096] and derived from the hash of the uuid.
//
// If the file system cannot be resolved, `packed_file_ctx` is left untouched. If
// creating the file fails, `packed_file_ctx` already holds the new context but its
// writer is not set.
Status PackedFileManager::create_new_packed_file_context(
        const std::string& resource_id, std::unique_ptr<PackedFileContext>& packed_file_ctx) {
    FileSystemSPtr file_system;
    RETURN_IF_ERROR(ensure_file_system(resource_id, &file_system));
    if (file_system == nullptr) {
        return Status::InternalError("File system is not available for packed file creation: " +
                                     resource_id);
    }

    // Spread packed files over 4096 numbered directories.
    const auto uuid = generate_uuid_string();
    const auto uuid_hash = std::hash<std::string> {}(uuid);
    const auto path_bucket = static_cast<uint16_t>(uuid_hash % 4096 + 1);
    const std::string relative_path =
            "data/packed_file/" + std::to_string(path_bucket) + "/" + uuid + ".bin";

    packed_file_ctx = std::make_unique<PackedFileContext>();
    PackedFileContext& ctx = *packed_file_ctx;
    ctx.packed_file_path = relative_path;
    ctx.create_time = std::time(nullptr);
    ctx.create_timestamp = std::chrono::steady_clock::now();
    ctx.state = PackedFileState::INIT;
    ctx.resource_id = resource_id;
    ctx.file_system = std::move(file_system);

    // The packed file itself is not written to the file cache. Each small file is
    // cached separately in append_small_file() under its own path as cache key, so
    // entries can be cleaned up when stale rowsets are removed.
    FileWriterOptions opts;
    opts.write_file_cache = false;

    FileWriterPtr new_writer;
    RETURN_IF_ERROR(ctx.file_system->create_file(Path(relative_path), &new_writer, &opts));
    ctx.writer = std::move(new_writer);

    return Status::OK();
}
277
278
// Resolves the file system for `resource_id` into `*file_system`.
//
// An empty `resource_id` selects the default file system. Previously resolved file
// systems are served from the manager's cache; otherwise the file system is looked
// up through the cloud storage engine and cached.
//
// Returns InvalidArgument when `file_system` is null, and InternalError when not
// running in cloud mode, when ExecEnv is missing, or when the cloud engine has no
// file system for the request.
Status PackedFileManager::ensure_file_system(const std::string& resource_id,
                                             FileSystemSPtr* file_system) {
    if (file_system == nullptr) {
        return Status::InvalidArgument("file_system output parameter is null");
    }

    const bool wants_default = resource_id.empty();

    // Cache lookup.
    {
        std::lock_guard<std::mutex> guard(_file_system_mutex);
        if (wants_default) {
            if (_default_file_system != nullptr) {
                *file_system = _default_file_system;
                return Status::OK();
            }
        } else if (auto cached = _file_systems.find(resource_id);
                   cached != _file_systems.end()) {
            *file_system = cached->second;
            return Status::OK();
        }
    }

    // Cache miss: resolve through the cloud storage engine, outside the lock.
    if (!config::is_cloud_mode()) {
        return Status::InternalError("Cloud file system is not available in local mode");
    }

    auto* exec_env = ExecEnv::GetInstance();
    if (exec_env == nullptr) {
        return Status::InternalError("ExecEnv instance is not initialized");
    }

    FileSystemSPtr resolved_fs;
    if (wants_default) {
        resolved_fs = exec_env->storage_engine().to_cloud().latest_fs();
        if (resolved_fs == nullptr) {
            return Status::InternalError("Cloud file system is not ready");
        }
    } else {
        auto storage_resource =
                exec_env->storage_engine().to_cloud().get_storage_resource(resource_id);
        const bool resource_ready =
                storage_resource.has_value() && storage_resource->fs != nullptr;
        if (!resource_ready) {
            return Status::InternalError("Cloud file system is not ready for resource: " +
                                         resource_id);
        }
        resolved_fs = storage_resource->fs;
    }

    // Publish the result to the cache and hand it to the caller.
    {
        std::lock_guard<std::mutex> guard(_file_system_mutex);
        if (wants_default) {
            _default_file_system = resolved_fs;
        } else {
            _file_systems[resource_id] = resolved_fs;
        }
        *file_system = resolved_fs;
    }

    return Status::OK();
}
338
339
// Appends the small file `path` (contents `data`) to the current packed file of
// `info.resource_id` and records where it landed.
//
// - Files larger than config::small_file_threshold_bytes are not packed: the
//   function returns OK without appending or indexing them.
// - Returns InvalidArgument when `info.txn_id` is not positive.
// - The current packed file is rotated (handed to the uploading list and replaced
//   by a fresh one) before the append when adding `data` would reach
//   config::packed_file_size_threshold_bytes, and after the append when the number
//   of small files reaches config::packed_file_small_file_count_threshold.
// - On success the slice location is stored in the packed file's own index and in
//   the global index.
//
// The whole operation runs under _current_packed_file_mutex.
Status PackedFileManager::append_small_file(const std::string& path, const Slice& data,
                                            const PackedAppendContext& info) {
    // Check if file is too large to be merged
    if (data.get_size() > config::small_file_threshold_bytes) {
        return Status::OK(); // Skip merging for large files
    }

    if (info.txn_id <= 0) {
        return Status::InvalidArgument("Missing valid txn id for packed file append: " + path);
    }

    std::lock_guard<std::timed_mutex> lock(_current_packed_file_mutex);

    auto& current_state = _current_packed_files[info.resource_id];
    if (!current_state || !current_state->writer) {
        RETURN_IF_ERROR(create_new_packed_file_context(info.resource_id, current_state));
    }

    // Rotate first if this append would reach the size threshold.
    if (current_state->total_size + data.get_size() >= config::packed_file_size_threshold_bytes) {
        RETURN_IF_ERROR(mark_current_packed_file_for_upload_locked(info.resource_id));
        // Rotation normally installs a fresh context; re-create defensively if not.
        if (!current_state || !current_state->writer) {
            RETURN_IF_ERROR(create_new_packed_file_context(info.resource_id, current_state));
        }
    }

    PackedFileContext* active_state = current_state.get();

    // Write data to current packed file
    RETURN_IF_ERROR(active_state->writer->append(data));

    // Async write data to file cache using small file path as cache key.
    // This ensures cache key matches the cleanup key in Rowset::clear_cache(),
    // allowing proper cache cleanup when stale rowsets are removed.
    if (info.write_file_cache) {
        write_small_file_to_cache_async(path, data, info.tablet_id, info.expiration_time);
    }

    // Update index
    PackedSliceLocation location;
    location.packed_file_path = active_state->packed_file_path;
    location.offset = active_state->current_offset;
    location.size = data.get_size();
    location.create_time = std::time(nullptr);
    location.tablet_id = info.tablet_id;
    location.rowset_id = info.rowset_id;
    location.resource_id = info.resource_id;
    location.txn_id = info.txn_id;

    active_state->slice_locations[path] = location;
    active_state->current_offset += data.get_size();
    active_state->total_size += data.get_size();

    // Mark as active if this is the first write. This must happen BEFORE any
    // rotation below: rotation hands the context over to the uploading list, which
    // is guarded by a different mutex, so the context must not be modified here
    // afterwards. Doing it first also lets the rotation record the
    // active->ready duration when the very first append triggers it.
    if (!active_state->first_append_timestamp.has_value()) {
        active_state->first_append_timestamp = std::chrono::steady_clock::now();
    }
    if (active_state->state == PackedFileState::INIT) {
        active_state->state = PackedFileState::ACTIVE;
    }

    // Rotate packed file when small file count reaches threshold.
    // `active_state` must not be used after this point.
    if (config::packed_file_small_file_count_threshold > 0 &&
        static_cast<int64_t>(active_state->slice_locations.size()) >=
                config::packed_file_small_file_count_threshold) {
        RETURN_IF_ERROR(mark_current_packed_file_for_upload_locked(info.resource_id));
    }

    // Update global index
    {
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
        _global_slice_locations[path] = location;
    }

    return Status::OK();
}
417
418
44.6k
// Blocks on the packed file's condition variable until its state is UPLOADED or
// FAILED. Returns OK for UPLOADED; for FAILED returns InternalError carrying the
// file's last_error (or a generic message when that is empty).
// `packed_file_ptr` is dereferenced unconditionally and must not be null.
Status PackedFileManager::wait_for_packed_file_upload(PackedFileContext* packed_file_ptr) {
    const auto reached_terminal_state = [packed_file_ptr] {
        const auto observed = packed_file_ptr->state.load(std::memory_order_acquire);
        return observed == PackedFileState::UPLOADED || observed == PackedFileState::FAILED;
    };

    std::unique_lock<std::mutex> upload_lock(packed_file_ptr->upload_mutex);
    packed_file_ptr->upload_cv.wait(upload_lock, reached_terminal_state);

    // Still holding upload_mutex here, which also covers the read of last_error.
    if (packed_file_ptr->state != PackedFileState::FAILED) {
        return Status::OK();
    }

    std::string err = packed_file_ptr->last_error;
    if (err.empty()) {
        err = "Packed file upload failed";
    }
    return Status::InternalError(err);
}
433
434
63.1k
// Waits until the packed file that contains small file `path` has finished
// uploading.
//
// Returns InternalError when `path` is not in the global index, when the packed
// file cannot be located in the current/uploading/uploaded collections, or when
// the upload failed (the message is the packed file's last_error, or a generic
// text when that is empty). Returns OK once the packed file is UPLOADED.
Status PackedFileManager::wait_upload_done(const std::string& path) {
    std::string packed_file_path;
    {
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
        auto it = _global_slice_locations.find(path);
        if (it == _global_slice_locations.end()) {
            return Status::InternalError("File not found in global index: " + path);
        }
        packed_file_path = it->second.packed_file_path;
    }

    // Find the packed file in uploaded files first - if already uploaded, no need to wait
    std::shared_ptr<PackedFileContext> managed_packed_file;
    std::shared_ptr<PackedFileContext> failed_packed_file;
    {
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        auto uploaded_it = _uploaded_packed_files.find(packed_file_path);
        if (uploaded_it != _uploaded_packed_files.end()) {
            auto state = uploaded_it->second->state.load(std::memory_order_acquire);
            if (state == PackedFileState::UPLOADED) {
                return Status::OK(); // Already uploaded, no need to wait
            }
            if (state == PackedFileState::FAILED) {
                failed_packed_file = uploaded_it->second;
            } else {
                managed_packed_file = uploaded_it->second;
            }
        }
    }

    if (failed_packed_file) {
        std::lock_guard<std::mutex> upload_lock(failed_packed_file->upload_mutex);
        std::string err = failed_packed_file->last_error;
        if (err.empty()) {
            // Same fallback text as wait_for_packed_file_upload().
            err = "Packed file upload failed";
        }
        return Status::InternalError(err);
    }

    // A context already resolved above (present in the uploaded map but not yet in a
    // terminal state) is waited on directly; otherwise search current, then uploading,
    // then uploaded, which is the order a packed file moves through them.
    PackedFileContext* packed_file_ptr = managed_packed_file.get();

    if (!packed_file_ptr) {
        std::lock_guard<std::timed_mutex> current_lock(_current_packed_file_mutex);
        for (auto& [resource_id, state] : _current_packed_files) {
            if (state && state->packed_file_path == packed_file_path) {
                // NOTE(review): this is a raw pointer into a unique_ptr-owned context and
                // the lock is released before waiting. It appears to rely on the context
                // staying alive in the uploading/uploaded maps until the waiter has woken
                // up - confirm against the cleanup policy.
                packed_file_ptr = state.get();
                break;
            }
        }
    }

    if (!packed_file_ptr) {
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        auto uploading_it = _uploading_packed_files.find(packed_file_path);
        if (uploading_it != _uploading_packed_files.end()) {
            managed_packed_file = uploading_it->second;
            packed_file_ptr = managed_packed_file.get();
        } else {
            auto uploaded_it = _uploaded_packed_files.find(packed_file_path);
            if (uploaded_it != _uploaded_packed_files.end()) {
                managed_packed_file = uploaded_it->second;
                packed_file_ptr = managed_packed_file.get();
            }
        }
    }

    if (!packed_file_ptr) {
        // Packed file not found in any location, this is unexpected
        return Status::InternalError("Packed file not found for path: " + path);
    }

    // managed_packed_file (when set) keeps shared ownership alive during the wait.
    return wait_for_packed_file_upload(packed_file_ptr);
}
509
510
// Copies the recorded location of small file `path` into `*location`.
//
// Returns InvalidArgument when `location` is null and NotFound when `path` is not
// present in the global index; `*location` is only written on success.
Status PackedFileManager::get_packed_slice_location(const std::string& path,
                                                    PackedSliceLocation* location) {
    // Validate the output parameter, as ensure_file_system() does for its own.
    if (location == nullptr) {
        return Status::InvalidArgument("location output parameter is null");
    }

    std::lock_guard<std::mutex> lock(_global_index_mutex);
    auto it = _global_slice_locations.find(path);
    if (it == _global_slice_locations.end()) {
        return Status::NotFound("File not found in global packed index: {}", path);
    }

    *location = it->second;
    return Status::OK();
}
521
522
3
void PackedFileManager::start_background_manager() {
523
3
    if (_background_thread) {
524
0
        return; // Already started
525
0
    }
526
527
3
    _stop_background_thread = false;
528
3
    _background_thread = std::make_unique<std::thread>([this] { background_manager(); });
529
3
}
530
531
40
void PackedFileManager::stop_background_manager() {
532
40
    _stop_background_thread = true;
533
40
    if (_background_thread && _background_thread->joinable()) {
534
2
        _background_thread->join();
535
2
    }
536
40
    _background_thread.reset();
537
40
}
538
539
// Hands the current packed file of `resource_id` over to the uploading list and
// installs a fresh packed file in its place.
//
// The "_locked" suffix indicates the caller is expected to already hold
// _current_packed_file_mutex; this function does not take it. Returns OK without
// doing anything when the resource has no current packed file with an open
// writer; otherwise returns the status of creating the replacement context.
Status PackedFileManager::mark_current_packed_file_for_upload_locked(
        const std::string& resource_id) {
    auto entry = _current_packed_files.find(resource_id);
    const bool has_open_file =
            entry != _current_packed_files.end() && entry->second && entry->second->writer;
    if (!has_open_file) {
        return Status::OK(); // Nothing to mark for upload
    }

    auto& slot = entry->second;

    slot->state = PackedFileState::READY_TO_UPLOAD;

    // Stamp the transition once and, if the file ever received data, sample how
    // long it stayed active.
    if (!slot->ready_to_upload_timestamp.has_value()) {
        const auto now = std::chrono::steady_clock::now();
        slot->ready_to_upload_timestamp = now;

        int64_t active_to_ready_ms = -1;
        if (slot->first_append_timestamp.has_value()) {
            const auto active_for = now - *slot->first_append_timestamp;
            active_to_ready_ms =
                    std::chrono::duration_cast<std::chrono::milliseconds>(active_for).count();
            g_packed_file_active_to_ready_ms_recorder << active_to_ready_ms;
            if (auto* sampler = g_packed_file_active_to_ready_ms_recorder.get_sampler()) {
                sampler->take_sample();
            }
        }
        VLOG_DEBUG << "Packed file " << slot->packed_file_path
                   << " transition ACTIVE->READY_TO_UPLOAD; active_to_ready_ms="
                   << active_to_ready_ms;
    }

    // Convert ownership to shared and register the file in the uploading list.
    // `slot` is left empty by the move.
    {
        std::shared_ptr<PackedFileContext> handed_off(std::move(slot));
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        _uploading_packed_files[handed_off->packed_file_path] = handed_off;
    }

    // Refill the now-empty slot with a new packed file.
    return create_new_packed_file_context(resource_id, slot);
}
579
580
14.7k
// Locking wrapper: takes _current_packed_file_mutex and delegates to
// mark_current_packed_file_for_upload_locked().
Status PackedFileManager::mark_current_packed_file_for_upload(const std::string& resource_id) {
    std::lock_guard<std::timed_mutex> guard(_current_packed_file_mutex);
    return mark_current_packed_file_for_upload_locked(resource_id);
}
584
585
14.7k
void PackedFileManager::record_packed_file_metrics(const PackedFileContext& packed_file) {
586
14.7k
    g_packed_file_total_count << 1;
587
14.7k
    g_packed_file_total_small_file_count
588
14.7k
            << static_cast<int64_t>(packed_file.slice_locations.size());
589
14.7k
    g_packed_file_total_size_bytes << packed_file.total_size;
590
14.7k
    g_packed_file_small_file_num_recorder
591
14.7k
            << static_cast<int64_t>(packed_file.slice_locations.size());
592
14.7k
    g_packed_file_file_size_recorder << packed_file.total_size;
593
    // Flush samplers immediately so the window bvar reflects the latest packed file.
594
14.7k
    if (auto* sampler = g_packed_file_small_file_num_recorder.get_sampler()) {
595
14.7k
        sampler->take_sample();
596
14.7k
    }
597
14.7k
    if (auto* sampler = g_packed_file_file_size_recorder.get_sampler()) {
598
14.7k
        sampler->take_sample();
599
14.7k
    }
600
14.7k
}
601
602
3
void PackedFileManager::background_manager() {
603
3
    auto last_cleanup_time = std::chrono::steady_clock::now();
604
605
414k
    while (!_stop_background_thread.load()) {
606
414k
        int64_t check_interval_ms =
607
414k
                std::max<int64_t>(1, config::packed_file_time_threshold_ms / 10);
608
414k
        std::this_thread::sleep_for(std::chrono::milliseconds(check_interval_ms));
609
610
        // Check if current packed file should be closed due to time threshold
611
414k
        std::vector<std::string> resources_to_mark;
612
414k
        {
613
414k
            std::unique_lock<std::timed_mutex> current_lock(_current_packed_file_mutex,
614
414k
                                                            std::defer_lock);
615
414k
            int64_t lock_wait_ms = std::max<int64_t>(0, config::packed_file_try_lock_timeout_ms);
616
414k
            if (current_lock.try_lock_for(std::chrono::milliseconds(lock_wait_ms))) {
617
414k
                for (auto& [resource_id, state] : _current_packed_files) {
618
413k
                    if (!state || state->state != PackedFileState::ACTIVE) {
619
264k
                        continue;
620
264k
                    }
621
149k
                    if (!state->first_append_timestamp.has_value()) {
622
0
                        continue;
623
0
                    }
624
149k
                    auto current_time = std::chrono::steady_clock::now();
625
149k
                    auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
626
149k
                                              current_time - *(state->first_append_timestamp))
627
149k
                                              .count();
628
149k
                    if (elapsed_ms >= config::packed_file_time_threshold_ms) {
629
14.7k
                        resources_to_mark.push_back(resource_id);
630
14.7k
                    }
631
149k
                }
632
414k
            }
633
414k
        }
634
414k
        for (const auto& resource_id : resources_to_mark) {
635
14.7k
            Status st = mark_current_packed_file_for_upload(resource_id);
636
14.7k
            if (!st.ok()) {
637
0
                LOG(WARNING) << "Failed to close current packed file for resource " << resource_id
638
0
                             << ": " << st.to_string();
639
0
            }
640
14.7k
        }
641
642
        // Process uploading files
643
414k
        process_uploading_packed_files();
644
645
414k
        auto now = std::chrono::steady_clock::now();
646
414k
        int64_t cleanup_interval_sec =
647
414k
                std::max<int64_t>(1, config::packed_file_cleanup_interval_seconds);
648
414k
        auto cleanup_interval = std::chrono::seconds(cleanup_interval_sec);
649
414k
        if (now - last_cleanup_time >= cleanup_interval) {
650
72
            cleanup_expired_data();
651
72
            last_cleanup_time = now;
652
72
        }
653
414k
    }
654
3
}
655
656
414k
void PackedFileManager::process_uploading_packed_files() {
657
414k
    std::vector<std::shared_ptr<PackedFileContext>> files_ready;
658
414k
    std::vector<std::shared_ptr<PackedFileContext>> files_uploading;
659
414k
    auto record_ready_to_upload = [&](const std::shared_ptr<PackedFileContext>& packed_file) {
660
13.7k
        if (!packed_file->uploading_timestamp.has_value()) {
661
13.7k
            packed_file->uploading_timestamp = std::chrono::steady_clock::now();
662
13.7k
            int64_t duration_ms = -1;
663
13.7k
            if (packed_file->ready_to_upload_timestamp.has_value()) {
664
13.7k
                duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
665
13.7k
                                      *packed_file->uploading_timestamp -
666
13.7k
                                      *packed_file->ready_to_upload_timestamp)
667
13.7k
                                      .count();
668
13.7k
                g_packed_file_ready_to_upload_ms_recorder << duration_ms;
669
13.7k
                if (auto* sampler = g_packed_file_ready_to_upload_ms_recorder.get_sampler()) {
670
13.7k
                    sampler->take_sample();
671
13.7k
                }
672
13.7k
            }
673
13.7k
            VLOG_DEBUG << "Packed file " << packed_file->packed_file_path
674
0
                       << " transition READY_TO_UPLOAD->UPLOADING; "
675
0
                          "ready_to_upload_ms="
676
0
                       << duration_ms;
677
13.7k
        }
678
13.7k
    };
679
680
414k
    {
681
414k
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
682
414k
        for (auto& [packed_file_path, packed_file] : _uploading_packed_files) {
683
78.1k
            auto state = packed_file->state.load(std::memory_order_acquire);
684
78.1k
            if (state != PackedFileState::READY_TO_UPLOAD && state != PackedFileState::UPLOADING) {
685
0
                continue;
686
0
            }
687
78.1k
            if (state == PackedFileState::READY_TO_UPLOAD) {
688
14.7k
                files_ready.emplace_back(packed_file);
689
63.4k
            } else {
690
63.4k
                files_uploading.emplace_back(packed_file);
691
63.4k
            }
692
78.1k
        }
693
414k
    }
694
695
414k
    auto handle_success = [&](const std::shared_ptr<PackedFileContext>& packed_file) {
696
13.7k
        auto now = std::chrono::steady_clock::now();
697
13.7k
        int64_t active_to_ready_ms = -1;
698
13.7k
        int64_t ready_to_upload_ms = -1;
699
13.7k
        int64_t uploading_to_uploaded_ms = -1;
700
13.7k
        if (packed_file->first_append_timestamp.has_value() &&
701
13.7k
            packed_file->ready_to_upload_timestamp.has_value()) {
702
13.7k
            active_to_ready_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
703
13.7k
                                         *packed_file->ready_to_upload_timestamp -
704
13.7k
                                         *packed_file->first_append_timestamp)
705
13.7k
                                         .count();
706
13.7k
        }
707
13.7k
        if (packed_file->ready_to_upload_timestamp.has_value() &&
708
13.7k
            packed_file->uploading_timestamp.has_value()) {
709
13.7k
            ready_to_upload_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
710
13.7k
                                         *packed_file->uploading_timestamp -
711
13.7k
                                         *packed_file->ready_to_upload_timestamp)
712
13.7k
                                         .count();
713
13.7k
        }
714
13.7k
        if (packed_file->uploading_timestamp.has_value()) {
715
13.7k
            uploading_to_uploaded_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
716
13.7k
                                               now - *packed_file->uploading_timestamp)
717
13.7k
                                               .count();
718
13.7k
            g_packed_file_uploading_to_uploaded_ms_recorder << uploading_to_uploaded_ms;
719
13.7k
            if (auto* sampler = g_packed_file_uploading_to_uploaded_ms_recorder.get_sampler()) {
720
13.7k
                sampler->take_sample();
721
13.7k
            }
722
13.7k
        }
723
13.7k
        int64_t total_ms = -1;
724
13.7k
        if (packed_file->first_append_timestamp.has_value()) {
725
13.7k
            total_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
726
13.7k
                               now - *packed_file->first_append_timestamp)
727
13.7k
                               .count();
728
13.7k
        }
729
13.7k
        std::ostringstream slices_stream;
730
13.7k
        bool first_slice = true;
731
62.2k
        for (const auto& [small_file_path, index] : packed_file->slice_locations) {
732
62.2k
            if (!first_slice) {
733
48.5k
                slices_stream << "; ";
734
48.5k
            }
735
62.2k
            first_slice = false;
736
62.2k
            slices_stream << small_file_path << "(txn=" << index.txn_id
737
62.2k
                          << ", offset=" << index.offset << ", size=" << index.size << ")";
738
739
            // Update packed_file_size in global index
740
62.2k
            {
741
62.2k
                std::lock_guard<std::mutex> global_lock(_global_index_mutex);
742
62.2k
                auto it = _global_slice_locations.find(small_file_path);
743
62.2k
                if (it != _global_slice_locations.end()) {
744
62.2k
                    it->second.packed_file_size = packed_file->total_size;
745
62.2k
                }
746
62.2k
            }
747
62.2k
        }
748
13.7k
        LOG(INFO) << "Packed file " << packed_file->packed_file_path
749
13.7k
                  << " uploaded; slices=" << packed_file->slice_locations.size()
750
13.7k
                  << ", total_bytes=" << packed_file->total_size << ", slice_detail=["
751
13.7k
                  << slices_stream.str() << "]"
752
13.7k
                  << ", active_to_ready_ms=" << active_to_ready_ms
753
13.7k
                  << ", ready_to_upload_ms=" << ready_to_upload_ms
754
13.7k
                  << ", uploading_to_uploaded_ms=" << uploading_to_uploaded_ms
755
13.7k
                  << ", total_ms=" << total_ms;
756
13.7k
        {
757
13.7k
            std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex);
758
13.7k
            packed_file->state = PackedFileState::UPLOADED;
759
13.7k
            packed_file->upload_time = std::time(nullptr);
760
13.7k
        }
761
13.7k
        packed_file->upload_cv.notify_all();
762
13.7k
        {
763
13.7k
            std::lock_guard<std::mutex> lock(_packed_files_mutex);
764
13.7k
            _uploading_packed_files.erase(packed_file->packed_file_path);
765
13.7k
            _uploaded_packed_files[packed_file->packed_file_path] = packed_file;
766
13.7k
        }
767
13.7k
    };
768
769
414k
    auto handle_failure = [&](const std::shared_ptr<PackedFileContext>& packed_file,
770
414k
                              const Status& status) {
771
0
        LOG(WARNING) << "Failed to upload packed file: " << packed_file->packed_file_path
772
0
                     << ", error: " << status.to_string();
773
0
        {
774
0
            std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex);
775
0
            packed_file->state = PackedFileState::FAILED;
776
0
            packed_file->last_error = status.to_string();
777
0
            packed_file->upload_time = std::time(nullptr);
778
0
        }
779
0
        packed_file->upload_cv.notify_all();
780
0
        {
781
0
            std::lock_guard<std::mutex> lock(_packed_files_mutex);
782
0
            _uploading_packed_files.erase(packed_file->packed_file_path);
783
0
            _uploaded_packed_files[packed_file->packed_file_path] = packed_file;
784
0
        }
785
0
    };
786
787
414k
    for (auto& packed_file : files_ready) {
788
14.7k
        const std::string& packed_file_path = packed_file->packed_file_path;
789
14.7k
        cloud::PackedFileInfoPB packed_file_info;
790
14.7k
        packed_file_info.set_ref_cnt(packed_file->slice_locations.size());
791
14.7k
        packed_file_info.set_total_slice_num(packed_file->slice_locations.size());
792
14.7k
        packed_file_info.set_total_slice_bytes(packed_file->total_size);
793
14.7k
        packed_file_info.set_remaining_slice_bytes(packed_file->total_size);
794
14.7k
        packed_file_info.set_created_at_sec(packed_file->create_time);
795
14.7k
        packed_file_info.set_corrected(false);
796
14.7k
        packed_file_info.set_state(doris::cloud::PackedFileInfoPB::NORMAL);
797
14.7k
        packed_file_info.set_resource_id(packed_file->resource_id);
798
799
63.2k
        for (const auto& [small_file_path, index] : packed_file->slice_locations) {
800
63.2k
            auto* small_file = packed_file_info.add_slices();
801
63.2k
            small_file->set_path(small_file_path);
802
63.2k
            small_file->set_offset(index.offset);
803
63.2k
            small_file->set_size(index.size);
804
63.2k
            small_file->set_deleted(false);
805
63.2k
            if (index.tablet_id != 0) {
806
63.2k
                small_file->set_tablet_id(index.tablet_id);
807
63.2k
            }
808
63.2k
            if (!index.rowset_id.empty()) {
809
63.2k
                small_file->set_rowset_id(index.rowset_id);
810
63.2k
            }
811
63.2k
            if (index.txn_id != 0) {
812
63.2k
                small_file->set_txn_id(index.txn_id);
813
63.2k
            }
814
63.2k
        }
815
816
14.7k
        Status meta_status = update_meta_service(packed_file->packed_file_path, packed_file_info);
817
14.7k
        if (!meta_status.ok()) {
818
2
            LOG(WARNING) << "Failed to update meta service for packed file: "
819
2
                         << packed_file->packed_file_path << ", error: " << meta_status.to_string();
820
2
            handle_failure(packed_file, meta_status);
821
2
            continue;
822
2
        }
823
824
        // Record stats once the packed file metadata is persisted.
825
14.7k
        record_packed_file_metrics(*packed_file);
826
827
14.7k
        Status trailer_status = append_packed_info_trailer(
828
14.7k
                packed_file->writer.get(), packed_file->packed_file_path, packed_file_info);
829
14.7k
        if (!trailer_status.ok()) {
830
0
            handle_failure(packed_file, trailer_status);
831
0
            continue;
832
0
        }
833
834
        // Now upload the file
835
14.7k
        if (!packed_file->slice_locations.empty()) {
836
14.7k
            std::ostringstream oss;
837
14.7k
            oss << "Uploading packed file " << packed_file_path << " with "
838
14.7k
                << packed_file->slice_locations.size() << " small files: ";
839
14.7k
            bool first = true;
840
63.2k
            for (const auto& [small_file_path, index] : packed_file->slice_locations) {
841
63.2k
                if (!first) {
842
48.5k
                    oss << ", ";
843
48.5k
                }
844
63.2k
                first = false;
845
63.2k
                oss << "[" << small_file_path << ", offset=" << index.offset
846
63.2k
                    << ", size=" << index.size << "]";
847
63.2k
            }
848
14.7k
            VLOG_DEBUG << oss.str();
849
14.7k
        } else {
850
0
            VLOG_DEBUG << "Uploading packed file " << packed_file_path << " with no small files";
851
0
        }
852
853
14.7k
        Status upload_status = finalize_packed_file_upload(packed_file->packed_file_path,
854
14.7k
                                                           packed_file->writer.get());
855
856
14.7k
        if (upload_status.is<ErrorCode::ALREADY_CLOSED>()) {
857
0
            record_ready_to_upload(packed_file);
858
0
            handle_success(packed_file);
859
0
            continue;
860
0
        }
861
14.7k
        if (!upload_status.ok()) {
862
0
            handle_failure(packed_file, upload_status);
863
0
            continue;
864
0
        }
865
866
14.7k
        record_ready_to_upload(packed_file);
867
14.7k
        packed_file->state = PackedFileState::UPLOADING;
868
14.7k
    }
869
870
414k
    for (auto& packed_file : files_uploading) {
871
63.4k
        if (!packed_file->writer) {
872
0
            handle_failure(packed_file,
873
0
                           Status::InternalError("File writer is null for packed file: {}",
874
0
                                                 packed_file->packed_file_path));
875
0
            continue;
876
0
        }
877
878
63.4k
        if (packed_file->writer->state() != FileWriter::State::CLOSED) {
879
33.9k
            continue;
880
33.9k
        }
881
882
29.4k
        Status status = packed_file->writer->close(true);
883
29.4k
        if (status.is<ErrorCode::ALREADY_CLOSED>()) {
884
14.7k
            handle_success(packed_file);
885
14.7k
            continue;
886
14.7k
        }
887
14.7k
        if (status.ok()) {
888
14.7k
            continue;
889
14.7k
        }
890
891
1
        handle_failure(packed_file, status);
892
1
    }
893
414k
}
894
895
/// Closes the writer of a packed file via `close(true)` and returns the resulting status.
///
/// @param packed_file_path Path of the packed file; used only in the error message.
/// @param writer           Writer of the packed file; may be null.
/// @return InternalError when `writer` is null, otherwise whatever `writer->close(true)`
///         returns.
Status PackedFileManager::finalize_packed_file_upload(const std::string& packed_file_path,
                                                      FileWriter* writer) {
    if (writer != nullptr) {
        return writer->close(true);
    }
    return Status::InternalError("File writer is null for packed file: " + packed_file_path);
}
903
904
/// Sends the metadata of a packed file to the cloud meta manager.
///
/// @param packed_file_path Path of the packed file being described.
/// @param packed_file_info Protobuf description of the packed file and its slices.
/// @return InternalError when the process is not in cloud mode, otherwise the status
///         returned by `update_packed_file_info()`. In BE_TEST builds a sync point may
///         return a value before any of this runs.
Status PackedFileManager::update_meta_service(const std::string& packed_file_path,
                                              const cloud::PackedFileInfoPB& packed_file_info) {
#ifdef BE_TEST
    TEST_SYNC_POINT_RETURN_WITH_VALUE("PackedFileManager::update_meta_service", Status::OK(),
                                      packed_file_path, &packed_file_info);
#endif
    VLOG_DEBUG << "Updating meta service for packed file: " << packed_file_path << " with "
               << packed_file_info.total_slice_num() << " small files"
               << ", total bytes: " << packed_file_info.total_slice_bytes();

    // The cloud meta manager is reached through the storage engine, which is only
    // converted with to_cloud() when cloud mode is enabled.
    if (config::is_cloud_mode()) {
        auto& meta_mgr = ExecEnv::GetInstance()->storage_engine().to_cloud().meta_mgr();
        return meta_mgr.update_packed_file_info(packed_file_path, packed_file_info);
    }
    return Status::InternalError("Storage engine is not cloud mode");
}
923
924
73
void PackedFileManager::cleanup_expired_data() {
925
73
    auto current_time = std::time(nullptr);
926
927
    // Clean up expired uploaded files
928
73
    {
929
73
        std::lock_guard<std::mutex> uploaded_lock(_packed_files_mutex);
930
73
        auto it = _uploaded_packed_files.begin();
931
383k
        while (it != _uploaded_packed_files.end()) {
932
383k
            if (it->second->state == PackedFileState::UPLOADED &&
933
383k
                current_time - it->second->upload_time > config::uploaded_file_retention_seconds) {
934
9.28k
                it = _uploaded_packed_files.erase(it);
935
374k
            } else if (it->second->state == PackedFileState::FAILED &&
936
374k
                       current_time - it->second->upload_time >
937
0
                               config::uploaded_file_retention_seconds) {
938
0
                it = _uploaded_packed_files.erase(it);
939
374k
            } else {
940
374k
                ++it;
941
374k
            }
942
383k
        }
943
73
    }
944
945
    // Clean up expired global index entries
946
73
    {
947
73
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
948
73
        auto it = _global_slice_locations.begin();
949
1.72M
        while (it != _global_slice_locations.end()) {
950
1.72M
            const auto& index = it->second;
951
1.72M
            if (index.create_time > 0 &&
952
1.72M
                current_time - index.create_time > config::uploaded_file_retention_seconds) {
953
41.4k
                it = _global_slice_locations.erase(it);
954
1.68M
            } else {
955
1.68M
                ++it;
956
1.68M
            }
957
1.72M
        }
958
73
    }
959
73
}
960
961
#ifdef BE_TEST
962
namespace {
963
void reset_adder(bvar::Adder<int64_t>& adder) {
964
    auto current = adder.get_value();
965
    if (current != 0) {
966
        adder << (-current);
967
    }
968
}
969
} // namespace
970
971
void PackedFileManager::reset_packed_file_bvars_for_test() const {
972
    reset_adder(g_packed_file_total_count);
973
    reset_adder(g_packed_file_total_small_file_count);
974
    reset_adder(g_packed_file_total_size_bytes);
975
    g_packed_file_small_file_num_recorder.reset();
976
    g_packed_file_file_size_recorder.reset();
977
    g_packed_file_active_to_ready_ms_recorder.reset();
978
    g_packed_file_ready_to_upload_ms_recorder.reset();
979
    g_packed_file_uploading_to_uploaded_ms_recorder.reset();
980
    if (auto* sampler = g_packed_file_small_file_num_recorder.get_sampler()) {
981
        sampler->take_sample();
982
    }
983
    if (auto* sampler = g_packed_file_file_size_recorder.get_sampler()) {
984
        sampler->take_sample();
985
    }
986
    if (auto* sampler = g_packed_file_active_to_ready_ms_recorder.get_sampler()) {
987
        sampler->take_sample();
988
    }
989
    if (auto* sampler = g_packed_file_ready_to_upload_ms_recorder.get_sampler()) {
990
        sampler->take_sample();
991
    }
992
    if (auto* sampler = g_packed_file_uploading_to_uploaded_ms_recorder.get_sampler()) {
993
        sampler->take_sample();
994
    }
995
}
996
997
int64_t PackedFileManager::packed_file_total_count_for_test() const {
998
    return g_packed_file_total_count.get_value();
999
}
1000
1001
int64_t PackedFileManager::packed_file_total_small_file_num_for_test() const {
1002
    return g_packed_file_total_small_file_count.get_value();
1003
}
1004
1005
int64_t PackedFileManager::packed_file_total_size_bytes_for_test() const {
1006
    return g_packed_file_total_size_bytes.get_value();
1007
}
1008
1009
double PackedFileManager::packed_file_avg_small_file_num_for_test() const {
1010
    return g_packed_file_avg_small_file_num.get_value().get_average_double();
1011
}
1012
1013
double PackedFileManager::packed_file_avg_file_size_for_test() const {
1014
    return g_packed_file_avg_file_size_bytes.get_value().get_average_double();
1015
}
1016
1017
void PackedFileManager::record_packed_file_metrics_for_test(
1018
        const PackedFileManager::PackedFileContext* packed_file) {
1019
    DCHECK(packed_file != nullptr);
1020
    record_packed_file_metrics(*packed_file);
1021
}
1022
1023
void PackedFileManager::clear_state_for_test() {
1024
    std::lock_guard<std::timed_mutex> cur_lock(_current_packed_file_mutex);
1025
    _current_packed_files.clear();
1026
    {
1027
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
1028
        _uploading_packed_files.clear();
1029
        _uploaded_packed_files.clear();
1030
    }
1031
    {
1032
        std::lock_guard<std::mutex> lock(_global_index_mutex);
1033
        _global_slice_locations.clear();
1034
    }
1035
}
1036
#endif
1037
1038
} // namespace doris::io