Coverage Report

Created: 2026-07-01 20:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/local_file_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/local_file_reader.h"
19
20
#include <bthread/bthread.h>
21
#include <butil/iobuf.h>
22
// IWYU pragma: no_include <bthread/errno.h>
23
#include <bvar/bvar.h>
24
#include <errno.h> // IWYU pragma: keep
25
#include <fmt/format.h>
26
#include <glog/logging.h>
27
#include <unistd.h>
28
29
#include <algorithm>
30
#include <atomic>
31
#include <cstring>
32
#include <string>
33
#include <utility>
34
35
#include "common/compiler_util.h" // IWYU pragma: keep
36
#include "common/metrics/doris_metrics.h"
37
#include "cpp/sync_point.h"
38
#include "io/fs/err_utils.h"
39
#include "runtime/thread_context.h"
40
#include "runtime/workload_management/io_throttle.h"
41
#include "storage/data_dir.h"
42
#include "storage/olap_common.h"
43
#include "storage/options.h"
44
#include "util/async_io.h"
45
#include "util/debug_points.h"
46
#include "util/defer_op.h"
47
48
namespace doris {
49
namespace io {
50
// 1: initing 2: inited 0: before init
51
std::atomic_int BeConfDataDirReader::be_config_data_dir_list_state = 0;
52
53
std::vector<doris::DataDirInfo> BeConfDataDirReader::be_config_data_dir_list;
54
55
void BeConfDataDirReader::get_data_dir_by_file_path(io::Path* file_path,
56
303k
                                                    std::string* data_dir_arg) {
57
303k
    int state = be_config_data_dir_list_state.load(std::memory_order_acquire);
58
303k
    if (state == 0) [[unlikely]] {
59
17.3k
        return;
60
286k
    } else if (state == 1) [[unlikely]] {
61
0
        be_config_data_dir_list_state.wait(1);
62
0
    }
63
64
694k
    for (const auto& data_dir_info : be_config_data_dir_list) {
65
694k
        if (data_dir_info.path.size() >= file_path->string().size()) {
66
182k
            continue;
67
182k
        }
68
512k
        if (file_path->string().compare(0, data_dir_info.path.size(), data_dir_info.path) == 0) {
69
285k
            *data_dir_arg = data_dir_info.path;
70
285k
            break;
71
285k
        }
72
512k
    }
73
286k
}
74
75
void BeConfDataDirReader::init_be_conf_data_dir(
76
        const std::vector<doris::StorePath>& store_paths,
77
        const std::vector<doris::StorePath>& spill_store_paths,
78
6
        const std::vector<doris::CachePath>& cache_paths) {
79
6
    be_config_data_dir_list_state.store(1, std::memory_order_release);
80
6
    Defer defer {[]() {
81
6
        be_config_data_dir_list_state.store(2, std::memory_order_release);
82
6
        be_config_data_dir_list_state.notify_all();
83
6
    }};
84
15
    for (int i = 0; i < store_paths.size(); i++) {
85
9
        DataDirInfo data_dir_info;
86
9
        data_dir_info.path = store_paths[i].path;
87
9
        data_dir_info.storage_medium = store_paths[i].storage_medium;
88
9
        data_dir_info.data_dir_type = DataDirType::OLAP_DATA_DIR;
89
9
        data_dir_info.metric_name = "local_data_dir_" + std::to_string(i);
90
9
        be_config_data_dir_list.push_back(data_dir_info);
91
9
    }
92
93
15
    for (int i = 0; i < spill_store_paths.size(); i++) {
94
9
        doris::DataDirInfo data_dir_info;
95
9
        data_dir_info.path = spill_store_paths[i].path;
96
9
        data_dir_info.storage_medium = spill_store_paths[i].storage_medium;
97
9
        data_dir_info.data_dir_type = doris::DataDirType::SPILL_DISK_DIR;
98
9
        data_dir_info.metric_name = "spill_data_dir_" + std::to_string(i);
99
9
        be_config_data_dir_list.push_back(data_dir_info);
100
9
    }
101
102
11
    for (int i = 0; i < cache_paths.size(); i++) {
103
5
        doris::DataDirInfo data_dir_info;
104
5
        data_dir_info.path = cache_paths[i].path;
105
5
        data_dir_info.storage_medium = TStorageMedium::REMOTE_CACHE;
106
5
        data_dir_info.data_dir_type = doris::DataDirType::DATA_CACHE_DIR;
107
5
        data_dir_info.metric_name = "local_cache_dir_" + std::to_string(i);
108
5
        be_config_data_dir_list.push_back(data_dir_info);
109
5
    }
110
111
6
    std::sort(be_config_data_dir_list.begin(), be_config_data_dir_list.end(),
112
39
              [](const DataDirInfo& a, const DataDirInfo& b) {
113
39
                  return a.path.length() > b.path.length();
114
39
              });
115
6
}
116
117
// Wraps an already-opened descriptor `fd` for the file at `path` whose size is
// `file_size`. Resolves the owning data directory (left empty when none is
// found) and bumps the open-reader and total-reader metrics.
LocalFileReader::LocalFileReader(Path path, size_t file_size, int fd)
        : _fd(fd), _path(std::move(path)), _file_size(file_size) {
    _data_dir_path.clear();
    BeConfDataDirReader::get_data_dir_by_file_path(&_path, &_data_dir_path);

    auto* metrics = DorisMetrics::instance();
    metrics->local_file_open_reading->increment(1);
    metrics->local_file_reader_total->increment(1);
}
124
125
293k
// Closes the reader on destruction. A close() failure cannot be returned from
// a destructor, so it is handed to WARN_IF_ERROR together with the file path.
LocalFileReader::~LocalFileReader() {
    WARN_IF_ERROR(close(), fmt::format("Failed to close file {}", _path.native()));
}
128
129
381k
// Closes the underlying file descriptor exactly once.
//
// The first caller flips `_closed` from false to true and performs the close;
// every later call is a no-op that returns OK. When called from a bthread
// (bthread_self() != 0) the blocking ::close() is handed to AsyncIO::run_task.
//
// Returns a localfs error if ::close() fails; in that case `_fd` is left as is
// and the reader still counts as closed.
Status LocalFileReader::close() {
    bool expected = false;
    if (!_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
        return Status::OK();
    }
    DorisMetrics::instance()->local_file_open_reading->increment(-1);

    int res = -1;
    int close_errno = 0;
    // errno is thread-local and is overwritten by later library calls, so it is
    // captured right next to ::close(), on whichever thread runs the task.
    auto do_close = [&] {
        res = ::close(_fd);
        if (res == -1) {
            close_errno = errno;
        }
    };
    if (bthread_self() == 0) {
        do_close();
    } else {
        AsyncIO::run_task(do_close, io::FileSystemType::LOCAL);
    }

    if (-1 == res) {
        return localfs_error(close_errno, fmt::format("failed to close {}", _path.native()));
    }
    _fd = -1;
    return Status::OK();
}
148
149
// Reads from the file at `offset` into `result.data`.
//
// At most `result.size` bytes are requested, clamped to the bytes available
// between `offset` and `_file_size`. pread() is repeated until the clamped
// request is fully satisfied; `*bytes_read` receives the number of bytes read.
//
// Returns:
//   - InternalError if the reader is closed or `offset` is past `_file_size`;
//   - a localfs error if pread() fails with anything other than EINTR
//     (EINTR is retried);
//   - InternalError if pread() returns 0 before the request is satisfied;
//   - OK otherwise.
Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                     const IOContext* /*io_ctx*/) {
    TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileReader::read_at_impl",
                                      Status::IOError("inject io error"));
    if (closed()) [[unlikely]] {
        // The "{}" placeholder is required for the path to appear in the message.
        return Status::InternalError("read closed file: {}", _path.native());
    }

    if (offset > _file_size) {
        return Status::InternalError(
                "offset exceeds file size(offset: {}, file size: {}, path: {})", offset, _file_size,
                _path.native());
    }
    size_t bytes_req = result.size;
    char* to = result.data;
    bytes_req = std::min(bytes_req, _file_size - offset);
    *bytes_read = 0;

    // NOTE(review): presumably applies per-data-dir scan IO throttling; see the
    // macro definition for the exact behavior.
    LIMIT_LOCAL_SCAN_IO(get_data_dir_path(), bytes_read);

    while (bytes_req != 0) {
        auto res = SYNC_POINT_HOOK_RETURN_VALUE(::pread(_fd, to, bytes_req, offset),
                                                "LocalFileReader::pread", _fd, to);
        DBUG_EXECUTE_IF("LocalFileReader::read_at_impl.io_error", {
            auto sub_path = dp->param<std::string>("sub_path", "");
            // NOTE(review): compare() != 0 means "differs", so with an empty
            // sub_path the fault is injected for every file except
            // kTestFilePath — confirm this is intended.
            if ((sub_path.empty() && _path.filename().compare(kTestFilePath)) ||
                (!sub_path.empty() && _path.native().find(sub_path) != std::string::npos)) {
                // Log first: logging may itself modify errno.
                LOG(WARNING) << Status::IOError("debug read io error: {}", _path.native());
                res = -1;
                errno = EIO;
            }
        });
        if (UNLIKELY(-1 == res)) {
            // Snapshot errno before any other call (e.g. fmt::format) can change it.
            const int read_errno = errno;
            if (read_errno != EINTR) {
                return localfs_error(read_errno,
                                     fmt::format("failed to read {}", _path.native()));
            }
            continue; // interrupted by a signal: retry the same request
        }
        if (UNLIKELY(res == 0)) {
            return Status::InternalError("cannot read from {}: unexpected EOF", _path.native());
        }
        if (res > 0) {
            to += res;
            offset += res;
            bytes_req -= res;
            *bytes_read += res;
        }
    }
    DorisMetrics::instance()->local_bytes_read_total->increment(*bytes_read);
    return Status::OK();
}
197
198
// Reads up to `bytes_req` bytes from the file at `offset` and appends them to
// `*out`.
//
// The request is clamped to the bytes available between `offset` and
// `_file_size`; a clamped request of 0 returns OK with `*bytes_read == 0`.
// Data is first collected in a local IOPortal and appended to `*out` only
// after the whole request has been read, so `*out` is not modified on failure.
//
// Returns:
//   - InvalidArgument if `out` or `bytes_read` is null;
//   - InternalError if the reader is closed or `offset` is past `_file_size`;
//   - a localfs error if the read fails with anything other than EINTR
//     (EINTR is retried);
//   - InternalError if the read returns 0 before the request is satisfied;
//   - OK otherwise.
Status LocalFileReader::read_at_iobuf_impl(size_t offset, size_t bytes_req, butil::IOBuf* out,
                                           size_t* bytes_read, const IOContext* /*io_ctx*/) {
    TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileReader::read_at_iobuf_impl",
                                      Status::IOError("inject io error"));
    if (out == nullptr || bytes_read == nullptr) {
        return Status::InvalidArgument("read_at_iobuf requires non-null out and bytes_read");
    }
    if (closed()) [[unlikely]] {
        // The "{}" placeholder is required for the path to appear in the message.
        return Status::InternalError("read closed file: {}", _path.native());
    }

    if (offset > _file_size) {
        return Status::InternalError(
                "offset exceeds file size(offset: {}, file size: {}, path: {})", offset, _file_size,
                _path.native());
    }
    bytes_req = std::min(bytes_req, _file_size - offset);
    *bytes_read = 0;
    if (bytes_req == 0) {
        return Status::OK();
    }

    // NOTE(review): presumably applies per-data-dir scan IO throttling; see the
    // macro definition for the exact behavior.
    LIMIT_LOCAL_SCAN_IO(get_data_dir_path(), bytes_read);

    butil::IOPortal portal;
    while (bytes_req != 0) {
        ssize_t res =
                portal.pappend_from_file_descriptor(_fd, static_cast<off_t>(offset), bytes_req);
        if (UNLIKELY(-1 == res)) {
            // Snapshot errno before any other call (e.g. fmt::format) can change it.
            const int read_errno = errno;
            if (read_errno != EINTR) {
                return localfs_error(read_errno,
                                     fmt::format("failed to read {}", _path.native()));
            }
            continue; // interrupted by a signal: retry the same request
        }
        if (UNLIKELY(res == 0)) {
            return Status::InternalError("cannot read from {}: unexpected EOF", _path.native());
        }
        if (res > 0) {
            const auto got = static_cast<size_t>(res);
            offset += got;
            bytes_req -= got;
            *bytes_read += got;
        }
    }
    out->append(portal);
    DorisMetrics::instance()->local_bytes_read_total->increment(*bytes_read);
    return Status::OK();
}
242
243
} // namespace io
244
} // namespace doris