be/src/exec/operator/exchange_sink_buffer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/operator/exchange_sink_buffer.h" |
19 | | |
20 | | #include <brpc/controller.h> |
21 | | #include <butil/errno.h> |
22 | | #include <butil/iobuf_inl.h> |
23 | | #include <fmt/format.h> |
24 | | #include <gen_cpp/Types_types.h> |
25 | | #include <gen_cpp/types.pb.h> |
26 | | #include <glog/logging.h> |
27 | | #include <google/protobuf/stubs/callback.h> |
28 | | #include <pdqsort.h> |
29 | | #include <stddef.h> |
30 | | |
31 | | #include <atomic> |
32 | | #include <cstdint> |
33 | | #include <exception> |
34 | | #include <functional> |
35 | | #include <memory> |
36 | | #include <ostream> |
37 | | #include <utility> |
38 | | |
39 | | #include "common/status.h" |
40 | | #include "exec/exchange/vdata_stream_sender.h" |
41 | | #include "exec/operator/exchange_sink_operator.h" |
42 | | #include "exec/pipeline/pipeline_fragment_context.h" |
43 | | #include "runtime/exec_env.h" |
44 | | #include "runtime/thread_context.h" |
45 | | #include "service/backend_options.h" |
46 | | #include "util/defer_op.h" |
47 | | #include "util/proto_util.h" |
48 | | #include "util/time.h" |
49 | | |
50 | | namespace doris { |
51 | | #include "common/compile_check_begin.h" |
52 | | |
53 | 2 | BroadcastPBlockHolder::~BroadcastPBlockHolder() { |
54 | | // lock the parent queue, if the queue could lock success, then return the block |
55 | | // to the queue, to reuse the block |
56 | 2 | std::shared_ptr<BroadcastPBlockHolderMemLimiter> limiter = _parent_creator.lock(); |
57 | 2 | if (limiter != nullptr) { |
58 | 2 | limiter->release(*this); |
59 | 2 | } |
60 | | // If the queue already deconstruted, then release pblock automatically since it |
61 | | // is a unique ptr. |
62 | 2 | } |
63 | | |
64 | 2 | void BroadcastPBlockHolderMemLimiter::acquire(BroadcastPBlockHolder& holder) { |
65 | 2 | std::unique_lock l(_holders_lock); |
66 | 2 | DCHECK(_broadcast_dependency != nullptr); |
67 | 2 | holder.set_parent_creator(shared_from_this()); |
68 | 2 | auto size = holder._pblock->column_values().size(); |
69 | 2 | _total_queue_buffer_size += size; |
70 | 2 | _total_queue_blocks_count++; |
71 | 2 | if (_total_queue_buffer_size >= _total_queue_buffer_size_limit || |
72 | 2 | _total_queue_blocks_count >= _total_queue_blocks_count_limit) { |
73 | 0 | _broadcast_dependency->block(); |
74 | 0 | } |
75 | 2 | } |
76 | | |
77 | 2 | void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holder) { |
78 | 2 | std::unique_lock l(_holders_lock); |
79 | 2 | DCHECK(_broadcast_dependency != nullptr); |
80 | 2 | auto size = holder._pblock->column_values().size(); |
81 | 2 | _total_queue_buffer_size -= size; |
82 | 2 | _total_queue_blocks_count--; |
83 | 2 | if (_total_queue_buffer_size <= 0) { |
84 | 2 | _broadcast_dependency->set_ready(); |
85 | 2 | } |
86 | 2 | } |
87 | | |
// Builds the shared send buffer for one exchange sink node.
// @param query_id       query this buffer belongs to (copied into every RPC request)
// @param dest_node_id   plan node id of the receiving exchange node
// @param node_id        plan node id of this sink (used in failure logs)
// @param state          runtime state; supplies the query context and query options
// @param sender_ins_ids local sender instance ids; only the count is kept, as the
//                       per-destination running_sink_count for eos bookkeeping
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
                                       PlanNodeId node_id, RuntimeState* state,
                                       const std::vector<InstanceLoId>& sender_ins_ids)
        : HasTaskExecutionCtx(state),
          _queue_capacity(0),
          _is_failed(false),
          _query_id(std::move(query_id)),
          _dest_node_id(dest_node_id),
          _node_id(node_id),
          _state(state),
          _context(state->get_query_ctx()),
          _exchange_sink_num(sender_ins_ids.size()),
          // Multi-block batching is enabled only when the session option is both
          // set and positive; the byte threshold is read below in that case.
          _send_multi_blocks(state->query_options().__isset.exchange_multi_blocks_byte_size &&
                             state->query_options().exchange_multi_blocks_byte_size > 0) {
    if (_send_multi_blocks) {
        _send_multi_blocks_byte_size = state->query_options().exchange_multi_blocks_byte_size;
    }
}
106 | | |
// Intentionally a no-op: the queues must NOT be cleared here, because an
// in-flight RPC callback may still pull a request from them; clearing would
// free a request that the RPC layer is about to use and crash the process.
// The queues are released when the buffer itself is destroyed.
void ExchangeSinkBuffer::close() {
    // Could not clear the queue here, because there maybe a running rpc want to
    // get a request from the queue, and clear method will release the request
    // and it will core.
    //_instance_to_broadcast_package_queue.clear();
    //_instance_to_package_queue.clear();
    //_instance_to_request.clear();
}
115 | | |
116 | 14 | void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) { |
117 | 14 | if (_is_failed) { |
118 | 0 | return; |
119 | 0 | } |
120 | 14 | auto low_id = fragment_instance_id.lo; |
121 | 14 | if (_rpc_instances.contains(low_id)) { |
122 | 0 | return; |
123 | 0 | } |
124 | | |
125 | | // Initialize the instance data |
126 | 14 | auto instance_data = std::make_unique<RpcInstance>(low_id); |
127 | 14 | instance_data->mutex = std::make_unique<std::mutex>(); |
128 | 14 | instance_data->seq = 0; |
129 | 14 | instance_data->package_queue = |
130 | 14 | std::unordered_map<Channel*, std::queue<TransmitInfo, std::list<TransmitInfo>>>(); |
131 | 14 | instance_data->broadcast_package_queue = std::unordered_map< |
132 | 14 | Channel*, std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>>(); |
133 | 14 | _queue_capacity = config::exchg_buffer_queue_capacity_factor * _rpc_instances.size(); |
134 | | |
135 | 14 | PUniqueId finst_id; |
136 | 14 | finst_id.set_hi(fragment_instance_id.hi); |
137 | 14 | finst_id.set_lo(fragment_instance_id.lo); |
138 | | |
139 | 14 | instance_data->rpc_channel_is_idle = true; |
140 | 14 | instance_data->rpc_channel_is_turn_off = false; |
141 | | |
142 | | // Initialize request |
143 | 14 | instance_data->request = std::make_shared<PTransmitDataParams>(); |
144 | 14 | instance_data->request->mutable_finst_id()->CopyFrom(finst_id); |
145 | 14 | instance_data->request->mutable_query_id()->CopyFrom(_query_id); |
146 | 14 | instance_data->request->set_node_id(_dest_node_id); |
147 | 14 | instance_data->running_sink_count = _exchange_sink_num; |
148 | | |
149 | 14 | _rpc_instances[low_id] = std::move(instance_data); |
150 | 14 | } |
151 | | |
152 | 48 | Status ExchangeSinkBuffer::add_block(Channel* channel, TransmitInfo&& request) { |
153 | 48 | if (_is_failed) { |
154 | 9 | return Status::OK(); |
155 | 9 | } |
156 | 39 | auto ins_id = channel->dest_ins_id(); |
157 | 39 | if (!_rpc_instances.contains(ins_id)) { |
158 | 0 | return Status::InternalError("fragment_instance_id {} not do register_sink", |
159 | 0 | print_id(channel->_fragment_instance_id)); |
160 | 0 | } |
161 | 39 | auto& instance_data = *_rpc_instances[ins_id]; |
162 | 39 | if (instance_data.rpc_channel_is_turn_off) { |
163 | 1 | return Status::EndOfFile("receiver eof"); |
164 | 1 | } |
165 | 38 | bool send_now = false; |
166 | 38 | { |
167 | 38 | std::unique_lock<std::mutex> lock(*instance_data.mutex); |
168 | | // Do not have in process rpc, directly send |
169 | 38 | if (instance_data.rpc_channel_is_idle) { |
170 | 14 | send_now = true; |
171 | 14 | instance_data.rpc_channel_is_idle = false; |
172 | 14 | } |
173 | 38 | if (request.block) { |
174 | 38 | RETURN_IF_ERROR( |
175 | 38 | BeExecVersionManager::check_be_exec_version(request.block->be_exec_version())); |
176 | 38 | COUNTER_UPDATE(channel->_parent->memory_used_counter(), request.block->ByteSizeLong()); |
177 | 38 | } |
178 | 38 | instance_data.package_queue[channel].emplace(std::move(request)); |
179 | 38 | _total_queue_size++; |
180 | 38 | if (_total_queue_size > _queue_capacity) { |
181 | 0 | for (auto& dep : _queue_deps) { |
182 | 0 | dep->block(); |
183 | 0 | } |
184 | 0 | } |
185 | 38 | } |
186 | 38 | if (send_now) { |
187 | 14 | RETURN_IF_ERROR(_send_rpc(instance_data)); |
188 | 14 | } |
189 | | |
190 | 38 | return Status::OK(); |
191 | 38 | } |
192 | | |
193 | 0 | Status ExchangeSinkBuffer::add_block(Channel* channel, BroadcastTransmitInfo&& request) { |
194 | 0 | if (_is_failed) { |
195 | 0 | return Status::OK(); |
196 | 0 | } |
197 | 0 | auto ins_id = channel->dest_ins_id(); |
198 | 0 | if (!_rpc_instances.contains(ins_id)) { |
199 | 0 | return Status::InternalError("fragment_instance_id {} not do register_sink", |
200 | 0 | print_id(channel->_fragment_instance_id)); |
201 | 0 | } |
202 | 0 | auto& instance_data = *_rpc_instances[ins_id]; |
203 | 0 | if (instance_data.rpc_channel_is_turn_off) { |
204 | 0 | return Status::EndOfFile("receiver eof"); |
205 | 0 | } |
206 | 0 | bool send_now = false; |
207 | 0 | { |
208 | 0 | std::unique_lock<std::mutex> lock(*instance_data.mutex); |
209 | | // Do not have in process rpc, directly send |
210 | 0 | if (instance_data.rpc_channel_is_idle) { |
211 | 0 | send_now = true; |
212 | 0 | instance_data.rpc_channel_is_idle = false; |
213 | 0 | } |
214 | 0 | if (request.block_holder->get_block()) { |
215 | 0 | RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version( |
216 | 0 | request.block_holder->get_block()->be_exec_version())); |
217 | 0 | } |
218 | 0 | instance_data.broadcast_package_queue[channel].emplace(request); |
219 | 0 | } |
220 | 0 | if (send_now) { |
221 | 0 | RETURN_IF_ERROR(_send_rpc(instance_data)); |
222 | 0 | } |
223 | | |
224 | 0 | return Status::OK(); |
225 | 0 | } |
226 | | |
// Drains one batch from the busiest queue of `instance_data` and issues a
// single transmit RPC for it. Called with no lock held; takes the instance
// mutex itself. On RPC completion the success handler re-enters _send_rpc to
// keep the pipeline of sends going; if both queues are empty the channel is
// marked idle so the next add_block triggers the send instead.
//
// Ownership note: in single-block mode the pblock is lent to the protobuf
// request via set_allocated_block and must be release_block()-ed after the
// transmit call so the protobuf does not free memory it does not own.
Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
    std::unique_lock<std::mutex> lock(*(instance_data.mutex));

    auto& q_map = instance_data.package_queue;
    auto& broadcast_q_map = instance_data.broadcast_package_queue;

    // Picks the channel whose pending queue is longest, to drain the largest
    // backlog first. `ptr` is in/out so the second call below only replaces
    // the previous pick if it finds a strictly longer queue.
    auto find_max_size_queue = [](Channel*& channel, auto& ptr, auto& map) {
        for (auto& [chan, lists] : map) {
            if (!ptr) {
                if (!lists.empty()) {
                    channel = chan;
                    ptr = &lists;
                }
            } else {
                if (ptr->size() < lists.size()) {
                    channel = chan;
                    ptr = &lists;
                }
            }
        }
    };

    Channel* channel = nullptr;

    std::queue<TransmitInfo, std::list<TransmitInfo>>* q_ptr = nullptr;
    find_max_size_queue(channel, q_ptr, q_map);
    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>* broadcast_q_ptr = nullptr;
    find_max_size_queue(channel, broadcast_q_ptr, broadcast_q_map);

    if (_is_failed) {
        _turn_off_channel(instance_data, lock);
        return Status::OK();
    }
    if (instance_data.rpc_channel_is_turn_off) {
        return Status::OK();
    }

    auto mem_byte = 0;
    if (q_ptr && !q_ptr->empty()) {
        auto& q = *q_ptr;

        // Multi-block mode batches up to the whole queue into one RPC, capped
        // below by _send_multi_blocks_byte_size; otherwise send exactly one.
        std::vector<TransmitInfo> requests(_send_multi_blocks ? q.size() : 1);
        for (int i = 0; i < requests.size(); i++) {
            requests[i] = std::move(q.front());
            q.pop();

            if (requests[i].block) {
                // make sure rpc byte size under the _send_multi_blocks_bytes_size
                mem_byte += requests[i].block->ByteSizeLong();
                if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) {
                    requests.resize(i + 1);
                    break;
                }
            }
        }

        // If we have data to shuffle which is not broadcasted
        auto& request = requests[0];
        auto& brpc_request = instance_data.request;
        brpc_request->set_sender_id(channel->_parent->sender_id());
        brpc_request->set_be_number(channel->_parent->be_number());

        if (_send_multi_blocks) {
            for (auto& req : requests) {
                if (req.block && !req.block->column_metas().empty()) {
                    // Swap moves the serialized payload into the request
                    // without copying; req.block is left empty.
                    auto add_block = brpc_request->add_blocks();
                    add_block->Swap(req.block.get());
                }
            }
        } else {
            if (request.block && !request.block->column_metas().empty()) {
                // Lends the pointer to the protobuf; released again after send.
                brpc_request->set_allocated_block(request.block.get());
            }
        }

        // packet_seq advances by the number of blocks packed into this RPC.
        instance_data.seq += requests.size();
        brpc_request->set_packet_seq(instance_data.seq);
        brpc_request->set_eos(requests.back().eos);
        auto send_callback = channel->get_send_callback(&instance_data, requests.back().eos);
        send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms);
        if (config::execution_ignore_eovercrowded) {
            send_callback->cntl_->ignore_eovercrowded();
        }
        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
                                                RpcInstance* ins, const std::string& err) {
            auto task_lock = weak_task_ctx.lock();
            if (task_lock == nullptr) {
                // This means the ExchangeSinkBuffer object is already destroyed;
                // there is nothing left to fail.
                return;
            }
            // attach task for memory tracker and query id when core
            SCOPED_ATTACH_TASK(_state);
            _failed(ins->id, err);
        });
        send_callback->start_rpc_time = GetCurrentTimeNanos();
        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
                                                 RpcInstance* ins_ptr, const bool& eos,
                                                 const PTransmitDataResult& result,
                                                 const int64_t& start_rpc_time) {
            auto task_lock = weak_task_ctx.lock();
            if (task_lock == nullptr) {
                // This means the ExchangeSinkBuffer object is already destroyed;
                // there is nothing left to do.
                return;
            }
            // attach task for memory tracker and query id when core
            SCOPED_ATTACH_TASK(_state);

            auto& ins = *ins_ptr;
            auto end_rpc_time = GetCurrentTimeNanos();
            update_rpc_time(ins, start_rpc_time, end_rpc_time);

            Status s(Status::create(result.status()));
            if (s.is<ErrorCode::END_OF_FILE>()) {
                _set_receiver_eof(ins);
            } else if (!s.ok()) {
                _failed(ins.id,
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
                return;
            } else if (eos) {
                _ended(ins);
            }
            // The eos here only indicates that the current exchange sink has reached eos.
            // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
            s = _send_rpc(ins);
            if (!s) {
                _failed(ins.id,
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
            }
        });
        {
            auto send_remote_block_closure = AutoReleaseClosure<
                    PTransmitDataParams,
                    ExchangeSendCallback<PTransmitDataResult>>::create_unique(brpc_request,
                                                                              send_callback);
            if (enable_http_send_block(*brpc_request)) {
                RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
                                                      std::move(send_remote_block_closure),
                                                      channel->_brpc_dest_addr));
            } else {
                transmit_blockv2(channel->_brpc_stub.get(), std::move(send_remote_block_closure));
            }
        }

        // Undo the ownership lend / clear batched payloads so the reusable
        // brpc_request does not double-free or carry stale blocks.
        if (!_send_multi_blocks && request.block) {
            static_cast<void>(brpc_request->release_block());
        } else {
            brpc_request->clear_blocks();
        }
        if (mem_byte) {
            COUNTER_UPDATE(channel->_parent->memory_used_counter(), -mem_byte);
        }
        DCHECK_GE(_total_queue_size, requests.size());
        _total_queue_size -= (int)requests.size();
        // Queue drained below capacity: release the backpressure dependencies.
        if (_total_queue_size <= _queue_capacity) {
            for (auto& dep : _queue_deps) {
                dep->set_ready();
            }
        }
    } else if (broadcast_q_ptr && !broadcast_q_ptr->empty()) {
        auto& broadcast_q = *broadcast_q_ptr;
        // If we have data to shuffle which is broadcasted
        std::vector<BroadcastTransmitInfo> requests(_send_multi_blocks ? broadcast_q.size() : 1);
        for (int i = 0; i < requests.size(); i++) {
            requests[i] = broadcast_q.front();
            broadcast_q.pop();

            if (requests[i].block_holder->get_block()) {
                // make sure rpc byte size under the _send_multi_blocks_bytes_size
                mem_byte += requests[i].block_holder->get_block()->ByteSizeLong();
                if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) {
                    requests.resize(i + 1);
                    break;
                }
            }
        }

        auto& request = requests[0];
        auto& brpc_request = instance_data.request;
        brpc_request->set_sender_id(channel->_parent->sender_id());
        brpc_request->set_be_number(channel->_parent->be_number());

        if (_send_multi_blocks) {
            for (int i = 0; i < requests.size(); i++) {
                auto& req = requests[i];
                if (auto block = req.block_holder->get_block();
                    block && !block->column_metas().empty()) {
                    // Broadcast blocks are shared between destinations, so the
                    // metadata is copied and only the (large) column_values
                    // payload is lent via set_allocated_column_values; it is
                    // released again after the transmit call below.
                    auto add_block = brpc_request->add_blocks();
                    for (int j = 0; j < block->column_metas_size(); ++j) {
                        add_block->add_column_metas()->CopyFrom(block->column_metas(j));
                    }
                    add_block->set_be_exec_version(block->be_exec_version());
                    add_block->set_compressed(block->compressed());
                    add_block->set_compression_type(block->compression_type());
                    add_block->set_uncompressed_size(block->uncompressed_size());
                    add_block->set_allocated_column_values(block->mutable_column_values());
                }
            }
        } else {
            if (request.block_holder->get_block() &&
                !request.block_holder->get_block()->column_metas().empty()) {
                brpc_request->set_allocated_block(request.block_holder->get_block());
            }
        }
        instance_data.seq += requests.size();
        brpc_request->set_packet_seq(instance_data.seq);
        brpc_request->set_eos(requests.back().eos);
        auto send_callback = channel->get_send_callback(&instance_data, requests.back().eos);

        send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms);
        if (config::execution_ignore_eovercrowded) {
            send_callback->cntl_->ignore_eovercrowded();
        }
        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
                                                RpcInstance* ins, const std::string& err) {
            auto task_lock = weak_task_ctx.lock();
            if (task_lock == nullptr) {
                // This means the ExchangeSinkBuffer object is already destroyed;
                // there is nothing left to fail.
                return;
            }
            // attach task for memory tracker and query id when core
            SCOPED_ATTACH_TASK(_state);
            _failed(ins->id, err);
        });
        send_callback->start_rpc_time = GetCurrentTimeNanos();
        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
                                                 RpcInstance* ins_ptr, const bool& eos,
                                                 const PTransmitDataResult& result,
                                                 const int64_t& start_rpc_time) {
            auto task_lock = weak_task_ctx.lock();
            if (task_lock == nullptr) {
                // This means the ExchangeSinkBuffer object is already destroyed;
                // there is nothing left to do.
                return;
            }
            // attach task for memory tracker and query id when core
            SCOPED_ATTACH_TASK(_state);
            auto& ins = *ins_ptr;
            auto end_rpc_time = GetCurrentTimeNanos();
            update_rpc_time(ins, start_rpc_time, end_rpc_time);

            Status s(Status::create(result.status()));
            if (s.is<ErrorCode::END_OF_FILE>()) {
                _set_receiver_eof(ins);
            } else if (!s.ok()) {
                _failed(ins.id,
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
                return;
            } else if (eos) {
                _ended(ins);
            }

            // The eos here only indicates that the current exchange sink has reached eos.
            // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
            s = _send_rpc(ins);
            if (!s) {
                _failed(ins.id,
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
            }
        });
        {
            auto send_remote_block_closure = AutoReleaseClosure<
                    PTransmitDataParams,
                    ExchangeSendCallback<PTransmitDataResult>>::create_unique(brpc_request,
                                                                              send_callback);
            if (enable_http_send_block(*brpc_request)) {
                RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
                                                      std::move(send_remote_block_closure),
                                                      channel->_brpc_dest_addr));
            } else {
                transmit_blockv2(channel->_brpc_stub.get(), std::move(send_remote_block_closure));
            }
        }
        // Undo the ownership lends: the single lent block, or each batched
        // block's lent column_values, before clearing the reusable request.
        if (!_send_multi_blocks && request.block_holder->get_block()) {
            static_cast<void>(brpc_request->release_block());
        } else {
            for (int i = 0; i < brpc_request->mutable_blocks()->size(); ++i) {
                static_cast<void>(brpc_request->mutable_blocks(i)->release_column_values());
            }
            brpc_request->clear_blocks();
        }
    } else {
        // Nothing pending in either queue: mark idle so the next add_block
        // call on this instance restarts the send loop.
        instance_data.rpc_channel_is_idle = true;
    }

    return Status::OK();
}
512 | | |
513 | 14 | void ExchangeSinkBuffer::_ended(RpcInstance& ins) { |
514 | 14 | std::unique_lock<std::mutex> lock(*ins.mutex); |
515 | 14 | ins.running_sink_count--; |
516 | 14 | if (ins.running_sink_count == 0) { |
517 | 4 | _turn_off_channel(ins, lock); |
518 | 4 | } |
519 | 14 | } |
520 | | |
521 | 0 | void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) { |
522 | 0 | _is_failed = true; |
523 | 0 | LOG(INFO) << "send rpc failed, instance id: " << id << ", _dest_node_id: " << _dest_node_id |
524 | 0 | << ", node id: " << _node_id << ", err: " << err; |
525 | 0 | _context->cancel(Status::Cancelled(err)); |
526 | 0 | } |
527 | | |
528 | 2 | void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) { |
529 | 2 | std::unique_lock<std::mutex> lock(*ins.mutex); |
530 | | // When the receiving side reaches eof, it means the receiver has finished early. |
531 | | // The remaining data in the current rpc_channel does not need to be sent, |
532 | | // and the rpc_channel should be turned off immediately. |
533 | 2 | Defer turn_off([&]() { _turn_off_channel(ins, lock); }); |
534 | | |
535 | 2 | auto& broadcast_q_map = ins.broadcast_package_queue; |
536 | 2 | for (auto& [channel, broadcast_q] : broadcast_q_map) { |
537 | 0 | for (; !broadcast_q.empty(); broadcast_q.pop()) { |
538 | 0 | if (broadcast_q.front().block_holder->get_block()) { |
539 | 0 | COUNTER_UPDATE(channel->_parent->memory_used_counter(), |
540 | 0 | -broadcast_q.front().block_holder->get_block()->ByteSizeLong()); |
541 | 0 | } |
542 | 0 | } |
543 | 0 | } |
544 | 2 | broadcast_q_map.clear(); |
545 | | |
546 | 2 | auto& q_map = ins.package_queue; |
547 | 4 | for (auto& [channel, q] : q_map) { |
548 | 8 | for (; !q.empty(); q.pop()) { |
549 | | // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF, |
550 | | // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked |
551 | 4 | _total_queue_size--; |
552 | 4 | if (q.front().block) { |
553 | 4 | COUNTER_UPDATE(channel->_parent->memory_used_counter(), |
554 | 4 | -q.front().block->ByteSizeLong()); |
555 | 4 | } |
556 | 4 | } |
557 | 4 | } |
558 | | |
559 | | // Try to wake up pipeline after clearing the queue |
560 | 2 | if (_total_queue_size <= _queue_capacity) { |
561 | 2 | for (auto& dep : _queue_deps) { |
562 | 0 | dep->set_ready(); |
563 | 0 | } |
564 | 2 | } |
565 | | |
566 | 2 | q_map.clear(); |
567 | 2 | } |
568 | | |
569 | | // The unused parameter `with_lock` is to ensure that the function is called when the lock is held. |
570 | | void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins, |
571 | 8 | std::unique_lock<std::mutex>& /*with_lock*/) { |
572 | 8 | if (!ins.rpc_channel_is_idle) { |
573 | 8 | ins.rpc_channel_is_idle = true; |
574 | 8 | } |
575 | | // Ensure that each RPC is turned off only once. |
576 | 8 | if (ins.rpc_channel_is_turn_off) { |
577 | 0 | return; |
578 | 0 | } |
579 | 8 | ins.rpc_channel_is_turn_off = true; |
580 | 8 | auto weak_task_ctx = weak_task_exec_ctx(); |
581 | 8 | if (auto pip_ctx = weak_task_ctx.lock()) { |
582 | 8 | for (auto& parent : _parents) { |
583 | 0 | parent->on_channel_finished(ins.id); |
584 | 0 | } |
585 | 8 | } |
586 | 8 | } |
587 | | |
588 | 0 | void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) { |
589 | 0 | int64_t local_max_time = 0; |
590 | 0 | int64_t local_min_time = INT64_MAX; |
591 | 0 | for (auto& [_, ins] : _rpc_instances) { |
592 | 0 | if (ins->stats.sum_time != 0) { |
593 | 0 | local_max_time = std::max(local_max_time, ins->stats.sum_time); |
594 | 0 | local_min_time = std::min(local_min_time, ins->stats.sum_time); |
595 | 0 | } |
596 | 0 | } |
597 | 0 | *max_time = local_max_time; |
598 | 0 | *min_time = local_min_time == INT64_MAX ? 0 : local_min_time; |
599 | 0 | } |
600 | | |
601 | 0 | int64_t ExchangeSinkBuffer::get_sum_rpc_time() { |
602 | 0 | int64_t sum_time = 0; |
603 | 0 | for (auto& [_, ins] : _rpc_instances) { |
604 | 0 | sum_time += ins->stats.sum_time; |
605 | 0 | } |
606 | 0 | return sum_time; |
607 | 0 | } |
608 | | |
609 | | void ExchangeSinkBuffer::update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, |
610 | 27 | int64_t receive_rpc_time) { |
611 | 27 | _rpc_count++; |
612 | 27 | int64_t rpc_spend_time = receive_rpc_time - start_rpc_time; |
613 | 27 | if (rpc_spend_time > 0) { |
614 | 27 | auto& stats = ins.stats; |
615 | 27 | ++stats.rpc_count; |
616 | 27 | stats.sum_time += rpc_spend_time; |
617 | 27 | stats.max_time = std::max(stats.max_time, rpc_spend_time); |
618 | 27 | stats.min_time = std::min(stats.min_time, rpc_spend_time); |
619 | 27 | } |
620 | 27 | } |
621 | | |
// Publishes aggregate RPC timing into `profile`, plus (at profile level > 3)
// a per-instance detail child listing the slowest instances by max RPC time,
// capped at rpc_verbose_profile_max_instance_count entries.
void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
    auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1);
    auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
    auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
    auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
    auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");

    int64_t max_rpc_time = 0, min_rpc_time = 0;
    get_max_min_rpc_time(&max_rpc_time, &min_rpc_time);
    _max_rpc_timer->set(max_rpc_time);
    _min_rpc_timer->set(min_rpc_time);

    _count_rpc->set(_rpc_count);
    int64_t sum_time = get_sum_rpc_time();
    _sum_rpc_timer->set(sum_time);
    // max(1, count) guards against division by zero when no RPC completed.
    _avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), _rpc_count.load()));

    auto max_count = _state->rpc_verbose_profile_max_instance_count();
    // This counter will lead to performance degradation.
    // So only collect this information when the profile level is greater than 3.
    if (_state->profile_level() > 3 && max_count > 0) {
        // Snapshot the stats, then sort descending by max RPC time so the
        // slowest instances are listed first.
        std::vector<std::pair<InstanceLoId, RpcInstanceStatistics>> tmp_rpc_stats_vec;
        for (const auto& [id, ins] : _rpc_instances) {
            tmp_rpc_stats_vec.emplace_back(id, ins->stats);
        }
        pdqsort(tmp_rpc_stats_vec.begin(), tmp_rpc_stats_vec.end(),
                [](const auto& a, const auto& b) { return a.second.max_time > b.second.max_time; });
        auto count = std::min((size_t)max_count, tmp_rpc_stats_vec.size());
        int i = 0;
        auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true);
        for (const auto& [id, stats] : tmp_rpc_stats_vec) {
            // Instances that never completed an RPC carry no useful detail.
            if (0 == stats.rpc_count) {
                continue;
            }
            std::stringstream out;
            out << "Instance " << std::hex << id;
            auto stats_str = fmt::format(
                    "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, SumTime: {}",
                    stats.rpc_count, PrettyPrinter::print(stats.max_time, TUnit::TIME_NS),
                    PrettyPrinter::print(stats.min_time, TUnit::TIME_NS),
                    PrettyPrinter::print(
                            stats.sum_time / std::max(static_cast<int64_t>(1), stats.rpc_count),
                            TUnit::TIME_NS),
                    PrettyPrinter::print(stats.sum_time, TUnit::TIME_NS));
            detail_profile->add_info_string(out.str(), stats_str);
            if (++i == count) {
                break;
            }
        }
    }
}
673 | | |
674 | 2 | std::string ExchangeSinkBuffer::debug_each_instance_queue_size() { |
675 | 2 | fmt::memory_buffer debug_string_buffer; |
676 | 6 | for (auto& [id, instance_data] : _rpc_instances) { |
677 | 6 | std::unique_lock<std::mutex> lock(*instance_data->mutex); |
678 | 6 | auto queue_size = 0; |
679 | 6 | for (auto& [_, list] : instance_data->package_queue) { |
680 | 5 | queue_size += list.size(); |
681 | 5 | } |
682 | 6 | fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, queue_size); |
683 | 6 | } |
684 | 2 | return fmt::to_string(debug_string_buffer); |
685 | 2 | } |
686 | | |
687 | | #include "common/compile_check_end.h" |
688 | | } // namespace doris |