be/src/io/fs/packed_file_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/packed_file_writer.h" |
19 | | |
20 | | #include <bvar/recorder.h> |
21 | | #include <bvar/window.h> |
22 | | #include <glog/logging.h> |
23 | | |
24 | | #include "cloud/config.h" |
25 | | #include "common/config.h" |
26 | | #include "common/status.h" |
27 | | #include "runtime/exec_env.h" |
28 | | #include "util/slice.h" |
29 | | |
30 | | namespace doris::io { |
31 | | |
32 | | bvar::IntRecorder packed_file_writer_first_append_to_close_ms_recorder; |
33 | | bvar::Window<bvar::IntRecorder> packed_file_writer_first_append_to_close_ms_window( |
34 | | "packed_file_writer_first_append_to_close_ms", |
35 | | &packed_file_writer_first_append_to_close_ms_recorder, /*window_size=*/10); |
36 | | |
37 | | PackedFileWriter::PackedFileWriter(FileWriterPtr inner_writer, Path path, |
38 | | PackedAppendContext append_info) |
39 | 1.01k | : _inner_writer(std::move(inner_writer)), |
40 | 1.01k | _file_path(path.native()), |
41 | 1.01k | _packed_file_manager(PackedFileManager::instance()), |
42 | 1.01k | _append_info(std::move(append_info)) { |
43 | 1.01k | DCHECK(_inner_writer != nullptr); |
44 | 1.01k | DCHECK(!_file_path.empty()); |
45 | 1.01k | } |
46 | | |
47 | 1.01k | PackedFileWriter::~PackedFileWriter() { |
48 | 1.01k | if (_state == State::OPENED) { |
49 | 11 | LOG(WARNING) << "PackedFileWriter destroyed without being closed, file: " << _file_path; |
50 | 11 | } |
51 | 1.01k | } |
52 | | |
53 | 2.47k | Status PackedFileWriter::appendv(const Slice* data, size_t data_cnt) { |
54 | 2.47k | if (_state != State::OPENED) { |
55 | 0 | return Status::InternalError("Cannot append to closed or closing writer for file: " + |
56 | 0 | _file_path); |
57 | 0 | } |
58 | | |
59 | 2.47k | if (!_first_append_timestamp.has_value()) { |
60 | 1.01k | _first_append_timestamp = std::chrono::steady_clock::now(); |
61 | 1.01k | } |
62 | | |
63 | | // Calculate total size to append |
64 | 2.47k | size_t total_size = 0; |
65 | 4.94k | for (size_t i = 0; i < data_cnt; ++i) { |
66 | 2.47k | total_size += data[i].size; |
67 | 2.47k | } |
68 | | |
69 | 2.47k | if (total_size == 0) { |
70 | 1 | return Status::OK(); |
71 | 1 | } |
72 | | |
73 | | // Check if we should switch to direct write mode |
74 | 2.46k | if (!_is_direct_write && _bytes_appended + total_size > config::small_file_threshold_bytes) { |
75 | 3 | RETURN_IF_ERROR(_switch_to_direct_write()); |
76 | 3 | _is_direct_write = true; |
77 | 3 | } |
78 | | |
79 | | // Write data based on current mode |
80 | 2.46k | if (_is_direct_write) { |
81 | 3 | RETURN_IF_ERROR(_inner_writer->appendv(data, data_cnt)); |
82 | 2.46k | } else { |
83 | 2.46k | _buffer.reserve(_bytes_appended + total_size); |
84 | | // Buffer small file data |
85 | 4.93k | for (size_t i = 0; i < data_cnt; ++i) { |
86 | 2.46k | _buffer.append(data[i].data, data[i].size); |
87 | 2.46k | } |
88 | 2.46k | } |
89 | | |
90 | 2.46k | _bytes_appended += total_size; |
91 | 2.46k | return Status::OK(); |
92 | 2.46k | } |
93 | | |
94 | 2.00k | Status PackedFileWriter::close(bool non_block) { |
95 | 2.00k | if (_state == State::CLOSED) { |
96 | 0 | return Status::OK(); |
97 | 0 | } |
98 | | |
99 | 2.00k | auto record_close_latency = [this]() { |
100 | 1.00k | if (_close_latency_recorded || !_first_append_timestamp.has_value()) { |
101 | 0 | return; |
102 | 0 | } |
103 | 1.00k | auto now = std::chrono::steady_clock::now(); |
104 | 1.00k | auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
105 | 1.00k | now - *_first_append_timestamp) |
106 | 1.00k | .count(); |
107 | 1.00k | packed_file_writer_first_append_to_close_ms_recorder << latency_ms; |
108 | 1.00k | if (auto* sampler = packed_file_writer_first_append_to_close_ms_recorder.get_sampler()) { |
109 | 1.00k | sampler->take_sample(); |
110 | 1.00k | } |
111 | 1.00k | _close_latency_recorded = true; |
112 | 1.00k | }; |
113 | | |
114 | 2.00k | if (_state == State::ASYNC_CLOSING) { |
115 | 1.00k | if (non_block) { |
116 | 0 | return Status::InternalError("Don't submit async close multi times"); |
117 | 0 | } |
118 | 1.00k | if (!_is_direct_write) { |
119 | 1.00k | RETURN_IF_ERROR(_wait_packed_upload()); |
120 | 1.00k | } else { |
121 | 0 | RETURN_IF_ERROR(_inner_writer->close(false)); |
122 | 0 | } |
123 | 1.00k | _state = State::CLOSED; |
124 | 1.00k | if (!non_block) { |
125 | 1.00k | record_close_latency(); |
126 | 1.00k | } |
127 | 1.00k | return Status::OK(); |
128 | 1.00k | } |
129 | | |
130 | 1.00k | if (non_block) { |
131 | 1.00k | return _close_async(); |
132 | 1.00k | } else { |
133 | 1 | return _close_sync(); |
134 | 1 | } |
135 | 1.00k | } |
136 | | |
137 | 1.00k | Status PackedFileWriter::_close_async() { |
138 | 1.00k | if (!_is_direct_write) { |
139 | | // Send small file data to packed manager |
140 | 1.00k | RETURN_IF_ERROR(_send_to_packed_manager()); |
141 | 1.00k | } else { |
142 | | // For large files, just close the inner writer asynchronously |
143 | 0 | RETURN_IF_ERROR(_inner_writer->close(true)); |
144 | 0 | } |
145 | 1.00k | _state = State::ASYNC_CLOSING; |
146 | 1.00k | return Status::OK(); |
147 | 1.00k | } |
148 | | |
149 | 1 | Status PackedFileWriter::_close_sync() { |
150 | 1 | if (!_is_direct_write) { |
151 | | // Send small file data to pack manager and wait for upload |
152 | 1 | RETURN_IF_ERROR(_send_to_packed_manager()); |
153 | 0 | RETURN_IF_ERROR(_wait_packed_upload()); |
154 | 0 | } else { |
155 | | // For large files, close the inner writer synchronously |
156 | 0 | RETURN_IF_ERROR(_inner_writer->close(false)); |
157 | 0 | } |
158 | 0 | _state = State::CLOSED; |
159 | 0 | if (!_close_latency_recorded) { |
160 | 0 | auto now = std::chrono::steady_clock::now(); |
161 | 0 | if (_first_append_timestamp.has_value()) { |
162 | 0 | auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
163 | 0 | now - *_first_append_timestamp) |
164 | 0 | .count(); |
165 | 0 | packed_file_writer_first_append_to_close_ms_recorder << latency_ms; |
166 | 0 | if (auto* sampler = |
167 | 0 | packed_file_writer_first_append_to_close_ms_recorder.get_sampler()) { |
168 | 0 | sampler->take_sample(); |
169 | 0 | } |
170 | 0 | _close_latency_recorded = true; |
171 | 0 | } |
172 | 0 | } |
173 | 0 | return Status::OK(); |
174 | 1 | } |
175 | | |
176 | 1.00k | Status PackedFileWriter::_wait_packed_upload() { |
177 | 1.00k | DCHECK(!_is_direct_write); |
178 | | // Only wait if we have data that was sent to packed manager |
179 | 1.00k | if (_bytes_appended > 0 && _packed_file_manager != nullptr) { |
180 | 1.00k | return _packed_file_manager->wait_upload_done(_file_path); |
181 | 1.00k | } |
182 | 0 | return Status::OK(); |
183 | 1.00k | } |
184 | | |
185 | 1.00k | void PackedFileWriter::_release_buffer() { |
186 | 1.00k | std::string().swap(_buffer); |
187 | 1.00k | } |
188 | | |
189 | 3 | Status PackedFileWriter::_switch_to_direct_write() { |
190 | 3 | DCHECK(!_is_direct_write); |
191 | | |
192 | | // If we have buffered data, write it to inner writer first |
193 | 3 | if (_buffer.size() > 0) { |
194 | 2 | Slice buffer_slice(_buffer.data(), _buffer.size()); |
195 | 2 | RETURN_IF_ERROR(_inner_writer->appendv(&buffer_slice, 1)); |
196 | 2 | _release_buffer(); |
197 | 2 | } |
198 | | |
199 | 3 | return Status::OK(); |
200 | 3 | } |
201 | | |
202 | 1.00k | Status PackedFileWriter::_send_to_packed_manager() { |
203 | 1.00k | DCHECK(!_is_direct_write); |
204 | | |
205 | 1.00k | if (_packed_file_manager == nullptr) { |
206 | 0 | return Status::InternalError("PackedFileManager is not available"); |
207 | 0 | } |
208 | 1.00k | LOG(INFO) << "send_to_packed_manager: " << _file_path << " buffer size: " << _buffer.size(); |
209 | | |
210 | 1.00k | if (_append_info.resource_id.empty()) { |
211 | 0 | return Status::InternalError("Missing resource id for packed file append"); |
212 | 0 | } |
213 | | |
214 | 1.00k | if (_append_info.txn_id <= 0) { |
215 | 0 | return Status::InvalidArgument("Missing valid txn id for packed file append: " + |
216 | 0 | _file_path); |
217 | 0 | } |
218 | | |
219 | 1.00k | Slice data_slice(_buffer.data(), _buffer.size()); |
220 | 1.00k | RETURN_IF_ERROR(_packed_file_manager->append_small_file(_file_path, data_slice, _append_info)); |
221 | 1.00k | _release_buffer(); |
222 | 1.00k | return Status::OK(); |
223 | 1.00k | } |
224 | | |
225 | 1.00k | Status PackedFileWriter::get_packed_slice_location(PackedSliceLocation* location) const { |
226 | 1.00k | DCHECK(_state == State::CLOSED) |
227 | 0 | << " file_path: " << _file_path << " bytes_appended: " << _bytes_appended; |
228 | 1.00k | if (_is_direct_write) { |
229 | 0 | *location = PackedSliceLocation {}; |
230 | 0 | return Status::OK(); |
231 | 0 | } |
232 | 1.00k | RETURN_IF_ERROR(_packed_file_manager->get_packed_slice_location(_file_path, location)); |
233 | 1.00k | LOG(INFO) << "get_packed_slice_location: " << _file_path |
234 | 1.00k | << " packed_path: " << location->packed_file_path << " " << location->offset << " " |
235 | 1.00k | << location->size; |
236 | 1.00k | return Status::OK(); |
237 | 1.00k | } |
238 | | } // namespace doris::io |