Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/exchange/local_exchanger.h" |
19 | | |
20 | | #include "common/cast_set.h" |
21 | | #include "common/status.h" |
22 | | #include "exec/exchange/local_exchange_sink_operator.h" |
23 | | #include "exec/exchange/local_exchange_source_operator.h" |
24 | | #include "exec/partitioner/partitioner.h" |
25 | | |
26 | | namespace doris { |
27 | | template <typename BlockType> |
28 | | void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id, |
29 | | LocalExchangeSinkLocalState* local_state, |
30 | 267k | BlockType&& block) { |
31 | 267k | if (local_state == nullptr) { |
32 | 0 | _enqueue_data_and_set_ready(channel_id, std::move(block)); |
33 | 0 | return; |
34 | 0 | } |
35 | | // PartitionedBlock is used by shuffle exchanger. |
36 | | // PartitionedBlock will be push into multiple queues with different row ranges, so it will be |
37 | | // referenced multiple times. Otherwise, we only ref the block once because it is only push into |
38 | | // one queue. |
39 | 267k | std::unique_lock l(*_m[channel_id]); |
40 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || |
41 | 61.5k | std::is_same_v<BroadcastBlock, BlockType>) { |
42 | 61.5k | block.first->record_channel_id(channel_id); |
43 | 205k | } else { |
44 | 205k | block->record_channel_id(channel_id); |
45 | 205k | } |
46 | | |
47 | 267k | if (_data_queue[channel_id].enqueue(std::move(block))) { |
48 | 267k | local_state->_shared_state->set_ready_to_read(channel_id); |
49 | 267k | } |
50 | 267k | } _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE27_enqueue_data_and_set_readyEiPNS_27LocalExchangeSinkLocalStateEOS7_ Line | Count | Source | 30 | 43.0k | BlockType&& block) { | 31 | 43.0k | if (local_state == nullptr) { | 32 | 0 | _enqueue_data_and_set_ready(channel_id, std::move(block)); | 33 | 0 | return; | 34 | 0 | } | 35 | | // PartitionedBlock is used by shuffle exchanger. | 36 | | // PartitionedBlock will be push into multiple queues with different row ranges, so it will be | 37 | | // referenced multiple times. Otherwise, we only ref the block once because it is only push into | 38 | | // one queue. | 39 | 43.0k | std::unique_lock l(*_m[channel_id]); | 40 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 41 | 43.0k | std::is_same_v<BroadcastBlock, BlockType>) { | 42 | 43.0k | block.first->record_channel_id(channel_id); | 43 | | } else { | 44 | | block->record_channel_id(channel_id); | 45 | | } | 46 | | | 47 | 43.0k | if (_data_queue[channel_id].enqueue(std::move(block))) { | 48 | 43.0k | local_state->_shared_state->set_ready_to_read(channel_id); | 49 | 43.0k | } | 50 | 43.0k | } |
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE27_enqueue_data_and_set_readyEiPNS_27LocalExchangeSinkLocalStateEOS4_ Line | Count | Source | 30 | 205k | BlockType&& block) { | 31 | 205k | if (local_state == nullptr) { | 32 | 0 | _enqueue_data_and_set_ready(channel_id, std::move(block)); | 33 | 0 | return; | 34 | 0 | } | 35 | | // PartitionedBlock is used by shuffle exchanger. | 36 | | // PartitionedBlock will be push into multiple queues with different row ranges, so it will be | 37 | | // referenced multiple times. Otherwise, we only ref the block once because it is only push into | 38 | | // one queue. | 39 | 205k | std::unique_lock l(*_m[channel_id]); | 40 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 41 | | std::is_same_v<BroadcastBlock, BlockType>) { | 42 | | block.first->record_channel_id(channel_id); | 43 | 205k | } else { | 44 | 205k | block->record_channel_id(channel_id); | 45 | 205k | } | 46 | | | 47 | 205k | if (_data_queue[channel_id].enqueue(std::move(block))) { | 48 | 205k | local_state->_shared_state->set_ready_to_read(channel_id); | 49 | 205k | } | 50 | 205k | } |
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE27_enqueue_data_and_set_readyEiPNS_27LocalExchangeSinkLocalStateEOS7_ Line | Count | Source | 30 | 18.4k | BlockType&& block) { | 31 | 18.4k | if (local_state == nullptr) { | 32 | 0 | _enqueue_data_and_set_ready(channel_id, std::move(block)); | 33 | 0 | return; | 34 | 0 | } | 35 | | // PartitionedBlock is used by shuffle exchanger. | 36 | | // PartitionedBlock will be push into multiple queues with different row ranges, so it will be | 37 | | // referenced multiple times. Otherwise, we only ref the block once because it is only push into | 38 | | // one queue. | 39 | 18.4k | std::unique_lock l(*_m[channel_id]); | 40 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 41 | 18.4k | std::is_same_v<BroadcastBlock, BlockType>) { | 42 | 18.4k | block.first->record_channel_id(channel_id); | 43 | | } else { | 44 | | block->record_channel_id(channel_id); | 45 | | } | 46 | | | 47 | 18.4k | if (_data_queue[channel_id].enqueue(std::move(block))) { | 48 | 18.4k | local_state->_shared_state->set_ready_to_read(channel_id); | 49 | 18.4k | } | 50 | 18.4k | } |
|
51 | | |
52 | | template <typename BlockType> |
53 | | bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState* local_state, |
54 | | BlockType& block, bool* eos, Block* data_block, |
55 | 2.17M | int channel_id) { |
56 | 2.17M | if (local_state == nullptr) { |
57 | 20 | return _dequeue_data(block, eos, data_block, channel_id); |
58 | 20 | } |
59 | 2.17M | bool all_finished = _running_sink_operators == 0; |
60 | 2.17M | if (_data_queue[channel_id].try_dequeue(block)) { |
61 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || |
62 | 61.4k | std::is_same_v<BroadcastBlock, BlockType>) { |
63 | 61.4k | local_state->_shared_state->sub_mem_usage(channel_id, block.first->_allocated_bytes); |
64 | 205k | } else { |
65 | 205k | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); |
66 | 205k | data_block->swap(block->_data_block); |
67 | 205k | } |
68 | 267k | return true; |
69 | 1.90M | } else if (all_finished) { |
70 | 1.73M | *eos = true; |
71 | 1.73M | } else { |
72 | 166k | std::unique_lock l(*_m[channel_id]); |
73 | 166k | if (_data_queue[channel_id].try_dequeue(block)) { |
74 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || |
75 | 9 | std::is_same_v<BroadcastBlock, BlockType>) { |
76 | 9 | local_state->_shared_state->sub_mem_usage(channel_id, |
77 | 9 | block.first->_allocated_bytes); |
78 | 9 | } else { |
79 | 9 | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); |
80 | 9 | data_block->swap(block->_data_block); |
81 | 9 | } |
82 | 18 | return true; |
83 | 18 | } |
84 | 166k | COUNTER_UPDATE(local_state->_get_block_failed_counter, 1); |
85 | 166k | local_state->_dependency->block(); |
86 | 166k | } |
87 | 1.90M | return false; |
88 | 2.17M | } _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE13_dequeue_dataEPNS_29LocalExchangeSourceLocalStateERS7_PbPNS_5BlockEi Line | Count | Source | 55 | 431k | int channel_id) { | 56 | 431k | if (local_state == nullptr) { | 57 | 8 | return _dequeue_data(block, eos, data_block, channel_id); | 58 | 8 | } | 59 | 431k | bool all_finished = _running_sink_operators == 0; | 60 | 431k | if (_data_queue[channel_id].try_dequeue(block)) { | 61 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 62 | 43.0k | std::is_same_v<BroadcastBlock, BlockType>) { | 63 | 43.0k | local_state->_shared_state->sub_mem_usage(channel_id, block.first->_allocated_bytes); | 64 | | } else { | 65 | | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); | 66 | | data_block->swap(block->_data_block); | 67 | | } | 68 | 43.0k | return true; | 69 | 388k | } else if (all_finished) { | 70 | 362k | *eos = true; | 71 | 362k | } else { | 72 | 25.8k | std::unique_lock l(*_m[channel_id]); | 73 | 25.8k | if (_data_queue[channel_id].try_dequeue(block)) { | 74 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 75 | 6 | std::is_same_v<BroadcastBlock, BlockType>) { | 76 | 6 | local_state->_shared_state->sub_mem_usage(channel_id, | 77 | 6 | block.first->_allocated_bytes); | 78 | | } else { | 79 | | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); | 80 | | data_block->swap(block->_data_block); | 81 | | } | 82 | 6 | return true; | 83 | 6 | } | 84 | 25.8k | COUNTER_UPDATE(local_state->_get_block_failed_counter, 1); | 85 | 25.8k | local_state->_dependency->block(); | 86 | 25.8k | } | 87 | 388k | return false; | 88 | 431k | } |
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE13_dequeue_dataEPNS_29LocalExchangeSourceLocalStateERS4_PbPNS_5BlockEi Line | Count | Source | 55 | 1.69M | int channel_id) { | 56 | 1.69M | if (local_state == nullptr) { | 57 | 8 | return _dequeue_data(block, eos, data_block, channel_id); | 58 | 8 | } | 59 | 1.69M | bool all_finished = _running_sink_operators == 0; | 60 | 1.69M | if (_data_queue[channel_id].try_dequeue(block)) { | 61 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 62 | | std::is_same_v<BroadcastBlock, BlockType>) { | 63 | | local_state->_shared_state->sub_mem_usage(channel_id, block.first->_allocated_bytes); | 64 | 205k | } else { | 65 | 205k | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); | 66 | 205k | data_block->swap(block->_data_block); | 67 | 205k | } | 68 | 205k | return true; | 69 | 1.49M | } else if (all_finished) { | 70 | 1.35M | *eos = true; | 71 | 1.35M | } else { | 72 | 136k | std::unique_lock l(*_m[channel_id]); | 73 | 136k | if (_data_queue[channel_id].try_dequeue(block)) { | 74 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 75 | | std::is_same_v<BroadcastBlock, BlockType>) { | 76 | | local_state->_shared_state->sub_mem_usage(channel_id, | 77 | | block.first->_allocated_bytes); | 78 | 9 | } else { | 79 | 9 | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); | 80 | 9 | data_block->swap(block->_data_block); | 81 | 9 | } | 82 | 9 | return true; | 83 | 9 | } | 84 | 136k | COUNTER_UPDATE(local_state->_get_block_failed_counter, 1); | 85 | 136k | local_state->_dependency->block(); | 86 | 136k | } | 87 | 1.49M | return false; | 88 | 1.69M | } |
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE13_dequeue_dataEPNS_29LocalExchangeSourceLocalStateERS7_PbPNS_5BlockEi Line | Count | Source | 55 | 41.7k | int channel_id) { | 56 | 41.7k | if (local_state == nullptr) { | 57 | 4 | return _dequeue_data(block, eos, data_block, channel_id); | 58 | 4 | } | 59 | 41.6k | bool all_finished = _running_sink_operators == 0; | 60 | 41.6k | if (_data_queue[channel_id].try_dequeue(block)) { | 61 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 62 | 18.4k | std::is_same_v<BroadcastBlock, BlockType>) { | 63 | 18.4k | local_state->_shared_state->sub_mem_usage(channel_id, block.first->_allocated_bytes); | 64 | | } else { | 65 | | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); | 66 | | data_block->swap(block->_data_block); | 67 | | } | 68 | 18.4k | return true; | 69 | 23.2k | } else if (all_finished) { | 70 | 18.8k | *eos = true; | 71 | 18.8k | } else { | 72 | 4.45k | std::unique_lock l(*_m[channel_id]); | 73 | 4.45k | if (_data_queue[channel_id].try_dequeue(block)) { | 74 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || | 75 | 3 | std::is_same_v<BroadcastBlock, BlockType>) { | 76 | 3 | local_state->_shared_state->sub_mem_usage(channel_id, | 77 | 3 | block.first->_allocated_bytes); | 78 | | } else { | 79 | | local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); | 80 | | data_block->swap(block->_data_block); | 81 | | } | 82 | 3 | return true; | 83 | 3 | } | 84 | 4.45k | COUNTER_UPDATE(local_state->_get_block_failed_counter, 1); | 85 | 4.45k | local_state->_dependency->block(); | 86 | 4.45k | } | 87 | 23.2k | return false; | 88 | 41.6k | } |
|
89 | | |
90 | | template <typename BlockType> |
91 | 0 | void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id, BlockType&& block) { |
92 | | if constexpr (std::is_same_v<PartitionedBlock, BlockType> || |
93 | 0 | std::is_same_v<BroadcastBlock, BlockType>) { |
94 | 0 | block.first->record_channel_id(channel_id); |
95 | 0 | } else { |
96 | 0 | block->record_channel_id(channel_id); |
97 | 0 | } |
98 | 0 | _data_queue[channel_id].enqueue(std::move(block)); |
99 | 0 | } Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE27_enqueue_data_and_set_readyEiOS7_ Unexecuted instantiation: _ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE27_enqueue_data_and_set_readyEiOS4_ Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE27_enqueue_data_and_set_readyEiOS7_ |
100 | | |
101 | | template <typename BlockType> |
102 | | bool Exchanger<BlockType>::_dequeue_data(BlockType& block, bool* eos, Block* data_block, |
103 | 20 | int channel_id) { |
104 | 20 | if (_data_queue[channel_id].try_dequeue(block)) { |
105 | | if constexpr (!std::is_same_v<PartitionedBlock, BlockType> && |
106 | 0 | !std::is_same_v<BroadcastBlock, BlockType>) { |
107 | 0 | data_block->swap(block->_data_block); |
108 | 0 | } |
109 | 0 | return true; |
110 | 0 | } |
111 | 20 | return false; |
112 | 20 | } _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE13_dequeue_dataERS7_PbPNS_5BlockEi Line | Count | Source | 103 | 8 | int channel_id) { | 104 | 8 | if (_data_queue[channel_id].try_dequeue(block)) { | 105 | | if constexpr (!std::is_same_v<PartitionedBlock, BlockType> && | 106 | | !std::is_same_v<BroadcastBlock, BlockType>) { | 107 | | data_block->swap(block->_data_block); | 108 | | } | 109 | 0 | return true; | 110 | 0 | } | 111 | 8 | return false; | 112 | 8 | } |
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE13_dequeue_dataERS4_PbPNS_5BlockEi Line | Count | Source | 103 | 8 | int channel_id) { | 104 | 8 | if (_data_queue[channel_id].try_dequeue(block)) { | 105 | | if constexpr (!std::is_same_v<PartitionedBlock, BlockType> && | 106 | 0 | !std::is_same_v<BroadcastBlock, BlockType>) { | 107 | 0 | data_block->swap(block->_data_block); | 108 | 0 | } | 109 | 0 | return true; | 110 | 0 | } | 111 | 8 | return false; | 112 | 8 | } |
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE13_dequeue_dataERS7_PbPNS_5BlockEi Line | Count | Source | 103 | 4 | int channel_id) { | 104 | 4 | if (_data_queue[channel_id].try_dequeue(block)) { | 105 | | if constexpr (!std::is_same_v<PartitionedBlock, BlockType> && | 106 | | !std::is_same_v<BroadcastBlock, BlockType>) { | 107 | | data_block->swap(block->_data_block); | 108 | | } | 109 | 0 | return true; | 110 | 0 | } | 111 | 4 | return false; | 112 | 4 | } |
|
113 | | |
114 | | Status ShuffleExchanger::sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
115 | 195k | SinkInfo& sink_info) { |
116 | 195k | if (in_block->empty()) { |
117 | 175k | return Status::OK(); |
118 | 175k | } |
119 | 19.6k | { |
120 | 19.6k | SCOPED_TIMER(profile.compute_hash_value_timer); |
121 | 19.6k | RETURN_IF_ERROR(sink_info.partitioner->do_partitioning(state, in_block)); |
122 | 19.6k | } |
123 | 19.6k | { |
124 | 19.6k | SCOPED_TIMER(profile.distribute_timer); |
125 | 19.6k | RETURN_IF_ERROR(_split_rows(state, sink_info.partitioner->get_channel_ids(), in_block, |
126 | 19.6k | *sink_info.channel_id, sink_info.local_state, |
127 | 19.6k | sink_info.shuffle_idx_to_instance_idx)); |
128 | 19.6k | } |
129 | | |
130 | 19.6k | sink_info.local_state->_memory_used_counter->set( |
131 | 19.6k | sink_info.local_state->_shared_state->mem_usage); |
132 | 19.6k | return Status::OK(); |
133 | 19.6k | } |
134 | | |
135 | 176k | void ShuffleExchanger::close(SourceInfo&& source_info) { |
136 | 176k | PartitionedBlock partitioned_block; |
137 | 176k | bool eos; |
138 | 176k | Block block; |
139 | 176k | _data_queue[source_info.channel_id].set_eos(); |
140 | 176k | while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block, |
141 | 176k | source_info.channel_id)) { |
142 | | // do nothing |
143 | 36 | } |
144 | 176k | } |
145 | | |
146 | | Status ShuffleExchanger::get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, |
147 | 200k | SourceInfo&& source_info) { |
148 | 200k | PartitionedBlock partitioned_block; |
149 | 200k | if (_dequeue_data(source_info.local_state, partitioned_block, eos, block, |
150 | 200k | source_info.channel_id)) { |
151 | 30.0k | SCOPED_TIMER(profile.copy_data_timer); |
152 | 30.0k | auto scoped_mutable_block = VectorizedUtils::build_scoped_mutable_mem_reuse_block( |
153 | 30.0k | block, partitioned_block.first->_data_block); |
154 | 30.0k | auto& mutable_block = scoped_mutable_block.mutable_block(); |
155 | 35.9k | do { |
156 | 35.9k | const auto* offset_start = partitioned_block.second.row_idxs->data() + |
157 | 35.9k | partitioned_block.second.offset_start; |
158 | 35.9k | auto block_wrapper = partitioned_block.first; |
159 | 35.9k | RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->_data_block, offset_start, |
160 | 35.9k | offset_start + partitioned_block.second.length)); |
161 | 35.9k | } while (mutable_block.rows() < state->batch_size() && !*eos && |
162 | 35.9k | _dequeue_data(source_info.local_state, partitioned_block, eos, block, |
163 | 35.0k | source_info.channel_id)); |
164 | 30.0k | } |
165 | 200k | return Status::OK(); |
166 | 200k | } |
167 | | |
168 | | Status ShuffleExchanger::_split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, |
169 | | Block* block, int channel_id, |
170 | | LocalExchangeSinkLocalState* local_state, |
171 | 19.5k | std::map<int, int>* shuffle_idx_to_instance_idx) { |
172 | 19.5k | if (local_state == nullptr) { |
173 | 0 | return _split_rows(state, channel_ids, block, channel_id); |
174 | 0 | } |
175 | 19.5k | const auto rows = cast_set<int32_t>(block->rows()); |
176 | 19.5k | auto row_idx = std::make_shared<PODArray<uint32_t>>(rows); |
177 | 19.5k | auto& partition_rows_histogram = _partition_rows_histogram[channel_id]; |
178 | 19.5k | { |
179 | 19.5k | partition_rows_histogram.assign(_num_partitions + 1, 0); |
180 | 12.2M | for (int32_t i = 0; i < rows; ++i) { |
181 | 12.2M | partition_rows_histogram[channel_ids[i]]++; |
182 | 12.2M | } |
183 | 236k | for (int32_t i = 1; i <= _num_partitions; ++i) { |
184 | 217k | partition_rows_histogram[i] += partition_rows_histogram[i - 1]; |
185 | 217k | } |
186 | 11.8M | for (int32_t i = rows - 1; i >= 0; --i) { |
187 | 11.8M | (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i; |
188 | 11.8M | partition_rows_histogram[channel_ids[i]]--; |
189 | 11.8M | } |
190 | 19.5k | } |
191 | | |
192 | 19.5k | Block data_block; |
193 | 19.5k | std::shared_ptr<BlockWrapper> new_block_wrapper; |
194 | 19.5k | if (!_free_blocks.try_dequeue(data_block)) { |
195 | 10.6k | data_block = block->clone_empty(); |
196 | 10.6k | } |
197 | 19.5k | data_block.swap(*block); |
198 | 19.5k | new_block_wrapper = |
199 | 19.5k | BlockWrapper::create_shared(std::move(data_block), local_state->_shared_state, -1); |
200 | 19.5k | if (new_block_wrapper->_data_block.empty()) { |
201 | 0 | return Status::OK(); |
202 | 0 | } |
203 | | /** |
204 | | * Data are hash-shuffled and distributed to all instances of |
205 | | * all BEs. So we need a shuffleId-To-InstanceId mapping. |
206 | | * For example, row 1 get a hash value 1 which means we should distribute to instance 1 on |
207 | | * BE 1 and row 2 get a hash value 2 which means we should distribute to instance 1 on BE 3. |
208 | | */ |
209 | 19.5k | DCHECK(shuffle_idx_to_instance_idx && !shuffle_idx_to_instance_idx->empty()); |
210 | 19.5k | const auto& map = *shuffle_idx_to_instance_idx; |
211 | 19.5k | int32_t enqueue_rows = 0; |
212 | 217k | for (const auto& it : map) { |
213 | 217k | DCHECK(it.second >= 0 && it.second < _num_partitions) |
214 | 0 | << it.first << " : " << it.second << " " << _num_partitions; |
215 | 217k | uint32_t start = partition_rows_histogram[it.first]; |
216 | 217k | uint32_t size = partition_rows_histogram[it.first + 1] - start; |
217 | 217k | if (size > 0) { |
218 | 36.0k | enqueue_rows += size; |
219 | 36.0k | _enqueue_data_and_set_ready( |
220 | 36.0k | it.second, local_state, |
221 | 36.0k | {new_block_wrapper, |
222 | 36.0k | {.row_idxs = row_idx, .offset_start = start, .length = size}}); |
223 | 36.0k | } |
224 | 217k | } |
225 | 19.5k | if (enqueue_rows != rows) [[unlikely]] { |
226 | 1 | fmt::memory_buffer debug_string_buffer; |
227 | 1 | fmt::format_to(debug_string_buffer, "Type: {}, Local Exchange Id: {}, Shuffled Map: ", |
228 | 1 | get_exchange_type_name(get_type()), local_state->parent()->node_id()); |
229 | 3 | for (const auto& it : map) { |
230 | 3 | fmt::format_to(debug_string_buffer, "[{}:{}], ", it.first, it.second); |
231 | 3 | } |
232 | 1 | return Status::InternalError( |
233 | 1 | "Rows mismatched! Data may be lost. [Expected enqueue rows={}, Real enqueue " |
234 | 1 | "rows={}, Detail: {}]", |
235 | 1 | rows, enqueue_rows, fmt::to_string(debug_string_buffer)); |
236 | 1 | } |
237 | | |
238 | 19.5k | return Status::OK(); |
239 | 19.5k | } |
240 | | |
241 | | Status ShuffleExchanger::_split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, |
242 | 0 | Block* block, int channel_id) { |
243 | 0 | const auto rows = cast_set<int32_t>(block->rows()); |
244 | 0 | auto row_idx = std::make_shared<PODArray<uint32_t>>(rows); |
245 | 0 | auto& partition_rows_histogram = _partition_rows_histogram[channel_id]; |
246 | 0 | { |
247 | 0 | partition_rows_histogram.assign(_num_partitions + 1, 0); |
248 | 0 | for (int32_t i = 0; i < rows; ++i) { |
249 | 0 | partition_rows_histogram[channel_ids[i]]++; |
250 | 0 | } |
251 | 0 | for (int32_t i = 1; i <= _num_partitions; ++i) { |
252 | 0 | partition_rows_histogram[i] += partition_rows_histogram[i - 1]; |
253 | 0 | } |
254 | 0 | for (int32_t i = rows - 1; i >= 0; --i) { |
255 | 0 | (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i; |
256 | 0 | partition_rows_histogram[channel_ids[i]]--; |
257 | 0 | } |
258 | 0 | } |
259 | |
|
260 | 0 | Block data_block; |
261 | 0 | std::shared_ptr<BlockWrapper> new_block_wrapper; |
262 | 0 | if (!_free_blocks.try_dequeue(data_block)) { |
263 | 0 | data_block = block->clone_empty(); |
264 | 0 | } |
265 | 0 | data_block.swap(*block); |
266 | 0 | new_block_wrapper = BlockWrapper::create_shared(std::move(data_block), nullptr, -1); |
267 | 0 | if (new_block_wrapper->_data_block.empty()) { |
268 | 0 | return Status::OK(); |
269 | 0 | } |
270 | 0 | for (int i = 0; i < _num_partitions; i++) { |
271 | 0 | uint32_t start = partition_rows_histogram[i]; |
272 | 0 | uint32_t size = partition_rows_histogram[i + 1] - start; |
273 | 0 | if (size > 0) { |
274 | 0 | _enqueue_data_and_set_ready( |
275 | 0 | i, {new_block_wrapper, |
276 | 0 | {.row_idxs = row_idx, .offset_start = start, .length = size}}); |
277 | 0 | } |
278 | 0 | } |
279 | |
|
280 | 0 | return Status::OK(); |
281 | 0 | } |
282 | | |
283 | | Status PassthroughExchanger::sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
284 | 304k | SinkInfo& sink_info) { |
285 | 304k | if (in_block->empty()) { |
286 | 101k | return Status::OK(); |
287 | 101k | } |
288 | 202k | Block new_block; |
289 | 202k | if (!_free_blocks.try_dequeue(new_block)) { |
290 | 80.1k | new_block = {in_block->clone_empty()}; |
291 | 80.1k | } |
292 | 202k | new_block.swap(*in_block); |
293 | 202k | auto channel_id = ((*sink_info.channel_id)++) % _num_partitions; |
294 | 202k | BlockWrapperSPtr wrapper = BlockWrapper::create_shared( |
295 | 202k | std::move(new_block), |
296 | 202k | sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, channel_id); |
297 | | |
298 | 202k | _enqueue_data_and_set_ready(channel_id, sink_info.local_state, std::move(wrapper)); |
299 | | |
300 | 202k | sink_info.local_state->_memory_used_counter->set( |
301 | 202k | sink_info.local_state->_shared_state->mem_usage); |
302 | | |
303 | 202k | return Status::OK(); |
304 | 304k | } |
305 | | |
306 | 669k | void PassthroughExchanger::close(SourceInfo&& source_info) { |
307 | 669k | Block next_block; |
308 | 669k | BlockWrapperSPtr wrapper; |
309 | 669k | bool eos; |
310 | 669k | _data_queue[source_info.channel_id].set_eos(); |
311 | 670k | while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block, |
312 | 670k | source_info.channel_id)) { |
313 | | // do nothing |
314 | 1.37k | } |
315 | 669k | } |
316 | | |
317 | 14.2k | void PassToOneExchanger::close(SourceInfo&& source_info) { |
318 | 14.2k | Block next_block; |
319 | 14.2k | BlockWrapperSPtr wrapper; |
320 | 14.2k | bool eos; |
321 | 14.2k | _data_queue[source_info.channel_id].set_eos(); |
322 | 14.2k | while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block, |
323 | 14.2k | source_info.channel_id)) { |
324 | | // do nothing |
325 | 0 | } |
326 | 14.2k | } |
327 | | |
328 | | Status PassthroughExchanger::get_block(RuntimeState* state, Block* block, bool* eos, |
329 | 1.00M | Profile&& profile, SourceInfo&& source_info) { |
330 | 1.00M | BlockWrapperSPtr next_block; |
331 | 1.00M | _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id); |
332 | 1.00M | return Status::OK(); |
333 | 1.00M | } |
334 | | |
335 | | Status PassToOneExchanger::sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
336 | 5.08k | SinkInfo& sink_info) { |
337 | 5.08k | if (in_block->empty()) { |
338 | 1.89k | return Status::OK(); |
339 | 1.89k | } |
340 | 3.18k | Block new_block; |
341 | 3.18k | if (!_free_blocks.try_dequeue(new_block)) { |
342 | 2.06k | new_block = {in_block->clone_empty()}; |
343 | 2.06k | } |
344 | 3.18k | new_block.swap(*in_block); |
345 | | |
346 | 3.18k | BlockWrapperSPtr wrapper = BlockWrapper::create_shared( |
347 | 3.18k | std::move(new_block), |
348 | 3.18k | sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, 0); |
349 | 3.18k | _enqueue_data_and_set_ready(0, sink_info.local_state, std::move(wrapper)); |
350 | | |
351 | 3.18k | sink_info.local_state->_memory_used_counter->set( |
352 | 3.18k | sink_info.local_state->_shared_state->mem_usage); |
353 | | |
354 | 3.18k | return Status::OK(); |
355 | 5.08k | } |
356 | | |
357 | | Status PassToOneExchanger::get_block(RuntimeState* state, Block* block, bool* eos, |
358 | 5.98k | Profile&& profile, SourceInfo&& source_info) { |
359 | 5.98k | if (source_info.channel_id != 0) { |
360 | 3 | *eos = true; |
361 | 3 | return Status::OK(); |
362 | 3 | } |
363 | 5.98k | BlockWrapperSPtr next_block; |
364 | 5.98k | _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id); |
365 | 5.98k | return Status::OK(); |
366 | 5.98k | } |
367 | | |
368 | 117k | void ExchangerBase::finalize() { |
369 | 117k | DCHECK(_running_source_operators == 0); |
370 | 117k | Block block; |
371 | 211k | while (_free_blocks.try_dequeue(block)) { |
372 | | // do nothing |
373 | 94.5k | } |
374 | 117k | } |
375 | | |
376 | | Status BroadcastExchanger::sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
377 | 3.33k | SinkInfo& sink_info) { |
378 | 3.33k | if (in_block->empty()) { |
379 | 1.17k | return Status::OK(); |
380 | 1.17k | } |
381 | 2.16k | Block new_block; |
382 | 2.16k | if (!_free_blocks.try_dequeue(new_block)) { |
383 | 1.62k | new_block = {in_block->clone_empty()}; |
384 | 1.62k | } |
385 | 2.16k | new_block.swap(*in_block); |
386 | 2.16k | auto wrapper = BlockWrapper::create_shared( |
387 | 2.16k | std::move(new_block), |
388 | 2.16k | sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, -1); |
389 | 20.6k | for (int i = 0; i < _num_partitions; i++) { |
390 | 18.4k | _enqueue_data_and_set_ready( |
391 | 18.4k | i, sink_info.local_state, |
392 | 18.4k | {wrapper, {.offset_start = 0, .length = wrapper->_data_block.rows()}}); |
393 | 18.4k | } |
394 | | |
395 | 2.16k | return Status::OK(); |
396 | 3.33k | } |
397 | | |
398 | 9.40k | void BroadcastExchanger::close(SourceInfo&& source_info) { |
399 | 9.40k | BroadcastBlock partitioned_block; |
400 | 9.40k | bool eos; |
401 | 9.40k | Block block; |
402 | 9.40k | _data_queue[source_info.channel_id].set_eos(); |
403 | 9.40k | while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block, |
404 | 9.40k | source_info.channel_id)) { |
405 | | // do nothing |
406 | 0 | } |
407 | 9.40k | } |
408 | | |
409 | | Status BroadcastExchanger::get_block(RuntimeState* state, Block* block, bool* eos, |
410 | 32.2k | Profile&& profile, SourceInfo&& source_info) { |
411 | 32.2k | BroadcastBlock partitioned_block; |
412 | | |
413 | 32.2k | if (_dequeue_data(source_info.local_state, partitioned_block, eos, block, |
414 | 32.2k | source_info.channel_id)) { |
415 | 18.4k | SCOPED_TIMER(profile.copy_data_timer); |
416 | 18.4k | auto scoped_mutable_block = VectorizedUtils::build_scoped_mutable_mem_reuse_block( |
417 | 18.4k | block, partitioned_block.first->_data_block); |
418 | 18.4k | auto& mutable_block = scoped_mutable_block.mutable_block(); |
419 | 18.4k | auto block_wrapper = partitioned_block.first; |
420 | 18.4k | RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->_data_block, |
421 | 18.4k | partitioned_block.second.offset_start, |
422 | 18.4k | partitioned_block.second.length)); |
423 | 18.4k | } |
424 | | |
425 | 32.2k | return Status::OK(); |
426 | 32.2k | } |
427 | | |
428 | | Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, Block* in_block, |
429 | 827 | SinkInfo& sink_info) { |
430 | 827 | Block new_block; |
431 | 827 | if (!_free_blocks.try_dequeue(new_block)) { |
432 | 410 | new_block = {in_block->clone_empty()}; |
433 | 410 | } |
434 | 827 | new_block.swap(*in_block); |
435 | 827 | auto channel_id = ((*sink_info.channel_id)++) % _num_partitions; |
436 | 827 | _enqueue_data_and_set_ready( |
437 | 827 | channel_id, sink_info.local_state, |
438 | 827 | {BlockWrapper::create_shared( |
439 | 827 | std::move(new_block), |
440 | 18.4E | sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, -1), |
441 | 827 | {.row_idxs = nullptr, .offset_start = 0, .length = 0}}); |
442 | | |
443 | 827 | sink_info.local_state->_memory_used_counter->set( |
444 | 827 | sink_info.local_state->_shared_state->mem_usage); |
445 | 827 | return Status::OK(); |
446 | 827 | } |
447 | | |
448 | | Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, Block* block, |
449 | 1.98k | SinkInfo& sink_info) { |
450 | 1.98k | std::vector<uint32_t> channel_ids; |
451 | 1.98k | const auto num_rows = block->rows(); |
452 | 1.98k | channel_ids.resize(num_rows, 0); |
453 | 1.98k | if (num_rows <= _num_partitions) { |
454 | 1.46k | std::iota(channel_ids.begin(), channel_ids.end(), 0); |
455 | 1.46k | } else { |
456 | 521 | size_t i = 0; |
457 | 8.31k | for (; i < num_rows - _num_partitions; i += _num_partitions) { |
458 | 7.79k | std::iota(channel_ids.begin() + i, channel_ids.begin() + i + _num_partitions, 0); |
459 | 7.79k | } |
460 | 521 | if (i < num_rows - 1) { |
461 | 343 | std::iota(channel_ids.begin() + i, channel_ids.end(), 0); |
462 | 343 | } |
463 | 521 | } |
464 | | |
465 | 1.98k | sink_info.local_state->_memory_used_counter->set( |
466 | 1.98k | sink_info.local_state->_shared_state->mem_usage); |
467 | 1.98k | RETURN_IF_ERROR(_split_rows(state, channel_ids, block, sink_info)); |
468 | 1.98k | return Status::OK(); |
469 | 1.98k | } |
470 | | |
471 | | Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, |
472 | | const std::vector<uint32_t>& channel_ids, |
473 | 1.98k | Block* block, SinkInfo& sink_info) { |
474 | 1.98k | const auto rows = cast_set<int32_t>(block->rows()); |
475 | 1.98k | auto row_idx = std::make_shared<PODArray<uint32_t>>(rows); |
476 | 1.98k | auto& partition_rows_histogram = _partition_rows_histogram[sink_info.ins_idx]; |
477 | 1.98k | { |
478 | 1.98k | partition_rows_histogram.assign(_num_partitions + 1, 0); |
479 | 41.1k | for (int32_t i = 0; i < rows; ++i) { |
480 | 39.1k | partition_rows_histogram[channel_ids[i]]++; |
481 | 39.1k | } |
482 | 13.2k | for (int32_t i = 1; i <= _num_partitions; ++i) { |
483 | 11.2k | partition_rows_histogram[i] += partition_rows_histogram[i - 1]; |
484 | 11.2k | } |
485 | | |
486 | 41.1k | for (int32_t i = rows - 1; i >= 0; --i) { |
487 | 39.1k | (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i; |
488 | 39.1k | partition_rows_histogram[channel_ids[i]]--; |
489 | 39.1k | } |
490 | 1.98k | } |
491 | 1.98k | Block data_block; |
492 | 1.98k | if (!_free_blocks.try_dequeue(data_block)) { |
493 | 1.65k | data_block = block->clone_empty(); |
494 | 1.65k | } |
495 | 1.98k | data_block.swap(*block); |
496 | 1.98k | std::shared_ptr<BlockWrapper> new_block_wrapper = BlockWrapper::create_shared( |
497 | 1.98k | std::move(data_block), sink_info.local_state->_shared_state, sink_info.ins_idx); |
498 | 1.98k | if (new_block_wrapper->_data_block.empty()) { |
499 | 0 | return Status::OK(); |
500 | 0 | } |
501 | 13.2k | for (int32_t i = 0; i < _num_partitions; i++) { |
502 | 11.2k | const uint32_t start = partition_rows_histogram[i]; |
503 | 11.2k | const uint32_t size = partition_rows_histogram[i + 1] - start; |
504 | 11.2k | if (size > 0) { |
505 | 6.22k | _enqueue_data_and_set_ready( |
506 | 6.22k | i, sink_info.local_state, |
507 | 6.22k | {new_block_wrapper, |
508 | 6.22k | {.row_idxs = row_idx, .offset_start = start, .length = size}}); |
509 | 6.22k | } |
510 | 11.2k | } |
511 | 1.98k | return Status::OK(); |
512 | 1.98k | } |
513 | | |
514 | | Status AdaptivePassthroughExchanger::sink(RuntimeState* state, Block* in_block, bool eos, |
515 | 6.66k | Profile&& profile, SinkInfo& sink_info) { |
516 | 6.66k | if (in_block->empty()) { |
517 | 3.84k | return Status::OK(); |
518 | 3.84k | } |
519 | 2.81k | if (_is_pass_through) { |
520 | 829 | return _passthrough_sink(state, in_block, sink_info); |
521 | 1.98k | } else { |
522 | 1.98k | if (++_total_block >= _num_partitions) { |
523 | 365 | _is_pass_through = true; |
524 | 365 | } |
525 | 1.98k | return _shuffle_sink(state, in_block, sink_info); |
526 | 1.98k | } |
527 | 2.81k | } |
528 | | |
529 | | Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, Block* block, bool* eos, |
530 | 8.41k | Profile&& profile, SourceInfo&& source_info) { |
531 | 8.41k | if (!_tmp_block[source_info.channel_id].empty()) { |
532 | 277 | *block = std::move(_tmp_block[source_info.channel_id]); |
533 | 277 | *eos = _tmp_eos[source_info.channel_id]; |
534 | 277 | _tmp_block[source_info.channel_id] = {}; |
535 | 277 | return Status::OK(); |
536 | 277 | } |
537 | 8.13k | PartitionedBlock partitioned_block; |
538 | 8.13k | if (_dequeue_data(source_info.local_state, partitioned_block, eos, block, |
539 | 8.13k | source_info.channel_id)) { |
540 | 4.35k | SCOPED_TIMER(profile.copy_data_timer); |
541 | 4.35k | auto scoped_mutable_block = VectorizedUtils::build_scoped_mutable_mem_reuse_block( |
542 | 4.35k | block, partitioned_block.first->_data_block); |
543 | 4.35k | auto& mutable_block = scoped_mutable_block.mutable_block(); |
544 | 7.04k | do { |
545 | 7.04k | if (partitioned_block.second.row_idxs == nullptr) { |
546 | | // The passthrough path which means the block is not partitioned, we can directly move the block without copying. |
547 | 825 | if (mutable_block.rows() > 0) { |
548 | 277 | _tmp_block[source_info.channel_id] = |
549 | 277 | std::move(partitioned_block.first->_data_block); |
550 | 277 | _tmp_eos[source_info.channel_id] = *eos; |
551 | 277 | *eos = false; |
552 | 548 | } else { |
553 | 548 | scoped_mutable_block.restore(); |
554 | 548 | *block = std::move(partitioned_block.first->_data_block); |
555 | 548 | } |
556 | 825 | break; |
557 | 825 | } |
558 | 6.22k | const auto* offset_start = partitioned_block.second.row_idxs->data() + |
559 | 6.22k | partitioned_block.second.offset_start; |
560 | 6.22k | auto block_wrapper = partitioned_block.first; |
561 | 6.22k | RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->_data_block, offset_start, |
562 | 6.22k | offset_start + partitioned_block.second.length)); |
563 | 6.22k | } while (mutable_block.rows() < state->batch_size() && !*eos && |
564 | 6.22k | _dequeue_data(source_info.local_state, partitioned_block, eos, block, |
565 | 6.22k | source_info.channel_id)); |
566 | 4.35k | } |
567 | 8.13k | return Status::OK(); |
568 | 8.13k | } |
569 | | |
570 | 4.75k | void AdaptivePassthroughExchanger::close(SourceInfo&& source_info) { |
571 | 4.75k | PartitionedBlock partitioned_block; |
572 | 4.75k | bool eos; |
573 | 4.75k | Block block; |
574 | 4.75k | _data_queue[source_info.channel_id].set_eos(); |
575 | 4.75k | while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block, |
576 | 4.75k | source_info.channel_id)) { |
577 | | // do nothing |
578 | 3 | } |
579 | 4.75k | } |
580 | | |
581 | | } // namespace doris |