be/src/io/fs/local_file_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/local_file_reader.h" |
19 | | |
20 | | #include <bthread/bthread.h> |
21 | | // IWYU pragma: no_include <bthread/errno.h> |
22 | | #include <bvar/bvar.h> |
23 | | #include <errno.h> // IWYU pragma: keep |
24 | | #include <fmt/format.h> |
25 | | #include <glog/logging.h> |
26 | | #include <unistd.h> |
27 | | |
28 | | #include <algorithm> |
29 | | #include <atomic> |
30 | | #include <cstring> |
31 | | #include <string> |
32 | | #include <utility> |
33 | | |
34 | | #include "common/compiler_util.h" // IWYU pragma: keep |
35 | | #include "common/metrics/doris_metrics.h" |
36 | | #include "cpp/sync_point.h" |
37 | | #include "io/fs/err_utils.h" |
38 | | #include "runtime/thread_context.h" |
39 | | #include "runtime/workload_management/io_throttle.h" |
40 | | #include "storage/data_dir.h" |
41 | | #include "storage/olap_common.h" |
42 | | #include "storage/options.h" |
43 | | #include "util/async_io.h" |
44 | | #include "util/debug_points.h" |
45 | | #include "util/defer_op.h" |
46 | | |
47 | | namespace doris { |
48 | | namespace io { |
49 | | // 1: initing 2: inited 0: before init |
50 | | std::atomic_int BeConfDataDirReader::be_config_data_dir_list_state = 0; |
51 | | |
52 | | std::vector<doris::DataDirInfo> BeConfDataDirReader::be_config_data_dir_list; |
53 | | |
54 | | void BeConfDataDirReader::get_data_dir_by_file_path(io::Path* file_path, |
55 | 571k | std::string* data_dir_arg) { |
56 | 571k | int state = be_config_data_dir_list_state.load(std::memory_order_acquire); |
57 | 571k | if (state == 0) [[unlikely]] { |
58 | 18.4k | return; |
59 | 553k | } else if (state == 1) [[unlikely]] { |
60 | 0 | be_config_data_dir_list_state.wait(1); |
61 | 0 | } |
62 | | |
63 | 1.62M | for (const auto& data_dir_info : be_config_data_dir_list) { |
64 | 1.62M | if (data_dir_info.path.size() >= file_path->string().size()) { |
65 | 1.87k | continue; |
66 | 1.87k | } |
67 | 1.62M | if (file_path->string().compare(0, data_dir_info.path.size(), data_dir_info.path) == 0) { |
68 | 555k | *data_dir_arg = data_dir_info.path; |
69 | 555k | break; |
70 | 555k | } |
71 | 1.62M | } |
72 | 553k | } |
73 | | |
74 | | void BeConfDataDirReader::init_be_conf_data_dir( |
75 | | const std::vector<doris::StorePath>& store_paths, |
76 | | const std::vector<doris::StorePath>& spill_store_paths, |
77 | 6 | const std::vector<doris::CachePath>& cache_paths) { |
78 | 6 | be_config_data_dir_list_state.store(1, std::memory_order_release); |
79 | 6 | Defer defer {[]() { |
80 | 6 | be_config_data_dir_list_state.store(2, std::memory_order_release); |
81 | 6 | be_config_data_dir_list_state.notify_all(); |
82 | 6 | }}; |
83 | 16 | for (int i = 0; i < store_paths.size(); i++) { |
84 | 10 | DataDirInfo data_dir_info; |
85 | 10 | data_dir_info.path = store_paths[i].path; |
86 | 10 | data_dir_info.storage_medium = store_paths[i].storage_medium; |
87 | 10 | data_dir_info.data_dir_type = DataDirType::OLAP_DATA_DIR; |
88 | 10 | data_dir_info.metric_name = "local_data_dir_" + std::to_string(i); |
89 | 10 | be_config_data_dir_list.push_back(data_dir_info); |
90 | 10 | } |
91 | | |
92 | 16 | for (int i = 0; i < spill_store_paths.size(); i++) { |
93 | 10 | doris::DataDirInfo data_dir_info; |
94 | 10 | data_dir_info.path = spill_store_paths[i].path; |
95 | 10 | data_dir_info.storage_medium = spill_store_paths[i].storage_medium; |
96 | 10 | data_dir_info.data_dir_type = doris::DataDirType::SPILL_DISK_DIR; |
97 | 10 | data_dir_info.metric_name = "spill_data_dir_" + std::to_string(i); |
98 | 10 | be_config_data_dir_list.push_back(data_dir_info); |
99 | 10 | } |
100 | | |
101 | 10 | for (int i = 0; i < cache_paths.size(); i++) { |
102 | 4 | doris::DataDirInfo data_dir_info; |
103 | 4 | data_dir_info.path = cache_paths[i].path; |
104 | 4 | data_dir_info.storage_medium = TStorageMedium::REMOTE_CACHE; |
105 | 4 | data_dir_info.data_dir_type = doris::DataDirType::DATA_CACHE_DIR; |
106 | 4 | data_dir_info.metric_name = "local_cache_dir_" + std::to_string(i); |
107 | 4 | be_config_data_dir_list.push_back(data_dir_info); |
108 | 4 | } |
109 | | |
110 | 6 | std::sort(be_config_data_dir_list.begin(), be_config_data_dir_list.end(), |
111 | 42 | [](const DataDirInfo& a, const DataDirInfo& b) { |
112 | 42 | return a.path.length() > b.path.length(); |
113 | 42 | }); |
114 | 6 | } |
115 | | |
116 | | LocalFileReader::LocalFileReader(Path path, size_t file_size, int fd) |
117 | 574k | : _fd(fd), _path(std::move(path)), _file_size(file_size) { |
118 | 574k | _data_dir_path = ""; |
119 | 574k | BeConfDataDirReader::get_data_dir_by_file_path(&_path, &_data_dir_path); |
120 | 574k | DorisMetrics::instance()->local_file_open_reading->increment(1); |
121 | 574k | DorisMetrics::instance()->local_file_reader_total->increment(1); |
122 | 574k | } |
123 | | |
124 | 574k | LocalFileReader::~LocalFileReader() { |
125 | 574k | WARN_IF_ERROR(close(), fmt::format("Failed to close file {}", _path.native())); |
126 | 574k | } |
127 | | |
128 | 579k | Status LocalFileReader::close() { |
129 | 579k | bool expected = false; |
130 | 579k | if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { |
131 | 574k | DorisMetrics::instance()->local_file_open_reading->increment(-1); |
132 | 574k | int res = -1; |
133 | 574k | if (bthread_self() == 0) { |
134 | 574k | res = ::close(_fd); |
135 | 574k | } else { |
136 | 124 | auto task = [&] { res = ::close(_fd); }; |
137 | 124 | AsyncIO::run_task(task, io::FileSystemType::LOCAL); |
138 | 124 | } |
139 | 574k | if (-1 == res) { |
140 | 0 | std::string err = errno_to_str(); |
141 | 0 | return localfs_error(errno, fmt::format("failed to close {}", _path.native())); |
142 | 0 | } |
143 | 574k | _fd = -1; |
144 | 574k | } |
145 | 579k | return Status::OK(); |
146 | 579k | } |
147 | | |
148 | | Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, |
149 | 297k | const IOContext* /*io_ctx*/) { |
150 | 297k | TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileReader::read_at_impl", |
151 | 297k | Status::IOError("inject io error")); |
152 | 297k | if (closed()) [[unlikely]] { |
153 | 0 | return Status::InternalError("read closed file: ", _path.native()); |
154 | 0 | } |
155 | | |
156 | 297k | if (offset > _file_size) { |
157 | 0 | return Status::InternalError( |
158 | 0 | "offset exceeds file size(offset: {}, file size: {}, path: {})", offset, _file_size, |
159 | 0 | _path.native()); |
160 | 0 | } |
161 | 297k | size_t bytes_req = result.size; |
162 | 297k | char* to = result.data; |
163 | 297k | bytes_req = std::min(bytes_req, _file_size - offset); |
164 | 297k | *bytes_read = 0; |
165 | | |
166 | 297k | LIMIT_LOCAL_SCAN_IO(get_data_dir_path(), bytes_read); |
167 | | |
168 | 595k | while (bytes_req != 0) { |
169 | 297k | auto res = SYNC_POINT_HOOK_RETURN_VALUE(::pread(_fd, to, bytes_req, offset), |
170 | 297k | "LocalFileReader::pread", _fd, to); |
171 | 297k | DBUG_EXECUTE_IF("LocalFileReader::read_at_impl.io_error", { |
172 | 297k | auto sub_path = dp->param<std::string>("sub_path", ""); |
173 | 297k | if ((sub_path.empty() && _path.filename().compare(kTestFilePath)) || |
174 | 297k | (!sub_path.empty() && _path.native().find(sub_path) != std::string::npos)) { |
175 | 297k | res = -1; |
176 | 297k | errno = EIO; |
177 | 297k | LOG(WARNING) << Status::IOError("debug read io error: {}", _path.native()); |
178 | 297k | } |
179 | 297k | }); |
180 | 297k | if (UNLIKELY(-1 == res && errno != EINTR)) { |
181 | 0 | return localfs_error(errno, fmt::format("failed to read {}", _path.native())); |
182 | 0 | } |
183 | 297k | if (UNLIKELY(res == 0)) { |
184 | 0 | return Status::InternalError("cannot read from {}: unexpected EOF", _path.native()); |
185 | 0 | } |
186 | 298k | if (res > 0) { |
187 | 298k | to += res; |
188 | 298k | offset += res; |
189 | 298k | bytes_req -= res; |
190 | 298k | *bytes_read += res; |
191 | 298k | } |
192 | 297k | } |
193 | 297k | DorisMetrics::instance()->local_bytes_read_total->increment(*bytes_read); |
194 | 297k | return Status::OK(); |
195 | 297k | } |
196 | | |
197 | | } // namespace io |
198 | | } // namespace doris |