Coverage Report

Created: 2026-06-10 09:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/packed_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/packed_file_writer.h"
19
20
#include <bvar/recorder.h>
21
#include <bvar/window.h>
22
#include <glog/logging.h>
23
24
#include "cloud/config.h"
25
#include "common/config.h"
26
#include "common/status.h"
27
#include "runtime/exec_env.h"
28
#include "util/slice.h"
29
30
namespace doris::io {
31
32
// Recorder fed with the elapsed milliseconds between a writer's first append
// and the completion of its close().
bvar::IntRecorder packed_file_writer_first_append_to_close_ms_recorder;

// Windowed view over the recorder above, registered under the name
// "packed_file_writer_first_append_to_close_ms" with a window size of 10.
bvar::Window<bvar::IntRecorder> packed_file_writer_first_append_to_close_ms_window(
        /*name=*/"packed_file_writer_first_append_to_close_ms",
        /*var=*/&packed_file_writer_first_append_to_close_ms_recorder,
        /*window_size=*/10);
36
37
// Wraps `inner_writer` for the file at `path`.
//
// The writer takes ownership of `inner_writer` and `append_info`, stores the
// native string form of `path`, and picks up the process-wide
// PackedFileManager instance. Debug builds assert that the inner writer is
// non-null and that the path is not empty.
PackedFileWriter::PackedFileWriter(FileWriterPtr inner_writer, Path path,
                                   PackedAppendContext append_info)
        : _inner_writer(std::move(inner_writer)),
          _file_path(path.native()),
          _packed_file_manager(PackedFileManager::instance()),
          _append_info(std::move(append_info)) {
    // Sanity checks only; they are no-ops when DCHECK is compiled out.
    DCHECK(_inner_writer != nullptr);
    DCHECK(!_file_path.empty());
}
46
47
62.7k
// Emits a warning when the writer is torn down while still in the OPENED
// state, i.e. close() was never successfully started on it.
PackedFileWriter::~PackedFileWriter() {
    LOG_IF(WARNING, _state == State::OPENED)
            << "PackedFileWriter destroyed without being closed, file: " << _file_path;
}
52
53
2.27M
// Appends `data_cnt` slices starting at `data` to this writer.
//
// While the accumulated size stays within config::small_file_threshold_bytes
// the bytes are copied into the in-memory buffer; once an append would push
// the total past the threshold, the buffered bytes are flushed to the inner
// writer and this and all later appends go straight to the inner writer.
//
// Returns InternalError if the writer is not in the OPENED state, or the
// error reported by the inner writer. An append whose slices are all empty
// is a successful no-op (it still stamps the first-append time).
Status PackedFileWriter::appendv(const Slice* data, size_t data_cnt) {
    if (_state != State::OPENED) {
        return Status::InternalError("Cannot append to closed or closing writer for file: " +
                                     _file_path);
    }

    // Remember when the very first append happened; close() uses it to
    // report the first-append-to-close latency.
    if (!_first_append_timestamp.has_value()) {
        _first_append_timestamp = std::chrono::steady_clock::now();
    }

    const Slice* const slices_end = data + data_cnt;

    // Sum up how many bytes this call contributes.
    size_t incoming_bytes = 0;
    for (const Slice* slice = data; slice != slices_end; ++slice) {
        incoming_bytes += slice->size;
    }
    if (incoming_bytes == 0) {
        return Status::OK();
    }

    // Crossing the small-file threshold: flush what is buffered and bypass
    // the buffer from now on.
    if (!_is_direct_write &&
        _bytes_appended + incoming_bytes > config::small_file_threshold_bytes) {
        RETURN_IF_ERROR(_switch_to_direct_write());
        _is_direct_write = true;
    }

    if (!_is_direct_write) {
        // Small-file mode: accumulate the payload in memory.
        // NOTE(review): reserve() is called with the exact new size on every
        // append; whether that preserves geometric growth depends on the
        // standard library implementation — confirm it is not a hot spot.
        _buffer.reserve(_bytes_appended + incoming_bytes);
        for (const Slice* slice = data; slice != slices_end; ++slice) {
            _buffer.append(slice->data, slice->size);
        }
    } else {
        // Large-file mode: hand the slices to the inner writer untouched.
        RETURN_IF_ERROR(_inner_writer->appendv(data, data_cnt));
    }

    _bytes_appended += incoming_bytes;
    return Status::OK();
}
93
94
125k
// Closes the writer.
//
// - Already CLOSED: returns OK immediately.
// - Not yet closing: starts an asynchronous close when `non_block` is true,
//   otherwise performs a synchronous close.
// - ASYNC_CLOSING: a second non-blocking request is rejected with
//   InternalError; a blocking request waits for the pending work (the packed
//   upload in small-file mode, the inner writer's close in direct-write
//   mode), moves the writer to CLOSED and records the
//   first-append-to-close latency once.
//
// Any error from the underlying operations is returned unchanged and leaves
// the state as it was before the failing step.
Status PackedFileWriter::close(bool non_block) {
    if (_state == State::CLOSED) {
        return Status::OK();
    }

    if (_state != State::ASYNC_CLOSING) {
        // First close request for this writer.
        return non_block ? _close_async() : _close_sync();
    }

    // From here on an asynchronous close is already in flight.
    if (non_block) {
        return Status::InternalError("Don't submit async close multi times");
    }

    if (_is_direct_write) {
        RETURN_IF_ERROR(_inner_writer->close(false));
    } else {
        RETURN_IF_ERROR(_wait_packed_upload());
    }
    _state = State::CLOSED;

    // `non_block` is known to be false at this point, so the latency is
    // always considered for recording; it is reported at most once and only
    // if at least one append happened.
    if (!_close_latency_recorded && _first_append_timestamp.has_value()) {
        const auto elapsed = std::chrono::steady_clock::now() - *_first_append_timestamp;
        const auto latency_ms =
                std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
        packed_file_writer_first_append_to_close_ms_recorder << latency_ms;
        if (auto* sampler = packed_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
            sampler->take_sample();
        }
        _close_latency_recorded = true;
    }
    return Status::OK();
}
136
137
62.5k
// Starts a non-blocking close and moves the writer to ASYNC_CLOSING.
//
// In direct-write mode the inner writer is asked to close asynchronously;
// otherwise the buffered small-file payload is handed to the packed file
// manager. On error the state is left untouched and the error is returned.
Status PackedFileWriter::_close_async() {
    if (_is_direct_write) {
        // Large file: delegate the asynchronous close to the inner writer.
        RETURN_IF_ERROR(_inner_writer->close(true));
    } else {
        // Small file: ship the buffered bytes to the packed manager.
        RETURN_IF_ERROR(_send_to_packed_manager());
    }
    _state = State::ASYNC_CLOSING;
    return Status::OK();
}
148
149
1
// Performs a blocking close and moves the writer to CLOSED.
//
// In direct-write mode the inner writer is closed synchronously; otherwise
// the buffered payload is sent to the packed file manager and the call waits
// for the upload. After a successful close the first-append-to-close latency
// is recorded once, provided at least one append happened.
//
// NOTE(review): if the hand-off to the packed manager succeeds but the wait
// fails, the state stays OPENED although _send_to_packed_manager() has
// already released the buffer — confirm how callers are expected to retry.
Status PackedFileWriter::_close_sync() {
    if (_is_direct_write) {
        // Large file: a plain synchronous close of the inner writer.
        RETURN_IF_ERROR(_inner_writer->close(false));
    } else {
        // Small file: hand off the payload, then block until it is uploaded.
        RETURN_IF_ERROR(_send_to_packed_manager());
        RETURN_IF_ERROR(_wait_packed_upload());
    }
    _state = State::CLOSED;

    if (!_close_latency_recorded && _first_append_timestamp.has_value()) {
        const auto elapsed = std::chrono::steady_clock::now() - *_first_append_timestamp;
        const auto latency_ms =
                std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
        packed_file_writer_first_append_to_close_ms_recorder << latency_ms;
        if (auto* sampler = packed_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
            sampler->take_sample();
        }
        _close_latency_recorded = true;
    }
    return Status::OK();
}
175
176
62.4k
// Blocks until the packed file manager reports the upload of this file as
// done and returns its status.
//
// Returns OK without waiting when nothing was appended or when no packed
// file manager is available. Must only be used in small-file (buffered) mode.
Status PackedFileWriter::_wait_packed_upload() {
    DCHECK(!_is_direct_write);

    const bool has_pending_upload = _bytes_appended > 0 && _packed_file_manager != nullptr;
    if (!has_pending_upload) {
        return Status::OK();
    }
    return _packed_file_manager->wait_upload_done(_file_path);
}
184
185
62.7k
void PackedFileWriter::_release_buffer() {
186
62.7k
    std::string().swap(_buffer);
187
62.7k
}
188
189
257
// Flushes any bytes buffered so far into the inner writer and releases the
// buffer. The caller is responsible for flipping `_is_direct_write`
// afterwards. Returns the inner writer's error if the flush fails, in which
// case the buffer is left intact.
Status PackedFileWriter::_switch_to_direct_write() {
    DCHECK(!_is_direct_write);

    // Nothing buffered yet: there is nothing to carry over.
    if (_buffer.empty()) {
        return Status::OK();
    }

    Slice buffered_bytes(_buffer.data(), _buffer.size());
    RETURN_IF_ERROR(_inner_writer->appendv(&buffered_bytes, 1));
    _release_buffer();
    return Status::OK();
}
201
202
62.1k
// Hands the buffered small-file payload to the packed file manager and
// releases the buffer on success.
//
// Returns InternalError when no packed file manager is available or the
// append context has an empty resource id, InvalidArgument when the txn id
// is not positive, or the error reported by the manager's append call.
// Must only be used in small-file (buffered) mode.
Status PackedFileWriter::_send_to_packed_manager() {
    DCHECK(!_is_direct_write);

    if (_packed_file_manager == nullptr) {
        return Status::InternalError("PackedFileManager is not available");
    }
    LOG(INFO) << "send_to_packed_manager: " << _file_path << " buffer size: " << _buffer.size();

    // Validate the append context before touching the manager.
    if (_append_info.resource_id.empty()) {
        return Status::InternalError("Missing resource id for packed file append");
    }
    if (_append_info.txn_id <= 0) {
        return Status::InvalidArgument("Missing valid txn id for packed file append: " +
                                       _file_path);
    }

    Slice payload(_buffer.data(), _buffer.size());
    RETURN_IF_ERROR(_packed_file_manager->append_small_file(_file_path, payload, _append_info));

    // The buffer is dropped only after the append call has succeeded.
    _release_buffer();
    return Status::OK();
}
224
225
1.00k
// Reports where this file's bytes ended up inside a packed file.
//
// `location` is an output parameter. In direct-write mode the file was not
// packed, so `*location` is reset to a default-constructed
// PackedSliceLocation. Otherwise the packed file manager is queried by file
// path and the result is logged.
//
// Expected to be called after the writer reached the CLOSED state (asserted
// in debug builds only).
//
// Returns InvalidArgument if `location` is null, InternalError if the file
// was packed but no packed file manager is available, or the error returned
// by the manager's lookup.
Status PackedFileWriter::get_packed_slice_location(PackedSliceLocation* location) const {
    DCHECK(_state == State::CLOSED)
            << " file_path: " << _file_path << " bytes_appended: " << _bytes_appended;

    // Guard the out-parameter: both branches below write through it.
    if (location == nullptr) {
        return Status::InvalidArgument("Null output location for packed slice lookup: " +
                                       _file_path);
    }

    if (_is_direct_write) {
        *location = PackedSliceLocation {};
        return Status::OK();
    }

    // Mirror the nullptr guard used by the other packed-manager call sites
    // instead of dereferencing the pointer unconditionally.
    if (_packed_file_manager == nullptr) {
        return Status::InternalError("PackedFileManager is not available");
    }

    RETURN_IF_ERROR(_packed_file_manager->get_packed_slice_location(_file_path, location));
    LOG(INFO) << "get_packed_slice_location: " << _file_path
              << " packed_path: " << location->packed_file_path << " " << location->offset << " "
              << location->size;
    return Status::OK();
}
238
} // namespace doris::io