Coverage Report

Created: 2026-06-02 13:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/packed_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/packed_file_writer.h"
19
20
#include <bvar/recorder.h>
21
#include <bvar/window.h>
22
#include <glog/logging.h>
23
24
#include "cloud/config.h"
25
#include "common/config.h"
26
#include "common/status.h"
27
#include "runtime/exec_env.h"
28
#include "util/slice.h"
29
30
namespace doris::io {
31
32
bvar::IntRecorder packed_file_writer_first_append_to_close_ms_recorder;
33
bvar::Window<bvar::IntRecorder> packed_file_writer_first_append_to_close_ms_window(
34
        "packed_file_writer_first_append_to_close_ms",
35
        &packed_file_writer_first_append_to_close_ms_recorder, /*window_size=*/10);
36
37
PackedFileWriter::PackedFileWriter(FileWriterPtr inner_writer, Path path,
38
                                   PackedAppendContext append_info)
39
1.01k
        : _inner_writer(std::move(inner_writer)),
40
1.01k
          _file_path(path.native()),
41
1.01k
          _packed_file_manager(PackedFileManager::instance()),
42
1.01k
          _append_info(std::move(append_info)) {
43
1.01k
    DCHECK(_inner_writer != nullptr);
44
1.01k
    DCHECK(!_file_path.empty());
45
1.01k
}
46
47
1.01k
PackedFileWriter::~PackedFileWriter() {
48
1.01k
    if (_state == State::OPENED) {
49
11
        LOG(WARNING) << "PackedFileWriter destroyed without being closed, file: " << _file_path;
50
11
    }
51
1.01k
}
52
53
2.47k
Status PackedFileWriter::appendv(const Slice* data, size_t data_cnt) {
54
2.47k
    if (_state != State::OPENED) {
55
0
        return Status::InternalError("Cannot append to closed or closing writer for file: " +
56
0
                                     _file_path);
57
0
    }
58
59
2.47k
    if (!_first_append_timestamp.has_value()) {
60
1.01k
        _first_append_timestamp = std::chrono::steady_clock::now();
61
1.01k
    }
62
63
    // Calculate total size to append
64
2.47k
    size_t total_size = 0;
65
4.94k
    for (size_t i = 0; i < data_cnt; ++i) {
66
2.47k
        total_size += data[i].size;
67
2.47k
    }
68
69
2.47k
    if (total_size == 0) {
70
1
        return Status::OK();
71
1
    }
72
73
    // Check if we should switch to direct write mode
74
2.46k
    if (!_is_direct_write && _bytes_appended + total_size > config::small_file_threshold_bytes) {
75
3
        RETURN_IF_ERROR(_switch_to_direct_write());
76
3
        _is_direct_write = true;
77
3
    }
78
79
    // Write data based on current mode
80
2.46k
    if (_is_direct_write) {
81
3
        RETURN_IF_ERROR(_inner_writer->appendv(data, data_cnt));
82
2.46k
    } else {
83
2.46k
        _buffer.reserve(_bytes_appended + total_size);
84
        // Buffer small file data
85
4.93k
        for (size_t i = 0; i < data_cnt; ++i) {
86
2.46k
            _buffer.append(data[i].data, data[i].size);
87
2.46k
        }
88
2.46k
    }
89
90
2.46k
    _bytes_appended += total_size;
91
2.46k
    return Status::OK();
92
2.46k
}
93
94
2.00k
Status PackedFileWriter::close(bool non_block) {
95
2.00k
    if (_state == State::CLOSED) {
96
0
        return Status::OK();
97
0
    }
98
99
2.00k
    auto record_close_latency = [this]() {
100
1.00k
        if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
101
0
            return;
102
0
        }
103
1.00k
        auto now = std::chrono::steady_clock::now();
104
1.00k
        auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
105
1.00k
                                  now - *_first_append_timestamp)
106
1.00k
                                  .count();
107
1.00k
        packed_file_writer_first_append_to_close_ms_recorder << latency_ms;
108
1.00k
        if (auto* sampler = packed_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
109
1.00k
            sampler->take_sample();
110
1.00k
        }
111
1.00k
        _close_latency_recorded = true;
112
1.00k
    };
113
114
2.00k
    if (_state == State::ASYNC_CLOSING) {
115
1.00k
        if (non_block) {
116
0
            return Status::InternalError("Don't submit async close multi times");
117
0
        }
118
1.00k
        if (!_is_direct_write) {
119
1.00k
            RETURN_IF_ERROR(_wait_packed_upload());
120
1.00k
        } else {
121
0
            RETURN_IF_ERROR(_inner_writer->close(false));
122
0
        }
123
1.00k
        _state = State::CLOSED;
124
1.00k
        if (!non_block) {
125
1.00k
            record_close_latency();
126
1.00k
        }
127
1.00k
        return Status::OK();
128
1.00k
    }
129
130
1.00k
    if (non_block) {
131
1.00k
        return _close_async();
132
1.00k
    } else {
133
1
        return _close_sync();
134
1
    }
135
1.00k
}
136
137
1.00k
Status PackedFileWriter::_close_async() {
138
1.00k
    if (!_is_direct_write) {
139
        // Send small file data to packed manager
140
1.00k
        RETURN_IF_ERROR(_send_to_packed_manager());
141
1.00k
    } else {
142
        // For large files, just close the inner writer asynchronously
143
0
        RETURN_IF_ERROR(_inner_writer->close(true));
144
0
    }
145
1.00k
    _state = State::ASYNC_CLOSING;
146
1.00k
    return Status::OK();
147
1.00k
}
148
149
1
Status PackedFileWriter::_close_sync() {
150
1
    if (!_is_direct_write) {
151
        // Send small file data to pack manager and wait for upload
152
1
        RETURN_IF_ERROR(_send_to_packed_manager());
153
0
        RETURN_IF_ERROR(_wait_packed_upload());
154
0
    } else {
155
        // For large files, close the inner writer synchronously
156
0
        RETURN_IF_ERROR(_inner_writer->close(false));
157
0
    }
158
0
    _state = State::CLOSED;
159
0
    if (!_close_latency_recorded) {
160
0
        auto now = std::chrono::steady_clock::now();
161
0
        if (_first_append_timestamp.has_value()) {
162
0
            auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
163
0
                                      now - *_first_append_timestamp)
164
0
                                      .count();
165
0
            packed_file_writer_first_append_to_close_ms_recorder << latency_ms;
166
0
            if (auto* sampler =
167
0
                        packed_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
168
0
                sampler->take_sample();
169
0
            }
170
0
            _close_latency_recorded = true;
171
0
        }
172
0
    }
173
0
    return Status::OK();
174
1
}
175
176
1.00k
Status PackedFileWriter::_wait_packed_upload() {
177
1.00k
    DCHECK(!_is_direct_write);
178
    // Only wait if we have data that was sent to packed manager
179
1.00k
    if (_bytes_appended > 0 && _packed_file_manager != nullptr) {
180
1.00k
        return _packed_file_manager->wait_upload_done(_file_path);
181
1.00k
    }
182
0
    return Status::OK();
183
1.00k
}
184
185
1.00k
void PackedFileWriter::_release_buffer() {
186
1.00k
    std::string().swap(_buffer);
187
1.00k
}
188
189
3
Status PackedFileWriter::_switch_to_direct_write() {
190
3
    DCHECK(!_is_direct_write);
191
192
    // If we have buffered data, write it to inner writer first
193
3
    if (_buffer.size() > 0) {
194
2
        Slice buffer_slice(_buffer.data(), _buffer.size());
195
2
        RETURN_IF_ERROR(_inner_writer->appendv(&buffer_slice, 1));
196
2
        _release_buffer();
197
2
    }
198
199
3
    return Status::OK();
200
3
}
201
202
1.00k
Status PackedFileWriter::_send_to_packed_manager() {
203
1.00k
    DCHECK(!_is_direct_write);
204
205
1.00k
    if (_packed_file_manager == nullptr) {
206
0
        return Status::InternalError("PackedFileManager is not available");
207
0
    }
208
1.00k
    LOG(INFO) << "send_to_packed_manager: " << _file_path << " buffer size: " << _buffer.size();
209
210
1.00k
    if (_append_info.resource_id.empty()) {
211
0
        return Status::InternalError("Missing resource id for packed file append");
212
0
    }
213
214
1.00k
    if (_append_info.txn_id <= 0) {
215
0
        return Status::InvalidArgument("Missing valid txn id for packed file append: " +
216
0
                                       _file_path);
217
0
    }
218
219
1.00k
    Slice data_slice(_buffer.data(), _buffer.size());
220
1.00k
    RETURN_IF_ERROR(_packed_file_manager->append_small_file(_file_path, data_slice, _append_info));
221
1.00k
    _release_buffer();
222
1.00k
    return Status::OK();
223
1.00k
}
224
225
1.00k
Status PackedFileWriter::get_packed_slice_location(PackedSliceLocation* location) const {
226
1.00k
    DCHECK(_state == State::CLOSED)
227
0
            << " file_path: " << _file_path << " bytes_appended: " << _bytes_appended;
228
1.00k
    if (_is_direct_write) {
229
0
        *location = PackedSliceLocation {};
230
0
        return Status::OK();
231
0
    }
232
1.00k
    RETURN_IF_ERROR(_packed_file_manager->get_packed_slice_location(_file_path, location));
233
1.00k
    LOG(INFO) << "get_packed_slice_location: " << _file_path
234
1.00k
              << " packed_path: " << location->packed_file_path << " " << location->offset << " "
235
1.00k
              << location->size;
236
1.00k
    return Status::OK();
237
1.00k
}
238
} // namespace doris::io