Coverage Report

Created: 2026-04-10 04:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/local_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/local_file_writer.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <errno.h> // IWYU pragma: keep
22
#include <fcntl.h>
23
#include <glog/logging.h>
24
#include <sys/uio.h>
25
#include <unistd.h>
26
27
#include <algorithm>
28
#include <cstring>
29
#include <ostream>
30
#include <utility>
31
32
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
33
#include "common/cast_set.h"
34
#include "common/compiler_util.h" // IWYU pragma: keep
35
#include "common/macros.h"
36
#include "common/metrics/doris_metrics.h"
37
#include "common/status.h"
38
#include "cpp/sync_point.h"
39
#include "io/fs/err_utils.h"
40
#include "io/fs/file_writer.h"
41
#include "io/fs/local_file_system.h"
42
#include "io/fs/path.h"
43
#include "storage/data_dir.h"
44
#include "util/debug_points.h"
45
46
namespace doris::io {
47
namespace {
48
49
7.61k
Status sync_dir(const io::Path& dirname) {
50
7.61k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("sync_dir", Status::IOError(""));
51
7.61k
    int fd;
52
7.61k
    RETRY_ON_EINTR(fd, ::open(dirname.c_str(), O_DIRECTORY | O_RDONLY));
53
7.61k
    if (-1 == fd) {
54
15
        return localfs_error(errno, fmt::format("failed to open {}", dirname.native()));
55
15
    }
56
7.59k
    Defer defer {[fd] { ::close(fd); }};
57
#ifdef __APPLE__
58
    if (fcntl(fd, F_FULLFSYNC) < 0) {
59
        return localfs_error(errno, fmt::format("failed to sync {}", dirname.native()));
60
    }
61
#else
62
7.59k
    if (0 != ::fdatasync(fd)) {
63
0
        return localfs_error(errno, fmt::format("failed to sync {}", dirname.native()));
64
0
    }
65
7.59k
#endif
66
7.59k
    return Status::OK();
67
7.59k
}
68
69
} // namespace
70
71
LocalFileWriter::LocalFileWriter(Path path, int fd, bool sync_data)
72
12.9k
        : _path(std::move(path)), _fd(fd), _sync_data(sync_data) {
73
12.9k
    DorisMetrics::instance()->local_file_open_writing->increment(1);
74
12.9k
    DorisMetrics::instance()->local_file_writer_total->increment(1);
75
12.9k
}
76
77
193k
size_t LocalFileWriter::bytes_appended() const {
78
193k
    return _bytes_appended;
79
193k
}
80
81
12.9k
LocalFileWriter::~LocalFileWriter() {
82
12.9k
    if (_state == State::OPENED) {
83
23
        _abort();
84
23
    }
85
12.9k
    DorisMetrics::instance()->local_file_open_writing->increment(-1);
86
12.9k
    DorisMetrics::instance()->file_created_total->increment(1);
87
12.9k
    DorisMetrics::instance()->local_bytes_written_total->increment(_bytes_appended);
88
12.9k
}
89
90
18.5k
Status LocalFileWriter::close(bool non_block) {
91
18.5k
    if (_state == State::CLOSED) {
92
0
        return Status::InternalError("LocalFileWriter already closed, file path {}",
93
0
                                     _path.native());
94
0
    }
95
18.5k
    if (_state == State::ASYNC_CLOSING) {
96
5.62k
        if (non_block) {
97
0
            return Status::InternalError("Don't submit async close multi times");
98
0
        }
99
        // Actucally the first time call to close(true) would return the value of _finalize, if it returned one
100
        // error status then the code would never call the second close(true)
101
5.62k
        _state = State::CLOSED;
102
5.62k
        return Status::OK();
103
5.62k
    }
104
12.9k
    if (non_block) {
105
5.63k
        _state = State::ASYNC_CLOSING;
106
7.30k
    } else {
107
7.30k
        _state = State::CLOSED;
108
7.30k
    }
109
12.9k
    return _close(_sync_data);
110
18.5k
}
111
112
23
void LocalFileWriter::_abort() {
113
23
    auto st = _close(false);
114
23
    if (!st.ok()) [[unlikely]] {
115
0
        LOG(WARNING) << "close file failed when abort file writer: " << st;
116
0
    }
117
23
    st = io::global_local_filesystem()->delete_file(_path);
118
23
    if (!st.ok()) [[unlikely]] {
119
0
        LOG(WARNING) << "delete file failed when abort file writer: " << st;
120
0
    }
121
23
}
122
123
69.0k
Status LocalFileWriter::appendv(const Slice* data, size_t data_cnt) {
124
69.0k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::appendv",
125
69.0k
                                      Status::IOError("inject io error"));
126
69.0k
    if (_state != State::OPENED) [[unlikely]] {
127
1
        return Status::InternalError("append to closed file: {}", _path.native());
128
1
    }
129
69.0k
    _dirty = true;
130
131
    // Convert the results into the iovec vector to request
132
    // and calculate the total bytes requested.
133
69.0k
    size_t bytes_req = 0;
134
69.0k
    std::vector<iovec> iov(data_cnt);
135
259k
    for (size_t i = 0; i < data_cnt; i++) {
136
190k
        const Slice& result = data[i];
137
190k
        bytes_req += result.size;
138
190k
        iov[i] = {result.data, result.size};
139
190k
    }
140
141
69.0k
    size_t completed_iov = 0;
142
69.0k
    size_t n_left = bytes_req;
143
69.0k
    while (n_left > 0) {
144
        // Never request more than IOV_MAX in one request.
145
69.0k
        size_t iov_count = std::min(data_cnt - completed_iov, static_cast<size_t>(IOV_MAX));
146
69.0k
        ssize_t res;
147
69.0k
        RETRY_ON_EINTR(res, SYNC_POINT_HOOK_RETURN_VALUE(::writev(_fd, iov.data() + completed_iov,
148
69.0k
                                                                  cast_set<int32_t>(iov_count)),
149
69.0k
                                                         "LocalFileWriter::writev", _fd));
150
69.0k
        DBUG_EXECUTE_IF("LocalFileWriter::appendv.io_error", {
151
69.0k
            auto sub_path = dp->param<std::string>("sub_path", "");
152
69.0k
            if ((sub_path.empty() && _path.filename().compare(kTestFilePath)) ||
153
69.0k
                (!sub_path.empty() && _path.native().find(sub_path) != std::string::npos)) {
154
69.0k
                res = -1;
155
69.0k
                errno = EIO;
156
69.0k
                LOG(WARNING) << Status::IOError("debug write io error: {}", _path.native());
157
69.0k
            }
158
69.0k
        });
159
69.0k
        if (UNLIKELY(res < 0)) {
160
1
            return localfs_error(errno, fmt::format("failed to write {}", _path.native()));
161
1
        }
162
163
69.0k
        if (LIKELY(res == n_left)) {
164
            // All requested bytes were read. This is almost always the case.
165
69.0k
            n_left = 0;
166
69.0k
            break;
167
69.0k
        }
168
        // Adjust iovec vector based on bytes read for the next request.
169
3
        ssize_t bytes_rem = res;
170
5
        for (size_t i = completed_iov; i < data_cnt; i++) {
171
5
            if (bytes_rem >= iov[i].iov_len) {
172
                // The full length of this iovec was written.
173
2
                completed_iov++;
174
2
                bytes_rem -= iov[i].iov_len;
175
3
            } else {
176
                // Partially wrote this result.
177
                // Adjust the iov_len and iov_base to write only the missing data.
178
3
                iov[i].iov_base = static_cast<uint8_t*>(iov[i].iov_base) + bytes_rem;
179
3
                iov[i].iov_len -= bytes_rem;
180
3
                break; // Don't need to adjust remaining iovec's.
181
3
            }
182
5
        }
183
3
        n_left -= res;
184
3
    }
185
69.0k
    DCHECK_EQ(0, n_left);
186
69.0k
    _bytes_appended += bytes_req;
187
69.0k
    return Status::OK();
188
69.0k
}
189
190
// TODO(ByteYue): Refactor this function as FileWriter::flush()
191
0
Status LocalFileWriter::_finalize() {
192
0
    TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::finalize",
193
0
                                      Status::IOError("inject io error"));
194
0
    if (_state == State::OPENED) [[unlikely]] {
195
0
        return Status::InternalError("finalize closed file: {}", _path.native());
196
0
    }
197
198
0
    if (_dirty) {
199
0
#if defined(__linux__)
200
0
        int flags = SYNC_FILE_RANGE_WRITE;
201
0
        if (sync_file_range(_fd, 0, 0, flags) < 0) {
202
0
            return localfs_error(errno, fmt::format("failed to finalize {}", _path.native()));
203
0
        }
204
0
#endif
205
0
    }
206
0
    return Status::OK();
207
0
}
208
209
12.9k
Status LocalFileWriter::_close(bool sync) {
210
20.5k
    auto fd_reclaim_func = [&](Status st) {
211
20.5k
        if (_fd > 0 && 0 != ::close(_fd)) {
212
0
            return localfs_error(errno, fmt::format("failed to {}, along with failed to close {}",
213
0
                                                    st, _path.native()));
214
0
        }
215
20.5k
        _fd = -1;
216
20.5k
        return st;
217
20.5k
    };
218
12.9k
    if (sync && config::sync_file_on_close) {
219
7.61k
        if (_dirty) {
220
#ifdef __APPLE__
221
            if (fcntl(_fd, F_FULLFSYNC) < 0) [[unlikely]] {
222
                return fd_reclaim_func(
223
                        localfs_error(errno, fmt::format("failed to sync {}", _path.native())));
224
            }
225
#else
226
7.11k
            if (0 != ::fdatasync(_fd)) [[unlikely]] {
227
0
                return fd_reclaim_func(
228
0
                        localfs_error(errno, fmt::format("failed to sync {}", _path.native())));
229
0
            }
230
7.11k
#endif
231
7.11k
            _dirty = false;
232
7.11k
        }
233
7.61k
        RETURN_IF_ERROR(fd_reclaim_func(sync_dir(_path.parent_path())));
234
7.61k
    }
235
236
12.9k
    DBUG_EXECUTE_IF("LocalFileWriter.close.failed", {
237
        // spare '.testfile' to make bad disk checker happy
238
12.9k
        if (_path.filename().compare(kTestFilePath)) {
239
12.9k
            return fd_reclaim_func(
240
12.9k
                    Status::IOError("cannot close {}: {}", _path.native(), std::strerror(errno)));
241
12.9k
        }
242
12.9k
    });
243
244
12.9k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::close", Status::IOError("inject io error"));
245
12.9k
    return fd_reclaim_func(Status::OK());
246
12.9k
}
247
248
} // namespace doris::io