Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/broker_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/broker_file_writer.h"
19
20
#include <gen_cpp/PaloBrokerService_types.h>
21
#include <gen_cpp/TPaloBrokerService.h>
22
#include <gen_cpp/Types_types.h>
23
#include <thrift/Thrift.h>
24
#include <thrift/protocol/TDebugProtocol.h>
25
#include <thrift/transport/TTransportException.h>
26
27
#include <sstream>
28
29
#include "common/config.h"
30
#include "common/logging.h"
31
#include "io/fs/file_writer.h"
32
#include "runtime/broker_mgr.h"
33
#include "runtime/exec_env.h"
34
#include "util/client_cache.h"
35
36
namespace doris::io {
37
38
BrokerFileWriter::BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, Path path,
39
                                   TBrokerFD fd)
40
0
        : _env(env), _address(broker_address), _path(std::move(path)), _fd(fd) {}
41
42
0
BrokerFileWriter::~BrokerFileWriter() = default;
43
44
#ifdef BE_TEST
45
0
inline BrokerServiceClientCache* client_cache(ExecEnv* env) {
46
0
    static BrokerServiceClientCache s_client_cache;
47
0
    return &s_client_cache;
48
0
}
49
50
0
inline const std::string& client_id(ExecEnv* env, const TNetworkAddress& addr) {
51
0
    static std::string s_client_id = "doris_unit_test";
52
0
    return s_client_id;
53
0
}
54
#else
55
inline BrokerServiceClientCache* client_cache(ExecEnv* env) {
56
    return env->broker_client_cache();
57
}
58
59
inline const std::string& client_id(ExecEnv* env, const TNetworkAddress& addr) {
60
    return env->broker_mgr()->get_client_id(addr);
61
}
62
#endif
63
64
0
// Closes the writer.
//
// non_block - true marks the writer as ASYNC_CLOSING, false marks it CLOSED;
//             in both cases the close request is issued through _close_impl().
//
// Returns InternalError if the writer is already CLOSED, or if a second
// non-blocking close is submitted while the writer is ASYNC_CLOSING. A blocking
// close that follows a non-blocking one only finishes the state transition and
// returns OK. Otherwise the status of _close_impl() is returned.
Status BrokerFileWriter::close(bool non_block) {
    if (_state == State::CLOSED) {
        return Status::InternalError("BrokerFileWriter already closed, file path {}",
                                     _path.native());
    }

    const bool close_in_progress = (_state == State::ASYNC_CLOSING);
    if (close_in_progress && non_block) {
        return Status::InternalError("Don't submit async close multi times");
    }
    if (close_in_progress) {
        // The earlier close(true) already ran _close_impl() and returned its
        // status to the caller, so only the state needs to be finalized here.
        _state = State::CLOSED;
        return Status::OK();
    }

    // The state is updated before the request is sent, so the writer leaves
    // the OPENED state even when _close_impl() reports an error.
    _state = non_block ? State::ASYNC_CLOSING : State::CLOSED;
    return _close_impl();
}
85
86
0
// Sends a close-writer request for `_fd` to the broker at `_address`.
//
// Returns the connection status when the client cannot be created or reopened,
// InternalError when Thrift throws or the broker answers with a non-OK status
// code, and OK otherwise. A transport exception on the first attempt triggers
// one reopen of the connection followed by a single retry.
Status BrokerFileWriter::_close_impl() {
    // use 20 second because close may take longer in remote storage, sometimes.
    // TODO(cmy): optimize this if necessary.
    constexpr int close_timeout_ms = 20000;

    // Formats the failure message shared by both error paths, logs it and
    // wraps it in an InternalError.
    auto report_failure = [this](const auto& detail) {
        std::stringstream msg;
        msg << "Close broker writer failed, broker:" << _address << " msg:" << detail;
        LOG(WARNING) << msg.str();
        return Status::InternalError(msg.str());
    };

    TBrokerCloseWriterRequest close_req;
    close_req.__set_version(TBrokerVersion::VERSION_ONE);
    close_req.__set_fd(_fd);
    VLOG_ROW << "debug: send broker close writer request: "
             << apache::thrift::ThriftDebugString(close_req).c_str();

    TBrokerOperationStatus close_resp;
    try {
        Status conn_status;
        BrokerServiceConnection client(client_cache(_env), _address, close_timeout_ms,
                                       &conn_status);
        if (!conn_status.ok()) {
            LOG(WARNING) << "Create broker write client failed. broker=" << _address
                         << ", status=" << conn_status;
            return conn_status;
        }

        try {
            client->closeWriter(close_resp, close_req);
        } catch (const apache::thrift::transport::TTransportException& transport_err) {
            LOG(WARNING) << "Close broker writer failed. broker:" << _address
                         << " msg:" << transport_err.what();
            conn_status = client.reopen();
            if (!conn_status.ok()) {
                LOG(WARNING) << "Reopen broker writer failed. broker=" << _address
                             << ", status=" << conn_status;
                return conn_status;
            }
            // Second and last attempt; an exception here reaches the outer handler.
            client->closeWriter(close_resp, close_req);
        }
    } catch (const apache::thrift::TException& thrift_err) {
        return report_failure(thrift_err.what());
    }

    VLOG_ROW << "debug: send broker close writer response: "
             << apache::thrift::ThriftDebugString(close_resp).c_str();

    if (close_resp.statusCode != TBrokerOperationStatusCode::OK) {
        return report_failure(close_resp.message);
    }
    return Status::OK();
}
136
137
0
// Appends `data_cnt` slices, in order, to the broker file.
//
// data     - pointer to the first slice to write
// data_cnt - number of slices reachable from `data`
//
// Returns InternalError when the writer is not in the OPENED state; otherwise
// the first error reported by _write(), or OK once every slice is fully sent.
Status BrokerFileWriter::appendv(const Slice* data, size_t data_cnt) {
    if (_state != State::OPENED) [[unlikely]] {
        return Status::InternalError("append to closed file: {}", _path.native());
    }

    const Slice* const slices_end = data + data_cnt;
    for (const Slice* slice = data; slice != slices_end; ++slice) {
        const auto* cursor = reinterpret_cast<const uint8_t*>(slice->data);
        // Keep calling _write() until it has reported the whole slice as written.
        for (size_t remaining = slice->size; remaining > 0;) {
            size_t sent = 0;
            RETURN_IF_ERROR(_write(cursor, remaining, &sent));
            remaining -= sent;
            cursor += sent;
        }
    }
    return Status::OK();
}
155
156
// Opens `path` on the broker in APPEND mode and wraps the returned file
// descriptor in a BrokerFileWriter.
//
// env            - execution environment used to obtain the client cache and client id
// broker_address - broker to send the open-writer request to
// properties     - forwarded unchanged in the open-writer request
// path           - remote file path; moved into the created writer
//
// Returns the new writer on success. On failure returns the connection/reopen
// status, RpcError when Thrift throws, or InternalError when the broker answers
// with a non-OK status code.
Result<FileWriterPtr> BrokerFileWriter::create(ExecEnv* env, const TNetworkAddress& broker_address,
                                               const std::map<std::string, std::string>& properties,
                                               Path path) {
    TBrokerOpenWriterRequest open_req;
    open_req.__set_version(TBrokerVersion::VERSION_ONE);
    open_req.__set_path(path);
    open_req.__set_openMode(TBrokerOpenMode::APPEND);
    open_req.__set_clientId(client_id(env, broker_address));
    open_req.__set_properties(properties);

    VLOG_ROW << "debug: send broker open writer request: "
             << apache::thrift::ThriftDebugString(open_req).c_str();

    TBrokerOpenWriterResponse open_resp;
    try {
        Status conn_status;
        BrokerServiceConnection client(client_cache(env), broker_address,
                                       config::thrift_rpc_timeout_ms, &conn_status);
        if (!conn_status.ok()) {
            LOG(WARNING) << "Create broker writer client failed. "
                         << "broker=" << broker_address << ", status=" << conn_status;
            return ResultError(std::move(conn_status));
        }

        try {
            client->openWriter(open_resp, open_req);
        } catch (const apache::thrift::transport::TTransportException&) {
            // Transport failure: reopen the connection once and retry; an
            // exception from the retry reaches the outer handler.
            RETURN_IF_ERROR_RESULT(client.reopen());
            client->openWriter(open_resp, open_req);
        }
    } catch (const apache::thrift::TException& thrift_err) {
        std::stringstream msg;
        msg << "Open broker writer failed, broker:" << broker_address
            << " failed:" << thrift_err.what();
        LOG(WARNING) << msg.str();
        return ResultError(Status::RpcError(msg.str()));
    }

    VLOG_ROW << "debug: send broker open writer response: "
             << apache::thrift::ThriftDebugString(open_resp).c_str();

    const bool broker_accepted = (open_resp.opStatus.statusCode == TBrokerOperationStatusCode::OK);
    if (!broker_accepted) {
        std::stringstream msg;
        msg << "Open broker writer failed, broker:" << broker_address
            << " failed:" << open_resp.opStatus.message;
        LOG(WARNING) << msg.str();
        return ResultError(Status::InternalError(msg.str()));
    }

    return std::make_unique<BrokerFileWriter>(env, broker_address, std::move(path), open_resp.fd);
}
207
208
0
// Sends `buf_len` bytes starting at `buf` to the broker in one pwrite request
// positioned at `_cur_offset`.
//
// buf           - first byte to send
// buf_len       - number of bytes to send; 0 is a no-op that reports 0 bytes written
// written_bytes - out: set to `buf_len` on success (or 0 for an empty buffer);
//                 not modified on failure
//
// Returns the connection/reopen status when the client cannot be obtained,
// RpcError when Thrift throws, InternalError when the broker answers with a
// non-OK status code, and OK otherwise. `_cur_offset` advances only on success.
Status BrokerFileWriter::_write(const uint8_t* buf, size_t buf_len, size_t* written_bytes) {
    if (buf_len == 0) {
        *written_bytes = 0;
        return Status::OK();
    }

    TBrokerPWriteRequest pwrite_req;
    pwrite_req.__set_version(TBrokerVersion::VERSION_ONE);
    pwrite_req.__set_fd(_fd);
    pwrite_req.__set_offset(_cur_offset);
    pwrite_req.__set_data(std::string(reinterpret_cast<const char*>(buf), buf_len));

    VLOG_ROW << "debug: send broker pwrite request: "
             << apache::thrift::ThriftDebugString(pwrite_req).c_str();

    TBrokerOperationStatus pwrite_resp;
    try {
        Status conn_status;
        BrokerServiceConnection client(client_cache(_env), _address, config::thrift_rpc_timeout_ms,
                                       &conn_status);
        if (!conn_status.ok()) {
            LOG(WARNING) << "Create broker write client failed. "
                         << "broker=" << _address << ", status=" << conn_status;
            return conn_status;
        }

        try {
            client->pwrite(pwrite_resp, pwrite_req);
        } catch (const apache::thrift::transport::TTransportException&) {
            RETURN_IF_ERROR(client.reopen());
            // broker server will check write offset, so it is safe to re-try
            client->pwrite(pwrite_resp, pwrite_req);
        }
    } catch (const apache::thrift::TException& thrift_err) {
        std::stringstream msg;
        msg << "Fail to write to broker, broker:" << _address << " failed:" << thrift_err.what();
        LOG(WARNING) << msg.str();
        return Status::RpcError(msg.str());
    }

    VLOG_ROW << "debug: send broker pwrite response: "
             << apache::thrift::ThriftDebugString(pwrite_resp).c_str();

    if (pwrite_resp.statusCode != TBrokerOperationStatusCode::OK) {
        std::stringstream msg;
        msg << "Fail to write to broker, broker:" << _address << " msg:" << pwrite_resp.message;
        LOG(WARNING) << msg.str();
        return Status::InternalError(msg.str());
    }

    // The request succeeded, so the whole buffer is reported as written.
    _cur_offset += buf_len;
    *written_bytes = buf_len;

    return Status::OK();
}
263
264
} // namespace doris::io