be/src/io/fs/s3_file_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/s3_file_writer.h" |
19 | | |
20 | | #include <aws/s3/model/CompletedPart.h> |
21 | | #include <bvar/recorder.h> |
22 | | #include <bvar/reducer.h> |
23 | | #include <bvar/window.h> |
24 | | #include <fmt/core.h> |
25 | | #include <glog/logging.h> |
26 | | |
27 | | #include <sstream> |
28 | | #include <tuple> |
29 | | #include <utility> |
30 | | |
31 | | #include "common/config.h" |
32 | | #include "common/status.h" |
33 | | #include "cpp/sync_point.h" |
34 | | #include "io/cache/block_file_cache.h" |
35 | | #include "io/cache/block_file_cache_factory.h" |
36 | | #include "io/cache/file_block.h" |
37 | | #include "io/cache/file_cache_common.h" |
38 | | #include "io/fs/file_writer.h" |
39 | | #include "io/fs/path.h" |
40 | | #include "io/fs/s3_file_bufferpool.h" |
41 | | #include "io/fs/s3_file_system.h" |
42 | | #include "io/fs/s3_obj_storage_client.h" |
43 | | #include "runtime/exec_env.h" |
44 | | #include "util/s3_util.h" |
45 | | #include "util/stopwatch.hpp" |
46 | | |
47 | | namespace doris::io { |
48 | | #include "common/compile_check_begin.h" |
49 | | |
// Process-wide bvar metrics for S3 file writer activity.
// Total number of S3FileWriter instances ever constructed.
bvar::Adder<uint64_t> s3_file_writer_total("s3_file_writer_total_num");
// Total bytes written to S3 (accumulated in put_object / upload_part paths,
// and in the destructor for writers destroyed while still OPENED).
bvar::Adder<uint64_t> s3_bytes_written_total("s3_file_writer_bytes_written");
// Number of objects successfully created on S3.
bvar::Adder<uint64_t> s3_file_created_total("s3_file_writer_file_created");
// Writers currently alive (incremented in ctor, decremented in dtor).
bvar::Adder<uint64_t> s3_file_being_written("s3_file_writer_file_being_written");
// Async close tasks submitted to the non-block close pool but not yet running.
bvar::Adder<uint64_t> s3_file_writer_async_close_queuing("s3_file_writer_async_close_queuing");
// Async close tasks currently executing.
bvar::Adder<uint64_t> s3_file_writer_async_close_processing(
        "s3_file_writer_async_close_processing");
// Latency (ms) from the first appendv() on a writer to its close.
bvar::IntRecorder s3_file_writer_first_append_to_close_ms_recorder;
bvar::Window<bvar::IntRecorder> s3_file_writer_first_append_to_close_ms_window(
        "s3_file_writer_first_append_to_close_ms",
        &s3_file_writer_first_append_to_close_ms_recorder, /*window_size=*/10);
61 | | |
S3FileWriter::S3FileWriter(std::shared_ptr<ObjClientHolder> client, std::string bucket,
                           std::string key, const FileWriterOptions* opts)
        // NOTE: `.path` is formatted before `bucket`/`key` are moved-from;
        // designated initializers in a braced list are evaluated in order.
        : _obj_storage_path_opts({.path = fmt::format("s3://{}/{}", bucket, key),
                                  .bucket = std::move(bucket),
                                  .key = std::move(key)}),
          _used_by_s3_committer(opts ? opts->used_by_s3_committer : false),
          _obj_client(std::move(client)) {
    s3_file_writer_total << 1;
    s3_file_being_written << 1;
    // NOTE(review): this toggles a process-global AWS SDK setting on every
    // construction; presumably idempotent — confirm whether it belongs in
    // one-time SDK initialization instead.
    Aws::Http::SetCompliantRfc3986Encoding(true);

    init_cache_builder(opts, _obj_storage_path_opts.path);
}
75 | | |
S3FileWriter::~S3FileWriter() {
    if (_async_close_pack != nullptr) {
        // An async close was submitted but its result never collected: block on
        // the future so the background task cannot run against a destructed
        // writer. For thread safety
        std::ignore = _async_close_pack->future.get();
        _async_close_pack = nullptr;
    } else {
        // Consider one situation where the file writer is destructed after it submit at least one async task
        // without calling close(), then there exists one occasion where the async task is executed right after
        // the corresponding S3 file writer is already destructed
        _wait_until_finish(fmt::format("wait s3 file {} upload to be finished",
                                       _obj_storage_path_opts.path.native()));
    }
    // We won't do the S3 abort operation in BE; we let the S3 service do it on its own.
    // NOTE(review): OPENED here means close() was never called; counting
    // _bytes_appended presumably avoids double counting with the per-upload
    // accounting in _put_object/_upload_one_part — confirm.
    if (state() == State::OPENED && !_failed) {
        s3_bytes_written_total << _bytes_appended;
    }
    s3_file_being_written << -1;
}
94 | | |
95 | 115 | Status S3FileWriter::_create_multi_upload_request() { |
96 | 115 | LOG(INFO) << "create_multi_upload_request " << _obj_storage_path_opts.path.native(); |
97 | 115 | const auto& client = _obj_client->get(); |
98 | 115 | if (nullptr == client) { |
99 | 0 | return Status::InternalError<false>("invalid obj storage client"); |
100 | 0 | } |
101 | 115 | auto resp = client->create_multipart_upload(_obj_storage_path_opts); |
102 | 115 | if (resp.resp.status.code == ErrorCode::OK) { |
103 | 114 | _obj_storage_path_opts.upload_id = resp.upload_id; |
104 | 114 | } |
105 | 115 | return {resp.resp.status.code, std::move(resp.resp.status.msg)}; |
106 | 115 | } |
107 | | |
108 | 129k | void S3FileWriter::_wait_until_finish(std::string_view task_name) { |
109 | 129k | auto timeout_duration = config::s3_file_writer_log_interval_second; |
110 | 129k | auto msg = fmt::format( |
111 | 129k | "{} multipart upload already takes {} seconds, bucket={}, key={}, upload_id={}", |
112 | 129k | task_name, timeout_duration, _obj_storage_path_opts.bucket, |
113 | 129k | _obj_storage_path_opts.path.native(), |
114 | 129k | _obj_storage_path_opts.upload_id.has_value() ? *_obj_storage_path_opts.upload_id : ""); |
115 | 129k | timespec current_time; |
116 | | // We don't need high accuracy here, so we use time(nullptr) |
117 | | // since it's the fastest way to get current time(second) |
118 | 129k | auto current_time_second = time(nullptr); |
119 | 129k | current_time.tv_sec = current_time_second + timeout_duration; |
120 | 129k | current_time.tv_nsec = 0; |
121 | | // bthread::countdown_event::timed_wait() should use absolute time |
122 | 129k | while (0 != _countdown_event.timed_wait(current_time)) { |
123 | 0 | current_time.tv_sec += timeout_duration; |
124 | 0 | LOG(WARNING) << msg; |
125 | 0 | } |
126 | 129k | } |
127 | | |
128 | 128k | Status S3FileWriter::close(bool non_block) { |
129 | 128k | auto record_close_latency = [this]() { |
130 | 57.2k | if (_close_latency_recorded || !_first_append_timestamp.has_value()) { |
131 | 33 | return; |
132 | 33 | } |
133 | 57.2k | auto now = std::chrono::steady_clock::now(); |
134 | 57.2k | auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
135 | 57.2k | now - *_first_append_timestamp) |
136 | 57.2k | .count(); |
137 | 57.2k | s3_file_writer_first_append_to_close_ms_recorder << latency_ms; |
138 | 57.2k | if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) { |
139 | 57.2k | sampler->take_sample(); |
140 | 57.2k | } |
141 | 57.2k | _close_latency_recorded = true; |
142 | 57.2k | }; |
143 | | |
144 | 128k | if (state() == State::CLOSED) { |
145 | 2.02k | if (_async_close_pack != nullptr) { |
146 | 1.02k | _st = _async_close_pack->future.get(); |
147 | 1.02k | _async_close_pack = nullptr; |
148 | | // Return the final close status so that a blocking close issued after |
149 | | // an async close observes the real result just like the legacy behavior. |
150 | 1.02k | if (!non_block && _st.ok()) { |
151 | 26 | record_close_latency(); |
152 | 26 | } |
153 | 1.02k | return _st; |
154 | 1.02k | } |
155 | 1.00k | if (non_block) { |
156 | 1.00k | if (_st.ok()) { |
157 | 1.00k | record_close_latency(); |
158 | 1.00k | return Status::Error<ErrorCode::ALREADY_CLOSED>( |
159 | 1.00k | "S3FileWriter already closed, file path {}, file key {}", |
160 | 1.00k | _obj_storage_path_opts.path.native(), _obj_storage_path_opts.key); |
161 | 1.00k | } |
162 | 0 | return _st; |
163 | 1.00k | } |
164 | 0 | if (_st.ok()) { |
165 | 0 | record_close_latency(); |
166 | 0 | return Status::Error<ErrorCode::ALREADY_CLOSED>( |
167 | 0 | "S3FileWriter already closed, file path {}, file key {}", |
168 | 0 | _obj_storage_path_opts.path.native(), _obj_storage_path_opts.key); |
169 | 0 | } |
170 | 0 | return _st; |
171 | 0 | } |
172 | 126k | if (state() == State::ASYNC_CLOSING) { |
173 | 54.8k | if (non_block) { |
174 | 0 | return Status::InternalError("Don't submit async close multi times"); |
175 | 0 | } |
176 | 54.8k | CHECK(_async_close_pack != nullptr); |
177 | 54.8k | _st = _async_close_pack->future.get(); |
178 | 54.8k | _async_close_pack = nullptr; |
179 | | // We should wait for all the pre async task to be finished |
180 | 54.8k | _state = State::CLOSED; |
181 | | // The next time we call close() with no matter non_block true or false, it would always return the |
182 | | // '_st' value because this writer is already closed. |
183 | 54.8k | if (!non_block && _st.ok()) { |
184 | 54.8k | record_close_latency(); |
185 | 54.8k | } |
186 | 54.8k | return _st; |
187 | 54.8k | } |
188 | 71.1k | if (non_block) { |
189 | 69.8k | _state = State::ASYNC_CLOSING; |
190 | 69.8k | _async_close_pack = std::make_unique<AsyncCloseStatusPack>(); |
191 | 69.8k | _async_close_pack->future = _async_close_pack->promise.get_future(); |
192 | 69.8k | s3_file_writer_async_close_queuing << 1; |
193 | 69.9k | return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([&]() { |
194 | 69.9k | s3_file_writer_async_close_queuing << -1; |
195 | 69.9k | s3_file_writer_async_close_processing << 1; |
196 | 69.9k | _st = _close_impl(); |
197 | 69.9k | _state = State::CLOSED; |
198 | 69.9k | _async_close_pack->promise.set_value(_st); |
199 | 69.9k | s3_file_writer_async_close_processing << -1; |
200 | 69.9k | }); |
201 | 69.8k | } |
202 | 1.34k | _st = _close_impl(); |
203 | 1.34k | _state = State::CLOSED; |
204 | 1.42k | if (!non_block && _st.ok()) { |
205 | 1.41k | record_close_latency(); |
206 | 1.41k | } |
207 | 1.34k | return _st; |
208 | 71.1k | } |
209 | | |
210 | 71.7k | bool S3FileWriter::_complete_part_task_callback(Status s) { |
211 | 71.7k | bool ret = false; |
212 | 71.7k | if (!s.ok()) [[unlikely]] { |
213 | 14 | VLOG_NOTICE << "failed at key: " << _obj_storage_path_opts.key |
214 | 0 | << ", status: " << s.to_string(); |
215 | 14 | std::unique_lock<std::mutex> _lck {_completed_lock}; |
216 | 14 | _failed = true; |
217 | 14 | ret = true; |
218 | 14 | _st = std::move(s); |
219 | 14 | } |
220 | | // After the signal, there is a scenario where the previous invocation of _wait_until_finish |
221 | | // returns to the caller, and subsequently, the S3 file writer is destructed. |
222 | | // This means that accessing _failed afterwards would result in a heap use after free vulnerability. |
223 | 71.7k | _countdown_event.signal(); |
224 | 71.7k | return ret; |
225 | 71.7k | } |
226 | | |
// Allocates the next upload buffer into _pending_buf and wires its callbacks:
// - upload callback: uploads the buffer as part `_cur_part_num` (captured now),
// - completion hook: routes the task result to _complete_part_task_callback,
// - cancel check: skips work once any part has failed.
// When a cache builder is configured, also installs an allocator so the data
// can be written into the local file cache asynchronously.
Status S3FileWriter::_build_upload_buffer() {
    auto builder = FileBufferBuilder();
    builder.set_type(BufferType::UPLOAD)
            .set_upload_callback([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
                _upload_one_part(part_num, buf);
            })
            .set_file_offset(_bytes_appended)
            .set_sync_after_complete_task([this](auto&& PH1) {
                return _complete_part_task_callback(std::forward<decltype(PH1)>(PH1));
            })
            .set_is_cancelled([this]() { return _failed.load(); });
    if (_cache_builder != nullptr) {
        // We would load the data into file cache asynchronously which indicates
        // that this instance of S3FileWriter might have been destructed when we
        // try to do writing into file cache, so we make the lambda capture the variable
        // we need by value to extend their lifetime
        int64_t id = get_tablet_id(_obj_storage_path_opts.path.native()).value_or(0);
        builder.set_allocate_file_blocks_holder([builder = *_cache_builder,
                                                 offset = _bytes_appended,
                                                 tablet_id = id]() -> FileBlocksHolderPtr {
            return builder.allocate_cache_holder(offset, config::s3_write_buffer_size, tablet_id);
        });
    }
    RETURN_IF_ERROR(builder.build(&_pending_buf));
    auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
    DCHECK(buf != nullptr);
    return Status::OK();
}
255 | | |
// Performs the actual close work: decides between single PutObject and
// multipart upload for the final pending buffer, handles the zero-byte file
// case, submits whatever is still buffered, then waits for all uploads to
// finish in _complete().
Status S3FileWriter::_close_impl() {
    VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native();

    if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size
        RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
    }

    if (_bytes_appended == 0) {
        DCHECK_EQ(_cur_part_num, 1);
        // No data written, but need to create an empty file
        RETURN_IF_ERROR(_build_upload_buffer());
        if (!_used_by_s3_committer) {
            auto* pending_buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
            pending_buf->set_upload_to_remote([this](UploadFileBuffer& buf) { _put_object(buf); });
        } else {
            // S3 committer path always uses multipart, even for an empty file.
            RETURN_IF_ERROR(_create_multi_upload_request());
        }
    }

    if (_pending_buf != nullptr) { // there is remaining data in buffer need to be uploaded
        _countdown_event.add_count();
        RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
        _pending_buf = nullptr;
    }

    RETURN_IF_ERROR(_complete());
    SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::close", Status());

    return Status::OK();
}
286 | | |
// Appends `data_cnt` slices to the file. Data is staged into _pending_buf
// (one buffer per upload part, config::s3_write_buffer_size bytes each); a
// buffer that fills completely is submitted immediately as a multipart part,
// while a partially filled final buffer is left for _close_impl()/_complete().
// Returns the recorded failure status as soon as any prior part has failed.
Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
    if (state() != State::OPENED) [[unlikely]] {
        return Status::InternalError("append to closed file: {}",
                                     _obj_storage_path_opts.path.native());
    }

    // Remember when the first byte arrived, for the append-to-close latency metric.
    if (!_first_append_timestamp.has_value()) {
        _first_append_timestamp = std::chrono::steady_clock::now();
    }

    size_t buffer_size = config::s3_write_buffer_size;
    // NOTE: "appenv" is the historical sync point name matched by test code;
    // the spelling must not be "fixed" here.
    TEST_SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::appenv", Status());
    for (size_t i = 0; i < data_cnt; i++) {
        size_t data_size = data[i].get_size();
        for (size_t pos = 0, data_size_to_append = 0; pos < data_size; pos += data_size_to_append) {
            if (_failed) {
                // A previously submitted part failed; surface its status.
                return _st;
            }
            if (!_pending_buf) {
                RETURN_IF_ERROR(_build_upload_buffer());
            }
            // we need to make sure all parts except the last one to be 5MB or more
            // and shouldn't be larger than buf
            // (file_offset + buffer_size - _bytes_appended) is the free space
            // remaining in the current buffer.
            data_size_to_append = std::min(data_size - pos, _pending_buf->get_file_offset() +
                                                                    buffer_size - _bytes_appended);

            // if the buffer has memory buf inside, the data would be written into memory first then S3 then file cache
            // it would be written to cache then S3 if the buffer doesn't have memory preserved
            RETURN_IF_ERROR(_pending_buf->append_data(
                    Slice {data[i].get_data() + pos, data_size_to_append}));
            TEST_SYNC_POINT_CALLBACK("s3_file_writer::appenv_1", &_pending_buf, _cur_part_num);

            // If this is the last part and the data size is less than s3_write_buffer_size,
            // the pending_buf will be handled by _close_impl() and _complete()
            // If this is the last part and the data size is equal to s3_write_buffer_size,
            // the pending_buf is handled here and submitted. it will be waited by _complete()
            if (_pending_buf->get_size() == buffer_size) {
                // only create multiple upload request when the data size is
                // larger or equal to s3_write_buffer_size than one memory buffer
                if (_cur_part_num == 1) {
                    RETURN_IF_ERROR(_create_multi_upload_request());
                }
                _cur_part_num++;
                _countdown_event.add_count();
                RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
                _pending_buf = nullptr;
            }
            _bytes_appended += data_size_to_append;
        }
    }
    return Status::OK();
}
339 | | |
340 | 445 | void S3FileWriter::_upload_one_part(int part_num, UploadFileBuffer& buf) { |
341 | 445 | VLOG_DEBUG << "upload_one_part " << _obj_storage_path_opts.path.native() |
342 | 0 | << " part=" << part_num; |
343 | 445 | if (buf.is_cancelled()) { |
344 | 0 | LOG_INFO("file {} skip part {} because previous failure {}", |
345 | 0 | _obj_storage_path_opts.path.native(), part_num, _st); |
346 | 0 | return; |
347 | 0 | } |
348 | 445 | const auto& client = _obj_client->get(); |
349 | 445 | if (nullptr == client) { |
350 | 0 | LOG_WARNING("failed to upload part, key={}, part_num={} bacause of null obj client", |
351 | 0 | _obj_storage_path_opts.key, part_num); |
352 | 0 | buf.set_status(Status::InternalError<false>("invalid obj storage client")); |
353 | 0 | return; |
354 | 0 | } |
355 | 445 | auto resp = client->upload_part(_obj_storage_path_opts, buf.get_string_view_data(), part_num); |
356 | 445 | if (resp.resp.status.code != ErrorCode::OK) { |
357 | 1 | LOG_WARNING("failed to upload part, key={}, part_num={}, status={}", |
358 | 1 | _obj_storage_path_opts.key, part_num, resp.resp.status.msg); |
359 | 1 | buf.set_status(Status(resp.resp.status.code, std::move(resp.resp.status.msg))); |
360 | 1 | return; |
361 | 1 | } |
362 | 444 | s3_bytes_written_total << buf.get_size(); |
363 | | |
364 | 444 | ObjectCompleteMultiPart completed_part { |
365 | 444 | static_cast<int>(part_num), resp.etag.has_value() ? std::move(resp.etag.value()) : ""}; |
366 | | |
367 | 444 | std::unique_lock<std::mutex> lck {_completed_lock}; |
368 | 444 | _completed_parts.emplace_back(std::move(completed_part)); |
369 | 444 | } |
370 | | |
// if enabled check
// 1. issue a head object request for existence check
// 2. check the file size
// `upload_res` is the response of the preceding upload call (PutObject or
// CompleteMultipartUpload, named by `put_or_comp`) and is only used to build
// the diagnostic message when verification fails.
Status check_after_upload(ObjStorageClient* client, const ObjectStorageResponse& upload_res,
                          const ObjectStoragePathOptions& path_opt, int64_t bytes_appended,
                          const std::string& put_or_comp) {
    if (!config::enable_s3_object_check_after_upload) return Status::OK();

    auto head_res = client->head_object(path_opt);

    // Lazily builds the combined upload+HEAD diagnostic string (only on failure).
    // clang-format off
    auto err_msg = [&]() {
        std::stringstream ss;
        ss << "failed to check object after upload=" << put_or_comp
           << " file_path=" << path_opt.path.native()
           << fmt::format(" {}_err=", put_or_comp) << upload_res.status.msg
           << fmt::format(" {}_code=", put_or_comp) << upload_res.status.code
           << fmt::format(" {}_http_code=", put_or_comp) << upload_res.http_code
           << fmt::format(" {}_request_id=", put_or_comp) << upload_res.request_id
           << " head_err=" << head_res.resp.status.msg
           << " head_code=" << head_res.resp.status.code
           << " head_http_code=" << head_res.resp.http_code
           << " head_request_id=" << head_res.resp.request_id;
        return ss.str();
    };
    // clang-format on

    // TODO(gavin): make it fail by injection
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::check_after_load", &head_res);
    if (head_res.resp.status.code != ErrorCode::OK && head_res.resp.http_code != 200) {
        LOG(WARNING) << "failed to issue head object after upload, " << err_msg();
        DCHECK(false) << "failed to issue head object after upload, " << err_msg();
        // FIXME(gavin): we should retry if this HEAD fails?
        return Status::IOError(
                "failed to issue head object after upload, status_code={}, http_code={}, err={}",
                head_res.resp.status.code, head_res.resp.http_code, head_res.resp.status.msg);
    }
    if (head_res.file_size != bytes_appended) {
        LOG(WARNING) << "failed to check size after upload, expected_size=" << bytes_appended
                     << " actual_size=" << head_res.file_size << err_msg();
        DCHECK_EQ(bytes_appended, head_res.file_size)
                << "failed to check size after upload," << err_msg();
        return Status::IOError(
                "failed to check object size after upload, expected_size={} actual_size={}",
                bytes_appended, head_res.file_size);
    }
    return Status::OK();
}
419 | | |
// Finalizes the upload after every buffer has been submitted:
// - single-part writers just wait for the PutObject task and return its status;
// - multipart writers wait for all parts, sanity-check the part count, then
//   issue CompleteMultipartUpload (unless the S3 committer finishes it on the
//   FE side) and verify the resulting object with a HEAD request.
Status S3FileWriter::_complete() {
    const auto& client = _obj_client->get();
    if (nullptr == client) {
        return Status::InternalError<false>("invalid obj storage client");
    }
    if (_failed) {
        // A part already failed; still wait for in-flight tasks before leaving.
        _wait_until_finish("early quit");
        return _st;
    }
    // When the part num is only one, it means the data is less than 5MB so we can just put it.
    if (_cur_part_num == 1) {
        _wait_until_finish("PutObject");
        return _st;
    }
    // Wait multipart load and finish.
    _wait_until_finish("Complete");
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:1",
                             std::make_pair(&_failed, &_completed_parts));
    if (_used_by_s3_committer) { // S3 committer will complete multipart upload file on FE side.
        s3_file_created_total << 1; // Assume that it will be created successfully
        return Status::OK();
    }

    // check number of parts
    // Two independent derivations of the expected part count; they must agree
    // with each other and with the number of successfully completed parts.
    int64_t expected_num_parts1 = (_bytes_appended / config::s3_write_buffer_size) +
                                  !!(_bytes_appended % config::s3_write_buffer_size);
    int64_t expected_num_parts2 =
            (_bytes_appended % config::s3_write_buffer_size) ? _cur_part_num : _cur_part_num - 1;
    DCHECK_EQ(expected_num_parts1, expected_num_parts2)
            << " bytes_appended=" << _bytes_appended << " cur_part_num=" << _cur_part_num
            << " s3_write_buffer_size=" << config::s3_write_buffer_size;
    if (_failed || _completed_parts.size() != static_cast<size_t>(expected_num_parts1) ||
        expected_num_parts1 != expected_num_parts2) {
        _st = Status::InternalError(
                "failed to complete multipart upload, error status={} failed={} #complete_parts={} "
                "#expected_parts={} "
                "completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}",
                _st, _failed, _completed_parts.size(), expected_num_parts1, _dump_completed_part(),
                _obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
        LOG(WARNING) << _st;
        return _st;
    }
    // make sure _completed_parts are ascending order
    std::sort(_completed_parts.begin(), _completed_parts.end(),
              [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
    LOG(INFO) << "complete_multipart_upload " << _obj_storage_path_opts.path.native()
              << " size=" << _bytes_appended << " number_parts=" << _completed_parts.size()
              << " s3_write_buffer_size=" << config::s3_write_buffer_size;
    auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
    if (resp.status.code != ErrorCode::OK) {
        LOG_WARNING("failed to complete multipart upload, err={}, file_path={}", resp.status.msg,
                    _obj_storage_path_opts.path.native());
        return {resp.status.code, std::move(resp.status.msg)};
    }

    // Post-upload verification (HEAD + size check), gated by config.
    RETURN_IF_ERROR(check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended,
                                       "complete_multipart"));

    s3_file_created_total << 1;
    return Status::OK();
}
482 | | |
483 | 71.2k | Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() { |
484 | 71.2k | auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get()); |
485 | 71.2k | DCHECK(buf != nullptr); |
486 | 71.2k | if (_used_by_s3_committer) { |
487 | | // If used_by_s3_committer, we always use multi-parts uploading. |
488 | 0 | buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) { |
489 | 0 | _upload_one_part(part_num, buf); |
490 | 0 | }); |
491 | 0 | DCHECK(_cur_part_num == 1); |
492 | 0 | RETURN_IF_ERROR(_create_multi_upload_request()); |
493 | 71.2k | } else { |
494 | | // if we only need to upload one file less than 5MB, we can just |
495 | | // call PutObject to reduce the network IO |
496 | 71.2k | buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); }); |
497 | 71.2k | } |
498 | 71.2k | return Status::OK(); |
499 | 71.2k | } |
500 | | |
// Uploads the entire buffer with one PutObject call (used when all data fits
// in a single buffer). Errors are reported through buf.set_status() so the
// completion callback can mark the writer failed; nothing is thrown/returned.
void S3FileWriter::_put_object(UploadFileBuffer& buf) {
    MonotonicStopWatch timer;
    timer.start();

    // Reaching here after close indicates a lifecycle bug; fail the buffer.
    if (state() == State::CLOSED) {
        DCHECK(state() != State::CLOSED)
                << "state=" << (int)state() << " path=" << _obj_storage_path_opts.path.native();
        LOG_WARNING("failed to put object because file closed, file path {}",
                    _obj_storage_path_opts.path.native());
        buf.set_status(Status::InternalError<false>("try to put closed file"));
        return;
    }
    const auto& client = _obj_client->get();
    if (nullptr == client) {
        buf.set_status(Status::InternalError<false>("invalid obj storage client"));
        return;
    }
    TEST_SYNC_POINT_RETURN_WITH_VOID("S3FileWriter::_put_object", this, &buf);
    auto resp = client->put_object(_obj_storage_path_opts, buf.get_string_view_data());
    timer.stop();

    if (resp.status.code != ErrorCode::OK) {
        LOG_WARNING("failed to put object, put object failed because {}, file path {}, time={}ms",
                    resp.status.msg, _obj_storage_path_opts.path.native(),
                    timer.elapsed_time_milliseconds());
        buf.set_status({resp.status.code, std::move(resp.status.msg)});
        return;
    }

    // Post-upload verification (HEAD + size check), gated by config.
    auto st = check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended,
                                 "put_object");
    if (!st.ok()) {
        buf.set_status(st);
        return;
    }

    LOG(INFO) << "put_object " << _obj_storage_path_opts.path.native()
              << " size=" << _bytes_appended << " time=" << timer.elapsed_time_milliseconds()
              << "ms";
    s3_file_created_total << 1;
    s3_bytes_written_total << buf.get_size();
}
543 | | |
544 | 3 | std::string S3FileWriter::_dump_completed_part() const { |
545 | 3 | std::stringstream ss; |
546 | 3 | ss << "part_numbers:"; |
547 | 3 | for (const auto& part : _completed_parts) { |
548 | 2 | ss << " " << part.part_num; |
549 | 2 | } |
550 | 3 | return ss.str(); |
551 | 3 | } |
552 | | #include "common/compile_check_end.h" |
553 | | |
554 | | } // namespace doris::io |