be/src/io/fs/s3_file_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/s3_file_writer.h" |
19 | | |
20 | | #include <aws/s3/model/CompletedPart.h> |
21 | | #include <bvar/recorder.h> |
22 | | #include <bvar/reducer.h> |
23 | | #include <bvar/window.h> |
24 | | #include <fmt/core.h> |
25 | | #include <glog/logging.h> |
26 | | |
27 | | #include <sstream> |
28 | | #include <tuple> |
29 | | #include <utility> |
30 | | |
31 | | #include "common/config.h" |
32 | | #include "common/status.h" |
33 | | #include "cpp/sync_point.h" |
34 | | #include "io/cache/block_file_cache.h" |
35 | | #include "io/cache/block_file_cache_factory.h" |
36 | | #include "io/cache/file_block.h" |
37 | | #include "io/cache/file_cache_common.h" |
38 | | #include "io/fs/file_writer.h" |
39 | | #include "io/fs/path.h" |
40 | | #include "io/fs/s3_file_bufferpool.h" |
41 | | #include "io/fs/s3_file_system.h" |
42 | | #include "io/fs/s3_obj_storage_client.h" |
43 | | #include "runtime/exec_env.h" |
44 | | #include "util/s3_util.h" |
45 | | #include "util/stopwatch.hpp" |
46 | | |
47 | | namespace doris::io { |
48 | | |
// Process-wide bvar metrics tracking S3 file writer activity.
bvar::Adder<uint64_t> s3_file_writer_total("s3_file_writer_total_num");          // writers ever created
bvar::Adder<uint64_t> s3_bytes_written_total("s3_file_writer_bytes_written");    // bytes uploaded to S3
bvar::Adder<uint64_t> s3_file_created_total("s3_file_writer_file_created");      // objects successfully created
bvar::Adder<uint64_t> s3_file_being_written("s3_file_writer_file_being_written"); // live (not yet destructed) writers
bvar::Adder<uint64_t> s3_file_writer_async_close_queuing("s3_file_writer_async_close_queuing");
bvar::Adder<uint64_t> s3_file_writer_async_close_processing(
        "s3_file_writer_async_close_processing");
// Latency from the first appendv() to a successful blocking close(), in ms,
// exported as a windowed recorder.
bvar::IntRecorder s3_file_writer_first_append_to_close_ms_recorder;
bvar::Window<bvar::IntRecorder> s3_file_writer_first_append_to_close_ms_window(
        "s3_file_writer_first_append_to_close_ms",
        &s3_file_writer_first_append_to_close_ms_recorder, /*window_size=*/10);
60 | | |
// Builds the object-storage path options up front ("s3://{bucket}/{key}") and
// registers this writer in the process-wide metrics.
// `opts` may be null; only `used_by_s3_committer` and cache settings are read.
// NOTE: `path` is formatted BEFORE `bucket`/`key` are moved into the struct —
// designated initializers evaluate left to right, so this ordering is safe but
// must not be rearranged.
S3FileWriter::S3FileWriter(std::shared_ptr<ObjClientHolder> client, std::string bucket,
                           std::string key, const FileWriterOptions* opts)
        : _obj_storage_path_opts({.path = fmt::format("s3://{}/{}", bucket, key),
                                  .bucket = std::move(bucket),
                                  .key = std::move(key)}),
          _used_by_s3_committer(opts ? opts->used_by_s3_committer : false),
          _obj_client(std::move(client)) {
    s3_file_writer_total << 1;
    s3_file_being_written << 1;
    // NOTE(review): this flips a global AWS SDK setting on every writer
    // construction; presumably idempotent and cheap — confirm thread-safety.
    Aws::Http::SetCompliantRfc3986Encoding(true);

    init_cache_builder(opts, _obj_storage_path_opts.path);
}
74 | | |
// Destructor: drains any in-flight async work before member destruction.
// No S3 AbortMultipartUpload is issued here; incomplete uploads are left for
// the S3 service's own lifecycle cleanup (see comment below).
S3FileWriter::~S3FileWriter() {
    if (_async_close_pack != nullptr) {
        // For thread safety: an async close was submitted but its result never
        // consumed — block on the future so the task cannot touch a destructed writer.
        std::ignore = _async_close_pack->future.get();
        _async_close_pack = nullptr;
    } else {
        // Consider the situation where the file writer is destructed after it submitted
        // at least one async task without calling close(); then the async task could be
        // executed right after the corresponding S3 file writer is already destructed.
        // Wait for all outstanding part uploads to finish before tearing down.
        _wait_until_finish(fmt::format("wait s3 file {} upload to be finished",
                                       _obj_storage_path_opts.path.native()));
    }
    // We won't do S3 abort operation in BE, we let s3 service do it own.
    if (state() == State::OPENED && !_failed) {
        // Writer was abandoned while still OPENED (never closed) without failure:
        // account the bytes that were appended.
        s3_bytes_written_total << _bytes_appended;
    }
    s3_file_being_written << -1;
}
93 | | |
94 | 123 | Status S3FileWriter::_create_multi_upload_request() { |
95 | 123 | LOG(INFO) << "create_multi_upload_request " << _obj_storage_path_opts.path.native(); |
96 | 123 | const auto& client = _obj_client->get(); |
97 | 123 | if (nullptr == client) { |
98 | 0 | return Status::InternalError<false>("invalid obj storage client"); |
99 | 0 | } |
100 | 123 | auto resp = client->create_multipart_upload(_obj_storage_path_opts); |
101 | 123 | if (resp.resp.status.code == ErrorCode::OK) { |
102 | 122 | _obj_storage_path_opts.upload_id = resp.upload_id; |
103 | 122 | } |
104 | 123 | return {resp.resp.status.code, std::move(resp.resp.status.msg)}; |
105 | 123 | } |
106 | | |
107 | 100k | void S3FileWriter::_wait_until_finish(std::string_view task_name) { |
108 | 100k | auto timeout_duration = config::s3_file_writer_log_interval_second; |
109 | 100k | auto msg = fmt::format( |
110 | 100k | "{} multipart upload already takes {} seconds, bucket={}, key={}, upload_id={}", |
111 | 100k | task_name, timeout_duration, _obj_storage_path_opts.bucket, |
112 | 100k | _obj_storage_path_opts.path.native(), |
113 | 100k | _obj_storage_path_opts.upload_id.has_value() ? *_obj_storage_path_opts.upload_id : ""); |
114 | 100k | timespec current_time; |
115 | | // We don't need high accuracy here, so we use time(nullptr) |
116 | | // since it's the fastest way to get current time(second) |
117 | 100k | auto current_time_second = time(nullptr); |
118 | 100k | current_time.tv_sec = current_time_second + timeout_duration; |
119 | 100k | current_time.tv_nsec = 0; |
120 | | // bthread::countdown_event::timed_wait() should use absolute time |
121 | 100k | while (0 != _countdown_event.timed_wait(current_time)) { |
122 | 0 | current_time.tv_sec += timeout_duration; |
123 | 0 | LOG(WARNING) << msg; |
124 | 0 | } |
125 | 100k | } |
126 | | |
127 | 55.0k | Status S3FileWriter::close(bool non_block) { |
128 | 55.0k | auto record_close_latency = [this]() { |
129 | 22.2k | if (_close_latency_recorded || !_first_append_timestamp.has_value()) { |
130 | 4 | return; |
131 | 4 | } |
132 | 22.2k | auto now = std::chrono::steady_clock::now(); |
133 | 22.2k | auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
134 | 22.2k | now - *_first_append_timestamp) |
135 | 22.2k | .count(); |
136 | 22.2k | s3_file_writer_first_append_to_close_ms_recorder << latency_ms; |
137 | 22.2k | if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) { |
138 | 22.2k | sampler->take_sample(); |
139 | 22.2k | } |
140 | 22.2k | _close_latency_recorded = true; |
141 | 22.2k | }; |
142 | | |
143 | 55.0k | if (state() == State::CLOSED) { |
144 | 27.7k | if (_async_close_pack != nullptr) { |
145 | 13.9k | _st = _async_close_pack->future.get(); |
146 | 13.9k | _async_close_pack = nullptr; |
147 | | // Return the final close status so that a blocking close issued after |
148 | | // an async close observes the real result just like the legacy behavior. |
149 | 13.9k | if (!non_block && _st.ok()) { |
150 | 98 | record_close_latency(); |
151 | 98 | } |
152 | 13.9k | return _st; |
153 | 13.9k | } |
154 | 13.8k | if (non_block) { |
155 | 13.8k | if (_st.ok()) { |
156 | 13.8k | record_close_latency(); |
157 | 13.8k | return Status::Error<ErrorCode::ALREADY_CLOSED>( |
158 | 13.8k | "S3FileWriter already closed, file path {}, file key {}", |
159 | 13.8k | _obj_storage_path_opts.path.native(), _obj_storage_path_opts.key); |
160 | 13.8k | } |
161 | 0 | return _st; |
162 | 13.8k | } |
163 | 0 | if (_st.ok()) { |
164 | 0 | record_close_latency(); |
165 | 0 | return Status::Error<ErrorCode::ALREADY_CLOSED>( |
166 | 0 | "S3FileWriter already closed, file path {}, file key {}", |
167 | 0 | _obj_storage_path_opts.path.native(), _obj_storage_path_opts.key); |
168 | 0 | } |
169 | 0 | return _st; |
170 | 0 | } |
171 | 27.3k | if (state() == State::ASYNC_CLOSING) { |
172 | 4.57k | if (non_block) { |
173 | 0 | return Status::InternalError("Don't submit async close multi times"); |
174 | 0 | } |
175 | 4.57k | CHECK(_async_close_pack != nullptr); |
176 | 4.57k | _st = _async_close_pack->future.get(); |
177 | 4.57k | _async_close_pack = nullptr; |
178 | | // We should wait for all the pre async task to be finished |
179 | 4.57k | _state = State::CLOSED; |
180 | | // The next time we call close() with no matter non_block true or false, it would always return the |
181 | | // '_st' value because this writer is already closed. |
182 | 4.58k | if (!non_block && _st.ok()) { |
183 | 4.57k | record_close_latency(); |
184 | 4.57k | } |
185 | 4.57k | return _st; |
186 | 4.57k | } |
187 | 22.7k | if (non_block) { |
188 | 19.0k | _state = State::ASYNC_CLOSING; |
189 | 19.0k | _async_close_pack = std::make_unique<AsyncCloseStatusPack>(); |
190 | 19.0k | _async_close_pack->future = _async_close_pack->promise.get_future(); |
191 | 19.0k | s3_file_writer_async_close_queuing << 1; |
192 | 19.0k | return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([&]() { |
193 | 19.0k | s3_file_writer_async_close_queuing << -1; |
194 | 19.0k | s3_file_writer_async_close_processing << 1; |
195 | 19.0k | _st = _close_impl(); |
196 | 19.0k | _state = State::CLOSED; |
197 | 19.0k | _async_close_pack->promise.set_value(_st); |
198 | 19.0k | s3_file_writer_async_close_processing << -1; |
199 | 19.0k | }); |
200 | 19.0k | } |
201 | 3.70k | _st = _close_impl(); |
202 | 3.70k | _state = State::CLOSED; |
203 | 3.70k | if (!non_block && _st.ok()) { |
204 | 3.69k | record_close_latency(); |
205 | 3.69k | } |
206 | 3.70k | return _st; |
207 | 22.7k | } |
208 | | |
// Invoked when one upload task (part upload or PutObject) finishes with
// status `s`. Returns true iff `s` is a failure; on failure, marks the writer
// failed and records the error into _st under _completed_lock.
bool S3FileWriter::_complete_part_task_callback(Status s) {
    bool ret = false;
    if (!s.ok()) [[unlikely]] {
        VLOG_NOTICE << "failed at key: " << _obj_storage_path_opts.key
                    << ", status: " << s.to_string();
        std::unique_lock<std::mutex> _lck {_completed_lock};
        _failed = true;
        ret = true;
        _st = std::move(s);
    }
    // After the signal, there is a scenario where the previous invocation of _wait_until_finish
    // returns to the caller, and subsequently, the S3 file writer is destructed.
    // This means that accessing _failed afterwards would result in a heap use after free vulnerability.
    // Hence the signal MUST be the last touch of `this` in this function.
    _countdown_event.signal();
    return ret;
}
225 | | |
226 | 23.0k | Status S3FileWriter::_build_upload_buffer() { |
227 | 23.0k | auto builder = FileBufferBuilder(); |
228 | 23.0k | builder.set_type(BufferType::UPLOAD) |
229 | 23.0k | .set_upload_callback([part_num = _cur_part_num, this](UploadFileBuffer& buf) { |
230 | 466 | _upload_one_part(part_num, buf); |
231 | 466 | }) |
232 | 23.0k | .set_file_offset(_bytes_appended) |
233 | 23.0k | .set_sync_after_complete_task([this](auto&& PH1) { |
234 | 23.0k | return _complete_part_task_callback(std::forward<decltype(PH1)>(PH1)); |
235 | 23.0k | }) |
236 | 23.5k | .set_is_cancelled([this]() { return _failed.load(); }); |
237 | 23.0k | if (_cache_builder != nullptr) { |
238 | | // We would load the data into file cache asynchronously which indicates |
239 | | // that this instance of S3FileWriter might have been destructed when we |
240 | | // try to do writing into file cache, so we make the lambda capture the variable |
241 | | // we need by value to extend their lifetime |
242 | 5.77k | int64_t id = get_tablet_id(_obj_storage_path_opts.path.native()).value_or(0); |
243 | 5.77k | builder.set_allocate_file_blocks_holder([builder = *_cache_builder, |
244 | 5.77k | offset = _bytes_appended, |
245 | 5.77k | tablet_id = id]() -> FileBlocksHolderPtr { |
246 | 5.77k | return builder.allocate_cache_holder(offset, config::s3_write_buffer_size, tablet_id); |
247 | 5.77k | }); |
248 | 5.77k | } |
249 | 23.0k | RETURN_IF_ERROR(builder.build(&_pending_buf)); |
250 | 23.0k | auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get()); |
251 | 23.0k | DCHECK(buf != nullptr); |
252 | 23.0k | return Status::OK(); |
253 | 23.0k | } |
254 | | |
// The actual close work, run inline or on the async-close thread pool:
// 1. small single-part files are redirected to PutObject,
// 2. an empty file still gets an (empty) upload buffer so the object exists,
// 3. the final pending buffer is submitted,
// 4. _complete() waits for all parts and finishes the upload.
// Statement order matters; do not rearrange.
Status S3FileWriter::_close_impl() {
    VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native();

    if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size
        RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
    }

    if (_bytes_appended == 0) {
        DCHECK_EQ(_cur_part_num, 1);
        // No data written, but need to create an empty file
        RETURN_IF_ERROR(_build_upload_buffer());
        if (!_used_by_s3_committer) {
            auto* pending_buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
            pending_buf->set_upload_to_remote([this](UploadFileBuffer& buf) { _put_object(buf); });
        } else {
            // S3 committer completes the multipart upload on the FE side,
            // so an upload id must exist even for an empty file.
            RETURN_IF_ERROR(_create_multi_upload_request());
        }
    }

    if (_pending_buf != nullptr) { // there is remaining data in buffer need to be uploaded
        _countdown_event.add_count();
        RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
        _pending_buf = nullptr;
    }

    RETURN_IF_ERROR(_complete());
    SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::close", Status());

    return Status::OK();
}
285 | | |
// Appends `data_cnt` slices to the object. Data is staged into _pending_buf;
// whenever a buffer fills to exactly config::s3_write_buffer_size it is
// submitted as one multipart part (lazily creating the multipart upload on
// the first full part). A final partial buffer is left for _close_impl().
// Returns the first recorded error if any background upload already failed.
Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
    if (state() != State::OPENED) [[unlikely]] {
        return Status::InternalError("append to closed file: {}",
                                     _obj_storage_path_opts.path.native());
    }

    // Timestamp of the very first append, used for the close-latency metric.
    if (!_first_append_timestamp.has_value()) {
        _first_append_timestamp = std::chrono::steady_clock::now();
    }

    size_t buffer_size = config::s3_write_buffer_size;
    TEST_SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::appenv", Status());
    for (size_t i = 0; i < data_cnt; i++) {
        size_t data_size = data[i].get_size();
        // Chunk each slice so no write crosses a buffer boundary.
        for (size_t pos = 0, data_size_to_append = 0; pos < data_size; pos += data_size_to_append) {
            if (_failed) {
                // A background part upload already failed; surface its status.
                return _st;
            }
            if (!_pending_buf) {
                RETURN_IF_ERROR(_build_upload_buffer());
            }
            // we need to make sure all parts except the last one to be 5MB or more
            // and shouldn't be larger than buf
            data_size_to_append = std::min(data_size - pos, _pending_buf->get_file_offset() +
                                                                    buffer_size - _bytes_appended);

            // if the buffer has memory buf inside, the data would be written into memory first then S3 then file cache
            // it would be written to cache then S3 if the buffer doesn't have memory preserved
            RETURN_IF_ERROR(_pending_buf->append_data(
                    Slice {data[i].get_data() + pos, data_size_to_append}));
            TEST_SYNC_POINT_CALLBACK("s3_file_writer::appenv_1", &_pending_buf, _cur_part_num);

            // If this is the last part and the data size is less than s3_write_buffer_size,
            // the pending_buf will be handled by _close_impl() and _complete()
            // If this is the last part and the data size is equal to s3_write_buffer_size,
            // the pending_buf is handled here and submitted. it will be waited by _complete()
            if (_pending_buf->get_size() == buffer_size) {
                // only create multiple upload request when the data size is
                // larger or equal to s3_write_buffer_size than one memory buffer
                if (_cur_part_num == 1) {
                    RETURN_IF_ERROR(_create_multi_upload_request());
                }
                _cur_part_num++;
                _countdown_event.add_count();
                RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
                _pending_buf = nullptr;
            }
            _bytes_appended += data_size_to_append;
        }
    }
    return Status::OK();
}
338 | | |
339 | 466 | void S3FileWriter::_upload_one_part(int part_num, UploadFileBuffer& buf) { |
340 | 466 | VLOG_DEBUG << "upload_one_part " << _obj_storage_path_opts.path.native() |
341 | 0 | << " part=" << part_num; |
342 | 466 | if (buf.is_cancelled()) { |
343 | 0 | LOG_INFO("file {} skip part {} because previous failure {}", |
344 | 0 | _obj_storage_path_opts.path.native(), part_num, _st); |
345 | 0 | return; |
346 | 0 | } |
347 | 466 | const auto& client = _obj_client->get(); |
348 | 466 | if (nullptr == client) { |
349 | 0 | LOG_WARNING("failed to upload part, key={}, part_num={} bacause of null obj client", |
350 | 0 | _obj_storage_path_opts.key, part_num); |
351 | 0 | buf.set_status(Status::InternalError<false>("invalid obj storage client")); |
352 | 0 | return; |
353 | 0 | } |
354 | 466 | auto resp = client->upload_part(_obj_storage_path_opts, buf.get_string_view_data(), part_num); |
355 | 466 | if (resp.resp.status.code != ErrorCode::OK) { |
356 | 1 | LOG_WARNING("failed to upload part, key={}, part_num={}, status={}", |
357 | 1 | _obj_storage_path_opts.key, part_num, resp.resp.status.msg); |
358 | 1 | buf.set_status(Status(resp.resp.status.code, std::move(resp.resp.status.msg))); |
359 | 1 | return; |
360 | 1 | } |
361 | 465 | s3_bytes_written_total << buf.get_size(); |
362 | | |
363 | 465 | ObjectCompleteMultiPart completed_part { |
364 | 465 | static_cast<int>(part_num), resp.etag.has_value() ? std::move(resp.etag.value()) : ""}; |
365 | | |
366 | 465 | std::unique_lock<std::mutex> lck {_completed_lock}; |
367 | 465 | _completed_parts.emplace_back(std::move(completed_part)); |
368 | 465 | } |
369 | | |
// if enabled check
// 1. issue a head object request for existence check
// 2. check the file size
// `upload_res` is the response of the preceding PutObject / CompleteMultipartUpload
// (its error fields are folded into the diagnostic message); `put_or_comp` tags
// the message with which operation is being verified.
Status check_after_upload(ObjStorageClient* client, const ObjectStorageResponse& upload_res,
                          const ObjectStoragePathOptions& path_opt, int64_t bytes_appended,
                          const std::string& put_or_comp) {
    if (!config::enable_s3_object_check_after_upload) return Status::OK();

    auto head_res = client->head_object(path_opt);

    // clang-format off
    // Lazily-built diagnostic combining both the upload response and the HEAD
    // response; only materialized on the failure paths below.
    auto err_msg = [&]() {
        std::stringstream ss;
        ss << "failed to check object after upload=" << put_or_comp
           << " file_path=" << path_opt.path.native()
           << fmt::format(" {}_err=", put_or_comp) << upload_res.status.msg
           << fmt::format(" {}_code=", put_or_comp) << upload_res.status.code
           << fmt::format(" {}_http_code=", put_or_comp) << upload_res.http_code
           << fmt::format(" {}_request_id=", put_or_comp) << upload_res.request_id
           << " head_err=" << head_res.resp.status.msg
           << " head_code=" << head_res.resp.status.code
           << " head_http_code=" << head_res.resp.http_code
           << " head_request_id=" << head_res.resp.request_id;
        return ss.str();
    };
    // clang-format on

    // TODO(gavin): make it fail by injection
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::check_after_load", &head_res);
    if (head_res.resp.status.code != ErrorCode::OK && head_res.resp.http_code != 200) {
        LOG(WARNING) << "failed to issue head object after upload, " << err_msg();
        DCHECK(false) << "failed to issue head object after upload, " << err_msg();
        // FIXME(gavin): we should retry if this HEAD fails?
        return Status::IOError(
                "failed to issue head object after upload, status_code={}, http_code={}, err={}",
                head_res.resp.status.code, head_res.resp.http_code, head_res.resp.status.msg);
    }
    if (head_res.file_size != bytes_appended) {
        // Object exists but its size disagrees with what we appended.
        LOG(WARNING) << "failed to check size after upload, expected_size=" << bytes_appended
                     << " actual_size=" << head_res.file_size << err_msg();
        DCHECK_EQ(bytes_appended, head_res.file_size)
                << "failed to check size after upload," << err_msg();
        return Status::IOError(
                "failed to check object size after upload, expected_size={} actual_size={}",
                bytes_appended, head_res.file_size);
    }
    return Status::OK();
}
418 | | |
// Waits for all submitted upload tasks, then finishes the object:
// - single-part writers just wait for the PutObject callback,
// - multipart writers validate the collected parts against the expected count
//   and issue CompleteMultipartUpload (unless the S3 committer finishes it on FE).
Status S3FileWriter::_complete() {
    const auto& client = _obj_client->get();
    if (nullptr == client) {
        return Status::InternalError<false>("invalid obj storage client");
    }
    if (_failed) {
        // Still must drain outstanding tasks before returning the error.
        _wait_until_finish("early quit");
        return _st;
    }
    // When the part num is only one, it means the data is less than 5MB so we can just put it.
    if (_cur_part_num == 1) {
        _wait_until_finish("PutObject");
        return _st;
    }
    // Wait multipart load and finish.
    _wait_until_finish("Complete");
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:1",
                             std::make_pair(&_failed, &_completed_parts));
    if (_used_by_s3_committer) {    // S3 committer will complete multipart upload file on FE side.
        s3_file_created_total << 1; // Assume that it will be created successfully
        return Status::OK();
    }

    // check number of parts
    // Two independent derivations of the expected part count; they must agree,
    // and the collected parts must match, or the upload is inconsistent.
    int64_t expected_num_parts1 = (_bytes_appended / config::s3_write_buffer_size) +
                                  !!(_bytes_appended % config::s3_write_buffer_size);
    int64_t expected_num_parts2 =
            (_bytes_appended % config::s3_write_buffer_size) ? _cur_part_num : _cur_part_num - 1;
    DCHECK_EQ(expected_num_parts1, expected_num_parts2)
            << " bytes_appended=" << _bytes_appended << " cur_part_num=" << _cur_part_num
            << " s3_write_buffer_size=" << config::s3_write_buffer_size;
    if (_failed || _completed_parts.size() != static_cast<size_t>(expected_num_parts1) ||
        expected_num_parts1 != expected_num_parts2) {
        _st = Status::InternalError(
                "failed to complete multipart upload, error status={} failed={} #complete_parts={} "
                "#expected_parts={} "
                "completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}",
                _st, _failed, _completed_parts.size(), expected_num_parts1, _dump_completed_part(),
                _obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
        LOG(WARNING) << _st;
        return _st;
    }
    // make sure _completed_parts are ascending order
    std::sort(_completed_parts.begin(), _completed_parts.end(),
              [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
    LOG(INFO) << "complete_multipart_upload " << _obj_storage_path_opts.path.native()
              << " size=" << _bytes_appended << " number_parts=" << _completed_parts.size()
              << " s3_write_buffer_size=" << config::s3_write_buffer_size;
    auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
    if (resp.status.code != ErrorCode::OK) {
        LOG_WARNING("failed to complete multipart upload, err={}, file_path={}", resp.status.msg,
                    _obj_storage_path_opts.path.native());
        return {resp.status.code, std::move(resp.status.msg)};
    }

    // Optional post-upload existence/size verification (config-gated).
    RETURN_IF_ERROR(check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended,
                                       "complete_multipart"));

    s3_file_created_total << 1;
    return Status::OK();
}
481 | | |
482 | 22.6k | Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() { |
483 | 22.6k | auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get()); |
484 | 22.6k | DCHECK(buf != nullptr); |
485 | 22.6k | if (_used_by_s3_committer) { |
486 | | // If used_by_s3_committer, we always use multi-parts uploading. |
487 | 0 | buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) { |
488 | 0 | _upload_one_part(part_num, buf); |
489 | 0 | }); |
490 | 0 | DCHECK(_cur_part_num == 1); |
491 | 0 | RETURN_IF_ERROR(_create_multi_upload_request()); |
492 | 22.6k | } else { |
493 | | // if we only need to upload one file less than 5MB, we can just |
494 | | // call PutObject to reduce the network IO |
495 | 22.6k | buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); }); |
496 | 22.6k | } |
497 | 22.6k | return Status::OK(); |
498 | 22.6k | } |
499 | | |
// Uploads the whole (single-part) file with one PutObject call. Any failure
// is recorded on `buf` so it propagates through _complete_part_task_callback.
void S3FileWriter::_put_object(UploadFileBuffer& buf) {
    MonotonicStopWatch timer;
    timer.start();

    if (state() == State::CLOSED) {
        // Intentional pattern: the DCHECK always fires here so debug builds
        // crash loudly, while release builds log and fail the buffer instead.
        DCHECK(state() != State::CLOSED)
                << "state=" << (int)state() << " path=" << _obj_storage_path_opts.path.native();
        LOG_WARNING("failed to put object because file closed, file path {}",
                    _obj_storage_path_opts.path.native());
        buf.set_status(Status::InternalError<false>("try to put closed file"));
        return;
    }
    const auto& client = _obj_client->get();
    if (nullptr == client) {
        buf.set_status(Status::InternalError<false>("invalid obj storage client"));
        return;
    }
    TEST_SYNC_POINT_RETURN_WITH_VOID("S3FileWriter::_put_object", this, &buf);
    auto resp = client->put_object(_obj_storage_path_opts, buf.get_string_view_data());
    timer.stop();

    if (resp.status.code != ErrorCode::OK) {
        LOG_WARNING("failed to put object, put object failed because {}, file path {}, time={}ms",
                    resp.status.msg, _obj_storage_path_opts.path.native(),
                    timer.elapsed_time_milliseconds());
        buf.set_status({resp.status.code, std::move(resp.status.msg)});
        return;
    }

    // Optional post-upload existence/size verification (config-gated).
    auto st = check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended,
                                 "put_object");
    if (!st.ok()) {
        buf.set_status(st);
        return;
    }

    LOG(INFO) << "put_object " << _obj_storage_path_opts.path.native()
              << " size=" << _bytes_appended << " time=" << timer.elapsed_time_milliseconds()
              << "ms";
    s3_file_created_total << 1;
    s3_bytes_written_total << buf.get_size();
}
542 | | |
543 | 3 | std::string S3FileWriter::_dump_completed_part() const { |
544 | 3 | std::stringstream ss; |
545 | 3 | ss << "part_numbers:"; |
546 | 3 | for (const auto& part : _completed_parts) { |
547 | 2 | ss << " " << part.part_num; |
548 | 2 | } |
549 | 3 | return ss.str(); |
550 | 3 | } |
551 | | |
552 | | } // namespace doris::io |