be/src/exec/operator/exchange_sink_buffer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/operator/exchange_sink_buffer.h" |
19 | | |
20 | | #include <brpc/controller.h> |
21 | | #include <butil/errno.h> |
22 | | #include <butil/iobuf_inl.h> |
23 | | #include <fmt/format.h> |
24 | | #include <gen_cpp/Types_types.h> |
25 | | #include <gen_cpp/types.pb.h> |
26 | | #include <glog/logging.h> |
27 | | #include <google/protobuf/stubs/callback.h> |
28 | | #include <pdqsort.h> |
29 | | #include <stddef.h> |
30 | | |
31 | | #include <atomic> |
32 | | #include <cstdint> |
33 | | #include <exception> |
34 | | #include <functional> |
35 | | #include <memory> |
36 | | #include <ostream> |
37 | | #include <utility> |
38 | | |
39 | | #include "common/status.h" |
40 | | #include "exec/exchange/vdata_stream_sender.h" |
41 | | #include "exec/operator/exchange_sink_operator.h" |
42 | | #include "exec/pipeline/pipeline_fragment_context.h" |
43 | | #include "runtime/exec_env.h" |
44 | | #include "runtime/thread_context.h" |
45 | | #include "service/backend_options.h" |
46 | | #include "util/defer_op.h" |
47 | | #include "util/proto_util.h" |
48 | | #include "util/time.h" |
49 | | |
50 | | namespace doris { |
51 | | |
52 | 2 | BroadcastPBlockHolder::~BroadcastPBlockHolder() { |
53 | | // lock the parent queue, if the queue could lock success, then return the block |
54 | | // to the queue, to reuse the block |
55 | 2 | std::shared_ptr<BroadcastPBlockHolderMemLimiter> limiter = _parent_creator.lock(); |
56 | 2 | if (limiter != nullptr) { |
57 | 2 | limiter->release(*this); |
58 | 2 | } |
59 | | // If the queue already deconstruted, then release pblock automatically since it |
60 | | // is a unique ptr. |
61 | 2 | } |
62 | | |
63 | 2 | void BroadcastPBlockHolderMemLimiter::acquire(BroadcastPBlockHolder& holder) { |
64 | 2 | std::unique_lock l(_holders_lock); |
65 | 2 | DCHECK(_broadcast_dependency != nullptr); |
66 | 2 | holder.set_parent_creator(shared_from_this()); |
67 | 2 | auto size = holder._pblock->column_values().size(); |
68 | 2 | _total_queue_buffer_size += size; |
69 | 2 | _total_queue_blocks_count++; |
70 | 2 | if (_total_queue_buffer_size >= _total_queue_buffer_size_limit || |
71 | 2 | _total_queue_blocks_count >= _total_queue_blocks_count_limit) { |
72 | 0 | _broadcast_dependency->block(); |
73 | 0 | } |
74 | 2 | } |
75 | | |
76 | 2 | void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holder) { |
77 | 2 | std::unique_lock l(_holders_lock); |
78 | 2 | DCHECK(_broadcast_dependency != nullptr); |
79 | 2 | auto size = holder._pblock->column_values().size(); |
80 | 2 | _total_queue_buffer_size -= size; |
81 | 2 | _total_queue_blocks_count--; |
82 | 2 | if (_total_queue_buffer_size <= 0) { |
83 | 2 | _broadcast_dependency->set_ready(); |
84 | 2 | } |
85 | 2 | } |
86 | | |
// Shared send buffer for every exchange sink (one per sender instance id) that
// transmits to the same destination node. Multi-block batching is enabled only
// when the query option exchange_multi_blocks_byte_size is set to a positive
// value; the threshold is then cached in _send_multi_blocks_byte_size.
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
                                       PlanNodeId node_id, RuntimeState* state,
                                       const std::vector<InstanceLoId>& sender_ins_ids)
        : HasTaskExecutionCtx(state),
          _queue_capacity(0),
          _is_failed(false),
          _query_id(std::move(query_id)),
          _dest_node_id(dest_node_id),
          _node_id(node_id),
          _state(state),
          _context(state->get_query_ctx()),
          _exchange_sink_num(sender_ins_ids.size()),
          _send_multi_blocks(state->query_options().__isset.exchange_multi_blocks_byte_size &&
                             state->query_options().exchange_multi_blocks_byte_size > 0) {
    if (_send_multi_blocks) {
        _send_multi_blocks_byte_size = state->query_options().exchange_multi_blocks_byte_size;
    }
}
105 | | |
// Intentionally a no-op.
void ExchangeSinkBuffer::close() {
    // Could not clear the queues here, because there may be a running rpc that
    // wants to pop a request from a queue; clearing would release that request
    // while the rpc still references it and crash.
    //_instance_to_broadcast_package_queue.clear();
    //_instance_to_package_queue.clear();
    //_instance_to_request.clear();
}
114 | | |
115 | 14 | void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) { |
116 | 14 | if (_is_failed) { |
117 | 0 | return; |
118 | 0 | } |
119 | 14 | auto low_id = fragment_instance_id.lo; |
120 | 14 | if (_rpc_instances.contains(low_id)) { |
121 | 0 | return; |
122 | 0 | } |
123 | | |
124 | | // Initialize the instance data |
125 | 14 | auto instance_data = std::make_unique<RpcInstance>(low_id); |
126 | 14 | instance_data->mutex = std::make_unique<std::mutex>(); |
127 | 14 | instance_data->seq = 0; |
128 | 14 | instance_data->package_queue = |
129 | 14 | std::unordered_map<Channel*, std::queue<TransmitInfo, std::list<TransmitInfo>>>(); |
130 | 14 | instance_data->broadcast_package_queue = std::unordered_map< |
131 | 14 | Channel*, std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>>(); |
132 | 14 | _queue_capacity = config::exchg_buffer_queue_capacity_factor * _rpc_instances.size(); |
133 | | |
134 | 14 | PUniqueId finst_id; |
135 | 14 | finst_id.set_hi(fragment_instance_id.hi); |
136 | 14 | finst_id.set_lo(fragment_instance_id.lo); |
137 | | |
138 | 14 | instance_data->rpc_channel_is_idle = true; |
139 | 14 | instance_data->rpc_channel_is_turn_off = false; |
140 | | |
141 | | // Initialize request |
142 | 14 | instance_data->request = std::make_shared<PTransmitDataParams>(); |
143 | 14 | instance_data->request->mutable_finst_id()->CopyFrom(finst_id); |
144 | 14 | instance_data->request->mutable_query_id()->CopyFrom(_query_id); |
145 | 14 | instance_data->request->set_node_id(_dest_node_id); |
146 | 14 | instance_data->running_sink_count = _exchange_sink_num; |
147 | | |
148 | 14 | _rpc_instances[low_id] = std::move(instance_data); |
149 | 14 | } |
150 | | |
// Enqueue one (non-broadcast) transmit request for `channel`'s destination
// instance. If the instance's rpc channel is idle, this caller also kicks off
// the rpc itself via _send_rpc(). Returns EndOfFile if the receiver has
// already finished, OK otherwise.
Status ExchangeSinkBuffer::add_block(Channel* channel, TransmitInfo&& request) {
    if (_is_failed) {
        // Query is being cancelled; silently drop the block.
        return Status::OK();
    }
    auto ins_id = channel->dest_ins_id();
    if (!_rpc_instances.contains(ins_id)) {
        return Status::InternalError("fragment_instance_id {} not do register_sink",
                                     print_id(channel->_fragment_instance_id));
    }
    auto& instance_data = *_rpc_instances[ins_id];
    if (instance_data.rpc_channel_is_turn_off) {
        return Status::EndOfFile("receiver eof");
    }
    bool send_now = false;
    {
        std::unique_lock<std::mutex> lock(*instance_data.mutex);
        // Do not have in process rpc, directly send.
        if (instance_data.rpc_channel_is_idle) {
            send_now = true;
            instance_data.rpc_channel_is_idle = false;
        }
        if (request.block) {
            // NOTE(review): if this RETURN_IF_ERROR fires after the channel was
            // just marked busy above, the channel stays non-idle with no rpc in
            // flight — confirm whether that path is reachable in practice.
            RETURN_IF_ERROR(
                    BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
            COUNTER_UPDATE(channel->_parent->memory_used_counter(), request.block->ByteSizeLong());
        }
        instance_data.package_queue[channel].emplace(std::move(request));
        _total_queue_size++;
        // Backpressure: once the global queue exceeds capacity, block every
        // sink's queue dependency until _send_rpc drains it again.
        if (_total_queue_size > _queue_capacity) {
            for (auto& dep : _queue_deps) {
                dep->block();
            }
        }
    }
    // Start the rpc outside the instance lock (it re-acquires the lock itself).
    if (send_now) {
        RETURN_IF_ERROR(_send_rpc(instance_data));
    }

    return Status::OK();
}
191 | | |
// Broadcast variant of add_block(): the block is owned by a shared
// BroadcastPBlockHolder instead of the request itself, so it is copied into the
// broadcast queue (queue entries share the holder).
// NOTE(review): unlike the non-broadcast overload, this path does not update
// the parent's memory_used_counter — presumably because the holder's limiter
// accounts for the memory instead; verify against BroadcastPBlockHolderMemLimiter.
Status ExchangeSinkBuffer::add_block(Channel* channel, BroadcastTransmitInfo&& request) {
    if (_is_failed) {
        // Query is being cancelled; silently drop the block.
        return Status::OK();
    }
    auto ins_id = channel->dest_ins_id();
    if (!_rpc_instances.contains(ins_id)) {
        return Status::InternalError("fragment_instance_id {} not do register_sink",
                                     print_id(channel->_fragment_instance_id));
    }
    auto& instance_data = *_rpc_instances[ins_id];
    if (instance_data.rpc_channel_is_turn_off) {
        return Status::EndOfFile("receiver eof");
    }
    bool send_now = false;
    {
        std::unique_lock<std::mutex> lock(*instance_data.mutex);
        // Do not have in process rpc, directly send.
        if (instance_data.rpc_channel_is_idle) {
            send_now = true;
            instance_data.rpc_channel_is_idle = false;
        }
        if (request.block_holder->get_block()) {
            RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
                    request.block_holder->get_block()->be_exec_version()));
        }
        instance_data.broadcast_package_queue[channel].emplace(request);
    }
    // Start the rpc outside the instance lock (it re-acquires the lock itself).
    if (send_now) {
        RETURN_IF_ERROR(_send_rpc(instance_data));
    }

    return Status::OK();
}
225 | | |
// Send the next rpc for `instance_data`. Picks the fullest per-channel queue
// (normal queues take priority over broadcast queues), batches up to
// _send_multi_blocks_byte_size bytes when multi-block sending is enabled, and
// re-arms itself from the rpc success callback so the queues keep draining.
// If both queue kinds are empty the channel is marked idle again.
// Caller must NOT hold instance_data.mutex; it is acquired here.
Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
    std::unique_lock<std::mutex> lock(*(instance_data.mutex));

    auto& q_map = instance_data.package_queue;
    auto& broadcast_q_map = instance_data.broadcast_package_queue;

    // Select the channel whose queue currently holds the most entries.
    auto find_max_size_queue = [](Channel*& channel, auto& ptr, auto& map) {
        for (auto& [chan, lists] : map) {
            if (!ptr) {
                if (!lists.empty()) {
                    channel = chan;
                    ptr = &lists;
                }
            } else {
                if (ptr->size() < lists.size()) {
                    channel = chan;
                    ptr = &lists;
                }
            }
        }
    };

    Channel* channel = nullptr;

    std::queue<TransmitInfo, std::list<TransmitInfo>>* q_ptr = nullptr;
    find_max_size_queue(channel, q_ptr, q_map);
    // NOTE(review): this second call can overwrite `channel` with a channel
    // from the broadcast map even when the normal-queue branch below is taken
    // (q_ptr non-empty). All channels here target the same instance, but
    // confirm that sender_id/be_number/callback are interchangeable across
    // the channels of one instance.
    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>* broadcast_q_ptr = nullptr;
    find_max_size_queue(channel, broadcast_q_ptr, broadcast_q_map);

    if (_is_failed) {
        // Query failed: stop this channel; queued data is abandoned.
        _turn_off_channel(instance_data, lock);
        return Status::OK();
    }
    if (instance_data.rpc_channel_is_turn_off) {
        return Status::OK();
    }

    auto mem_byte = 0;
    if (q_ptr && !q_ptr->empty()) {
        auto& q = *q_ptr;

        // Pop either the whole queue (multi-block mode) or exactly one entry.
        std::vector<TransmitInfo> requests(_send_multi_blocks ? q.size() : 1);
        for (int i = 0; i < requests.size(); i++) {
            requests[i] = std::move(q.front());
            q.pop();

            if (requests[i].block) {
                // make sure rpc byte size under the _send_multi_blocks_bytes_size
                mem_byte += requests[i].block->ByteSizeLong();
                if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) {
                    requests.resize(i + 1);
                    break;
                }
            }
        }

        // If we have data to shuffle which is not broadcasted
        auto& request = requests[0];
        auto& brpc_request = instance_data.request;
        brpc_request->set_sender_id(channel->_parent->sender_id());
        brpc_request->set_be_number(channel->_parent->be_number());

        if (_send_multi_blocks) {
            // Move each popped block into the repeated `blocks` field.
            for (auto& req : requests) {
                if (req.block && !req.block->column_metas().empty()) {
                    auto add_block = brpc_request->add_blocks();
                    add_block->Swap(req.block.get());
                }
            }
        } else {
            // Single-block mode: lend the block to the request without copying;
            // ownership is taken back via release_block() after transmission.
            if (request.block && !request.block->column_metas().empty()) {
                brpc_request->set_allocated_block(request.block.get());
            }
        }

        instance_data.seq += requests.size();
        brpc_request->set_packet_seq(instance_data.seq);
        brpc_request->set_eos(requests.back().eos);
        auto send_callback = channel->get_send_callback(&instance_data, requests.back().eos);
        send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms);
        if (config::execution_ignore_eovercrowded) {
            send_callback->cntl_->ignore_eovercrowded();
        }
        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
                                                RpcInstance* ins, const std::string& err) {
            auto task_lock = weak_task_ctx.lock();
            if (task_lock == nullptr) {
                // This means the ExchangeSinkBuffer object is already destroyed; no need to run the failure path.
                return;
            }
            // attach task for memory tracker and query id when core
            SCOPED_ATTACH_TASK(_state);
            _failed(ins->id, err);
        });
        send_callback->start_rpc_time = GetCurrentTimeNanos();
        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
                                                 RpcInstance* ins_ptr, const bool& eos,
                                                 const PTransmitDataResult& result,
                                                 const int64_t& start_rpc_time) {
            auto task_lock = weak_task_ctx.lock();
            if (task_lock == nullptr) {
                // This means the ExchangeSinkBuffer object is already destroyed; no need to run any more.
                return;
            }
            // attach task for memory tracker and query id when core
            SCOPED_ATTACH_TASK(_state);

            auto& ins = *ins_ptr;
            auto end_rpc_time = GetCurrentTimeNanos();
            update_rpc_time(ins, start_rpc_time, end_rpc_time);

            Status s(Status::create(result.status()));
            if (s.is<ErrorCode::END_OF_FILE>()) {
                // Receiver finished early; drop the remaining queued data.
                _set_receiver_eof(ins);
            } else if (!s.ok()) {
                _failed(ins.id,
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
                return;
            } else if (eos) {
                _ended(ins);
            }
            // The eos here only indicates that the current exchange sink has reached eos.
            // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
            s = _send_rpc(ins);
            if (!s) {
                _failed(ins.id,
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
            }
        });
        {
            auto send_remote_block_closure = AutoReleaseClosure<
                    PTransmitDataParams,
                    ExchangeSendCallback<PTransmitDataResult>>::create_unique(brpc_request,
                                                                              send_callback);
            if (enable_http_send_block(*brpc_request)) {
                RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
                                                      std::move(send_remote_block_closure),
                                                      channel->_brpc_dest_addr));
            } else {
                transmit_blockv2(channel->_brpc_stub.get(), std::move(send_remote_block_closure));
            }
        }

        // Give block ownership back to the request holder (single-block mode)
        // or clear the batched blocks (multi-block mode) so the reusable
        // brpc_request is clean for the next send.
        if (!_send_multi_blocks && request.block) {
            static_cast<void>(brpc_request->release_block());
        } else {
            brpc_request->clear_blocks();
        }
        if (mem_byte) {
            COUNTER_UPDATE(channel->_parent->memory_used_counter(), -mem_byte);
        }
        DCHECK_GE(_total_queue_size, requests.size());
        _total_queue_size -= (int)requests.size();
        // Release backpressure once the queue has drained below capacity.
        if (_total_queue_size <= _queue_capacity) {
            for (auto& dep : _queue_deps) {
                dep->set_ready();
            }
        }
    } else if (broadcast_q_ptr && !broadcast_q_ptr->empty()) {
        auto& broadcast_q = *broadcast_q_ptr;
        // If we have data to shuffle which is broadcasted
        std::vector<BroadcastTransmitInfo> requests(_send_multi_blocks ? broadcast_q.size() : 1);
        for (int i = 0; i < requests.size(); i++) {
            requests[i] = broadcast_q.front();
            broadcast_q.pop();

            if (requests[i].block_holder->get_block()) {
                // make sure rpc byte size under the _send_multi_blocks_bytes_size
                mem_byte += requests[i].block_holder->get_block()->ByteSizeLong();
                if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) {
                    requests.resize(i + 1);
                    break;
                }
            }
        }

        auto& request = requests[0];
        auto& brpc_request = instance_data.request;
        brpc_request->set_sender_id(channel->_parent->sender_id());
        brpc_request->set_be_number(channel->_parent->be_number());

        if (_send_multi_blocks) {
            // Broadcast blocks are shared by several instances, so the metas
            // are copied and only the (large) column_values is lent to the
            // request; it is released again after transmission below.
            for (int i = 0; i < requests.size(); i++) {
                auto& req = requests[i];
                if (auto block = req.block_holder->get_block();
                    block && !block->column_metas().empty()) {
                    auto add_block = brpc_request->add_blocks();
                    for (int j = 0; j < block->column_metas_size(); ++j) {
                        add_block->add_column_metas()->CopyFrom(block->column_metas(j));
                    }
                    add_block->set_be_exec_version(block->be_exec_version());
                    add_block->set_compressed(block->compressed());
                    add_block->set_compression_type(block->compression_type());
                    add_block->set_uncompressed_size(block->uncompressed_size());
                    add_block->set_allocated_column_values(block->mutable_column_values());
                }
            }
        } else {
            if (request.block_holder->get_block() &&
                !request.block_holder->get_block()->column_metas().empty()) {
                brpc_request->set_allocated_block(request.block_holder->get_block());
            }
        }
        instance_data.seq += requests.size();
        brpc_request->set_packet_seq(instance_data.seq);
        brpc_request->set_eos(requests.back().eos);
        auto send_callback = channel->get_send_callback(&instance_data, requests.back().eos);

        send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms);
        if (config::execution_ignore_eovercrowded) {
            send_callback->cntl_->ignore_eovercrowded();
        }
        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
                                                RpcInstance* ins, const std::string& err) {
            auto task_lock = weak_task_ctx.lock();
            if (task_lock == nullptr) {
                // This means the ExchangeSinkBuffer object is already destroyed; no need to run the failure path.
                return;
            }
            // attach task for memory tracker and query id when core
            SCOPED_ATTACH_TASK(_state);
            _failed(ins->id, err);
        });
        send_callback->start_rpc_time = GetCurrentTimeNanos();
        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
                                                 RpcInstance* ins_ptr, const bool& eos,
                                                 const PTransmitDataResult& result,
                                                 const int64_t& start_rpc_time) {
            auto task_lock = weak_task_ctx.lock();
            if (task_lock == nullptr) {
                // This means the ExchangeSinkBuffer object is already destroyed; no need to run any more.
                return;
            }
            // attach task for memory tracker and query id when core
            SCOPED_ATTACH_TASK(_state);
            auto& ins = *ins_ptr;
            auto end_rpc_time = GetCurrentTimeNanos();
            update_rpc_time(ins, start_rpc_time, end_rpc_time);

            Status s(Status::create(result.status()));
            if (s.is<ErrorCode::END_OF_FILE>()) {
                _set_receiver_eof(ins);
            } else if (!s.ok()) {
                _failed(ins.id,
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
                return;
            } else if (eos) {
                _ended(ins);
            }

            // The eos here only indicates that the current exchange sink has reached eos.
            // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
            s = _send_rpc(ins);
            if (!s) {
                _failed(ins.id,
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
            }
        });
        {
            auto send_remote_block_closure = AutoReleaseClosure<
                    PTransmitDataParams,
                    ExchangeSendCallback<PTransmitDataResult>>::create_unique(brpc_request,
                                                                              send_callback);
            if (enable_http_send_block(*brpc_request)) {
                RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
                                                      std::move(send_remote_block_closure),
                                                      channel->_brpc_dest_addr));
            } else {
                transmit_blockv2(channel->_brpc_stub.get(), std::move(send_remote_block_closure));
            }
        }
        // Return the lent block / column_values to their shared holders so the
        // reusable brpc_request does not free memory it does not own.
        if (!_send_multi_blocks && request.block_holder->get_block()) {
            static_cast<void>(brpc_request->release_block());
        } else {
            for (int i = 0; i < brpc_request->mutable_blocks()->size(); ++i) {
                static_cast<void>(brpc_request->mutable_blocks(i)->release_column_values());
            }
            brpc_request->clear_blocks();
        }
    } else {
        // Nothing queued: mark the channel idle so the next add_block() sends.
        instance_data.rpc_channel_is_idle = true;
    }

    return Status::OK();
}
511 | | |
512 | 14 | void ExchangeSinkBuffer::_ended(RpcInstance& ins) { |
513 | 14 | std::unique_lock<std::mutex> lock(*ins.mutex); |
514 | 14 | ins.running_sink_count--; |
515 | 14 | if (ins.running_sink_count == 0) { |
516 | 4 | _turn_off_channel(ins, lock); |
517 | 4 | } |
518 | 14 | } |
519 | | |
// Mark the whole buffer failed and cancel the query context, which propagates
// the error to every pipeline task of this query.
void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
    _is_failed = true;
    LOG(INFO) << "send rpc failed, instance id: " << id << ", _dest_node_id: " << _dest_node_id
              << ", node id: " << _node_id << ", err: " << err;
    _context->cancel(Status::Cancelled(err));
}
526 | | |
// Handle the receiver reporting END_OF_FILE: drop everything still queued for
// this instance and turn its rpc channel off.
void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) {
    std::unique_lock<std::mutex> lock(*ins.mutex);
    // When the receiving side reaches eof, it means the receiver has finished early.
    // The remaining data in the current rpc_channel does not need to be sent,
    // and the rpc_channel should be turned off immediately.
    Defer turn_off([&]() { _turn_off_channel(ins, lock); });

    // Drain broadcast queues, refunding the memory counter per dropped block.
    auto& broadcast_q_map = ins.broadcast_package_queue;
    for (auto& [channel, broadcast_q] : broadcast_q_map) {
        for (; !broadcast_q.empty(); broadcast_q.pop()) {
            if (broadcast_q.front().block_holder->get_block()) {
                COUNTER_UPDATE(channel->_parent->memory_used_counter(),
                               -broadcast_q.front().block_holder->get_block()->ByteSizeLong());
            }
        }
    }
    broadcast_q_map.clear();

    // Drain normal queues, refunding both the memory counter and the global
    // queue size per dropped block.
    auto& q_map = ins.package_queue;
    for (auto& [channel, q] : q_map) {
        for (; !q.empty(); q.pop()) {
            // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF,
            // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked
            _total_queue_size--;
            if (q.front().block) {
                COUNTER_UPDATE(channel->_parent->memory_used_counter(),
                               -q.front().block->ByteSizeLong());
            }
        }
    }

    // Try to wake up the pipeline after clearing the queue.
    if (_total_queue_size <= _queue_capacity) {
        for (auto& dep : _queue_deps) {
            dep->set_ready();
        }
    }

    q_map.clear();
}
567 | | |
568 | | // The unused parameter `with_lock` is to ensure that the function is called when the lock is held. |
569 | | void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins, |
570 | 8 | std::unique_lock<std::mutex>& /*with_lock*/) { |
571 | 8 | if (!ins.rpc_channel_is_idle) { |
572 | 8 | ins.rpc_channel_is_idle = true; |
573 | 8 | } |
574 | | // Ensure that each RPC is turned off only once. |
575 | 8 | if (ins.rpc_channel_is_turn_off) { |
576 | 0 | return; |
577 | 0 | } |
578 | 8 | ins.rpc_channel_is_turn_off = true; |
579 | 8 | auto weak_task_ctx = weak_task_exec_ctx(); |
580 | 8 | if (auto pip_ctx = weak_task_ctx.lock()) { |
581 | 8 | for (auto& parent : _parents) { |
582 | 0 | parent->on_channel_finished(ins.id); |
583 | 0 | } |
584 | 8 | } |
585 | 8 | } |
586 | | |
587 | 0 | void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) { |
588 | 0 | int64_t local_max_time = 0; |
589 | 0 | int64_t local_min_time = INT64_MAX; |
590 | 0 | for (auto& [_, ins] : _rpc_instances) { |
591 | 0 | if (ins->stats.sum_time != 0) { |
592 | 0 | local_max_time = std::max(local_max_time, ins->stats.sum_time); |
593 | 0 | local_min_time = std::min(local_min_time, ins->stats.sum_time); |
594 | 0 | } |
595 | 0 | } |
596 | 0 | *max_time = local_max_time; |
597 | 0 | *min_time = local_min_time == INT64_MAX ? 0 : local_min_time; |
598 | 0 | } |
599 | | |
600 | 0 | int64_t ExchangeSinkBuffer::get_sum_rpc_time() { |
601 | 0 | int64_t sum_time = 0; |
602 | 0 | for (auto& [_, ins] : _rpc_instances) { |
603 | 0 | sum_time += ins->stats.sum_time; |
604 | 0 | } |
605 | 0 | return sum_time; |
606 | 0 | } |
607 | | |
608 | | void ExchangeSinkBuffer::update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, |
609 | 27 | int64_t receive_rpc_time) { |
610 | 27 | _rpc_count++; |
611 | 27 | int64_t rpc_spend_time = receive_rpc_time - start_rpc_time; |
612 | 27 | if (rpc_spend_time > 0) { |
613 | 27 | auto& stats = ins.stats; |
614 | 27 | ++stats.rpc_count; |
615 | 27 | stats.sum_time += rpc_spend_time; |
616 | 27 | stats.max_time = std::max(stats.max_time, rpc_spend_time); |
617 | 27 | stats.min_time = std::min(stats.min_time, rpc_spend_time); |
618 | 27 | } |
619 | 27 | } |
620 | | |
// Publish rpc timing counters into `profile`: max/min/sum/avg rpc time plus
// the total rpc count. At profile level > 3 a per-instance child profile is
// added for up to rpc_verbose_profile_max_instance_count instances, ordered by
// their worst single rpc time.
void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
    auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1);
    auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
    auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
    auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
    auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");

    int64_t max_rpc_time = 0, min_rpc_time = 0;
    get_max_min_rpc_time(&max_rpc_time, &min_rpc_time);
    _max_rpc_timer->set(max_rpc_time);
    _min_rpc_timer->set(min_rpc_time);

    _count_rpc->set(_rpc_count);
    int64_t sum_time = get_sum_rpc_time();
    _sum_rpc_timer->set(sum_time);
    // max(1, count) guards the division when no rpc finished yet.
    _avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), _rpc_count.load()));

    auto max_count = _state->rpc_verbose_profile_max_instance_count();
    // This counter will lead to performance degradation.
    // So only collect this information when the profile level is greater than 3.
    if (_state->profile_level() > 3 && max_count > 0) {
        // Snapshot the stats, then sort by max single-rpc time (descending).
        std::vector<std::pair<InstanceLoId, RpcInstanceStatistics>> tmp_rpc_stats_vec;
        for (const auto& [id, ins] : _rpc_instances) {
            tmp_rpc_stats_vec.emplace_back(id, ins->stats);
        }
        pdqsort(tmp_rpc_stats_vec.begin(), tmp_rpc_stats_vec.end(),
                [](const auto& a, const auto& b) { return a.second.max_time > b.second.max_time; });
        auto count = std::min((size_t)max_count, tmp_rpc_stats_vec.size());
        int i = 0;
        auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true);
        for (const auto& [id, stats] : tmp_rpc_stats_vec) {
            if (0 == stats.rpc_count) {
                // Never sent an rpc; nothing worth reporting.
                continue;
            }
            std::stringstream out;
            out << "Instance " << std::hex << id;
            auto stats_str = fmt::format(
                    "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, SumTime: {}",
                    stats.rpc_count, PrettyPrinter::print(stats.max_time, TUnit::TIME_NS),
                    PrettyPrinter::print(stats.min_time, TUnit::TIME_NS),
                    PrettyPrinter::print(
                            stats.sum_time / std::max(static_cast<int64_t>(1), stats.rpc_count),
                            TUnit::TIME_NS),
                    PrettyPrinter::print(stats.sum_time, TUnit::TIME_NS));
            detail_profile->add_info_string(out.str(), stats_str);
            if (++i == count) {
                break;
            }
        }
    }
}
672 | | |
673 | 2 | std::string ExchangeSinkBuffer::debug_each_instance_queue_size() { |
674 | 2 | fmt::memory_buffer debug_string_buffer; |
675 | 6 | for (auto& [id, instance_data] : _rpc_instances) { |
676 | 6 | std::unique_lock<std::mutex> lock(*instance_data->mutex); |
677 | 6 | auto queue_size = 0; |
678 | 6 | for (auto& [_, list] : instance_data->package_queue) { |
679 | 5 | queue_size += list.size(); |
680 | 5 | } |
681 | 6 | fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, queue_size); |
682 | 6 | } |
683 | 2 | return fmt::to_string(debug_string_buffer); |
684 | 2 | } |
685 | | |
686 | | } // namespace doris |