Coverage Report

Created: 2026-03-24 20:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/packed_file_manager.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/packed_file_manager.h"
19
20
#include <bvar/bvar.h>
21
#include <bvar/recorder.h>
22
#include <bvar/window.h>
23
24
#include <algorithm>
25
#include <chrono>
26
#include <cstdint>
27
#include <ctime>
28
#include <functional>
29
#include <limits>
30
#include <optional>
31
#include <random>
32
#include <sstream>
33
#include <unordered_set>
34
35
#ifdef BE_TEST
36
#include "cpp/sync_point.h"
37
#endif
38
39
#include <gen_cpp/cloud.pb.h>
40
41
#include "cloud/cloud_meta_mgr.h"
42
#include "cloud/cloud_storage_engine.h"
43
#include "cloud/config.h"
44
#include "common/config.h"
45
#include "io/cache/block_file_cache.h"
46
#include "io/cache/block_file_cache_factory.h"
47
#include "io/cache/file_block.h"
48
#include "io/cache/file_cache_common.h"
49
#include "io/fs/packed_file_trailer.h"
50
#include "runtime/exec_env.h"
51
#include "storage/storage_engine.h"
52
#include "util/coding.h"
53
#include "util/slice.h"
54
#include "util/uid_util.h"
55
56
namespace doris::io {
57
58
namespace {
59
60
bvar::Adder<int64_t> g_packed_file_total_count("packed_file", "total_count");
61
bvar::Adder<int64_t> g_packed_file_total_small_file_count("packed_file", "total_small_file_num");
62
bvar::Adder<int64_t> g_packed_file_total_size_bytes("packed_file", "total_size_bytes");
63
bvar::IntRecorder g_packed_file_small_file_num_recorder;
64
bvar::IntRecorder g_packed_file_file_size_recorder;
65
bvar::Window<bvar::IntRecorder> g_packed_file_avg_small_file_num(
66
        "packed_file_avg_small_file_num", &g_packed_file_small_file_num_recorder,
67
        /*window_size=*/10);
68
bvar::Window<bvar::IntRecorder> g_packed_file_avg_file_size_bytes("packed_file_avg_file_size_bytes",
69
                                                                  &g_packed_file_file_size_recorder,
70
                                                                  /*window_size=*/10);
71
bvar::IntRecorder g_packed_file_active_to_ready_ms_recorder;
72
bvar::IntRecorder g_packed_file_ready_to_upload_ms_recorder;
73
bvar::IntRecorder g_packed_file_uploading_to_uploaded_ms_recorder;
74
bvar::Window<bvar::IntRecorder> g_packed_file_active_to_ready_ms_window(
75
        "packed_file_active_to_ready_ms", &g_packed_file_active_to_ready_ms_recorder,
76
        /*window_size=*/10);
77
bvar::Window<bvar::IntRecorder> g_packed_file_ready_to_upload_ms_window(
78
        "packed_file_ready_to_upload_ms", &g_packed_file_ready_to_upload_ms_recorder,
79
        /*window_size=*/10);
80
bvar::Window<bvar::IntRecorder> g_packed_file_uploading_to_uploaded_ms_window(
81
        "packed_file_uploading_to_uploaded_ms", &g_packed_file_uploading_to_uploaded_ms_recorder,
82
        /*window_size=*/10);
83
84
// Metrics for async small file cache write
85
bvar::Adder<int64_t> g_packed_file_cache_async_write_count("packed_file_cache",
86
                                                           "async_write_count");
87
bvar::Adder<int64_t> g_packed_file_cache_async_write_bytes("packed_file_cache",
88
                                                           "async_write_bytes");
89
90
Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_file_path,
91
2.00k
                                  const cloud::PackedFileInfoPB& packed_file_info) {
92
2.00k
    if (writer == nullptr) {
93
0
        return Status::InternalError("File writer is null for packed file: {}", packed_file_path);
94
0
    }
95
2.00k
    if (writer->state() == FileWriter::State::CLOSED) {
96
0
        return Status::InternalError("File writer already closed for packed file: {}",
97
0
                                     packed_file_path);
98
0
    }
99
100
2.00k
    cloud::PackedFileFooterPB footer_pb;
101
2.00k
    footer_pb.mutable_packed_file_info()->CopyFrom(packed_file_info);
102
103
2.00k
    std::string serialized_footer;
104
2.00k
    if (!footer_pb.SerializeToString(&serialized_footer)) {
105
0
        return Status::InternalError("Failed to serialize packed file footer info for {}",
106
0
                                     packed_file_path);
107
0
    }
108
109
2.00k
    if (serialized_footer.size() >
110
2.00k
        std::numeric_limits<uint32_t>::max() - kPackedFileTrailerSuffixSize) {
111
0
        return Status::InternalError("PackedFileFooterPB too large for {}", packed_file_path);
112
0
    }
113
114
2.00k
    std::string trailer;
115
2.00k
    trailer.reserve(serialized_footer.size() + kPackedFileTrailerSuffixSize);
116
2.00k
    trailer.append(serialized_footer);
117
2.00k
    put_fixed32_le(&trailer, static_cast<uint32_t>(serialized_footer.size()));
118
2.00k
    put_fixed32_le(&trailer, kPackedFileTrailerVersion);
119
120
2.00k
    return writer->append(Slice(trailer));
121
2.00k
}
122
123
// write small file data to file cache
124
void do_write_to_file_cache(const std::string& small_file_path, const std::string& data,
125
2.00k
                            int64_t tablet_id, uint64_t expiration_time) {
126
2.00k
    if (data.empty()) {
127
0
        return;
128
0
    }
129
130
    // Generate cache key from small file path (e.g., "rowset_id_seg_id.dat")
131
2.00k
    Path path(small_file_path);
132
2.00k
    UInt128Wrapper cache_hash = BlockFileCache::hash(path.filename().native());
133
134
2.00k
    VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path
135
0
               << " filename=" << path.filename().native() << " hash=" << cache_hash.to_string()
136
0
               << " size=" << data.size() << " tablet_id=" << tablet_id
137
0
               << " expiration_time=" << expiration_time;
138
139
2.00k
    BlockFileCache* file_cache = FileCacheFactory::instance()->get_by_path(cache_hash);
140
2.00k
    if (file_cache == nullptr) {
141
0
        return; // Cache not available, skip
142
0
    }
143
144
    // Allocate cache blocks
145
2.00k
    CacheContext ctx;
146
2.00k
    ctx.cache_type = expiration_time > 0 ? FileCacheType::TTL : FileCacheType::NORMAL;
147
2.00k
    ctx.expiration_time = expiration_time;
148
2.00k
    ctx.tablet_id = tablet_id;
149
2.00k
    ReadStatistics stats;
150
2.00k
    ctx.stats = &stats;
151
152
2.00k
    FileBlocksHolder holder = file_cache->get_or_set(cache_hash, 0, data.size(), ctx);
153
154
    // Write data to cache blocks
155
2.00k
    size_t data_offset = 0;
156
2.00k
    for (auto& block : holder.file_blocks) {
157
2.00k
        if (data_offset >= data.size()) {
158
0
            break;
159
0
        }
160
2.00k
        size_t block_size = block->range().size();
161
2.00k
        size_t write_size = std::min(block_size, data.size() - data_offset);
162
163
2.00k
        if (block->state() == FileBlock::State::EMPTY) {
164
1.49k
            block->get_or_set_downloader();
165
1.49k
            if (block->is_downloader()) {
166
1.49k
                Slice s(data.data() + data_offset, write_size);
167
1.49k
                Status st = block->append(s);
168
1.49k
                if (st.ok()) {
169
1.49k
                    st = block->finalize();
170
1.49k
                }
171
1.49k
                if (!st.ok()) {
172
0
                    LOG(WARNING) << "Write small file to cache failed: " << st.msg();
173
0
                }
174
1.49k
            }
175
1.49k
        }
176
2.00k
        data_offset += write_size;
177
2.00k
    }
178
2.00k
}
179
180
// Async wrapper: submit cache write task to thread pool
181
// The data is copied into the lambda capture to ensure its lifetime extends beyond
182
// the async task execution. The original Slice may reference a buffer that gets
183
// reused or freed before the async task runs.
184
void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data,
185
2.08k
                                     int64_t tablet_id, uint64_t expiration_time) {
186
2.08k
    if (!config::enable_file_cache || data.size == 0) {
187
86
        return;
188
86
    }
189
190
    // Copy data since original buffer may be reused before async task executes
191
    // For small files (< 1MB), copy overhead is acceptable
192
2.00k
    std::string data_copy(data.data, data.size);
193
2.00k
    size_t data_size = data.size;
194
195
2.00k
    auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool();
196
2.00k
    if (thread_pool == nullptr) {
197
        // Fallback to sync write if thread pool not available
198
0
        do_write_to_file_cache(small_file_path, data_copy, tablet_id, expiration_time);
199
0
        return;
200
0
    }
201
202
    // Track async write count and bytes
203
2.00k
    g_packed_file_cache_async_write_count << 1;
204
2.00k
    g_packed_file_cache_async_write_bytes << static_cast<int64_t>(data_size);
205
206
2.00k
    Status st = thread_pool->submit_func([path = small_file_path, data = std::move(data_copy),
207
2.00k
                                          tablet_id, data_size, expiration_time]() {
208
2.00k
        do_write_to_file_cache(path, data, tablet_id, expiration_time);
209
        // Decrement async write count after completion
210
2.00k
        g_packed_file_cache_async_write_count << -1;
211
2.00k
        g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
212
2.00k
    });
213
214
2.00k
    if (!st.ok()) {
215
        // Revert metrics since task was not submitted
216
0
        g_packed_file_cache_async_write_count << -1;
217
0
        g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
218
0
        LOG(WARNING) << "Failed to submit cache write task for " << small_file_path << ": "
219
0
                     << st.msg();
220
        // Don't block on failure, cache write is best-effort
221
0
    }
222
2.00k
}
223
224
} // namespace
225
226
2.04k
// Returns the process-wide PackedFileManager. It is a function-local static, so it is
// constructed on first use (thread-safe initialization) and destroyed at program exit.
PackedFileManager* PackedFileManager::instance() {
    static PackedFileManager s_manager;
    return &s_manager;
}
230
231
62
// Stops and joins the background manager thread (if any) before the members are destroyed.
PackedFileManager::~PackedFileManager() {
    this->stop_background_manager();
}
234
235
74
// Initialization hook. There is currently nothing to set up, so it always succeeds.
Status PackedFileManager::init() {
    Status status = Status::OK();
    return status;
}
238
239
// Creates a fresh packed file context for `resource_id` and stores it in `packed_file_ctx`.
//
// The packed file is placed at "data/packed_file/<bucket>/<uuid>.bin", where
// bucket = hash(uuid) % 4096 + 1. The new context starts in INIT state and owns a writer
// created on the resolved file system. write_file_cache is disabled for the packed file
// itself: append_small_file() caches each small file separately under its own path, so cache
// entries can be cleaned up when stale rowsets are removed.
//
// `packed_file_ctx` is replaced only after every step succeeded. On error it is left
// untouched, so callers never observe a half-initialized context without a writer.
Status PackedFileManager::create_new_packed_file_context(
        const std::string& resource_id, std::unique_ptr<PackedFileContext>& packed_file_ctx) {
    FileSystemSPtr file_system;
    RETURN_IF_ERROR(ensure_file_system(resource_id, &file_system));
    if (file_system == nullptr) {
        return Status::InternalError("File system is not available for packed file creation: " +
                                     resource_id);
    }

    auto uuid = generate_uuid_string();
    auto hash_val = std::hash<std::string> {}(uuid);
    uint16_t path_bucket = hash_val % 4096 + 1;
    std::stringstream path_stream;
    path_stream << "data/packed_file/" << path_bucket << "/" << uuid << ".bin";
    const std::string relative_path = path_stream.str();

    // Build the context locally; it is only published once the writer exists.
    auto new_ctx = std::make_unique<PackedFileContext>();
    new_ctx->packed_file_path = relative_path;
    new_ctx->create_time = std::time(nullptr);
    new_ctx->create_timestamp = std::chrono::steady_clock::now();
    new_ctx->state = PackedFileState::INIT;
    new_ctx->resource_id = resource_id;
    new_ctx->file_system = std::move(file_system);

    FileWriterPtr new_writer;
    FileWriterOptions opts;
    // Disable write_file_cache for the packed file itself (see the function comment).
    opts.write_file_cache = false;
    RETURN_IF_ERROR(new_ctx->file_system->create_file(Path(relative_path), &new_writer, &opts));
    new_ctx->writer = std::move(new_writer);

    packed_file_ctx = std::move(new_ctx);
    return Status::OK();
}
277
278
// Resolves the file system for `resource_id` into `*file_system`, caching the result.
//
// An empty `resource_id` selects the default file system, which is the cloud storage engine's
// latest_fs(). Any other id is resolved through the engine's storage resources. Cached
// entries are served under `_file_system_mutex`. On a cache miss, resolution requires cloud
// mode and an ExecEnv. The resolved file system is stored in the cache and then returned.
// Returns InvalidArgument for a null output pointer and InternalError when resolution fails.
Status PackedFileManager::ensure_file_system(const std::string& resource_id,
                                             FileSystemSPtr* file_system) {
    if (file_system == nullptr) {
        return Status::InvalidArgument("file_system output parameter is null");
    }

    const bool use_default = resource_id.empty();

    // Fast path: reuse a previously resolved file system.
    {
        std::lock_guard<std::mutex> guard(_file_system_mutex);
        if (use_default) {
            if (_default_file_system != nullptr) {
                *file_system = _default_file_system;
                return Status::OK();
            }
        } else if (auto cached = _file_systems.find(resource_id); cached != _file_systems.end()) {
            *file_system = cached->second;
            return Status::OK();
        }
    }

    // Slow path: resolve through the cloud storage engine.
    if (!config::is_cloud_mode()) {
        return Status::InternalError("Cloud file system is not available in local mode");
    }

    auto* env = ExecEnv::GetInstance();
    if (env == nullptr) {
        return Status::InternalError("ExecEnv instance is not initialized");
    }

    FileSystemSPtr fs;
    if (use_default) {
        fs = env->storage_engine().to_cloud().latest_fs();
        if (fs == nullptr) {
            return Status::InternalError("Cloud file system is not ready");
        }
    } else {
        auto resource = env->storage_engine().to_cloud().get_storage_resource(resource_id);
        if (!resource.has_value() || resource->fs == nullptr) {
            return Status::InternalError("Cloud file system is not ready for resource: " +
                                         resource_id);
        }
        fs = resource->fs;
    }

    {
        std::lock_guard<std::mutex> guard(_file_system_mutex);
        if (use_default) {
            _default_file_system = fs;
        } else {
            _file_systems[resource_id] = fs;
        }
    }
    *file_system = std::move(fs);
    return Status::OK();
}
338
339
// Appends one small file's bytes to the current packed file of `info.resource_id`.
//
// Files larger than config::small_file_threshold_bytes are not packed: the call returns OK
// without recording anything. A positive info.txn_id is required (InvalidArgument otherwise).
//
// Under `_current_packed_file_mutex` this:
//   1. creates a packed file context for the resource if none (with a writer) exists;
//   2. rotates the current packed file to the uploading list first if it already holds data
//      and appending `data` would reach config::packed_file_size_threshold_bytes;
//   3. appends `data` to the packed file writer and, when info.write_file_cache is set,
//      schedules an async file cache write keyed by the small file path (so the key matches
//      the cleanup key used when stale rowsets are removed);
//   4. records the slice location in the packed file, marks the packed file ACTIVE on its
//      first append, and publishes the location in the global index;
//   5. rotates the packed file once it holds config::packed_file_small_file_count_threshold
//      small files (when that threshold is positive).
//
// Step 4 deliberately precedes step 5. Rotation hands the context to the uploading list,
// where the background thread may start processing it, so it must not be mutated afterwards.
// The global index is therefore also consistent even if creating the replacement file fails.
Status PackedFileManager::append_small_file(const std::string& path, const Slice& data,
                                            const PackedAppendContext& info) {
    // Check if file is too large to be merged
    if (data.get_size() > config::small_file_threshold_bytes) {
        return Status::OK(); // Skip merging for large files
    }

    if (info.txn_id <= 0) {
        return Status::InvalidArgument("Missing valid txn id for packed file append: " + path);
    }

    std::lock_guard<std::timed_mutex> lock(_current_packed_file_mutex);

    // References to map values stay valid: rotation replaces the value, it never erases it.
    auto& current_state = _current_packed_files[info.resource_id];
    if (!current_state || !current_state->writer) {
        RETURN_IF_ERROR(create_new_packed_file_context(info.resource_id, current_state));
    }

    // Rotate before writing if this slice would push a non-empty packed file past the size
    // threshold. An empty packed file is never rotated; uploading it would gain nothing.
    if (current_state->total_size > 0 &&
        current_state->total_size + data.get_size() >= config::packed_file_size_threshold_bytes) {
        RETURN_IF_ERROR(mark_current_packed_file_for_upload_locked(info.resource_id));
        if (!current_state || !current_state->writer) {
            RETURN_IF_ERROR(create_new_packed_file_context(info.resource_id, current_state));
        }
    }

    PackedFileContext* active_state = current_state.get();

    // Write data to current packed file
    RETURN_IF_ERROR(active_state->writer->append(data));

    if (info.write_file_cache) {
        write_small_file_to_cache_async(path, data, info.tablet_id, info.expiration_time);
    }

    PackedSliceLocation location;
    location.packed_file_path = active_state->packed_file_path;
    location.offset = active_state->current_offset;
    location.size = data.get_size();
    location.create_time = std::time(nullptr);
    location.tablet_id = info.tablet_id;
    location.rowset_id = info.rowset_id;
    location.resource_id = info.resource_id;
    location.txn_id = info.txn_id;

    active_state->slice_locations[path] = location;
    active_state->current_offset += data.get_size();
    active_state->total_size += data.get_size();

    // Mark as active if this is the first write (before any rotation below).
    if (!active_state->first_append_timestamp.has_value()) {
        active_state->first_append_timestamp = std::chrono::steady_clock::now();
    }
    if (active_state->state == PackedFileState::INIT) {
        active_state->state = PackedFileState::ACTIVE;
    }

    // Update global index
    {
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
        _global_slice_locations[path] = location;
    }

    // Rotate packed file when small file count reaches threshold. This must be the last
    // access to `active_state`: afterwards the context belongs to the uploading list.
    if (config::packed_file_small_file_count_threshold > 0 &&
        static_cast<int64_t>(active_state->slice_locations.size()) >=
                config::packed_file_small_file_count_threshold) {
        RETURN_IF_ERROR(mark_current_packed_file_for_upload_locked(info.resource_id));
    }

    return Status::OK();
}
417
418
1.99k
// Blocks on `upload_cv` until `packed_file_ptr` reaches UPLOADED or FAILED.
// Returns OK once uploaded. On failure it returns InternalError carrying the recorded
// `last_error`, or "Packed file upload failed" when no error text was recorded.
// The caller must keep `packed_file_ptr` alive for the whole wait.
Status PackedFileManager::wait_for_packed_file_upload(PackedFileContext* packed_file_ptr) {
    std::unique_lock<std::mutex> upload_lock(packed_file_ptr->upload_mutex);
    const auto reached_terminal_state = [packed_file_ptr] {
        const auto current = packed_file_ptr->state.load(std::memory_order_acquire);
        return current == PackedFileState::UPLOADED || current == PackedFileState::FAILED;
    };
    packed_file_ptr->upload_cv.wait(upload_lock, reached_terminal_state);

    if (packed_file_ptr->state != PackedFileState::FAILED) {
        return Status::OK();
    }
    // `last_error` is read while still holding `upload_mutex`.
    std::string message = packed_file_ptr->last_error;
    if (message.empty()) {
        message = "Packed file upload failed";
    }
    return Status::InternalError(message);
}
433
434
2.00k
// Waits until the packed file containing the small file `path` has finished uploading.
//
// Returns InternalError if `path` is not in the global index, if its packed file cannot be
// found, or if the packed file's upload failed (the failure message comes from
// wait_for_packed_file_upload()). The packed file is looked up in this order:
//   1. the uploaded list: an UPLOADED file returns OK immediately, and any other state is
//      waited on or reported directly;
//   2. the current (still being written) packed files;
//   3. the uploading list, then the uploaded list again, because the file may have moved on
//      between the lookups above.
//
// NOTE(review): a packed file found in step 2 is referenced through a raw pointer, because
// current packed files are owned by unique_ptr. Its lifetime then relies on it staying in one
// of the manager's maps for the duration of the wait. Confirm that uploaded entries cannot be
// cleaned up while a waiter is still blocked on them.
Status PackedFileManager::wait_upload_done(const std::string& path) {
    std::string packed_file_path;
    {
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
        auto it = _global_slice_locations.find(path);
        if (it == _global_slice_locations.end()) {
            return Status::InternalError("File not found in global index: " + path);
        }
        packed_file_path = it->second.packed_file_path;
    }

    // Shared ownership keeps an uploading/uploaded context alive until this function returns,
    // i.e. for the whole wait below.
    std::shared_ptr<PackedFileContext> managed_packed_file;
    {
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        auto uploaded_it = _uploaded_packed_files.find(packed_file_path);
        if (uploaded_it != _uploaded_packed_files.end()) {
            if (uploaded_it->second->state.load(std::memory_order_acquire) ==
                PackedFileState::UPLOADED) {
                return Status::OK(); // Already uploaded, no need to wait
            }
            // FAILED (or not yet terminal): wait_for_packed_file_upload() below reports the
            // failure immediately or waits for the terminal state.
            managed_packed_file = uploaded_it->second;
        }
    }

    PackedFileContext* packed_file_ptr = managed_packed_file.get();

    // Look among the packed files that are still being written.
    if (packed_file_ptr == nullptr) {
        std::lock_guard<std::timed_mutex> current_lock(_current_packed_file_mutex);
        for (auto& [resource_id, state] : _current_packed_files) {
            if (state && state->packed_file_path == packed_file_path) {
                packed_file_ptr = state.get();
                break;
            }
        }
    }

    // Not current any more: it has been handed over to uploading (or already uploaded).
    if (packed_file_ptr == nullptr) {
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        auto uploading_it = _uploading_packed_files.find(packed_file_path);
        if (uploading_it != _uploading_packed_files.end()) {
            managed_packed_file = uploading_it->second;
        } else if (auto uploaded_it = _uploaded_packed_files.find(packed_file_path);
                   uploaded_it != _uploaded_packed_files.end()) {
            managed_packed_file = uploaded_it->second;
        }
        packed_file_ptr = managed_packed_file.get();
    }

    if (packed_file_ptr == nullptr) {
        // Packed file not found in any location, this is unexpected
        return Status::InternalError("Packed file not found for path: " + path);
    }

    return wait_for_packed_file_upload(packed_file_ptr);
}
509
510
// Copies the packed-slice location recorded for small file `path` into `*location`.
// Returns NotFound if `path` is not present in the global packed index.
Status PackedFileManager::get_packed_slice_location(const std::string& path,
                                                    PackedSliceLocation* location) {
    std::lock_guard<std::mutex> lock(_global_index_mutex);
    if (auto found = _global_slice_locations.find(path); found != _global_slice_locations.end()) {
        *location = found->second;
        return Status::OK();
    }
    return Status::NotFound("File not found in global packed index: {}", path);
}
521
522
4
void PackedFileManager::start_background_manager() {
523
4
    if (_background_thread) {
524
0
        return; // Already started
525
0
    }
526
527
4
    _stop_background_thread = false;
528
4
    _background_thread = std::make_unique<std::thread>([this] { background_manager(); });
529
4
}
530
531
68
void PackedFileManager::stop_background_manager() {
532
68
    _stop_background_thread = true;
533
68
    if (_background_thread && _background_thread->joinable()) {
534
4
        _background_thread->join();
535
4
    }
536
68
    _background_thread.reset();
537
68
}
538
539
// Moves the current packed file of `resource_id` to the uploading list and installs a fresh
// packed file context in its place. The caller must hold `_current_packed_file_mutex`.
//
// Returns OK without doing anything if the resource has no current packed file with a writer.
// The rotated file is switched to READY_TO_UPLOAD. The first time that happens, its
// ready_to_upload_timestamp is set and, if it has a first-append timestamp, the
// ACTIVE->READY_TO_UPLOAD latency is recorded. Returns the status of creating the replacement
// context.
Status PackedFileManager::mark_current_packed_file_for_upload_locked(
        const std::string& resource_id) {
    auto it = _current_packed_files.find(resource_id);
    if (it == _current_packed_files.end() || !it->second || !it->second->writer) {
        return Status::OK(); // Nothing to mark for upload
    }

    std::unique_ptr<PackedFileContext>& slot = it->second;
    PackedFileContext& ctx = *slot;

    ctx.state = PackedFileState::READY_TO_UPLOAD;
    if (!ctx.ready_to_upload_timestamp.has_value()) {
        const auto ready_at = std::chrono::steady_clock::now();
        ctx.ready_to_upload_timestamp = ready_at;

        int64_t active_to_ready_ms = -1;
        if (ctx.first_append_timestamp.has_value()) {
            const auto waited = ready_at - *ctx.first_append_timestamp;
            active_to_ready_ms =
                    std::chrono::duration_cast<std::chrono::milliseconds>(waited).count();
            g_packed_file_active_to_ready_ms_recorder << active_to_ready_ms;
            if (auto* sampler = g_packed_file_active_to_ready_ms_recorder.get_sampler()) {
                sampler->take_sample();
            }
        }
        VLOG_DEBUG << "Packed file " << ctx.packed_file_path
                   << " transition ACTIVE->READY_TO_UPLOAD; active_to_ready_ms="
                   << active_to_ready_ms;
    }

    // Hand ownership to the uploading list; from here on the context is shared.
    {
        std::shared_ptr<PackedFileContext> uploading(std::move(slot));
        std::lock_guard<std::mutex> guard(_packed_files_mutex);
        _uploading_packed_files[uploading->packed_file_path] = uploading;
    }

    // `slot` is now empty: refill it with a brand-new packed file.
    return create_new_packed_file_context(resource_id, slot);
}
579
580
2.01k
// Locking wrapper around mark_current_packed_file_for_upload_locked(). It blocks until
// `_current_packed_file_mutex` is acquired, then rotates the current packed file of
// `resource_id`.
Status PackedFileManager::mark_current_packed_file_for_upload(const std::string& resource_id) {
    std::unique_lock<std::timed_mutex> guard(_current_packed_file_mutex);
    Status status = mark_current_packed_file_for_upload_locked(resource_id);
    return status;
}
584
585
2.00k
void PackedFileManager::record_packed_file_metrics(const PackedFileContext& packed_file) {
586
2.00k
    g_packed_file_total_count << 1;
587
2.00k
    g_packed_file_total_small_file_count
588
2.00k
            << static_cast<int64_t>(packed_file.slice_locations.size());
589
2.00k
    g_packed_file_total_size_bytes << packed_file.total_size;
590
2.00k
    g_packed_file_small_file_num_recorder
591
2.00k
            << static_cast<int64_t>(packed_file.slice_locations.size());
592
2.00k
    g_packed_file_file_size_recorder << packed_file.total_size;
593
    // Flush samplers immediately so the window bvar reflects the latest packed file.
594
2.00k
    if (auto* sampler = g_packed_file_small_file_num_recorder.get_sampler()) {
595
2.00k
        sampler->take_sample();
596
2.00k
    }
597
2.00k
    if (auto* sampler = g_packed_file_file_size_recorder.get_sampler()) {
598
2.00k
        sampler->take_sample();
599
2.00k
    }
600
2.00k
}
601
602
4
void PackedFileManager::background_manager() {
603
4
    auto last_cleanup_time = std::chrono::steady_clock::now();
604
605
2.08k
    while (!_stop_background_thread.load()) {
606
2.08k
        int64_t check_interval_ms =
607
2.08k
                std::max<int64_t>(1, config::packed_file_time_threshold_ms / 10);
608
2.08k
        std::this_thread::sleep_for(std::chrono::milliseconds(check_interval_ms));
609
610
        // Check if current packed file should be closed due to time threshold
611
2.08k
        std::vector<std::string> resources_to_mark;
612
2.08k
        {
613
2.08k
            std::unique_lock<std::timed_mutex> current_lock(_current_packed_file_mutex,
614
2.08k
                                                            std::defer_lock);
615
2.08k
            int64_t lock_wait_ms = std::max<int64_t>(0, config::packed_file_try_lock_timeout_ms);
616
2.08k
            if (current_lock.try_lock_for(std::chrono::milliseconds(lock_wait_ms))) {
617
20.3k
                for (auto& [resource_id, state] : _current_packed_files) {
618
20.3k
                    if (!state || state->state != PackedFileState::ACTIVE) {
619
17.0k
                        continue;
620
17.0k
                    }
621
3.34k
                    if (!state->first_append_timestamp.has_value()) {
622
0
                        continue;
623
0
                    }
624
3.34k
                    auto current_time = std::chrono::steady_clock::now();
625
3.34k
                    auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
626
3.34k
                                              current_time - *(state->first_append_timestamp))
627
3.34k
                                              .count();
628
3.34k
                    if (elapsed_ms >= config::packed_file_time_threshold_ms) {
629
2.00k
                        resources_to_mark.push_back(resource_id);
630
2.00k
                    }
631
3.34k
                }
632
2.07k
            }
633
2.08k
        }
634
2.08k
        for (const auto& resource_id : resources_to_mark) {
635
2.00k
            Status st = mark_current_packed_file_for_upload(resource_id);
636
2.00k
            if (!st.ok()) {
637
0
                LOG(WARNING) << "Failed to close current packed file for resource " << resource_id
638
0
                             << ": " << st.to_string();
639
0
            }
640
2.00k
        }
641
642
        // Process uploading files
643
2.08k
        process_uploading_packed_files();
644
645
2.08k
        auto now = std::chrono::steady_clock::now();
646
2.08k
        int64_t cleanup_interval_sec =
647
2.08k
                std::max<int64_t>(1, config::packed_file_cleanup_interval_seconds);
648
2.08k
        auto cleanup_interval = std::chrono::seconds(cleanup_interval_sec);
649
2.08k
        if (now - last_cleanup_time >= cleanup_interval) {
650
0
            cleanup_expired_data();
651
0
            last_cleanup_time = now;
652
0
        }
653
2.08k
    }
654
4
}
655
656
2.09k
void PackedFileManager::process_uploading_packed_files() {
657
2.09k
    std::vector<std::shared_ptr<PackedFileContext>> files_ready;
658
2.09k
    std::vector<std::shared_ptr<PackedFileContext>> files_uploading;
659
2.09k
    auto record_ready_to_upload = [&](const std::shared_ptr<PackedFileContext>& packed_file) {
660
2.00k
        if (!packed_file->uploading_timestamp.has_value()) {
661
2.00k
            packed_file->uploading_timestamp = std::chrono::steady_clock::now();
662
2.00k
            int64_t duration_ms = -1;
663
2.00k
            if (packed_file->ready_to_upload_timestamp.has_value()) {
664
2.00k
                duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
665
2.00k
                                      *packed_file->uploading_timestamp -
666
2.00k
                                      *packed_file->ready_to_upload_timestamp)
667
2.00k
                                      .count();
668
2.00k
                g_packed_file_ready_to_upload_ms_recorder << duration_ms;
669
2.00k
                if (auto* sampler = g_packed_file_ready_to_upload_ms_recorder.get_sampler()) {
670
2.00k
                    sampler->take_sample();
671
2.00k
                }
672
2.00k
            }
673
2.00k
            VLOG_DEBUG << "Packed file " << packed_file->packed_file_path
674
0
                       << " transition READY_TO_UPLOAD->UPLOADING; "
675
0
                          "ready_to_upload_ms="
676
0
                       << duration_ms;
677
2.00k
        }
678
2.00k
    };
679
680
2.09k
    {
681
2.09k
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
682
6.06k
        for (auto& [packed_file_path, packed_file] : _uploading_packed_files) {
683
6.06k
            auto state = packed_file->state.load(std::memory_order_acquire);
684
6.06k
            if (state != PackedFileState::READY_TO_UPLOAD && state != PackedFileState::UPLOADING) {
685
0
                continue;
686
0
            }
687
6.06k
            if (state == PackedFileState::READY_TO_UPLOAD) {
688
2.00k
                files_ready.emplace_back(packed_file);
689
4.06k
            } else {
690
4.06k
                files_uploading.emplace_back(packed_file);
691
4.06k
            }
692
6.06k
        }
693
2.09k
    }
694
695
2.09k
    auto handle_success = [&](const std::shared_ptr<PackedFileContext>& packed_file) {
696
2.00k
        auto now = std::chrono::steady_clock::now();
697
2.00k
        int64_t active_to_ready_ms = -1;
698
2.00k
        int64_t ready_to_upload_ms = -1;
699
2.00k
        int64_t uploading_to_uploaded_ms = -1;
700
2.00k
        if (packed_file->first_append_timestamp.has_value() &&
701
2.00k
            packed_file->ready_to_upload_timestamp.has_value()) {
702
2.00k
            active_to_ready_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
703
2.00k
                                         *packed_file->ready_to_upload_timestamp -
704
2.00k
                                         *packed_file->first_append_timestamp)
705
2.00k
                                         .count();
706
2.00k
        }
707
2.00k
        if (packed_file->ready_to_upload_timestamp.has_value() &&
708
2.00k
            packed_file->uploading_timestamp.has_value()) {
709
2.00k
            ready_to_upload_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
710
2.00k
                                         *packed_file->uploading_timestamp -
711
2.00k
                                         *packed_file->ready_to_upload_timestamp)
712
2.00k
                                         .count();
713
2.00k
        }
714
2.00k
        if (packed_file->uploading_timestamp.has_value()) {
715
2.00k
            uploading_to_uploaded_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
716
2.00k
                                               now - *packed_file->uploading_timestamp)
717
2.00k
                                               .count();
718
2.00k
            g_packed_file_uploading_to_uploaded_ms_recorder << uploading_to_uploaded_ms;
719
2.00k
            if (auto* sampler = g_packed_file_uploading_to_uploaded_ms_recorder.get_sampler()) {
720
2.00k
                sampler->take_sample();
721
2.00k
            }
722
2.00k
        }
723
2.00k
        int64_t total_ms = -1;
724
2.00k
        if (packed_file->first_append_timestamp.has_value()) {
725
2.00k
            total_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
726
2.00k
                               now - *packed_file->first_append_timestamp)
727
2.00k
                               .count();
728
2.00k
        }
729
2.00k
        std::ostringstream slices_stream;
730
2.00k
        bool first_slice = true;
731
2.00k
        for (const auto& [small_file_path, index] : packed_file->slice_locations) {
732
2.00k
            if (!first_slice) {
733
0
                slices_stream << "; ";
734
0
            }
735
2.00k
            first_slice = false;
736
2.00k
            slices_stream << small_file_path << "(txn=" << index.txn_id
737
2.00k
                          << ", offset=" << index.offset << ", size=" << index.size << ")";
738
739
            // Update packed_file_size in global index
740
2.00k
            {
741
2.00k
                std::lock_guard<std::mutex> global_lock(_global_index_mutex);
742
2.00k
                auto it = _global_slice_locations.find(small_file_path);
743
2.00k
                if (it != _global_slice_locations.end()) {
744
2.00k
                    it->second.packed_file_size = packed_file->total_size;
745
2.00k
                }
746
2.00k
            }
747
2.00k
        }
748
2.00k
        LOG(INFO) << "Packed file " << packed_file->packed_file_path
749
2.00k
                  << " uploaded; slices=" << packed_file->slice_locations.size()
750
2.00k
                  << ", total_bytes=" << packed_file->total_size << ", slice_detail=["
751
2.00k
                  << slices_stream.str() << "]"
752
2.00k
                  << ", active_to_ready_ms=" << active_to_ready_ms
753
2.00k
                  << ", ready_to_upload_ms=" << ready_to_upload_ms
754
2.00k
                  << ", uploading_to_uploaded_ms=" << uploading_to_uploaded_ms
755
2.00k
                  << ", total_ms=" << total_ms;
756
2.00k
        {
757
2.00k
            std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex);
758
2.00k
            packed_file->state = PackedFileState::UPLOADED;
759
2.00k
            packed_file->upload_time = std::time(nullptr);
760
2.00k
        }
761
2.00k
        packed_file->upload_cv.notify_all();
762
2.00k
        {
763
2.00k
            std::lock_guard<std::mutex> lock(_packed_files_mutex);
764
2.00k
            _uploading_packed_files.erase(packed_file->packed_file_path);
765
2.00k
            _uploaded_packed_files[packed_file->packed_file_path] = packed_file;
766
2.00k
        }
767
2.00k
    };
768
769
2.09k
    auto handle_failure = [&](const std::shared_ptr<PackedFileContext>& packed_file,
770
2.09k
                              const Status& status) {
771
6
        LOG(WARNING) << "Failed to upload packed file: " << packed_file->packed_file_path
772
6
                     << ", error: " << status.to_string();
773
6
        {
774
6
            std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex);
775
6
            packed_file->state = PackedFileState::FAILED;
776
6
            packed_file->last_error = status.to_string();
777
6
            packed_file->upload_time = std::time(nullptr);
778
6
        }
779
6
        packed_file->upload_cv.notify_all();
780
6
        {
781
6
            std::lock_guard<std::mutex> lock(_packed_files_mutex);
782
6
            _uploading_packed_files.erase(packed_file->packed_file_path);
783
6
            _uploaded_packed_files[packed_file->packed_file_path] = packed_file;
784
6
        }
785
6
    };
786
787
2.09k
    for (auto& packed_file : files_ready) {
788
2.00k
        const std::string& packed_file_path = packed_file->packed_file_path;
789
2.00k
        cloud::PackedFileInfoPB packed_file_info;
790
2.00k
        packed_file_info.set_ref_cnt(packed_file->slice_locations.size());
791
2.00k
        packed_file_info.set_total_slice_num(packed_file->slice_locations.size());
792
2.00k
        packed_file_info.set_total_slice_bytes(packed_file->total_size);
793
2.00k
        packed_file_info.set_remaining_slice_bytes(packed_file->total_size);
794
2.00k
        packed_file_info.set_created_at_sec(packed_file->create_time);
795
2.00k
        packed_file_info.set_corrected(false);
796
2.00k
        packed_file_info.set_state(doris::cloud::PackedFileInfoPB::NORMAL);
797
2.00k
        packed_file_info.set_resource_id(packed_file->resource_id);
798
799
2.00k
        for (const auto& [small_file_path, index] : packed_file->slice_locations) {
800
2.00k
            auto* small_file = packed_file_info.add_slices();
801
2.00k
            small_file->set_path(small_file_path);
802
2.00k
            small_file->set_offset(index.offset);
803
2.00k
            small_file->set_size(index.size);
804
2.00k
            small_file->set_deleted(false);
805
2.00k
            if (index.tablet_id != 0) {
806
2.00k
                small_file->set_tablet_id(index.tablet_id);
807
2.00k
            }
808
2.00k
            if (!index.rowset_id.empty()) {
809
2.00k
                small_file->set_rowset_id(index.rowset_id);
810
2.00k
            }
811
2.00k
            if (index.txn_id != 0) {
812
2.00k
                small_file->set_txn_id(index.txn_id);
813
2.00k
            }
814
2.00k
        }
815
816
2.00k
        Status meta_status = update_meta_service(packed_file->packed_file_path, packed_file_info);
817
2.00k
        if (!meta_status.ok()) {
818
4
            LOG(WARNING) << "Failed to update meta service for packed file: "
819
4
                         << packed_file->packed_file_path << ", error: " << meta_status.to_string();
820
4
            handle_failure(packed_file, meta_status);
821
4
            continue;
822
4
        }
823
824
        // Record stats once the packed file metadata is persisted.
825
2.00k
        record_packed_file_metrics(*packed_file);
826
827
2.00k
        Status trailer_status = append_packed_info_trailer(
828
2.00k
                packed_file->writer.get(), packed_file->packed_file_path, packed_file_info);
829
2.00k
        if (!trailer_status.ok()) {
830
0
            handle_failure(packed_file, trailer_status);
831
0
            continue;
832
0
        }
833
834
        // Now upload the file
835
2.00k
        if (!packed_file->slice_locations.empty()) {
836
2.00k
            std::ostringstream oss;
837
2.00k
            oss << "Uploading packed file " << packed_file_path << " with "
838
2.00k
                << packed_file->slice_locations.size() << " small files: ";
839
2.00k
            bool first = true;
840
2.00k
            for (const auto& [small_file_path, index] : packed_file->slice_locations) {
841
2.00k
                if (!first) {
842
0
                    oss << ", ";
843
0
                }
844
2.00k
                first = false;
845
2.00k
                oss << "[" << small_file_path << ", offset=" << index.offset
846
2.00k
                    << ", size=" << index.size << "]";
847
2.00k
            }
848
2.00k
            VLOG_DEBUG << oss.str();
849
2.00k
        } else {
850
0
            VLOG_DEBUG << "Uploading packed file " << packed_file_path << " with no small files";
851
0
        }
852
853
2.00k
        Status upload_status = finalize_packed_file_upload(packed_file->packed_file_path,
854
2.00k
                                                           packed_file->writer.get());
855
856
2.00k
        if (upload_status.is<ErrorCode::ALREADY_CLOSED>()) {
857
0
            record_ready_to_upload(packed_file);
858
0
            handle_success(packed_file);
859
0
            continue;
860
0
        }
861
2.00k
        if (!upload_status.ok()) {
862
0
            handle_failure(packed_file, upload_status);
863
0
            continue;
864
0
        }
865
866
2.00k
        record_ready_to_upload(packed_file);
867
2.00k
        packed_file->state = PackedFileState::UPLOADING;
868
2.00k
    }
869
870
4.06k
    for (auto& packed_file : files_uploading) {
871
4.06k
        if (!packed_file->writer) {
872
0
            handle_failure(packed_file,
873
0
                           Status::InternalError("File writer is null for packed file: {}",
874
0
                                                 packed_file->packed_file_path));
875
0
            continue;
876
0
        }
877
878
4.06k
        if (packed_file->writer->state() != FileWriter::State::CLOSED) {
879
56
            continue;
880
56
        }
881
882
4.00k
        Status status = packed_file->writer->close(true);
883
4.00k
        if (status.is<ErrorCode::ALREADY_CLOSED>()) {
884
2.00k
            handle_success(packed_file);
885
2.00k
            continue;
886
2.00k
        }
887
2.00k
        if (status.ok()) {
888
2.00k
            continue;
889
2.00k
        }
890
891
2
        handle_failure(packed_file, status);
892
2
    }
893
2.09k
}
894
895
// Issues close(true) on the packed file's writer to finalize the upload.
//
// @param packed_file_path  path of the packed file; used only in the error message.
// @param writer            writer owned by the packed file context; may be null.
// @return InternalError when `writer` is null, otherwise whatever writer->close(true)
//         returns. The caller treats ALREADY_CLOSED as a completed upload.
Status PackedFileManager::finalize_packed_file_upload(const std::string& packed_file_path,
                                                      FileWriter* writer) {
    if (writer == nullptr) {
        // Same formatted message as the null-writer check in process_uploading_packed_files().
        return Status::InternalError("File writer is null for packed file: {}", packed_file_path);
    }

    // NOTE(review): close(true) appears to start a non-blocking close. The caller later polls
    // writer->state() for CLOSED. Confirm against the FileWriter implementation.
    return writer->close(true);
}
903
904
// Persists `packed_file_info` for `packed_file_path` through the cloud meta manager.
//
// Only supported in cloud mode; otherwise an InternalError is returned. In BE_TEST builds
// a sync point may short-circuit the call and return its own status.
Status PackedFileManager::update_meta_service(const std::string& packed_file_path,
                                              const cloud::PackedFileInfoPB& packed_file_info) {
#ifdef BE_TEST
    TEST_SYNC_POINT_RETURN_WITH_VALUE("PackedFileManager::update_meta_service", Status::OK(),
                                      packed_file_path, &packed_file_info);
#endif
    VLOG_DEBUG << "Updating meta service for packed file: " << packed_file_path << " with "
               << packed_file_info.total_slice_num() << " small files"
               << ", total bytes: " << packed_file_info.total_slice_bytes();

    if (config::is_cloud_mode()) {
        // The CloudMetaMgr is reached through the (cloud) storage engine.
        auto& meta_mgr = ExecEnv::GetInstance()->storage_engine().to_cloud().meta_mgr();
        return meta_mgr.update_packed_file_info(packed_file_path, packed_file_info);
    }
    return Status::InternalError("Storage engine is not cloud mode");
}
923
924
2
void PackedFileManager::cleanup_expired_data() {
925
2
    auto current_time = std::time(nullptr);
926
927
    // Clean up expired uploaded files
928
2
    {
929
2
        std::lock_guard<std::mutex> uploaded_lock(_packed_files_mutex);
930
2
        auto it = _uploaded_packed_files.begin();
931
4
        while (it != _uploaded_packed_files.end()) {
932
2
            if (it->second->state == PackedFileState::UPLOADED &&
933
2
                current_time - it->second->upload_time > config::uploaded_file_retention_seconds) {
934
2
                it = _uploaded_packed_files.erase(it);
935
2
            } else if (it->second->state == PackedFileState::FAILED &&
936
0
                       current_time - it->second->upload_time >
937
0
                               config::uploaded_file_retention_seconds) {
938
0
                it = _uploaded_packed_files.erase(it);
939
0
            } else {
940
0
                ++it;
941
0
            }
942
2
        }
943
2
    }
944
945
    // Clean up expired global index entries
946
2
    {
947
2
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
948
2
        auto it = _global_slice_locations.begin();
949
2
        while (it != _global_slice_locations.end()) {
950
0
            const auto& index = it->second;
951
0
            if (index.create_time > 0 &&
952
0
                current_time - index.create_time > config::uploaded_file_retention_seconds) {
953
0
                it = _global_slice_locations.erase(it);
954
0
            } else {
955
0
                ++it;
956
0
            }
957
0
        }
958
2
    }
959
2
}
960
961
#ifdef BE_TEST
962
namespace {
963
192
void reset_adder(bvar::Adder<int64_t>& adder) {
964
192
    auto current = adder.get_value();
965
192
    if (current != 0) {
966
18
        adder << (-current);
967
18
    }
968
192
}
969
} // namespace
970
971
64
void PackedFileManager::reset_packed_file_bvars_for_test() const {
972
64
    reset_adder(g_packed_file_total_count);
973
64
    reset_adder(g_packed_file_total_small_file_count);
974
64
    reset_adder(g_packed_file_total_size_bytes);
975
64
    g_packed_file_small_file_num_recorder.reset();
976
64
    g_packed_file_file_size_recorder.reset();
977
64
    g_packed_file_active_to_ready_ms_recorder.reset();
978
64
    g_packed_file_ready_to_upload_ms_recorder.reset();
979
64
    g_packed_file_uploading_to_uploaded_ms_recorder.reset();
980
64
    if (auto* sampler = g_packed_file_small_file_num_recorder.get_sampler()) {
981
64
        sampler->take_sample();
982
64
    }
983
64
    if (auto* sampler = g_packed_file_file_size_recorder.get_sampler()) {
984
64
        sampler->take_sample();
985
64
    }
986
64
    if (auto* sampler = g_packed_file_active_to_ready_ms_recorder.get_sampler()) {
987
64
        sampler->take_sample();
988
64
    }
989
64
    if (auto* sampler = g_packed_file_ready_to_upload_ms_recorder.get_sampler()) {
990
64
        sampler->take_sample();
991
64
    }
992
64
    if (auto* sampler = g_packed_file_uploading_to_uploaded_ms_recorder.get_sampler()) {
993
64
        sampler->take_sample();
994
64
    }
995
64
}
996
997
4
int64_t PackedFileManager::packed_file_total_count_for_test() const {
998
4
    return g_packed_file_total_count.get_value();
999
4
}
1000
1001
4
int64_t PackedFileManager::packed_file_total_small_file_num_for_test() const {
1002
4
    return g_packed_file_total_small_file_count.get_value();
1003
4
}
1004
1005
4
int64_t PackedFileManager::packed_file_total_size_bytes_for_test() const {
1006
4
    return g_packed_file_total_size_bytes.get_value();
1007
4
}
1008
1009
4
double PackedFileManager::packed_file_avg_small_file_num_for_test() const {
1010
4
    return g_packed_file_avg_small_file_num.get_value().get_average_double();
1011
4
}
1012
1013
4
double PackedFileManager::packed_file_avg_file_size_for_test() const {
1014
4
    return g_packed_file_avg_file_size_bytes.get_value().get_average_double();
1015
4
}
1016
1017
void PackedFileManager::record_packed_file_metrics_for_test(
1018
2
        const PackedFileManager::PackedFileContext* packed_file) {
1019
2
    DCHECK(packed_file != nullptr);
1020
2
    record_packed_file_metrics(*packed_file);
1021
2
}
1022
1023
4
void PackedFileManager::clear_state_for_test() {
1024
4
    std::lock_guard<std::timed_mutex> cur_lock(_current_packed_file_mutex);
1025
4
    _current_packed_files.clear();
1026
4
    {
1027
4
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
1028
4
        _uploading_packed_files.clear();
1029
4
        _uploaded_packed_files.clear();
1030
4
    }
1031
4
    {
1032
4
        std::lock_guard<std::mutex> lock(_global_index_mutex);
1033
4
        _global_slice_locations.clear();
1034
4
    }
1035
4
}
1036
#endif
1037
1038
} // namespace doris::io