Coverage Report

Created: 2026-07-01 20:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/local_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/local_file_writer.h"
19
20
#include <butil/iobuf.h>
21
22
// IWYU pragma: no_include <bthread/errno.h>
23
#include <errno.h> // IWYU pragma: keep
24
#include <fcntl.h>
25
#include <glog/logging.h>
26
#include <sys/uio.h>
27
#include <unistd.h>
28
29
#include <algorithm>
30
#include <cstring>
31
#include <ostream>
32
#include <utility>
33
34
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
35
#include "common/cast_set.h"
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/macros.h"
38
#include "common/metrics/doris_metrics.h"
39
#include "common/status.h"
40
#include "cpp/sync_point.h"
41
#include "io/fs/err_utils.h"
42
#include "io/fs/file_writer.h"
43
#include "io/fs/local_file_system.h"
44
#include "io/fs/path.h"
45
#include "storage/data_dir.h"
46
#include "util/debug_points.h"
47
48
namespace doris::io {
49
namespace {
50
51
128k
Status sync_dir(const io::Path& dirname) {
52
128k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("sync_dir", Status::IOError(""));
53
128k
    int fd;
54
128k
    RETRY_ON_EINTR(fd, ::open(dirname.c_str(), O_DIRECTORY | O_RDONLY));
55
128k
    if (-1 == fd) {
56
0
        return localfs_error(errno, fmt::format("failed to open {}", dirname.native()));
57
0
    }
58
131k
    Defer defer {[fd] { ::close(fd); }};
59
#ifdef __APPLE__
60
    if (fcntl(fd, F_FULLFSYNC) < 0) {
61
        return localfs_error(errno, fmt::format("failed to sync {}", dirname.native()));
62
    }
63
#else
64
128k
    if (0 != ::fdatasync(fd)) {
65
0
        return localfs_error(errno, fmt::format("failed to sync {}", dirname.native()));
66
0
    }
67
128k
#endif
68
128k
    return Status::OK();
69
128k
}
70
71
} // namespace
72
73
LocalFileWriter::LocalFileWriter(Path path, int fd, bool sync_data)
74
249k
        : _path(std::move(path)), _fd(fd), _sync_data(sync_data) {
75
249k
    DorisMetrics::instance()->local_file_open_writing->increment(1);
76
249k
    DorisMetrics::instance()->local_file_writer_total->increment(1);
77
249k
}
78
79
2.46M
size_t LocalFileWriter::bytes_appended() const {
80
2.46M
    return _bytes_appended;
81
2.46M
}
82
83
244k
LocalFileWriter::~LocalFileWriter() {
84
244k
    if (_state == State::OPENED) {
85
373
        _abort();
86
373
    }
87
244k
    DorisMetrics::instance()->local_file_open_writing->increment(-1);
88
244k
    DorisMetrics::instance()->file_created_total->increment(1);
89
244k
    DorisMetrics::instance()->local_bytes_written_total->increment(_bytes_appended);
90
244k
}
91
92
271k
Status LocalFileWriter::close(bool non_block) {
93
271k
    if (_state == State::CLOSED) {
94
0
        return Status::InternalError("LocalFileWriter already closed, file path {}",
95
0
                                     _path.native());
96
0
    }
97
271k
    if (_state == State::ASYNC_CLOSING) {
98
23.4k
        if (non_block) {
99
0
            return Status::InternalError("Don't submit async close multi times");
100
0
        }
101
        // Actually, the first call to close(true) would return the value of _finalize; if it returned an
102
        // error status then the code would never call the second close(true)
103
23.4k
        _state = State::CLOSED;
104
23.4k
        return Status::OK();
105
23.4k
    }
106
248k
    if (non_block) {
107
23.5k
        _state = State::ASYNC_CLOSING;
108
224k
    } else {
109
224k
        _state = State::CLOSED;
110
224k
    }
111
248k
    return _close(_sync_data);
112
271k
}
113
114
373
void LocalFileWriter::_abort() {
115
373
    auto st = _close(false);
116
373
    if (!st.ok()) [[unlikely]] {
117
0
        LOG(WARNING) << "close file failed when abort file writer: " << st;
118
0
    }
119
373
    st = io::global_local_filesystem()->delete_file(_path);
120
373
    if (!st.ok()) [[unlikely]] {
121
0
        LOG(WARNING) << "delete file failed when abort file writer: " << st;
122
0
    }
123
373
}
124
125
1.21M
Status LocalFileWriter::appendv(const Slice* data, size_t data_cnt) {
126
1.21M
    TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::appendv",
127
1.21M
                                      Status::IOError("inject io error"));
128
1.21M
    if (_state != State::OPENED) [[unlikely]] {
129
0
        return Status::InternalError("append to closed file: {}", _path.native());
130
0
    }
131
1.21M
    _dirty = true;
132
133
    // Convert the results into the iovec vector to request
134
    // and calculate the total bytes requested.
135
1.21M
    size_t bytes_req = 0;
136
1.21M
    std::vector<iovec> iov(data_cnt);
137
3.58M
    for (size_t i = 0; i < data_cnt; i++) {
138
2.36M
        const Slice& result = data[i];
139
2.36M
        bytes_req += result.size;
140
2.36M
        iov[i] = {result.data, result.size};
141
2.36M
    }
142
143
1.21M
    size_t completed_iov = 0;
144
1.21M
    size_t n_left = bytes_req;
145
1.21M
    while (n_left > 0) {
146
        // Never request more than IOV_MAX in one request.
147
1.20M
        size_t iov_count = std::min(data_cnt - completed_iov, static_cast<size_t>(IOV_MAX));
148
1.20M
        ssize_t res;
149
1.20M
        RETRY_ON_EINTR(res, SYNC_POINT_HOOK_RETURN_VALUE(::writev(_fd, iov.data() + completed_iov,
150
1.20M
                                                                  cast_set<int32_t>(iov_count)),
151
1.20M
                                                         "LocalFileWriter::writev", _fd));
152
1.20M
        DBUG_EXECUTE_IF("LocalFileWriter::appendv.io_error", {
153
1.20M
            auto sub_path = dp->param<std::string>("sub_path", "");
154
1.20M
            if ((sub_path.empty() && _path.filename().compare(kTestFilePath)) ||
155
1.20M
                (!sub_path.empty() && _path.native().find(sub_path) != std::string::npos)) {
156
1.20M
                res = -1;
157
1.20M
                errno = EIO;
158
1.20M
                LOG(WARNING) << Status::IOError("debug write io error: {}", _path.native());
159
1.20M
            }
160
1.20M
        });
161
1.20M
        if (UNLIKELY(res < 0)) {
162
0
            return localfs_error(errno, fmt::format("failed to write {}", _path.native()));
163
0
        }
164
165
1.20M
        if (LIKELY(res == n_left)) {
166
            // All requested bytes were written. This is almost always the case.
167
1.20M
            n_left = 0;
168
1.20M
            break;
169
1.20M
        }
170
        // Adjust iovec vector based on bytes written for the next request.
171
637
        ssize_t bytes_rem = res;
172
637
        for (size_t i = completed_iov; i < data_cnt; i++) {
173
0
            if (bytes_rem >= iov[i].iov_len) {
174
                // The full length of this iovec was written.
175
0
                completed_iov++;
176
0
                bytes_rem -= iov[i].iov_len;
177
0
            } else {
178
                // Partially wrote this result.
179
                // Adjust the iov_len and iov_base to write only the missing data.
180
0
                iov[i].iov_base = static_cast<uint8_t*>(iov[i].iov_base) + bytes_rem;
181
0
                iov[i].iov_len -= bytes_rem;
182
0
                break; // Don't need to adjust remaining iovec's.
183
0
            }
184
0
        }
185
637
        n_left -= res;
186
637
    }
187
1.21M
    DCHECK_EQ(0, n_left);
188
1.21M
    _bytes_appended += bytes_req;
189
1.21M
    return Status::OK();
190
1.21M
}
191
192
0
Status LocalFileWriter::append_iobuf(const butil::IOBuf& data) {
193
0
    TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::append_iobuf",
194
0
                                      Status::IOError("inject io error"));
195
0
    if (_state != State::OPENED) [[unlikely]] {
196
0
        return Status::InternalError("append to closed file: {}", _path.native());
197
0
    }
198
0
    if (data.empty()) {
199
0
        return Status::OK();
200
0
    }
201
0
    _dirty = true;
202
203
0
    butil::IOBuf payload(data);
204
0
    const size_t bytes_req = payload.length();
205
0
    while (!payload.empty()) {
206
0
        const size_t size_hint = payload.length();
207
0
        ssize_t res =
208
0
                SYNC_POINT_HOOK_RETURN_VALUE(payload.cut_into_file_descriptor(_fd, size_hint),
209
0
                                             "LocalFileWriter::cut_into_file_descriptor", _fd);
210
0
        DBUG_EXECUTE_IF("LocalFileWriter::append_iobuf.io_error", {
211
0
            auto sub_path = dp->param<std::string>("sub_path", "");
212
0
            if ((sub_path.empty() && _path.filename().compare(kTestFilePath)) ||
213
0
                (!sub_path.empty() && _path.native().find(sub_path) != std::string::npos)) {
214
0
                res = -1;
215
0
                errno = EIO;
216
0
                LOG(WARNING) << Status::IOError("debug write io error: {}", _path.native());
217
0
            }
218
0
        });
219
0
        if (UNLIKELY(res < 0)) {
220
0
            if (errno == EINTR) {
221
0
                continue;
222
0
            }
223
0
            return localfs_error(errno, fmt::format("failed to write {}", _path.native()));
224
0
        }
225
0
        if (UNLIKELY(res == 0)) {
226
0
            return Status::IOError("failed to write {}, zero bytes written", _path.native());
227
0
        }
228
0
    }
229
0
    _bytes_appended += bytes_req;
230
0
    return Status::OK();
231
0
}
232
233
// TODO(ByteYue): Refactor this function as FileWriter::flush()
234
0
Status LocalFileWriter::_finalize() {
235
0
    TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::finalize",
236
0
                                      Status::IOError("inject io error"));
237
0
    if (_state == State::OPENED) [[unlikely]] {
238
0
        return Status::InternalError("finalize closed file: {}", _path.native());
239
0
    }
240
241
0
    if (_dirty) {
242
0
#if defined(__linux__)
243
0
        int flags = SYNC_FILE_RANGE_WRITE;
244
0
        if (sync_file_range(_fd, 0, 0, flags) < 0) {
245
0
            return localfs_error(errno, fmt::format("failed to finalize {}", _path.native()));
246
0
        }
247
0
#endif
248
0
    }
249
0
    return Status::OK();
250
0
}
251
252
237k
Status LocalFileWriter::_close(bool sync) {
253
374k
    auto fd_reclaim_func = [&](Status st) {
254
374k
        if (_fd > 0 && 0 != ::close(_fd)) {
255
0
            return localfs_error(errno, fmt::format("failed to {}, along with failed to close {}",
256
0
                                                    st, _path.native()));
257
0
        }
258
374k
        _fd = -1;
259
374k
        return st;
260
374k
    };
261
237k
    if (sync && config::sync_file_on_close) {
262
130k
        if (_dirty) {
263
#ifdef __APPLE__
264
            if (fcntl(_fd, F_FULLFSYNC) < 0) [[unlikely]] {
265
                return fd_reclaim_func(
266
                        localfs_error(errno, fmt::format("failed to sync {}", _path.native())));
267
            }
268
#else
269
129k
            if (0 != ::fdatasync(_fd)) [[unlikely]] {
270
0
                return fd_reclaim_func(
271
0
                        localfs_error(errno, fmt::format("failed to sync {}", _path.native())));
272
0
            }
273
129k
#endif
274
129k
            _dirty = false;
275
129k
        }
276
130k
        RETURN_IF_ERROR(fd_reclaim_func(sync_dir(_path.parent_path())));
277
130k
    }
278
279
237k
    DBUG_EXECUTE_IF("LocalFileWriter.close.failed", {
280
        // spare '.testfile' to make bad disk checker happy
281
237k
        if (_path.filename().compare(kTestFilePath)) {
282
237k
            return fd_reclaim_func(
283
237k
                    Status::IOError("cannot close {}: {}", _path.native(), std::strerror(errno)));
284
237k
        }
285
237k
    });
286
287
237k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::close", Status::IOError("inject io error"));
288
237k
    return fd_reclaim_func(Status::OK());
289
237k
}
290
291
} // namespace doris::io