Coverage Report

Created: 2026-03-12 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/s3_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/s3_file_writer.h"
19
20
#include <aws/s3/model/CompletedPart.h>
21
#include <bvar/recorder.h>
22
#include <bvar/reducer.h>
23
#include <bvar/window.h>
24
#include <fmt/core.h>
25
#include <glog/logging.h>
26
27
#include <sstream>
28
#include <tuple>
29
#include <utility>
30
31
#include "common/config.h"
32
#include "common/status.h"
33
#include "cpp/sync_point.h"
34
#include "io/cache/block_file_cache.h"
35
#include "io/cache/block_file_cache_factory.h"
36
#include "io/cache/file_block.h"
37
#include "io/cache/file_cache_common.h"
38
#include "io/fs/file_writer.h"
39
#include "io/fs/path.h"
40
#include "io/fs/s3_file_bufferpool.h"
41
#include "io/fs/s3_file_system.h"
42
#include "io/fs/s3_obj_storage_client.h"
43
#include "runtime/exec_env.h"
44
#include "util/s3_util.h"
45
#include "util/stopwatch.hpp"
46
47
namespace doris::io {
48
#include "common/compile_check_begin.h"
49
50
// ---- Process-wide S3 writer metrics (exported through bvar) ----
// Writers constructed, bytes reported as written, objects created, and writers currently alive.
bvar::Adder<uint64_t> s3_file_writer_total {"s3_file_writer_total_num"};
bvar::Adder<uint64_t> s3_bytes_written_total {"s3_file_writer_bytes_written"};
bvar::Adder<uint64_t> s3_file_created_total {"s3_file_writer_file_created"};
bvar::Adder<uint64_t> s3_file_being_written {"s3_file_writer_file_being_written"};
// Non-blocking close() gauges: tasks waiting in the close pool vs. tasks currently running.
bvar::Adder<uint64_t> s3_file_writer_async_close_queuing {"s3_file_writer_async_close_queuing"};
bvar::Adder<uint64_t> s3_file_writer_async_close_processing {
        "s3_file_writer_async_close_processing"};
// Milliseconds from the first appendv() to a successful close(). The recorder itself is
// unnamed; it is exposed only through the windowed view below (bvar window_size = 10).
bvar::IntRecorder s3_file_writer_first_append_to_close_ms_recorder;
bvar::Window<bvar::IntRecorder> s3_file_writer_first_append_to_close_ms_window {
        "s3_file_writer_first_append_to_close_ms",
        &s3_file_writer_first_append_to_close_ms_recorder, /*window_size=*/10};
61
62
// Creates a writer for object `key` in `bucket`; no request is sent to the object store here.
// The display path "s3://<bucket>/<key>" is formatted before bucket/key are moved into the
// path options. `opts` may be null, in which case the writer is not in S3-committer mode.
S3FileWriter::S3FileWriter(std::shared_ptr<ObjClientHolder> client, std::string bucket,
                           std::string key, const FileWriterOptions* opts)
        : _obj_storage_path_opts({.path = fmt::format("s3://{}/{}", bucket, key),
                                  .bucket = std::move(bucket),
                                  .key = std::move(key)}),
          _used_by_s3_committer(opts != nullptr && opts->used_by_s3_committer),
          _obj_client(std::move(client)) {
    // One more writer created, one more writer alive (decremented in the destructor).
    s3_file_writer_total << 1;
    s3_file_being_written << 1;
    // AWS SDK switch for RFC 3986-compliant URI encoding; it is set on every construction.
    Aws::Http::SetCompliantRfc3986Encoding(true);

    init_cache_builder(opts, _obj_storage_path_opts.path);
}
75
76
81.9k
// Destroys the writer once no background work can still reference it.
// - If a non-blocking close() was submitted, wait for that task's result.
// - Otherwise wait until every upload buffer submitted so far has signalled completion. This
//   covers a writer that submitted parts from appendv() but was never closed; without the wait,
//   an upload task could run after this object is gone.
// No S3 abort is issued from the BE; cleaning up unfinished uploads is left to the S3 service.
S3FileWriter::~S3FileWriter() {
    if (_async_close_pack == nullptr) {
        _wait_until_finish(fmt::format("wait s3 file {} upload to be finished",
                                       _obj_storage_path_opts.path.native()));
    } else {
        // Thread safety: the async close task uses this object until it fulfils the promise.
        std::ignore = _async_close_pack->future.get();
        _async_close_pack = nullptr;
    }

    // A writer that was never closed and did not fail still accounts for its appended bytes.
    const bool open_and_healthy = state() == State::OPENED && !_failed;
    if (open_and_healthy) {
        s3_bytes_written_total << _bytes_appended;
    }
    s3_file_being_written << -1;
}
94
95
124
// Starts a multipart upload for this object.
// Returns InternalError when no object storage client is available. Otherwise it returns the
// status of the create request; the returned upload id is stored only when that status is OK.
Status S3FileWriter::_create_multi_upload_request() {
    LOG(INFO) << "create_multi_upload_request " << _obj_storage_path_opts.path.native();
    const auto& obj_client = _obj_client->get();
    if (obj_client == nullptr) {
        return Status::InternalError<false>("invalid obj storage client");
    }

    auto response = obj_client->create_multipart_upload(_obj_storage_path_opts);
    auto& outcome = response.resp.status;
    if (outcome.code == ErrorCode::OK) {
        _obj_storage_path_opts.upload_id = response.upload_id;
    }
    return {outcome.code, std::move(outcome.msg)};
}
107
108
106k
void S3FileWriter::_wait_until_finish(std::string_view task_name) {
109
106k
    auto timeout_duration = config::s3_file_writer_log_interval_second;
110
106k
    auto msg = fmt::format(
111
106k
            "{} multipart upload already takes {} seconds, bucket={}, key={}, upload_id={}",
112
106k
            task_name, timeout_duration, _obj_storage_path_opts.bucket,
113
106k
            _obj_storage_path_opts.path.native(),
114
106k
            _obj_storage_path_opts.upload_id.has_value() ? *_obj_storage_path_opts.upload_id : "");
115
106k
    timespec current_time;
116
    // We don't need high accuracy here, so we use time(nullptr)
117
    // since it's the fastest way to get current time(second)
118
106k
    auto current_time_second = time(nullptr);
119
106k
    current_time.tv_sec = current_time_second + timeout_duration;
120
106k
    current_time.tv_nsec = 0;
121
    // bthread::countdown_event::timed_wait() should use absolute time
122
106k
    while (0 != _countdown_event.timed_wait(current_time)) {
123
0
        current_time.tv_sec += timeout_duration;
124
0
        LOG(WARNING) << msg;
125
0
    }
126
106k
}
127
128
60.1k
// Closes the writer.
//
// non_block == false: flushes buffered data, waits for every upload and completes the object,
//   then returns the final status.
// non_block == true: submits _close_impl() to the non-block close thread pool and returns only
//   the status of that submission. A later blocking close() (or the destructor) collects the
//   background result.
//
// Closing an already closed writer returns the stored status if it is an error, and
// ALREADY_CLOSED otherwise. The first-append-to-close latency is recorded at most once per
// writer.
Status S3FileWriter::close(bool non_block) {
    // Records the time from the first appendv() until now. It runs at most once, and never for
    // a writer that received no data.
    auto record_close_latency = [this]() {
        if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
            return;
        }
        const auto elapsed = std::chrono::steady_clock::now() - *_first_append_timestamp;
        const auto latency_ms =
                std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
        s3_file_writer_first_append_to_close_ms_recorder << latency_ms;
        if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
            sampler->take_sample();
        }
        _close_latency_recorded = true;
    };

    if (state() == State::CLOSED) {
        if (_async_close_pack != nullptr) {
            // A background close has finished; hand back its real result. A blocking close
            // issued after an async close therefore observes the same status as the legacy
            // behavior.
            _st = _async_close_pack->future.get();
            _async_close_pack = nullptr;
            if (!non_block && _st.ok()) {
                record_close_latency();
            }
            return _st;
        }
        // Repeated close: blocking and non-blocking calls are answered identically.
        if (!_st.ok()) {
            return _st;
        }
        record_close_latency();
        return Status::Error<ErrorCode::ALREADY_CLOSED>(
                "S3FileWriter already closed, file path {}, file key {}",
                _obj_storage_path_opts.path.native(), _obj_storage_path_opts.key);
    }

    if (state() == State::ASYNC_CLOSING) {
        if (non_block) {
            return Status::InternalError("Don't submit async close multi times");
        }
        // Blocking close after an async one: wait for the submitted task to finish.
        CHECK(_async_close_pack != nullptr);
        _st = _async_close_pack->future.get();
        _async_close_pack = nullptr;
        // From here on, every close() (whatever its non_block value) returns `_st`.
        _state = State::CLOSED;
        if (_st.ok()) {
            record_close_latency();
        }
        return _st;
    }

    if (!non_block) {
        _st = _close_impl();
        _state = State::CLOSED;
        if (_st.ok()) {
            record_close_latency();
        }
        return _st;
    }

    // Non-blocking close: the pool task runs _close_impl() and publishes the outcome through
    // the promise held in _async_close_pack.
    _state = State::ASYNC_CLOSING;
    _async_close_pack = std::make_unique<AsyncCloseStatusPack>();
    _async_close_pack->future = _async_close_pack->promise.get_future();
    s3_file_writer_async_close_queuing << 1;
    // NOTE(review): if submit_func() fails and the task never runs, the promise is never
    // fulfilled. A later blocking close() and the destructor would then wait on the future
    // forever. Whether a failed submit can still leave the task queued depends on the thread
    // pool implementation; confirm that before adding recovery here.
    return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([this]() {
        s3_file_writer_async_close_queuing << -1;
        s3_file_writer_async_close_processing << 1;
        _st = _close_impl();
        _state = State::CLOSED;
        _async_close_pack->promise.set_value(_st);
        s3_file_writer_async_close_processing << -1;
    });
}
209
210
25.0k
// Completion hook run once for every submitted upload buffer.
// On failure it marks the writer as failed and stores `s` in `_st`, both under _completed_lock.
// In every case it then signals `_countdown_event`. Returns true iff `s` is an error.
bool S3FileWriter::_complete_part_task_callback(Status s) {
    const bool task_failed = !s.ok();
    if (task_failed) [[unlikely]] {
        VLOG_NOTICE << "failed at key: " << _obj_storage_path_opts.key
                    << ", status: " << s.to_string();
        std::unique_lock<std::mutex> guard {_completed_lock};
        _failed = true;
        _st = std::move(s);
    }
    // Signal last. As soon as it is delivered, a caller blocked in _wait_until_finish() may
    // return and destroy this writer, so no member may be touched after this call (reading
    // _failed here instead of the local would be a heap use-after-free).
    _countdown_event.signal();
    return task_failed;
}
226
227
25.0k
// Creates a fresh UPLOAD buffer in _pending_buf whose file offset is the current
// _bytes_appended.
// - The buffer uploads itself as part `_cur_part_num`, captured at build time.
// - It reports completion through _complete_part_task_callback().
// - It reports itself cancelled once the writer has failed.
// - When a file-cache builder is configured, it can also allocate cache blocks for its range.
Status S3FileWriter::_build_upload_buffer() {
    FileBufferBuilder buffer_builder {};
    buffer_builder.set_type(BufferType::UPLOAD);
    buffer_builder.set_upload_callback(
            [part_num = _cur_part_num, this](UploadFileBuffer& upload_buf) {
                _upload_one_part(part_num, upload_buf);
            });
    buffer_builder.set_file_offset(_bytes_appended);
    buffer_builder.set_sync_after_complete_task([this](auto&& task_status) {
        return _complete_part_task_callback(std::forward<decltype(task_status)>(task_status));
    });
    buffer_builder.set_is_cancelled([this]() { return _failed.load(); });

    if (_cache_builder != nullptr) {
        // The data may be written into the file cache asynchronously, after this writer has
        // already been destroyed. The lambda therefore captures everything it needs by value
        // instead of touching `this`.
        int64_t tablet_id = get_tablet_id(_obj_storage_path_opts.path.native()).value_or(0);
        buffer_builder.set_allocate_file_blocks_holder(
                [cache_builder = *_cache_builder, start = _bytes_appended,
                 tablet_id]() -> FileBlocksHolderPtr {
                    return cache_builder.allocate_cache_holder(
                            start, config::s3_write_buffer_size, tablet_id);
                });
    }

    RETURN_IF_ERROR(buffer_builder.build(&_pending_buf));
    auto* upload_buffer = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
    DCHECK(upload_buffer != nullptr);
    return Status::OK();
}
255
256
24.6k
// Flushes whatever is still buffered and finishes the object.
// 1. If all data fits in the first buffer, that buffer is routed to PutObject, or to a
//    single-part multipart upload in S3-committer mode.
// 2. If nothing was appended at all, an empty buffer is built so an empty object still gets
//    created.
// 3. Any remaining buffer is submitted, then _complete() waits for every task and finalizes.
Status S3FileWriter::_close_impl() {
    VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native();

    // Data size is less than config::s3_write_buffer_size.
    if (_pending_buf && _cur_part_num == 1) {
        RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
    }

    if (_bytes_appended == 0) {
        // No data written, but an empty file still has to be created.
        DCHECK_EQ(_cur_part_num, 1);
        RETURN_IF_ERROR(_build_upload_buffer());
        if (_used_by_s3_committer) {
            RETURN_IF_ERROR(_create_multi_upload_request());
        } else {
            auto* empty_buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
            empty_buf->set_upload_to_remote(
                    [this](UploadFileBuffer& upload_buf) { _put_object(upload_buf); });
        }
    }

    if (_pending_buf != nullptr) {
        // Hand the remaining data to the uploader; _complete() waits on _countdown_event.
        _countdown_event.add_count();
        RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
        _pending_buf = nullptr;
    }

    RETURN_IF_ERROR(_complete());
    SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::close", Status());

    return Status::OK();
}
286
287
406k
// Appends `data_cnt` slices to the object.
// Bytes are copied into the pending upload buffer. Whenever that buffer holds exactly
// config::s3_write_buffer_size bytes, it is submitted as one multipart part; the multipart
// upload itself is created when the first buffer fills. A trailing partial buffer is left for
// _close_impl()/_complete(). Returns the stored failure status as soon as an earlier upload
// has failed, and InternalError if the writer is no longer open.
Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
    if (state() != State::OPENED) [[unlikely]] {
        return Status::InternalError("append to closed file: {}",
                                     _obj_storage_path_opts.path.native());
    }

    if (!_first_append_timestamp) {
        _first_append_timestamp = std::chrono::steady_clock::now();
    }

    size_t buffer_size = config::s3_write_buffer_size;
    TEST_SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::appenv", Status());
    for (size_t slice_idx = 0; slice_idx < data_cnt; ++slice_idx) {
        const Slice& slice = data[slice_idx];
        const size_t slice_size = slice.get_size();
        size_t pos = 0;
        while (pos < slice_size) {
            if (_failed) {
                return _st;
            }
            if (!_pending_buf) {
                RETURN_IF_ERROR(_build_upload_buffer());
            }
            // Every part except the last must be at least 5MB, and no part may exceed one
            // buffer, so copy at most up to the end of the current buffer.
            size_t data_size_to_append =
                    std::min(slice_size - pos,
                             _pending_buf->get_file_offset() + buffer_size - _bytes_appended);

            // With a memory buffer the data goes to memory, then S3, then the file cache;
            // without one it is written to the cache first and then to S3.
            RETURN_IF_ERROR(_pending_buf->append_data(
                    Slice {slice.get_data() + pos, data_size_to_append}));
            TEST_SYNC_POINT_CALLBACK("s3_file_writer::appenv_1", &_pending_buf, _cur_part_num);

            // A full buffer is submitted right away and later awaited by _complete(). A
            // partially filled last buffer is left to _close_impl() and _complete().
            if (_pending_buf->get_size() == buffer_size) {
                // The multipart upload is only needed once data reaches one full buffer.
                if (_cur_part_num == 1) {
                    RETURN_IF_ERROR(_create_multi_upload_request());
                }
                _cur_part_num++;
                _countdown_event.add_count();
                RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
                _pending_buf = nullptr;
            }
            _bytes_appended += data_size_to_append;
            pos += data_size_to_append;
        }
    }
    return Status::OK();
}
339
340
469
void S3FileWriter::_upload_one_part(int part_num, UploadFileBuffer& buf) {
341
469
    VLOG_DEBUG << "upload_one_part " << _obj_storage_path_opts.path.native()
342
0
               << " part=" << part_num;
343
469
    if (buf.is_cancelled()) {
344
0
        LOG_INFO("file {} skip part {} because previous failure {}",
345
0
                 _obj_storage_path_opts.path.native(), part_num, _st);
346
0
        return;
347
0
    }
348
469
    const auto& client = _obj_client->get();
349
469
    if (nullptr == client) {
350
0
        LOG_WARNING("failed to upload part, key={}, part_num={} bacause of null obj client",
351
0
                    _obj_storage_path_opts.key, part_num);
352
0
        buf.set_status(Status::InternalError<false>("invalid obj storage client"));
353
0
        return;
354
0
    }
355
469
    auto resp = client->upload_part(_obj_storage_path_opts, buf.get_string_view_data(), part_num);
356
469
    if (resp.resp.status.code != ErrorCode::OK) {
357
1
        LOG_WARNING("failed to upload part, key={}, part_num={}, status={}",
358
1
                    _obj_storage_path_opts.key, part_num, resp.resp.status.msg);
359
1
        buf.set_status(Status(resp.resp.status.code, std::move(resp.resp.status.msg)));
360
1
        return;
361
1
    }
362
468
    s3_bytes_written_total << buf.get_size();
363
364
468
    ObjectCompleteMultiPart completed_part {
365
468
            static_cast<int>(part_num), resp.etag.has_value() ? std::move(resp.etag.value()) : ""};
366
367
468
    std::unique_lock<std::mutex> lck {_completed_lock};
368
468
    _completed_parts.emplace_back(std::move(completed_part));
369
468
}
370
371
// if enabled check
372
// 1. issue a head object request for existence check
373
// 2. check the file size
374
Status check_after_upload(ObjStorageClient* client, const ObjectStorageResponse& upload_res,
375
                          const ObjectStoragePathOptions& path_opt, int64_t bytes_appended,
376
24.6k
                          const std::string& put_or_comp) {
377
24.6k
    if (!config::enable_s3_object_check_after_upload) return Status::OK();
378
379
24.6k
    auto head_res = client->head_object(path_opt);
380
381
    // clang-format off
382
24.6k
    auto err_msg = [&]() {
383
0
        std::stringstream ss;
384
0
        ss << "failed to check object after upload=" << put_or_comp
385
0
            << " file_path=" << path_opt.path.native()
386
0
            << fmt::format(" {}_err=", put_or_comp) << upload_res.status.msg
387
0
            << fmt::format(" {}_code=", put_or_comp) << upload_res.status.code
388
0
            << fmt::format(" {}_http_code=", put_or_comp) << upload_res.http_code
389
0
            << fmt::format(" {}_request_id=", put_or_comp) << upload_res.request_id
390
0
            << " head_err=" << head_res.resp.status.msg
391
0
            << " head_code=" << head_res.resp.status.code
392
0
            << " head_http_code=" << head_res.resp.http_code
393
0
            << " head_request_id=" << head_res.resp.request_id;
394
0
        return ss.str();
395
0
    };
396
    // clang-format on
397
398
    // TODO(gavin): make it fail by injection
399
24.6k
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::check_after_load", &head_res);
400
24.6k
    if (head_res.resp.status.code != ErrorCode::OK && head_res.resp.http_code != 200) {
401
0
        LOG(WARNING) << "failed to issue head object after upload, " << err_msg();
402
0
        DCHECK(false) << "failed to issue head object after upload, " << err_msg();
403
        // FIXME(gavin): we should retry if this HEAD fails?
404
0
        return Status::IOError(
405
0
                "failed to issue head object after upload, status_code={}, http_code={}, err={}",
406
0
                head_res.resp.status.code, head_res.resp.http_code, head_res.resp.status.msg);
407
0
    }
408
24.6k
    if (head_res.file_size != bytes_appended) {
409
0
        LOG(WARNING) << "failed to check size after upload, expected_size=" << bytes_appended
410
0
                     << " actual_size=" << head_res.file_size << err_msg();
411
0
        DCHECK_EQ(bytes_appended, head_res.file_size)
412
0
                << "failed to check size after upload," << err_msg();
413
0
        return Status::IOError(
414
0
                "failed to check object size after upload, expected_size={} actual_size={}",
415
0
                bytes_appended, head_res.file_size);
416
0
    }
417
24.6k
    return Status::OK();
418
24.6k
}
419
420
24.6k
// Waits for all submitted upload tasks and finishes the object.
// - An upload already failed: wait for the remaining tasks, then return the stored status.
// - Single part (_cur_part_num == 1): wait and return the stored status.
// - Multipart in S3-committer mode: the FE completes the upload, so count the file and return OK.
// - Otherwise: verify the uploaded part count against the bytes appended, sort the parts, call
//   CompleteMultipartUpload, and run check_after_upload().
Status S3FileWriter::_complete() {
    const auto& client = _obj_client->get();
    if (nullptr == client) {
        return Status::InternalError<false>("invalid obj storage client");
    }
    if (_failed) {
        _wait_until_finish("early quit");
        return _st;
    }
    // Only one part means the data is less than one buffer (5MB), so it was simply put.
    if (_cur_part_num == 1) {
        _wait_until_finish("PutObject");
        return _st;
    }

    // Wait for the multipart upload tasks, then finish.
    _wait_until_finish("Complete");
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:1",
                             std::make_pair(&_failed, &_completed_parts));
    if (_used_by_s3_committer) {
        // The S3 committer completes the multipart upload on the FE side; assume success.
        s3_file_created_total << 1;
        return Status::OK();
    }

    // The expected part count is derived twice, from the byte count and from _cur_part_num.
    // Both must agree with each other and with the number of parts actually uploaded.
    const bool has_partial_tail = (_bytes_appended % config::s3_write_buffer_size) != 0;
    int64_t expected_num_parts1 =
            (_bytes_appended / config::s3_write_buffer_size) + has_partial_tail;
    int64_t expected_num_parts2 = has_partial_tail ? _cur_part_num : _cur_part_num - 1;
    DCHECK_EQ(expected_num_parts1, expected_num_parts2)
            << " bytes_appended=" << _bytes_appended << " cur_part_num=" << _cur_part_num
            << " s3_write_buffer_size=" << config::s3_write_buffer_size;
    const bool part_count_mismatch =
            _completed_parts.size() != static_cast<size_t>(expected_num_parts1) ||
            expected_num_parts1 != expected_num_parts2;
    if (_failed || part_count_mismatch) {
        _st = Status::InternalError(
                "failed to complete multipart upload, error status={} failed={} #complete_parts={} "
                "#expected_parts={} "
                "completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}",
                _st, _failed, _completed_parts.size(), expected_num_parts1, _dump_completed_part(),
                _obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
        LOG(WARNING) << _st;
        return _st;
    }

    // CompleteMultipartUpload needs the parts in ascending part-number order.
    std::sort(_completed_parts.begin(), _completed_parts.end(),
              [](const auto& lhs, const auto& rhs) { return lhs.part_num < rhs.part_num; });
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
    LOG(INFO) << "complete_multipart_upload " << _obj_storage_path_opts.path.native()
              << " size=" << _bytes_appended << " number_parts=" << _completed_parts.size()
              << " s3_write_buffer_size=" << config::s3_write_buffer_size;

    auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
    if (resp.status.code != ErrorCode::OK) {
        LOG_WARNING("failed to complete multipart upload, err={}, file_path={}", resp.status.msg,
                    _obj_storage_path_opts.path.native());
        return {resp.status.code, std::move(resp.status.msg)};
    }

    RETURN_IF_ERROR(check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended,
                                       "complete_multipart"));

    s3_file_created_total << 1;
    return Status::OK();
}
482
483
24.5k
// Chooses the upload route for a pending buffer that holds all of the file's data, i.e. less
// than one full buffer.
// - Normal writers use a single PutObject call.
// - In S3-committer mode the data is still uploaded as part 1 of a multipart upload, which is
//   created here.
Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
    auto* upload_buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
    DCHECK(upload_buf != nullptr);
    if (!_used_by_s3_committer) {
        // Uploading one file smaller than 5MB: PutObject avoids extra multipart round trips.
        upload_buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
        return Status::OK();
    }

    // The S3 committer always uses multipart uploading, even for a single part.
    upload_buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& b) {
        _upload_one_part(part_num, b);
    });
    DCHECK(_cur_part_num == 1);
    RETURN_IF_ERROR(_create_multi_upload_request());
    return Status::OK();
}
500
501
23.5k
// Uploads the whole object in a single PutObject request (buffer upload callback).
// Failures are reported only through buf.set_status(). On success it runs the optional
// post-upload check, logs the elapsed time, and updates s3_file_created_total and
// s3_bytes_written_total.
void S3FileWriter::_put_object(UploadFileBuffer& buf) {
    MonotonicStopWatch timer;
    timer.start();

    if (state() == State::CLOSED) {
        // The DCHECK asserts this path is not expected; release builds fail the buffer instead.
        DCHECK(state() != State::CLOSED) << "state=" << static_cast<int>(state())
                                         << " path=" << _obj_storage_path_opts.path.native();
        LOG_WARNING("failed to put object because file closed, file path {}",
                    _obj_storage_path_opts.path.native());
        buf.set_status(Status::InternalError<false>("try to put closed file"));
        return;
    }

    const auto& client = _obj_client->get();
    if (nullptr == client) {
        buf.set_status(Status::InternalError<false>("invalid obj storage client"));
        return;
    }
    TEST_SYNC_POINT_RETURN_WITH_VOID("S3FileWriter::_put_object", this, &buf);
    auto resp = client->put_object(_obj_storage_path_opts, buf.get_string_view_data());
    timer.stop();

    if (resp.status.code != ErrorCode::OK) {
        LOG_WARNING("failed to put object, put object failed because {}, file path {}, time={}ms",
                    resp.status.msg, _obj_storage_path_opts.path.native(),
                    timer.elapsed_time_milliseconds());
        buf.set_status({resp.status.code, std::move(resp.status.msg)});
        return;
    }

    if (auto check_st = check_after_upload(client.get(), resp, _obj_storage_path_opts,
                                           _bytes_appended, "put_object");
        !check_st.ok()) {
        buf.set_status(check_st);
        return;
    }

    LOG(INFO) << "put_object " << _obj_storage_path_opts.path.native()
              << " size=" << _bytes_appended << " time=" << timer.elapsed_time_milliseconds()
              << "ms";
    s3_file_created_total << 1;
    s3_bytes_written_total << buf.get_size();
}
543
544
3
std::string S3FileWriter::_dump_completed_part() const {
545
3
    std::stringstream ss;
546
3
    ss << "part_numbers:";
547
3
    for (const auto& part : _completed_parts) {
548
2
        ss << " " << part.part_num;
549
2
    }
550
3
    return ss.str();
551
3
}
552
#include "common/compile_check_end.h"
553
554
} // namespace doris::io