be/src/exec/exchange/vdata_stream_recvr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/exchange/vdata_stream_recvr.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/Metrics_types.h> |
22 | | #include <gen_cpp/Types_types.h> |
23 | | #include <gen_cpp/data.pb.h> |
24 | | |
25 | | #include <algorithm> |
26 | | #include <functional> |
27 | | #include <string> |
28 | | |
29 | | #include "common/logging.h" |
30 | | #include "core/block/block.h" |
31 | | #include "core/block/materialize_block.h" |
32 | | #include "exec/exchange/vdata_stream_mgr.h" |
33 | | #include "exec/operator/exchange_sink_operator.h" |
34 | | #include "exec/operator/exchange_source_operator.h" |
35 | | #include "exec/sort/sort_cursor.h" |
36 | | #include "exec/sort/vsorted_run_merger.h" |
37 | | #include "runtime/memory/mem_tracker.h" |
38 | | #include "runtime/runtime_state.h" |
39 | | #include "runtime/thread_context.h" |
40 | | #include "util/defer_op.h" |
41 | | #include "util/uid_util.h" |
42 | | |
43 | | namespace doris { |
44 | | |
45 | | VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders, |
46 | | std::shared_ptr<Dependency> local_channel_dependency) |
47 | 651k | : _recvr(parent_recvr), |
48 | 651k | _is_cancelled(false), |
49 | 651k | _num_remaining_senders(num_senders), |
50 | 651k | _local_channel_dependency(local_channel_dependency) { |
51 | 651k | _cancel_status = Status::OK(); |
52 | 651k | _queue_mem_tracker = std::make_unique<MemTracker>("local data queue mem tracker"); |
53 | 651k | } |
54 | | |
55 | 663k | VDataStreamRecvr::SenderQueue::~SenderQueue() { |
56 | 663k | run_block_queue_done_callbacks(_block_queue); |
57 | 663k | _block_queue.clear(); |
58 | 663k | } |
59 | | |
60 | 942k | Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) { |
61 | 942k | BlockItem block_item; |
62 | 942k | { |
63 | 942k | INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); |
64 | 942k | #ifndef NDEBUG |
65 | 942k | if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) { |
66 | 0 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, |
67 | 0 | "_is_cancelled: {}, _block_queue_empty: {}, " |
68 | 0 | "_num_remaining_senders: {}", |
69 | 0 | _is_cancelled, _block_queue.empty(), _num_remaining_senders); |
70 | 0 | } |
71 | 942k | #endif |
72 | | // Check for cancellation, then take the next block_item from _block_queue |
73 | 942k | if (_is_cancelled) { |
74 | 3 | RETURN_IF_ERROR(_cancel_status); |
75 | 2 | return Status::Cancelled("Cancelled"); |
76 | 3 | } |
77 | | |
78 | 942k | if (_block_queue.empty()) { |
79 | 648k | if (_num_remaining_senders != 0) { |
80 | 0 | return Status::InternalError( |
81 | 0 | "Data queue is empty but there are still remaining senders. " |
82 | 0 | "_num_remaining_senders: {}", |
83 | 0 | _num_remaining_senders); |
84 | 0 | } |
85 | 648k | *eos = true; |
86 | 648k | return Status::OK(); |
87 | 648k | } |
88 | | |
89 | 942k | DCHECK(!_block_queue.empty()); |
90 | 293k | block_item = std::move(_block_queue.front()); |
91 | 293k | _block_queue.pop_front(); |
92 | 293k | } |
93 | 0 | BlockUPtr next_block; |
94 | 293k | RETURN_IF_ERROR(block_item.get_block(next_block)); |
95 | 293k | size_t block_byte_size = block_item.block_byte_size(); |
96 | 293k | COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, block_item.deserialize_time()); |
97 | 293k | COUNTER_UPDATE(_recvr->_decompress_timer, block_item.decompress_time()); |
98 | 293k | COUNTER_UPDATE(_recvr->_decompress_bytes, block_item.decompress_bytes()); |
99 | 293k | _recvr->_memory_used_counter->update(-(int64_t)block_byte_size); |
100 | 293k | INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); |
101 | 293k | sub_blocks_memory_usage(block_byte_size); |
102 | 293k | if (_block_queue.empty() && _source_dependency) { |
103 | 235k | if (!_is_cancelled && _num_remaining_senders > 0) { |
104 | 119k | _source_dependency->block(); |
105 | 119k | } |
106 | 235k | } |
107 | | |
108 | 293k | block_item.call_done(_recvr); |
109 | | |
110 | 293k | DCHECK(block->empty()); |
111 | 293k | block->swap(*next_block); |
112 | 293k | *eos = false; |
113 | 293k | return Status::OK(); |
114 | 293k | } |
115 | | |
116 | 200 | bool VDataStreamRecvr::SenderQueue::has_data_or_finished() { |
117 | 200 | std::lock_guard<std::mutex> l(_lock); |
118 | 200 | return _is_cancelled || !_block_queue.empty() || _num_remaining_senders == 0; |
119 | 200 | } |
120 | | |
121 | 1.61M | void VDataStreamRecvr::SenderQueue::set_source_ready(std::lock_guard<std::mutex>&) { |
122 | | // Here, it is necessary to check if _source_dependency is not nullptr. |
123 | | // This is because the queue might be closed before setting the source dependency. |
124 | 1.61M | if (!_source_dependency) { |
125 | 49 | return; |
126 | 49 | } |
127 | 1.61M | const bool should_wait = !_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0; |
128 | 1.61M | if (!should_wait) { |
129 | 1.61M | _source_dependency->set_ready(); |
130 | 1.61M | } |
131 | 1.61M | } |
132 | | |
133 | | void VDataStreamRecvr::SenderQueue::run_block_queue_done_callbacks( |
134 | 1.32M | std::list<BlockItem>& block_queue) { |
135 | 1.32M | for (auto& block_item : block_queue) { |
136 | 1.71k | block_item.call_done(_recvr); |
137 | 1.71k | } |
138 | 1.32M | } |
139 | | |
140 | 128 | std::string VDataStreamRecvr::SenderQueue::debug_string() { |
141 | 128 | std::lock_guard<std::mutex> l(_lock); |
142 | 128 | fmt::memory_buffer debug_string_buffer; |
143 | 128 | fmt::format_to(debug_string_buffer, |
144 | 128 | "_num_remaining_senders = {}, block_queue size = {}, _is_cancelled: {}, " |
145 | 128 | "_cancel_status: {}, _sender_eos_set: (", |
146 | 128 | _num_remaining_senders, _block_queue.size(), _is_cancelled, |
147 | 128 | _cancel_status.to_string()); |
148 | 128 | for (auto& i : _sender_eos_set) { |
149 | 112 | fmt::format_to(debug_string_buffer, "{}, ", i); |
150 | 112 | } |
151 | 128 | fmt::format_to(debug_string_buffer, ")"); |
152 | 128 | return fmt::to_string(debug_string_buffer); |
153 | 128 | } |
154 | | |
155 | | Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock, int be_number, |
156 | | int64_t packet_seq, |
157 | | ::google::protobuf::Closure** done, |
158 | | const int64_t wait_for_worker, |
159 | 2.10k | const uint64_t time_to_find_recvr) { |
160 | 2.10k | { |
161 | 2.10k | INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); |
162 | 2.10k | if (_is_cancelled) { |
163 | 1.55k | return Status::OK(); |
164 | 1.55k | } |
165 | 543 | auto iter = _packet_seq_map.find(be_number); |
166 | 543 | if (iter != _packet_seq_map.end()) { |
167 | 528 | if (iter->second >= packet_seq) { |
168 | 0 | return Status::InternalError( |
169 | 0 | "packet already exist [cur_packet_id= {} receive_packet_id={}]", |
170 | 0 | iter->second, packet_seq); |
171 | 0 | } |
172 | 528 | iter->second = packet_seq; |
173 | 528 | } else { |
174 | 15 | _packet_seq_map.emplace(be_number, packet_seq); |
175 | 15 | } |
176 | | |
177 | 543 | DCHECK(_num_remaining_senders >= 0); |
178 | 543 | if (_num_remaining_senders == 0) { |
179 | 0 | DCHECK(_sender_eos_set.contains(be_number)); |
180 | 0 | return Status::OK(); |
181 | 0 | } |
182 | 543 | } |
183 | | |
184 | 543 | INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); |
185 | 543 | if (_is_cancelled) { |
186 | 6 | return Status::OK(); |
187 | 6 | } |
188 | | |
189 | 537 | const auto block_byte_size = pblock->ByteSizeLong(); |
190 | 537 | COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1); |
191 | 537 | if (_recvr->_max_wait_worker_time->value() < wait_for_worker) { |
192 | 0 | _recvr->_max_wait_worker_time->set(wait_for_worker); |
193 | 0 | } |
194 | | |
195 | 537 | if (_recvr->_max_find_recvr_time->value() < time_to_find_recvr) { |
196 | 0 | _recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr); |
197 | 0 | } |
198 | | |
199 | 537 | _block_queue.emplace_back(std::move(pblock), block_byte_size); |
200 | 537 | COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size); |
201 | 537 | set_source_ready(l); |
202 | | |
203 | | // if done is nullptr, this function can't delay this response |
204 | 537 | if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) { |
205 | 36 | _block_queue.back().set_done(*done); |
206 | 36 | *done = nullptr; |
207 | 36 | } |
208 | 537 | _recvr->_memory_used_counter->update(block_byte_size); |
209 | 537 | add_blocks_memory_usage(block_byte_size); |
210 | 537 | return Status::OK(); |
211 | 543 | } |
212 | | |
213 | | Status VDataStreamRecvr::SenderQueue::add_blocks(const PTransmitDataParams* request, |
214 | | ::google::protobuf::Closure** done, |
215 | | const int64_t wait_for_worker, |
216 | 83.4k | const uint64_t time_to_find_recvr) { |
217 | 83.4k | { |
218 | 83.4k | INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); |
219 | 83.4k | if (_is_cancelled) { |
220 | 1 | return Status::OK(); |
221 | 1 | } |
222 | 83.4k | const int be_number = request->be_number(); |
223 | | // In the request, the packet_seq for blocks is [request->packet_seq() - blocks_size(), request->packet_seq()) |
224 | | // Note this is a left-closed, right-open interval; the packet_seq of the last block is request->packet_seq() - 1 |
225 | | // We store the packet_seq of the last block in _packet_seq_map so we can compare it with the packet_seq of the next received packet |
226 | 83.4k | const int64_t packet_seq = request->packet_seq() - 1; |
227 | 83.4k | auto iter = _packet_seq_map.find(be_number); |
228 | 83.4k | if (iter != _packet_seq_map.end()) { |
229 | 16.1k | if (iter->second > (packet_seq - request->blocks_size())) { |
230 | 0 | return Status::InternalError( |
231 | 0 | "packet already exist [cur_packet_id= {} receive_packet_id={}]", |
232 | 0 | iter->second, packet_seq); |
233 | 0 | } |
234 | 16.1k | iter->second = packet_seq; |
235 | 67.2k | } else { |
236 | 67.2k | _packet_seq_map.emplace(be_number, packet_seq); |
237 | 67.2k | } |
238 | | |
239 | 83.4k | DCHECK(_num_remaining_senders >= 0); |
240 | 83.4k | if (_num_remaining_senders == 0) { |
241 | 0 | DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number)); |
242 | 0 | return Status::OK(); |
243 | 0 | } |
244 | 83.4k | } |
245 | | |
246 | 83.4k | INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); |
247 | 83.4k | if (_is_cancelled) { |
248 | 0 | return Status::OK(); |
249 | 0 | } |
250 | | |
251 | 83.4k | int64_t total_block_byte_size = 0; |
252 | 166k | for (int i = 0; i < request->blocks_size(); i++) { |
253 | 83.4k | std::unique_ptr<PBlock> pblock = std::make_unique<PBlock>(); |
254 | 83.4k | pblock->CopyFrom(request->blocks(i)); |
255 | | |
256 | 83.4k | const auto block_byte_size = pblock->ByteSizeLong(); |
257 | 83.4k | COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1); |
258 | 83.4k | if (_recvr->_max_wait_worker_time->value() < wait_for_worker) { |
259 | 1 | _recvr->_max_wait_worker_time->set(wait_for_worker); |
260 | 1 | } |
261 | | |
262 | 83.4k | if (_recvr->_max_find_recvr_time->value() < time_to_find_recvr) { |
263 | 53.3k | _recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr); |
264 | 53.3k | } |
265 | | |
266 | 83.4k | _block_queue.emplace_back(std::move(pblock), block_byte_size); |
267 | 83.4k | COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size); |
268 | 83.4k | total_block_byte_size += block_byte_size; |
269 | 83.4k | } |
270 | | |
271 | 83.4k | set_source_ready(l); |
272 | | |
273 | | // if done is nullptr, this function can't delay this response |
274 | 83.4k | if (done != nullptr && _recvr->exceeds_limit(total_block_byte_size)) { |
275 | 5 | _block_queue.back().set_done(*done); |
276 | 5 | *done = nullptr; |
277 | 5 | } |
278 | 83.4k | _recvr->_memory_used_counter->update(total_block_byte_size); |
279 | 83.4k | add_blocks_memory_usage(total_block_byte_size); |
280 | 83.4k | return Status::OK(); |
281 | 83.4k | } |
282 | | |
283 | 211k | void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { |
284 | 211k | if (block->rows() == 0) { |
285 | 0 | return; |
286 | 0 | } |
287 | 211k | { |
288 | 211k | INJECT_MOCK_SLEEP(std::unique_lock<std::mutex> l(_lock)); |
289 | 211k | if (_is_cancelled) { |
290 | 299 | return; |
291 | 299 | } |
292 | 211k | DCHECK(_num_remaining_senders >= 0); |
293 | 211k | if (_num_remaining_senders == 0) { |
294 | 1 | return; |
295 | 1 | } |
296 | 211k | } |
297 | 211k | BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name()); |
298 | | |
299 | | // Local exchange must copy the block contents when use_move == false |
300 | 211k | if (use_move) { |
301 | 203k | block->clear(); |
302 | 203k | } else { |
303 | 8.19k | auto rows = block->rows(); |
304 | 22.1k | for (int i = 0; i < nblock->columns(); ++i) { |
305 | 13.9k | nblock->get_by_position(i).column = |
306 | 13.9k | nblock->get_by_position(i).column->clone_resized(rows); |
307 | 13.9k | } |
308 | 8.19k | } |
309 | 211k | materialize_block_inplace(*nblock); |
310 | | |
311 | 211k | auto block_mem_size = nblock->allocated_bytes(); |
312 | 211k | { |
313 | 211k | INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); |
314 | 211k | if (_is_cancelled) { |
315 | 0 | return; |
316 | 0 | } |
317 | 211k | _block_queue.emplace_back(std::move(nblock), block_mem_size); |
318 | 211k | set_source_ready(l); |
319 | 211k | COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size); |
320 | 211k | _recvr->_memory_used_counter->update(block_mem_size); |
321 | 211k | add_blocks_memory_usage(block_mem_size); |
322 | 211k | } |
323 | 211k | } |
324 | | |
325 | 4.53M | void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) { |
326 | 4.53M | INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); |
327 | 4.53M | if (_sender_eos_set.end() != _sender_eos_set.find(be_number)) { |
328 | 0 | return; |
329 | 0 | } |
330 | 4.53M | _sender_eos_set.insert(be_number); |
331 | 4.53M | DCHECK_GT(_num_remaining_senders, 0); |
332 | 4.53M | _num_remaining_senders--; |
333 | 18.4E | VLOG_FILE << "decremented senders: fragment_instance_id=" |
334 | 18.4E | << print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id() |
335 | 18.4E | << " #senders=" << _num_remaining_senders; |
336 | 4.53M | if (_num_remaining_senders == 0) { |
337 | 656k | set_source_ready(l); |
338 | 656k | } |
339 | 4.53M | } |
340 | | |
341 | 664k | void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) { |
342 | 664k | std::list<BlockItem> block_queue; |
343 | 664k | { |
344 | 664k | INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); |
345 | 664k | if (_is_cancelled) { |
346 | 664k | return; |
347 | 664k | } |
348 | 18.4E | _is_cancelled = true; |
349 | 18.4E | _cancel_status = cancel_status; |
350 | 18.4E | set_source_ready(l); |
351 | 18.4E | VLOG_QUERY << "cancelled stream: _fragment_instance_id=" |
352 | 18.4E | << print_id(_recvr->fragment_instance_id()) |
353 | 18.4E | << " node_id=" << _recvr->dest_node_id(); |
354 | 18.4E | block_queue.splice(block_queue.end(), _block_queue); |
355 | 18.4E | } |
356 | 0 | run_block_queue_done_callbacks(block_queue); |
357 | 18.4E | } |
358 | | |
359 | 662k | void VDataStreamRecvr::SenderQueue::close() { |
360 | | // _is_cancelled must be set to true here; otherwise a concurrent sender |
361 | | // could still add a batch to _block_queue. A batch added after |
362 | | // _block_queue has been cleared would be leaked. |
363 | 662k | std::list<BlockItem> block_queue; |
364 | 662k | { |
365 | 662k | INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); |
366 | 662k | _is_cancelled = true; |
367 | 662k | set_source_ready(l); |
368 | 662k | block_queue.splice(block_queue.end(), _block_queue); |
369 | 662k | } |
370 | | // Release delayed RPC callbacks after the queue state is fully closed. |
371 | 662k | run_block_queue_done_callbacks(block_queue); |
372 | 662k | } |
373 | | |
374 | | VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, |
375 | | RuntimeProfile::HighWaterMarkCounter* memory_used_counter, |
376 | | RuntimeState* state, const TUniqueId& fragment_instance_id, |
377 | | PlanNodeId dest_node_id, int num_senders, bool is_merging, |
378 | | RuntimeProfile* profile, size_t data_queue_capacity) |
379 | 453k | : HasTaskExecutionCtx(state), |
380 | 453k | _mgr(stream_mgr), |
381 | 453k | _memory_used_counter(memory_used_counter), |
382 | 453k | _resource_ctx(state->get_query_ctx()->resource_ctx()), |
383 | 453k | _query_context(state->get_query_ctx()->shared_from_this()), |
384 | 453k | _fragment_instance_id(fragment_instance_id), |
385 | 453k | _dest_node_id(dest_node_id), |
386 | 453k | _is_merging(is_merging), |
387 | 453k | _is_closed(false), |
388 | 453k | _sender_queue_mem_limit(data_queue_capacity), |
389 | 453k | _profile(profile) { |
390 | | // DataStreamRecvr may be destructed after the instance execution thread ends. |
391 | 453k | _mem_tracker = |
392 | 453k | std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id)); |
393 | 453k | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); |
394 | | |
395 | | // Create one queue per sender if is_merging is true. |
396 | 453k | int num_queues = is_merging ? num_senders : 1; |
397 | 453k | _sender_to_local_channel_dependency.resize(num_queues); |
398 | 1.10M | for (size_t i = 0; i < num_queues; i++) { |
399 | 655k | _sender_to_local_channel_dependency[i] = Dependency::create_shared( |
400 | 655k | _dest_node_id, _dest_node_id, fmt::format("LocalExchangeChannelDependency_{}", i), |
401 | 655k | true); |
402 | 655k | } |
403 | 453k | _sender_queues.reserve(num_queues); |
404 | 453k | int num_sender_per_queue = is_merging ? 1 : num_senders; |
405 | 1.10M | for (int i = 0; i < num_queues; ++i) { |
406 | 655k | SenderQueue* queue = nullptr; |
407 | 655k | queue = _sender_queue_pool.add(new SenderQueue(this, num_sender_per_queue, |
408 | 655k | _sender_to_local_channel_dependency[i])); |
409 | 655k | _sender_queues.push_back(queue); |
410 | 655k | } |
411 | | |
412 | | // Initialize the counters |
413 | 453k | _remote_bytes_received_counter = ADD_COUNTER(_profile, "RemoteBytesReceived", TUnit::BYTES); |
414 | 453k | _local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES); |
415 | | |
416 | 453k | _deserialize_row_batch_timer = ADD_TIMER(_profile, "DeserializeRowBatchTimer"); |
417 | 453k | _data_arrival_timer = ADD_TIMER(_profile, "DataArrivalWaitTime"); |
418 | 453k | _buffer_full_total_timer = ADD_TIMER(_profile, "SendersBlockedTotalTimer(*)"); |
419 | 453k | _first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime"); |
420 | 453k | _decompress_timer = ADD_TIMER(_profile, "DecompressTime"); |
421 | 453k | _decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES); |
422 | 453k | _blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", TUnit::UNIT); |
423 | 453k | _max_wait_worker_time = ADD_COUNTER(_profile, "MaxWaitForWorkerTime", TUnit::UNIT); |
424 | 453k | _max_wait_to_process_time = ADD_COUNTER(_profile, "MaxWaitToProcessTime", TUnit::UNIT); |
425 | 453k | _max_find_recvr_time = ADD_COUNTER(_profile, "MaxFindRecvrCpuTime(NS)", TUnit::UNIT); |
426 | 453k | } |
427 | | |
428 | 459k | VDataStreamRecvr::~VDataStreamRecvr() { |
429 | 18.4E | DCHECK(_mgr == nullptr) << "Must call close()"; |
430 | 459k | } |
431 | | |
432 | | Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr, |
433 | | const std::vector<bool>& is_asc_order, |
434 | | const std::vector<bool>& nulls_first, size_t batch_size, |
435 | 42.8k | int64_t limit, size_t offset) { |
436 | 42.8k | DCHECK(_is_merging); |
437 | 42.8k | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); |
438 | 42.8k | std::vector<BlockSupplier> child_block_suppliers; |
439 | 42.8k | std::vector<BlockSupplierReadyChecker> child_block_supplier_ready_checkers; |
440 | | // Create the merger that will produce a single stream of sorted rows. |
441 | 42.8k | _merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, limit, |
442 | 42.8k | offset, _profile)); |
443 | | |
444 | 42.8k | child_block_suppliers.reserve(_sender_queues.size()); |
445 | 42.8k | child_block_supplier_ready_checkers.reserve(_sender_queues.size()); |
446 | 288k | for (int i = 0; i < _sender_queues.size(); ++i) { |
447 | 245k | child_block_suppliers.emplace_back(std::bind(std::mem_fn(&SenderQueue::get_batch), |
448 | 245k | _sender_queues[i], std::placeholders::_1, |
449 | 245k | std::placeholders::_2)); |
450 | 245k | child_block_supplier_ready_checkers.emplace_back( |
451 | 245k | std::bind(std::mem_fn(&SenderQueue::has_data_or_finished), _sender_queues[i])); |
452 | 245k | } |
453 | 42.8k | RETURN_IF_ERROR(_merger->prepare(child_block_suppliers, child_block_supplier_ready_checkers)); |
454 | 42.8k | return Status::OK(); |
455 | 42.8k | } |
456 | | |
457 | | Status VDataStreamRecvr::add_block(std::unique_ptr<PBlock> pblock, int sender_id, int be_number, |
458 | | int64_t packet_seq, ::google::protobuf::Closure** done, |
459 | | const int64_t wait_for_worker, |
460 | 0 | const uint64_t time_to_find_recvr) { |
461 | 0 | SCOPED_ATTACH_TASK(_resource_ctx); |
462 | 0 | if (_query_context->low_memory_mode()) { |
463 | 0 | set_low_memory_mode(); |
464 | 0 | } |
465 | | |
466 | 0 | int use_sender_id = _is_merging ? sender_id : 0; |
467 | 0 | return _sender_queues[use_sender_id]->add_block(std::move(pblock), be_number, packet_seq, done, |
468 | 0 | wait_for_worker, time_to_find_recvr); |
469 | 0 | } |
470 | | |
471 | | Status VDataStreamRecvr::add_blocks(const PTransmitDataParams* request, |
472 | | ::google::protobuf::Closure** done, |
473 | | const int64_t wait_for_worker, |
474 | 83.4k | const uint64_t time_to_find_recvr) { |
475 | 83.4k | SCOPED_ATTACH_TASK(_resource_ctx); |
476 | 83.4k | if (_query_context->low_memory_mode()) { |
477 | 0 | set_low_memory_mode(); |
478 | 0 | } |
479 | 83.4k | int use_sender_id = _is_merging ? request->sender_id() : 0; |
480 | 83.4k | return _sender_queues[use_sender_id]->add_blocks(request, done, wait_for_worker, |
481 | 83.4k | time_to_find_recvr); |
482 | 83.4k | } |
483 | | |
484 | 211k | void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) { |
485 | 211k | if (_query_context->low_memory_mode()) { |
486 | 0 | set_low_memory_mode(); |
487 | 0 | } |
488 | 211k | int use_sender_id = _is_merging ? sender_id : 0; |
489 | 211k | _sender_queues[use_sender_id]->add_block(block, use_move); |
490 | 211k | } |
491 | | |
492 | 97 | std::string VDataStreamRecvr::debug_string() { |
493 | 97 | fmt::memory_buffer debug_string_buffer; |
494 | 97 | fmt::format_to(debug_string_buffer, |
495 | 97 | "fragment_instance_id: {}, _dest_node_id: {}, _is_merging: {}, _is_closed: {}", |
496 | 97 | print_id(_fragment_instance_id), _dest_node_id, _is_merging, _is_closed); |
497 | 225 | for (size_t i = 0; i < _sender_queues.size(); i++) { |
498 | 128 | fmt::format_to(debug_string_buffer, "No. {} queue: {}", i, |
499 | 128 | _sender_queues[i]->debug_string()); |
500 | 128 | } |
501 | 97 | return fmt::to_string(debug_string_buffer); |
502 | 97 | } |
503 | | |
504 | 2.63M | std::shared_ptr<Dependency> VDataStreamRecvr::get_local_channel_dependency(int sender_id) { |
505 | 2.63M | DCHECK(_sender_to_local_channel_dependency[_is_merging ? sender_id : 0] != nullptr); |
506 | 2.63M | return _sender_to_local_channel_dependency[_is_merging ? sender_id : 0]; |
507 | 2.63M | } |
508 | | |
509 | 738k | Status VDataStreamRecvr::get_next(Block* block, bool* eos) { |
510 | 738k | if (!_is_merging) { |
511 | 620k | block->clear(); |
512 | 620k | return _sender_queues[0]->get_batch(block, eos); |
513 | 620k | } else { |
514 | 118k | return _merger->get_next(block, eos); |
515 | 118k | } |
516 | 738k | } |
517 | | |
518 | 4.53M | void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_status) { |
519 | 4.53M | if (!exec_status.ok()) { |
520 | 0 | cancel_stream(exec_status); |
521 | 0 | return; |
522 | 0 | } |
523 | 4.53M | int use_sender_id = _is_merging ? sender_id : 0; |
524 | 4.53M | _sender_queues[use_sender_id]->decrement_senders(be_number); |
525 | 4.53M | } |
526 | | |
527 | 460k | void VDataStreamRecvr::cancel_stream(Status exec_status) { |
528 | 18.4E | VLOG_QUERY << "cancel_stream: fragment_instance_id=" << print_id(_fragment_instance_id) |
529 | 18.4E | << exec_status; |
530 | | |
531 | 1.12M | for (int i = 0; i < _sender_queues.size(); ++i) { |
532 | 664k | _sender_queues[i]->cancel(exec_status); |
533 | 664k | } |
534 | 460k | } |
535 | | |
536 | 295k | void VDataStreamRecvr::SenderQueue::add_blocks_memory_usage(int64_t size) { |
537 | 295k | DCHECK(size >= 0); |
538 | 295k | _recvr->_mem_tracker->consume(size); |
539 | 295k | _queue_mem_tracker->consume(size); |
540 | 295k | if (_local_channel_dependency && exceeds_limit()) { |
541 | 3.86k | _local_channel_dependency->block(); |
542 | 3.86k | } |
543 | 295k | } |
544 | | |
545 | 293k | void VDataStreamRecvr::SenderQueue::sub_blocks_memory_usage(int64_t size) { |
546 | 293k | DCHECK(size >= 0); |
547 | 293k | _recvr->_mem_tracker->release(size); |
548 | 293k | _queue_mem_tracker->release(size); |
549 | 293k | if (_local_channel_dependency && (!exceeds_limit())) { |
550 | 293k | _local_channel_dependency->set_ready(); |
551 | 293k | } |
552 | 293k | } |
553 | | |
554 | 589k | bool VDataStreamRecvr::SenderQueue::exceeds_limit() { |
555 | 589k | const size_t queue_byte_size = _queue_mem_tracker->consumption(); |
556 | 589k | return _recvr->queue_exceeds_limit(queue_byte_size); |
557 | 589k | } |
558 | | |
559 | 83.9k | bool VDataStreamRecvr::exceeds_limit(size_t block_byte_size) { |
560 | 83.9k | return _mem_tracker->consumption() + block_byte_size > config::exchg_node_buffer_size_bytes; |
561 | 83.9k | } |
562 | | |
563 | 589k | bool VDataStreamRecvr::queue_exceeds_limit(size_t queue_byte_size) const { |
564 | 589k | return queue_byte_size >= _sender_queue_mem_limit; |
565 | 589k | } |
566 | | |
567 | 916k | void VDataStreamRecvr::close() { |
568 | 916k | if (_is_closed) { |
569 | 457k | return; |
570 | 457k | } |
571 | 458k | _is_closed = true; |
572 | 663k | for (auto& it : _sender_to_local_channel_dependency) { |
573 | 663k | it->set_always_ready(); |
574 | 663k | } |
575 | 1.12M | for (int i = 0; i < _sender_queues.size(); ++i) { |
576 | 663k | _sender_queues[i]->close(); |
577 | 663k | } |
578 | | // Remove this receiver from the DataStreamMgr that created it. |
579 | | // TODO: log error msg |
580 | 459k | if (_mgr) { |
581 | 459k | static_cast<void>(_mgr->deregister_recvr(fragment_instance_id(), dest_node_id())); |
582 | 459k | } |
583 | 458k | _mgr = nullptr; |
584 | | |
585 | 458k | _merger.reset(); |
586 | 458k | } |
587 | | |
588 | 452k | void VDataStreamRecvr::set_sink_dep_always_ready() const { |
589 | 655k | for (auto dep : _sender_to_local_channel_dependency) { |
590 | 655k | dep->set_always_ready(); |
591 | 655k | } |
592 | 452k | } |
593 | | |
594 | | } // namespace doris |