Coverage Report

Created: 2026-03-12 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/s3_file_writer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bthread/countdown_event.h>

#include <atomic>
#include <chrono>
#include <cstddef>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <string_view>
#include <vector>

#include "common/status.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/obj_storage_client.h"
#include "io/fs/path.h"
#include "io/fs/s3_file_bufferpool.h"
34
35
// Forward declarations of AWS SDK types, so that they can be named without including SDK headers.
// NOTE(review): neither type is referenced anywhere else in this header (the writer works through
// ObjClientHolder / ObjectCompleteMultiPart); these declarations may be stale — confirm before removing.
namespace Aws::S3::Model {
class CompletedPart;
} // namespace Aws::S3::Model

namespace Aws::S3 {
class S3Client;
} // namespace Aws::S3
41
42
namespace doris {
43
struct S3Conf;
44
45
namespace io {
46
struct S3FileBuffer;
47
class S3FileSystem;
48
struct AsyncCloseStatusPack;
49
class ObjClientHolder;
50
51
class S3FileWriter final : public FileWriter {
52
public:
53
    S3FileWriter(std::shared_ptr<ObjClientHolder> client, std::string bucket, std::string key,
54
                 const FileWriterOptions* opts);
55
    ~S3FileWriter() override;
56
57
    Status appendv(const Slice* data, size_t data_cnt) override;
58
59
98
    const Path& path() const override { return _obj_storage_path_opts.path; }
60
778k
    size_t bytes_appended() const override { return _bytes_appended; }
61
673k
    State state() const override { return _state; }
62
63
0
    const std::vector<ObjectCompleteMultiPart>& completed_parts() const { return _completed_parts; }
64
65
0
    const std::string& key() const { return _obj_storage_path_opts.key; }
66
0
    const std::string& bucket() const { return _obj_storage_path_opts.bucket; }
67
2
    std::string upload_id() const {
68
2
        return _obj_storage_path_opts.upload_id.has_value()
69
2
                       ? _obj_storage_path_opts.upload_id.value()
70
2
                       : std::string();
71
2
    }
72
73
    Status close(bool non_block = false) override;
74
75
private:
76
    Status _close_impl();
77
    Status _abort();
78
    [[nodiscard]] std::string _dump_completed_part() const;
79
    void _wait_until_finish(std::string_view task_name);
80
    Status _complete();
81
    Status _create_multi_upload_request();
82
    Status _set_upload_to_remote_less_than_buffer_size();
83
    void _put_object(UploadFileBuffer& buf);
84
    void _upload_one_part(int part_num, UploadFileBuffer& buf);
85
    bool _complete_part_task_callback(Status s);
86
    Status _build_upload_buffer();
87
88
    ObjectStoragePathOptions _obj_storage_path_opts;
89
90
    // Current Part Num for CompletedPart
91
    int _cur_part_num = 1;
92
    std::mutex _completed_lock;
93
    std::vector<ObjectCompleteMultiPart> _completed_parts;
94
95
    // **Attention** call add_count() before submitting buf to async thread pool
96
    bthread::CountdownEvent _countdown_event {0};
97
98
    std::atomic_bool _failed = false;
99
100
    Status _st;
101
    size_t _bytes_appended = 0;
102
103
    std::shared_ptr<FileBuffer> _pending_buf;
104
105
    // S3 committer will start multipart uploading all files on BE side,
106
    // and then complete multipart upload these files on FE side.
107
    // If you do not complete multi parts of a file, the file will not be visible.
108
    // So in this way, the atomicity of a single file can be guaranteed. But it still cannot
109
    // guarantee the atomicity of multiple files.
110
    // Because hive committers have best-effort semantics,
111
    // this shortens the inconsistent time window.
112
    bool _used_by_s3_committer;
113
    std::unique_ptr<AsyncCloseStatusPack> _async_close_pack;
114
    State _state {State::OPENED};
115
    std::shared_ptr<ObjClientHolder> _obj_client;
116
    std::optional<std::chrono::steady_clock::time_point> _first_append_timestamp;
117
    bool _close_latency_recorded = false;
118
};
119
120
} // namespace io
121
} // namespace doris