Coverage Report

Created: 2026-03-16 13:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/packed_file_writer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <chrono>
21
#include <optional>
22
#include <string>
23
24
#include "common/status.h"
25
#include "io/fs/file_reader_writer_fwd.h"
26
#include "io/fs/file_writer.h"
27
#include "io/fs/packed_file_manager.h"
28
#include "io/fs/path.h"
29
#include "util/slice.h"
30
31
namespace doris::io {
32
33
// FileWriter wrapper that buffers small files and writes them to a packed file.
34
// If the file size exceeds the threshold, it writes directly to the inner writer.
35
// Otherwise, it buffers the data and hands it to PackedFileManager on close.
36
class PackedFileWriter final : public FileWriter {
37
public:
38
    PackedFileWriter(FileWriterPtr inner_writer, Path path,
39
                     PackedAppendContext append_info = PackedAppendContext {});
40
    ~PackedFileWriter() override;
41
42
    PackedFileWriter(const PackedFileWriter&) = delete; // Non-copyable
43
    const PackedFileWriter& operator=(const PackedFileWriter&) = delete;
44
45
    Status appendv(const Slice* data, size_t data_cnt) override;
46
47
1
    const Path& path() const override { return _inner_writer->path(); }
48
49
6
    size_t bytes_appended() const override { return _bytes_appended; }
50
51
1
    State state() const override { return _state; }
52
53
    Status close(bool non_block = false) override;
54
55
    // Get the packed file slice location (index information) for this file.
56
    // This method should be called after close(true) to get the index information.
57
    // Returns an empty location if the file is not in a packed file.
58
    Status get_packed_slice_location(PackedSliceLocation* location) const;
59
60
    // Returns true if this file's data was written to a packed file (not direct write).
61
0
    bool is_in_packed_file() const override { return !_is_direct_write; }
62
63
private:
64
    // Async close: submit data without waiting for completion
65
    Status _close_async();
66
67
    // Sync close: submit data and wait for completion
68
    Status _close_sync();
69
70
    // Switch from packed file mode to direct write mode
71
    Status _switch_to_direct_write();
72
73
    // Send buffered data to the packed file manager
74
    Status _send_to_packed_manager();
75
76
    // Wait for the packed file to be uploaded to S3
77
    Status _wait_packed_upload();
78
79
    FileWriterPtr _inner_writer; // Wrapped writer; path() delegates to it
80
    std::string _file_path; // File path stored as a string for the packed file manager
81
82
    // Buffer for small files
83
    std::string _buffer;
84
    size_t _bytes_appended = 0; // Value reported by bytes_appended()
85
    State _state = State::OPENED; // Value reported by state()
86
    bool _is_direct_write = false;                      // Whether to use direct write mode
87
    PackedFileManager* _packed_file_manager = nullptr;  // Packed file manager instance
88
    mutable PackedSliceLocation _packed_slice_location; // Packed slice info (mutable for lazy init)
89
    PackedAppendContext _append_info; // Context passed in at construction
90
    std::optional<std::chrono::steady_clock::time_point> _first_append_timestamp; // NOTE(review): presumably set on first append - confirm in .cpp
91
    bool _close_latency_recorded = false; // NOTE(review): presumably guards one-time close latency reporting - confirm in .cpp
92
};
93
94
} // namespace doris::io