be/src/exec/exchange/local_exchanger.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/exchange/local_exchanger.h" |
19 | | |
20 | | #include "common/cast_set.h" |
21 | | #include "common/config.h" |
22 | | #include "common/status.h" |
23 | | #include "exec/exchange/local_exchange_sink_operator.h" |
24 | | #include "exec/exchange/local_exchange_source_operator.h" |
25 | | #include "exec/partitioner/partitioner.h" |
26 | | |
27 | | namespace doris { |
28 | | #include "common/compile_check_begin.h" |
29 | | template <typename BlockType> |
30 | | void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id, |
31 | | LocalExchangeSinkLocalState* local_state, |
32 | 47.3k | BlockType&& block) { |
33 | 47.3k | if (local_state == nullptr) { |
34 | 0 | _enqueue_data_and_set_ready(channel_id, std::move(block)); |
35 | 0 | return; |
36 | 0 | } |
37 | | // PartitionedBlock is used by shuffle exchanger. |
38 | | // PartitionedBlock will be push into multiple queues with different row ranges, so it will be |
39 | | // referenced multiple times. Otherwise, we only ref the block once because it is only push into |
40 | | // one queue. |
41 | 47.3k | std::unique_lock l(*_m[channel_id]); |
42 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || |
43 | 1.14k | std::is_same_v<BroadcastBlock, BlockType>) { |
44 | 1.14k | block.first->record_channel_id(channel_id); |
45 | 46.1k | } else { |
46 | 46.1k | block->record_channel_id(channel_id); |
47 | 46.1k | } |
48 | | |
49 | 47.3k | if (_data_queue[channel_id].enqueue(std::move(block))) { |
50 | 47.3k | local_state->_shared_state->set_ready_to_read(channel_id); |
51 | 47.3k | } |
52 | 47.3k | } _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE27_enqueue_data_and_set_readyEiPNS_27LocalExchangeSinkLocalStateEOS7_ Line | Count | Source | 32 | 44 | BlockType&& block) { | 33 | 44 | if (local_state == nullptr) { | 34 | 0 | _enqueue_data_and_set_ready(channel_id, std::move(block)); | 35 | 0 | return; | 36 | 0 | } | 37 | | // PartitionedBlock is used by shuffle exchanger. | 38 | | // PartitionedBlock will be push into multiple queues with different row ranges, so it will be | 39 | | // referenced multiple times. Otherwise, we only ref the block once because it is only push into | 40 | | // one queue. | 41 | 44 | std::unique_lock l(*_m[channel_id]); | 42 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 43 | 44 | std::is_same_v<BroadcastBlock, BlockType>) { | 44 | 44 | block.first->record_channel_id(channel_id); | 45 | | } else { | 46 | | block->record_channel_id(channel_id); | 47 | | } | 48 | | | 49 | 44 | if (_data_queue[channel_id].enqueue(std::move(block))) { | 50 | 40 | local_state->_shared_state->set_ready_to_read(channel_id); | 51 | 40 | } | 52 | 44 | } |
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE27_enqueue_data_and_set_readyEiPNS_27LocalExchangeSinkLocalStateEOS4_ Line | Count | Source | 32 | 46.1k | BlockType&& block) { | 33 | 46.1k | if (local_state == nullptr) { | 34 | 0 | _enqueue_data_and_set_ready(channel_id, std::move(block)); | 35 | 0 | return; | 36 | 0 | } | 37 | | // PartitionedBlock is used by shuffle exchanger. | 38 | | // PartitionedBlock will be push into multiple queues with different row ranges, so it will be | 39 | | // referenced multiple times. Otherwise, we only ref the block once because it is only push into | 40 | | // one queue. | 41 | 46.1k | std::unique_lock l(*_m[channel_id]); | 42 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 43 | | std::is_same_v<BroadcastBlock, BlockType>) { | 44 | | block.first->record_channel_id(channel_id); | 45 | 46.1k | } else { | 46 | 46.1k | block->record_channel_id(channel_id); | 47 | 46.1k | } | 48 | | | 49 | 46.1k | if (_data_queue[channel_id].enqueue(std::move(block))) { | 50 | 46.1k | local_state->_shared_state->set_ready_to_read(channel_id); | 51 | 46.1k | } | 52 | 46.1k | } |
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE27_enqueue_data_and_set_readyEiPNS_27LocalExchangeSinkLocalStateEOS7_ Line | Count | Source | 32 | 1.10k | BlockType&& block) { | 33 | 1.10k | if (local_state == nullptr) { | 34 | 0 | _enqueue_data_and_set_ready(channel_id, std::move(block)); | 35 | 0 | return; | 36 | 0 | } | 37 | | // PartitionedBlock is used by shuffle exchanger. | 38 | | // PartitionedBlock will be push into multiple queues with different row ranges, so it will be | 39 | | // referenced multiple times. Otherwise, we only ref the block once because it is only push into | 40 | | // one queue. | 41 | 1.10k | std::unique_lock l(*_m[channel_id]); | 42 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 43 | 1.10k | std::is_same_v<BroadcastBlock, BlockType>) { | 44 | 1.10k | block.first->record_channel_id(channel_id); | 45 | | } else { | 46 | | block->record_channel_id(channel_id); | 47 | | } | 48 | | | 49 | 1.10k | if (_data_queue[channel_id].enqueue(std::move(block))) { | 50 | 1.08k | local_state->_shared_state->set_ready_to_read(channel_id); | 51 | 1.08k | } | 52 | 1.10k | } |
|
53 | | |
54 | | template <typename BlockType> |
55 | | bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState* local_state, |
56 | | BlockType& block, bool* eos, Block* data_block, |
57 | 326k | int channel_id) { |
58 | 326k | if (local_state == nullptr) { |
59 | 20 | return _dequeue_data(block, eos, data_block, channel_id); |
60 | 20 | } |
61 | 326k | bool all_finished = _running_sink_operators == 0; |
62 | 326k | if (_data_queue[channel_id].try_dequeue(block)) { |
63 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || |
64 | 1.12k | std::is_same_v<BroadcastBlock, BlockType>) { |
65 | 1.12k | local_state->_shared_state->sub_mem_usage(channel_id, block.first->_allocated_bytes); |
66 | 46.1k | } else { |
67 | 46.1k | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); |
68 | 46.1k | data_block->swap(block->_data_block); |
69 | 46.1k | } |
70 | 47.2k | return true; |
71 | 278k | } else if (all_finished) { |
72 | 245k | *eos = true; |
73 | 245k | } else { |
74 | 33.7k | std::unique_lock l(*_m[channel_id]); |
75 | 33.7k | if (_data_queue[channel_id].try_dequeue(block)) { |
76 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || |
77 | 0 | std::is_same_v<BroadcastBlock, BlockType>) { |
78 | 0 | local_state->_shared_state->sub_mem_usage(channel_id, |
79 | 0 | block.first->_allocated_bytes); |
80 | 2 | } else { |
81 | 2 | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); |
82 | 2 | data_block->swap(block->_data_block); |
83 | 2 | } |
84 | 2 | return true; |
85 | 2 | } |
86 | 33.7k | COUNTER_UPDATE(local_state->_get_block_failed_counter, 1); |
87 | 33.7k | local_state->_dependency->block(); |
88 | 33.7k | } |
89 | 278k | return false; |
90 | 326k | } _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE13_dequeue_dataEPNS_29LocalExchangeSourceLocalStateERS7_PbPNS_5BlockEi Line | Count | Source | 57 | 232 | int channel_id) { | 58 | 232 | if (local_state == nullptr) { | 59 | 4 | return _dequeue_data(block, eos, data_block, channel_id); | 60 | 4 | } | 61 | 228 | bool all_finished = _running_sink_operators == 0; | 62 | 228 | if (_data_queue[channel_id].try_dequeue(block)) { | 63 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 64 | 36 | std::is_same_v<BroadcastBlock, BlockType>) { | 65 | 36 | local_state->_shared_state->sub_mem_usage(channel_id, block.first->_allocated_bytes); | 66 | | } else { | 67 | | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); | 68 | | data_block->swap(block->_data_block); | 69 | | } | 70 | 36 | return true; | 71 | 192 | } else if (all_finished) { | 72 | 162 | *eos = true; | 73 | 162 | } else { | 74 | 30 | std::unique_lock l(*_m[channel_id]); | 75 | 30 | if (_data_queue[channel_id].try_dequeue(block)) { | 76 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 77 | 0 | std::is_same_v<BroadcastBlock, BlockType>) { | 78 | 0 | local_state->_shared_state->sub_mem_usage(channel_id, | 79 | 0 | block.first->_allocated_bytes); | 80 | | } else { | 81 | | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); | 82 | | data_block->swap(block->_data_block); | 83 | | } | 84 | 0 | return true; | 85 | 0 | } | 86 | 30 | COUNTER_UPDATE(local_state->_get_block_failed_counter, 1); | 87 | 30 | local_state->_dependency->block(); | 88 | 30 | } | 89 | 192 | return false; | 90 | 228 | } |
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE13_dequeue_dataEPNS_29LocalExchangeSourceLocalStateERS4_PbPNS_5BlockEi Line | Count | Source | 57 | 323k | int channel_id) { | 58 | 323k | if (local_state == nullptr) { | 59 | 12 | return _dequeue_data(block, eos, data_block, channel_id); | 60 | 12 | } | 61 | 323k | bool all_finished = _running_sink_operators == 0; | 62 | 323k | if (_data_queue[channel_id].try_dequeue(block)) { | 63 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 64 | | std::is_same_v<BroadcastBlock, BlockType>) { | 65 | | local_state->_shared_state->sub_mem_usage(channel_id, block.first->_allocated_bytes); | 66 | 46.1k | } else { | 67 | 46.1k | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); | 68 | 46.1k | data_block->swap(block->_data_block); | 69 | 46.1k | } | 70 | 46.1k | return true; | 71 | 277k | } else if (all_finished) { | 72 | 244k | *eos = true; | 73 | 244k | } else { | 74 | 33.1k | std::unique_lock l(*_m[channel_id]); | 75 | 33.1k | if (_data_queue[channel_id].try_dequeue(block)) { | 76 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 77 | | std::is_same_v<BroadcastBlock, BlockType>) { | 78 | | local_state->_shared_state->sub_mem_usage(channel_id, | 79 | | block.first->_allocated_bytes); | 80 | 2 | } else { | 81 | 2 | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); | 82 | 2 | data_block->swap(block->_data_block); | 83 | 2 | } | 84 | 2 | return true; | 85 | 2 | } | 86 | 33.1k | COUNTER_UPDATE(local_state->_get_block_failed_counter, 1); | 87 | 33.1k | local_state->_dependency->block(); | 88 | 33.1k | } | 89 | 277k | return false; | 90 | 323k | } |
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE13_dequeue_dataEPNS_29LocalExchangeSourceLocalStateERS7_PbPNS_5BlockEi Line | Count | Source | 57 | 2.62k | int channel_id) { | 58 | 2.62k | if (local_state == nullptr) { | 59 | 4 | return _dequeue_data(block, eos, data_block, channel_id); | 60 | 4 | } | 61 | 2.62k | bool all_finished = _running_sink_operators == 0; | 62 | 2.62k | if (_data_queue[channel_id].try_dequeue(block)) { | 63 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 64 | 1.08k | std::is_same_v<BroadcastBlock, BlockType>) { | 65 | 1.08k | local_state->_shared_state->sub_mem_usage(channel_id, block.first->_allocated_bytes); | 66 | | } else { | 67 | | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); | 68 | | data_block->swap(block->_data_block); | 69 | | } | 70 | 1.08k | return true; | 71 | 1.53k | } else if (all_finished) { | 72 | 932 | *eos = true; | 73 | 932 | } else { | 74 | 602 | std::unique_lock l(*_m[channel_id]); | 75 | 602 | if (_data_queue[channel_id].try_dequeue(block)) { | 76 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 77 | 0 | std::is_same_v<BroadcastBlock, BlockType>) { | 78 | 0 | local_state->_shared_state->sub_mem_usage(channel_id, | 79 | 0 | block.first->_allocated_bytes); | 80 | | } else { | 81 | | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); | 82 | | data_block->swap(block->_data_block); | 83 | | } | 84 | 0 | return true; | 85 | 0 | } | 86 | 602 | COUNTER_UPDATE(local_state->_get_block_failed_counter, 1); | 87 | 602 | local_state->_dependency->block(); | 88 | 602 | } | 89 | 1.53k | return false; | 90 | 2.62k | } |
|
91 | | |
92 | | template <typename BlockType> |
93 | 0 | void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id, BlockType&& block) { |
94 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || |
95 | 0 | std::is_same_v<BroadcastBlock, BlockType>) { |
96 | 0 | block.first->record_channel_id(channel_id); |
97 | 0 | } else { |
98 | 0 | block->record_channel_id(channel_id); |
99 | 0 | } |
100 | 0 | _data_queue[channel_id].enqueue(std::move(block)); |
101 | 0 | } Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE27_enqueue_data_and_set_readyEiOS7_ Unexecuted instantiation: _ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE27_enqueue_data_and_set_readyEiOS4_ Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE27_enqueue_data_and_set_readyEiOS7_ |
102 | | |
103 | | template <typename BlockType> |
104 | | bool Exchanger<BlockType>::_dequeue_data(BlockType& block, bool* eos, Block* data_block, |
105 | 20 | int channel_id) { |
106 | 20 | if (_data_queue[channel_id].try_dequeue(block)) { |
107 | | if constexpr (!std::is_same_v<PartitionedBlock, BlockType> && |
108 | 0 | !std::is_same_v<BroadcastBlock, BlockType>) { |
109 | 0 | data_block->swap(block->_data_block); |
110 | 0 | } |
111 | 0 | return true; |
112 | 0 | } |
113 | 20 | return false; |
114 | 20 | } _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE13_dequeue_dataERS7_PbPNS_5BlockEi Line | Count | Source | 105 | 4 | int channel_id) { | 106 | 4 | if (_data_queue[channel_id].try_dequeue(block)) { | 107 | | if constexpr (!std::is_same_v<PartitionedBlock, BlockType> && | 108 | | !std::is_same_v<BroadcastBlock, BlockType>) { | 109 | | data_block->swap(block->_data_block); | 110 | | } | 111 | 0 | return true; | 112 | 0 | } | 113 | 4 | return false; | 114 | 4 | } |
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE13_dequeue_dataERS4_PbPNS_5BlockEi Line | Count | Source | 105 | 12 | int channel_id) { | 106 | 12 | if (_data_queue[channel_id].try_dequeue(block)) { | 107 | | if constexpr (!std::is_same_v<PartitionedBlock, BlockType> && | 108 | 0 | !std::is_same_v<BroadcastBlock, BlockType>) { | 109 | 0 | data_block->swap(block->_data_block); | 110 | 0 | } | 111 | 0 | return true; | 112 | 0 | } | 113 | 12 | return false; | 114 | 12 | } |
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE13_dequeue_dataERS7_PbPNS_5BlockEi Line | Count | Source | 105 | 4 | int channel_id) { | 106 | 4 | if (_data_queue[channel_id].try_dequeue(block)) { | 107 | | if constexpr (!std::is_same_v<PartitionedBlock, BlockType> && | 108 | | !std::is_same_v<BroadcastBlock, BlockType>) { | 109 | | data_block->swap(block->_data_block); | 110 | | } | 111 | 0 | return true; | 112 | 0 | } | 113 | 4 | return false; | 114 | 4 | } |
|
115 | | |
116 | | Status ShuffleExchanger::sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
117 | 141 | SinkInfo& sink_info) { |
118 | 141 | if (in_block->empty()) { |
119 | 96 | return Status::OK(); |
120 | 96 | } |
121 | 45 | { |
122 | 45 | SCOPED_TIMER(profile.compute_hash_value_timer); |
123 | 45 | RETURN_IF_ERROR(sink_info.partitioner->do_partitioning(state, in_block)); |
124 | 45 | } |
125 | 45 | { |
126 | 45 | SCOPED_TIMER(profile.distribute_timer); |
127 | 45 | RETURN_IF_ERROR(_split_rows(state, sink_info.partitioner->get_channel_ids(), in_block, |
128 | 45 | *sink_info.channel_id, sink_info.local_state, |
129 | 45 | sink_info.shuffle_idx_to_instance_idx)); |
130 | 45 | } |
131 | | |
132 | 44 | sink_info.local_state->_memory_used_counter->set( |
133 | 44 | sink_info.local_state->_shared_state->mem_usage); |
134 | 44 | return Status::OK(); |
135 | 45 | } |
136 | | |
137 | 100 | void ShuffleExchanger::close(SourceInfo&& source_info) { |
138 | 100 | PartitionedBlock partitioned_block; |
139 | 100 | bool eos; |
140 | 100 | Block block; |
141 | 100 | _data_queue[source_info.channel_id].set_eos(); |
142 | 100 | while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block, |
143 | 100 | source_info.channel_id)) { |
144 | | // do nothing |
145 | 0 | } |
146 | 100 | } |
147 | | |
148 | | Status ShuffleExchanger::get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, |
149 | 96 | SourceInfo&& source_info) { |
150 | 96 | PartitionedBlock partitioned_block; |
151 | 96 | MutableBlock mutable_block; |
152 | | |
153 | 96 | const auto max_batch_size = state->block_max_rows(); |
154 | 96 | const auto max_block_bytes = state->block_max_bytes(); |
155 | | |
156 | 96 | auto get_data = [&]() -> Status { |
157 | 36 | do { |
158 | 36 | const auto* offset_start = partitioned_block.second.row_idxs->data() + |
159 | 36 | partitioned_block.second.offset_start; |
160 | 36 | auto block_wrapper = partitioned_block.first; |
161 | 36 | RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->_data_block, offset_start, |
162 | 36 | offset_start + partitioned_block.second.length)); |
163 | 36 | if (max_block_bytes > 0 && mutable_block.bytes() >= max_block_bytes) { |
164 | 0 | break; |
165 | 0 | } |
166 | 36 | } while (mutable_block.rows() < max_batch_size && !*eos && |
167 | 36 | _dequeue_data(source_info.local_state, partitioned_block, eos, block, |
168 | 36 | source_info.channel_id)); |
169 | 32 | return Status::OK(); |
170 | 32 | }; |
171 | | |
172 | 96 | if (_dequeue_data(source_info.local_state, partitioned_block, eos, block, |
173 | 96 | source_info.channel_id)) { |
174 | 32 | SCOPED_TIMER(profile.copy_data_timer); |
175 | 32 | mutable_block = VectorizedUtils::build_mutable_mem_reuse_block( |
176 | 32 | block, partitioned_block.first->_data_block); |
177 | 32 | RETURN_IF_ERROR(get_data()); |
178 | 32 | } |
179 | 96 | return Status::OK(); |
180 | 96 | } |
181 | | |
182 | | Status ShuffleExchanger::_split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, |
183 | | Block* block, int channel_id, |
184 | | LocalExchangeSinkLocalState* local_state, |
185 | 45 | std::map<int, int>* shuffle_idx_to_instance_idx) { |
186 | 45 | if (local_state == nullptr) { |
187 | 0 | return _split_rows(state, channel_ids, block, channel_id); |
188 | 0 | } |
189 | 45 | const auto rows = cast_set<int32_t>(block->rows()); |
190 | 45 | auto row_idx = std::make_shared<PODArray<uint32_t>>(rows); |
191 | 45 | auto& partition_rows_histogram = _partition_rows_histogram[channel_id]; |
192 | 45 | { |
193 | 45 | partition_rows_histogram.assign(_num_partitions + 1, 0); |
194 | 303 | for (int32_t i = 0; i < rows; ++i) { |
195 | 258 | partition_rows_histogram[channel_ids[i]]++; |
196 | 258 | } |
197 | 225 | for (int32_t i = 1; i <= _num_partitions; ++i) { |
198 | 180 | partition_rows_histogram[i] += partition_rows_histogram[i - 1]; |
199 | 180 | } |
200 | 303 | for (int32_t i = rows - 1; i >= 0; --i) { |
201 | 258 | (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i; |
202 | 258 | partition_rows_histogram[channel_ids[i]]--; |
203 | 258 | } |
204 | 45 | } |
205 | | |
206 | 45 | Block data_block; |
207 | 45 | std::shared_ptr<BlockWrapper> new_block_wrapper; |
208 | 45 | if (!_free_blocks.try_dequeue(data_block)) { |
209 | 34 | data_block = block->clone_empty(); |
210 | 34 | } |
211 | 45 | data_block.swap(*block); |
212 | 45 | new_block_wrapper = |
213 | 45 | BlockWrapper::create_shared(std::move(data_block), local_state->_shared_state, -1); |
214 | 45 | if (new_block_wrapper->_data_block.empty()) { |
215 | 0 | return Status::OK(); |
216 | 0 | } |
217 | | /** |
218 | | * Data are hash-shuffled and distributed to all instances of |
219 | | * all BEs. So we need a shuffleId-To-InstanceId mapping. |
220 | | * For example, row 1 get a hash value 1 which means we should distribute to instance 1 on |
221 | | * BE 1 and row 2 get a hash value 2 which means we should distribute to instance 1 on BE 3. |
222 | | */ |
223 | 45 | DCHECK(shuffle_idx_to_instance_idx && shuffle_idx_to_instance_idx->size() > 0); |
224 | 45 | const auto& map = *shuffle_idx_to_instance_idx; |
225 | 45 | int32_t enqueue_rows = 0; |
226 | 179 | for (const auto& it : map) { |
227 | 179 | DCHECK(it.second >= 0 && it.second < _num_partitions) |
228 | 0 | << it.first << " : " << it.second << " " << _num_partitions; |
229 | 179 | uint32_t start = partition_rows_histogram[it.first]; |
230 | 179 | uint32_t size = partition_rows_histogram[it.first + 1] - start; |
231 | 179 | if (size > 0) { |
232 | 44 | enqueue_rows += size; |
233 | 44 | _enqueue_data_and_set_ready( |
234 | 44 | it.second, local_state, |
235 | 44 | {new_block_wrapper, |
236 | 44 | {.row_idxs = row_idx, .offset_start = start, .length = size}}); |
237 | 44 | } |
238 | 179 | } |
239 | 45 | if (enqueue_rows != rows) [[unlikely]] { |
240 | 1 | fmt::memory_buffer debug_string_buffer; |
241 | 1 | fmt::format_to(debug_string_buffer, "Type: {}, Local Exchange Id: {}, Shuffled Map: ", |
242 | 1 | get_exchange_type_name(get_type()), local_state->parent()->node_id()); |
243 | 3 | for (const auto& it : map) { |
244 | 3 | fmt::format_to(debug_string_buffer, "[{}:{}], ", it.first, it.second); |
245 | 3 | } |
246 | 1 | return Status::InternalError( |
247 | 1 | "Rows mismatched! Data may be lost. [Expected enqueue rows={}, Real enqueue " |
248 | 1 | "rows={}, Detail: {}]", |
249 | 1 | rows, enqueue_rows, fmt::to_string(debug_string_buffer)); |
250 | 1 | } |
251 | | |
252 | 44 | return Status::OK(); |
253 | 45 | } |
254 | | |
255 | | Status ShuffleExchanger::_split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, |
256 | 0 | Block* block, int channel_id) { |
257 | 0 | const auto rows = cast_set<int32_t>(block->rows()); |
258 | 0 | auto row_idx = std::make_shared<PODArray<uint32_t>>(rows); |
259 | 0 | auto& partition_rows_histogram = _partition_rows_histogram[channel_id]; |
260 | 0 | { |
261 | 0 | partition_rows_histogram.assign(_num_partitions + 1, 0); |
262 | 0 | for (int32_t i = 0; i < rows; ++i) { |
263 | 0 | partition_rows_histogram[channel_ids[i]]++; |
264 | 0 | } |
265 | 0 | for (int32_t i = 1; i <= _num_partitions; ++i) { |
266 | 0 | partition_rows_histogram[i] += partition_rows_histogram[i - 1]; |
267 | 0 | } |
268 | 0 | for (int32_t i = rows - 1; i >= 0; --i) { |
269 | 0 | (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i; |
270 | 0 | partition_rows_histogram[channel_ids[i]]--; |
271 | 0 | } |
272 | 0 | } |
273 | |
|
274 | 0 | Block data_block; |
275 | 0 | std::shared_ptr<BlockWrapper> new_block_wrapper; |
276 | 0 | if (!_free_blocks.try_dequeue(data_block)) { |
277 | 0 | data_block = block->clone_empty(); |
278 | 0 | } |
279 | 0 | data_block.swap(*block); |
280 | 0 | new_block_wrapper = BlockWrapper::create_shared(std::move(data_block), nullptr, -1); |
281 | 0 | if (new_block_wrapper->_data_block.empty()) { |
282 | 0 | return Status::OK(); |
283 | 0 | } |
284 | 0 | for (int i = 0; i < _num_partitions; i++) { |
285 | 0 | uint32_t start = partition_rows_histogram[i]; |
286 | 0 | uint32_t size = partition_rows_histogram[i + 1] - start; |
287 | 0 | if (size > 0) { |
288 | 0 | _enqueue_data_and_set_ready( |
289 | 0 | i, {new_block_wrapper, |
290 | 0 | {.row_idxs = row_idx, .offset_start = start, .length = size}}); |
291 | 0 | } |
292 | 0 | } |
293 | |
|
294 | 0 | return Status::OK(); |
295 | 0 | } |
296 | | |
297 | | Status PassthroughExchanger::sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
298 | 63.1k | SinkInfo& sink_info) { |
299 | 63.1k | if (in_block->empty()) { |
300 | 18.5k | return Status::OK(); |
301 | 18.5k | } |
302 | 44.5k | Block new_block; |
303 | 44.5k | if (!_free_blocks.try_dequeue(new_block)) { |
304 | 16.2k | new_block = {in_block->clone_empty()}; |
305 | 16.2k | } |
306 | 44.5k | new_block.swap(*in_block); |
307 | 44.5k | auto channel_id = ((*sink_info.channel_id)++) % _num_partitions; |
308 | 44.5k | BlockWrapperSPtr wrapper = BlockWrapper::create_shared( |
309 | 44.5k | std::move(new_block), |
310 | 44.5k | sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, channel_id); |
311 | | |
312 | 44.5k | _enqueue_data_and_set_ready(channel_id, sink_info.local_state, std::move(wrapper)); |
313 | | |
314 | 44.5k | sink_info.local_state->_memory_used_counter->set( |
315 | 44.5k | sink_info.local_state->_shared_state->mem_usage); |
316 | | |
317 | 44.5k | return Status::OK(); |
318 | 63.1k | } |
319 | | |
320 | 118k | void PassthroughExchanger::close(SourceInfo&& source_info) { |
321 | 118k | Block next_block; |
322 | 118k | BlockWrapperSPtr wrapper; |
323 | 118k | bool eos; |
324 | 118k | _data_queue[source_info.channel_id].set_eos(); |
325 | 118k | while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block, |
326 | 118k | source_info.channel_id)) { |
327 | | // do nothing |
328 | 0 | } |
329 | 118k | } |
330 | | |
331 | 5.32k | void PassToOneExchanger::close(SourceInfo&& source_info) { |
332 | 5.32k | Block next_block; |
333 | 5.32k | BlockWrapperSPtr wrapper; |
334 | 5.32k | bool eos; |
335 | 5.32k | _data_queue[source_info.channel_id].set_eos(); |
336 | 5.32k | while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block, |
337 | 5.32k | source_info.channel_id)) { |
338 | | // do nothing |
339 | 0 | } |
340 | 5.32k | } |
341 | | |
342 | | Status PassthroughExchanger::get_block(RuntimeState* state, Block* block, bool* eos, |
343 | 195k | Profile&& profile, SourceInfo&& source_info) { |
344 | 195k | BlockWrapperSPtr next_block; |
345 | 195k | _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id); |
346 | 195k | return Status::OK(); |
347 | 195k | } |
348 | | |
349 | | Status PassToOneExchanger::sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
350 | 1.77k | SinkInfo& sink_info) { |
351 | 1.77k | if (in_block->empty()) { |
352 | 666 | return Status::OK(); |
353 | 666 | } |
354 | 1.10k | Block new_block; |
355 | 1.10k | if (!_free_blocks.try_dequeue(new_block)) { |
356 | 747 | new_block = {in_block->clone_empty()}; |
357 | 747 | } |
358 | 1.10k | new_block.swap(*in_block); |
359 | | |
360 | 1.10k | BlockWrapperSPtr wrapper = BlockWrapper::create_shared( |
361 | 1.10k | std::move(new_block), |
362 | 1.10k | sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, 0); |
363 | 1.10k | _enqueue_data_and_set_ready(0, sink_info.local_state, std::move(wrapper)); |
364 | | |
365 | 1.10k | sink_info.local_state->_memory_used_counter->set( |
366 | 1.10k | sink_info.local_state->_shared_state->mem_usage); |
367 | | |
368 | 1.10k | return Status::OK(); |
369 | 1.77k | } |
370 | | |
371 | | Status PassToOneExchanger::get_block(RuntimeState* state, Block* block, bool* eos, |
372 | 2.07k | Profile&& profile, SourceInfo&& source_info) { |
373 | 2.07k | if (source_info.channel_id != 0) { |
374 | 3 | *eos = true; |
375 | 3 | return Status::OK(); |
376 | 3 | } |
377 | 2.07k | BlockWrapperSPtr next_block; |
378 | 2.07k | _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id); |
379 | 2.07k | return Status::OK(); |
380 | 2.07k | } |
381 | | |
382 | 15.6k | void ExchangerBase::finalize() { |
383 | 15.6k | DCHECK(_running_source_operators == 0); |
384 | 15.6k | Block block; |
385 | 33.2k | while (_free_blocks.try_dequeue(block)) { |
386 | | // do nothing |
387 | 17.5k | } |
388 | 15.6k | } |
389 | | |
390 | | Status BroadcastExchanger::sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
391 | 204 | SinkInfo& sink_info) { |
392 | 204 | if (in_block->empty()) { |
393 | 58 | return Status::OK(); |
394 | 58 | } |
395 | 146 | Block new_block; |
396 | 146 | if (!_free_blocks.try_dequeue(new_block)) { |
397 | 71 | new_block = {in_block->clone_empty()}; |
398 | 71 | } |
399 | 146 | new_block.swap(*in_block); |
400 | 146 | auto wrapper = BlockWrapper::create_shared( |
401 | 146 | std::move(new_block), |
402 | 146 | sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, -1); |
403 | 1.25k | for (int i = 0; i < _num_partitions; i++) { |
404 | 1.10k | _enqueue_data_and_set_ready( |
405 | 1.10k | i, sink_info.local_state, |
406 | 1.10k | {wrapper, {.offset_start = 0, .length = wrapper->_data_block.rows()}}); |
407 | 1.10k | } |
408 | | |
409 | 146 | return Status::OK(); |
410 | 204 | } |
411 | | |
412 | 468 | void BroadcastExchanger::close(SourceInfo&& source_info) { |
413 | 468 | BroadcastBlock partitioned_block; |
414 | 468 | bool eos; |
415 | 468 | Block block; |
416 | 468 | _data_queue[source_info.channel_id].set_eos(); |
417 | 468 | while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block, |
418 | 468 | source_info.channel_id)) { |
419 | | // do nothing |
420 | 0 | } |
421 | 468 | } |
422 | | |
423 | | Status BroadcastExchanger::get_block(RuntimeState* state, Block* block, bool* eos, |
424 | 2.15k | Profile&& profile, SourceInfo&& source_info) { |
425 | 2.15k | BroadcastBlock partitioned_block; |
426 | | |
427 | 2.15k | if (_dequeue_data(source_info.local_state, partitioned_block, eos, block, |
428 | 2.15k | source_info.channel_id)) { |
429 | 1.08k | SCOPED_TIMER(profile.copy_data_timer); |
430 | 1.08k | MutableBlock mutable_block = VectorizedUtils::build_mutable_mem_reuse_block( |
431 | 1.08k | block, partitioned_block.first->_data_block); |
432 | 1.08k | auto block_wrapper = partitioned_block.first; |
433 | 1.08k | RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->_data_block, |
434 | 1.08k | partitioned_block.second.offset_start, |
435 | 1.08k | partitioned_block.second.length)); |
436 | 1.08k | } |
437 | | |
438 | 2.15k | return Status::OK(); |
439 | 2.15k | } |
440 | | |
441 | | Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, Block* in_block, |
442 | 20 | SinkInfo& sink_info) { |
443 | 20 | Block new_block; |
444 | 20 | if (!_free_blocks.try_dequeue(new_block)) { |
445 | 13 | new_block = {in_block->clone_empty()}; |
446 | 13 | } |
447 | 20 | new_block.swap(*in_block); |
448 | 20 | auto channel_id = ((*sink_info.channel_id)++) % _num_partitions; |
449 | 20 | _enqueue_data_and_set_ready( |
450 | 20 | channel_id, sink_info.local_state, |
451 | 20 | BlockWrapper::create_shared( |
452 | 20 | std::move(new_block), |
453 | 20 | sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, |
454 | 20 | channel_id)); |
455 | | |
456 | 20 | sink_info.local_state->_memory_used_counter->set( |
457 | 20 | sink_info.local_state->_shared_state->mem_usage); |
458 | 20 | return Status::OK(); |
459 | 20 | } |
460 | | |
461 | | Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, Block* block, |
462 | 182 | SinkInfo& sink_info) { |
463 | 182 | std::vector<uint32_t> channel_ids; |
464 | 182 | const auto num_rows = block->rows(); |
465 | 182 | channel_ids.resize(num_rows, 0); |
466 | 182 | if (num_rows <= _num_partitions) { |
467 | 178 | std::iota(channel_ids.begin(), channel_ids.end(), 0); |
468 | 178 | } else { |
469 | 4 | size_t i = 0; |
470 | 12 | for (; i < num_rows - _num_partitions; i += _num_partitions) { |
471 | 8 | std::iota(channel_ids.begin() + i, channel_ids.begin() + i + _num_partitions, 0); |
472 | 8 | } |
473 | 4 | if (i < num_rows - 1) { |
474 | 4 | std::iota(channel_ids.begin() + i, channel_ids.end(), 0); |
475 | 4 | } |
476 | 4 | } |
477 | | |
478 | 182 | sink_info.local_state->_memory_used_counter->set( |
479 | 182 | sink_info.local_state->_shared_state->mem_usage); |
480 | 182 | RETURN_IF_ERROR(_split_rows(state, channel_ids, block, sink_info)); |
481 | 182 | return Status::OK(); |
482 | 182 | } |
483 | | |
484 | | Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, |
485 | | const std::vector<uint32_t>& channel_ids, |
486 | 182 | Block* block, SinkInfo& sink_info) { |
487 | 182 | const auto rows = cast_set<int32_t>(block->rows()); |
488 | 182 | auto row_idx = std::make_shared<std::vector<uint32_t>>(rows); |
489 | 182 | auto& partition_rows_histogram = _partition_rows_histogram[*sink_info.channel_id]; |
490 | 182 | { |
491 | 182 | partition_rows_histogram.assign(_num_partitions + 1, 0); |
492 | 710 | for (int32_t i = 0; i < rows; ++i) { |
493 | 528 | partition_rows_histogram[channel_ids[i]]++; |
494 | 528 | } |
495 | 1.62k | for (int32_t i = 1; i <= _num_partitions; ++i) { |
496 | 1.44k | partition_rows_histogram[i] += partition_rows_histogram[i - 1]; |
497 | 1.44k | } |
498 | | |
499 | 710 | for (int32_t i = rows - 1; i >= 0; --i) { |
500 | 528 | (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i; |
501 | 528 | partition_rows_histogram[channel_ids[i]]--; |
502 | 528 | } |
503 | 182 | } |
504 | 1.62k | for (int32_t i = 0; i < _num_partitions; i++) { |
505 | 1.44k | const size_t start = partition_rows_histogram[i]; |
506 | 1.44k | const size_t size = partition_rows_histogram[i + 1] - start; |
507 | 1.44k | if (size > 0) { |
508 | 496 | std::unique_ptr<MutableBlock> mutable_block = |
509 | 496 | MutableBlock::create_unique(block->clone_empty()); |
510 | 496 | RETURN_IF_ERROR(mutable_block->add_rows(block, start, size)); |
511 | 496 | auto new_block = mutable_block->to_block(); |
512 | | |
513 | 496 | _enqueue_data_and_set_ready( |
514 | 496 | i, sink_info.local_state, |
515 | 496 | BlockWrapper::create_shared( |
516 | 496 | std::move(new_block), |
517 | 496 | sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, |
518 | 496 | i)); |
519 | 496 | } |
520 | 1.44k | } |
521 | 182 | return Status::OK(); |
522 | 182 | } |
523 | | |
524 | | Status AdaptivePassthroughExchanger::sink(RuntimeState* state, Block* in_block, bool eos, |
525 | 664 | Profile&& profile, SinkInfo& sink_info) { |
526 | 664 | if (in_block->empty()) { |
527 | 464 | return Status::OK(); |
528 | 464 | } |
529 | 200 | if (_is_pass_through) { |
530 | 20 | return _passthrough_sink(state, in_block, sink_info); |
531 | 180 | } else { |
532 | 180 | if (++_total_block >= _num_partitions) { |
533 | 1 | _is_pass_through = true; |
534 | 1 | } |
535 | 180 | return _shuffle_sink(state, in_block, sink_info); |
536 | 180 | } |
537 | 200 | } |
538 | | |
539 | | Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, Block* block, bool* eos, |
540 | 990 | Profile&& profile, SourceInfo&& source_info) { |
541 | 990 | BlockWrapperSPtr next_block; |
542 | 990 | _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id); |
543 | 990 | return Status::OK(); |
544 | 990 | } |
545 | | |
546 | 468 | void AdaptivePassthroughExchanger::close(SourceInfo&& source_info) { |
547 | 468 | Block next_block; |
548 | 468 | bool eos; |
549 | 468 | BlockWrapperSPtr wrapper; |
550 | 468 | _data_queue[source_info.channel_id].set_eos(); |
551 | 468 | while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block, |
552 | 468 | source_info.channel_id)) { |
553 | | // do nothing |
554 | 0 | } |
555 | 468 | } |
556 | | |
557 | | } // namespace doris |