be/src/io/fs/broker_file_system.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/broker_file_system.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/PaloBrokerService_types.h> |
22 | | #include <gen_cpp/TPaloBrokerService.h> |
23 | | #include <gen_cpp/Types_types.h> |
24 | | #include <glog/logging.h> |
25 | | #include <thrift/Thrift.h> |
26 | | #include <thrift/transport/TTransportException.h> |
27 | | |
28 | | #include <cstddef> |
29 | | // IWYU pragma: no_include <bits/chrono.h> |
30 | | #include <chrono> // IWYU pragma: keep |
31 | | #include <filesystem> |
32 | | #include <ostream> |
33 | | #include <thread> |
34 | | #include <utility> |
35 | | |
36 | | #include "common/config.h" |
37 | | #include "common/status.h" |
38 | | #include "core/custom_allocator.h" |
39 | | #include "io/fs/broker_file_reader.h" |
40 | | #include "io/fs/broker_file_writer.h" |
41 | | #include "io/fs/file_reader.h" |
42 | | #include "io/fs/file_system.h" |
43 | | #include "io/fs/file_writer.h" |
44 | | #include "io/fs/local_file_system.h" |
45 | | #include "runtime/broker_mgr.h" |
46 | | #include "runtime/exec_env.h" |
47 | | #include "util/slice.h" |
48 | | |
49 | | namespace doris::io { |
50 | | |
51 | | #ifdef BE_TEST |
52 | | inline BrokerServiceClientCache* client_cache() { |
53 | | static BrokerServiceClientCache s_client_cache; |
54 | | return &s_client_cache; |
55 | | } |
56 | | |
57 | | inline const std::string& client_id(const TNetworkAddress& addr) { |
58 | | static std::string s_client_id = "doris_unit_test"; |
59 | | return s_client_id; |
60 | | } |
61 | | #else |
62 | 0 | inline BrokerServiceClientCache* client_cache() { |
63 | 0 | return ExecEnv::GetInstance()->broker_client_cache(); |
64 | 0 | } |
65 | | |
66 | 0 | inline const std::string& client_id(const TNetworkAddress& addr) { |
67 | 0 | return ExecEnv::GetInstance()->broker_mgr()->get_client_id(addr); |
68 | 0 | } |
69 | | #endif |
70 | | |
#ifndef CHECK_BROKER_CLIENT
// Return Status::InternalError("connect to broker failed") from the enclosing function
// when `client` is null or its is_alive() returns false.
// The body is wrapped in do { } while (false) so the macro expands to exactly one
// statement: it is then safe inside an unbraced if/else and still requires the
// trailing semicolon at the call site.
#define CHECK_BROKER_CLIENT(client)                                   \
    do {                                                              \
        if (!(client) || !(client)->is_alive()) {                     \
            return Status::InternalError("connect to broker failed"); \
        }                                                             \
    } while (false)
#endif
77 | | |
78 | | Result<std::shared_ptr<BrokerFileSystem>> BrokerFileSystem::create( |
79 | | const TNetworkAddress& broker_addr, const std::map<std::string, std::string>& broker_prop, |
80 | 0 | std::string id) { |
81 | 0 | std::shared_ptr<BrokerFileSystem> fs( |
82 | 0 | new BrokerFileSystem(broker_addr, broker_prop, std::move(id))); |
83 | 0 | RETURN_IF_ERROR_RESULT(fs->init()); |
84 | 0 | return fs; |
85 | 0 | } |
86 | | |
// Store the broker endpoint and the properties forwarded with every broker request.
// The base RemoteFileSystem is constructed with an empty first argument, the given id,
// and FileSystemType::BROKER. No connection is opened here; that happens in init().
BrokerFileSystem::BrokerFileSystem(const TNetworkAddress& broker_addr,
                                   const std::map<std::string, std::string>& broker_prop,
                                   std::string id)
        : RemoteFileSystem("", std::move(id), FileSystemType::BROKER),
          _broker_addr(broker_addr),
          _broker_prop(broker_prop) {}
93 | | |
94 | 0 | Status BrokerFileSystem::init() { |
95 | 0 | Status status = Status::OK(); |
96 | 0 | _connection = std::make_shared<BrokerServiceConnection>(client_cache(), _broker_addr, |
97 | 0 | config::thrift_rpc_timeout_ms, &status); |
98 | 0 | return status; |
99 | 0 | } |
100 | | |
101 | | Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* writer, |
102 | 0 | const FileWriterOptions* opts) { |
103 | 0 | *writer = DORIS_TRY( |
104 | 0 | BrokerFileWriter::create(ExecEnv::GetInstance(), _broker_addr, _broker_prop, path)); |
105 | 0 | return Status::OK(); |
106 | 0 | } |
107 | | |
108 | | Status BrokerFileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader, |
109 | 0 | const FileReaderOptions& opts) { |
110 | 0 | int64_t fsize = opts.file_size; |
111 | 0 | if (fsize <= 0) { |
112 | 0 | RETURN_IF_ERROR(file_size_impl(file, &fsize)); |
113 | 0 | } |
114 | | |
115 | 0 | CHECK_BROKER_CLIENT(_connection); |
116 | 0 | TBrokerOpenReaderRequest request; |
117 | 0 | request.__set_version(TBrokerVersion::VERSION_ONE); |
118 | 0 | request.__set_path(file); |
119 | 0 | request.__set_startOffset(0); |
120 | 0 | request.__set_clientId(client_id(_broker_addr)); |
121 | 0 | request.__set_properties(_broker_prop); |
122 | |
|
123 | 0 | std::unique_ptr<TBrokerOpenReaderResponse> response(new TBrokerOpenReaderResponse()); |
124 | 0 | try { |
125 | 0 | Status status; |
126 | 0 | try { |
127 | 0 | (*_connection)->openReader(*response, request); |
128 | 0 | } catch (apache::thrift::transport::TTransportException&) { |
129 | 0 | std::this_thread::sleep_for(std::chrono::seconds(1)); |
130 | 0 | RETURN_IF_ERROR(_connection->reopen()); |
131 | 0 | (*_connection)->openReader(*response, request); |
132 | 0 | } |
133 | 0 | } catch (apache::thrift::TException& e) { |
134 | 0 | return Status::RpcError("failed to open file {}: {}", file.native(), error_msg(e.what())); |
135 | 0 | } |
136 | | |
137 | 0 | if (response->opStatus.statusCode != TBrokerOperationStatusCode::OK) { |
138 | 0 | return Status::IOError("failed to open file {}: {}", file.native(), |
139 | 0 | error_msg(response->opStatus.message)); |
140 | 0 | } |
141 | 0 | *reader = std::make_shared<BrokerFileReader>(_broker_addr, file, fsize, response->fd, |
142 | 0 | _connection, opts.mtime); |
143 | 0 | return Status::OK(); |
144 | 0 | } |
145 | | |
146 | 0 | Status BrokerFileSystem::create_directory_impl(const Path& /*path*/, bool /*failed_if_exists*/) { |
147 | 0 | return Status::NotSupported("create directory not implemented!"); |
148 | 0 | } |
149 | | |
150 | 0 | Status BrokerFileSystem::delete_file_impl(const Path& file) { |
151 | 0 | CHECK_BROKER_CLIENT(_connection); |
152 | 0 | try { |
153 | | // rm file from remote path |
154 | 0 | TBrokerDeletePathRequest del_req; |
155 | 0 | TBrokerOperationStatus del_rep; |
156 | 0 | del_req.__set_version(TBrokerVersion::VERSION_ONE); |
157 | 0 | del_req.__set_path(file); |
158 | 0 | del_req.__set_properties(_broker_prop); |
159 | |
|
160 | 0 | try { |
161 | 0 | (*_connection)->deletePath(del_rep, del_req); |
162 | 0 | } catch (apache::thrift::transport::TTransportException&) { |
163 | 0 | RETURN_IF_ERROR((*_connection).reopen()); |
164 | 0 | (*_connection)->deletePath(del_rep, del_req); |
165 | 0 | } |
166 | | |
167 | 0 | if (del_rep.statusCode == TBrokerOperationStatusCode::OK) { |
168 | 0 | return Status::OK(); |
169 | 0 | } else { |
170 | 0 | return Status::IOError("failed to delete file {}: {}", file.native(), |
171 | 0 | error_msg(del_rep.message)); |
172 | 0 | } |
173 | 0 | } catch (apache::thrift::TException& e) { |
174 | 0 | return Status::RpcError("failed to delete file {}: {}", file.native(), error_msg(e.what())); |
175 | 0 | } |
176 | 0 | } |
177 | | |
178 | | // Delete all files under path. |
179 | 0 | Status BrokerFileSystem::delete_directory_impl(const Path& dir) { |
180 | 0 | return delete_file_impl(dir); |
181 | 0 | } |
182 | | |
183 | 0 | Status BrokerFileSystem::batch_delete_impl(const std::vector<Path>& files) { |
184 | 0 | for (auto& file : files) { |
185 | 0 | RETURN_IF_ERROR(delete_file_impl(file)); |
186 | 0 | } |
187 | 0 | return Status::OK(); |
188 | 0 | } |
189 | | |
190 | 0 | Status BrokerFileSystem::exists_impl(const Path& path, bool* res) const { |
191 | 0 | CHECK_BROKER_CLIENT(_connection); |
192 | 0 | *res = false; |
193 | 0 | try { |
194 | 0 | TBrokerCheckPathExistRequest check_req; |
195 | 0 | TBrokerCheckPathExistResponse check_rep; |
196 | 0 | check_req.__set_version(TBrokerVersion::VERSION_ONE); |
197 | 0 | check_req.__set_path(path); |
198 | 0 | check_req.__set_properties(_broker_prop); |
199 | |
|
200 | 0 | try { |
201 | 0 | (*_connection)->checkPathExist(check_rep, check_req); |
202 | 0 | } catch (apache::thrift::transport::TTransportException&) { |
203 | 0 | RETURN_IF_ERROR((*_connection).reopen()); |
204 | 0 | (*_connection)->checkPathExist(check_rep, check_req); |
205 | 0 | } |
206 | | |
207 | 0 | if (check_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) { |
208 | 0 | return Status::IOError("failed to check exist of path {}: {}", path.native(), |
209 | 0 | error_msg(check_rep.opStatus.message)); |
210 | 0 | } else if (!check_rep.isPathExist) { |
211 | 0 | *res = false; |
212 | 0 | return Status::OK(); |
213 | 0 | } else { |
214 | 0 | *res = true; |
215 | 0 | return Status::OK(); |
216 | 0 | } |
217 | 0 | } catch (apache::thrift::TException& e) { |
218 | 0 | return Status::RpcError("failed to check exist of path {}: {}", path.native(), |
219 | 0 | error_msg(e.what())); |
220 | 0 | } |
221 | 0 | } |
222 | | |
223 | 0 | Status BrokerFileSystem::file_size_impl(const Path& path, int64_t* file_size) const { |
224 | 0 | CHECK_BROKER_CLIENT(_connection); |
225 | 0 | try { |
226 | 0 | TBrokerFileSizeRequest req; |
227 | 0 | req.__set_version(TBrokerVersion::VERSION_ONE); |
228 | 0 | req.__set_path(path); |
229 | 0 | req.__set_properties(_broker_prop); |
230 | |
|
231 | 0 | TBrokerFileSizeResponse resp; |
232 | 0 | try { |
233 | 0 | (*_connection)->fileSize(resp, req); |
234 | 0 | } catch (apache::thrift::transport::TTransportException&) { |
235 | 0 | RETURN_IF_ERROR((*_connection).reopen()); |
236 | 0 | (*_connection)->fileSize(resp, req); |
237 | 0 | } |
238 | | |
239 | 0 | if (resp.opStatus.statusCode != TBrokerOperationStatusCode::OK) { |
240 | 0 | return Status::IOError("failed to get file size of path {}: {}", path.native(), |
241 | 0 | error_msg(resp.opStatus.message)); |
242 | 0 | } |
243 | 0 | if (resp.fileSize < 0) { |
244 | 0 | return Status::IOError("failed to get file size of path {}: size is negtive: {}", |
245 | 0 | path.native(), resp.fileSize); |
246 | 0 | } |
247 | 0 | *file_size = resp.fileSize; |
248 | 0 | return Status::OK(); |
249 | 0 | } catch (apache::thrift::TException& e) { |
250 | 0 | return Status::RpcError("failed to get file size of path {}: {}", path.native(), |
251 | 0 | error_msg(e.what())); |
252 | 0 | } |
253 | 0 | } |
254 | | |
255 | | Status BrokerFileSystem::list_impl(const Path& dir, bool only_file, std::vector<FileInfo>* files, |
256 | 0 | bool* exists) { |
257 | 0 | RETURN_IF_ERROR(exists_impl(dir, exists)); |
258 | 0 | if (!(*exists)) { |
259 | 0 | return Status::OK(); |
260 | 0 | } |
261 | 0 | CHECK_BROKER_CLIENT(_connection); |
262 | 0 | Status status = Status::OK(); |
263 | 0 | try { |
264 | | // get existing files from remote path |
265 | 0 | TBrokerListResponse list_rep; |
266 | 0 | TBrokerListPathRequest list_req; |
267 | 0 | list_req.__set_version(TBrokerVersion::VERSION_ONE); |
268 | 0 | list_req.__set_path(dir / "*"); |
269 | 0 | list_req.__set_isRecursive(false); |
270 | 0 | list_req.__set_properties(_broker_prop); |
271 | 0 | list_req.__set_fileNameOnly(true); // we only need file name, not abs path |
272 | |
|
273 | 0 | try { |
274 | 0 | (*_connection)->listPath(list_rep, list_req); |
275 | 0 | } catch (apache::thrift::transport::TTransportException&) { |
276 | 0 | RETURN_IF_ERROR((*_connection).reopen()); |
277 | 0 | (*_connection)->listPath(list_rep, list_req); |
278 | 0 | } |
279 | | |
280 | 0 | if (list_rep.opStatus.statusCode == TBrokerOperationStatusCode::FILE_NOT_FOUND) { |
281 | 0 | LOG(INFO) << "path does not exist: " << dir; |
282 | 0 | *exists = false; |
283 | 0 | return Status::OK(); |
284 | 0 | } else if (list_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) { |
285 | 0 | return Status::IOError("failed to list dir {}: {}", dir.native(), |
286 | 0 | error_msg(list_rep.opStatus.message)); |
287 | 0 | } |
288 | 0 | LOG(INFO) << "finished to list files from remote path. file num: " << list_rep.files.size(); |
289 | 0 | *exists = true; |
290 | | |
291 | | // split file name and checksum |
292 | 0 | for (const auto& file : list_rep.files) { |
293 | 0 | if (only_file && file.isDir) { |
294 | | // this is not a file |
295 | 0 | continue; |
296 | 0 | } |
297 | 0 | FileInfo file_info; |
298 | 0 | file_info.file_name = file.path; |
299 | 0 | file_info.file_size = file.size; |
300 | 0 | file_info.is_file = !file.isDir; |
301 | 0 | files->emplace_back(std::move(file_info)); |
302 | 0 | } |
303 | |
|
304 | 0 | LOG(INFO) << "finished to split files. valid file num: " << files->size(); |
305 | 0 | } catch (apache::thrift::TException& e) { |
306 | 0 | std::stringstream ss; |
307 | 0 | ss << "failed to list files in remote path: " << dir << ", msg: " << e.what(); |
308 | 0 | return Status::RpcError("failed to list dir {}: {}", dir.native(), error_msg(e.what())); |
309 | 0 | } |
310 | 0 | return status; |
311 | 0 | } |
312 | | |
313 | 0 | Status BrokerFileSystem::rename_impl(const Path& orig_name, const Path& new_name) { |
314 | 0 | CHECK_BROKER_CLIENT(_connection); |
315 | 0 | try { |
316 | 0 | TBrokerOperationStatus op_status; |
317 | 0 | TBrokerRenamePathRequest rename_req; |
318 | 0 | rename_req.__set_version(TBrokerVersion::VERSION_ONE); |
319 | 0 | rename_req.__set_srcPath(orig_name); |
320 | 0 | rename_req.__set_destPath(new_name); |
321 | 0 | rename_req.__set_properties(_broker_prop); |
322 | |
|
323 | 0 | try { |
324 | 0 | (*_connection)->renamePath(op_status, rename_req); |
325 | 0 | } catch (apache::thrift::transport::TTransportException&) { |
326 | 0 | RETURN_IF_ERROR((*_connection).reopen()); |
327 | 0 | (*_connection)->renamePath(op_status, rename_req); |
328 | 0 | } |
329 | | |
330 | 0 | if (op_status.statusCode != TBrokerOperationStatusCode::OK) { |
331 | 0 | return Status::IOError("failed to rename from {} to {}: {}", orig_name.native(), |
332 | 0 | new_name.native(), error_msg(op_status.message)); |
333 | 0 | } |
334 | 0 | } catch (apache::thrift::TException& e) { |
335 | 0 | return Status::RpcError("failed to rename from {} to {}: {}", orig_name.native(), |
336 | 0 | new_name.native(), error_msg(e.what())); |
337 | 0 | } |
338 | | |
339 | 0 | LOG(INFO) << "finished to rename file. orig: " << orig_name << ", new: " << new_name; |
340 | 0 | return Status::OK(); |
341 | 0 | } |
342 | | |
343 | 0 | Status BrokerFileSystem::upload_impl(const Path& local_file, const Path& remote_file) { |
344 | | // 1. open local file for read |
345 | 0 | FileSystemSPtr local_fs = global_local_filesystem(); |
346 | 0 | FileReaderSPtr local_reader = nullptr; |
347 | 0 | RETURN_IF_ERROR(local_fs->open_file(local_file, &local_reader)); |
348 | | |
349 | 0 | int64_t file_len = local_reader->size(); |
350 | 0 | if (file_len == -1) { |
351 | 0 | return Status::IOError("failed to get length of file: {}: {}", local_file.native(), |
352 | 0 | error_msg("")); |
353 | 0 | } |
354 | | |
355 | | // NOTICE: broker writer must be closed before calling rename |
356 | 0 | FileWriterPtr broker_writer = nullptr; |
357 | 0 | RETURN_IF_ERROR(create_file_impl(remote_file, &broker_writer, nullptr)); |
358 | | |
359 | 0 | constexpr size_t buf_sz = 1024 * 1024; |
360 | 0 | char read_buf[buf_sz]; |
361 | 0 | size_t left_len = file_len; |
362 | 0 | size_t read_offset = 0; |
363 | 0 | size_t bytes_read = 0; |
364 | 0 | while (left_len > 0) { |
365 | 0 | size_t read_len = left_len > buf_sz ? buf_sz : left_len; |
366 | 0 | RETURN_IF_ERROR(local_reader->read_at(read_offset, {read_buf, read_len}, &bytes_read)); |
367 | | // write through broker |
368 | 0 | RETURN_IF_ERROR(broker_writer->append({read_buf, read_len})); |
369 | | |
370 | 0 | read_offset += read_len; |
371 | 0 | left_len -= read_len; |
372 | 0 | } |
373 | | |
374 | | // close manually, because we need to check its close status |
375 | 0 | RETURN_IF_ERROR(broker_writer->close()); |
376 | 0 | LOG(INFO) << "finished to write file via broker. file: " << local_file |
377 | 0 | << ", length: " << file_len; |
378 | 0 | return Status::OK(); |
379 | 0 | } |
380 | | |
381 | | Status BrokerFileSystem::batch_upload_impl(const std::vector<Path>& local_files, |
382 | 0 | const std::vector<Path>& remote_files) { |
383 | 0 | DCHECK(local_files.size() == remote_files.size()); |
384 | 0 | for (int i = 0; i < local_files.size(); ++i) { |
385 | 0 | RETURN_IF_ERROR(upload_impl(local_files[i], remote_files[i])); |
386 | 0 | } |
387 | 0 | return Status::OK(); |
388 | 0 | } |
389 | | |
390 | 0 | Status BrokerFileSystem::download_impl(const Path& remote_file, const Path& local_file) { |
391 | | // 1. open remote file for read |
392 | 0 | FileReaderSPtr broker_reader = nullptr; |
393 | 0 | RETURN_IF_ERROR(open_file_internal(remote_file, &broker_reader, FileReaderOptions::DEFAULT)); |
394 | | |
395 | | // 2. remove the existing local file if exist |
396 | 0 | if (std::filesystem::remove(local_file)) { |
397 | 0 | VLOG(2) << "remove the previously exist local file: " << local_file; |
398 | 0 | } |
399 | | |
400 | | // 3. open local file for write |
401 | 0 | FileSystemSPtr local_fs = global_local_filesystem(); |
402 | 0 | FileWriterPtr local_writer = nullptr; |
403 | 0 | RETURN_IF_ERROR(local_fs->create_file(local_file, &local_writer)); |
404 | | |
405 | | // 4. read remote and write to local |
406 | 0 | VLOG(2) << "read remote file: " << remote_file << " to local: " << local_file; |
407 | 0 | constexpr size_t buf_sz = 1024 * 1024; |
408 | 0 | auto read_buf = make_unique_buffer<uint8_t>(buf_sz); |
409 | 0 | size_t cur_offset = 0; |
410 | 0 | while (true) { |
411 | 0 | size_t read_len = 0; |
412 | 0 | Slice file_slice(read_buf.get(), buf_sz); |
413 | 0 | RETURN_IF_ERROR(broker_reader->read_at(cur_offset, file_slice, &read_len)); |
414 | 0 | cur_offset += read_len; |
415 | 0 | if (read_len == 0) { |
416 | 0 | break; |
417 | 0 | } |
418 | | |
419 | 0 | RETURN_IF_ERROR(local_writer->append({read_buf.get(), read_len})); |
420 | 0 | } // file_handler should be closed before calculating checksum |
421 | | |
422 | 0 | return local_writer->close(); |
423 | 0 | } |
424 | | |
425 | 0 | std::string BrokerFileSystem::error_msg(const std::string& err) const { |
426 | 0 | return fmt::format("({}:{}), {}", _broker_addr.hostname, _broker_addr.port, err); |
427 | 0 | } |
428 | | |
429 | | } // namespace doris::io |