be/src/io/fs/packed_file_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/packed_file_writer.h" |
19 | | |
20 | | #include <bvar/recorder.h> |
21 | | #include <bvar/window.h> |
22 | | #include <glog/logging.h> |
23 | | |
24 | | #include "cloud/config.h" |
25 | | #include "common/config.h" |
26 | | #include "common/status.h" |
27 | | #include "runtime/exec_env.h" |
28 | | #include "util/slice.h" |
29 | | |
30 | | namespace doris::io { |
31 | | |
32 | | bvar::IntRecorder packed_file_writer_first_append_to_close_ms_recorder; |
33 | | bvar::Window<bvar::IntRecorder> packed_file_writer_first_append_to_close_ms_window( |
34 | | "packed_file_writer_first_append_to_close_ms", |
35 | | &packed_file_writer_first_append_to_close_ms_recorder, /*window_size=*/10); |
36 | | |
37 | | PackedFileWriter::PackedFileWriter(FileWriterPtr inner_writer, Path path, |
38 | | PackedAppendContext append_info) |
39 | 63.4k | : _inner_writer(std::move(inner_writer)), |
40 | 63.4k | _file_path(path.native()), |
41 | 63.4k | _packed_file_manager(PackedFileManager::instance()), |
42 | 63.4k | _append_info(std::move(append_info)) { |
43 | 63.4k | DCHECK(_inner_writer != nullptr); |
44 | 63.4k | DCHECK(!_file_path.empty()); |
45 | 63.4k | } |
46 | | |
47 | 63.7k | PackedFileWriter::~PackedFileWriter() { |
48 | 63.7k | if (_state == State::OPENED) { |
49 | 68 | LOG(WARNING) << "PackedFileWriter destroyed without being closed, file: " << _file_path; |
50 | 68 | } |
51 | 63.7k | } |
52 | | |
53 | 2.29M | Status PackedFileWriter::appendv(const Slice* data, size_t data_cnt) { |
54 | 2.29M | if (_state != State::OPENED) { |
55 | 0 | return Status::InternalError("Cannot append to closed or closing writer for file: " + |
56 | 0 | _file_path); |
57 | 0 | } |
58 | | |
59 | 2.29M | if (!_first_append_timestamp.has_value()) { |
60 | 63.6k | _first_append_timestamp = std::chrono::steady_clock::now(); |
61 | 63.6k | } |
62 | | |
63 | | // Calculate total size to append |
64 | 2.29M | size_t total_size = 0; |
65 | 9.10M | for (size_t i = 0; i < data_cnt; ++i) { |
66 | 6.80M | total_size += data[i].size; |
67 | 6.80M | } |
68 | | |
69 | 2.29M | if (total_size == 0) { |
70 | 1 | return Status::OK(); |
71 | 1 | } |
72 | | |
73 | | // Check if we should switch to direct write mode |
74 | 2.29M | if (!_is_direct_write && _bytes_appended + total_size > config::small_file_threshold_bytes) { |
75 | 232 | RETURN_IF_ERROR(_switch_to_direct_write()); |
76 | 232 | _is_direct_write = true; |
77 | 232 | } |
78 | | |
79 | | // Write data based on current mode |
80 | 2.29M | if (_is_direct_write) { |
81 | 51.9k | RETURN_IF_ERROR(_inner_writer->appendv(data, data_cnt)); |
82 | 2.24M | } else { |
83 | | // Buffer small file data |
84 | 8.89M | for (size_t i = 0; i < data_cnt; ++i) { |
85 | 6.64M | _buffer.append(data[i].data, data[i].size); |
86 | 6.64M | } |
87 | 2.24M | } |
88 | | |
89 | 2.29M | _bytes_appended += total_size; |
90 | 2.29M | return Status::OK(); |
91 | 2.29M | } |
92 | | |
93 | 127k | Status PackedFileWriter::close(bool non_block) { |
94 | 127k | if (_state == State::CLOSED) { |
95 | 0 | return Status::OK(); |
96 | 0 | } |
97 | | |
98 | 127k | auto record_close_latency = [this]() { |
99 | 63.5k | if (_close_latency_recorded || !_first_append_timestamp.has_value()) { |
100 | 58 | return; |
101 | 58 | } |
102 | 63.4k | auto now = std::chrono::steady_clock::now(); |
103 | 63.4k | auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
104 | 63.4k | now - *_first_append_timestamp) |
105 | 63.4k | .count(); |
106 | 63.4k | packed_file_writer_first_append_to_close_ms_recorder << latency_ms; |
107 | 63.5k | if (auto* sampler = packed_file_writer_first_append_to_close_ms_recorder.get_sampler()) { |
108 | 63.5k | sampler->take_sample(); |
109 | 63.5k | } |
110 | 63.4k | _close_latency_recorded = true; |
111 | 63.4k | }; |
112 | | |
113 | 127k | if (_state == State::ASYNC_CLOSING) { |
114 | 63.6k | if (non_block) { |
115 | 0 | return Status::InternalError("Don't submit async close multi times"); |
116 | 0 | } |
117 | 63.6k | if (!_is_direct_write) { |
118 | 63.4k | RETURN_IF_ERROR(_wait_packed_upload()); |
119 | 63.4k | } else { |
120 | 233 | RETURN_IF_ERROR(_inner_writer->close(false)); |
121 | 233 | } |
122 | 63.6k | _state = State::CLOSED; |
123 | 63.6k | if (!non_block) { |
124 | 63.5k | record_close_latency(); |
125 | 63.5k | } |
126 | 63.6k | return Status::OK(); |
127 | 63.6k | } |
128 | | |
129 | 63.5k | if (non_block) { |
130 | 63.5k | return _close_async(); |
131 | 63.5k | } else { |
132 | 2 | return _close_sync(); |
133 | 2 | } |
134 | 63.5k | } |
135 | | |
136 | 63.5k | Status PackedFileWriter::_close_async() { |
137 | 63.5k | if (!_is_direct_write) { |
138 | | // Send small file data to packed manager |
139 | 63.3k | RETURN_IF_ERROR(_send_to_packed_manager()); |
140 | 63.3k | } else { |
141 | | // For large files, just close the inner writer asynchronously |
142 | 242 | RETURN_IF_ERROR(_inner_writer->close(true)); |
143 | 242 | } |
144 | 63.5k | _state = State::ASYNC_CLOSING; |
145 | 63.5k | return Status::OK(); |
146 | 63.5k | } |
147 | | |
148 | 1 | Status PackedFileWriter::_close_sync() { |
149 | 1 | if (!_is_direct_write) { |
150 | | // Send small file data to pack manager and wait for upload |
151 | 1 | RETURN_IF_ERROR(_send_to_packed_manager()); |
152 | 0 | RETURN_IF_ERROR(_wait_packed_upload()); |
153 | 0 | } else { |
154 | | // For large files, close the inner writer synchronously |
155 | 0 | RETURN_IF_ERROR(_inner_writer->close(false)); |
156 | 0 | } |
157 | 0 | _state = State::CLOSED; |
158 | 0 | if (!_close_latency_recorded) { |
159 | 0 | auto now = std::chrono::steady_clock::now(); |
160 | 0 | if (_first_append_timestamp.has_value()) { |
161 | 0 | auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
162 | 0 | now - *_first_append_timestamp) |
163 | 0 | .count(); |
164 | 0 | packed_file_writer_first_append_to_close_ms_recorder << latency_ms; |
165 | 0 | if (auto* sampler = |
166 | 0 | packed_file_writer_first_append_to_close_ms_recorder.get_sampler()) { |
167 | 0 | sampler->take_sample(); |
168 | 0 | } |
169 | 0 | _close_latency_recorded = true; |
170 | 0 | } |
171 | 0 | } |
172 | 0 | return Status::OK(); |
173 | 1 | } |
174 | | |
175 | 63.4k | Status PackedFileWriter::_wait_packed_upload() { |
176 | 63.4k | DCHECK(!_is_direct_write); |
177 | | // Only wait if we have data that was sent to packed manager |
178 | 63.4k | if (_bytes_appended > 0 && _packed_file_manager != nullptr) { |
179 | 63.3k | return _packed_file_manager->wait_upload_done(_file_path); |
180 | 63.3k | } |
181 | 49 | return Status::OK(); |
182 | 63.4k | } |
183 | | |
184 | 232 | Status PackedFileWriter::_switch_to_direct_write() { |
185 | 232 | DCHECK(!_is_direct_write); |
186 | | |
187 | | // If we have buffered data, write it to inner writer first |
188 | 232 | if (_buffer.size() > 0) { |
189 | 231 | Slice buffer_slice(_buffer.data(), _buffer.size()); |
190 | 231 | RETURN_IF_ERROR(_inner_writer->appendv(&buffer_slice, 1)); |
191 | 231 | _buffer.clear(); |
192 | 231 | } |
193 | | |
194 | 232 | return Status::OK(); |
195 | 232 | } |
196 | | |
197 | 63.3k | Status PackedFileWriter::_send_to_packed_manager() { |
198 | 63.3k | DCHECK(!_is_direct_write); |
199 | | |
200 | 63.3k | if (_packed_file_manager == nullptr) { |
201 | 0 | return Status::InternalError("PackedFileManager is not available"); |
202 | 0 | } |
203 | 63.3k | LOG(INFO) << "send_to_packed_manager: " << _file_path << " buffer size: " << _buffer.size(); |
204 | | |
205 | 63.3k | if (_append_info.resource_id.empty()) { |
206 | 0 | return Status::InternalError("Missing resource id for packed file append"); |
207 | 0 | } |
208 | | |
209 | 63.3k | if (_append_info.txn_id <= 0) { |
210 | 0 | return Status::InvalidArgument("Missing valid txn id for packed file append: " + |
211 | 0 | _file_path); |
212 | 0 | } |
213 | | |
214 | 63.3k | Slice data_slice(_buffer.data(), _buffer.size()); |
215 | 63.3k | RETURN_IF_ERROR(_packed_file_manager->append_small_file(_file_path, data_slice, _append_info)); |
216 | 63.3k | _buffer.clear(); |
217 | 63.3k | return Status::OK(); |
218 | 63.3k | } |
219 | | |
220 | 1.00k | Status PackedFileWriter::get_packed_slice_location(PackedSliceLocation* location) const { |
221 | 1.00k | DCHECK(_state == State::CLOSED) |
222 | 0 | << " file_path: " << _file_path << " bytes_appended: " << _bytes_appended; |
223 | 1.00k | if (_is_direct_write) { |
224 | 0 | *location = PackedSliceLocation {}; |
225 | 0 | return Status::OK(); |
226 | 0 | } |
227 | 1.00k | RETURN_IF_ERROR(_packed_file_manager->get_packed_slice_location(_file_path, location)); |
228 | 1.00k | LOG(INFO) << "get_packed_slice_location: " << _file_path |
229 | 1.00k | << " packed_path: " << location->packed_file_path << " " << location->offset << " " |
230 | 1.00k | << location->size; |
231 | 1.00k | return Status::OK(); |
232 | 1.00k | } |
233 | | } // namespace doris::io |