be/src/io/fs/local_file_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/local_file_reader.h" |
19 | | |
20 | | #include <bthread/bthread.h> |
21 | | #include <butil/iobuf.h> |
22 | | // IWYU pragma: no_include <bthread/errno.h> |
23 | | #include <bvar/bvar.h> |
24 | | #include <errno.h> // IWYU pragma: keep |
25 | | #include <fmt/format.h> |
26 | | #include <glog/logging.h> |
27 | | #include <unistd.h> |
28 | | |
29 | | #include <algorithm> |
30 | | #include <atomic> |
31 | | #include <cstring> |
32 | | #include <string> |
33 | | #include <utility> |
34 | | |
35 | | #include "common/compiler_util.h" // IWYU pragma: keep |
36 | | #include "common/metrics/doris_metrics.h" |
37 | | #include "cpp/sync_point.h" |
38 | | #include "io/fs/err_utils.h" |
39 | | #include "runtime/thread_context.h" |
40 | | #include "runtime/workload_management/io_throttle.h" |
41 | | #include "storage/data_dir.h" |
42 | | #include "storage/olap_common.h" |
43 | | #include "storage/options.h" |
44 | | #include "util/async_io.h" |
45 | | #include "util/debug_points.h" |
46 | | #include "util/defer_op.h" |
47 | | |
48 | | namespace doris { |
49 | | namespace io { |
50 | | // State of be_config_data_dir_list: 0 = not initialized, 1 = initializing, 2 = initialized |
51 | | std::atomic_int BeConfDataDirReader::be_config_data_dir_list_state = 0; |
52 | | |
53 | | std::vector<doris::DataDirInfo> BeConfDataDirReader::be_config_data_dir_list; |
54 | | |
55 | | void BeConfDataDirReader::get_data_dir_by_file_path(io::Path* file_path, |
56 | 15.5k | std::string* data_dir_arg) { |
57 | 15.5k | int state = be_config_data_dir_list_state.load(std::memory_order_acquire); |
58 | 15.5k | if (state == 0) [[unlikely]] { |
59 | 15.5k | return; |
60 | 18.4E | } else if (state == 1) [[unlikely]] { |
61 | 0 | be_config_data_dir_list_state.wait(1); |
62 | 0 | } |
63 | | |
64 | 18.4E | for (const auto& data_dir_info : be_config_data_dir_list) { |
65 | 0 | if (data_dir_info.path.size() >= file_path->string().size()) { |
66 | 0 | continue; |
67 | 0 | } |
68 | 0 | if (file_path->string().compare(0, data_dir_info.path.size(), data_dir_info.path) == 0) { |
69 | 0 | *data_dir_arg = data_dir_info.path; |
70 | 0 | break; |
71 | 0 | } |
72 | 0 | } |
73 | 18.4E | } |
74 | | |
75 | | void BeConfDataDirReader::init_be_conf_data_dir( |
76 | | const std::vector<doris::StorePath>& store_paths, |
77 | | const std::vector<doris::StorePath>& spill_store_paths, |
78 | 0 | const std::vector<doris::CachePath>& cache_paths) { |
79 | 0 | be_config_data_dir_list_state.store(1, std::memory_order_release); |
80 | 0 | Defer defer {[]() { |
81 | 0 | be_config_data_dir_list_state.store(2, std::memory_order_release); |
82 | 0 | be_config_data_dir_list_state.notify_all(); |
83 | 0 | }}; |
84 | 0 | for (int i = 0; i < store_paths.size(); i++) { |
85 | 0 | DataDirInfo data_dir_info; |
86 | 0 | data_dir_info.path = store_paths[i].path; |
87 | 0 | data_dir_info.storage_medium = store_paths[i].storage_medium; |
88 | 0 | data_dir_info.data_dir_type = DataDirType::OLAP_DATA_DIR; |
89 | 0 | data_dir_info.metric_name = "local_data_dir_" + std::to_string(i); |
90 | 0 | be_config_data_dir_list.push_back(data_dir_info); |
91 | 0 | } |
92 | |
|
93 | 0 | for (int i = 0; i < spill_store_paths.size(); i++) { |
94 | 0 | doris::DataDirInfo data_dir_info; |
95 | 0 | data_dir_info.path = spill_store_paths[i].path; |
96 | 0 | data_dir_info.storage_medium = spill_store_paths[i].storage_medium; |
97 | 0 | data_dir_info.data_dir_type = doris::DataDirType::SPILL_DISK_DIR; |
98 | 0 | data_dir_info.metric_name = "spill_data_dir_" + std::to_string(i); |
99 | 0 | be_config_data_dir_list.push_back(data_dir_info); |
100 | 0 | } |
101 | |
|
102 | 0 | for (int i = 0; i < cache_paths.size(); i++) { |
103 | 0 | doris::DataDirInfo data_dir_info; |
104 | 0 | data_dir_info.path = cache_paths[i].path; |
105 | 0 | data_dir_info.storage_medium = TStorageMedium::REMOTE_CACHE; |
106 | 0 | data_dir_info.data_dir_type = doris::DataDirType::DATA_CACHE_DIR; |
107 | 0 | data_dir_info.metric_name = "local_cache_dir_" + std::to_string(i); |
108 | 0 | be_config_data_dir_list.push_back(data_dir_info); |
109 | 0 | } |
110 | |
|
111 | 0 | std::sort(be_config_data_dir_list.begin(), be_config_data_dir_list.end(), |
112 | 0 | [](const DataDirInfo& a, const DataDirInfo& b) { |
113 | 0 | return a.path.length() > b.path.length(); |
114 | 0 | }); |
115 | 0 | } |
116 | | |
117 | | LocalFileReader::LocalFileReader(Path path, size_t file_size, int fd) |
118 | 15.5k | : _fd(fd), _path(std::move(path)), _file_size(file_size) { |
119 | 15.5k | _data_dir_path = ""; |
120 | 15.5k | BeConfDataDirReader::get_data_dir_by_file_path(&_path, &_data_dir_path); |
121 | 15.5k | DorisMetrics::instance()->local_file_open_reading->increment(1); |
122 | 15.5k | DorisMetrics::instance()->local_file_reader_total->increment(1); |
123 | 15.5k | } |
124 | | |
// Closes the reader on destruction. The Status returned by close() cannot be propagated from
// a destructor, so it is handed to WARN_IF_ERROR together with a message naming the file
// (presumably logged as a warning on failure -- confirm against the macro definition).
LocalFileReader::~LocalFileReader() {
    WARN_IF_ERROR(close(), fmt::format("Failed to close file {}", _path.native()));
}
128 | | |
129 | 18.8k | Status LocalFileReader::close() { |
130 | 18.8k | bool expected = false; |
131 | 18.8k | if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { |
132 | 15.5k | DorisMetrics::instance()->local_file_open_reading->increment(-1); |
133 | 15.5k | int res = -1; |
134 | 15.5k | if (bthread_self() == 0) { |
135 | 15.5k | res = ::close(_fd); |
136 | 15.5k | } else { |
137 | 2 | auto task = [&] { res = ::close(_fd); }; |
138 | 2 | AsyncIO::run_task(task, io::FileSystemType::LOCAL); |
139 | 2 | } |
140 | 15.5k | if (-1 == res) { |
141 | 0 | std::string err = errno_to_str(); |
142 | 0 | return localfs_error(errno, fmt::format("failed to close {}", _path.native())); |
143 | 0 | } |
144 | 15.5k | _fd = -1; |
145 | 15.5k | } |
146 | 18.8k | return Status::OK(); |
147 | 18.8k | } |
148 | | |
149 | | Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, |
150 | 327k | const IOContext* /*io_ctx*/) { |
151 | 327k | TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileReader::read_at_impl", |
152 | 327k | Status::IOError("inject io error")); |
153 | 327k | if (closed()) [[unlikely]] { |
154 | 0 | return Status::InternalError("read closed file: ", _path.native()); |
155 | 0 | } |
156 | | |
157 | 327k | if (offset > _file_size) { |
158 | 0 | return Status::InternalError( |
159 | 0 | "offset exceeds file size(offset: {}, file size: {}, path: {})", offset, _file_size, |
160 | 0 | _path.native()); |
161 | 0 | } |
162 | 327k | size_t bytes_req = result.size; |
163 | 327k | char* to = result.data; |
164 | 327k | bytes_req = std::min(bytes_req, _file_size - offset); |
165 | 327k | *bytes_read = 0; |
166 | | |
167 | 327k | LIMIT_LOCAL_SCAN_IO(get_data_dir_path(), bytes_read); |
168 | | |
169 | 655k | while (bytes_req != 0) { |
170 | 327k | auto res = SYNC_POINT_HOOK_RETURN_VALUE(::pread(_fd, to, bytes_req, offset), |
171 | 327k | "LocalFileReader::pread", _fd, to); |
172 | 327k | DBUG_EXECUTE_IF("LocalFileReader::read_at_impl.io_error", { |
173 | 327k | auto sub_path = dp->param<std::string>("sub_path", ""); |
174 | 327k | if ((sub_path.empty() && _path.filename().compare(kTestFilePath)) || |
175 | 327k | (!sub_path.empty() && _path.native().find(sub_path) != std::string::npos)) { |
176 | 327k | res = -1; |
177 | 327k | errno = EIO; |
178 | 327k | LOG(WARNING) << Status::IOError("debug read io error: {}", _path.native()); |
179 | 327k | } |
180 | 327k | }); |
181 | 327k | if (UNLIKELY(-1 == res && errno != EINTR)) { |
182 | 1 | return localfs_error(errno, fmt::format("failed to read {}", _path.native())); |
183 | 1 | } |
184 | 327k | if (UNLIKELY(res == 0)) { |
185 | 0 | return Status::InternalError("cannot read from {}: unexpected EOF", _path.native()); |
186 | 0 | } |
187 | 327k | if (res > 0) { |
188 | 327k | to += res; |
189 | 327k | offset += res; |
190 | 327k | bytes_req -= res; |
191 | 327k | *bytes_read += res; |
192 | 327k | } |
193 | 327k | } |
194 | 327k | DorisMetrics::instance()->local_bytes_read_total->increment(*bytes_read); |
195 | 327k | return Status::OK(); |
196 | 327k | } |
197 | | |
198 | | Status LocalFileReader::read_at_iobuf_impl(size_t offset, size_t bytes_req, butil::IOBuf* out, |
199 | 0 | size_t* bytes_read, const IOContext* /*io_ctx*/) { |
200 | 0 | TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileReader::read_at_iobuf_impl", |
201 | 0 | Status::IOError("inject io error")); |
202 | 0 | if (out == nullptr || bytes_read == nullptr) { |
203 | 0 | return Status::InvalidArgument("read_at_iobuf requires non-null out and bytes_read"); |
204 | 0 | } |
205 | 0 | if (closed()) [[unlikely]] { |
206 | 0 | return Status::InternalError("read closed file: ", _path.native()); |
207 | 0 | } |
208 | | |
209 | 0 | if (offset > _file_size) { |
210 | 0 | return Status::InternalError( |
211 | 0 | "offset exceeds file size(offset: {}, file size: {}, path: {})", offset, _file_size, |
212 | 0 | _path.native()); |
213 | 0 | } |
214 | 0 | bytes_req = std::min(bytes_req, _file_size - offset); |
215 | 0 | *bytes_read = 0; |
216 | 0 | if (bytes_req == 0) { |
217 | 0 | return Status::OK(); |
218 | 0 | } |
219 | | |
220 | 0 | LIMIT_LOCAL_SCAN_IO(get_data_dir_path(), bytes_read); |
221 | |
|
222 | 0 | butil::IOPortal portal; |
223 | 0 | while (bytes_req != 0) { |
224 | 0 | ssize_t res = |
225 | 0 | portal.pappend_from_file_descriptor(_fd, static_cast<off_t>(offset), bytes_req); |
226 | 0 | if (UNLIKELY(-1 == res && errno != EINTR)) { |
227 | 0 | return localfs_error(errno, fmt::format("failed to read {}", _path.native())); |
228 | 0 | } |
229 | 0 | if (UNLIKELY(res == 0)) { |
230 | 0 | return Status::InternalError("cannot read from {}: unexpected EOF", _path.native()); |
231 | 0 | } |
232 | 0 | if (res > 0) { |
233 | 0 | offset += static_cast<size_t>(res); |
234 | 0 | bytes_req -= static_cast<size_t>(res); |
235 | 0 | *bytes_read += static_cast<size_t>(res); |
236 | 0 | } |
237 | 0 | } |
238 | 0 | out->append(portal); |
239 | 0 | DorisMetrics::instance()->local_bytes_read_total->increment(*bytes_read); |
240 | 0 | return Status::OK(); |
241 | 0 | } |
242 | | |
243 | | } // namespace io |
244 | | } // namespace doris |