be/src/exec/exchange/local_exchanger.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include "exec/operator/operator.h" |
21 | | #include "exec/pipeline/dependency.h" |
22 | | |
23 | | namespace doris { |
24 | | #include "common/compile_check_begin.h" |
25 | | template <typename T> |
26 | | void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks, |
27 | | RuntimeProfile::Counter* memory_used_counter = nullptr); |
28 | | |
29 | | class PartitionerBase; |
30 | | class LocalExchangeSourceLocalState; |
31 | | class LocalExchangeSinkLocalState; |
32 | | |
33 | | struct Profile { |
34 | | RuntimeProfile::Counter* compute_hash_value_timer = nullptr; |
35 | | RuntimeProfile::Counter* distribute_timer = nullptr; |
36 | | RuntimeProfile::Counter* copy_data_timer = nullptr; |
37 | | }; |
38 | | |
39 | | struct SinkInfo { |
40 | | int* channel_id; |
41 | | PartitionerBase* partitioner; |
42 | | LocalExchangeSinkLocalState* local_state; |
43 | | std::map<int, int>* shuffle_idx_to_instance_idx; |
44 | | }; |
45 | | |
46 | | struct SourceInfo { |
47 | | int channel_id; |
48 | | LocalExchangeSourceLocalState* local_state; |
49 | | }; |
50 | | /** |
51 | | * One exchanger is hold by one `LocalExchangeSharedState`. And one `LocalExchangeSharedState` is |
52 | | * shared by all local exchange sink operators and source operators with the same id. |
53 | | * |
54 | | * In exchanger, two block queues is maintained, one is data block queue and another is free block queue. |
55 | | * |
56 | | * In details, data block queue has queues as many as source operators. Each source operator will get |
57 | | * data block from the corresponding queue. Data blocks is push into the queue by sink operators. One |
58 | | * sink operator will push blocks into one or more queues. |
59 | | * |
60 | | * Free block is used to reuse the allocated memory. To reduce the memory limit, we also use a conf |
61 | | * to limit the size of free block queue. |
62 | | */ |
63 | | class ExchangerBase { |
64 | | public: |
65 | | /** |
66 | | * `BlockWrapper` is used to wrap a data block with a reference count. |
67 | | * |
68 | | * In function `unref()`, if `ref_count` decremented to 0, which means this block is not needed by |
69 | | * operators, so we put it into `_free_blocks` to reuse its memory if needed and refresh memory usage |
70 | | * in current queue. |
71 | | * |
72 | | * Note: `ref_count` will be larger than 1 only if this block is shared between multiple queues in |
73 | | * shuffle exchanger. |
74 | | */ |
75 | | class BlockWrapper { |
76 | | public: |
77 | | ENABLE_FACTORY_CREATOR(BlockWrapper); |
78 | | BlockWrapper(Block&& data_block, LocalExchangeSharedState* shared_state, int channel_id) |
79 | 238k | : _data_block(std::move(data_block)), |
80 | 238k | _shared_state(shared_state), |
81 | 238k | _allocated_bytes(_data_block.allocated_bytes()) { |
82 | 238k | if (_shared_state) { |
83 | 238k | _shared_state->add_total_mem_usage(_allocated_bytes); |
84 | 238k | } |
85 | 238k | } |
86 | 238k | ~BlockWrapper() { |
87 | 238k | if (_shared_state != nullptr) { |
88 | 238k | DCHECK_GT(_allocated_bytes, 0); |
89 | | // `_channel_ids` may be empty if exchanger is shuffled exchanger and channel id is |
90 | | // not used by `sub_total_mem_usage`. So we just pass -1 here. |
91 | 238k | _shared_state->sub_total_mem_usage(_allocated_bytes); |
92 | 238k | if (_shared_state->exchanger->_free_block_limit == 0 || |
93 | 238k | _shared_state->exchanger->_free_blocks.size_approx() < |
94 | 238k | _shared_state->exchanger->_free_block_limit * |
95 | 238k | _shared_state->exchanger->_num_sources) { |
96 | 233k | _data_block.clear_column_data(); |
97 | | // Free blocks is used to improve memory efficiency. Failure during pushing back |
98 | | // free block will not incur any bad result so just ignore the return value. |
99 | 233k | _shared_state->exchanger->_free_blocks.enqueue(std::move(_data_block)); |
100 | 233k | } |
101 | 238k | }; |
102 | 238k | } |
103 | 275k | void record_channel_id(int channel_id) { |
104 | 275k | _channel_ids.push_back(channel_id); |
105 | 275k | if (_shared_state) { |
106 | 275k | _shared_state->add_mem_usage(channel_id, _allocated_bytes); |
107 | 275k | } |
108 | 275k | } |
109 | | |
110 | | private: |
111 | | friend class ShuffleExchanger; |
112 | | friend class BucketShuffleExchanger; |
113 | | friend class PassthroughExchanger; |
114 | | friend class BroadcastExchanger; |
115 | | friend class PassToOneExchanger; |
116 | | friend class AdaptivePassthroughExchanger; |
117 | | template <typename BlockType> |
118 | | friend class Exchanger; |
119 | | |
120 | | Block _data_block; |
121 | | LocalExchangeSharedState* _shared_state; |
122 | | std::vector<int> _channel_ids; |
123 | | const size_t _allocated_bytes; |
124 | | }; |
125 | | ExchangerBase(int running_sink_operators, int num_partitions, int free_block_limit) |
126 | 103k | : _running_sink_operators(running_sink_operators), |
127 | 103k | _running_source_operators(num_partitions), |
128 | 103k | _num_partitions(num_partitions), |
129 | 103k | _num_senders(running_sink_operators), |
130 | 103k | _num_sources(num_partitions), |
131 | 103k | _free_block_limit(free_block_limit) {} |
132 | | ExchangerBase(int running_sink_operators, int num_sources, int num_partitions, |
133 | | int free_block_limit) |
134 | 20.1k | : _running_sink_operators(running_sink_operators), |
135 | 20.1k | _running_source_operators(num_sources), |
136 | 20.1k | _num_partitions(num_partitions), |
137 | 20.1k | _num_senders(running_sink_operators), |
138 | 20.1k | _num_sources(num_sources), |
139 | 20.1k | _free_block_limit(free_block_limit) {} |
140 | 124k | virtual ~ExchangerBase() = default; |
141 | | virtual Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, |
142 | | SourceInfo&& source_info) = 0; |
143 | | virtual Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
144 | | SinkInfo& sink_info) = 0; |
145 | | virtual ExchangeType get_type() const = 0; |
146 | | // Called if a local exchanger source operator are closed. Free the unused data block in data_queue. |
147 | | virtual void close(SourceInfo&& source_info) = 0; |
148 | | // Called if all local exchanger source operators are closed. We free the memory in |
149 | | // `_free_blocks` here. |
150 | | virtual void finalize(); |
151 | | |
152 | | virtual std::string data_queue_debug_string(int i) = 0; |
153 | | |
154 | 134 | void set_low_memory_mode() { |
155 | 134 | _free_block_limit = 0; |
156 | 134 | clear_blocks(_free_blocks); |
157 | 134 | } |
158 | | |
159 | | protected: |
160 | | friend struct LocalExchangeSharedState; |
161 | | friend class LocalExchangeSourceLocalState; |
162 | | friend class LocalExchangeSinkOperatorX; |
163 | | friend class LocalExchangeSinkLocalState; |
164 | | std::atomic<int> _running_sink_operators = 0; |
165 | | std::atomic<int> _running_source_operators = 0; |
166 | | const int _num_partitions; |
167 | | const int _num_senders; |
168 | | const int _num_sources; |
169 | | std::atomic_int _free_block_limit = 0; |
170 | | moodycamel::ConcurrentQueue<Block> _free_blocks; |
171 | | }; |
172 | | |
173 | | struct PartitionedRowIdxs { |
174 | | std::shared_ptr<PODArray<uint32_t>> row_idxs; |
175 | | uint32_t offset_start; |
176 | | uint32_t length; |
177 | | }; |
178 | | |
179 | | using PartitionedBlock = |
180 | | std::pair<std::shared_ptr<ExchangerBase::BlockWrapper>, PartitionedRowIdxs>; |
181 | | |
182 | | struct BroadcastRowRange { |
183 | | uint32_t offset_start; |
184 | | size_t length; |
185 | | }; |
186 | | using BroadcastBlock = std::pair<std::shared_ptr<ExchangerBase::BlockWrapper>, BroadcastRowRange>; |
187 | | |
188 | | template <typename BlockType> |
189 | | struct BlockQueue { |
190 | | std::atomic<bool> eos = false; |
191 | | moodycamel::ConcurrentQueue<BlockType> data_queue; |
192 | | moodycamel::ProducerToken ptok {data_queue}; |
193 | 809k | BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEEC2Ev Line | Count | Source | 193 | 119k | BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {} |
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2Ev Line | Count | Source | 193 | 680k | BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {} |
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2Ev Line | Count | Source | 193 | 9.57k | BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {} |
|
194 | | BlockQueue(BlockQueue<BlockType>&& other) |
195 | 0 | : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}Unexecuted instantiation: _ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEEC2EOS8_ Unexecuted instantiation: _ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2EOS5_ Unexecuted instantiation: _ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2EOS8_ |
196 | | inline bool enqueue(BlockType const& item) { |
197 | | if (!eos) { |
198 | | if (!data_queue.enqueue(ptok, item)) [[unlikely]] { |
199 | | throw Exception(ErrorCode::INTERNAL_ERROR, |
200 | | "Exception occurs in data queue [size = {}] of local exchange.", |
201 | | data_queue.size_approx()); |
202 | | } |
203 | | return true; |
204 | | } |
205 | | return false; |
206 | | } |
207 | | |
208 | 275k | inline bool enqueue(BlockType&& item) { |
209 | 275k | if (!eos) { |
210 | 275k | if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] { |
211 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
212 | 0 | "Exception occurs in data queue [size = {}] of local exchange.", |
213 | 0 | data_queue.size_approx()); |
214 | 0 | } |
215 | 275k | return true; |
216 | 275k | } |
217 | 80 | return false; |
218 | 275k | } _ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE7enqueueEOS7_ Line | Count | Source | 208 | 44.3k | inline bool enqueue(BlockType&& item) { | 209 | 44.3k | if (!eos) { | 210 | 44.2k | if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] { | 211 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, | 212 | 0 | "Exception occurs in data queue [size = {}] of local exchange.", | 213 | 0 | data_queue.size_approx()); | 214 | 0 | } | 215 | 44.2k | return true; | 216 | 44.2k | } | 217 | 86 | return false; | 218 | 44.3k | } |
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE7enqueueEOS4_ Line | Count | Source | 208 | 213k | inline bool enqueue(BlockType&& item) { | 209 | 213k | if (!eos) { | 210 | 213k | if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] { | 211 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, | 212 | 0 | "Exception occurs in data queue [size = {}] of local exchange.", | 213 | 0 | data_queue.size_approx()); | 214 | 0 | } | 215 | 213k | return true; | 216 | 213k | } | 217 | 18.4E | return false; | 218 | 213k | } |
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE7enqueueEOS7_ Line | Count | Source | 208 | 17.5k | inline bool enqueue(BlockType&& item) { | 209 | 17.5k | if (!eos) { | 210 | 17.5k | if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] { | 211 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, | 212 | 0 | "Exception occurs in data queue [size = {}] of local exchange.", | 213 | 0 | data_queue.size_approx()); | 214 | 0 | } | 215 | 17.5k | return true; | 216 | 17.5k | } | 217 | 16 | return false; | 218 | 17.5k | } |
|
219 | | |
220 | 2.16M | bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE11try_dequeueERS7_ Line | Count | Source | 220 | 332k | bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); } |
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE11try_dequeueERS4_ Line | Count | Source | 220 | 1.79M | bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); } |
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE11try_dequeueERS7_ Line | Count | Source | 220 | 44.9k | bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); } |
|
221 | | |
222 | 807k | void set_eos() { eos = true; }_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE7set_eosEv Line | Count | Source | 222 | 119k | void set_eos() { eos = true; } |
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE7set_eosEv Line | Count | Source | 222 | 678k | void set_eos() { eos = true; } |
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE7set_eosEv Line | Count | Source | 222 | 9.56k | void set_eos() { eos = true; } |
|
223 | | }; |
224 | | |
225 | | using BlockWrapperSPtr = std::shared_ptr<ExchangerBase::BlockWrapper>; |
226 | | |
227 | | template <typename BlockType> |
228 | | class Exchanger : public ExchangerBase { |
229 | | public: |
230 | | Exchanger(int running_sink_operators, int num_partitions, int free_block_limit) |
231 | 103k | : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) { |
232 | 103k | _data_queue.resize(num_partitions); |
233 | 103k | _m.resize(num_partitions); |
234 | 793k | for (size_t i = 0; i < num_partitions; i++) { |
235 | 690k | _m[i] = std::make_unique<std::mutex>(); |
236 | 690k | } |
237 | 103k | } _ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2Eiii Line | Count | Source | 231 | 102k | : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) { | 232 | 102k | _data_queue.resize(num_partitions); | 233 | 102k | _m.resize(num_partitions); | 234 | 782k | for (size_t i = 0; i < num_partitions; i++) { | 235 | 680k | _m[i] = std::make_unique<std::mutex>(); | 236 | 680k | } | 237 | 102k | } |
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2Eiii Line | Count | Source | 231 | 1.27k | : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) { | 232 | 1.27k | _data_queue.resize(num_partitions); | 233 | 1.27k | _m.resize(num_partitions); | 234 | 10.8k | for (size_t i = 0; i < num_partitions; i++) { | 235 | 9.57k | _m[i] = std::make_unique<std::mutex>(); | 236 | 9.57k | } | 237 | 1.27k | } |
|
238 | | Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) |
239 | 20.1k | : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) { |
240 | 20.1k | _data_queue.resize(num_sources); |
241 | 20.1k | _m.resize(num_sources); |
242 | 139k | for (size_t i = 0; i < num_sources; i++) { |
243 | 119k | _m[i] = std::make_unique<std::mutex>(); |
244 | 119k | } |
245 | 20.1k | } |
246 | 124k | ~Exchanger() override = default; _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEED2Ev Line | Count | Source | 246 | 20.2k | ~Exchanger() override = default; |
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEED2Ev Line | Count | Source | 246 | 102k | ~Exchanger() override = default; |
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEED2Ev Line | Count | Source | 246 | 1.27k | ~Exchanger() override = default; |
|
247 | 60 | std::string data_queue_debug_string(int i) override { |
248 | 60 | return fmt::format("Data Queue {}: [size approx = {}, eos = {}]", i, |
249 | 60 | _data_queue[i].data_queue.size_approx(), _data_queue[i].eos); |
250 | 60 | } Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE23data_queue_debug_stringB5cxx11Ei _ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE23data_queue_debug_stringB5cxx11Ei Line | Count | Source | 247 | 60 | std::string data_queue_debug_string(int i) override { | 248 | 60 | return fmt::format("Data Queue {}: [size approx = {}, eos = {}]", i, | 249 | 60 | _data_queue[i].data_queue.size_approx(), _data_queue[i].eos); | 250 | 60 | } |
Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE23data_queue_debug_stringB5cxx11Ei |
251 | | |
252 | | protected: |
253 | | // Enqueue data block and set downstream source operator to read. |
254 | | void _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState* local_state, |
255 | | BlockType&& block); |
256 | | bool _dequeue_data(LocalExchangeSourceLocalState* local_state, BlockType& block, bool* eos, |
257 | | Block* data_block, int channel_id); |
258 | | |
259 | | void _enqueue_data_and_set_ready(int channel_id, BlockType&& block); |
260 | | bool _dequeue_data(BlockType& block, bool* eos, Block* data_block, int channel_id); |
261 | | std::vector<BlockQueue<BlockType>> _data_queue; |
262 | | std::vector<std::unique_ptr<std::mutex>> _m; |
263 | | }; |
264 | | |
265 | | class LocalExchangeSourceLocalState; |
266 | | class LocalExchangeSinkLocalState; |
267 | | |
268 | | class ShuffleExchanger : public Exchanger<PartitionedBlock> { |
269 | | public: |
270 | | ENABLE_FACTORY_CREATOR(ShuffleExchanger); |
271 | | ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, |
272 | | int free_block_limit) |
273 | 20.1k | : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions, |
274 | 20.1k | free_block_limit) { |
275 | 20.1k | DCHECK_GT(num_partitions, 0); |
276 | 20.1k | DCHECK_GT(num_sources, 0); |
277 | 20.1k | _partition_rows_histogram.resize(running_sink_operators); |
278 | 20.1k | } |
279 | 20.2k | ~ShuffleExchanger() override = default; |
280 | | Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
281 | | SinkInfo& sink_info) override; |
282 | | |
283 | | Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, |
284 | | SourceInfo&& source_info) override; |
285 | | void close(SourceInfo&& source_info) override; |
286 | 336k | ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; } |
287 | | |
288 | | protected: |
289 | | Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block, |
290 | | int channel_id, LocalExchangeSinkLocalState* local_state, |
291 | | std::map<int, int>* shuffle_idx_to_instance_idx); |
292 | | Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block, |
293 | | int channel_id); |
294 | | std::vector<std::vector<uint32_t>> _partition_rows_histogram; |
295 | | }; |
296 | | |
297 | | class BucketShuffleExchanger final : public ShuffleExchanger { |
298 | | ENABLE_FACTORY_CREATOR(BucketShuffleExchanger); |
299 | | BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, |
300 | | int free_block_limit) |
301 | 535 | : ShuffleExchanger(running_sink_operators, num_sources, num_partitions, |
302 | 535 | free_block_limit) {} |
303 | | ~BucketShuffleExchanger() override = default; |
304 | 33.0k | ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; } |
305 | | }; |
306 | | |
307 | | class PassthroughExchanger final : public Exchanger<BlockWrapperSPtr> { |
308 | | public: |
309 | | ENABLE_FACTORY_CREATOR(PassthroughExchanger); |
310 | | PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit) |
311 | 99.1k | : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, |
312 | 99.1k | free_block_limit) {} |
313 | | ~PassthroughExchanger() override = default; |
314 | | Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
315 | | SinkInfo& sink_info) override; |
316 | | |
317 | | Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, |
318 | | SourceInfo&& source_info) override; |
319 | 2.37M | ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; } |
320 | | void close(SourceInfo&& source_info) override; |
321 | | }; |
322 | | |
323 | | class PassToOneExchanger final : public Exchanger<BlockWrapperSPtr> { |
324 | | public: |
325 | | ENABLE_FACTORY_CREATOR(PassToOneExchanger); |
326 | | PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit) |
327 | 2.15k | : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, |
328 | 2.15k | free_block_limit) {} |
329 | | ~PassToOneExchanger() override = default; |
330 | | Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
331 | | SinkInfo& sink_info) override; |
332 | | |
333 | | Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, |
334 | | SourceInfo&& source_info) override; |
335 | 56.9k | ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; } |
336 | | void close(SourceInfo&& source_info) override; |
337 | | }; |
338 | | class BroadcastExchanger final : public Exchanger<BroadcastBlock> { |
339 | | public: |
340 | | ENABLE_FACTORY_CREATOR(BroadcastExchanger); |
341 | | BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit) |
342 | 1.27k | : Exchanger<BroadcastBlock>(running_sink_operators, num_partitions, free_block_limit) {} |
343 | | ~BroadcastExchanger() override = default; |
344 | | Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
345 | | SinkInfo& sink_info) override; |
346 | | |
347 | | Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, |
348 | | SourceInfo&& source_info) override; |
349 | 31.0k | ExchangeType get_type() const override { return ExchangeType::BROADCAST; } |
350 | | void close(SourceInfo&& source_info) override; |
351 | | }; |
352 | | |
353 | | //The code in AdaptivePassthroughExchanger is essentially |
354 | | // a copy of ShuffleExchanger and PassthroughExchanger. |
355 | | class AdaptivePassthroughExchanger : public Exchanger<BlockWrapperSPtr> { |
356 | | public: |
357 | | ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger); |
358 | | AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions, |
359 | | int free_block_limit) |
360 | 952 | : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, |
361 | 952 | free_block_limit) { |
362 | 952 | _partition_rows_histogram.resize(running_sink_operators); |
363 | 952 | } |
364 | | Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, |
365 | | SinkInfo& sink_info) override; |
366 | | |
367 | | Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, |
368 | | SourceInfo&& source_info) override; |
369 | 25.3k | ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; } |
370 | | |
371 | | void close(SourceInfo&& source_info) override; |
372 | | |
373 | | private: |
374 | | Status _passthrough_sink(RuntimeState* state, Block* in_block, SinkInfo& sink_info); |
375 | | Status _shuffle_sink(RuntimeState* state, Block* in_block, SinkInfo& sink_info); |
376 | | Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block, |
377 | | SinkInfo& sink_info); |
378 | | |
379 | | std::atomic_bool _is_pass_through = false; |
380 | | std::atomic_int32_t _total_block = 0; |
381 | | std::vector<std::vector<uint32_t>> _partition_rows_histogram; |
382 | | }; |
383 | | #include "common/compile_check_end.h" |
384 | | } // namespace doris |