Coverage Report

Created: 2026-05-09 20:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/s3_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/s3_file_writer.h"
19
20
#include <aws/s3/model/CompletedPart.h>
21
#include <bvar/recorder.h>
22
#include <bvar/reducer.h>
23
#include <bvar/window.h>
24
#include <fmt/core.h>
25
#include <glog/logging.h>
26
27
#include <sstream>
28
#include <tuple>
29
#include <utility>
30
31
#include "common/config.h"
32
#include "common/status.h"
33
#include "cpp/sync_point.h"
34
#include "io/cache/block_file_cache.h"
35
#include "io/cache/block_file_cache_factory.h"
36
#include "io/cache/file_block.h"
37
#include "io/cache/file_cache_common.h"
38
#include "io/fs/file_writer.h"
39
#include "io/fs/path.h"
40
#include "io/fs/s3_file_bufferpool.h"
41
#include "io/fs/s3_file_system.h"
42
#include "io/fs/s3_obj_storage_client.h"
43
#include "runtime/exec_env.h"
44
#include "util/debug_points.h"
45
#include "util/s3_util.h"
46
#include "util/stopwatch.hpp"
47
48
namespace doris::io {
49
50
bvar::Adder<uint64_t> s3_file_writer_total("s3_file_writer_total_num");
51
bvar::Adder<uint64_t> s3_bytes_written_total("s3_file_writer_bytes_written");
52
bvar::Adder<uint64_t> s3_file_created_total("s3_file_writer_file_created");
53
bvar::Adder<uint64_t> s3_file_being_written("s3_file_writer_file_being_written");
54
bvar::Adder<uint64_t> s3_file_writer_async_close_queuing("s3_file_writer_async_close_queuing");
55
bvar::Adder<uint64_t> s3_file_writer_async_close_processing(
56
        "s3_file_writer_async_close_processing");
57
bvar::IntRecorder s3_file_writer_first_append_to_close_ms_recorder;
58
bvar::Window<bvar::IntRecorder> s3_file_writer_first_append_to_close_ms_window(
59
        "s3_file_writer_first_append_to_close_ms",
60
        &s3_file_writer_first_append_to_close_ms_recorder, /*window_size=*/10);
61
62
S3FileWriter::S3FileWriter(std::shared_ptr<ObjClientHolder> client, std::string bucket,
63
                           std::string key, const FileWriterOptions* opts)
64
2.07k
        : _obj_storage_path_opts({.path = fmt::format("s3://{}/{}", bucket, key),
65
2.07k
                                  .bucket = std::move(bucket),
66
2.07k
                                  .key = std::move(key)}),
67
2.07k
          _used_by_s3_committer(opts ? opts->used_by_s3_committer : false),
68
2.07k
          _obj_client(std::move(client)) {
69
2.07k
    s3_file_writer_total << 1;
70
2.07k
    s3_file_being_written << 1;
71
2.07k
    Aws::Http::SetCompliantRfc3986Encoding(true);
72
73
2.07k
    init_cache_builder(opts, _obj_storage_path_opts.path);
74
2.07k
}
75
76
2.08k
S3FileWriter::~S3FileWriter() {
77
2.08k
    if (_async_close_pack != nullptr) {
78
        // For thread safety
79
0
        std::ignore = _async_close_pack->future.get();
80
0
        _async_close_pack = nullptr;
81
2.08k
    } else {
82
        // Consider one situation where the file writer is destructed after it submit at least one async task
83
        // without calling close(), then there exists one occasion where the async task is executed right after
84
        // the correspoding S3 file writer is already destructed
85
2.08k
        _wait_until_finish(fmt::format("wait s3 file {} upload to be finished",
86
2.08k
                                       _obj_storage_path_opts.path.native()));
87
2.08k
    }
88
    // We won't do S3 abort operation in BE, we let s3 service do it own.
89
2.08k
    if (state() == State::OPENED && !_failed) {
90
1.01k
        s3_bytes_written_total << _bytes_appended;
91
1.01k
    }
92
2.08k
    s3_file_being_written << -1;
93
2.08k
}
94
95
50
Status S3FileWriter::_create_multi_upload_request() {
96
50
    LOG(INFO) << "create_multi_upload_request " << _obj_storage_path_opts.path.native();
97
50
    const auto& client = _obj_client->get();
98
50
    if (nullptr == client) {
99
0
        return Status::InternalError<false>("invalid obj storage client");
100
0
    }
101
50
    auto resp = client->create_multipart_upload(_obj_storage_path_opts);
102
50
    if (resp.resp.status.code == ErrorCode::OK) {
103
49
        _obj_storage_path_opts.upload_id = resp.upload_id;
104
49
    }
105
50
    return {resp.resp.status.code, std::move(resp.resp.status.msg)};
106
50
}
107
108
3.14k
void S3FileWriter::_wait_until_finish(std::string_view task_name) {
109
3.14k
    auto timeout_duration = config::s3_file_writer_log_interval_second;
110
3.14k
    auto msg = fmt::format(
111
3.14k
            "{} multipart upload already takes {} seconds, bucket={}, key={}, upload_id={}",
112
3.14k
            task_name, timeout_duration, _obj_storage_path_opts.bucket,
113
3.14k
            _obj_storage_path_opts.path.native(),
114
3.14k
            _obj_storage_path_opts.upload_id.has_value() ? *_obj_storage_path_opts.upload_id : "");
115
3.14k
    timespec current_time;
116
    // We don't need high accuracy here, so we use time(nullptr)
117
    // since it's the fastest way to get current time(second)
118
3.14k
    auto current_time_second = time(nullptr);
119
3.14k
    current_time.tv_sec = current_time_second + timeout_duration;
120
3.14k
    current_time.tv_nsec = 0;
121
    // bthread::countdown_event::timed_wait() should use absolute time
122
3.14k
    while (0 != _countdown_event.timed_wait(current_time)) {
123
0
        current_time.tv_sec += timeout_duration;
124
0
        LOG(WARNING) << msg;
125
0
    }
126
3.14k
}
127
128
1.07k
Status S3FileWriter::close(bool non_block) {
129
1.07k
    if (state() == State::CLOSED) {
130
0
        return Status::InternalError("S3FileWriter already closed, file path {}, file key {}",
131
0
                                     _obj_storage_path_opts.path.native(),
132
0
                                     _obj_storage_path_opts.key);
133
0
    }
134
1.07k
    if (state() == State::ASYNC_CLOSING) {
135
8
        if (non_block) {
136
0
            return Status::InternalError("Don't submit async close multi times");
137
0
        }
138
8
        CHECK(_async_close_pack != nullptr);
139
8
        _st = _async_close_pack->future.get();
140
8
        _async_close_pack = nullptr;
141
        // We should wait for all the pre async task to be finished
142
8
        _state = State::CLOSED;
143
        // The next time we call close() with no matter non_block true or false, it would always return the
144
        // '_st' value because this writer is already closed.
145
8
        if (!non_block && _st.ok()) {
146
3
            _record_close_latency();
147
3
        }
148
8
        return _st;
149
8
    }
150
1.06k
    if (non_block) {
151
1.00k
        _state = State::ASYNC_CLOSING;
152
1.00k
        _async_close_pack = std::make_unique<AsyncCloseStatusPack>();
153
1.00k
        _async_close_pack->future = _async_close_pack->promise.get_future();
154
1.00k
        s3_file_writer_async_close_queuing << 1;
155
1.00k
        return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([&]() {
156
1.00k
            s3_file_writer_async_close_queuing << -1;
157
1.00k
            s3_file_writer_async_close_processing << 1;
158
1.00k
            _st = _close_impl();
159
1.00k
            _async_close_pack->promise.set_value(_st);
160
1.00k
            s3_file_writer_async_close_processing << -1;
161
1.00k
        });
162
1.00k
    }
163
59
    _st = _close_impl();
164
59
    _state = State::CLOSED;
165
59
    if (!non_block && _st.ok()) {
166
57
        _record_close_latency();
167
57
    }
168
59
    return _st;
169
1.06k
}
170
171
1.06k
void S3FileWriter::_record_close_latency() {
172
1.06k
    if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
173
1
        return;
174
1
    }
175
1.05k
    auto now = std::chrono::steady_clock::now();
176
1.05k
    auto latency_ms =
177
1.05k
            std::chrono::duration_cast<std::chrono::milliseconds>(now - *_first_append_timestamp)
178
1.05k
                    .count();
179
1.05k
    s3_file_writer_first_append_to_close_ms_recorder << latency_ms;
180
1.05k
    if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
181
1.05k
        sampler->take_sample();
182
1.05k
    }
183
1.05k
    _close_latency_recorded = true;
184
1.05k
}
185
186
1.00k
Status S3FileWriter::try_finish_close() {
187
1.00k
    if (state() == State::CLOSED) {
188
0
        return _st;
189
0
    }
190
1.00k
    if (state() != State::ASYNC_CLOSING) {
191
0
        return Status::NotSupported("S3FileWriter is not async closing");
192
0
    }
193
1.00k
    CHECK(_async_close_pack != nullptr);
194
1.00k
    if (_async_close_pack->future.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
195
1
        return Status::NeedSendAgain("async close is not finished");
196
1
    }
197
1.00k
    _st = _async_close_pack->future.get();
198
1.00k
    _async_close_pack = nullptr;
199
1.00k
    _state = State::CLOSED;
200
1.00k
    if (_st.ok()) {
201
1.00k
        _record_close_latency();
202
1.00k
    }
203
1.00k
    return _st;
204
1.00k
}
205
206
1.29k
bool S3FileWriter::_complete_part_task_callback(Status s) {
207
1.29k
    bool ret = false;
208
1.29k
    if (!s.ok()) [[unlikely]] {
209
3
        VLOG_NOTICE << "failed at key: " << _obj_storage_path_opts.key
210
0
                    << ", status: " << s.to_string();
211
3
        std::unique_lock<std::mutex> _lck {_completed_lock};
212
3
        _failed = true;
213
3
        ret = true;
214
3
        _st = std::move(s);
215
3
    }
216
    // After the signal, there is a scenario where the previous invocation of _wait_until_finish
217
    // returns to the caller, and subsequently, the S3 file writer is destructed.
218
    // This means that accessing _failed afterwards would result in a heap use after free vulnerability.
219
1.29k
    _countdown_event.signal();
220
1.29k
    return ret;
221
1.29k
}
222
223
1.29k
Status S3FileWriter::_build_upload_buffer() {
224
1.29k
    auto builder = FileBufferBuilder();
225
1.29k
    builder.set_type(BufferType::UPLOAD)
226
1.29k
            .set_upload_callback([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
227
272
                _upload_one_part(part_num, buf);
228
272
            })
229
1.29k
            .set_file_offset(_bytes_appended)
230
1.29k
            .set_sync_after_complete_task([this](auto&& PH1) {
231
1.29k
                return _complete_part_task_callback(std::forward<decltype(PH1)>(PH1));
232
1.29k
            })
233
1.56k
            .set_is_cancelled([this]() { return _failed.load(); });
234
1.29k
    if (_cache_builder != nullptr) {
235
        // We would load the data into file cache asynchronously which indicates
236
        // that this instance of S3FileWriter might have been destructed when we
237
        // try to do writing into file cache, so we make the lambda capture the variable
238
        // we need by value to extend their lifetime
239
0
        int64_t id = get_tablet_id(_obj_storage_path_opts.path.native()).value_or(0);
240
0
        builder.set_allocate_file_blocks_holder([builder = *_cache_builder,
241
0
                                                 offset = _bytes_appended,
242
0
                                                 tablet_id = id]() -> FileBlocksHolderPtr {
243
0
            return builder.allocate_cache_holder(offset, config::s3_write_buffer_size, tablet_id);
244
0
        });
245
0
    }
246
1.29k
    RETURN_IF_ERROR(builder.build(&_pending_buf));
247
1.29k
    auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
248
1.29k
    DCHECK(buf != nullptr);
249
1.29k
    return Status::OK();
250
1.29k
}
251
252
1.06k
Status S3FileWriter::_close_impl() {
253
1.06k
    VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native();
254
255
1.06k
    DBUG_EXECUTE_IF("S3FileWriter._close_impl.inject_error", {
256
1.06k
        if (_obj_storage_path_opts.key.ends_with(".dat")) {
257
1.06k
            return Status::IOError("S3FileWriter._close_impl.inject_error");
258
1.06k
        }
259
1.06k
    });
260
261
1.06k
    if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size
262
1.01k
        RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
263
1.01k
    }
264
265
1.06k
    if (_bytes_appended == 0) {
266
4
        DCHECK_EQ(_cur_part_num, 1);
267
        // No data written, but need to create an empty file
268
4
        RETURN_IF_ERROR(_build_upload_buffer());
269
4
        if (!_used_by_s3_committer) {
270
4
            auto* pending_buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
271
4
            pending_buf->set_upload_to_remote([this](UploadFileBuffer& buf) { _put_object(buf); });
272
4
        } else {
273
0
            RETURN_IF_ERROR(_create_multi_upload_request());
274
0
        }
275
4
    }
276
277
1.06k
    if (_pending_buf != nullptr) { // there is remaining data in buffer need to be uploaded
278
1.05k
        _countdown_event.add_count();
279
1.05k
        RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
280
1.05k
        _pending_buf = nullptr;
281
1.05k
    }
282
283
1.06k
    RETURN_IF_ERROR(_complete());
284
1.06k
    SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::close", Status());
285
286
1.06k
    return Status::OK();
287
1.06k
}
288
289
10.4k
Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
290
10.4k
    if (state() != State::OPENED) [[unlikely]] {
291
0
        return Status::InternalError("append to closed file: {}",
292
0
                                     _obj_storage_path_opts.path.native());
293
0
    }
294
295
10.4k
    if (!_first_append_timestamp.has_value()) {
296
1.06k
        _first_append_timestamp = std::chrono::steady_clock::now();
297
1.06k
    }
298
299
10.4k
    size_t buffer_size = config::s3_write_buffer_size;
300
10.4k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::appenv", Status());
301
20.8k
    for (size_t i = 0; i < data_cnt; i++) {
302
10.4k
        size_t data_size = data[i].get_size();
303
21.0k
        for (size_t pos = 0, data_size_to_append = 0; pos < data_size; pos += data_size_to_append) {
304
10.6k
            if (_failed) {
305
0
                return _st;
306
0
            }
307
10.6k
            if (!_pending_buf) {
308
1.29k
                RETURN_IF_ERROR(_build_upload_buffer());
309
1.29k
            }
310
            // we need to make sure all parts except the last one to be 5MB or more
311
            // and shouldn't be larger than buf
312
10.6k
            data_size_to_append = std::min(data_size - pos, _pending_buf->get_file_offset() +
313
10.6k
                                                                    buffer_size - _bytes_appended);
314
315
            // if the buffer has memory buf inside, the data would be written into memory first then S3 then file cache
316
            // it would be written to cache then S3 if the buffer doesn't have memory preserved
317
10.6k
            RETURN_IF_ERROR(_pending_buf->append_data(
318
10.6k
                    Slice {data[i].get_data() + pos, data_size_to_append}));
319
10.6k
            TEST_SYNC_POINT_CALLBACK("s3_file_writer::appenv_1", &_pending_buf, _cur_part_num);
320
321
            // If this is the last part and the data size is less than s3_write_buffer_size,
322
            // the pending_buf will be handled by _close_impl() and _complete()
323
            // If this is the last part and the data size is equal to s3_write_buffer_size,
324
            // the pending_buf is handled here and submitted. it will be waited by _complete()
325
10.6k
            if (_pending_buf->get_size() == buffer_size) {
326
                // only create multiple upload request when the data size is
327
                // larger or equal to s3_write_buffer_size than one memory buffer
328
237
                if (_cur_part_num == 1) {
329
50
                    RETURN_IF_ERROR(_create_multi_upload_request());
330
50
                }
331
236
                _cur_part_num++;
332
236
                _countdown_event.add_count();
333
236
                RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
334
236
                _pending_buf = nullptr;
335
236
            }
336
10.6k
            _bytes_appended += data_size_to_append;
337
10.6k
        }
338
10.4k
    }
339
10.4k
    return Status::OK();
340
10.4k
}
341
342
272
void S3FileWriter::_upload_one_part(int part_num, UploadFileBuffer& buf) {
343
272
    VLOG_DEBUG << "upload_one_part " << _obj_storage_path_opts.path.native()
344
0
               << " part=" << part_num;
345
272
    if (buf.is_cancelled()) {
346
0
        LOG_INFO("file {} skip part {} because previous failure {}",
347
0
                 _obj_storage_path_opts.path.native(), part_num, _st);
348
0
        return;
349
0
    }
350
272
    const auto& client = _obj_client->get();
351
272
    if (nullptr == client) {
352
0
        LOG_WARNING("failed to upload part, key={}, part_num={} bacause of null obj client",
353
0
                    _obj_storage_path_opts.key, part_num);
354
0
        buf.set_status(Status::InternalError<false>("invalid obj storage client"));
355
0
        return;
356
0
    }
357
272
    auto resp = client->upload_part(_obj_storage_path_opts, buf.get_string_view_data(), part_num);
358
272
    if (resp.resp.status.code != ErrorCode::OK) {
359
1
        LOG_WARNING("failed to upload part, key={}, part_num={}, status={}",
360
1
                    _obj_storage_path_opts.key, part_num, resp.resp.status.msg);
361
1
        buf.set_status(Status(resp.resp.status.code, std::move(resp.resp.status.msg)));
362
1
        return;
363
1
    }
364
271
    s3_bytes_written_total << buf.get_size();
365
366
271
    ObjectCompleteMultiPart completed_part {
367
271
            static_cast<int>(part_num), resp.etag.has_value() ? std::move(resp.etag.value()) : ""};
368
369
271
    std::unique_lock<std::mutex> lck {_completed_lock};
370
271
    _completed_parts.emplace_back(std::move(completed_part));
371
271
}
372
373
// if enabled check
374
// 1. issue a head object request for existence check
375
// 2. check the file size
376
Status check_after_upload(ObjStorageClient* client, const ObjectStorageResponse& upload_res,
377
                          const ObjectStoragePathOptions& path_opt, int64_t bytes_appended,
378
1.06k
                          const std::string& put_or_comp) {
379
1.06k
    if (!config::enable_s3_object_check_after_upload) return Status::OK();
380
381
1.06k
    auto head_res = client->head_object(path_opt);
382
383
    // clang-format off
384
1.06k
    auto err_msg = [&]() {
385
0
        std::stringstream ss;
386
0
        ss << "failed to check object after upload=" << put_or_comp
387
0
            << " file_path=" << path_opt.path.native()
388
0
            << fmt::format(" {}_err=", put_or_comp) << upload_res.status.msg
389
0
            << fmt::format(" {}_code=", put_or_comp) << upload_res.status.code
390
0
            << fmt::format(" {}_http_code=", put_or_comp) << upload_res.http_code
391
0
            << fmt::format(" {}_request_id=", put_or_comp) << upload_res.request_id
392
0
            << " head_err=" << head_res.resp.status.msg
393
0
            << " head_code=" << head_res.resp.status.code
394
0
            << " head_http_code=" << head_res.resp.http_code
395
0
            << " head_request_id=" << head_res.resp.request_id;
396
0
        return ss.str();
397
0
    };
398
    // clang-format on
399
400
    // TODO(gavin): make it fail by injection
401
1.06k
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::check_after_load", &head_res);
402
1.06k
    if (head_res.resp.status.code != ErrorCode::OK && head_res.resp.http_code != 200) {
403
0
        LOG(WARNING) << "failed to issue head object after upload, " << err_msg();
404
0
        DCHECK(false) << "failed to issue head object after upload, " << err_msg();
405
        // FIXME(gavin): we should retry if this HEAD fails?
406
0
        return Status::IOError(
407
0
                "failed to issue head object after upload, status_code={}, http_code={}, err={}",
408
0
                head_res.resp.status.code, head_res.resp.http_code, head_res.resp.status.msg);
409
0
    }
410
1.06k
    if (head_res.file_size != bytes_appended) {
411
0
        LOG(WARNING) << "failed to check size after upload, expected_size=" << bytes_appended
412
0
                     << " actual_size=" << head_res.file_size << err_msg();
413
0
        DCHECK_EQ(bytes_appended, head_res.file_size)
414
0
                << "failed to check size after upload," << err_msg();
415
0
        return Status::IOError(
416
0
                "failed to check object size after upload, expected_size={} actual_size={}",
417
0
                bytes_appended, head_res.file_size);
418
0
    }
419
1.06k
    return Status::OK();
420
1.06k
}
421
422
1.06k
Status S3FileWriter::_complete() {
423
1.06k
    const auto& client = _obj_client->get();
424
1.06k
    if (nullptr == client) {
425
0
        return Status::InternalError<false>("invalid obj storage client");
426
0
    }
427
1.06k
    if (_failed) {
428
0
        _wait_until_finish("early quit");
429
0
        return _st;
430
0
    }
431
    // When the part num is only one, it means the data is less than 5MB so we can just put it.
432
1.06k
    if (_cur_part_num == 1) {
433
1.01k
        _wait_until_finish("PutObject");
434
1.01k
        return _st;
435
1.01k
    }
436
    // Wait multipart load and finish.
437
48
    _wait_until_finish("Complete");
438
48
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:1",
439
48
                             std::make_pair(&_failed, &_completed_parts));
440
48
    if (_used_by_s3_committer) {    // S3 committer will complete multipart upload file on FE side.
441
0
        s3_file_created_total << 1; // Assume that it will be created successfully
442
0
        return Status::OK();
443
0
    }
444
445
    // check number of parts
446
48
    int64_t expected_num_parts1 = (_bytes_appended / config::s3_write_buffer_size) +
447
48
                                  !!(_bytes_appended % config::s3_write_buffer_size);
448
48
    int64_t expected_num_parts2 =
449
48
            (_bytes_appended % config::s3_write_buffer_size) ? _cur_part_num : _cur_part_num - 1;
450
48
    DCHECK_EQ(expected_num_parts1, expected_num_parts2)
451
0
            << " bytes_appended=" << _bytes_appended << " cur_part_num=" << _cur_part_num
452
0
            << " s3_write_buffer_size=" << config::s3_write_buffer_size;
453
48
    if (_failed || _completed_parts.size() != static_cast<size_t>(expected_num_parts1) ||
454
48
        expected_num_parts1 != expected_num_parts2) {
455
3
        _st = Status::InternalError(
456
3
                "failed to complete multipart upload, error status={} failed={} #complete_parts={} "
457
3
                "#expected_parts={} "
458
3
                "completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}",
459
3
                _st, _failed, _completed_parts.size(), expected_num_parts1, _dump_completed_part(),
460
3
                _obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
461
3
        LOG(WARNING) << _st;
462
3
        return _st;
463
3
    }
464
    // make sure _completed_parts are ascending order
465
45
    std::sort(_completed_parts.begin(), _completed_parts.end(),
466
749
              [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
467
45
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
468
45
    LOG(INFO) << "complete_multipart_upload " << _obj_storage_path_opts.path.native()
469
45
              << " size=" << _bytes_appended << " number_parts=" << _completed_parts.size()
470
45
              << " s3_write_buffer_size=" << config::s3_write_buffer_size;
471
45
    auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
472
45
    if (resp.status.code != ErrorCode::OK) {
473
2
        LOG_WARNING("failed to complete multipart upload, err={}, file_path={}", resp.status.msg,
474
2
                    _obj_storage_path_opts.path.native());
475
2
        return {resp.status.code, std::move(resp.status.msg)};
476
2
    }
477
478
43
    RETURN_IF_ERROR(check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended,
479
43
                                       "complete_multipart"));
480
481
43
    s3_file_created_total << 1;
482
43
    return Status::OK();
483
43
}
484
485
1.01k
Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
486
1.01k
    auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
487
1.01k
    DCHECK(buf != nullptr);
488
1.01k
    if (_used_by_s3_committer) {
489
        // If used_by_s3_committer, we always use multi-parts uploading.
490
0
        buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
491
0
            _upload_one_part(part_num, buf);
492
0
        });
493
0
        DCHECK(_cur_part_num == 1);
494
0
        RETURN_IF_ERROR(_create_multi_upload_request());
495
1.01k
    } else {
496
        // if we only need to upload one file less than 5MB, we can just
497
        // call PutObject to reduce the network IO
498
1.01k
        buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
499
1.01k
    }
500
1.01k
    return Status::OK();
501
1.01k
}
502
503
1.01k
void S3FileWriter::_put_object(UploadFileBuffer& buf) {
504
1.01k
    MonotonicStopWatch timer;
505
1.01k
    timer.start();
506
507
1.01k
    if (state() == State::CLOSED) {
508
0
        DCHECK(state() != State::CLOSED)
509
0
                << "state=" << (int)state() << " path=" << _obj_storage_path_opts.path.native();
510
0
        LOG_WARNING("failed to put object because file closed, file path {}",
511
0
                    _obj_storage_path_opts.path.native());
512
0
        buf.set_status(Status::InternalError<false>("try to put closed file"));
513
0
        return;
514
0
    }
515
1.01k
    const auto& client = _obj_client->get();
516
1.01k
    if (nullptr == client) {
517
0
        buf.set_status(Status::InternalError<false>("invalid obj storage client"));
518
0
        return;
519
0
    }
520
1.01k
    TEST_SYNC_POINT_RETURN_WITH_VOID("S3FileWriter::_put_object", this, &buf);
521
1.01k
    auto resp = client->put_object(_obj_storage_path_opts, buf.get_string_view_data());
522
1.01k
    timer.stop();
523
524
1.01k
    if (resp.status.code != ErrorCode::OK) {
525
0
        LOG_WARNING("failed to put object, put object failed because {}, file path {}, time={}ms",
526
0
                    resp.status.msg, _obj_storage_path_opts.path.native(),
527
0
                    timer.elapsed_time_milliseconds());
528
0
        buf.set_status({resp.status.code, std::move(resp.status.msg)});
529
0
        return;
530
0
    }
531
532
1.01k
    auto st = check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended,
533
1.01k
                                 "put_object");
534
1.01k
    if (!st.ok()) {
535
0
        buf.set_status(st);
536
0
        return;
537
0
    }
538
539
1.01k
    LOG(INFO) << "put_object " << _obj_storage_path_opts.path.native()
540
1.01k
              << " size=" << _bytes_appended << " time=" << timer.elapsed_time_milliseconds()
541
1.01k
              << "ms";
542
1.01k
    s3_file_created_total << 1;
543
1.01k
    s3_bytes_written_total << buf.get_size();
544
1.01k
}
545
546
3
std::string S3FileWriter::_dump_completed_part() const {
547
3
    std::stringstream ss;
548
3
    ss << "part_numbers:";
549
3
    for (const auto& part : _completed_parts) {
550
2
        ss << " " << part.part_num;
551
2
    }
552
3
    return ss.str();
553
3
}
554
555
} // namespace doris::io