be/src/exec/sink/load_stream_stub.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/load_stream_stub.h" |
19 | | |
20 | | #include <sstream> |
21 | | |
22 | | #include "common/cast_set.h" |
23 | | #include "runtime/query_context.h" |
24 | | #include "storage/rowset/rowset_writer.h" |
25 | | #include "util/brpc_client_cache.h" |
26 | | #include "util/debug_points.h" |
27 | | #include "util/network_util.h" |
28 | | #include "util/thrift_util.h" |
29 | | #include "util/uid_util.h" |
30 | | |
31 | | namespace doris { |
32 | | |
33 | 53.8k | int64_t CloseWaitNotifier::close_wait_version() const { |
34 | 53.8k | return _close_wait_version.load(std::memory_order_acquire); |
35 | 53.8k | } |
36 | | |
37 | 17.7k | void CloseWaitNotifier::wait_for_close_event(int64_t observed_version, int64_t timeout_ms) { |
38 | 17.7k | std::unique_lock<bthread::Mutex> lock(_close_wait_mutex); |
39 | 17.7k | if (observed_version != close_wait_version()) { |
40 | 88 | return; |
41 | 88 | } |
42 | 17.6k | static_cast<void>(_close_wait_cv.wait_for(lock, timeout_ms * 1000)); |
43 | 17.6k | } |
44 | | |
45 | 5.77k | void CloseWaitNotifier::notify_close_wait() { |
46 | 5.77k | _close_wait_version.fetch_add(1, std::memory_order_acq_rel); |
47 | 5.77k | std::lock_guard<bthread::Mutex> lock(_close_wait_mutex); |
48 | 5.77k | _close_wait_cv.notify_all(); |
49 | 5.77k | } |
50 | | |
51 | | int LoadStreamReplyHandler::on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[], |
52 | 5.62k | size_t size) { |
53 | 5.62k | auto stub = _stub.lock(); |
54 | 5.62k | if (!stub) { |
55 | 0 | LOG(WARNING) << "stub is not exist when on_received_messages, " << *this |
56 | 0 | << ", stream_id=" << id; |
57 | 0 | return 0; |
58 | 0 | } |
59 | 11.2k | for (size_t i = 0; i < size; i++) { |
60 | 5.62k | butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]); |
61 | 5.62k | PLoadStreamResponse response; |
62 | 5.62k | response.ParseFromZeroCopyStream(&wrapper); |
63 | | |
64 | 5.62k | if (response.eos()) { |
65 | 5.62k | stub->_is_eos.store(true); |
66 | 5.62k | } |
67 | | |
68 | 5.62k | Status st = Status::create<false>(response.status()); |
69 | | |
70 | 5.62k | std::stringstream ss; |
71 | 5.62k | ss << "on_received_messages, " << *this << ", stream_id=" << id; |
72 | 5.62k | if (response.success_tablet_ids_size() > 0) { |
73 | 2.83k | ss << ", success tablet ids:"; |
74 | 19.1k | for (auto tablet_id : response.success_tablet_ids()) { |
75 | 19.1k | ss << " " << tablet_id; |
76 | 19.1k | } |
77 | 2.83k | std::lock_guard<bthread::Mutex> lock(stub->_success_tablets_mutex); |
78 | 19.1k | for (auto tablet_id : response.success_tablet_ids()) { |
79 | 19.1k | stub->_success_tablets.push_back(tablet_id); |
80 | 19.1k | } |
81 | 2.83k | } |
82 | 5.62k | if (response.failed_tablets_size() > 0) { |
83 | 0 | ss << ", failed tablet ids:"; |
84 | 0 | for (auto pb : response.failed_tablets()) { |
85 | 0 | ss << " " << pb.id() << ":" << Status::create(pb.status()); |
86 | 0 | } |
87 | 0 | std::lock_guard<bthread::Mutex> lock(stub->_failed_tablets_mutex); |
88 | 0 | for (auto pb : response.failed_tablets()) { |
89 | 0 | stub->_failed_tablets.emplace(pb.id(), Status::create(pb.status())); |
90 | 0 | } |
91 | 0 | } |
92 | 5.62k | if (response.tablet_schemas_size() > 0) { |
93 | 4 | ss << ", tablet schema num: " << response.tablet_schemas_size(); |
94 | 4 | std::lock_guard<bthread::Mutex> lock(stub->_schema_mutex); |
95 | 4 | for (const auto& schema : response.tablet_schemas()) { |
96 | 4 | auto tablet_schema = std::make_unique<TabletSchema>(); |
97 | 4 | tablet_schema->init_from_pb(schema.tablet_schema()); |
98 | 4 | stub->_tablet_schema_for_index->emplace(schema.index_id(), |
99 | 4 | std::move(tablet_schema)); |
100 | 4 | stub->_enable_unique_mow_for_index->emplace( |
101 | 4 | schema.index_id(), schema.enable_unique_key_merge_on_write()); |
102 | 4 | } |
103 | 4 | stub->_schema_cv.notify_all(); |
104 | 4 | } |
105 | 5.62k | ss << ", status: " << st; |
106 | 5.62k | LOG(INFO) << ss.str(); |
107 | | |
108 | 5.62k | if (response.tablet_load_rowset_num_infos_size() > 0) { |
109 | 0 | stub->_refresh_back_pressure_version_wait_time(response.tablet_load_rowset_num_infos()); |
110 | 0 | } |
111 | | |
112 | 5.62k | if (response.has_load_stream_profile()) { |
113 | 0 | TRuntimeProfileTree tprofile; |
114 | 0 | const uint8_t* buf = |
115 | 0 | reinterpret_cast<const uint8_t*>(response.load_stream_profile().data()); |
116 | 0 | uint32_t len = cast_set<uint32_t>(response.load_stream_profile().size()); |
117 | 0 | auto status = deserialize_thrift_msg(buf, &len, false, &tprofile); |
118 | 0 | if (status.ok()) { |
119 | | // TODO |
120 | | //_sink->_state->load_channel_profile()->update(tprofile); |
121 | 0 | } else { |
122 | 0 | LOG(WARNING) << "load stream TRuntimeProfileTree deserialize failed, errmsg=" |
123 | 0 | << status; |
124 | 0 | } |
125 | 0 | } |
126 | 5.62k | } |
127 | 5.62k | return 0; |
128 | 5.62k | } |
129 | | |
130 | 5.64k | void LoadStreamReplyHandler::on_closed(brpc::StreamId id) { |
131 | 5.64k | Defer defer {[this]() { delete this; }}; |
132 | 5.64k | LOG(INFO) << "on_closed, " << *this << ", stream_id=" << id; |
133 | 5.64k | auto stub = _stub.lock(); |
134 | 5.64k | if (!stub) { |
135 | 0 | LOG(WARNING) << "stub is not exist when on_closed, " << *this; |
136 | 0 | return; |
137 | 0 | } |
138 | 5.64k | stub->_is_closed.store(true); |
139 | 5.64k | stub->notify_close_wait(); |
140 | 5.64k | } |
141 | | |
142 | 11.2k | inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler) { |
143 | 11.2k | ostr << "LoadStreamReplyHandler load_id=" << UniqueId(handler._load_id) |
144 | 11.2k | << ", dst_id=" << handler._dst_id; |
145 | 11.2k | return ostr; |
146 | 11.2k | } |
147 | | |
148 | | LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, |
149 | | std::shared_ptr<IndexToTabletSchema> schema_map, |
150 | | std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental, |
151 | | std::shared_ptr<CloseWaitNotifier> close_wait_notifier) |
152 | 5.70k | : _load_id(load_id), |
153 | 5.70k | _src_id(src_id), |
154 | 5.70k | _tablet_schema_for_index(schema_map), |
155 | 5.70k | _enable_unique_mow_for_index(mow_map), |
156 | 5.70k | _is_incremental(incremental), |
157 | 5.70k | _close_wait_notifier(std::move(close_wait_notifier)) { |
158 | 5.70k | DCHECK(_close_wait_notifier != nullptr); |
159 | 5.70k | }; |
160 | | |
161 | 5.70k | LoadStreamStub::~LoadStreamStub() { |
162 | 5.70k | if (_is_open.load() && !_is_closed.load()) { |
163 | 0 | auto ret = brpc::StreamClose(_stream_id); |
164 | 0 | LOG(INFO) << *this << " is deconstructed, close " << (ret == 0 ? "success" : "failed"); |
165 | 0 | } |
166 | 5.70k | } |
167 | | |
168 | | // open_load_stream |
169 | | Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, |
170 | | const NodeInfo& node_info, int64_t txn_id, |
171 | | const OlapTableSchemaParam& schema, |
172 | | const std::vector<PTabletID>& tablets_for_schema, int total_streams, |
173 | 18.4k | int64_t idle_timeout_ms, bool enable_profile) { |
174 | 18.4k | std::unique_lock<bthread::Mutex> lock(_open_mutex); |
175 | 18.4k | if (_is_init.load()) { |
176 | 12.7k | return _status; |
177 | 12.7k | } |
178 | 5.63k | _is_init.store(true); |
179 | 5.63k | _dst_id = node_info.id; |
180 | 5.63k | brpc::StreamOptions opt; |
181 | 5.63k | opt.max_buf_size = cast_set<int>(config::load_stream_max_buf_size); |
182 | 5.63k | opt.idle_timeout_ms = idle_timeout_ms; |
183 | 5.63k | opt.messages_in_batch = config::load_stream_messages_in_batch; |
184 | 5.63k | opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, shared_from_this()); |
185 | 5.63k | brpc::Controller cntl; |
186 | 5.63k | if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) { |
187 | 0 | delete opt.handler; |
188 | 0 | _status = Status::Error<true>(ret, "Failed to create stream"); |
189 | 0 | return _status; |
190 | 0 | } |
191 | 5.63k | cntl.set_timeout_ms(config::open_load_stream_timeout_ms); |
192 | 5.63k | POpenLoadStreamRequest request; |
193 | 5.63k | *request.mutable_load_id() = _load_id; |
194 | 5.63k | request.set_src_id(_src_id); |
195 | 5.63k | request.set_txn_id(txn_id); |
196 | 5.63k | request.set_enable_profile(enable_profile); |
197 | 5.63k | if (_is_incremental) { |
198 | 0 | request.set_total_streams(0); |
199 | 5.64k | } else if (total_streams > 0) { |
200 | 5.64k | request.set_total_streams(total_streams); |
201 | 18.4E | } else { |
202 | 18.4E | _status = Status::InternalError("total_streams should be greator than 0"); |
203 | 18.4E | return _status; |
204 | 18.4E | } |
205 | 5.64k | request.set_idle_timeout_ms(idle_timeout_ms); |
206 | 5.64k | schema.to_protobuf(request.mutable_schema()); |
207 | 5.64k | for (auto& tablet : tablets_for_schema) { |
208 | 2.87k | *request.add_tablets() = tablet; |
209 | 2.87k | } |
210 | 5.64k | POpenLoadStreamResponse response; |
211 | | // set connection_group "streaming" to distinguish with non-streaming connections |
212 | 5.64k | const auto& stub = client_cache->get_client(node_info.host, node_info.brpc_port); |
213 | 5.64k | if (stub == nullptr) { |
214 | 0 | return Status::InternalError("failed to init brpc client to {}:{}", node_info.host, |
215 | 0 | node_info.brpc_port); |
216 | 0 | } |
217 | 5.64k | stub->open_load_stream(&cntl, &request, &response, nullptr); |
218 | 5.64k | for (const auto& resp : response.tablet_schemas()) { |
219 | 2.87k | auto tablet_schema = std::make_unique<TabletSchema>(); |
220 | 2.87k | tablet_schema->init_from_pb(resp.tablet_schema()); |
221 | 2.87k | _tablet_schema_for_index->emplace(resp.index_id(), std::move(tablet_schema)); |
222 | 2.87k | _enable_unique_mow_for_index->emplace(resp.index_id(), |
223 | 2.87k | resp.enable_unique_key_merge_on_write()); |
224 | 2.87k | } |
225 | 5.64k | if (response.tablet_load_rowset_num_infos_size() > 0) { |
226 | 0 | _refresh_back_pressure_version_wait_time(response.tablet_load_rowset_num_infos()); |
227 | 0 | } |
228 | 5.64k | if (cntl.Failed()) { |
229 | 0 | brpc::StreamClose(_stream_id); |
230 | 0 | _status = Status::InternalError("Failed to connect to backend {}: {}", _dst_id, |
231 | 0 | cntl.ErrorText()); |
232 | 0 | return _status; |
233 | 0 | } |
234 | 5.64k | LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port |
235 | 5.64k | << ", " << *this; |
236 | 5.64k | _is_open.store(true); |
237 | 5.64k | _status = Status::OK(); |
238 | 5.64k | return _status; |
239 | 5.64k | } |
240 | | |
241 | | // APPEND_DATA |
242 | | Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, |
243 | | int32_t segment_id, uint64_t offset, std::span<const Slice> data, |
244 | 156k | bool segment_eos, FileType file_type) { |
245 | 156k | if (!_is_open.load()) { |
246 | 0 | add_failed_tablet(tablet_id, _status); |
247 | 0 | return _status; |
248 | 0 | } |
249 | 156k | DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); }); |
250 | 156k | PStreamHeader header; |
251 | 156k | header.set_src_id(_src_id); |
252 | 156k | *header.mutable_load_id() = _load_id; |
253 | 156k | header.set_partition_id(partition_id); |
254 | 156k | header.set_index_id(index_id); |
255 | 156k | header.set_tablet_id(tablet_id); |
256 | 156k | header.set_segment_id(segment_id); |
257 | 156k | header.set_segment_eos(segment_eos); |
258 | 156k | header.set_offset(offset); |
259 | 156k | header.set_opcode(doris::PStreamHeader::APPEND_DATA); |
260 | 156k | header.set_file_type(file_type); |
261 | 156k | return _encode_and_send(header, data); |
262 | 156k | } |
263 | | |
264 | | // ADD_SEGMENT |
265 | | Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, |
266 | 6.49k | int32_t segment_id, const SegmentStatistics& segment_stat) { |
267 | 6.49k | if (!_is_open.load()) { |
268 | 0 | add_failed_tablet(tablet_id, _status); |
269 | 0 | return _status; |
270 | 0 | } |
271 | 6.49k | DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); }); |
272 | 6.49k | PStreamHeader header; |
273 | 6.49k | header.set_src_id(_src_id); |
274 | 6.49k | *header.mutable_load_id() = _load_id; |
275 | 6.49k | header.set_partition_id(partition_id); |
276 | 6.49k | header.set_index_id(index_id); |
277 | 6.49k | header.set_tablet_id(tablet_id); |
278 | 6.49k | header.set_segment_id(segment_id); |
279 | 6.49k | header.set_opcode(doris::PStreamHeader::ADD_SEGMENT); |
280 | 6.49k | segment_stat.to_pb(header.mutable_segment_statistics()); |
281 | 6.49k | return _encode_and_send(header); |
282 | 6.49k | } |
283 | | |
284 | | // CLOSE_LOAD |
285 | | Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit, |
286 | 5.62k | int num_incremental_streams) { |
287 | 5.62k | if (!_is_open.load()) { |
288 | 0 | return _status; |
289 | 0 | } |
290 | 5.62k | PStreamHeader header; |
291 | 5.62k | *header.mutable_load_id() = _load_id; |
292 | 5.62k | header.set_src_id(_src_id); |
293 | 5.62k | header.set_opcode(doris::PStreamHeader::CLOSE_LOAD); |
294 | 19.1k | for (const auto& tablet : tablets_to_commit) { |
295 | 19.1k | *header.add_tablets() = tablet; |
296 | 19.1k | } |
297 | 5.62k | header.set_num_incremental_streams(num_incremental_streams); |
298 | 5.62k | _status = _encode_and_send(header); |
299 | 5.62k | if (!_status.ok()) { |
300 | 0 | LOG(WARNING) << "stream " << _stream_id << " close failed: " << _status; |
301 | 0 | return _status; |
302 | 0 | } |
303 | 5.62k | _is_closing.store(true); |
304 | 5.62k | return Status::OK(); |
305 | 5.62k | } |
306 | | |
307 | | // GET_SCHEMA |
308 | 4 | Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) { |
309 | 4 | if (!_is_open.load()) { |
310 | 0 | return _status; |
311 | 0 | } |
312 | 4 | PStreamHeader header; |
313 | 4 | *header.mutable_load_id() = _load_id; |
314 | 4 | header.set_src_id(_src_id); |
315 | 4 | header.set_opcode(doris::PStreamHeader::GET_SCHEMA); |
316 | 4 | std::ostringstream oss; |
317 | 4 | oss << "fetching tablet schema from stream " << _stream_id |
318 | 4 | << ", load id: " << print_id(_load_id) << ", tablet id:"; |
319 | 4 | for (const auto& tablet : tablets) { |
320 | 4 | *header.add_tablets() = tablet; |
321 | 4 | oss << " " << tablet.tablet_id(); |
322 | 4 | } |
323 | 4 | if (tablets.size() == 0) { |
324 | 0 | oss << " none"; |
325 | 0 | } |
326 | 4 | LOG(INFO) << oss.str(); |
327 | 4 | return _encode_and_send(header); |
328 | 4 | } |
329 | | |
330 | | Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id, |
331 | 6.51k | int64_t timeout_ms) { |
332 | 6.51k | if (!_is_open.load()) { |
333 | 0 | return _status; |
334 | 0 | } |
335 | 6.51k | if (_tablet_schema_for_index->contains(index_id)) { |
336 | 6.50k | return Status::OK(); |
337 | 6.50k | } |
338 | 4 | PTabletID tablet; |
339 | 4 | tablet.set_partition_id(partition_id); |
340 | 4 | tablet.set_index_id(index_id); |
341 | 4 | tablet.set_tablet_id(tablet_id); |
342 | 4 | RETURN_IF_ERROR(get_schema({tablet})); |
343 | | |
344 | 4 | MonotonicStopWatch watch; |
345 | 4 | watch.start(); |
346 | 8 | while (!_tablet_schema_for_index->contains(index_id) && |
347 | 8 | watch.elapsed_time() / 1000 / 1000 < timeout_ms) { |
348 | 4 | RETURN_IF_ERROR(check_cancel()); |
349 | 4 | static_cast<void>(wait_for_new_schema(100)); |
350 | 4 | } |
351 | | |
352 | 4 | if (!_tablet_schema_for_index->contains(index_id)) { |
353 | 0 | return Status::TimedOut("timeout to get tablet schema for index {}", index_id); |
354 | 0 | } |
355 | 4 | return Status::OK(); |
356 | 4 | } |
357 | | |
358 | 63.1k | Status LoadStreamStub::close_finish_check(RuntimeState* state, bool* is_closed) { |
359 | 63.1k | DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK); |
360 | 63.1k | DBUG_EXECUTE_IF("LoadStreamStub::close_finish_check.close_failed", |
361 | 63.1k | { return Status::InternalError("close failed"); }); |
362 | 63.1k | *is_closed = true; |
363 | 63.1k | if (!_is_open.load()) { |
364 | | // we don't need to close wait on non-open streams |
365 | 0 | return Status::OK(); |
366 | 0 | } |
367 | | // If stream is cancelled (e.g., due to connection failure), treat it as closed |
368 | | // to avoid waiting indefinitely for a stream that will never respond. |
369 | 63.1k | if (_is_cancelled.load()) { |
370 | 0 | return check_cancel(); |
371 | 0 | } |
372 | 63.1k | if (state->get_query_ctx()->is_cancelled()) { |
373 | 0 | return state->get_query_ctx()->exec_status(); |
374 | 0 | } |
375 | 63.1k | if (!_is_closing.load()) { |
376 | 12.6k | *is_closed = false; |
377 | 12.6k | return _status; |
378 | 12.6k | } |
379 | 50.4k | if (_is_closed.load()) { |
380 | 36.5k | RETURN_IF_ERROR(check_cancel()); |
381 | 36.5k | if (!_is_eos.load()) { |
382 | 0 | return Status::InternalError("Stream closed without EOS, {}", to_string()); |
383 | 0 | } |
384 | 36.5k | return Status::OK(); |
385 | 36.5k | } |
386 | 13.9k | *is_closed = false; |
387 | 13.9k | return Status::OK(); |
388 | 50.4k | } |
389 | | |
390 | 5.77k | void LoadStreamStub::notify_close_wait() { |
391 | 5.77k | _close_wait_notifier->notify_close_wait(); |
392 | 5.77k | } |
393 | | |
394 | 134 | void LoadStreamStub::cancel(Status reason) { |
395 | 134 | LOG(WARNING) << *this << " is cancelled because of " << reason; |
396 | 134 | if (_is_open.load()) { |
397 | 133 | brpc::StreamClose(_stream_id); |
398 | 133 | } |
399 | 134 | { |
400 | 134 | std::lock_guard<bthread::Mutex> lock(_cancel_mutex); |
401 | 134 | _cancel_st = reason; |
402 | 134 | _is_cancelled.store(true); |
403 | 134 | } |
404 | 134 | _is_closed.store(true); |
405 | 134 | notify_close_wait(); |
406 | 134 | } |
407 | | |
408 | 168k | Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) { |
409 | 168k | butil::IOBuf buf; |
410 | 168k | size_t header_len = header.ByteSizeLong(); |
411 | 168k | buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len)); |
412 | 168k | buf.append(header.SerializeAsString()); |
413 | 168k | size_t data_len = std::transform_reduce(data.begin(), data.end(), 0, std::plus(), |
414 | 500k | [](const Slice& s) { return s.get_size(); }); |
415 | 168k | buf.append(reinterpret_cast<uint8_t*>(&data_len), sizeof(data_len)); |
416 | 500k | for (const auto& slice : data) { |
417 | 500k | buf.append(slice.get_data(), slice.get_size()); |
418 | 500k | } |
419 | 168k | bool eos = header.opcode() == doris::PStreamHeader::CLOSE_LOAD; |
420 | 168k | bool get_schema = header.opcode() == doris::PStreamHeader::GET_SCHEMA; |
421 | 168k | add_bytes_written(buf.size()); |
422 | 168k | return _send_with_buffer(buf, eos || get_schema); |
423 | 168k | } |
424 | | |
425 | 168k | Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) { |
426 | 168k | butil::IOBuf output; |
427 | 168k | std::unique_lock<decltype(_buffer_mutex)> buffer_lock(_buffer_mutex); |
428 | 168k | _buffer.append(buf); |
429 | 168k | if (!sync && _buffer.size() < config::brpc_streaming_client_batch_bytes) { |
430 | 162k | return Status::OK(); |
431 | 162k | } |
432 | 5.57k | output.swap(_buffer); |
433 | | // acquire send lock while holding buffer lock, to ensure the message order |
434 | 5.57k | std::lock_guard<decltype(_send_mutex)> send_lock(_send_mutex); |
435 | 5.57k | buffer_lock.unlock(); |
436 | 18.4E | VLOG_DEBUG << "send buf size : " << output.size() << ", sync: " << sync; |
437 | 5.57k | auto st = _send_with_retry(output); |
438 | 5.57k | if (!st.ok()) { |
439 | 0 | _handle_failure(output, st); |
440 | 0 | } |
441 | 5.57k | return st; |
442 | 168k | } |
443 | | |
444 | 0 | void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) { |
445 | 0 | while (buf.size() > 0) { |
446 | | // step 1: parse header |
447 | 0 | size_t hdr_len = 0; |
448 | 0 | buf.cutn((void*)&hdr_len, sizeof(size_t)); |
449 | 0 | butil::IOBuf hdr_buf; |
450 | 0 | PStreamHeader hdr; |
451 | 0 | buf.cutn(&hdr_buf, hdr_len); |
452 | 0 | butil::IOBufAsZeroCopyInputStream wrapper(hdr_buf); |
453 | 0 | hdr.ParseFromZeroCopyStream(&wrapper); |
454 | | |
455 | | // step 2: cut data |
456 | 0 | size_t data_len = 0; |
457 | 0 | buf.cutn((void*)&data_len, sizeof(size_t)); |
458 | 0 | butil::IOBuf data_buf; |
459 | 0 | buf.cutn(&data_buf, data_len); |
460 | | |
461 | | // step 3: handle failure |
462 | 0 | switch (hdr.opcode()) { |
463 | 0 | case PStreamHeader::ADD_SEGMENT: |
464 | 0 | case PStreamHeader::APPEND_DATA: { |
465 | 0 | DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.append_data_failed", { |
466 | 0 | add_failed_tablet(hdr.tablet_id(), st); |
467 | 0 | return; |
468 | 0 | }); |
469 | 0 | DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.add_segment_failed", { |
470 | 0 | add_failed_tablet(hdr.tablet_id(), st); |
471 | 0 | return; |
472 | 0 | }); |
473 | 0 | add_failed_tablet(hdr.tablet_id(), st); |
474 | 0 | } break; |
475 | 0 | case PStreamHeader::CLOSE_LOAD: { |
476 | 0 | DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.close_load_failed", { |
477 | 0 | brpc::StreamClose(_stream_id); |
478 | 0 | return; |
479 | 0 | }); |
480 | 0 | brpc::StreamClose(_stream_id); |
481 | 0 | } break; |
482 | 0 | case PStreamHeader::GET_SCHEMA: { |
483 | 0 | DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.get_schema_failed", { |
484 | | // Just log and let wait_for_schema timeout |
485 | 0 | std::ostringstream oss; |
486 | 0 | for (const auto& tablet : hdr.tablets()) { |
487 | 0 | oss << " " << tablet.tablet_id(); |
488 | 0 | } |
489 | 0 | LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << oss.str() << ", " |
490 | 0 | << *this; |
491 | 0 | return; |
492 | 0 | }); |
493 | | // Just log and let wait_for_schema timeout |
494 | 0 | std::ostringstream oss; |
495 | 0 | for (const auto& tablet : hdr.tablets()) { |
496 | 0 | oss << " " << tablet.tablet_id(); |
497 | 0 | } |
498 | 0 | LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << oss.str() << ", " |
499 | 0 | << *this; |
500 | 0 | } break; |
501 | 0 | default: |
502 | 0 | LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this; |
503 | 0 | DCHECK(false); |
504 | 0 | } |
505 | 0 | } |
506 | 0 | } |
507 | | |
508 | 5.62k | Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) { |
509 | 5.62k | for (;;) { |
510 | 5.62k | RETURN_IF_ERROR(check_cancel()); |
511 | 5.62k | int ret; |
512 | 5.62k | { |
513 | 5.62k | DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.delay_before_send", { |
514 | 5.62k | int64_t delay_ms = dp->param<int64_t>("delay_ms", 1000); |
515 | 5.62k | bthread_usleep(delay_ms * 1000); |
516 | 5.62k | }); |
517 | 5.62k | brpc::StreamWriteOptions options; |
518 | 5.62k | options.write_in_background = config::enable_brpc_stream_write_background; |
519 | 5.62k | ret = brpc::StreamWrite(_stream_id, buf, &options); |
520 | 5.62k | } |
521 | 5.62k | DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.stream_write_failed", { ret = EPIPE; }); |
522 | 5.62k | switch (ret) { |
523 | 5.62k | case 0: |
524 | 5.62k | return Status::OK(); |
525 | 0 | case EAGAIN: { |
526 | 0 | const timespec time = butil::seconds_from_now(config::load_stream_eagain_wait_seconds); |
527 | 0 | int wait_ret = brpc::StreamWait(_stream_id, &time); |
528 | 0 | if (wait_ret != 0) { |
529 | 0 | return Status::InternalError("StreamWait failed, err={}, {}", wait_ret, |
530 | 0 | to_string()); |
531 | 0 | } |
532 | 0 | break; |
533 | 0 | } |
534 | 0 | default: |
535 | 0 | return Status::InternalError("StreamWrite failed, err={}, {}", ret, to_string()); |
536 | 5.62k | } |
537 | 5.62k | } |
538 | 5.62k | } |
539 | | |
540 | | void LoadStreamStub::_refresh_back_pressure_version_wait_time( |
541 | | const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>& |
542 | 0 | tablet_load_infos) { |
543 | 0 | int64_t max_rowset_num_gap = 0; |
544 | | // if any one tablet is under high load pressure, we would make the whole procedure |
545 | | // sleep to prevent the corresponding BE return -235 |
546 | 0 | std::for_each( |
547 | 0 | tablet_load_infos.begin(), tablet_load_infos.end(), |
548 | 0 | [&max_rowset_num_gap](auto& load_info) { |
549 | 0 | int64_t cur_rowset_num = load_info.current_rowset_nums(); |
550 | 0 | int64_t high_load_point = load_info.max_config_rowset_nums() * |
551 | 0 | (config::load_back_pressure_version_threshold / 100); |
552 | 0 | DCHECK(cur_rowset_num > high_load_point); |
553 | 0 | max_rowset_num_gap = std::max(max_rowset_num_gap, cur_rowset_num - high_load_point); |
554 | 0 | }); |
555 | | // to slow down the high load pressure |
556 | | // we would use the rowset num gap to calculate one sleep time |
557 | | // for example: |
558 | | // if the max tablet version is 2000, there are 3 BE |
559 | | // A: ==================== 1800 |
560 | | // B: =================== 1700 |
561 | | // C: ================== 1600 |
562 | | // ================== 1600 |
563 | | // ^ |
564 | | // the high load point |
565 | | // then then max gap is 1800 - (max tablet version * config::load_back_pressure_version_threshold / 100) = 200, |
566 | | // we would make the whole send procesure sleep |
567 | | // 1200ms for compaction to be done toe reduce the high pressure |
568 | 0 | auto max_time = config::max_load_back_pressure_version_wait_time_ms; |
569 | 0 | if (UNLIKELY(max_rowset_num_gap > 0)) { |
570 | 0 | _load_back_pressure_version_wait_time_ms.store( |
571 | 0 | std::min(max_rowset_num_gap + 1000, max_time)); |
572 | 0 | LOG(INFO) << "try to back pressure version, wait time(ms): " |
573 | 0 | << _load_back_pressure_version_wait_time_ms << ", load id: " << print_id(_load_id) |
574 | 0 | << ", max_rowset_num_gap: " << max_rowset_num_gap; |
575 | 0 | } |
576 | 0 | } |
577 | | |
578 | 0 | std::string LoadStreamStub::to_string() { |
579 | 0 | std::ostringstream ss; |
580 | 0 | ss << *this; |
581 | 0 | return ss.str(); |
582 | 0 | } |
583 | | |
584 | 5.77k | inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub) { |
585 | 5.77k | ostr << "LoadStreamStub load_id=" << print_id(stub._load_id) << ", src_id=" << stub._src_id |
586 | 5.77k | << ", dst_id=" << stub._dst_id << ", stream_id=" << stub._stream_id; |
587 | 5.77k | return ostr; |
588 | 5.77k | } |
589 | | |
590 | | Status LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache, |
591 | | const NodeInfo& node_info, int64_t txn_id, |
592 | | const OlapTableSchemaParam& schema, |
593 | | const std::vector<PTabletID>& tablets_for_schema, int total_streams, |
594 | 9.25k | int64_t idle_timeout_ms, bool enable_profile) { |
595 | 9.25k | bool get_schema = true; |
596 | 9.25k | auto status = Status::OK(); |
597 | 9.25k | bool first_stream = true; |
598 | 18.3k | for (auto& stream : _streams) { |
599 | 18.3k | Status st; |
600 | 18.3k | if (get_schema) { |
601 | 9.24k | st = stream->open(client_cache, node_info, txn_id, schema, tablets_for_schema, |
602 | 9.24k | total_streams, idle_timeout_ms, enable_profile); |
603 | 9.24k | } else { |
604 | 9.15k | st = stream->open(client_cache, node_info, txn_id, schema, {}, total_streams, |
605 | 9.15k | idle_timeout_ms, enable_profile); |
606 | 9.15k | } |
607 | | // Simulate one stream open failure within LoadStreamStubs. |
608 | | // This causes the successfully opened streams to be cancelled, |
609 | | // reproducing the bug where cancelled streams cause close_wait timeout. |
610 | 18.3k | DBUG_EXECUTE_IF("LoadStreamStubs.open.fail_one_stream", { |
611 | 18.3k | if (st.ok() && !first_stream) { |
612 | 18.3k | st = Status::InternalError("Injected stream open failure"); |
613 | 18.3k | } |
614 | 18.3k | }); |
615 | 18.4k | if (st.ok()) { |
616 | 18.4k | get_schema = false; |
617 | 18.4E | } else { |
618 | 18.4E | LOG(WARNING) << "open stream failed: " << st << "; stream: " << *stream; |
619 | 18.4E | status = st; |
620 | | // no break here to try get schema from the rest streams |
621 | 18.4E | } |
622 | 18.3k | first_stream = false; |
623 | 18.3k | } |
624 | | // only mark open when all streams open success |
625 | 9.25k | _open_success.store(status.ok()); |
626 | | // cancel all streams if open failed |
627 | 9.25k | if (!status.ok()) { |
628 | 0 | cancel(status); |
629 | 0 | } |
630 | 9.25k | return status; |
631 | 9.25k | } |
632 | | |
633 | | Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_commit, |
634 | 2.87k | int num_incremental_streams) { |
635 | 2.87k | if (!_open_success.load()) { |
636 | 0 | return Status::InternalError("streams not open"); |
637 | 0 | } |
638 | 2.87k | bool first = true; |
639 | 2.87k | auto status = Status::OK(); |
640 | 5.62k | for (auto& stream : _streams) { |
641 | 5.62k | Status st; |
642 | 5.62k | if (first) { |
643 | 2.87k | st = stream->close_load(tablets_to_commit, num_incremental_streams); |
644 | 2.87k | first = false; |
645 | 2.87k | } else { |
646 | 2.74k | st = stream->close_load({}, num_incremental_streams); |
647 | 2.74k | } |
648 | 5.62k | if (!st.ok()) { |
649 | | LOG(WARNING) << "close_load failed: " << st << "; stream: " << *stream; |
650 | 0 | } |
651 | 5.62k | } |
652 | 2.87k | return status; |
653 | 2.87k | } |
654 | | |
655 | | } // namespace doris |