be/src/exec/exchange/local_exchanger.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include "exec/operator/operator.h" |
21 | | #include "exec/pipeline/dependency.h" |
22 | | |
23 | | namespace doris { |
24 | | template <typename T> |
25 | | void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks, |
26 | | RuntimeProfile::Counter* memory_used_counter = nullptr); |
27 | | |
28 | | class PartitionerBase; |
29 | | class LocalExchangeSourceLocalState; |
30 | | class LocalExchangeSinkLocalState; |
31 | | |
32 | | struct Profile { |
33 | | RuntimeProfile::Counter* compute_hash_value_timer = nullptr; |
34 | | RuntimeProfile::Counter* distribute_timer = nullptr; |
35 | | RuntimeProfile::Counter* copy_data_timer = nullptr; |
36 | | }; |
37 | | |
38 | | struct SinkInfo { |
39 | | int* channel_id; |
40 | | PartitionerBase* partitioner; |
41 | | LocalExchangeSinkLocalState* local_state; |
42 | | std::map<int, int>* shuffle_idx_to_instance_idx; |
43 | | }; |
44 | | |
45 | | struct SourceInfo { |
46 | | int channel_id; |
47 | | LocalExchangeSourceLocalState* local_state; |
48 | | }; |
49 | | /** |
50 | | * One exchanger is hold by one `LocalExchangeSharedState`. And one `LocalExchangeSharedState` is |
51 | | * shared by all local exchange sink operators and source operators with the same id. |
52 | | * |
53 | | * In exchanger, two block queues is maintained, one is data block queue and another is free block queue. |
54 | | * |
55 | | * In details, data block queue has queues as many as source operators. Each source operator will get |
56 | | * data block from the corresponding queue. Data blocks is push into the queue by sink operators. One |
57 | | * sink operator will push blocks into one or more queues. |
58 | | * |
59 | | * Free block is used to reuse the allocated memory. To reduce the memory limit, we also use a conf |
60 | | * to limit the size of free block queue. |
61 | | */ |
62 | | class ExchangerBase { |
63 | | public: |
64 | | /** |
65 | | * `BlockWrapper` is used to wrap a data block with a reference count. |
66 | | * |
67 | | * In function `unref()`, if `ref_count` decremented to 0, which means this block is not needed by |
68 | | * operators, so we put it into `_free_blocks` to reuse its memory if needed and refresh memory usage |
69 | | * in current queue. |
70 | | * |
71 | | * Note: `ref_count` will be larger than 1 only if this block is shared between multiple queues in |
72 | | * shuffle exchanger. |
73 | | */ |
74 | | class BlockWrapper { |
75 | | public: |
76 | | ENABLE_FACTORY_CREATOR(BlockWrapper); |
77 | | BlockWrapper(Block&& data_block, LocalExchangeSharedState* shared_state, int channel_id) |
78 | 114 | : _data_block(std::move(data_block)), |
79 | 114 | _shared_state(shared_state), |
80 | 114 | _allocated_bytes(_data_block.allocated_bytes()) { |
81 | 114 | if (_shared_state) { |
82 | 114 | _shared_state->add_total_mem_usage(_allocated_bytes); |
83 | 114 | } |
84 | 114 | } |
85 | 114 | ~BlockWrapper() { |
86 | 114 | if (_shared_state != nullptr) { |
87 | 114 | DCHECK_GT(_allocated_bytes, 0); |
88 | | // `_channel_ids` may be empty if exchanger is shuffled exchanger and channel id is |
89 | | // not used by `sub_total_mem_usage`. So we just pass -1 here. |
90 | 114 | _shared_state->sub_total_mem_usage(_allocated_bytes); |
91 | 114 | if (_shared_state->exchanger->_free_block_limit == 0 || |
92 | 114 | _shared_state->exchanger->_free_blocks.size_approx() < |
93 | 28 | _shared_state->exchanger->_free_block_limit * |
94 | 98 | _shared_state->exchanger->_num_sources) { |
95 | 98 | _data_block.clear_column_data(); |
96 | | // Free blocks is used to improve memory efficiency. Failure during pushing back |
97 | | // free block will not incur any bad result so just ignore the return value. |
98 | 98 | _shared_state->exchanger->_free_blocks.enqueue(std::move(_data_block)); |
99 | 98 | } |
100 | 114 | }; |
101 | 114 | } |
102 | 161 | void record_channel_id(int channel_id) { |
103 | 161 | _channel_ids.push_back(channel_id); |
104 | 161 | if (_shared_state) { |
105 | 161 | _shared_state->add_mem_usage(channel_id, _allocated_bytes); |
106 | 161 | } |
107 | 161 | } |
108 | | |
109 | | private: |
110 | | friend class ShuffleExchanger; |
111 | | friend class BucketShuffleExchanger; |
112 | | friend class PassthroughExchanger; |
113 | | friend class BroadcastExchanger; |
114 | | friend class PassToOneExchanger; |
115 | | friend class AdaptivePassthroughExchanger; |
116 | | template <typename BlockType> |
117 | | friend class Exchanger; |
118 | | |
119 | | Block _data_block; |
120 | | LocalExchangeSharedState* _shared_state; |
121 | | std::vector<int> _channel_ids; |
122 | | const size_t _allocated_bytes; |
123 | | }; |
124 | | ExchangerBase(int running_sink_operators, int num_partitions, int free_block_limit) |
125 | 4 | : _running_sink_operators(running_sink_operators), |
126 | 4 | _running_source_operators(num_partitions), |
127 | 4 | _num_partitions(num_partitions), |
128 | 4 | _num_senders(running_sink_operators), |
129 | 4 | _num_sources(num_partitions), |
130 | 4 | _free_block_limit(free_block_limit) {} |
131 | | ExchangerBase(int running_sink_operators, int num_sources, int num_partitions, |
132 | | int free_block_limit) |
133 | 2 | : _running_sink_operators(running_sink_operators), |
134 | 2 | _running_source_operators(num_sources), |
135 | 2 | _num_partitions(num_partitions), |
136 | 2 | _num_senders(running_sink_operators), |
137 | 2 | _num_sources(num_sources), |
138 | 2 | _free_block_limit(free_block_limit) {} |
139 | 6 | virtual ~ExchangerBase() = default; |
140 | | virtual Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, |
141 | | SourceInfo&& source_info) = 0; |
142 | | virtual Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
143 | | SinkInfo& sink_info) = 0; |
144 | | virtual ExchangeType get_type() const = 0; |
145 | | // Called if a local exchanger source operator are closed. Free the unused data block in data_queue. |
146 | | virtual void close(SourceInfo&& source_info) = 0; |
147 | | // Called if all local exchanger source operators are closed. We free the memory in |
148 | | // `_free_blocks` here. |
149 | | virtual void finalize(); |
150 | | |
151 | | virtual std::string data_queue_debug_string(int i) = 0; |
152 | | |
153 | 0 | void set_low_memory_mode() { |
154 | 0 | _free_block_limit = 0; |
155 | 0 | clear_blocks(_free_blocks); |
156 | 0 | } |
157 | | |
158 | | protected: |
159 | | friend struct LocalExchangeSharedState; |
160 | | friend class LocalExchangeSourceLocalState; |
161 | | friend class LocalExchangeSinkOperatorX; |
162 | | friend class LocalExchangeSinkLocalState; |
163 | | std::atomic<int> _running_sink_operators = 0; |
164 | | std::atomic<int> _running_source_operators = 0; |
165 | | const int _num_partitions; |
166 | | const int _num_senders; |
167 | | const int _num_sources; |
168 | | std::atomic_int _free_block_limit = 0; |
169 | | moodycamel::ConcurrentQueue<Block> _free_blocks; |
170 | | }; |
171 | | |
172 | | struct PartitionedRowIdxs { |
173 | | std::shared_ptr<PODArray<uint32_t>> row_idxs; |
174 | | uint32_t offset_start; |
175 | | uint32_t length; |
176 | | }; |
177 | | |
178 | | using PartitionedBlock = |
179 | | std::pair<std::shared_ptr<ExchangerBase::BlockWrapper>, PartitionedRowIdxs>; |
180 | | |
181 | | struct BroadcastRowRange { |
182 | | uint32_t offset_start; |
183 | | size_t length; |
184 | | }; |
185 | | using BroadcastBlock = std::pair<std::shared_ptr<ExchangerBase::BlockWrapper>, BroadcastRowRange>; |
186 | | |
187 | | template <typename BlockType> |
188 | | struct BlockQueue { |
189 | | std::atomic<bool> eos = false; |
190 | | moodycamel::ConcurrentQueue<BlockType> data_queue; |
191 | | moodycamel::ProducerToken ptok {data_queue}; |
192 | 24 | BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEEC2Ev Line | Count | Source | 192 | 8 | BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {} |
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2Ev Line | Count | Source | 192 | 12 | BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {} |
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2Ev Line | Count | Source | 192 | 4 | BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {} |
|
193 | | BlockQueue(BlockQueue<BlockType>&& other) |
194 | 0 | : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}Unexecuted instantiation: _ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEEC2EOS8_ Unexecuted instantiation: _ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2EOS5_ Unexecuted instantiation: _ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2EOS8_ |
195 | | inline bool enqueue(BlockType const& item) { |
196 | | if (!eos) { |
197 | | if (!data_queue.enqueue(ptok, item)) [[unlikely]] { |
198 | | throw Exception(ErrorCode::INTERNAL_ERROR, |
199 | | "Exception occurs in data queue [size = {}] of local exchange.", |
200 | | data_queue.size_approx()); |
201 | | } |
202 | | return true; |
203 | | } |
204 | | return false; |
205 | | } |
206 | | |
207 | 161 | inline bool enqueue(BlockType&& item) { |
208 | 161 | if (!eos) { |
209 | 129 | if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] { |
210 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
211 | 0 | "Exception occurs in data queue [size = {}] of local exchange.", |
212 | 0 | data_queue.size_approx()); |
213 | 0 | } |
214 | 129 | return true; |
215 | 129 | } |
216 | 32 | return false; |
217 | 161 | } _ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE7enqueueEOS7_ Line | Count | Source | 207 | 20 | inline bool enqueue(BlockType&& item) { | 208 | 20 | if (!eos) { | 209 | 16 | if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] { | 210 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, | 211 | 0 | "Exception occurs in data queue [size = {}] of local exchange.", | 212 | 0 | data_queue.size_approx()); | 213 | 0 | } | 214 | 16 | return true; | 215 | 16 | } | 216 | 4 | return false; | 217 | 20 | } |
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE7enqueueEOS4_ Line | Count | Source | 207 | 77 | inline bool enqueue(BlockType&& item) { | 208 | 77 | if (!eos) { | 209 | 65 | if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] { | 210 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, | 211 | 0 | "Exception occurs in data queue [size = {}] of local exchange.", | 212 | 0 | data_queue.size_approx()); | 213 | 0 | } | 214 | 65 | return true; | 215 | 65 | } | 216 | 12 | return false; | 217 | 77 | } |
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE7enqueueEOS7_ Line | Count | Source | 207 | 64 | inline bool enqueue(BlockType&& item) { | 208 | 64 | if (!eos) { | 209 | 48 | if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] { | 210 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, | 211 | 0 | "Exception occurs in data queue [size = {}] of local exchange.", | 212 | 0 | data_queue.size_approx()); | 213 | 0 | } | 214 | 48 | return true; | 215 | 48 | } | 216 | 16 | return false; | 217 | 64 | } |
|
218 | | |
219 | 230 | bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE11try_dequeueERS7_ Line | Count | Source | 219 | 36 | bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); } |
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE11try_dequeueERS4_ Line | Count | Source | 219 | 122 | bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); } |
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE11try_dequeueERS7_ Line | Count | Source | 219 | 72 | bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); } |
|
220 | | |
221 | 20 | void set_eos() { eos = true; }_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE7set_eosEv Line | Count | Source | 221 | 4 | void set_eos() { eos = true; } |
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE7set_eosEv Line | Count | Source | 221 | 12 | void set_eos() { eos = true; } |
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE7set_eosEv Line | Count | Source | 221 | 4 | void set_eos() { eos = true; } |
|
222 | | }; |
223 | | |
224 | | using BlockWrapperSPtr = std::shared_ptr<ExchangerBase::BlockWrapper>; |
225 | | |
226 | | template <typename BlockType> |
227 | | class Exchanger : public ExchangerBase { |
228 | | public: |
229 | | Exchanger(int running_sink_operators, int num_partitions, int free_block_limit) |
230 | 4 | : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) { |
231 | 4 | _data_queue.resize(num_partitions); |
232 | 4 | _m.resize(num_partitions); |
233 | 20 | for (size_t i = 0; i < num_partitions; i++) { |
234 | 16 | _m[i] = std::make_unique<std::mutex>(); |
235 | 16 | } |
236 | 4 | } _ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2Eiii Line | Count | Source | 230 | 3 | : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) { | 231 | 3 | _data_queue.resize(num_partitions); | 232 | 3 | _m.resize(num_partitions); | 233 | 15 | for (size_t i = 0; i < num_partitions; i++) { | 234 | 12 | _m[i] = std::make_unique<std::mutex>(); | 235 | 12 | } | 236 | 3 | } |
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2Eiii Line | Count | Source | 230 | 1 | : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) { | 231 | 1 | _data_queue.resize(num_partitions); | 232 | 1 | _m.resize(num_partitions); | 233 | 5 | for (size_t i = 0; i < num_partitions; i++) { | 234 | 4 | _m[i] = std::make_unique<std::mutex>(); | 235 | 4 | } | 236 | 1 | } |
|
237 | | Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) |
238 | 2 | : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) { |
239 | 2 | _data_queue.resize(num_sources); |
240 | 2 | _m.resize(num_sources); |
241 | 10 | for (size_t i = 0; i < num_sources; i++) { |
242 | 8 | _m[i] = std::make_unique<std::mutex>(); |
243 | 8 | } |
244 | 2 | } |
245 | 6 | ~Exchanger() override = default; _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEED2Ev Line | Count | Source | 245 | 2 | ~Exchanger() override = default; |
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEED2Ev Line | Count | Source | 245 | 3 | ~Exchanger() override = default; |
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEED2Ev Line | Count | Source | 245 | 1 | ~Exchanger() override = default; |
|
246 | 0 | std::string data_queue_debug_string(int i) override { |
247 | 0 | return fmt::format("Data Queue {}: [size approx = {}, eos = {}]", i, |
248 | 0 | _data_queue[i].data_queue.size_approx(), _data_queue[i].eos); |
249 | 0 | } Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE23data_queue_debug_stringB5cxx11Ei Unexecuted instantiation: _ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE23data_queue_debug_stringB5cxx11Ei Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE23data_queue_debug_stringB5cxx11Ei |
250 | | |
251 | | protected: |
252 | | // Enqueue data block and set downstream source operator to read. |
253 | | void _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState* local_state, |
254 | | BlockType&& block); |
255 | | bool _dequeue_data(LocalExchangeSourceLocalState* local_state, BlockType& block, bool* eos, |
256 | | Block* data_block, int channel_id); |
257 | | |
258 | | void _enqueue_data_and_set_ready(int channel_id, BlockType&& block); |
259 | | bool _dequeue_data(BlockType& block, bool* eos, Block* data_block, int channel_id); |
260 | | std::vector<BlockQueue<BlockType>> _data_queue; |
261 | | std::vector<std::unique_ptr<std::mutex>> _m; |
262 | | }; |
263 | | |
264 | | class LocalExchangeSourceLocalState; |
265 | | class LocalExchangeSinkLocalState; |
266 | | |
267 | | class ShuffleExchanger : public Exchanger<PartitionedBlock> { |
268 | | public: |
269 | | ENABLE_FACTORY_CREATOR(ShuffleExchanger); |
270 | | ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, |
271 | | int free_block_limit) |
272 | 2 | : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions, |
273 | 2 | free_block_limit) { |
274 | 2 | DCHECK_GT(num_partitions, 0); |
275 | 2 | DCHECK_GT(num_sources, 0); |
276 | 2 | _partition_rows_histogram.resize(running_sink_operators); |
277 | 2 | } |
278 | 2 | ~ShuffleExchanger() override = default; |
279 | | Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
280 | | SinkInfo& sink_info) override; |
281 | | |
282 | | Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, |
283 | | SourceInfo&& source_info) override; |
284 | | void close(SourceInfo&& source_info) override; |
285 | 1 | ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; } |
286 | | |
287 | | protected: |
288 | | Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block, |
289 | | int channel_id, LocalExchangeSinkLocalState* local_state, |
290 | | std::map<int, int>* shuffle_idx_to_instance_idx); |
291 | | Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block, |
292 | | int channel_id); |
293 | | std::vector<std::vector<uint32_t>> _partition_rows_histogram; |
294 | | }; |
295 | | |
296 | | class BucketShuffleExchanger final : public ShuffleExchanger { |
297 | | ENABLE_FACTORY_CREATOR(BucketShuffleExchanger); |
298 | | BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, |
299 | | int free_block_limit) |
300 | 0 | : ShuffleExchanger(running_sink_operators, num_sources, num_partitions, |
301 | 0 | free_block_limit) {} |
302 | 0 | ~BucketShuffleExchanger() override = default; |
303 | 0 | ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; } |
304 | | }; |
305 | | |
306 | | class PassthroughExchanger final : public Exchanger<BlockWrapperSPtr> { |
307 | | public: |
308 | | ENABLE_FACTORY_CREATOR(PassthroughExchanger); |
309 | | PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit) |
310 | 1 | : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, |
311 | 1 | free_block_limit) {} |
312 | 1 | ~PassthroughExchanger() override = default; |
313 | | Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
314 | | SinkInfo& sink_info) override; |
315 | | |
316 | | Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, |
317 | | SourceInfo&& source_info) override; |
318 | 0 | ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; } |
319 | | void close(SourceInfo&& source_info) override; |
320 | | }; |
321 | | |
322 | | class PassToOneExchanger final : public Exchanger<BlockWrapperSPtr> { |
323 | | public: |
324 | | ENABLE_FACTORY_CREATOR(PassToOneExchanger); |
325 | | PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit) |
326 | 1 | : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, |
327 | 1 | free_block_limit) {} |
328 | 1 | ~PassToOneExchanger() override = default; |
329 | | Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
330 | | SinkInfo& sink_info) override; |
331 | | |
332 | | Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, |
333 | | SourceInfo&& source_info) override; |
334 | 0 | ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; } |
335 | | void close(SourceInfo&& source_info) override; |
336 | | }; |
337 | | class BroadcastExchanger final : public Exchanger<BroadcastBlock> { |
338 | | public: |
339 | | ENABLE_FACTORY_CREATOR(BroadcastExchanger); |
340 | | BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit) |
341 | 1 | : Exchanger<BroadcastBlock>(running_sink_operators, num_partitions, free_block_limit) {} |
342 | 1 | ~BroadcastExchanger() override = default; |
343 | | Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
344 | | SinkInfo& sink_info) override; |
345 | | |
346 | | Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, |
347 | | SourceInfo&& source_info) override; |
348 | 0 | ExchangeType get_type() const override { return ExchangeType::BROADCAST; } |
349 | | void close(SourceInfo&& source_info) override; |
350 | | }; |
351 | | |
352 | | //The code in AdaptivePassthroughExchanger is essentially |
353 | | // a copy of ShuffleExchanger and PassthroughExchanger. |
354 | | class AdaptivePassthroughExchanger : public Exchanger<BlockWrapperSPtr> { |
355 | | public: |
356 | | ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger); |
357 | | AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions, |
358 | | int free_block_limit) |
359 | 1 | : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, |
360 | 1 | free_block_limit) { |
361 | 1 | _partition_rows_histogram.resize(running_sink_operators); |
362 | 1 | } |
363 | | Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
364 | | SinkInfo& sink_info) override; |
365 | | |
366 | | Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, |
367 | | SourceInfo&& source_info) override; |
368 | 0 | ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; } |
369 | | |
370 | | void close(SourceInfo&& source_info) override; |
371 | | |
372 | | private: |
373 | | Status _passthrough_sink(RuntimeState* state, Block* in_block, SinkInfo& sink_info); |
374 | | Status _shuffle_sink(RuntimeState* state, Block* in_block, SinkInfo& sink_info); |
375 | | Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block, |
376 | | SinkInfo& sink_info); |
377 | | |
378 | | std::atomic_bool _is_pass_through = false; |
379 | | std::atomic_int32_t _total_block = 0; |
380 | | std::vector<std::vector<uint32_t>> _partition_rows_histogram; |
381 | | }; |
382 | | } // namespace doris |