Coverage Report

Created: 2026-03-13 03:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/broker_file_system.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/broker_file_system.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/PaloBrokerService_types.h>
22
#include <gen_cpp/TPaloBrokerService.h>
23
#include <gen_cpp/Types_types.h>
24
#include <glog/logging.h>
25
#include <thrift/Thrift.h>
26
#include <thrift/transport/TTransportException.h>
27
28
#include <cstddef>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <chrono> // IWYU pragma: keep
31
#include <filesystem>
32
#include <ostream>
33
#include <thread>
34
#include <utility>
35
36
#include "common/config.h"
37
#include "common/status.h"
38
#include "core/custom_allocator.h"
39
#include "io/fs/broker_file_reader.h"
40
#include "io/fs/broker_file_writer.h"
41
#include "io/fs/file_reader.h"
42
#include "io/fs/file_system.h"
43
#include "io/fs/file_writer.h"
44
#include "io/fs/local_file_system.h"
45
#include "runtime/broker_mgr.h"
46
#include "runtime/exec_env.h"
47
#include "util/slice.h"
48
49
namespace doris::io {
50
51
#ifdef BE_TEST
52
inline BrokerServiceClientCache* client_cache() {
53
    static BrokerServiceClientCache s_client_cache;
54
    return &s_client_cache;
55
}
56
57
inline const std::string& client_id(const TNetworkAddress& addr) {
58
    static std::string s_client_id = "doris_unit_test";
59
    return s_client_id;
60
}
61
#else
62
0
inline BrokerServiceClientCache* client_cache() {
63
0
    return ExecEnv::GetInstance()->broker_client_cache();
64
0
}
65
66
0
inline const std::string& client_id(const TNetworkAddress& addr) {
67
0
    return ExecEnv::GetInstance()->broker_mgr()->get_client_id(addr);
68
0
}
69
#endif
70
71
#ifndef CHECK_BROKER_CLIENT
// Returns an InternalError Status from the enclosing function when the broker
// connection `client` is null or reports that it is not alive.
// The body is wrapped in do { } while (false) so the macro behaves as a single
// statement (no dangling-else surprises when used inside an unbraced if/else), and
// the argument is parenthesized so arbitrary expressions can be passed.
#define CHECK_BROKER_CLIENT(client)                                   \
    do {                                                              \
        if (!(client) || !(client)->is_alive()) {                     \
            return Status::InternalError("connect to broker failed"); \
        }                                                             \
    } while (false)
#endif
77
78
// Factory: builds a BrokerFileSystem for `broker_addr` / `broker_prop` and runs init()
// before handing it out. A failing init() is returned as the error of the Result.
Result<std::shared_ptr<BrokerFileSystem>> BrokerFileSystem::create(
        const TNetworkAddress& broker_addr, const std::map<std::string, std::string>& broker_prop,
        std::string id) {
    // NOTE(review): `new` is used rather than std::make_shared, presumably because the
    // constructor is not public — confirm against the header before changing it.
    std::shared_ptr<BrokerFileSystem> broker_fs {
            new BrokerFileSystem(broker_addr, broker_prop, std::move(id))};
    RETURN_IF_ERROR_RESULT(broker_fs->init());
    return broker_fs;
}
86
87
// Records the broker endpoint and the properties forwarded with every request.
// The constructor opens no connection; that happens in init().
BrokerFileSystem::BrokerFileSystem(const TNetworkAddress& broker_addr,
                                   const std::map<std::string, std::string>& broker_prop,
                                   std::string id)
        : RemoteFileSystem("", std::move(id), FileSystemType::BROKER),
          _broker_addr(broker_addr),
          _broker_prop(broker_prop) {
    // Nothing else to do: all members are set in the initializer list.
}
93
94
0
// Creates the shared broker connection used by every operation of this file system.
// The connection constructor writes its outcome into `conn_status`, and that status
// is returned as-is.
Status BrokerFileSystem::init() {
    Status conn_status = Status::OK();
    _connection = std::make_shared<BrokerServiceConnection>(
            client_cache(), _broker_addr, config::thrift_rpc_timeout_ms, &conn_status);
    return conn_status;
}
100
101
// Creates a BrokerFileWriter for `path` and stores it in `*writer`.
// `opts` is not consulted; the writer is configured only from this file system's
// broker address and properties.
Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* writer,
                                          const FileWriterOptions* opts) {
    ExecEnv* const env = ExecEnv::GetInstance();
    *writer = DORIS_TRY(BrokerFileWriter::create(env, _broker_addr, _broker_prop, path));
    return Status::OK();
}
107
108
// Opens `file` through the broker and returns a BrokerFileReader in `*reader`.
//
// If `opts.file_size` is not positive, the size is first fetched with
// file_size_impl(). On a Thrift transport error, the openReader call is retried once
// after a 1 second pause and a connection reopen.
// Returns RpcError on Thrift failures, IOError when the broker answers with a non-OK
// status, and InternalError when the broker connection is unavailable.
Status BrokerFileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader,
                                            const FileReaderOptions& opts) {
    int64_t fsize = opts.file_size;
    if (fsize <= 0) {
        RETURN_IF_ERROR(file_size_impl(file, &fsize));
    }

    CHECK_BROKER_CLIENT(_connection);
    TBrokerOpenReaderRequest request;
    request.__set_version(TBrokerVersion::VERSION_ONE);
    request.__set_path(file);
    request.__set_startOffset(0);
    request.__set_clientId(client_id(_broker_addr));
    request.__set_properties(_broker_prop);

    // The response is only needed for the duration of this call, so keep it on the
    // stack instead of heap-allocating it.
    TBrokerOpenReaderResponse response;
    try {
        try {
            (*_connection)->openReader(response, request);
        } catch (apache::thrift::transport::TTransportException&) {
            // Transport failure: back off briefly, reconnect, and retry exactly once.
            std::this_thread::sleep_for(std::chrono::seconds(1));
            RETURN_IF_ERROR(_connection->reopen());
            (*_connection)->openReader(response, request);
        }
    } catch (apache::thrift::TException& e) {
        return Status::RpcError("failed to open file {}: {}", file.native(), error_msg(e.what()));
    }

    if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
        return Status::IOError("failed to open file {}: {}", file.native(),
                               error_msg(response.opStatus.message));
    }
    *reader = std::make_shared<BrokerFileReader>(_broker_addr, file, fsize, response.fd,
                                                 _connection, opts.mtime);
    return Status::OK();
}
145
146
0
// This file system does not implement directory creation; every call fails with
// NotSupported and both arguments are ignored.
Status BrokerFileSystem::create_directory_impl(const Path& /*path*/, bool /*failed_if_exists*/) {
    Status unsupported = Status::NotSupported("create directory not implemented!");
    return unsupported;
}
149
150
0
// Removes the remote path `file` with a broker deletePath request.
// On a Thrift transport error the connection is reopened and the request is retried
// once. Returns IOError for a non-OK broker status and RpcError for other Thrift
// exceptions.
Status BrokerFileSystem::delete_file_impl(const Path& file) {
    CHECK_BROKER_CLIENT(_connection);
    try {
        TBrokerDeletePathRequest request;
        request.__set_version(TBrokerVersion::VERSION_ONE);
        request.__set_path(file);
        request.__set_properties(_broker_prop);

        TBrokerOperationStatus result;
        try {
            (*_connection)->deletePath(result, request);
        } catch (apache::thrift::transport::TTransportException&) {
            // Transport failure: reconnect and retry once.
            RETURN_IF_ERROR((*_connection).reopen());
            (*_connection)->deletePath(result, request);
        }

        if (result.statusCode != TBrokerOperationStatusCode::OK) {
            return Status::IOError("failed to delete file {}: {}", file.native(),
                                   error_msg(result.message));
        }
        return Status::OK();
    } catch (apache::thrift::TException& e) {
        return Status::RpcError("failed to delete file {}: {}", file.native(), error_msg(e.what()));
    }
}
177
178
// Delete all files under path.
179
0
Status BrokerFileSystem::delete_directory_impl(const Path& dir) {
180
0
    return delete_file_impl(dir);
181
0
}
182
183
0
// Deletes each path in order; the first failure stops the batch and is returned.
Status BrokerFileSystem::batch_delete_impl(const std::vector<Path>& files) {
    for (const Path& target : files) {
        RETURN_IF_ERROR(delete_file_impl(target));
    }
    return Status::OK();
}
189
190
0
// Checks through the broker whether `path` exists and writes the answer to `*res`.
// `*res` is reset to false once the connection check passes, and set to the broker's
// answer only when the broker replies OK. A Thrift transport error triggers one
// reconnect-and-retry. Returns IOError for a non-OK broker status and RpcError for
// other Thrift exceptions.
Status BrokerFileSystem::exists_impl(const Path& path, bool* res) const {
    CHECK_BROKER_CLIENT(_connection);
    *res = false;
    try {
        TBrokerCheckPathExistRequest request;
        request.__set_version(TBrokerVersion::VERSION_ONE);
        request.__set_path(path);
        request.__set_properties(_broker_prop);

        TBrokerCheckPathExistResponse response;
        try {
            (*_connection)->checkPathExist(response, request);
        } catch (apache::thrift::transport::TTransportException&) {
            RETURN_IF_ERROR((*_connection).reopen());
            (*_connection)->checkPathExist(response, request);
        }

        if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
            return Status::IOError("failed to check exist of path {}: {}", path.native(),
                                   error_msg(response.opStatus.message));
        }
        *res = response.isPathExist;
        return Status::OK();
    } catch (apache::thrift::TException& e) {
        return Status::RpcError("failed to check exist of path {}: {}", path.native(),
                                error_msg(e.what()));
    }
}
222
223
0
// Fetches the size of `path` from the broker into `*file_size`.
// On a Thrift transport error the connection is reopened and the request retried
// once. Returns IOError if the broker status is not OK or the reported size is
// negative (in which case `*file_size` is left untouched), and RpcError on other
// Thrift exceptions.
Status BrokerFileSystem::file_size_impl(const Path& path, int64_t* file_size) const {
    CHECK_BROKER_CLIENT(_connection);
    try {
        TBrokerFileSizeRequest req;
        req.__set_version(TBrokerVersion::VERSION_ONE);
        req.__set_path(path);
        req.__set_properties(_broker_prop);

        TBrokerFileSizeResponse resp;
        try {
            (*_connection)->fileSize(resp, req);
        } catch (apache::thrift::transport::TTransportException&) {
            RETURN_IF_ERROR((*_connection).reopen());
            (*_connection)->fileSize(resp, req);
        }

        if (resp.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
            return Status::IOError("failed to get file size of path {}: {}", path.native(),
                                   error_msg(resp.opStatus.message));
        }
        if (resp.fileSize < 0) {
            return Status::IOError("failed to get file size of path {}: size is negative: {}",
                                   path.native(), resp.fileSize);
        }
        *file_size = resp.fileSize;
        return Status::OK();
    } catch (apache::thrift::TException& e) {
        return Status::RpcError("failed to get file size of path {}: {}", path.native(),
                                error_msg(e.what()));
    }
}
254
255
// Lists the direct (non-recursive) children of `dir` through the broker.
//
// `*exists` is set to false and OK is returned when `dir` does not exist, either
// according to exists_impl() or because the broker answers FILE_NOT_FOUND. Otherwise
// `*exists` is set to true and one FileInfo per child is appended to `*files`.
// Directories are skipped when `only_file` is true.
// Returns IOError for other non-OK broker statuses and RpcError for Thrift exceptions.
Status BrokerFileSystem::list_impl(const Path& dir, bool only_file, std::vector<FileInfo>* files,
                                   bool* exists) {
    RETURN_IF_ERROR(exists_impl(dir, exists));
    if (!(*exists)) {
        return Status::OK();
    }
    CHECK_BROKER_CLIENT(_connection);
    try {
        // get existing files from remote path
        TBrokerListPathRequest list_req;
        list_req.__set_version(TBrokerVersion::VERSION_ONE);
        list_req.__set_path(dir / "*");
        list_req.__set_isRecursive(false);
        list_req.__set_properties(_broker_prop);
        list_req.__set_fileNameOnly(true); // we only need file name, not abs path

        TBrokerListResponse list_rep;
        try {
            (*_connection)->listPath(list_rep, list_req);
        } catch (apache::thrift::transport::TTransportException&) {
            RETURN_IF_ERROR((*_connection).reopen());
            (*_connection)->listPath(list_rep, list_req);
        }

        if (list_rep.opStatus.statusCode == TBrokerOperationStatusCode::FILE_NOT_FOUND) {
            LOG(INFO) << "path does not exist: " << dir;
            *exists = false;
            return Status::OK();
        } else if (list_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
            return Status::IOError("failed to list dir {}: {}", dir.native(),
                                   error_msg(list_rep.opStatus.message));
        }
        LOG(INFO) << "finished to list files from remote path. file num: " << list_rep.files.size();
        *exists = true;

        // convert each broker entry into a FileInfo, optionally skipping directories
        for (const auto& file : list_rep.files) {
            if (only_file && file.isDir) {
                // this is not a file
                continue;
            }
            FileInfo file_info;
            file_info.file_name = file.path;
            file_info.file_size = file.size;
            file_info.is_file = !file.isDir;
            files->emplace_back(std::move(file_info));
        }

        LOG(INFO) << "finished to split files. valid file num: " << files->size();
    } catch (apache::thrift::TException& e) {
        return Status::RpcError("failed to list dir {}: {}", dir.native(), error_msg(e.what()));
    }
    return Status::OK();
}
312
313
0
// Renames `orig_name` to `new_name` with a broker renamePath request.
// On a Thrift transport error the connection is reopened and the request retried
// once. Returns IOError for a non-OK broker status and RpcError for other Thrift
// exceptions. A success is logged at INFO level.
Status BrokerFileSystem::rename_impl(const Path& orig_name, const Path& new_name) {
    CHECK_BROKER_CLIENT(_connection);
    try {
        TBrokerRenamePathRequest request;
        request.__set_version(TBrokerVersion::VERSION_ONE);
        request.__set_srcPath(orig_name);
        request.__set_destPath(new_name);
        request.__set_properties(_broker_prop);

        TBrokerOperationStatus result;
        try {
            (*_connection)->renamePath(result, request);
        } catch (apache::thrift::transport::TTransportException&) {
            // Transport failure: reconnect and retry once.
            RETURN_IF_ERROR((*_connection).reopen());
            (*_connection)->renamePath(result, request);
        }

        const bool rename_ok = result.statusCode == TBrokerOperationStatusCode::OK;
        if (!rename_ok) {
            return Status::IOError("failed to rename from {} to {}: {}", orig_name.native(),
                                   new_name.native(), error_msg(result.message));
        }
    } catch (apache::thrift::TException& e) {
        return Status::RpcError("failed to rename from {} to {}: {}", orig_name.native(),
                                new_name.native(), error_msg(e.what()));
    }

    LOG(INFO) << "finished to rename file. orig: " << orig_name << ", new: " << new_name;
    return Status::OK();
}
342
343
0
// Copies the local file `local_file` to `remote_file` through a broker writer.
//
// The file is read in chunks of at most 1 MiB. The writer is closed explicitly at the
// end so that a failing close is reported to the caller.
// Returns IOError when the local size is unavailable or the local file ends before its
// reported length, or the first error from open/read/append/close.
Status BrokerFileSystem::upload_impl(const Path& local_file, const Path& remote_file) {
    // 1. open local file for read
    FileSystemSPtr local_fs = global_local_filesystem();
    FileReaderSPtr local_reader = nullptr;
    RETURN_IF_ERROR(local_fs->open_file(local_file, &local_reader));

    int64_t file_len = local_reader->size();
    if (file_len == -1) {
        return Status::IOError("failed to get length of file: {}: {}", local_file.native(),
                               error_msg(""));
    }

    // 2. create the remote file through the broker
    FileWriterPtr broker_writer = nullptr;
    RETURN_IF_ERROR(create_file_impl(remote_file, &broker_writer, nullptr));

    // 3. copy chunk by chunk. The 1 MiB buffer lives on the heap (as in download_impl);
    //    a stack array of this size can overflow small thread stacks.
    constexpr size_t buf_sz = 1024 * 1024;
    auto read_buf = make_unique_buffer<uint8_t>(buf_sz);
    size_t left_len = file_len;
    size_t read_offset = 0;
    while (left_len > 0) {
        size_t to_read = left_len > buf_sz ? buf_sz : left_len;
        size_t bytes_read = 0;
        RETURN_IF_ERROR(
                local_reader->read_at(read_offset, Slice(read_buf.get(), to_read), &bytes_read));
        if (bytes_read == 0) {
            // The file ended before its reported length; stop instead of looping forever.
            return Status::IOError("failed to read file {}: unexpected EOF at offset {}, {} bytes left",
                                   local_file.native(), read_offset, left_len);
        }
        // write through broker: forward only the bytes actually read, so a short read
        // never pushes stale buffer contents to the remote file
        RETURN_IF_ERROR(broker_writer->append(Slice(read_buf.get(), bytes_read)));

        read_offset += bytes_read;
        left_len -= bytes_read;
    }

    // close manually, because we need to check its close status
    RETURN_IF_ERROR(broker_writer->close());
    LOG(INFO) << "finished to write file via broker. file: " << local_file
              << ", length: " << file_len;
    return Status::OK();
}
380
381
// Uploads local_files[i] to remote_files[i] for every i, stopping at the first failure.
// The two lists must have the same length. A mismatch aborts in debug builds (DCHECK)
// and returns InvalidArgument in release builds.
Status BrokerFileSystem::batch_upload_impl(const std::vector<Path>& local_files,
                                           const std::vector<Path>& remote_files) {
    DCHECK(local_files.size() == remote_files.size());
    // DCHECK is compiled out of release builds; without this guard a shorter
    // `remote_files` would be indexed out of bounds below.
    if (local_files.size() != remote_files.size()) {
        return Status::InvalidArgument("local files size {} does not match remote files size {}",
                                       local_files.size(), remote_files.size());
    }
    for (size_t i = 0; i < local_files.size(); ++i) {
        RETURN_IF_ERROR(upload_impl(local_files[i], remote_files[i]));
    }
    return Status::OK();
}
389
390
0
// Copies the remote file `remote_file` into the local file `local_file`.
//
// Any existing local file is removed first. The remote file is then read in chunks of
// at most 1 MiB until the broker reader returns 0 bytes.
// Returns IOError if the existing local file cannot be removed, otherwise the first
// error from open/read/append, or the result of closing the local writer.
Status BrokerFileSystem::download_impl(const Path& remote_file, const Path& local_file) {
    // 1. open remote file for read
    FileReaderSPtr broker_reader = nullptr;
    RETURN_IF_ERROR(open_file_internal(remote_file, &broker_reader, FileReaderOptions::DEFAULT));

    // 2. remove the existing local file if exist. Use the error_code overload: the
    //    throwing overload would let a std::filesystem::filesystem_error escape this
    //    Status-returning function.
    std::error_code remove_ec;
    if (std::filesystem::remove(local_file, remove_ec)) {
        VLOG(2) << "remove the previously exist local file: " << local_file;
    } else if (remove_ec) {
        return Status::IOError("failed to remove local file {}: {}", local_file.native(),
                               remove_ec.message());
    }

    // 3. open local file for write
    FileSystemSPtr local_fs = global_local_filesystem();
    FileWriterPtr local_writer = nullptr;
    RETURN_IF_ERROR(local_fs->create_file(local_file, &local_writer));

    // 4. read remote and write to local
    VLOG(2) << "read remote file: " << remote_file << " to local: " << local_file;
    constexpr size_t buf_sz = 1024 * 1024;
    auto read_buf = make_unique_buffer<uint8_t>(buf_sz);
    size_t cur_offset = 0;
    while (true) {
        size_t read_len = 0;
        Slice file_slice(read_buf.get(), buf_sz);
        RETURN_IF_ERROR(broker_reader->read_at(cur_offset, file_slice, &read_len));
        if (read_len == 0) {
            // 0 bytes read: end of the remote file
            break;
        }
        cur_offset += read_len;
        RETURN_IF_ERROR(local_writer->append({read_buf.get(), read_len}));
    }

    // close explicitly so that a failed flush/close is reported to the caller
    return local_writer->close();
}
424
425
0
std::string BrokerFileSystem::error_msg(const std::string& err) const {
426
0
    return fmt::format("({}:{}), {}", _broker_addr.hostname, _broker_addr.port, err);
427
0
}
428
429
} // namespace doris::io