Coverage Report

Created: 2026-03-13 03:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/s3_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/s3_file_writer.h"
19
20
#include <aws/s3/model/CompletedPart.h>
21
#include <bvar/recorder.h>
22
#include <bvar/reducer.h>
23
#include <bvar/window.h>
24
#include <fmt/core.h>
25
#include <glog/logging.h>
26
27
#include <sstream>
28
#include <tuple>
29
#include <utility>
30
31
#include "common/config.h"
32
#include "common/status.h"
33
#include "cpp/sync_point.h"
34
#include "io/cache/block_file_cache.h"
35
#include "io/cache/block_file_cache_factory.h"
36
#include "io/cache/file_block.h"
37
#include "io/cache/file_cache_common.h"
38
#include "io/fs/file_writer.h"
39
#include "io/fs/path.h"
40
#include "io/fs/s3_file_bufferpool.h"
41
#include "io/fs/s3_file_system.h"
42
#include "io/fs/s3_obj_storage_client.h"
43
#include "runtime/exec_env.h"
44
#include "util/s3_util.h"
45
#include "util/stopwatch.hpp"
46
47
namespace doris::io {
48
#include "common/compile_check_begin.h"
49
50
bvar::Adder<uint64_t> s3_file_writer_total("s3_file_writer_total_num");
51
bvar::Adder<uint64_t> s3_bytes_written_total("s3_file_writer_bytes_written");
52
bvar::Adder<uint64_t> s3_file_created_total("s3_file_writer_file_created");
53
bvar::Adder<uint64_t> s3_file_being_written("s3_file_writer_file_being_written");
54
bvar::Adder<uint64_t> s3_file_writer_async_close_queuing("s3_file_writer_async_close_queuing");
55
bvar::Adder<uint64_t> s3_file_writer_async_close_processing(
56
        "s3_file_writer_async_close_processing");
57
bvar::IntRecorder s3_file_writer_first_append_to_close_ms_recorder;
58
bvar::Window<bvar::IntRecorder> s3_file_writer_first_append_to_close_ms_window(
59
        "s3_file_writer_first_append_to_close_ms",
60
        &s3_file_writer_first_append_to_close_ms_recorder, /*window_size=*/10);
61
62
S3FileWriter::S3FileWriter(std::shared_ptr<ObjClientHolder> client, std::string bucket,
63
                           std::string key, const FileWriterOptions* opts)
64
72.1k
        : _obj_storage_path_opts({.path = fmt::format("s3://{}/{}", bucket, key),
65
72.1k
                                  .bucket = std::move(bucket),
66
72.1k
                                  .key = std::move(key)}),
67
72.1k
          _used_by_s3_committer(opts ? opts->used_by_s3_committer : false),
68
72.1k
          _obj_client(std::move(client)) {
69
72.1k
    s3_file_writer_total << 1;
70
72.1k
    s3_file_being_written << 1;
71
72.1k
    Aws::Http::SetCompliantRfc3986Encoding(true);
72
73
72.1k
    init_cache_builder(opts, _obj_storage_path_opts.path);
74
72.1k
}
75
76
72.4k
S3FileWriter::~S3FileWriter() {
77
72.4k
    if (_async_close_pack != nullptr) {
78
        // For thread safety
79
14.1k
        std::ignore = _async_close_pack->future.get();
80
14.1k
        _async_close_pack = nullptr;
81
58.3k
    } else {
82
        // Consider one situation where the file writer is destructed after it submit at least one async task
83
        // without calling close(), then there exists one occasion where the async task is executed right after
84
        // the corresponding S3 file writer is already destructed
85
58.3k
        _wait_until_finish(fmt::format("wait s3 file {} upload to be finished",
86
58.3k
                                       _obj_storage_path_opts.path.native()));
87
58.3k
    }
88
    // We won't do the S3 abort operation in BE; we let the S3 service handle it on its own.
89
72.4k
    if (state() == State::OPENED && !_failed) {
90
1.07k
        s3_bytes_written_total << _bytes_appended;
91
1.07k
    }
92
72.4k
    s3_file_being_written << -1;
93
72.4k
}
94
95
115
Status S3FileWriter::_create_multi_upload_request() {
96
115
    LOG(INFO) << "create_multi_upload_request " << _obj_storage_path_opts.path.native();
97
115
    const auto& client = _obj_client->get();
98
115
    if (nullptr == client) {
99
0
        return Status::InternalError<false>("invalid obj storage client");
100
0
    }
101
115
    auto resp = client->create_multipart_upload(_obj_storage_path_opts);
102
115
    if (resp.resp.status.code == ErrorCode::OK) {
103
114
        _obj_storage_path_opts.upload_id = resp.upload_id;
104
114
    }
105
115
    return {resp.resp.status.code, std::move(resp.resp.status.msg)};
106
115
}
107
108
129k
void S3FileWriter::_wait_until_finish(std::string_view task_name) {
109
129k
    auto timeout_duration = config::s3_file_writer_log_interval_second;
110
129k
    auto msg = fmt::format(
111
129k
            "{} multipart upload already takes {} seconds, bucket={}, key={}, upload_id={}",
112
129k
            task_name, timeout_duration, _obj_storage_path_opts.bucket,
113
129k
            _obj_storage_path_opts.path.native(),
114
129k
            _obj_storage_path_opts.upload_id.has_value() ? *_obj_storage_path_opts.upload_id : "");
115
129k
    timespec current_time;
116
    // We don't need high accuracy here, so we use time(nullptr)
117
    // since it's the fastest way to get current time(second)
118
129k
    auto current_time_second = time(nullptr);
119
129k
    current_time.tv_sec = current_time_second + timeout_duration;
120
129k
    current_time.tv_nsec = 0;
121
    // bthread::countdown_event::timed_wait() should use absolute time
122
129k
    while (0 != _countdown_event.timed_wait(current_time)) {
123
0
        current_time.tv_sec += timeout_duration;
124
0
        LOG(WARNING) << msg;
125
0
    }
126
129k
}
127
128
128k
Status S3FileWriter::close(bool non_block) {
129
128k
    auto record_close_latency = [this]() {
130
57.2k
        if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
131
33
            return;
132
33
        }
133
57.2k
        auto now = std::chrono::steady_clock::now();
134
57.2k
        auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
135
57.2k
                                  now - *_first_append_timestamp)
136
57.2k
                                  .count();
137
57.2k
        s3_file_writer_first_append_to_close_ms_recorder << latency_ms;
138
57.2k
        if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
139
57.2k
            sampler->take_sample();
140
57.2k
        }
141
57.2k
        _close_latency_recorded = true;
142
57.2k
    };
143
144
128k
    if (state() == State::CLOSED) {
145
2.02k
        if (_async_close_pack != nullptr) {
146
1.02k
            _st = _async_close_pack->future.get();
147
1.02k
            _async_close_pack = nullptr;
148
            // Return the final close status so that a blocking close issued after
149
            // an async close observes the real result just like the legacy behavior.
150
1.02k
            if (!non_block && _st.ok()) {
151
26
                record_close_latency();
152
26
            }
153
1.02k
            return _st;
154
1.02k
        }
155
1.00k
        if (non_block) {
156
1.00k
            if (_st.ok()) {
157
1.00k
                record_close_latency();
158
1.00k
                return Status::Error<ErrorCode::ALREADY_CLOSED>(
159
1.00k
                        "S3FileWriter already closed, file path {}, file key {}",
160
1.00k
                        _obj_storage_path_opts.path.native(), _obj_storage_path_opts.key);
161
1.00k
            }
162
0
            return _st;
163
1.00k
        }
164
0
        if (_st.ok()) {
165
0
            record_close_latency();
166
0
            return Status::Error<ErrorCode::ALREADY_CLOSED>(
167
0
                    "S3FileWriter already closed, file path {}, file key {}",
168
0
                    _obj_storage_path_opts.path.native(), _obj_storage_path_opts.key);
169
0
        }
170
0
        return _st;
171
0
    }
172
126k
    if (state() == State::ASYNC_CLOSING) {
173
54.8k
        if (non_block) {
174
0
            return Status::InternalError("Don't submit async close multi times");
175
0
        }
176
54.8k
        CHECK(_async_close_pack != nullptr);
177
54.8k
        _st = _async_close_pack->future.get();
178
54.8k
        _async_close_pack = nullptr;
179
        // We should wait for all the pre async task to be finished
180
54.8k
        _state = State::CLOSED;
181
        // The next time we call close() with no matter non_block true or false, it would always return the
182
        // '_st' value because this writer is already closed.
183
54.8k
        if (!non_block && _st.ok()) {
184
54.8k
            record_close_latency();
185
54.8k
        }
186
54.8k
        return _st;
187
54.8k
    }
188
71.1k
    if (non_block) {
189
69.8k
        _state = State::ASYNC_CLOSING;
190
69.8k
        _async_close_pack = std::make_unique<AsyncCloseStatusPack>();
191
69.8k
        _async_close_pack->future = _async_close_pack->promise.get_future();
192
69.8k
        s3_file_writer_async_close_queuing << 1;
193
69.9k
        return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([&]() {
194
69.9k
            s3_file_writer_async_close_queuing << -1;
195
69.9k
            s3_file_writer_async_close_processing << 1;
196
69.9k
            _st = _close_impl();
197
69.9k
            _state = State::CLOSED;
198
69.9k
            _async_close_pack->promise.set_value(_st);
199
69.9k
            s3_file_writer_async_close_processing << -1;
200
69.9k
        });
201
69.8k
    }
202
1.34k
    _st = _close_impl();
203
1.34k
    _state = State::CLOSED;
204
1.42k
    if (!non_block && _st.ok()) {
205
1.41k
        record_close_latency();
206
1.41k
    }
207
1.34k
    return _st;
208
71.1k
}
209
210
71.7k
bool S3FileWriter::_complete_part_task_callback(Status s) {
211
71.7k
    bool ret = false;
212
71.7k
    if (!s.ok()) [[unlikely]] {
213
14
        VLOG_NOTICE << "failed at key: " << _obj_storage_path_opts.key
214
0
                    << ", status: " << s.to_string();
215
14
        std::unique_lock<std::mutex> _lck {_completed_lock};
216
14
        _failed = true;
217
14
        ret = true;
218
14
        _st = std::move(s);
219
14
    }
220
    // After the signal, there is a scenario where the previous invocation of _wait_until_finish
221
    // returns to the caller, and subsequently, the S3 file writer is destructed.
222
    // This means that accessing _failed afterwards would result in a heap use after free vulnerability.
223
71.7k
    _countdown_event.signal();
224
71.7k
    return ret;
225
71.7k
}
226
227
71.7k
Status S3FileWriter::_build_upload_buffer() {
228
71.7k
    auto builder = FileBufferBuilder();
229
71.7k
    builder.set_type(BufferType::UPLOAD)
230
71.7k
            .set_upload_callback([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
231
445
                _upload_one_part(part_num, buf);
232
445
            })
233
71.7k
            .set_file_offset(_bytes_appended)
234
71.7k
            .set_sync_after_complete_task([this](auto&& PH1) {
235
71.7k
                return _complete_part_task_callback(std::forward<decltype(PH1)>(PH1));
236
71.7k
            })
237
72.1k
            .set_is_cancelled([this]() { return _failed.load(); });
238
71.7k
    if (_cache_builder != nullptr) {
239
        // We would load the data into file cache asynchronously which indicates
240
        // that this instance of S3FileWriter might have been destructed when we
241
        // try to do writing into file cache, so we make the lambda capture the variable
242
        // we need by value to extend their lifetime
243
58.8k
        int64_t id = get_tablet_id(_obj_storage_path_opts.path.native()).value_or(0);
244
58.8k
        builder.set_allocate_file_blocks_holder([builder = *_cache_builder,
245
58.8k
                                                 offset = _bytes_appended,
246
58.8k
                                                 tablet_id = id]() -> FileBlocksHolderPtr {
247
58.7k
            return builder.allocate_cache_holder(offset, config::s3_write_buffer_size, tablet_id);
248
58.7k
        });
249
58.8k
    }
250
71.7k
    RETURN_IF_ERROR(builder.build(&_pending_buf));
251
71.7k
    auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
252
71.7k
    DCHECK(buf != nullptr);
253
71.7k
    return Status::OK();
254
71.7k
}
255
256
71.4k
Status S3FileWriter::_close_impl() {
257
18.4E
    VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native();
258
259
71.4k
    if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size
260
71.2k
        RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
261
71.2k
    }
262
263
71.4k
    if (_bytes_appended == 0) {
264
36
        DCHECK_EQ(_cur_part_num, 1);
265
        // No data written, but need to create an empty file
266
36
        RETURN_IF_ERROR(_build_upload_buffer());
267
36
        if (!_used_by_s3_committer) {
268
36
            auto* pending_buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
269
36
            pending_buf->set_upload_to_remote([this](UploadFileBuffer& buf) { _put_object(buf); });
270
36
        } else {
271
0
            RETURN_IF_ERROR(_create_multi_upload_request());
272
0
        }
273
36
    }
274
275
71.4k
    if (_pending_buf != nullptr) { // there is remaining data in buffer need to be uploaded
276
71.4k
        _countdown_event.add_count();
277
71.4k
        RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
278
71.4k
        _pending_buf = nullptr;
279
71.4k
    }
280
281
71.4k
    RETURN_IF_ERROR(_complete());
282
71.4k
    SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::close", Status());
283
284
71.3k
    return Status::OK();
285
71.3k
}
286
287
2.40M
Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
288
2.40M
    if (state() != State::OPENED) [[unlikely]] {
289
0
        return Status::InternalError("append to closed file: {}",
290
0
                                     _obj_storage_path_opts.path.native());
291
0
    }
292
293
2.40M
    if (!_first_append_timestamp.has_value()) {
294
70.3k
        _first_append_timestamp = std::chrono::steady_clock::now();
295
70.3k
    }
296
297
2.40M
    size_t buffer_size = config::s3_write_buffer_size;
298
2.40M
    TEST_SYNC_POINT_RETURN_WITH_VALUE("s3_file_writer::appenv", Status());
299
10.0M
    for (size_t i = 0; i < data_cnt; i++) {
300
7.67M
        size_t data_size = data[i].get_size();
301
14.5M
        for (size_t pos = 0, data_size_to_append = 0; pos < data_size; pos += data_size_to_append) {
302
6.91M
            if (_failed) {
303
0
                return _st;
304
0
            }
305
6.91M
            if (!_pending_buf) {
306
70.4k
                RETURN_IF_ERROR(_build_upload_buffer());
307
70.4k
            }
308
            // we need to make sure all parts except the last one are 5MB or more
309
            // and shouldn't be larger than buf
310
6.91M
            data_size_to_append = std::min(data_size - pos, _pending_buf->get_file_offset() +
311
6.91M
                                                                    buffer_size - _bytes_appended);
312
313
            // if the buffer has memory buf inside, the data would be written into memory first then S3 then file cache
314
            // it would be written to cache then S3 if the buffer doesn't have memory preserved
315
6.91M
            RETURN_IF_ERROR(_pending_buf->append_data(
316
6.91M
                    Slice {data[i].get_data() + pos, data_size_to_append}));
317
6.91M
            TEST_SYNC_POINT_CALLBACK("s3_file_writer::appenv_1", &_pending_buf, _cur_part_num);
318
319
            // If this is the last part and the data size is less than s3_write_buffer_size,
320
            // the pending_buf will be handled by _close_impl() and _complete()
321
            // If this is the last part and the data size is equal to s3_write_buffer_size,
322
            // the pending_buf is handled here and submitted. it will be waited by _complete()
323
6.91M
            if (_pending_buf->get_size() == buffer_size) {
324
                // only create multiple upload request when the data size is
325
                // larger or equal to s3_write_buffer_size than one memory buffer
326
90
                if (_cur_part_num == 1) {
327
69
                    RETURN_IF_ERROR(_create_multi_upload_request());
328
69
                }
329
90
                _cur_part_num++;
330
90
                _countdown_event.add_count();
331
90
                RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
332
90
                _pending_buf = nullptr;
333
90
            }
334
6.91M
            _bytes_appended += data_size_to_append;
335
6.91M
        }
336
7.67M
    }
337
2.40M
    return Status::OK();
338
2.40M
}
339
340
445
void S3FileWriter::_upload_one_part(int part_num, UploadFileBuffer& buf) {
341
445
    VLOG_DEBUG << "upload_one_part " << _obj_storage_path_opts.path.native()
342
0
               << " part=" << part_num;
343
445
    if (buf.is_cancelled()) {
344
0
        LOG_INFO("file {} skip part {} because previous failure {}",
345
0
                 _obj_storage_path_opts.path.native(), part_num, _st);
346
0
        return;
347
0
    }
348
445
    const auto& client = _obj_client->get();
349
445
    if (nullptr == client) {
350
0
        LOG_WARNING("failed to upload part, key={}, part_num={} bacause of null obj client",
351
0
                    _obj_storage_path_opts.key, part_num);
352
0
        buf.set_status(Status::InternalError<false>("invalid obj storage client"));
353
0
        return;
354
0
    }
355
445
    auto resp = client->upload_part(_obj_storage_path_opts, buf.get_string_view_data(), part_num);
356
445
    if (resp.resp.status.code != ErrorCode::OK) {
357
1
        LOG_WARNING("failed to upload part, key={}, part_num={}, status={}",
358
1
                    _obj_storage_path_opts.key, part_num, resp.resp.status.msg);
359
1
        buf.set_status(Status(resp.resp.status.code, std::move(resp.resp.status.msg)));
360
1
        return;
361
1
    }
362
444
    s3_bytes_written_total << buf.get_size();
363
364
444
    ObjectCompleteMultiPart completed_part {
365
444
            static_cast<int>(part_num), resp.etag.has_value() ? std::move(resp.etag.value()) : ""};
366
367
444
    std::unique_lock<std::mutex> lck {_completed_lock};
368
444
    _completed_parts.emplace_back(std::move(completed_part));
369
444
}
370
371
// if enabled check
372
// 1. issue a head object request for existence check
373
// 2. check the file size
374
Status check_after_upload(ObjStorageClient* client, const ObjectStorageResponse& upload_res,
375
                          const ObjectStoragePathOptions& path_opt, int64_t bytes_appended,
376
71.3k
                          const std::string& put_or_comp) {
377
71.3k
    if (!config::enable_s3_object_check_after_upload) return Status::OK();
378
379
71.3k
    auto head_res = client->head_object(path_opt);
380
381
    // clang-format off
382
71.3k
    auto err_msg = [&]() {
383
0
        std::stringstream ss;
384
0
        ss << "failed to check object after upload=" << put_or_comp
385
0
            << " file_path=" << path_opt.path.native()
386
0
            << fmt::format(" {}_err=", put_or_comp) << upload_res.status.msg
387
0
            << fmt::format(" {}_code=", put_or_comp) << upload_res.status.code
388
0
            << fmt::format(" {}_http_code=", put_or_comp) << upload_res.http_code
389
0
            << fmt::format(" {}_request_id=", put_or_comp) << upload_res.request_id
390
0
            << " head_err=" << head_res.resp.status.msg
391
0
            << " head_code=" << head_res.resp.status.code
392
0
            << " head_http_code=" << head_res.resp.http_code
393
0
            << " head_request_id=" << head_res.resp.request_id;
394
0
        return ss.str();
395
0
    };
396
    // clang-format on
397
398
    // TODO(gavin): make it fail by injection
399
71.3k
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::check_after_load", &head_res);
400
71.3k
    if (head_res.resp.status.code != ErrorCode::OK && head_res.resp.http_code != 200) {
401
0
        LOG(WARNING) << "failed to issue head object after upload, " << err_msg();
402
0
        DCHECK(false) << "failed to issue head object after upload, " << err_msg();
403
        // FIXME(gavin): we should retry if this HEAD fails?
404
0
        return Status::IOError(
405
0
                "failed to issue head object after upload, status_code={}, http_code={}, err={}",
406
0
                head_res.resp.status.code, head_res.resp.http_code, head_res.resp.status.msg);
407
0
    }
408
71.3k
    if (head_res.file_size != bytes_appended) {
409
0
        LOG(WARNING) << "failed to check size after upload, expected_size=" << bytes_appended
410
0
                     << " actual_size=" << head_res.file_size << err_msg();
411
0
        DCHECK_EQ(bytes_appended, head_res.file_size)
412
0
                << "failed to check size after upload," << err_msg();
413
0
        return Status::IOError(
414
0
                "failed to check object size after upload, expected_size={} actual_size={}",
415
0
                bytes_appended, head_res.file_size);
416
0
    }
417
71.3k
    return Status::OK();
418
71.3k
}
419
420
71.4k
Status S3FileWriter::_complete() {
421
71.4k
    const auto& client = _obj_client->get();
422
71.4k
    if (nullptr == client) {
423
0
        return Status::InternalError<false>("invalid obj storage client");
424
0
    }
425
71.4k
    if (_failed) {
426
0
        _wait_until_finish("early quit");
427
0
        return _st;
428
0
    }
429
    // When the part num is only one, it means the data is less than 5MB so we can just put it.
430
71.4k
    if (_cur_part_num == 1) {
431
71.3k
        _wait_until_finish("PutObject");
432
71.3k
        return _st;
433
71.3k
    }
434
    // Wait for the multipart upload to finish.
435
113
    _wait_until_finish("Complete");
436
113
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:1",
437
113
                             std::make_pair(&_failed, &_completed_parts));
438
113
    if (_used_by_s3_committer) {    // S3 committer will complete multipart upload file on FE side.
439
0
        s3_file_created_total << 1; // Assume that it will be created successfully
440
0
        return Status::OK();
441
0
    }
442
443
    // check number of parts
444
113
    int64_t expected_num_parts1 = (_bytes_appended / config::s3_write_buffer_size) +
445
113
                                  !!(_bytes_appended % config::s3_write_buffer_size);
446
113
    int64_t expected_num_parts2 =
447
113
            (_bytes_appended % config::s3_write_buffer_size) ? _cur_part_num : _cur_part_num - 1;
448
113
    DCHECK_EQ(expected_num_parts1, expected_num_parts2)
449
0
            << " bytes_appended=" << _bytes_appended << " cur_part_num=" << _cur_part_num
450
0
            << " s3_write_buffer_size=" << config::s3_write_buffer_size;
451
113
    if (_failed || _completed_parts.size() != static_cast<size_t>(expected_num_parts1) ||
452
113
        expected_num_parts1 != expected_num_parts2) {
453
3
        _st = Status::InternalError(
454
3
                "failed to complete multipart upload, error status={} failed={} #complete_parts={} "
455
3
                "#expected_parts={} "
456
3
                "completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}",
457
3
                _st, _failed, _completed_parts.size(), expected_num_parts1, _dump_completed_part(),
458
3
                _obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
459
3
        LOG(WARNING) << _st;
460
3
        return _st;
461
3
    }
462
    // make sure _completed_parts are ascending order
463
110
    std::sort(_completed_parts.begin(), _completed_parts.end(),
464
1.01k
              [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
465
110
    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
466
110
    LOG(INFO) << "complete_multipart_upload " << _obj_storage_path_opts.path.native()
467
110
              << " size=" << _bytes_appended << " number_parts=" << _completed_parts.size()
468
110
              << " s3_write_buffer_size=" << config::s3_write_buffer_size;
469
110
    auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
470
110
    if (resp.status.code != ErrorCode::OK) {
471
2
        LOG_WARNING("failed to complete multipart upload, err={}, file_path={}", resp.status.msg,
472
2
                    _obj_storage_path_opts.path.native());
473
2
        return {resp.status.code, std::move(resp.status.msg)};
474
2
    }
475
476
108
    RETURN_IF_ERROR(check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended,
477
108
                                       "complete_multipart"));
478
479
108
    s3_file_created_total << 1;
480
108
    return Status::OK();
481
108
}
482
483
71.2k
Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
484
71.2k
    auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
485
71.2k
    DCHECK(buf != nullptr);
486
71.2k
    if (_used_by_s3_committer) {
487
        // If used_by_s3_committer, we always use multi-parts uploading.
488
0
        buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
489
0
            _upload_one_part(part_num, buf);
490
0
        });
491
0
        DCHECK(_cur_part_num == 1);
492
0
        RETURN_IF_ERROR(_create_multi_upload_request());
493
71.2k
    } else {
494
        // if we only need to upload one file less than 5MB, we can just
495
        // call PutObject to reduce the network IO
496
71.2k
        buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
497
71.2k
    }
498
71.2k
    return Status::OK();
499
71.2k
}
500
501
70.2k
void S3FileWriter::_put_object(UploadFileBuffer& buf) {
502
70.2k
    MonotonicStopWatch timer;
503
70.2k
    timer.start();
504
505
70.2k
    if (state() == State::CLOSED) {
506
0
        DCHECK(state() != State::CLOSED)
507
0
                << "state=" << (int)state() << " path=" << _obj_storage_path_opts.path.native();
508
0
        LOG_WARNING("failed to put object because file closed, file path {}",
509
0
                    _obj_storage_path_opts.path.native());
510
0
        buf.set_status(Status::InternalError<false>("try to put closed file"));
511
0
        return;
512
0
    }
513
70.2k
    const auto& client = _obj_client->get();
514
70.2k
    if (nullptr == client) {
515
0
        buf.set_status(Status::InternalError<false>("invalid obj storage client"));
516
0
        return;
517
0
    }
518
70.2k
    TEST_SYNC_POINT_RETURN_WITH_VOID("S3FileWriter::_put_object", this, &buf);
519
70.2k
    auto resp = client->put_object(_obj_storage_path_opts, buf.get_string_view_data());
520
70.2k
    timer.stop();
521
522
70.2k
    if (resp.status.code != ErrorCode::OK) {
523
11
        LOG_WARNING("failed to put object, put object failed because {}, file path {}, time={}ms",
524
11
                    resp.status.msg, _obj_storage_path_opts.path.native(),
525
11
                    timer.elapsed_time_milliseconds());
526
11
        buf.set_status({resp.status.code, std::move(resp.status.msg)});
527
11
        return;
528
11
    }
529
530
70.2k
    auto st = check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended,
531
70.2k
                                 "put_object");
532
70.2k
    if (!st.ok()) {
533
0
        buf.set_status(st);
534
0
        return;
535
0
    }
536
537
70.2k
    LOG(INFO) << "put_object " << _obj_storage_path_opts.path.native()
538
70.2k
              << " size=" << _bytes_appended << " time=" << timer.elapsed_time_milliseconds()
539
70.2k
              << "ms";
540
70.2k
    s3_file_created_total << 1;
541
70.2k
    s3_bytes_written_total << buf.get_size();
542
70.2k
}
543
544
3
std::string S3FileWriter::_dump_completed_part() const {
545
3
    std::stringstream ss;
546
3
    ss << "part_numbers:";
547
3
    for (const auto& part : _completed_parts) {
548
2
        ss << " " << part.part_num;
549
2
    }
550
3
    return ss.str();
551
3
}
552
#include "common/compile_check_end.h"
553
554
} // namespace doris::io