Coverage Report

Created: 2026-04-15 12:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/packed_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/packed_file_writer.h"
19
20
#include <bvar/recorder.h>
21
#include <bvar/window.h>
22
#include <glog/logging.h>
23
24
#include "cloud/config.h"
25
#include "common/config.h"
26
#include "common/status.h"
27
#include "runtime/exec_env.h"
28
#include "util/slice.h"
29
30
namespace doris::io {
31
32
// Metrics for the time between a writer's first append and the completion of
// its close. The recorder receives one millisecond sample per writer (see
// PackedFileWriter::close() and PackedFileWriter::_close_sync()); the window
// exposes it under the name "packed_file_writer_first_append_to_close_ms".
// NOTE(review): window_size is presumably expressed in seconds -- confirm
// against the bvar documentation.
bvar::IntRecorder packed_file_writer_first_append_to_close_ms_recorder;
33
bvar::Window<bvar::IntRecorder> packed_file_writer_first_append_to_close_ms_window(
34
        "packed_file_writer_first_append_to_close_ms",
35
        &packed_file_writer_first_append_to_close_ms_recorder, /*window_size=*/10);
36
37
// Builds a writer around `inner_writer`.
//
// inner_writer: writer that receives the bytes once the file is written
//               directly; ownership is moved into this object.
// path:         target file path; its native string form is stored and used
//               as the key for every packed-manager call.
// append_info:  context forwarded to the packed file manager when the
//               buffered data is handed over.
//
// The packed file manager is taken from PackedFileManager::instance().
PackedFileWriter::PackedFileWriter(FileWriterPtr inner_writer, Path path,
                                   PackedAppendContext append_info)
        : _inner_writer(std::move(inner_writer)),
          _file_path(path.native()),
          _packed_file_manager(PackedFileManager::instance()),
          _append_info(std::move(append_info)) {
    // Debug-only sanity checks: a writer needs both a sink and a path.
    DCHECK(_inner_writer != nullptr);
    DCHECK(!_file_path.empty());
}
46
47
2.02k
// Emits a warning when the writer is destroyed while still in the OPENED
// state, i.e. close() was never requested. No cleanup is attempted here.
PackedFileWriter::~PackedFileWriter() {
    if (_state != State::OPENED) {
        return;
    }
    LOG(WARNING) << "PackedFileWriter destroyed without being closed, file: " << _file_path;
}
52
53
4.93k
// Appends `data_cnt` slices starting at `data` to the file.
//
// While the accumulated size stays within config::small_file_threshold_bytes
// the bytes are kept in the in-memory buffer. The first append that would
// push the total past the threshold flushes the buffer to the inner writer
// and switches to direct-write mode for the rest of the writer's life.
//
// Returns InternalError when the writer is not OPENED, or any error reported
// by the inner writer. An append whose slices are all empty is a no-op.
Status PackedFileWriter::appendv(const Slice* data, size_t data_cnt) {
    if (_state != State::OPENED) {
        return Status::InternalError("Cannot append to closed or closing writer for file: " +
                                     _file_path);
    }

    // Remember when the first append happened; used for the close latency metric.
    if (!_first_append_timestamp.has_value()) {
        _first_append_timestamp = std::chrono::steady_clock::now();
    }

    // Sum up the incoming payload.
    const Slice* const slices_end = data + data_cnt;
    size_t incoming_bytes = 0;
    for (const Slice* slice = data; slice != slices_end; ++slice) {
        incoming_bytes += slice->size;
    }
    if (incoming_bytes == 0) {
        return Status::OK();
    }

    // Crossing the small-file threshold: leave buffered mode for good.
    if (!_is_direct_write &&
        _bytes_appended + incoming_bytes > config::small_file_threshold_bytes) {
        RETURN_IF_ERROR(_switch_to_direct_write());
        _is_direct_write = true;
    }

    if (!_is_direct_write) {
        // Still a small file: keep accumulating in memory.
        for (const Slice* slice = data; slice != slices_end; ++slice) {
            _buffer.append(slice->data, slice->size);
        }
    } else {
        RETURN_IF_ERROR(_inner_writer->appendv(data, data_cnt));
    }

    _bytes_appended += incoming_bytes;
    return Status::OK();
}
92
93
4.00k
// Closes the writer.
//
// non_block == true  : start an asynchronous close (state -> ASYNC_CLOSING).
// non_block == false : close synchronously, or finish a previously started
//                      asynchronous close (state -> CLOSED).
//
// Calling close() on an already CLOSED writer is a no-op. Requesting a second
// asynchronous close while one is in flight returns InternalError. Errors
// from the inner writer or the packed file manager are propagated and leave
// the state unchanged.
Status PackedFileWriter::close(bool non_block) {
    if (_state == State::CLOSED) {
        return Status::OK();
    }

    // Nothing in flight yet: dispatch to the requested close flavour.
    if (_state != State::ASYNC_CLOSING) {
        return non_block ? _close_async() : _close_sync();
    }

    // From here on an asynchronous close is in flight and must be completed.
    if (non_block) {
        return Status::InternalError("Don't submit async close multi times");
    }

    if (_is_direct_write) {
        RETURN_IF_ERROR(_inner_writer->close(false));
    } else {
        RETURN_IF_ERROR(_wait_packed_upload());
    }
    _state = State::CLOSED;

    // Record the first-append-to-close latency once per writer. This point is
    // only reachable with non_block == false, so no extra guard is needed.
    if (!_close_latency_recorded && _first_append_timestamp.has_value()) {
        const auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                        std::chrono::steady_clock::now() -
                                        *_first_append_timestamp)
                                        .count();
        packed_file_writer_first_append_to_close_ms_recorder << latency_ms;
        if (auto* sampler = packed_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
            sampler->take_sample();
        }
        _close_latency_recorded = true;
    }
    return Status::OK();
}
135
136
2.00k
// Starts a non-blocking close and moves the writer to ASYNC_CLOSING.
//
// Direct-write files ask the inner writer to close asynchronously; buffered
// (small) files hand their data to the packed file manager. The state only
// changes when that step succeeds.
Status PackedFileWriter::_close_async() {
    if (_is_direct_write) {
        RETURN_IF_ERROR(_inner_writer->close(true));
    } else {
        RETURN_IF_ERROR(_send_to_packed_manager());
    }
    _state = State::ASYNC_CLOSING;
    return Status::OK();
}
147
148
2
// Performs a blocking close and moves the writer to CLOSED.
//
// Direct-write files close the inner writer synchronously; buffered (small)
// files are handed to the packed file manager and then waited on. After a
// successful close the first-append-to-close latency is recorded once,
// provided at least one append happened.
Status PackedFileWriter::_close_sync() {
    if (_is_direct_write) {
        RETURN_IF_ERROR(_inner_writer->close(false));
    } else {
        RETURN_IF_ERROR(_send_to_packed_manager());
        RETURN_IF_ERROR(_wait_packed_upload());
    }
    _state = State::CLOSED;

    // Skip the metric if it was already reported or nothing was ever appended.
    if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
        return Status::OK();
    }

    const auto elapsed = std::chrono::steady_clock::now() - *_first_append_timestamp;
    const auto latency_ms =
            std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
    packed_file_writer_first_append_to_close_ms_recorder << latency_ms;
    if (auto* sampler = packed_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
        sampler->take_sample();
    }
    _close_latency_recorded = true;
    return Status::OK();
}
174
175
2.00k
// Asks the packed file manager to wait until the upload for this file is done
// and returns its status.
//
// Returns OK without calling the manager when no bytes were appended or no
// manager is available. Must only be used in buffered (non direct-write) mode.
Status PackedFileWriter::_wait_packed_upload() {
    DCHECK(!_is_direct_write);

    const bool has_pending_upload = _bytes_appended > 0 && _packed_file_manager != nullptr;
    if (!has_pending_upload) {
        return Status::OK();
    }
    return _packed_file_manager->wait_upload_done(_file_path);
}
183
184
4
// Flushes whatever has been buffered so far into the inner writer so that
// subsequent appends can go to it directly.
//
// The buffer is cleared only after the inner writer accepted the data. The
// caller is responsible for flipping `_is_direct_write` afterwards.
Status PackedFileWriter::_switch_to_direct_write() {
    DCHECK(!_is_direct_write);

    const bool has_buffered_data = _buffer.size() > 0;
    if (!has_buffered_data) {
        return Status::OK();
    }

    Slice pending(_buffer.data(), _buffer.size());
    RETURN_IF_ERROR(_inner_writer->appendv(&pending, 1));
    _buffer.clear();
    return Status::OK();
}
196
197
2.00k
// Hands the buffered bytes of a small file over to the packed file manager.
//
// Returns:
//   InternalError   - no packed file manager, or the append context has an
//                     empty resource id.
//   InvalidArgument - the append context has a non-positive txn id.
//   otherwise the status of PackedFileManager::append_small_file().
// The buffer is cleared only after the manager accepted the data.
Status PackedFileWriter::_send_to_packed_manager() {
    DCHECK(!_is_direct_write);

    if (_packed_file_manager == nullptr) {
        return Status::InternalError("PackedFileManager is not available");
    }
    LOG(INFO) << "send_to_packed_manager: " << _file_path << " buffer size: " << _buffer.size();

    // Validate the append context before touching the manager.
    const bool missing_resource_id = _append_info.resource_id.empty();
    if (missing_resource_id) {
        return Status::InternalError("Missing resource id for packed file append");
    }
    const bool invalid_txn_id = _append_info.txn_id <= 0;
    if (invalid_txn_id) {
        return Status::InvalidArgument("Missing valid txn id for packed file append: " +
                                       _file_path);
    }

    Slice payload(_buffer.data(), _buffer.size());
    RETURN_IF_ERROR(_packed_file_manager->append_small_file(_file_path, payload, _append_info));
    _buffer.clear();
    return Status::OK();
}
219
220
2.00k
// Reports where this file's bytes ended up inside a packed file.
//
// location: output parameter; must not be null.
//
// For direct-write files `*location` is reset to a default-constructed
// PackedSliceLocation. Otherwise the packed file manager is queried by file
// path and its result is logged.
//
// Returns:
//   InvalidArgument - `location` is null.
//   InternalError   - no packed file manager is available.
//   otherwise the status of PackedFileManager::get_packed_slice_location().
// Expected to be called only after the writer reached CLOSED (debug-checked).
Status PackedFileWriter::get_packed_slice_location(PackedSliceLocation* location) const {
    DCHECK(_state == State::CLOSED)
            << " file_path: " << _file_path << " bytes_appended: " << _bytes_appended;

    if (location == nullptr) {
        return Status::InvalidArgument("Null output location for packed slice lookup: " +
                                       _file_path);
    }

    if (_is_direct_write) {
        *location = PackedSliceLocation {};
        return Status::OK();
    }

    // Same guard as the other packed-manager call sites in this class.
    if (_packed_file_manager == nullptr) {
        return Status::InternalError("PackedFileManager is not available");
    }

    RETURN_IF_ERROR(_packed_file_manager->get_packed_slice_location(_file_path, location));
    LOG(INFO) << "get_packed_slice_location: " << _file_path
              << " packed_path: " << location->packed_file_path << " " << location->offset << " "
              << location->size;
    return Status::OK();
}
233
} // namespace doris::io