Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/s3_file_writer.h" |
19 | | |
20 | | #include <aws/s3/model/CompletedPart.h> |
21 | | #include <bvar/recorder.h> |
22 | | #include <bvar/reducer.h> |
23 | | #include <bvar/window.h> |
24 | | #include <fmt/core.h> |
25 | | #include <glog/logging.h> |
26 | | |
27 | | #include <sstream> |
28 | | #include <tuple> |
29 | | #include <utility> |
30 | | |
31 | | #include "common/config.h" |
32 | | #include "common/status.h" |
33 | | #include "cpp/sync_point.h" |
34 | | #include "io/cache/block_file_cache.h" |
35 | | #include "io/cache/block_file_cache_factory.h" |
36 | | #include "io/cache/file_block.h" |
37 | | #include "io/cache/file_cache_common.h" |
38 | | #include "io/fs/file_writer.h" |
39 | | #include "io/fs/path.h" |
40 | | #include "io/fs/s3_file_bufferpool.h" |
41 | | #include "io/fs/s3_file_system.h" |
42 | | #include "io/fs/s3_obj_storage_client.h" |
43 | | #include "runtime/exec_env.h" |
44 | | #include "util/debug_points.h" |
45 | | #include "util/s3_util.h" |
46 | | #include "util/stopwatch.hpp" |
47 | | |
48 | | namespace doris::io { |
49 | | |
50 | | bvar::Adder<uint64_t> s3_file_writer_total("s3_file_writer_total_num"); |
51 | | bvar::Adder<uint64_t> s3_bytes_written_total("s3_file_writer_bytes_written"); |
52 | | bvar::Adder<uint64_t> s3_file_created_total("s3_file_writer_file_created"); |
53 | | bvar::Adder<uint64_t> s3_file_being_written("s3_file_writer_file_being_written"); |
54 | | bvar::Adder<uint64_t> s3_file_writer_async_close_queuing("s3_file_writer_async_close_queuing"); |
55 | | bvar::Adder<uint64_t> s3_file_writer_async_close_processing( |
56 | | "s3_file_writer_async_close_processing"); |
57 | | bvar::IntRecorder s3_file_writer_first_append_to_close_ms_recorder; |
58 | | bvar::Window<bvar::IntRecorder> s3_file_writer_first_append_to_close_ms_window( |
59 | | "s3_file_writer_first_append_to_close_ms", |
60 | | &s3_file_writer_first_append_to_close_ms_recorder, /*window_size=*/10); |
61 | | |
62 | | S3FileWriter::S3FileWriter(std::shared_ptr<ObjClientHolder> client, std::string bucket, |
63 | | std::string key, const FileWriterOptions* opts) |
64 | 2.07k | : _obj_storage_path_opts({.path = fmt::format("s3://{}/{}", bucket, key), |
65 | 2.07k | .bucket = std::move(bucket), |
66 | 2.07k | .key = std::move(key)}), |
67 | 2.07k | _used_by_s3_committer(opts ? opts->used_by_s3_committer : false), |
68 | 2.07k | _obj_client(std::move(client)) { |
69 | 2.07k | s3_file_writer_total << 1; |
70 | 2.07k | s3_file_being_written << 1; |
71 | 2.07k | Aws::Http::SetCompliantRfc3986Encoding(true); |
72 | | |
73 | 2.07k | init_cache_builder(opts, _obj_storage_path_opts.path); |
74 | 2.07k | } |
75 | | |
76 | 2.08k | S3FileWriter::~S3FileWriter() { |
77 | 2.08k | if (_async_close_pack != nullptr) { |
78 | | // For thread safety |
79 | 0 | std::ignore = _async_close_pack->future.get(); |
80 | 0 | _async_close_pack = nullptr; |
81 | 2.08k | } else { |
82 | | // Consider one situation where the file writer is destructed after it submit at least one async task |
83 | | // without calling close(), then there exists one occasion where the async task is executed right after |
84 | | // the correspoding S3 file writer is already destructed |
85 | 2.08k | _wait_until_finish(fmt::format("wait s3 file {} upload to be finished", |
86 | 2.08k | _obj_storage_path_opts.path.native())); |
87 | 2.08k | } |
88 | | // We won't do S3 abort operation in BE, we let s3 service do it own. |
89 | 2.08k | if (state() == State::OPENED && !_failed) { |
90 | 1.01k | s3_bytes_written_total << _bytes_appended; |
91 | 1.01k | } |
92 | 2.08k | s3_file_being_written << -1; |
93 | 2.08k | } |
94 | | |
95 | 46 | Status S3FileWriter::_create_multi_upload_request() { |
96 | 46 | LOG(INFO) << "create_multi_upload_request " << _obj_storage_path_opts.path.native(); |
97 | 46 | const auto& client = _obj_client->get(); |
98 | 46 | if (nullptr == client) { |
99 | 0 | return Status::InternalError<false>("invalid obj storage client"); |
100 | 0 | } |
101 | 46 | auto resp = client->create_multipart_upload(_obj_storage_path_opts); |
102 | 46 | if (resp.resp.status.code == ErrorCode::OK) { |
103 | 45 | _obj_storage_path_opts.upload_id = resp.upload_id; |
104 | 45 | } |
105 | 46 | return {resp.resp.status.code, std::move(resp.resp.status.msg)}; |
106 | 46 | } |
107 | | |
108 | 3.14k | void S3FileWriter::_wait_until_finish(std::string_view task_name) { |
109 | 3.14k | auto timeout_duration = config::s3_file_writer_log_interval_second; |
110 | 3.14k | auto msg = fmt::format( |
111 | 3.14k | "{} multipart upload already takes {} seconds, bucket={}, key={}, upload_id={}", |
112 | 3.14k | task_name, timeout_duration, _obj_storage_path_opts.bucket, |
113 | 3.14k | _obj_storage_path_opts.path.native(), |
114 | 3.14k | _obj_storage_path_opts.upload_id.has_value() ? *_obj_storage_path_opts.upload_id : ""); |
115 | 3.14k | timespec current_time; |
116 | | // We don't need high accuracy here, so we use time(nullptr) |
117 | | // since it's the fastest way to get current time(second) |
118 | 3.14k | auto current_time_second = time(nullptr); |
119 | 3.14k | current_time.tv_sec = current_time_second + timeout_duration; |
120 | 3.14k | current_time.tv_nsec = 0; |
121 | | // bthread::countdown_event::timed_wait() should use absolute time |
122 | 3.14k | while (0 != _countdown_event.timed_wait(current_time)) { |
123 | 0 | current_time.tv_sec += timeout_duration; |
124 | 0 | LOG(WARNING) << msg; |
125 | 0 | } |
126 | 3.14k | } |
127 | | |
128 | 1.07k | Status S3FileWriter::close(bool non_block) { |
129 | 1.07k | if (state() == State::CLOSED) { |
130 | 0 | return Status::InternalError("S3FileWriter already closed, file path {}, file key {}", |
131 | 0 | _obj_storage_path_opts.path.native(), |
132 | 0 | _obj_storage_path_opts.key); |
133 | 0 | } |
134 | 1.07k | if (state() == State::ASYNC_CLOSING) { |
135 | 8 | if (non_block) { |
136 | 0 | return Status::InternalError("Don't submit async close multi times"); |
137 | 0 | } |
138 | 8 | CHECK(_async_close_pack != nullptr); |
139 | 8 | _st = _async_close_pack->future.get(); |
140 | 8 | _async_close_pack = nullptr; |
141 | | // We should wait for all the pre async task to be finished |
142 | 8 | _state = State::CLOSED; |
143 | | // The next time we call close() with no matter non_block true or false, it would always return the |
144 | | // '_st' value because this writer is already closed. |
145 | 8 | if (!non_block && _st.ok()) { |
146 | 3 | _record_close_latency(); |
147 | 3 | } |
148 | 8 | return _st; |
149 | 8 | } |
150 | 1.06k | if (non_block) { |
151 | 1.00k | _state = State::ASYNC_CLOSING; |
152 | 1.00k | _async_close_pack = std::make_unique<AsyncCloseStatusPack>(); |
153 | 1.00k | _async_close_pack->future = _async_close_pack->promise.get_future(); |
154 | 1.00k | s3_file_writer_async_close_queuing << 1; |
155 | 1.00k | Status submit_status = Status::OK(); |
156 | 1.00k | DBUG_EXECUTE_IF("S3FileWriter.close.submit_async_close.inject_error", { |
157 | 1.00k | submit_status = Status::IOError("S3FileWriter.close.submit_async_close.inject_error"); |
158 | 1.00k | }); |
159 | 1.00k | if (submit_status.ok()) { |
160 | 1.00k | submit_status = |
161 | 1.00k | ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([&]() { |
162 | 1.00k | s3_file_writer_async_close_queuing << -1; |
163 | 1.00k | s3_file_writer_async_close_processing << 1; |
164 | 1.00k | _st = _close_impl(); |
165 | 1.00k | _async_close_pack->promise.set_value(_st); |
166 | 1.00k | s3_file_writer_async_close_processing << -1; |
167 | 1.00k | }); |
168 | 1.00k | } |
169 | 1.00k | if (!submit_status.ok()) { |
170 | 0 | s3_file_writer_async_close_queuing << -1; |
171 | 0 | LOG(WARNING) << "failed to submit async close for " |
172 | 0 | << _obj_storage_path_opts.path.native() |
173 | 0 | << ", fallback to sync close, status=" << submit_status; |
174 | 0 | _st = _close_impl(); |
175 | 0 | _async_close_pack->promise.set_value(_st); |
176 | 0 | return _st; |
177 | 0 | } |
178 | 1.00k | return Status::OK(); |
179 | 1.00k | } |
180 | 59 | _st = _close_impl(); |
181 | 59 | _state = State::CLOSED; |
182 | 59 | if (!non_block && _st.ok()) { |
183 | 57 | _record_close_latency(); |
184 | 57 | } |
185 | 59 | return _st; |
186 | 1.06k | } |
187 | | |
188 | 1.06k | void S3FileWriter::_record_close_latency() { |
189 | 1.06k | if (_close_latency_recorded || !_first_append_timestamp.has_value()) { |
190 | 1 | return; |
191 | 1 | } |
192 | 1.05k | auto now = std::chrono::steady_clock::now(); |
193 | 1.05k | auto latency_ms = |
194 | 1.05k | std::chrono::duration_cast<std::chrono::milliseconds>(now - *_first_append_timestamp) |
195 | 1.05k | .count(); |
196 | 1.05k | s3_file_writer_first_append_to_close_ms_recorder << latency_ms; |
197 | 1.05k | if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) { |
198 | 1.05k | sampler->take_sample(); |
199 | 1.05k | } |
200 | 1.05k | _close_latency_recorded = true; |
201 | 1.05k | } |
202 | | |
203 | 1.00k | Status S3FileWriter::try_finish_close() { |
204 | 1.00k | if (state() == State::CLOSED) { |
205 | 0 | return _st; |
206 | 0 | } |
207 | 1.00k | if (state() != State::ASYNC_CLOSING) { |
208 | 0 | return Status::NotSupported("S3FileWriter is not async closing"); |
209 | 0 | } |
210 | 1.00k | CHECK(_async_close_pack != nullptr); |
211 | 1.00k | if (_async_close_pack->future.wait_for(std::chrono::seconds(0)) != std::future_status::ready) { |
212 | 0 | return Status::NeedSendAgain("async close is not finished"); |
213 | 0 | } |
214 | 1.00k | _st = _async_close_pack->future.get(); |
215 | 1.00k | _async_close_pack = nullptr; |
216 | 1.00k | _state = State::CLOSED; |
217 | 1.00k | if (_st.ok()) { |
218 | 1.00k | _record_close_latency(); |
219 | 1.00k | } |
220 | 1.00k | return _st; |
221 | 1.00k | } |
222 | | |
223 | 1.29k | bool S3FileWriter::_complete_part_task_callback(Status s) { |
224 | 1.29k | bool ret = false; |
225 | 1.29k | if (!s.ok()) [[unlikely]] { |
226 | 3 | VLOG_NOTICE << "failed at key: " << _obj_storage_path_opts.key |
227 | 0 | << ", status: " << s.to_string(); |
228 | 3 | std::unique_lock<std::mutex> _lck {_completed_lock}; |
229 | 3 | _failed = true; |
230 | 3 | ret = true; |
231 | 3 | _st = std::move(s); |
232 | 3 | } |
233 | | // After the signal, there is a scenario where the previous invocation of _wait_until_finish |
234 | | // returns to the caller, and subsequently, the S3 file writer is destructed. |
235 | | // This means that accessing _failed afterwards would result in a heap use after free vulnerability. |
236 | 1.29k | _countdown_event.signal(); |
237 | 1.29k | return ret; |
238 | 1.29k | } |
239 | | |
240 | 1.29k | Status S3FileWriter::_build_upload_buffer() { |
241 | 1.29k | auto builder = FileBufferBuilder(); |
242 | 1.29k | builder.set_type(BufferType::UPLOAD) |
243 | 1.29k | .set_upload_callback([part_num = _cur_part_num, this](UploadFileBuffer& buf) { |
244 | 267 | _upload_one_part(part_num, buf); |
245 | 267 | }) |
246 | 1.29k | .set_file_offset(_bytes_appended) |
247 | 1.29k | .set_sync_after_complete_task([this](auto&& PH1) { |
248 | 1.29k | return _complete_part_task_callback(std::forward<decltype(PH1)>(PH1)); |
249 | 1.29k | }) |
250 | 1.55k | .set_is_cancelled([this]() { return _failed.load(); }); |
251 | 1.29k | if (_cache_builder != nullptr) { |
252 | | // We would load the data into file cache asynchronously which indicates |
253 | | // that this instance of S3FileWriter might have been destructed when we |
254 | | // try to do writing into file cache, so we make the lambda capture the variable |
255 | | // we need by value to extend their lifetime |
256 | 0 | int64_t id = get_tablet_id(_obj_storage_path_opts.path.native()).value_or(0); |
257 | 0 | builder.set_allocate_file_blocks_holder([builder = *_cache_builder, |
258 | 0 | offset = _bytes_appended, |
259 | 0 | tablet_id = id]() -> FileBlocksHolderPtr { |
260 | 0 | return builder.allocate_cache_holder(offset, config::s3_write_buffer_size, tablet_id); |
261 | 0 | }); |
262 | 0 | } |
263 | 1.29k | RETURN_IF_ERROR(builder.build(&_pending_buf)); |
264 | 1.29k | auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get()); |
265 | 1.29k | DCHECK(buf != nullptr); |
266 | 1.29k | return Status::OK(); |
267 | 1.29k | } |
268 | | |
269 | 1.29k | Status S3FileWriter::_submit_upload_buffer(const std::shared_ptr<FileBuffer>& buf) { |
270 | 1.29k | _countdown_event.add_count(); |
271 | 1.29k | DBUG_EXECUTE_IF("S3FileWriter.submit_upload_buffer.inject_error", { |
272 | 1.29k | auto st = Status::IOError("S3FileWriter.submit_upload_buffer.inject_error"); |
273 | 1.29k | _complete_part_task_callback(st); |
274 | 1.29k | return st; |
275 | 1.29k | }); |
276 | 1.29k | auto st = FileBuffer::submit(buf); |
277 | 1.29k | if (!st.ok()) [[unlikely]] { |
278 | 0 | _complete_part_task_callback(st); |
279 | 0 | } |
280 | 1.29k | return st; |
281 | 1.29k | } |
282 | | |
283 | 1.06k | Status S3FileWriter::_close_impl() { |
284 | 1.06k | VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native(); |
285 | | |
286 | 1.06k | DBUG_EXECUTE_IF("S3FileWriter._close_impl.inject_error", { |
287 | 1.06k | if (_obj_storage_path_opts.key.ends_with(".dat")) { |
288 | 1.06k | return Status::IOError("S3FileWriter._close_impl.inject_error"); |
289 | 1.06k | } |
290 | 1.06k | }); |
291 | | |
292 | 1.06k | if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size |
293 | 1.02k | RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size()); |
294 | 1.02k | } |
295 | | |
296 | 1.06k | if (_bytes_appended == 0) { |
297 | 3 | DCHECK_EQ(_cur_part_num, 1); |
298 | | // No data written, but need to create an empty file |
299 | 3 | RETURN_IF_ERROR(_build_upload_buffer()); |
300 | 3 | if (!_used_by_s3_committer) { |
301 | 3 | auto* pending_buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get()); |
302 | 3 | pending_buf->set_upload_to_remote([this](UploadFileBuffer& buf) { _put_object(buf); }); |
303 | 3 | } else { |
304 | 0 | RETURN_IF_ERROR(_create_multi_upload_request()); |
305 | 0 | } |
306 | 3 | } |
307 | | |
308 | 1.06k | if (_pending_buf != nullptr) { // there is remaining data in buffer need to be uploaded |
309 | 1.06k | auto st = _submit_upload_buffer(_pending_buf); |
310 | 1.06k | _pending_buf = nullptr; |
311 | 1.06k | if (!st.ok()) { |
312 | 0 | _wait_until_finish("pending buffer submit failed"); |
313 | 0 | return st; |
314 | 0 | } |
315 | 1.06k | } |
316 | | |
317 | 1.06k | RETURN_IF_ERROR(_complete()); |
318 | 1.06k | SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::close", Status()); |
319 | | |
320 | 1.06k | return Status::OK(); |
321 | 1.06k | } |
322 | | |
323 | 10.4k | Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) { |
324 | 10.4k | if (state() != State::OPENED) [[unlikely]] { |
325 | 0 | return Status::InternalError("append to closed file: {}", |
326 | 0 | _obj_storage_path_opts.path.native()); |
327 | 0 | } |
328 | | |
329 | 10.4k | if (!_first_append_timestamp.has_value()) { |
330 | 1.06k | _first_append_timestamp = std::chrono::steady_clock::now(); |
331 | 1.06k | } |
332 | | |
333 | 10.4k | size_t buffer_size = config::s3_write_buffer_size; |
334 | 10.4k | TEST_SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::appenv", Status()); |
335 | 20.8k | for (size_t i = 0; i < data_cnt; i++) { |
336 | 10.4k | size_t data_size = data[i].get_size(); |
337 | 21.0k | for (size_t pos = 0, data_size_to_append = 0; pos < data_size; pos += data_size_to_append) { |
338 | 10.6k | if (_failed) { |
339 | 0 | return _st; |
340 | 0 | } |
341 | 10.6k | if (!_pending_buf) { |
342 | 1.29k | RETURN_IF_ERROR(_build_upload_buffer()); |
343 | 1.29k | } |
344 | | // we need to make sure all parts except the last one to be 5MB or more |
345 | | // and shouldn't be larger than buf |
346 | 10.6k | data_size_to_append = std::min(data_size - pos, _pending_buf->get_file_offset() + |
347 | 10.6k | buffer_size - _bytes_appended); |
348 | | |
349 | | // if the buffer has memory buf inside, the data would be written into memory first then S3 then file cache |
350 | | // it would be written to cache then S3 if the buffer doesn't have memory preserved |
351 | 10.6k | RETURN_IF_ERROR(_pending_buf->append_data( |
352 | 10.6k | Slice {data[i].get_data() + pos, data_size_to_append})); |
353 | 10.6k | TEST_SYNC_POINT_CALLBACK("s3_file_writer::appenv_1", &_pending_buf, _cur_part_num); |
354 | | |
355 | | // If this is the last part and the data size is less than s3_write_buffer_size, |
356 | | // the pending_buf will be handled by _close_impl() and _complete() |
357 | | // If this is the last part and the data size is equal to s3_write_buffer_size, |
358 | | // the pending_buf is handled here and submitted. it will be waited by _complete() |
359 | 10.6k | if (_pending_buf->get_size() == buffer_size) { |
360 | | // only create multiple upload request when the data size is |
361 | | // larger or equal to s3_write_buffer_size than one memory buffer |
362 | 233 | if (_cur_part_num == 1) { |
363 | 46 | RETURN_IF_ERROR(_create_multi_upload_request()); |
364 | 46 | } |
365 | 232 | _cur_part_num++; |
366 | 232 | auto st = _submit_upload_buffer(_pending_buf); |
367 | 232 | _pending_buf = nullptr; |
368 | 232 | RETURN_IF_ERROR(st); |
369 | 232 | } |
370 | 10.6k | _bytes_appended += data_size_to_append; |
371 | 10.6k | } |
372 | 10.4k | } |
373 | 10.4k | return Status::OK(); |
374 | 10.4k | } |
375 | | |
376 | 267 | void S3FileWriter::_upload_one_part(int part_num, UploadFileBuffer& buf) { |
377 | 267 | VLOG_DEBUG << "upload_one_part " << _obj_storage_path_opts.path.native() |
378 | 0 | << " part=" << part_num; |
379 | 267 | if (buf.is_cancelled()) { |
380 | 0 | LOG_INFO("file {} skip part {} because previous failure {}", |
381 | 0 | _obj_storage_path_opts.path.native(), part_num, _st); |
382 | 0 | return; |
383 | 0 | } |
384 | 267 | const auto& client = _obj_client->get(); |
385 | 267 | if (nullptr == client) { |
386 | 0 | LOG_WARNING("failed to upload part, key={}, part_num={} bacause of null obj client", |
387 | 0 | _obj_storage_path_opts.key, part_num); |
388 | 0 | buf.set_status(Status::InternalError<false>("invalid obj storage client")); |
389 | 0 | return; |
390 | 0 | } |
391 | 267 | auto resp = client->upload_part(_obj_storage_path_opts, buf.get_string_view_data(), part_num); |
392 | 267 | if (resp.resp.status.code != ErrorCode::OK) { |
393 | 1 | LOG_WARNING("failed to upload part, key={}, part_num={}, status={}", |
394 | 1 | _obj_storage_path_opts.key, part_num, resp.resp.status.msg); |
395 | 1 | buf.set_status(Status(resp.resp.status.code, std::move(resp.resp.status.msg))); |
396 | 1 | return; |
397 | 1 | } |
398 | 266 | s3_bytes_written_total << buf.get_size(); |
399 | | |
400 | 266 | ObjectCompleteMultiPart completed_part { |
401 | 266 | part_num, resp.etag.has_value() ? std::move(resp.etag.value()) : ""}; |
402 | | |
403 | 266 | std::unique_lock<std::mutex> lck {_completed_lock}; |
404 | 266 | _completed_parts.emplace_back(std::move(completed_part)); |
405 | 266 | } |
406 | | |
407 | | // if enabled check |
408 | | // 1. issue a head object request for existence check |
409 | | // 2. check the file size |
410 | | Status check_after_upload(ObjStorageClient* client, const ObjectStorageResponse& upload_res, |
411 | | const ObjectStoragePathOptions& path_opt, int64_t bytes_appended, |
412 | 1.06k | const std::string& put_or_comp) { |
413 | 1.06k | if (!config::enable_s3_object_check_after_upload) return Status::OK(); |
414 | | |
415 | 1.06k | auto head_res = client->head_object(path_opt); |
416 | | |
417 | | // clang-format off |
418 | 1.06k | auto err_msg = [&]() { |
419 | 0 | std::stringstream ss; |
420 | 0 | ss << "failed to check object after upload=" << put_or_comp |
421 | 0 | << " file_path=" << path_opt.path.native() |
422 | 0 | << fmt::format(" {}_err=", put_or_comp) << upload_res.status.msg |
423 | 0 | << fmt::format(" {}_code=", put_or_comp) << upload_res.status.code |
424 | 0 | << fmt::format(" {}_http_code=", put_or_comp) << upload_res.http_code |
425 | 0 | << fmt::format(" {}_request_id=", put_or_comp) << upload_res.request_id |
426 | 0 | << " head_err=" << head_res.resp.status.msg |
427 | 0 | << " head_code=" << head_res.resp.status.code |
428 | 0 | << " head_http_code=" << head_res.resp.http_code |
429 | 0 | << " head_request_id=" << head_res.resp.request_id; |
430 | 0 | return ss.str(); |
431 | 0 | }; |
432 | | // clang-format on |
433 | | |
434 | | // TODO(gavin): make it fail by injection |
435 | 1.06k | TEST_SYNC_POINT_CALLBACK("S3FileWriter::check_after_load", &head_res); |
436 | 1.06k | if (head_res.resp.status.code != ErrorCode::OK && head_res.resp.http_code != 200) { |
437 | 0 | LOG(WARNING) << "failed to issue head object after upload, " << err_msg(); |
438 | 0 | DCHECK(false) << "failed to issue head object after upload, " << err_msg(); |
439 | | // FIXME(gavin): we should retry if this HEAD fails? |
440 | 0 | return Status::IOError( |
441 | 0 | "failed to issue head object after upload, status_code={}, http_code={}, err={}", |
442 | 0 | head_res.resp.status.code, head_res.resp.http_code, head_res.resp.status.msg); |
443 | 0 | } |
444 | 1.06k | if (head_res.file_size != bytes_appended) { |
445 | 0 | LOG(WARNING) << "failed to check size after upload, expected_size=" << bytes_appended |
446 | 0 | << " actual_size=" << head_res.file_size << err_msg(); |
447 | 0 | DCHECK_EQ(bytes_appended, head_res.file_size) |
448 | 0 | << "failed to check size after upload," << err_msg(); |
449 | 0 | return Status::IOError( |
450 | 0 | "failed to check object size after upload, expected_size={} actual_size={}", |
451 | 0 | bytes_appended, head_res.file_size); |
452 | 0 | } |
453 | 1.06k | return Status::OK(); |
454 | 1.06k | } |
455 | | |
456 | 1.06k | Status S3FileWriter::_complete() { |
457 | 1.06k | const auto& client = _obj_client->get(); |
458 | 1.06k | if (nullptr == client) { |
459 | 0 | return Status::InternalError<false>("invalid obj storage client"); |
460 | 0 | } |
461 | 1.06k | if (_failed) { |
462 | 0 | _wait_until_finish("early quit"); |
463 | 0 | return _st; |
464 | 0 | } |
465 | | // When the part num is only one, it means the data is less than 5MB so we can just put it. |
466 | 1.06k | if (_cur_part_num == 1) { |
467 | 1.02k | _wait_until_finish("PutObject"); |
468 | 1.02k | return _st; |
469 | 1.02k | } |
470 | | // Wait multipart load and finish. |
471 | 44 | _wait_until_finish("Complete"); |
472 | 44 | TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:1", |
473 | 44 | std::make_pair(&_failed, &_completed_parts)); |
474 | 44 | if (_used_by_s3_committer) { // S3 committer will complete multipart upload file on FE side. |
475 | 0 | s3_file_created_total << 1; // Assume that it will be created successfully |
476 | 0 | return Status::OK(); |
477 | 0 | } |
478 | | |
479 | | // check number of parts |
480 | 44 | int64_t expected_num_parts1 = (_bytes_appended / config::s3_write_buffer_size) + |
481 | 44 | !!(_bytes_appended % config::s3_write_buffer_size); |
482 | 44 | int64_t expected_num_parts2 = |
483 | 44 | (_bytes_appended % config::s3_write_buffer_size) ? _cur_part_num : _cur_part_num - 1; |
484 | 44 | DCHECK_EQ(expected_num_parts1, expected_num_parts2) |
485 | 0 | << " bytes_appended=" << _bytes_appended << " cur_part_num=" << _cur_part_num |
486 | 0 | << " s3_write_buffer_size=" << config::s3_write_buffer_size; |
487 | 44 | if (_failed || _completed_parts.size() != static_cast<size_t>(expected_num_parts1) || |
488 | 44 | expected_num_parts1 != expected_num_parts2) { |
489 | 3 | _st = Status::InternalError( |
490 | 3 | "failed to complete multipart upload, error status={} failed={} #complete_parts={} " |
491 | 3 | "#expected_parts={} " |
492 | 3 | "completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}", |
493 | 3 | _st, _failed, _completed_parts.size(), expected_num_parts1, _dump_completed_part(), |
494 | 3 | _obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr); |
495 | 3 | LOG(WARNING) << _st; |
496 | 3 | return _st; |
497 | 3 | } |
498 | | // make sure _completed_parts are ascending order |
499 | 41 | std::sort(_completed_parts.begin(), _completed_parts.end(), |
500 | 773 | [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; }); |
501 | 41 | TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts); |
502 | 41 | LOG(INFO) << "complete_multipart_upload " << _obj_storage_path_opts.path.native() |
503 | 41 | << " size=" << _bytes_appended << " number_parts=" << _completed_parts.size() |
504 | 41 | << " s3_write_buffer_size=" << config::s3_write_buffer_size; |
505 | 41 | auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts); |
506 | 41 | if (resp.status.code != ErrorCode::OK) { |
507 | 2 | LOG_WARNING("failed to complete multipart upload, err={}, file_path={}", resp.status.msg, |
508 | 2 | _obj_storage_path_opts.path.native()); |
509 | 2 | return {resp.status.code, std::move(resp.status.msg)}; |
510 | 2 | } |
511 | | |
512 | 39 | RETURN_IF_ERROR(check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended, |
513 | 39 | "complete_multipart")); |
514 | | |
515 | 39 | s3_file_created_total << 1; |
516 | 39 | return Status::OK(); |
517 | 39 | } |
518 | | |
519 | 1.02k | Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() { |
520 | 1.02k | auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get()); |
521 | 1.02k | DCHECK(buf != nullptr); |
522 | 1.02k | if (_used_by_s3_committer) { |
523 | | // If used_by_s3_committer, we always use multi-parts uploading. |
524 | 0 | buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) { |
525 | 0 | _upload_one_part(part_num, buf); |
526 | 0 | }); |
527 | 0 | DCHECK(_cur_part_num == 1); |
528 | 0 | RETURN_IF_ERROR(_create_multi_upload_request()); |
529 | 1.02k | } else { |
530 | | // if we only need to upload one file less than 5MB, we can just |
531 | | // call PutObject to reduce the network IO |
532 | 1.02k | buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); }); |
533 | 1.02k | } |
534 | 1.02k | return Status::OK(); |
535 | 1.02k | } |
536 | | |
537 | 1.02k | void S3FileWriter::_put_object(UploadFileBuffer& buf) { |
538 | 1.02k | MonotonicStopWatch timer; |
539 | 1.02k | timer.start(); |
540 | | |
541 | 1.02k | if (state() == State::CLOSED) { |
542 | 0 | DCHECK(state() != State::CLOSED) |
543 | 0 | << "state=" << (int)state() << " path=" << _obj_storage_path_opts.path.native(); |
544 | 0 | LOG_WARNING("failed to put object because file closed, file path {}", |
545 | 0 | _obj_storage_path_opts.path.native()); |
546 | 0 | buf.set_status(Status::InternalError<false>("try to put closed file")); |
547 | 0 | return; |
548 | 0 | } |
549 | 1.02k | const auto& client = _obj_client->get(); |
550 | 1.02k | if (nullptr == client) { |
551 | 0 | buf.set_status(Status::InternalError<false>("invalid obj storage client")); |
552 | 0 | return; |
553 | 0 | } |
554 | 1.02k | TEST_SYNC_POINT_RETURN_WITH_VOID("S3FileWriter::_put_object", this, &buf); |
555 | 1.02k | auto resp = client->put_object(_obj_storage_path_opts, buf.get_string_view_data()); |
556 | 1.02k | timer.stop(); |
557 | | |
558 | 1.02k | if (resp.status.code != ErrorCode::OK) { |
559 | 0 | LOG_WARNING("failed to put object, put object failed because {}, file path {}, time={}ms", |
560 | 0 | resp.status.msg, _obj_storage_path_opts.path.native(), |
561 | 0 | timer.elapsed_time_milliseconds()); |
562 | 0 | buf.set_status({resp.status.code, std::move(resp.status.msg)}); |
563 | 0 | return; |
564 | 0 | } |
565 | | |
566 | 1.02k | auto st = check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended, |
567 | 1.02k | "put_object"); |
568 | 1.02k | if (!st.ok()) { |
569 | 0 | buf.set_status(st); |
570 | 0 | return; |
571 | 0 | } |
572 | | |
573 | 1.02k | LOG(INFO) << "put_object " << _obj_storage_path_opts.path.native() |
574 | 1.02k | << " size=" << _bytes_appended << " time=" << timer.elapsed_time_milliseconds() |
575 | 1.02k | << "ms"; |
576 | 1.02k | s3_file_created_total << 1; |
577 | 1.02k | s3_bytes_written_total << buf.get_size(); |
578 | 1.02k | } |
579 | | |
580 | 3 | std::string S3FileWriter::_dump_completed_part() const { |
581 | 3 | std::stringstream ss; |
582 | 3 | ss << "part_numbers:"; |
583 | 3 | for (const auto& part : _completed_parts) { |
584 | 2 | ss << " " << part.part_num; |
585 | 2 | } |
586 | 3 | return ss.str(); |
587 | 3 | } |
588 | | |
589 | | } // namespace doris::io |