Coverage Report

Created: 2026-03-14 06:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/packed_file_manager.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/packed_file_manager.h"
19
20
#include <bvar/bvar.h>
21
#include <bvar/recorder.h>
22
#include <bvar/window.h>
23
24
#include <algorithm>
25
#include <chrono>
26
#include <cstdint>
27
#include <ctime>
28
#include <functional>
29
#include <limits>
30
#include <optional>
31
#include <random>
32
#include <sstream>
33
#include <unordered_set>
34
35
#ifdef BE_TEST
36
#include "cpp/sync_point.h"
37
#endif
38
39
#include <gen_cpp/cloud.pb.h>
40
41
#include "cloud/cloud_meta_mgr.h"
42
#include "cloud/cloud_storage_engine.h"
43
#include "cloud/config.h"
44
#include "common/config.h"
45
#include "io/cache/block_file_cache.h"
46
#include "io/cache/block_file_cache_factory.h"
47
#include "io/cache/file_block.h"
48
#include "io/cache/file_cache_common.h"
49
#include "io/fs/packed_file_trailer.h"
50
#include "runtime/exec_env.h"
51
#include "storage/storage_engine.h"
52
#include "util/coding.h"
53
#include "util/slice.h"
54
#include "util/uid_util.h"
55
56
namespace doris::io {
57
58
namespace {
59
60
// ---------------------------------------------------------------------------
// Packed-file metrics.
//
// Cumulative totals, fed by PackedFileManager::record_packed_file_metrics().
// ---------------------------------------------------------------------------
bvar::Adder<int64_t> g_packed_file_total_count("packed_file", "total_count");
bvar::Adder<int64_t> g_packed_file_total_small_file_count("packed_file", "total_small_file_num");
bvar::Adder<int64_t> g_packed_file_total_size_bytes("packed_file", "total_size_bytes");

// Per-packed-file samples and the windowed views (window_size=10) that expose them.
// Keep every recorder defined before the Window wrapping it: within one
// translation unit globals are initialized in definition order.
bvar::IntRecorder g_packed_file_small_file_num_recorder;
bvar::IntRecorder g_packed_file_file_size_recorder;
bvar::Window<bvar::IntRecorder> g_packed_file_avg_small_file_num(
        "packed_file_avg_small_file_num", &g_packed_file_small_file_num_recorder,
        /*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_avg_file_size_bytes(
        "packed_file_avg_file_size_bytes", &g_packed_file_file_size_recorder,
        /*window_size=*/10);

// Lifecycle latencies (milliseconds) between packed-file state transitions.
bvar::IntRecorder g_packed_file_active_to_ready_ms_recorder;
bvar::IntRecorder g_packed_file_ready_to_upload_ms_recorder;
bvar::IntRecorder g_packed_file_uploading_to_uploaded_ms_recorder;
bvar::Window<bvar::IntRecorder> g_packed_file_active_to_ready_ms_window(
        "packed_file_active_to_ready_ms", &g_packed_file_active_to_ready_ms_recorder,
        /*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_ready_to_upload_ms_window(
        "packed_file_ready_to_upload_ms", &g_packed_file_ready_to_upload_ms_recorder,
        /*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_uploading_to_uploaded_ms_window(
        "packed_file_uploading_to_uploaded_ms", &g_packed_file_uploading_to_uploaded_ms_recorder,
        /*window_size=*/10);

// Asynchronous small-file cache writes. write_small_file_to_cache_async() adds
// to these when a task is submitted and subtracts when the task finishes (or
// when submission fails), so they reflect in-flight work.
bvar::Adder<int64_t> g_packed_file_cache_async_write_count("packed_file_cache",
                                                           "async_write_count");
bvar::Adder<int64_t> g_packed_file_cache_async_write_bytes("packed_file_cache",
                                                           "async_write_bytes");
89
90
// Appends the packed-file trailer to `writer`.
//
// Trailer layout, written after all small-file payloads:
//   [serialized PackedFileFooterPB][footer length: fixed32 LE][trailer version: fixed32 LE]
//
// Returns InternalError if the writer is null or already CLOSED, if the footer
// cannot be serialized, or if the footer is too large for its 32-bit length
// field. Otherwise returns the status of the final append.
Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_file_path,
                                  const cloud::PackedFileInfoPB& packed_file_info) {
    if (writer == nullptr) {
        return Status::InternalError("File writer is null for packed file: {}", packed_file_path);
    }
    if (writer->state() == FileWriter::State::CLOSED) {
        return Status::InternalError("File writer already closed for packed file: {}",
                                     packed_file_path);
    }

    cloud::PackedFileFooterPB footer;
    footer.mutable_packed_file_info()->CopyFrom(packed_file_info);

    std::string footer_bytes;
    const bool serialized = footer.SerializeToString(&footer_bytes);
    if (!serialized) {
        return Status::InternalError("Failed to serialize packed file footer info for {}",
                                     packed_file_path);
    }

    // The footer length is stored as uint32 and the whole trailer must also fit.
    const size_t footer_len = footer_bytes.size();
    const bool too_large =
            footer_len > std::numeric_limits<uint32_t>::max() - kPackedFileTrailerSuffixSize;
    if (too_large) {
        return Status::InternalError("PackedFileFooterPB too large for {}", packed_file_path);
    }

    // Reuse the serialized buffer as the start of the trailer, then add the suffix.
    std::string trailer = std::move(footer_bytes);
    trailer.reserve(footer_len + kPackedFileTrailerSuffixSize);
    put_fixed32_le(&trailer, static_cast<uint32_t>(footer_len));
    put_fixed32_le(&trailer, kPackedFileTrailerVersion);

    return writer->append(Slice(trailer));
}
122
123
// write small file data to file cache
124
void do_write_to_file_cache(const std::string& small_file_path, const std::string& data,
125
1.00k
                            int64_t tablet_id, uint64_t expiration_time) {
126
1.00k
    if (data.empty()) {
127
0
        return;
128
0
    }
129
130
    // Generate cache key from small file path (e.g., "rowset_id_seg_id.dat")
131
1.00k
    Path path(small_file_path);
132
1.00k
    UInt128Wrapper cache_hash = BlockFileCache::hash(path.filename().native());
133
134
1.00k
    VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path
135
0
               << " filename=" << path.filename().native() << " hash=" << cache_hash.to_string()
136
0
               << " size=" << data.size() << " tablet_id=" << tablet_id
137
0
               << " expiration_time=" << expiration_time;
138
139
1.00k
    BlockFileCache* file_cache = FileCacheFactory::instance()->get_by_path(cache_hash);
140
1.00k
    if (file_cache == nullptr) {
141
0
        return; // Cache not available, skip
142
0
    }
143
144
    // Allocate cache blocks
145
1.00k
    CacheContext ctx;
146
1.00k
    ctx.cache_type = expiration_time > 0 ? FileCacheType::TTL : FileCacheType::NORMAL;
147
1.00k
    ctx.expiration_time = expiration_time;
148
1.00k
    ctx.tablet_id = tablet_id;
149
1.00k
    ReadStatistics stats;
150
1.00k
    ctx.stats = &stats;
151
152
1.00k
    FileBlocksHolder holder = file_cache->get_or_set(cache_hash, 0, data.size(), ctx);
153
154
    // Write data to cache blocks
155
1.00k
    size_t data_offset = 0;
156
1.00k
    for (auto& block : holder.file_blocks) {
157
1.00k
        if (data_offset >= data.size()) {
158
0
            break;
159
0
        }
160
1.00k
        size_t block_size = block->range().size();
161
1.00k
        size_t write_size = std::min(block_size, data.size() - data_offset);
162
163
1.00k
        if (block->state() == FileBlock::State::EMPTY) {
164
1.00k
            block->get_or_set_downloader();
165
1.00k
            if (block->is_downloader()) {
166
1.00k
                Slice s(data.data() + data_offset, write_size);
167
1.00k
                Status st = block->append(s);
168
1.00k
                if (st.ok()) {
169
1.00k
                    st = block->finalize();
170
1.00k
                }
171
1.00k
                if (!st.ok()) {
172
0
                    LOG(WARNING) << "Write small file to cache failed: " << st.msg();
173
0
                }
174
1.00k
            }
175
1.00k
        }
176
1.00k
        data_offset += write_size;
177
1.00k
    }
178
1.00k
}
179
180
// Async wrapper: submit cache write task to thread pool
181
// The data is copied into the lambda capture to ensure its lifetime extends beyond
182
// the async task execution. The original Slice may reference a buffer that gets
183
// reused or freed before the async task runs.
184
void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data,
185
1.04k
                                     int64_t tablet_id, uint64_t expiration_time) {
186
1.04k
    if (!config::enable_file_cache || data.size == 0) {
187
43
        return;
188
43
    }
189
190
    // Copy data since original buffer may be reused before async task executes
191
    // For small files (< 1MB), copy overhead is acceptable
192
1.00k
    std::string data_copy(data.data, data.size);
193
1.00k
    size_t data_size = data.size;
194
195
1.00k
    auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool();
196
1.00k
    if (thread_pool == nullptr) {
197
        // Fallback to sync write if thread pool not available
198
0
        do_write_to_file_cache(small_file_path, data_copy, tablet_id, expiration_time);
199
0
        return;
200
0
    }
201
202
    // Track async write count and bytes
203
1.00k
    g_packed_file_cache_async_write_count << 1;
204
1.00k
    g_packed_file_cache_async_write_bytes << static_cast<int64_t>(data_size);
205
206
1.00k
    Status st = thread_pool->submit_func([path = small_file_path, data = std::move(data_copy),
207
1.00k
                                          tablet_id, data_size, expiration_time]() {
208
1.00k
        do_write_to_file_cache(path, data, tablet_id, expiration_time);
209
        // Decrement async write count after completion
210
1.00k
        g_packed_file_cache_async_write_count << -1;
211
1.00k
        g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
212
1.00k
    });
213
214
1.00k
    if (!st.ok()) {
215
        // Revert metrics since task was not submitted
216
0
        g_packed_file_cache_async_write_count << -1;
217
0
        g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
218
0
        LOG(WARNING) << "Failed to submit cache write task for " << small_file_path << ": "
219
0
                     << st.msg();
220
        // Don't block on failure, cache write is best-effort
221
0
    }
222
1.00k
}
223
224
} // namespace
225
226
1.02k
// Returns the process-wide manager. It is a function-local static, so it is
// constructed on first call with thread-safe initialization.
PackedFileManager* PackedFileManager::instance() {
    static PackedFileManager s_manager;
    return &s_manager;
}
230
231
31
// Destruction stops and joins the background manager thread, if one is running.
PackedFileManager::~PackedFileManager() {
    this->stop_background_manager();
}
234
235
37
// Initialization hook; there is currently nothing to set up, so it always succeeds.
Status PackedFileManager::init() {
    const Status ok = Status::OK();
    return ok;
}
238
239
// Creates a fresh packed-file context for `resource_id` and stores it in
// `packed_file_ctx`.
//
// The packed file is placed at "data/packed_file/<bucket>/<uuid>.bin", where
// bucket = hash(uuid) % 4096 + 1 (i.e. 1..4096). The context starts in state
// INIT with an open writer. The writer's own file-cache write is disabled,
// because append_small_file() caches each small file separately under the small
// file's path, so that cache entries can be cleaned up when stale rowsets are
// removed.
//
// The context is built in a local and published only after its writer has been
// created. On any failure `packed_file_ctx` is left untouched, so callers never
// see a half-initialized context without a writer.
Status PackedFileManager::create_new_packed_file_context(
        const std::string& resource_id, std::unique_ptr<PackedFileContext>& packed_file_ctx) {
    FileSystemSPtr file_system;
    RETURN_IF_ERROR(ensure_file_system(resource_id, &file_system));
    if (file_system == nullptr) {
        return Status::InternalError("File system is not available for packed file creation: " +
                                     resource_id);
    }

    auto uuid = generate_uuid_string();
    auto hash_val = std::hash<std::string> {}(uuid);
    uint16_t path_bucket = hash_val % 4096 + 1;
    std::stringstream path_stream;
    path_stream << "data/packed_file/" << path_bucket << "/" << uuid << ".bin";
    const std::string relative_path = path_stream.str();

    auto new_ctx = std::make_unique<PackedFileContext>();
    new_ctx->packed_file_path = relative_path;
    new_ctx->create_time = std::time(nullptr);
    new_ctx->create_timestamp = std::chrono::steady_clock::now();
    new_ctx->state = PackedFileState::INIT;
    new_ctx->resource_id = resource_id;
    new_ctx->file_system = std::move(file_system);

    FileWriterPtr new_writer;
    FileWriterOptions opts;
    opts.write_file_cache = false;
    RETURN_IF_ERROR(new_ctx->file_system->create_file(Path(relative_path), &new_writer, &opts));
    new_ctx->writer = std::move(new_writer);

    // Publish only once fully initialized.
    packed_file_ctx = std::move(new_ctx);
    return Status::OK();
}
277
278
// Resolves the file system for `resource_id` into `*file_system`.
//
// An empty `resource_id` means the cloud storage engine's latest file system;
// any other id names a storage resource. Successful resolutions are cached in
// _default_file_system / _file_systems under _file_system_mutex. A cached entry
// is returned directly; otherwise the storage engine is consulted, which is only
// possible in cloud mode.
Status PackedFileManager::ensure_file_system(const std::string& resource_id,
                                             FileSystemSPtr* file_system) {
    if (file_system == nullptr) {
        return Status::InvalidArgument("file_system output parameter is null");
    }

    const bool use_default = resource_id.empty();

    // Fast path: a previously resolved file system.
    {
        std::lock_guard<std::mutex> guard(_file_system_mutex);
        if (!use_default) {
            auto cached = _file_systems.find(resource_id);
            if (cached != _file_systems.end()) {
                *file_system = cached->second;
                return Status::OK();
            }
        } else if (_default_file_system != nullptr) {
            *file_system = _default_file_system;
            return Status::OK();
        }
    }

    // Slow path: ask the cloud storage engine (done outside the cache lock).
    if (!config::is_cloud_mode()) {
        return Status::InternalError("Cloud file system is not available in local mode");
    }
    auto* env = ExecEnv::GetInstance();
    if (env == nullptr) {
        return Status::InternalError("ExecEnv instance is not initialized");
    }

    FileSystemSPtr resolved_fs;
    if (use_default) {
        resolved_fs = env->storage_engine().to_cloud().latest_fs();
        if (resolved_fs == nullptr) {
            return Status::InternalError("Cloud file system is not ready");
        }
    } else {
        auto storage_resource = env->storage_engine().to_cloud().get_storage_resource(resource_id);
        const bool ready = storage_resource.has_value() && storage_resource->fs != nullptr;
        if (!ready) {
            return Status::InternalError("Cloud file system is not ready for resource: " +
                                         resource_id);
        }
        resolved_fs = storage_resource->fs;
    }

    // Publish into the cache. If resolvers race here, the last writer wins.
    {
        std::lock_guard<std::mutex> guard(_file_system_mutex);
        if (use_default) {
            _default_file_system = resolved_fs;
        } else {
            _file_systems[resource_id] = resolved_fs;
        }
    }
    *file_system = std::move(resolved_fs);
    return Status::OK();
}
338
339
// Appends one small file's bytes to the active packed file of
// `info.resource_id` and records where they landed.
//
// - Files larger than config::small_file_threshold_bytes are not packed: the
//   call returns OK without writing anything (presumably the caller stores such
//   files standalone — confirm against callers).
// - `info.txn_id` must be positive; otherwise InvalidArgument is returned.
// - Before writing, the current packed file is rotated to the upload pipeline
//   if this append would reach config::packed_file_size_threshold_bytes.
//   After writing, it is rotated if it now holds
//   config::packed_file_small_file_count_threshold small files (when > 0).
// - When `info.write_file_cache` is set, the bytes are also written to the
//   block file cache asynchronously, keyed by `path`. This keeps the cache key
//   consistent with the one used when stale rowsets are cleaned up.
//
// All per-file and global bookkeeping happens *before* the count-based
// rotation. Once rotated, the context is owned by the upload pipeline
// (_uploading_packed_files) and must not be mutated from here.
Status PackedFileManager::append_small_file(const std::string& path, const Slice& data,
                                            const PackedAppendContext& info) {
    if (data.get_size() > config::small_file_threshold_bytes) {
        return Status::OK(); // Skip merging for large files
    }

    if (info.txn_id <= 0) {
        return Status::InvalidArgument("Missing valid txn id for packed file append: " + path);
    }

    std::lock_guard<std::timed_mutex> lock(_current_packed_file_mutex);

    // Reference to the map slot; it follows whatever context a rotation installs.
    auto& current_state = _current_packed_files[info.resource_id];
    if (!current_state || !current_state->writer) {
        RETURN_IF_ERROR(create_new_packed_file_context(info.resource_id, current_state));
    }

    // Size-based rotation. An empty packed file is never rotated: its
    // replacement would be just as empty, so rotating would only upload an
    // empty packed file without making room for this append.
    if (current_state->total_size > 0 &&
        current_state->total_size + data.get_size() >= config::packed_file_size_threshold_bytes) {
        RETURN_IF_ERROR(mark_current_packed_file_for_upload_locked(info.resource_id));
        auto it = _current_packed_files.find(info.resource_id);
        if (it == _current_packed_files.end() || !it->second || !it->second->writer) {
            RETURN_IF_ERROR(create_new_packed_file_context(
                    info.resource_id, _current_packed_files[info.resource_id]));
        }
    }

    PackedFileContext* active_state = current_state.get();

    // Write data to the current packed file.
    RETURN_IF_ERROR(active_state->writer->append(data));

    if (info.write_file_cache) {
        write_small_file_to_cache_async(path, data, info.tablet_id, info.expiration_time);
    }

    // Record the slice inside this packed file.
    PackedSliceLocation location;
    location.packed_file_path = active_state->packed_file_path;
    location.offset = active_state->current_offset;
    location.size = data.get_size();
    location.create_time = std::time(nullptr);
    location.tablet_id = info.tablet_id;
    location.rowset_id = info.rowset_id;
    location.resource_id = info.resource_id;
    location.txn_id = info.txn_id;

    active_state->slice_locations[path] = location;
    active_state->current_offset += data.get_size();
    active_state->total_size += data.get_size();

    // Activate on first write. This must precede the count-based rotation, which
    // reads first_append_timestamp to record the active->ready latency.
    if (!active_state->first_append_timestamp.has_value()) {
        active_state->first_append_timestamp = std::chrono::steady_clock::now();
    }
    if (active_state->state == PackedFileState::INIT) {
        active_state->state = PackedFileState::ACTIVE;
    }

    // Publish the slice location globally.
    {
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
        _global_slice_locations[path] = location;
    }

    // Count-based rotation, last: nothing touches `active_state` afterwards.
    if (config::packed_file_small_file_count_threshold > 0 &&
        static_cast<int64_t>(active_state->slice_locations.size()) >=
                config::packed_file_small_file_count_threshold) {
        RETURN_IF_ERROR(mark_current_packed_file_for_upload_locked(info.resource_id));
    }

    return Status::OK();
}
417
418
1.00k
// Blocks on the context's upload condition variable until its state becomes
// UPLOADED or FAILED.
// Returns OK for UPLOADED. For FAILED it returns InternalError carrying
// `last_error`, or a generic message if none was recorded.
// The caller must keep `packed_file_ptr` alive for the whole wait.
Status PackedFileManager::wait_for_packed_file_upload(PackedFileContext* packed_file_ptr) {
    auto reached_terminal_state = [packed_file_ptr] {
        const auto s = packed_file_ptr->state.load(std::memory_order_acquire);
        return s == PackedFileState::UPLOADED || s == PackedFileState::FAILED;
    };

    std::unique_lock<std::mutex> upload_lock(packed_file_ptr->upload_mutex);
    packed_file_ptr->upload_cv.wait(upload_lock, reached_terminal_state);

    if (packed_file_ptr->state != PackedFileState::FAILED) {
        return Status::OK();
    }

    std::string message = packed_file_ptr->last_error;
    if (message.empty()) {
        message = "Packed file upload failed";
    }
    return Status::InternalError(message);
}
433
434
1.00k
// Blocks until the packed file holding small file `path` has finished uploading.
//
// Resolution order:
//   1. The global slice index maps `path` to its packed file (error if absent).
//   2. If that packed file is already in _uploaded_packed_files, return at
//      once: OK when UPLOADED, an error carrying last_error when FAILED.
//   3. Otherwise look in _current_packed_files, then _uploading_packed_files,
//      then _uploaded_packed_files, and wait via wait_for_packed_file_upload().
// Contexts taken from the shared_ptr-managed maps are kept alive by a local
// shared_ptr for the duration of the wait.
//
// NOTE(review): a context found in _current_packed_files is held only by raw
// pointer (that map owns it uniquely), so nothing here keeps it alive if it is
// moved through the upload pipeline and later released while we still wait.
// Confirm that cleanup can never free a context that has waiters.
Status PackedFileManager::wait_upload_done(const std::string& path) {
    std::string packed_file_path;
    {
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
        const auto loc = _global_slice_locations.find(path);
        if (loc == _global_slice_locations.end()) {
            return Status::InternalError("File not found in global index: " + path);
        }
        packed_file_path = loc->second.packed_file_path;
    }

    std::shared_ptr<PackedFileContext> keep_alive;
    std::shared_ptr<PackedFileContext> failed;
    {
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        auto done = _uploaded_packed_files.find(packed_file_path);
        if (done != _uploaded_packed_files.end()) {
            switch (done->second->state.load(std::memory_order_acquire)) {
            case PackedFileState::UPLOADED:
                return Status::OK(); // Already uploaded, no need to wait
            case PackedFileState::FAILED:
                failed = done->second;
                break;
            default:
                keep_alive = done->second;
                break;
            }
        }
    }

    if (failed) {
        std::lock_guard<std::mutex> upload_lock(failed->upload_mutex);
        std::string message = failed->last_error;
        if (message.empty()) {
            message = "Merge file upload failed";
        }
        return Status::InternalError(message);
    }

    PackedFileContext* target = nullptr;
    {
        std::lock_guard<std::timed_mutex> current_lock(_current_packed_file_mutex);
        auto hit = std::find_if(_current_packed_files.begin(), _current_packed_files.end(),
                                [&packed_file_path](const auto& entry) {
                                    return entry.second &&
                                           entry.second->packed_file_path == packed_file_path;
                                });
        if (hit != _current_packed_files.end()) {
            target = hit->second.get();
        }
    }

    if (target == nullptr) {
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        auto uploading = _uploading_packed_files.find(packed_file_path);
        if (uploading != _uploading_packed_files.end()) {
            keep_alive = uploading->second;
            target = keep_alive.get();
        } else {
            auto uploaded = _uploaded_packed_files.find(packed_file_path);
            if (uploaded != _uploaded_packed_files.end()) {
                keep_alive = uploaded->second;
                target = keep_alive.get();
            }
        }
    }

    if (target == nullptr) {
        // Packed file not found in any location, this is unexpected
        return Status::InternalError("Packed file not found for path: " + path);
    }

    // `keep_alive` stays in scope until this call returns.
    return wait_for_packed_file_upload(target);
}
509
510
// Copies the global-index location of small file `path` into `*location`.
// Returns NotFound if `path` was never recorded in the global packed index.
Status PackedFileManager::get_packed_slice_location(const std::string& path,
                                                    PackedSliceLocation* location) {
    std::lock_guard<std::mutex> lock(_global_index_mutex);
    const auto entry = _global_slice_locations.find(path);
    if (entry != _global_slice_locations.end()) {
        *location = entry->second;
        return Status::OK();
    }
    return Status::NotFound("File not found in global packed index: {}", path);
}
521
522
2
void PackedFileManager::start_background_manager() {
523
2
    if (_background_thread) {
524
0
        return; // Already started
525
0
    }
526
527
2
    _stop_background_thread = false;
528
2
    _background_thread = std::make_unique<std::thread>([this] { background_manager(); });
529
2
}
530
531
34
void PackedFileManager::stop_background_manager() {
532
34
    _stop_background_thread = true;
533
34
    if (_background_thread && _background_thread->joinable()) {
534
2
        _background_thread->join();
535
2
    }
536
34
    _background_thread.reset();
537
34
}
538
539
// Hands the current packed file of `resource_id` to the upload pipeline and
// opens a replacement. The caller must hold _current_packed_file_mutex.
//
// Steps:
//   - set the state to READY_TO_UPLOAD;
//   - on the first call only, stamp ready_to_upload_timestamp and, if a first
//     append was recorded, sample the active->ready latency;
//   - move ownership into _uploading_packed_files (keyed by packed file path);
//   - create a fresh context in the same slot.
// Returns OK without doing anything when no current packed file with an open
// writer exists.
Status PackedFileManager::mark_current_packed_file_for_upload_locked(
        const std::string& resource_id) {
    auto slot = _current_packed_files.find(resource_id);
    const bool has_open_file =
            slot != _current_packed_files.end() && slot->second && slot->second->writer;
    if (!has_open_file) {
        return Status::OK(); // Nothing to mark for upload
    }

    auto& ctx = slot->second;
    ctx->state = PackedFileState::READY_TO_UPLOAD;

    if (!ctx->ready_to_upload_timestamp) {
        const auto ready_at = std::chrono::steady_clock::now();
        ctx->ready_to_upload_timestamp = ready_at;

        int64_t active_to_ready_ms = -1;
        if (ctx->first_append_timestamp) {
            const auto waited = ready_at - *ctx->first_append_timestamp;
            active_to_ready_ms =
                    std::chrono::duration_cast<std::chrono::milliseconds>(waited).count();
            g_packed_file_active_to_ready_ms_recorder << active_to_ready_ms;
            if (auto* sampler = g_packed_file_active_to_ready_ms_recorder.get_sampler()) {
                sampler->take_sample();
            }
        }
        VLOG_DEBUG << "Packed file " << ctx->packed_file_path
                   << " transition ACTIVE->READY_TO_UPLOAD; active_to_ready_ms="
                   << active_to_ready_ms;
    }

    // Transfer ownership to the upload pipeline; the slot is left empty.
    {
        auto uploading = std::shared_ptr<PackedFileContext>(std::move(ctx));
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        _uploading_packed_files[uploading->packed_file_path] = uploading;
    }

    // Refill the slot with a brand-new packed file.
    return create_new_packed_file_context(resource_id, slot->second);
}
579
580
1.00k
// Locking entry point: takes _current_packed_file_mutex, then delegates to
// mark_current_packed_file_for_upload_locked().
Status PackedFileManager::mark_current_packed_file_for_upload(const std::string& resource_id) {
    std::lock_guard<std::timed_mutex> guard(_current_packed_file_mutex);
    Status result = mark_current_packed_file_for_upload_locked(resource_id);
    return result;
}
584
585
1.00k
void PackedFileManager::record_packed_file_metrics(const PackedFileContext& packed_file) {
586
1.00k
    g_packed_file_total_count << 1;
587
1.00k
    g_packed_file_total_small_file_count
588
1.00k
            << static_cast<int64_t>(packed_file.slice_locations.size());
589
1.00k
    g_packed_file_total_size_bytes << packed_file.total_size;
590
1.00k
    g_packed_file_small_file_num_recorder
591
1.00k
            << static_cast<int64_t>(packed_file.slice_locations.size());
592
1.00k
    g_packed_file_file_size_recorder << packed_file.total_size;
593
    // Flush samplers immediately so the window bvar reflects the latest packed file.
594
1.00k
    if (auto* sampler = g_packed_file_small_file_num_recorder.get_sampler()) {
595
1.00k
        sampler->take_sample();
596
1.00k
    }
597
1.00k
    if (auto* sampler = g_packed_file_file_size_recorder.get_sampler()) {
598
1.00k
        sampler->take_sample();
599
1.00k
    }
600
1.00k
}
601
602
2
void PackedFileManager::background_manager() {
603
2
    auto last_cleanup_time = std::chrono::steady_clock::now();
604
605
493
    while (!_stop_background_thread.load()) {
606
491
        int64_t check_interval_ms =
607
491
                std::max<int64_t>(1, config::packed_file_time_threshold_ms / 10);
608
491
        std::this_thread::sleep_for(std::chrono::milliseconds(check_interval_ms));
609
610
        // Check if current packed file should be closed due to time threshold
611
491
        std::vector<std::string> resources_to_mark;
612
491
        {
613
491
            std::unique_lock<std::timed_mutex> current_lock(_current_packed_file_mutex,
614
491
                                                            std::defer_lock);
615
491
            int64_t lock_wait_ms = std::max<int64_t>(0, config::packed_file_try_lock_timeout_ms);
616
491
            if (current_lock.try_lock_for(std::chrono::milliseconds(lock_wait_ms))) {
617
4.67k
                for (auto& [resource_id, state] : _current_packed_files) {
618
4.67k
                    if (!state || state->state != PackedFileState::ACTIVE) {
619
3.06k
                        continue;
620
3.06k
                    }
621
1.60k
                    if (!state->first_append_timestamp.has_value()) {
622
0
                        continue;
623
0
                    }
624
1.60k
                    auto current_time = std::chrono::steady_clock::now();
625
1.60k
                    auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
626
1.60k
                                              current_time - *(state->first_append_timestamp))
627
1.60k
                                              .count();
628
1.60k
                    if (elapsed_ms >= config::packed_file_time_threshold_ms) {
629
1.00k
                        resources_to_mark.push_back(resource_id);
630
1.00k
                    }
631
1.60k
                }
632
487
            }
633
491
        }
634
1.00k
        for (const auto& resource_id : resources_to_mark) {
635
1.00k
            Status st = mark_current_packed_file_for_upload(resource_id);
636
1.00k
            if (!st.ok()) {
637
0
                LOG(WARNING) << "Failed to close current packed file for resource " << resource_id
638
0
                             << ": " << st.to_string();
639
0
            }
640
1.00k
        }
641
642
        // Process uploading files
643
491
        process_uploading_packed_files();
644
645
491
        auto now = std::chrono::steady_clock::now();
646
491
        int64_t cleanup_interval_sec =
647
491
                std::max<int64_t>(1, config::packed_file_cleanup_interval_seconds);
648
491
        auto cleanup_interval = std::chrono::seconds(cleanup_interval_sec);
649
491
        if (now - last_cleanup_time >= cleanup_interval) {
650
0
            cleanup_expired_data();
651
0
            last_cleanup_time = now;
652
0
        }
653
491
    }
654
2
}
655
656
498
void PackedFileManager::process_uploading_packed_files() {
657
498
    std::vector<std::shared_ptr<PackedFileContext>> files_ready;
658
498
    std::vector<std::shared_ptr<PackedFileContext>> files_uploading;
659
1.00k
    auto record_ready_to_upload = [&](const std::shared_ptr<PackedFileContext>& packed_file) {
660
1.00k
        if (!packed_file->uploading_timestamp.has_value()) {
661
1.00k
            packed_file->uploading_timestamp = std::chrono::steady_clock::now();
662
1.00k
            int64_t duration_ms = -1;
663
1.00k
            if (packed_file->ready_to_upload_timestamp.has_value()) {
664
1.00k
                duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
665
1.00k
                                      *packed_file->uploading_timestamp -
666
1.00k
                                      *packed_file->ready_to_upload_timestamp)
667
1.00k
                                      .count();
668
1.00k
                g_packed_file_ready_to_upload_ms_recorder << duration_ms;
669
1.00k
                if (auto* sampler = g_packed_file_ready_to_upload_ms_recorder.get_sampler()) {
670
1.00k
                    sampler->take_sample();
671
1.00k
                }
672
1.00k
            }
673
1.00k
            VLOG_DEBUG << "Packed file " << packed_file->packed_file_path
674
0
                       << " transition READY_TO_UPLOAD->UPLOADING; "
675
0
                          "ready_to_upload_ms="
676
0
                       << duration_ms;
677
1.00k
        }
678
1.00k
    };
679
680
498
    {
681
498
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
682
3.00k
        for (auto& [packed_file_path, packed_file] : _uploading_packed_files) {
683
3.00k
            auto state = packed_file->state.load(std::memory_order_acquire);
684
3.00k
            if (state != PackedFileState::READY_TO_UPLOAD && state != PackedFileState::UPLOADING) {
685
0
                continue;
686
0
            }
687
3.00k
            if (state == PackedFileState::READY_TO_UPLOAD) {
688
1.00k
                files_ready.emplace_back(packed_file);
689
2.00k
            } else {
690
2.00k
                files_uploading.emplace_back(packed_file);
691
2.00k
            }
692
3.00k
        }
693
498
    }
694
695
1.00k
    auto handle_success = [&](const std::shared_ptr<PackedFileContext>& packed_file) {
696
1.00k
        auto now = std::chrono::steady_clock::now();
697
1.00k
        int64_t active_to_ready_ms = -1;
698
1.00k
        int64_t ready_to_upload_ms = -1;
699
1.00k
        int64_t uploading_to_uploaded_ms = -1;
700
1.00k
        if (packed_file->first_append_timestamp.has_value() &&
701
1.00k
            packed_file->ready_to_upload_timestamp.has_value()) {
702
1.00k
            active_to_ready_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
703
1.00k
                                         *packed_file->ready_to_upload_timestamp -
704
1.00k
                                         *packed_file->first_append_timestamp)
705
1.00k
                                         .count();
706
1.00k
        }
707
1.00k
        if (packed_file->ready_to_upload_timestamp.has_value() &&
708
1.00k
            packed_file->uploading_timestamp.has_value()) {
709
1.00k
            ready_to_upload_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
710
1.00k
                                         *packed_file->uploading_timestamp -
711
1.00k
                                         *packed_file->ready_to_upload_timestamp)
712
1.00k
                                         .count();
713
1.00k
        }
714
1.00k
        if (packed_file->uploading_timestamp.has_value()) {
715
1.00k
            uploading_to_uploaded_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
716
1.00k
                                               now - *packed_file->uploading_timestamp)
717
1.00k
                                               .count();
718
1.00k
            g_packed_file_uploading_to_uploaded_ms_recorder << uploading_to_uploaded_ms;
719
1.00k
            if (auto* sampler = g_packed_file_uploading_to_uploaded_ms_recorder.get_sampler()) {
720
1.00k
                sampler->take_sample();
721
1.00k
            }
722
1.00k
        }
723
1.00k
        int64_t total_ms = -1;
724
1.00k
        if (packed_file->first_append_timestamp.has_value()) {
725
1.00k
            total_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
726
1.00k
                               now - *packed_file->first_append_timestamp)
727
1.00k
                               .count();
728
1.00k
        }
729
1.00k
        std::ostringstream slices_stream;
730
1.00k
        bool first_slice = true;
731
1.00k
        for (const auto& [small_file_path, index] : packed_file->slice_locations) {
732
1.00k
            if (!first_slice) {
733
0
                slices_stream << "; ";
734
0
            }
735
1.00k
            first_slice = false;
736
1.00k
            slices_stream << small_file_path << "(txn=" << index.txn_id
737
1.00k
                          << ", offset=" << index.offset << ", size=" << index.size << ")";
738
739
            // Update packed_file_size in global index
740
1.00k
            {
741
1.00k
                std::lock_guard<std::mutex> global_lock(_global_index_mutex);
742
1.00k
                auto it = _global_slice_locations.find(small_file_path);
743
1.00k
                if (it != _global_slice_locations.end()) {
744
1.00k
                    it->second.packed_file_size = packed_file->total_size;
745
1.00k
                }
746
1.00k
            }
747
1.00k
        }
748
1.00k
        LOG(INFO) << "Packed file " << packed_file->packed_file_path
749
1.00k
                  << " uploaded; slices=" << packed_file->slice_locations.size()
750
1.00k
                  << ", total_bytes=" << packed_file->total_size << ", slice_detail=["
751
1.00k
                  << slices_stream.str() << "]"
752
1.00k
                  << ", active_to_ready_ms=" << active_to_ready_ms
753
1.00k
                  << ", ready_to_upload_ms=" << ready_to_upload_ms
754
1.00k
                  << ", uploading_to_uploaded_ms=" << uploading_to_uploaded_ms
755
1.00k
                  << ", total_ms=" << total_ms;
756
1.00k
        {
757
1.00k
            std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex);
758
1.00k
            packed_file->state = PackedFileState::UPLOADED;
759
1.00k
            packed_file->upload_time = std::time(nullptr);
760
1.00k
        }
761
1.00k
        packed_file->upload_cv.notify_all();
762
1.00k
        {
763
1.00k
            std::lock_guard<std::mutex> lock(_packed_files_mutex);
764
1.00k
            _uploading_packed_files.erase(packed_file->packed_file_path);
765
1.00k
            _uploaded_packed_files[packed_file->packed_file_path] = packed_file;
766
1.00k
        }
767
1.00k
    };
768
769
498
    auto handle_failure = [&](const std::shared_ptr<PackedFileContext>& packed_file,
770
498
                              const Status& status) {
771
3
        LOG(WARNING) << "Failed to upload packed file: " << packed_file->packed_file_path
772
3
                     << ", error: " << status.to_string();
773
3
        {
774
3
            std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex);
775
3
            packed_file->state = PackedFileState::FAILED;
776
3
            packed_file->last_error = status.to_string();
777
3
            packed_file->upload_time = std::time(nullptr);
778
3
        }
779
3
        packed_file->upload_cv.notify_all();
780
3
        {
781
3
            std::lock_guard<std::mutex> lock(_packed_files_mutex);
782
3
            _uploading_packed_files.erase(packed_file->packed_file_path);
783
3
            _uploaded_packed_files[packed_file->packed_file_path] = packed_file;
784
3
        }
785
3
    };
786
787
1.00k
    for (auto& packed_file : files_ready) {
788
1.00k
        const std::string& packed_file_path = packed_file->packed_file_path;
789
1.00k
        cloud::PackedFileInfoPB packed_file_info;
790
1.00k
        packed_file_info.set_ref_cnt(packed_file->slice_locations.size());
791
1.00k
        packed_file_info.set_total_slice_num(packed_file->slice_locations.size());
792
1.00k
        packed_file_info.set_total_slice_bytes(packed_file->total_size);
793
1.00k
        packed_file_info.set_remaining_slice_bytes(packed_file->total_size);
794
1.00k
        packed_file_info.set_created_at_sec(packed_file->create_time);
795
1.00k
        packed_file_info.set_corrected(false);
796
1.00k
        packed_file_info.set_state(doris::cloud::PackedFileInfoPB::NORMAL);
797
1.00k
        packed_file_info.set_resource_id(packed_file->resource_id);
798
799
1.00k
        for (const auto& [small_file_path, index] : packed_file->slice_locations) {
800
1.00k
            auto* small_file = packed_file_info.add_slices();
801
1.00k
            small_file->set_path(small_file_path);
802
1.00k
            small_file->set_offset(index.offset);
803
1.00k
            small_file->set_size(index.size);
804
1.00k
            small_file->set_deleted(false);
805
1.00k
            if (index.tablet_id != 0) {
806
1.00k
                small_file->set_tablet_id(index.tablet_id);
807
1.00k
            }
808
1.00k
            if (!index.rowset_id.empty()) {
809
1.00k
                small_file->set_rowset_id(index.rowset_id);
810
1.00k
            }
811
1.00k
            if (index.txn_id != 0) {
812
1.00k
                small_file->set_txn_id(index.txn_id);
813
1.00k
            }
814
1.00k
        }
815
816
1.00k
        Status meta_status = update_meta_service(packed_file->packed_file_path, packed_file_info);
817
1.00k
        if (!meta_status.ok()) {
818
2
            LOG(WARNING) << "Failed to update meta service for packed file: "
819
2
                         << packed_file->packed_file_path << ", error: " << meta_status.to_string();
820
2
            handle_failure(packed_file, meta_status);
821
2
            continue;
822
2
        }
823
824
        // Record stats once the packed file metadata is persisted.
825
1.00k
        record_packed_file_metrics(*packed_file);
826
827
1.00k
        Status trailer_status = append_packed_info_trailer(
828
1.00k
                packed_file->writer.get(), packed_file->packed_file_path, packed_file_info);
829
1.00k
        if (!trailer_status.ok()) {
830
0
            handle_failure(packed_file, trailer_status);
831
0
            continue;
832
0
        }
833
834
        // Now upload the file
835
1.00k
        if (!packed_file->slice_locations.empty()) {
836
1.00k
            std::ostringstream oss;
837
1.00k
            oss << "Uploading packed file " << packed_file_path << " with "
838
1.00k
                << packed_file->slice_locations.size() << " small files: ";
839
1.00k
            bool first = true;
840
1.00k
            for (const auto& [small_file_path, index] : packed_file->slice_locations) {
841
1.00k
                if (!first) {
842
0
                    oss << ", ";
843
0
                }
844
1.00k
                first = false;
845
1.00k
                oss << "[" << small_file_path << ", offset=" << index.offset
846
1.00k
                    << ", size=" << index.size << "]";
847
1.00k
            }
848
1.00k
            VLOG_DEBUG << oss.str();
849
1.00k
        } else {
850
0
            VLOG_DEBUG << "Uploading packed file " << packed_file_path << " with no small files";
851
0
        }
852
853
1.00k
        Status upload_status = finalize_packed_file_upload(packed_file->packed_file_path,
854
1.00k
                                                           packed_file->writer.get());
855
856
1.00k
        if (upload_status.is<ErrorCode::ALREADY_CLOSED>()) {
857
0
            record_ready_to_upload(packed_file);
858
0
            handle_success(packed_file);
859
0
            continue;
860
0
        }
861
1.00k
        if (!upload_status.ok()) {
862
0
            handle_failure(packed_file, upload_status);
863
0
            continue;
864
0
        }
865
866
1.00k
        record_ready_to_upload(packed_file);
867
1.00k
        packed_file->state = PackedFileState::UPLOADING;
868
1.00k
    }
869
870
2.00k
    for (auto& packed_file : files_uploading) {
871
2.00k
        if (!packed_file->writer) {
872
0
            handle_failure(packed_file,
873
0
                           Status::InternalError("File writer is null for packed file: {}",
874
0
                                                 packed_file->packed_file_path));
875
0
            continue;
876
0
        }
877
878
2.00k
        if (packed_file->writer->state() != FileWriter::State::CLOSED) {
879
2
            continue;
880
2
        }
881
882
2.00k
        Status status = packed_file->writer->close(true);
883
2.00k
        if (status.is<ErrorCode::ALREADY_CLOSED>()) {
884
1.00k
            handle_success(packed_file);
885
1.00k
            continue;
886
1.00k
        }
887
1.00k
        if (status.ok()) {
888
1.00k
            continue;
889
1.00k
        }
890
891
1
        handle_failure(packed_file, status);
892
1
    }
893
498
}
894
895
Status PackedFileManager::finalize_packed_file_upload(const std::string& packed_file_path,
896
1.00k
                                                      FileWriter* writer) {
897
1.00k
    if (writer == nullptr) {
898
1
        return Status::InternalError("File writer is null for packed file: " + packed_file_path);
899
1
    }
900
901
1.00k
    return writer->close(true);
902
1.00k
}
903
904
Status PackedFileManager::update_meta_service(const std::string& packed_file_path,
905
1.00k
                                              const cloud::PackedFileInfoPB& packed_file_info) {
906
1.00k
#ifdef BE_TEST
907
1.00k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("PackedFileManager::update_meta_service", Status::OK(),
908
2
                                      packed_file_path, &packed_file_info);
909
2
#endif
910
2
    VLOG_DEBUG << "Updating meta service for packed file: " << packed_file_path << " with "
911
0
               << packed_file_info.total_slice_num() << " small files"
912
0
               << ", total bytes: " << packed_file_info.total_slice_bytes();
913
914
    // Get CloudMetaMgr through StorageEngine
915
2
    if (!config::is_cloud_mode()) {
916
2
        return Status::InternalError("Storage engine is not cloud mode");
917
2
    }
918
919
0
    auto& storage_engine = ExecEnv::GetInstance()->storage_engine();
920
0
    auto& cloud_meta_mgr = storage_engine.to_cloud().meta_mgr();
921
0
    return cloud_meta_mgr.update_packed_file_info(packed_file_path, packed_file_info);
922
2
}
923
924
1
void PackedFileManager::cleanup_expired_data() {
925
1
    auto current_time = std::time(nullptr);
926
927
    // Clean up expired uploaded files
928
1
    {
929
1
        std::lock_guard<std::mutex> uploaded_lock(_packed_files_mutex);
930
1
        auto it = _uploaded_packed_files.begin();
931
2
        while (it != _uploaded_packed_files.end()) {
932
1
            if (it->second->state == PackedFileState::UPLOADED &&
933
1
                current_time - it->second->upload_time > config::uploaded_file_retention_seconds) {
934
1
                it = _uploaded_packed_files.erase(it);
935
1
            } else if (it->second->state == PackedFileState::FAILED &&
936
0
                       current_time - it->second->upload_time >
937
0
                               config::uploaded_file_retention_seconds) {
938
0
                it = _uploaded_packed_files.erase(it);
939
0
            } else {
940
0
                ++it;
941
0
            }
942
1
        }
943
1
    }
944
945
    // Clean up expired global index entries
946
1
    {
947
1
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
948
1
        auto it = _global_slice_locations.begin();
949
1
        while (it != _global_slice_locations.end()) {
950
0
            const auto& index = it->second;
951
0
            if (index.create_time > 0 &&
952
0
                current_time - index.create_time > config::uploaded_file_retention_seconds) {
953
0
                it = _global_slice_locations.erase(it);
954
0
            } else {
955
0
                ++it;
956
0
            }
957
0
        }
958
1
    }
959
1
}
960
961
#ifdef BE_TEST
962
namespace {
963
96
void reset_adder(bvar::Adder<int64_t>& adder) {
964
96
    auto current = adder.get_value();
965
96
    if (current != 0) {
966
9
        adder << (-current);
967
9
    }
968
96
}
969
} // namespace
970
971
32
void PackedFileManager::reset_packed_file_bvars_for_test() const {
972
32
    reset_adder(g_packed_file_total_count);
973
32
    reset_adder(g_packed_file_total_small_file_count);
974
32
    reset_adder(g_packed_file_total_size_bytes);
975
32
    g_packed_file_small_file_num_recorder.reset();
976
32
    g_packed_file_file_size_recorder.reset();
977
32
    g_packed_file_active_to_ready_ms_recorder.reset();
978
32
    g_packed_file_ready_to_upload_ms_recorder.reset();
979
32
    g_packed_file_uploading_to_uploaded_ms_recorder.reset();
980
32
    if (auto* sampler = g_packed_file_small_file_num_recorder.get_sampler()) {
981
32
        sampler->take_sample();
982
32
    }
983
32
    if (auto* sampler = g_packed_file_file_size_recorder.get_sampler()) {
984
32
        sampler->take_sample();
985
32
    }
986
32
    if (auto* sampler = g_packed_file_active_to_ready_ms_recorder.get_sampler()) {
987
32
        sampler->take_sample();
988
32
    }
989
32
    if (auto* sampler = g_packed_file_ready_to_upload_ms_recorder.get_sampler()) {
990
32
        sampler->take_sample();
991
32
    }
992
32
    if (auto* sampler = g_packed_file_uploading_to_uploaded_ms_recorder.get_sampler()) {
993
32
        sampler->take_sample();
994
32
    }
995
32
}
996
997
2
int64_t PackedFileManager::packed_file_total_count_for_test() const {
998
2
    return g_packed_file_total_count.get_value();
999
2
}
1000
1001
2
int64_t PackedFileManager::packed_file_total_small_file_num_for_test() const {
1002
2
    return g_packed_file_total_small_file_count.get_value();
1003
2
}
1004
1005
2
int64_t PackedFileManager::packed_file_total_size_bytes_for_test() const {
1006
2
    return g_packed_file_total_size_bytes.get_value();
1007
2
}
1008
1009
2
double PackedFileManager::packed_file_avg_small_file_num_for_test() const {
1010
2
    return g_packed_file_avg_small_file_num.get_value().get_average_double();
1011
2
}
1012
1013
2
double PackedFileManager::packed_file_avg_file_size_for_test() const {
1014
2
    return g_packed_file_avg_file_size_bytes.get_value().get_average_double();
1015
2
}
1016
1017
void PackedFileManager::record_packed_file_metrics_for_test(
1018
1
        const PackedFileManager::PackedFileContext* packed_file) {
1019
1
    DCHECK(packed_file != nullptr);
1020
1
    record_packed_file_metrics(*packed_file);
1021
1
}
1022
1023
2
void PackedFileManager::clear_state_for_test() {
1024
2
    std::lock_guard<std::timed_mutex> cur_lock(_current_packed_file_mutex);
1025
2
    _current_packed_files.clear();
1026
2
    {
1027
2
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
1028
2
        _uploading_packed_files.clear();
1029
2
        _uploaded_packed_files.clear();
1030
2
    }
1031
2
    {
1032
2
        std::lock_guard<std::mutex> lock(_global_index_mutex);
1033
2
        _global_slice_locations.clear();
1034
2
    }
1035
2
}
1036
#endif
1037
1038
} // namespace doris::io