Coverage Report

Created: 2026-03-13 09:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/broker_file_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/broker_file_reader.h"
19
20
#include <gen_cpp/TPaloBrokerService.h>
21
#include <string.h>
22
#include <thrift/Thrift.h>
23
#include <thrift/transport/TTransportException.h>
24
// IWYU pragma: no_include <bits/chrono.h>
25
#include <chrono> // IWYU pragma: keep
26
#include <ostream>
27
#include <string>
28
#include <thread>
29
#include <utility>
30
31
#include "common/compiler_util.h" // IWYU pragma: keep
32
#include "common/logging.h"
33
#include "common/metrics/doris_metrics.h"
34
#include "common/status.h"
35
#include "io/fs/broker_file_system.h"
36
37
namespace doris::io {
38
// Forward declaration only: in this file IOContext appears solely as an
// unnamed, unused `const IOContext*` parameter of read_at_impl, so the full
// definition is not needed here.
struct IOContext;
39
40
/// Constructs a reader bound to a broker file descriptor.
///
/// The constructor performs no RPC: it stores its arguments and updates two
/// metrics.
///
/// @param broker_addr  address of the broker, kept for log and error messages
/// @param path         path of the file; moved into the reader
/// @param file_size    size of the file as supplied by the caller
/// @param fd           broker file descriptor used by later pread/close calls
/// @param connection   connection to the broker; moved into the reader
/// @param mtime        modification time as supplied by the caller
BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size,
                                   TBrokerFD fd,
                                   std::shared_ptr<BrokerServiceConnection> connection,
                                   int64_t mtime)
        : _path(std::move(path)),
          _file_size(file_size),
          _broker_addr(broker_addr),
          _fd(fd),
          _connection(std::move(connection)),
          _mtime(mtime) {
    auto* const metrics = DorisMetrics::instance();
    // One more reader created overall ...
    metrics->broker_file_reader_total->increment(1);
    // ... and one more reader currently open (decremented again in close()).
    metrics->broker_file_open_reading->increment(1);
}
53
54
0
/// Closes the reader on destruction.
///
/// The call is qualified with the class name, so it is bound statically to
/// this class's close(). The returned Status is deliberately dropped because a
/// destructor has no way to report it.
BrokerFileReader::~BrokerFileReader() {
    [[maybe_unused]] Status close_status = BrokerFileReader::close();
}
57
58
0
/// Closes the broker-side reader identified by `_fd`.
///
/// Only the caller that flips `_closed` from false to true does any work; every
/// other call (including the one issued by the destructor after an explicit
/// close) returns OK immediately.
///
/// The `broker_file_open_reading` gauge is released as soon as this reader is
/// marked closed, regardless of whether the broker RPC succeeds. Once `_closed`
/// is set no later call can retry the close, so decrementing only on success
/// would leave the gauge permanently too high after any failure.
///
/// @return OK if the reader was already closed or the broker acknowledged the
///         close; InternalError if the connection is not alive or the broker
///         replied with a non-OK status code; RpcError if the thrift call
///         throws; otherwise whatever RETURN_IF_ERROR propagates from a failed
///         `reopen()`.
Status BrokerFileReader::close() {
    bool expected = false;
    if (!_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
        // Somebody else already closed (or is closing) this reader.
        return Status::OK();
    }

    // This reader is now permanently marked closed: release the gauge here so
    // that none of the early error returns below can leak it.
    DorisMetrics::instance()->broker_file_open_reading->increment(-1);

    if (!_connection->is_alive()) {
        return Status::InternalError("connect to broker failed");
    }

    TBrokerCloseReaderRequest request;
    request.__set_version(TBrokerVersion::VERSION_ONE);
    request.__set_fd(_fd);

    TBrokerOperationStatus response;
    try {
        try {
            (*_connection)->closeReader(response, request);
        } catch (const apache::thrift::transport::TTransportException&) {
            // Transport-level failure: wait one second, reconnect and retry once.
            // A failure of the retry itself is handled by the outer catch.
            std::this_thread::sleep_for(std::chrono::seconds(1));
            RETURN_IF_ERROR((*_connection).reopen());
            (*_connection)->closeReader(response, request);
        }
    } catch (const apache::thrift::TException& e) {
        std::stringstream ss;
        ss << "close broker file failed, broker:" << _broker_addr << " failed:" << e.what();
        return Status::RpcError(ss.str());
    }

    if (response.statusCode != TBrokerOperationStatusCode::OK) {
        std::stringstream ss;
        ss << "close broker file failed, broker:" << _broker_addr
           << " failed:" << response.message;
        return Status::InternalError(ss.str());
    }

    return Status::OK();
}
95
96
/// Reads up to `result.size` bytes starting at `offset` through a broker
/// `pread` RPC and copies them into `result.data`.
///
/// @param offset      position in the file to start reading from
/// @param result      destination buffer; its `size` is the number of bytes
///                    requested from the broker
/// @param bytes_read  out-parameter; set to 0 up front and to the number of
///                    bytes copied on success. It stays 0 when zero bytes were
///                    requested, when the broker reports END_OF_FILE, and on
///                    every error return.
/// @return OK on success (including the zero-length and END_OF_FILE cases);
///         InternalError if the reader is closed, the connection is not alive,
///         the broker replies with a non-OK status, or the broker returns more
///         bytes than were requested; RpcError if the thrift call throws;
///         otherwise whatever RETURN_IF_ERROR propagates from a failed
///         `reopen()`.
Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                      const IOContext* /*io_ctx*/) {
    if (closed()) [[unlikely]] {
        // NOTE(review): assumes Status error constructors take an fmt-style
        // format string; without the "{}" placeholder the path argument would
        // never appear in the message. Confirm against common/status.h.
        return Status::InternalError("read closed file: {}", _path.native());
    }

    const size_t bytes_req = result.size;
    char* to = result.data;
    *bytes_read = 0;
    if (UNLIKELY(bytes_req == 0)) {
        return Status::OK();
    }

    if (!_connection->is_alive()) {
        return Status::InternalError("connect to broker failed");
    }

    TBrokerPReadRequest request;
    request.__set_version(TBrokerVersion::VERSION_ONE);
    request.__set_fd(_fd);
    request.__set_offset(offset);
    request.__set_length(bytes_req);

    TBrokerReadResponse response;
    try {
        VLOG_RPC << "send pread request to broker:" << _broker_addr << " position:" << offset
                 << ", read bytes length:" << bytes_req;
        try {
            (*_connection)->pread(response, request);
        } catch (const apache::thrift::transport::TTransportException& e) {
            // Transport-level failure: wait one second, reconnect and retry once.
            // A failure of the retry itself is handled by the outer catch.
            std::this_thread::sleep_for(std::chrono::seconds(1));
            RETURN_IF_ERROR((*_connection).reopen());
            LOG(INFO) << "retry reading from broker: " << _broker_addr << ". reason: " << e.what();
            (*_connection)->pread(response, request);
        }
    } catch (const apache::thrift::TException& e) {
        std::stringstream ss;
        ss << "read broker file failed, broker:" << _broker_addr << " failed:" << e.what();
        return Status::RpcError(ss.str());
    }

    if (response.opStatus.statusCode == TBrokerOperationStatusCode::END_OF_FILE) {
        // read the end of broker's file; *bytes_read stays 0
        return Status::OK();
    }
    if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
        std::stringstream ss;
        ss << "read broker file failed, broker:" << _broker_addr
           << " failed:" << response.opStatus.message;
        return Status::InternalError(ss.str());
    }

    // The payload size comes from the remote side. Never copy more than the
    // caller's buffer can hold, even if the broker misbehaves.
    const size_t bytes_got = response.data.size();
    if (UNLIKELY(bytes_got > bytes_req)) {
        std::stringstream ss;
        ss << "read broker file failed, broker:" << _broker_addr << " failed: broker returned "
           << bytes_got << " bytes but only " << bytes_req << " were requested";
        return Status::InternalError(ss.str());
    }

    memcpy(to, response.data.data(), bytes_got);
    *bytes_read = bytes_got;
    return Status::OK();
}
152
153
} // namespace doris::io