Coverage Report

Created: 2026-04-10 16:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/s3_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/s3_file_writer.h"
19
20
#include <aws/s3/model/CompletedPart.h>
21
#include <bvar/recorder.h>
22
#include <bvar/reducer.h>
23
#include <bvar/window.h>
24
#include <fmt/core.h>
25
#include <glog/logging.h>
26
27
#include <sstream>
28
#include <tuple>
29
#include <utility>
30
31
#include "common/config.h"
32
#include "common/status.h"
33
#include "cpp/sync_point.h"
34
#include "io/cache/block_file_cache.h"
35
#include "io/cache/block_file_cache_factory.h"
36
#include "io/cache/file_block.h"
37
#include "io/cache/file_cache_common.h"
38
#include "io/fs/file_writer.h"
39
#include "io/fs/path.h"
40
#include "io/fs/s3_file_bufferpool.h"
41
#include "io/fs/s3_file_system.h"
42
#include "io/fs/s3_obj_storage_client.h"
43
#include "runtime/exec_env.h"
44
#include "util/s3_util.h"
45
#include "util/stopwatch.hpp"
46
47
namespace doris::io {
48
49
// ---- Process-wide S3 writer metrics (exported through bvar) ----

// Number of S3FileWriter instances ever constructed.
bvar::Adder<uint64_t> s3_file_writer_total("s3_file_writer_total_num");
// Bytes reported as written by the writers.
bvar::Adder<uint64_t> s3_bytes_written_total("s3_file_writer_bytes_written");
// Objects successfully created (PutObject or completed multipart upload).
bvar::Adder<uint64_t> s3_file_created_total("s3_file_writer_file_created");
// Writers currently alive (incremented in the ctor, decremented in the dtor).
bvar::Adder<uint64_t> s3_file_being_written("s3_file_writer_file_being_written");

// Gauges for non-blocking close(): tasks waiting in the pool vs. tasks running.
bvar::Adder<uint64_t> s3_file_writer_async_close_queuing("s3_file_writer_async_close_queuing");
bvar::Adder<uint64_t> s3_file_writer_async_close_processing(
        "s3_file_writer_async_close_processing");

// Latency between the first appendv() and a successful close(), in milliseconds.
// The recorder is unnamed; only the windowed view (window_size=10) is exposed.
bvar::IntRecorder s3_file_writer_first_append_to_close_ms_recorder;
bvar::Window<bvar::IntRecorder> s3_file_writer_first_append_to_close_ms_window(
        "s3_file_writer_first_append_to_close_ms",
        &s3_file_writer_first_append_to_close_ms_recorder, /*window_size=*/10);
60
61
// Creates a writer for s3://<bucket>/<key>.
// `opts` may be null; in that case the writer is not used by the S3 committer.
S3FileWriter::S3FileWriter(std::shared_ptr<ObjClientHolder> client, std::string bucket,
                           std::string key, const FileWriterOptions* opts)
        // `.path` is formatted from `bucket`/`key` before they are moved into the
        // following fields: aggregate (designated) initializers are evaluated in order.
        : _obj_storage_path_opts({.path = fmt::format("s3://{}/{}", bucket, key),
                                  .bucket = std::move(bucket),
                                  .key = std::move(key)}),
          _used_by_s3_committer(opts != nullptr && opts->used_by_s3_committer),
          _obj_client(std::move(client)) {
    // Bookkeeping: one more writer created, one more writer alive.
    s3_file_writer_total << 1;
    s3_file_being_written << 1;

    Aws::Http::SetCompliantRfc3986Encoding(true);

    // Optionally attach a file-cache builder for this path.
    init_cache_builder(opts, _obj_storage_path_opts.path);
}
74
75
78.0k
// Destroys the writer only after every background task that references `this`
// (the async close task and all submitted upload buffers) has finished.
S3FileWriter::~S3FileWriter() {
    if (_async_close_pack != nullptr) {
        // Join the asynchronous close task so it can no longer run against this object.
        std::ignore = _async_close_pack->future.get();
        _async_close_pack = nullptr;
    }
    // Always wait for outstanding upload tasks, even after an async close.
    // _close_impl() can return early (e.g. a failed FileBuffer::submit() or a null
    // storage client in _complete()) without waiting for parts already submitted by
    // appendv(). Those tasks call back into this writer, so returning here without
    // waiting would be a use-after-free. The same applies when the writer is
    // destructed without close() after submitting at least one task. When nothing
    // is in flight, the wait returns immediately.
    _wait_until_finish(fmt::format("wait s3 file {} upload to be finished",
                                   _obj_storage_path_opts.path.native()));
    // BE never issues an S3 abort; incomplete uploads are left to the S3 service.
    if (state() == State::OPENED && !_failed) {
        s3_bytes_written_total << _bytes_appended;
    }
    s3_file_being_written << -1;
}
93
94
123
// Starts a multipart upload for this object and, on success, stores the returned
// upload id in `_obj_storage_path_opts` for subsequent part uploads.
// Returns the storage client's status (or InternalError if there is no client).
Status S3FileWriter::_create_multi_upload_request() {
    LOG(INFO) << "create_multi_upload_request " << _obj_storage_path_opts.path.native();

    const auto& client = _obj_client->get();
    if (client == nullptr) {
        return Status::InternalError<false>("invalid obj storage client");
    }

    auto response = client->create_multipart_upload(_obj_storage_path_opts);
    auto& upload_status = response.resp.status;
    if (upload_status.code == ErrorCode::OK) {
        _obj_storage_path_opts.upload_id = response.upload_id;
    }
    return {upload_status.code, std::move(upload_status.msg)};
}
106
107
100k
void S3FileWriter::_wait_until_finish(std::string_view task_name) {
108
100k
    auto timeout_duration = config::s3_file_writer_log_interval_second;
109
100k
    auto msg = fmt::format(
110
100k
            "{} multipart upload already takes {} seconds, bucket={}, key={}, upload_id={}",
111
100k
            task_name, timeout_duration, _obj_storage_path_opts.bucket,
112
100k
            _obj_storage_path_opts.path.native(),
113
100k
            _obj_storage_path_opts.upload_id.has_value() ? *_obj_storage_path_opts.upload_id : "");
114
100k
    timespec current_time;
115
    // We don't need high accuracy here, so we use time(nullptr)
116
    // since it's the fastest way to get current time(second)
117
100k
    auto current_time_second = time(nullptr);
118
100k
    current_time.tv_sec = current_time_second + timeout_duration;
119
100k
    current_time.tv_nsec = 0;
120
    // bthread::countdown_event::timed_wait() should use absolute time
121
100k
    while (0 != _countdown_event.timed_wait(current_time)) {
122
0
        current_time.tv_sec += timeout_duration;
123
0
        LOG(WARNING) << msg;
124
0
    }
125
100k
}
126
127
55.0k
// Closes the writer.
//  - non_block == true: schedules _close_impl() on the non-block close thread pool and
//    returns the submission status; a later blocking close() returns the real result.
//  - non_block == false: finishes the close (or waits for a pending async one) and
//    returns its final status.
// After the writer is closed, further calls return ALREADY_CLOSED on success or replay
// the stored failure status.
Status S3FileWriter::close(bool non_block) {
    // Records first-append-to-close latency at most once; skipped if nothing was appended.
    auto record_close_latency = [this]() {
        if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
            return;
        }
        auto now = std::chrono::steady_clock::now();
        auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                  now - *_first_append_timestamp)
                                  .count();
        s3_file_writer_first_append_to_close_ms_recorder << latency_ms;
        if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
            sampler->take_sample();
        }
        _close_latency_recorded = true;
    };

    if (state() == State::CLOSED) {
        if (_async_close_pack != nullptr) {
            _st = _async_close_pack->future.get();
            _async_close_pack = nullptr;
            // Return the final close status so that a blocking close issued after
            // an async close observes the real result just like the legacy behavior.
            if (!non_block && _st.ok()) {
                record_close_latency();
            }
            return _st;
        }
        // Already closed and the result already consumed; blocking and non-blocking
        // callers are treated the same way here.
        if (_st.ok()) {
            record_close_latency();
            return Status::Error<ErrorCode::ALREADY_CLOSED>(
                    "S3FileWriter already closed, file path {}, file key {}",
                    _obj_storage_path_opts.path.native(), _obj_storage_path_opts.key);
        }
        return _st;
    }

    if (state() == State::ASYNC_CLOSING) {
        if (non_block) {
            return Status::InternalError("Don't submit async close multi times");
        }
        CHECK(_async_close_pack != nullptr);
        // Wait for the previously submitted async close task to finish.
        _st = _async_close_pack->future.get();
        _async_close_pack = nullptr;
        _state = State::CLOSED;
        // From now on every close() call, blocking or not, returns based on '_st'
        // because this writer is already closed.
        if (_st.ok()) {
            record_close_latency();
        }
        return _st;
    }

    if (non_block) {
        _state = State::ASYNC_CLOSING;
        _async_close_pack = std::make_unique<AsyncCloseStatusPack>();
        _async_close_pack->future = _async_close_pack->promise.get_future();
        s3_file_writer_async_close_queuing << 1;
        // Capturing `this` is safe: the destructor joins `future` before the writer dies.
        Status submit_st =
                ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([this]() {
                    s3_file_writer_async_close_queuing << -1;
                    s3_file_writer_async_close_processing << 1;
                    _st = _close_impl();
                    _state = State::CLOSED;
                    _async_close_pack->promise.set_value(_st);
                    s3_file_writer_async_close_processing << -1;
                });
        if (!submit_st.ok()) [[unlikely]] {
            // The task was never queued, so nobody will ever fulfil the promise. If the
            // pack were kept, a later blocking close() or the destructor would block
            // forever in future.get(). Undo the bookkeeping and remember the failure so
            // later close() calls report it.
            s3_file_writer_async_close_queuing << -1;
            _async_close_pack = nullptr;
            _st = submit_st;
            _state = State::CLOSED;
        }
        return submit_st;
    }

    // Synchronous close.
    _st = _close_impl();
    _state = State::CLOSED;
    if (_st.ok()) {
        record_close_latency();
    }
    return _st;
}
208
209
23.0k
// Invoked once per finished upload buffer. A failure status is recorded in
// `_failed`/`_st` under `_completed_lock`. Returns true iff `s` was a failure.
// The countdown event is signalled in every case.
bool S3FileWriter::_complete_part_task_callback(Status s) {
    const bool task_failed = !s.ok();
    if (task_failed) [[unlikely]] {
        VLOG_NOTICE << "failed at key: " << _obj_storage_path_opts.key
                    << ", status: " << s.to_string();
        std::lock_guard<std::mutex> guard {_completed_lock};
        _failed = true;
        _st = std::move(s);
    }
    // Signal last and touch no member afterwards: once signalled, a pending
    // _wait_until_finish() may return and the writer may be destructed, so any later
    // access (e.g. to _failed) would be a heap use-after-free.
    _countdown_event.signal();
    return task_failed;
}
225
226
23.0k
// Builds a fresh UPLOAD buffer into `_pending_buf`. The buffer starts at the current
// `_bytes_appended` offset and, by default, uploads itself as part `_cur_part_num`.
Status S3FileWriter::_build_upload_buffer() {
    FileBufferBuilder builder {};
    builder.set_type(BufferType::UPLOAD);
    // Part number is captured now; later increments of _cur_part_num don't affect it.
    builder.set_upload_callback([part_num = _cur_part_num, this](UploadFileBuffer& upload_buf) {
        _upload_one_part(part_num, upload_buf);
    });
    builder.set_file_offset(_bytes_appended);
    builder.set_sync_after_complete_task([this](auto&& status) {
        return _complete_part_task_callback(std::forward<decltype(status)>(status));
    });
    builder.set_is_cancelled([this]() { return _failed.load(); });

    if (_cache_builder != nullptr) {
        // Writing into the file cache happens asynchronously and may outlive this
        // writer, so everything the lambda needs is captured by value.
        int64_t tablet_id = get_tablet_id(_obj_storage_path_opts.path.native()).value_or(0);
        builder.set_allocate_file_blocks_holder(
                [cache_builder = *_cache_builder, offset = _bytes_appended,
                 tablet_id]() -> FileBlocksHolderPtr {
                    return cache_builder.allocate_cache_holder(
                            offset, config::s3_write_buffer_size, tablet_id);
                });
    }

    RETURN_IF_ERROR(builder.build(&_pending_buf));
    auto* upload_buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
    DCHECK(upload_buf != nullptr);
    return Status::OK();
}
254
255
22.7k
// Flushes whatever is still buffered and finishes the upload:
//  1. a single small buffer is routed to PutObject (or a one-part multipart upload
//     for the S3 committer);
//  2. an empty file still gets an (empty) upload buffer so the object is created;
//  3. the remaining buffer is submitted and _complete() waits for / finalizes it.
Status S3FileWriter::_close_impl() {
    VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native();

    // Total data size is below config::s3_write_buffer_size.
    const bool fits_in_single_buffer = (_cur_part_num == 1) && (_pending_buf != nullptr);
    if (fits_in_single_buffer) {
        RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
    }

    if (_bytes_appended == 0) {
        // Nothing was written, but an empty object must still be created.
        DCHECK_EQ(_cur_part_num, 1);
        RETURN_IF_ERROR(_build_upload_buffer());
        if (_used_by_s3_committer) {
            RETURN_IF_ERROR(_create_multi_upload_request());
        } else {
            auto* empty_buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
            empty_buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
        }
    }

    if (_pending_buf) {
        // Remaining data still needs to be uploaded.
        _countdown_event.add_count();
        RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
        _pending_buf = nullptr;
    }

    RETURN_IF_ERROR(_complete());
    SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::close", Status());

    return Status::OK();
}
285
286
356k
// Appends `data_cnt` slices to the object. Data accumulates in `_pending_buf`; each
// time the buffer reaches config::s3_write_buffer_size it is submitted as one part of
// a multipart upload (the upload is created on the first full buffer).
// Returns the stored failure status if a background upload has already failed.
Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
    if (state() != State::OPENED) [[unlikely]] {
        return Status::InternalError("append to closed file: {}",
                                     _obj_storage_path_opts.path.native());
    }

    // close() measures latency from the first append.
    if (!_first_append_timestamp.has_value()) {
        _first_append_timestamp = std::chrono::steady_clock::now();
    }

    size_t buffer_size = config::s3_write_buffer_size;
    TEST_SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::appenv", Status());
    for (size_t slice_idx = 0; slice_idx < data_cnt; ++slice_idx) {
        const Slice& slice = data[slice_idx];
        const size_t slice_size = slice.get_size();
        size_t consumed = 0;
        while (consumed < slice_size) {
            // A background upload already failed; report it instead of buffering more.
            if (_failed) {
                return _st;
            }
            if (!_pending_buf) {
                RETURN_IF_ERROR(_build_upload_buffer());
            }
            // Cap the chunk so the buffer never exceeds buffer_size: every part except
            // the last one must be 5MB or more and no larger than the buffer.
            const size_t chunk = std::min(slice_size - consumed, _pending_buf->get_file_offset() +
                                                                         buffer_size -
                                                                         _bytes_appended);

            // With an in-memory buffer, data goes to memory first, then S3, then the file
            // cache. Without one, it is written to the cache first, then to S3.
            RETURN_IF_ERROR(_pending_buf->append_data(Slice {slice.get_data() + consumed, chunk}));
            TEST_SYNC_POINT_CALLBACK("s3_file_writer::appenv_1", &_pending_buf, _cur_part_num);

            // A full buffer is submitted here (and awaited by _complete()). A partially
            // filled last buffer is left to _close_impl() and _complete().
            if (_pending_buf->get_size() == buffer_size) {
                // The multipart upload is only created once data reaches one full buffer.
                if (_cur_part_num == 1) {
                    RETURN_IF_ERROR(_create_multi_upload_request());
                }
                ++_cur_part_num;
                _countdown_event.add_count();
                RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
                _pending_buf = nullptr;
            }
            _bytes_appended += chunk;
            consumed += chunk;
        }
    }
    return Status::OK();
}
338
339
466
void S3FileWriter::_upload_one_part(int part_num, UploadFileBuffer& buf) {
340
466
    VLOG_DEBUG << "upload_one_part " << _obj_storage_path_opts.path.native()
341
0
               << " part=" << part_num;
342
466
    if (buf.is_cancelled()) {
343
0
        LOG_INFO("file {} skip part {} because previous failure {}",
344
0
                 _obj_storage_path_opts.path.native(), part_num, _st);
345
0
        return;
346
0
    }
347
466
    const auto& client = _obj_client->get();
348
466
    if (nullptr == client) {
349
0
        LOG_WARNING("failed to upload part, key={}, part_num={} bacause of null obj client",
350
0
                    _obj_storage_path_opts.key, part_num);
351
0
        buf.set_status(Status::InternalError<false>("invalid obj storage client"));
352
0
        return;
353
0
    }
354
466
    auto resp = client->upload_part(_obj_storage_path_opts, buf.get_string_view_data(), part_num);
355
466
    if (resp.resp.status.code != ErrorCode::OK) {
356
1
        LOG_WARNING("failed to upload part, key={}, part_num={}, status={}",
357
1
                    _obj_storage_path_opts.key, part_num, resp.resp.status.msg);
358
1
        buf.set_status(Status(resp.resp.status.code, std::move(resp.resp.status.msg)));
359
1
        return;
360
1
    }
361
465
    s3_bytes_written_total << buf.get_size();
362
363
465
    ObjectCompleteMultiPart completed_part {
364
465
            static_cast<int>(part_num), resp.etag.has_value() ? std::move(resp.etag.value()) : ""};
365
366
465
    std::unique_lock<std::mutex> lck {_completed_lock};
367
465
    _completed_parts.emplace_back(std::move(completed_part));
368
465
}
369
370
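// When config::enable_s3_object_check_after_upload is on, verify an upload by:
// 1. issuing a HEAD object request to confirm the object exists, and
// 2. checking that the reported object size equals the number of bytes written.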
373
// Post-upload verification (no-op unless config::enable_s3_object_check_after_upload).
// Returns IOError if the HEAD request fails or if the object's size differs from
// `bytes_appended`. `put_or_comp` names the upload operation in diagnostics.
Status check_after_upload(ObjStorageClient* client, const ObjectStorageResponse& upload_res,
                          const ObjectStoragePathOptions& path_opt, int64_t bytes_appended,
                          const std::string& put_or_comp) {
    if (!config::enable_s3_object_check_after_upload) {
        return Status::OK();
    }

    auto head_res = client->head_object(path_opt);

    // Verbose diagnostic combining the upload and HEAD responses; built only on failure.
    // clang-format off
    auto describe_failure = [&]() {
        std::stringstream ss;
        ss << "failed to check object after upload=" << put_or_comp
            << " file_path=" << path_opt.path.native()
            << fmt::format(" {}_err=", put_or_comp) << upload_res.status.msg
            << fmt::format(" {}_code=", put_or_comp) << upload_res.status.code
            << fmt::format(" {}_http_code=", put_or_comp) << upload_res.http_code
            << fmt::format(" {}_request_id=", put_or_comp) << upload_res.request_id
            << " head_err=" << head_res.resp.status.msg
            << " head_code=" << head_res.resp.status.code
            << " head_http_code=" << head_res.resp.http_code
            << " head_request_id=" << head_res.resp.request_id;
        return ss.str();
    };
    // clang-format on

    // TODO(gavin): make it fail by injection
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::check_after_load", &head_res);

    const bool head_failed =
            head_res.resp.status.code != ErrorCode::OK && head_res.resp.http_code != 200;
    if (head_failed) {
        LOG(WARNING) << "failed to issue head object after upload, " << describe_failure();
        DCHECK(false) << "failed to issue head object after upload, " << describe_failure();
        // FIXME(gavin): we should retry if this HEAD fails?
        return Status::IOError(
                "failed to issue head object after upload, status_code={}, http_code={}, err={}",
                head_res.resp.status.code, head_res.resp.http_code, head_res.resp.status.msg);
    }

    if (head_res.file_size != bytes_appended) {
        LOG(WARNING) << "failed to check size after upload, expected_size=" << bytes_appended
                     << " actual_size=" << head_res.file_size << describe_failure();
        DCHECK_EQ(bytes_appended, head_res.file_size)
                << "failed to check size after upload," << describe_failure();
        return Status::IOError(
                "failed to check object size after upload, expected_size={} actual_size={}",
                bytes_appended, head_res.file_size);
    }

    return Status::OK();
}
418
419
22.7k
// Waits for all submitted upload tasks and finalizes the object.
//  - failure already recorded: wait for the remaining tasks and return it;
//  - single part: the PutObject (or single-part upload) result is the final status;
//  - S3 committer: the FE completes the multipart upload, so return OK;
//  - otherwise: validate the part count and issue CompleteMultipartUpload.
Status S3FileWriter::_complete() {
    const auto& client = _obj_client->get();
    if (nullptr == client) {
        return Status::InternalError<false>("invalid obj storage client");
    }
    if (_failed) {
        _wait_until_finish("early quit");
        return _st;
    }
    // When the part num is only one, the data is smaller than one write buffer and was
    // sent by the single submitted buffer; its status is the final result.
    if (_cur_part_num == 1) {
        _wait_until_finish("PutObject");
        return _st;
    }
    // Wait for every multipart part to finish.
    _wait_until_finish("Complete");
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:1",
                             std::make_pair(&_failed, &_completed_parts));
    if (_used_by_s3_committer) {    // S3 committer will complete multipart upload file on FE side.
        s3_file_created_total << 1; // Assume that it will be created successfully
        return Status::OK();
    }

    // Read the buffer size once. If the config is changed at runtime, the two expected
    // part counts and the diagnostics below then still agree with each other.
    const auto buffer_size = config::s3_write_buffer_size;

    // Cross-check the number of parts in two independent ways.
    int64_t expected_num_parts1 =
            (_bytes_appended / buffer_size) + !!(_bytes_appended % buffer_size);
    int64_t expected_num_parts2 =
            (_bytes_appended % buffer_size) ? _cur_part_num : _cur_part_num - 1;
    DCHECK_EQ(expected_num_parts1, expected_num_parts2)
            << " bytes_appended=" << _bytes_appended << " cur_part_num=" << _cur_part_num
            << " s3_write_buffer_size=" << buffer_size;
    if (_failed || _completed_parts.size() != static_cast<size_t>(expected_num_parts1) ||
        expected_num_parts1 != expected_num_parts2) {
        _st = Status::InternalError(
                "failed to complete multipart upload, error status={} failed={} #complete_parts={} "
                "#expected_parts={} "
                "completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}",
                _st, _failed, _completed_parts.size(), expected_num_parts1, _dump_completed_part(),
                _obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
        LOG(WARNING) << _st;
        return _st;
    }
    // Parts finish in arbitrary order; CompleteMultipartUpload needs them ascending.
    std::sort(_completed_parts.begin(), _completed_parts.end(),
              [](const auto& p1, const auto& p2) { return p1.part_num < p2.part_num; });
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
    LOG(INFO) << "complete_multipart_upload " << _obj_storage_path_opts.path.native()
              << " size=" << _bytes_appended << " number_parts=" << _completed_parts.size()
              << " s3_write_buffer_size=" << buffer_size;
    auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
    if (resp.status.code != ErrorCode::OK) {
        LOG_WARNING("failed to complete multipart upload, err={}, file_path={}", resp.status.msg,
                    _obj_storage_path_opts.path.native());
        return {resp.status.code, std::move(resp.status.msg)};
    }

    RETURN_IF_ERROR(check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended,
                                       "complete_multipart"));

    s3_file_created_total << 1;
    return Status::OK();
}
481
482
22.6k
// Chooses how the only (smaller than one write buffer) pending buffer is uploaded:
// PutObject normally, or part 1 of a new multipart upload when the S3 committer is used.
Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
    auto* upload_buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
    DCHECK(upload_buf != nullptr);

    if (!_used_by_s3_committer) {
        // A single object smaller than one buffer can be sent with one PutObject call,
        // which reduces network IO.
        upload_buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
        return Status::OK();
    }

    // The S3 committer always uses multipart uploading.
    upload_buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& b) {
        _upload_one_part(part_num, b);
    });
    DCHECK(_cur_part_num == 1);
    RETURN_IF_ERROR(_create_multi_upload_request());
    return Status::OK();
}
499
500
21.5k
// Uploads `buf` as the whole object with a single PutObject request, then runs the
// optional post-upload check. Errors are reported through `buf.set_status()`.
void S3FileWriter::_put_object(UploadFileBuffer& buf) {
    MonotonicStopWatch timer;
    timer.start();

    if (state() == State::CLOSED) {
        DCHECK(state() != State::CLOSED)
                << "state=" << static_cast<int>(state())
                << " path=" << _obj_storage_path_opts.path.native();
        LOG_WARNING("failed to put object because file closed, file path {}",
                    _obj_storage_path_opts.path.native());
        buf.set_status(Status::InternalError<false>("try to put closed file"));
        return;
    }

    const auto& client = _obj_client->get();
    if (client == nullptr) {
        buf.set_status(Status::InternalError<false>("invalid obj storage client"));
        return;
    }

    TEST_SYNC_POINT_RETURN_WITH_VOID("S3FileWriter::_put_object", this, &buf);
    auto resp = client->put_object(_obj_storage_path_opts, buf.get_string_view_data());
    timer.stop();

    if (resp.status.code != ErrorCode::OK) {
        LOG_WARNING("failed to put object, put object failed because {}, file path {}, time={}ms",
                    resp.status.msg, _obj_storage_path_opts.path.native(),
                    timer.elapsed_time_milliseconds());
        buf.set_status({resp.status.code, std::move(resp.status.msg)});
        return;
    }

    if (Status check_st = check_after_upload(client.get(), resp, _obj_storage_path_opts,
                                             _bytes_appended, "put_object");
        !check_st.ok()) {
        buf.set_status(check_st);
        return;
    }

    LOG(INFO) << "put_object " << _obj_storage_path_opts.path.native()
              << " size=" << _bytes_appended << " time=" << timer.elapsed_time_milliseconds()
              << "ms";
    s3_file_created_total << 1;
    s3_bytes_written_total << buf.get_size();
}
542
543
3
std::string S3FileWriter::_dump_completed_part() const {
544
3
    std::stringstream ss;
545
3
    ss << "part_numbers:";
546
3
    for (const auto& part : _completed_parts) {
547
2
        ss << " " << part.part_num;
548
2
    }
549
3
    return ss.str();
550
3
}
551
552
} // namespace doris::io