Coverage Report

Created: 2026-04-10 04:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/local_exchanger.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include "exec/operator/operator.h"
21
#include "exec/pipeline/dependency.h"
22
23
namespace doris {
24
template <typename T>
25
void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
26
                  RuntimeProfile::Counter* memory_used_counter = nullptr);
27
28
class PartitionerBase;
29
class LocalExchangeSourceLocalState;
30
class LocalExchangeSinkLocalState;
31
32
struct Profile {
33
    RuntimeProfile::Counter* compute_hash_value_timer = nullptr;
34
    RuntimeProfile::Counter* distribute_timer = nullptr;
35
    RuntimeProfile::Counter* copy_data_timer = nullptr;
36
};
37
38
struct SinkInfo {
39
    int* channel_id;
40
    PartitionerBase* partitioner;
41
    LocalExchangeSinkLocalState* local_state;
42
    std::map<int, int>* shuffle_idx_to_instance_idx;
43
};
44
45
struct SourceInfo {
46
    int channel_id;
47
    LocalExchangeSourceLocalState* local_state;
48
};
49
/**
50
 * One exchanger is held by one `LocalExchangeSharedState`. And one `LocalExchangeSharedState` is
51
 * shared by all local exchange sink operators and source operators with the same id.
52
 *
53
 * In the exchanger, two block queues are maintained: one is the data block queue and the other is the free block queue.
54
 *
55
 * In detail, the data block queue has as many queues as source operators. Each source operator will get
56
 * data block from the corresponding queue. Data blocks are pushed into the queue by sink operators. One
57
 * sink operator will push blocks into one or more queues.
58
 *
59
 * Free blocks are used to reuse the allocated memory. To bound memory usage, we also use a conf
60
 * to limit the size of the free block queue.
61
 */
62
class ExchangerBase {
63
public:
64
    /**
65
     * `BlockWrapper` is used to wrap a data block with a reference count.
66
     *
67
     * In function `unref()`, when `ref_count` is decremented to 0, this block is no longer needed by
68
     * operators, so we put it into `_free_blocks` to reuse its memory if needed and refresh memory usage
69
     * in current queue.
70
     *
71
     * Note: `ref_count` will be larger than 1 only if this block is shared between multiple queues in
72
     * shuffle exchanger.
73
     */
74
    class BlockWrapper {
75
    public:
76
        ENABLE_FACTORY_CREATOR(BlockWrapper);
77
        BlockWrapper(Block&& data_block, LocalExchangeSharedState* shared_state, int channel_id)
78
176k
                : _data_block(std::move(data_block)),
79
176k
                  _shared_state(shared_state),
80
176k
                  _allocated_bytes(_data_block.allocated_bytes()) {
81
176k
            if (_shared_state) {
82
176k
                _shared_state->add_total_mem_usage(_allocated_bytes);
83
176k
            }
84
176k
        }
85
176k
        ~BlockWrapper() {
86
176k
            if (_shared_state != nullptr) {
87
176k
                DCHECK_GT(_allocated_bytes, 0);
88
                // `_channel_ids` may be empty if the exchanger is a shuffle exchanger and the channel id is
89
                // not used by `sub_total_mem_usage`. So we just pass -1 here.
90
176k
                _shared_state->sub_total_mem_usage(_allocated_bytes);
91
176k
                if (_shared_state->exchanger->_free_block_limit == 0 ||
92
176k
                    _shared_state->exchanger->_free_blocks.size_approx() <
93
176k
                            _shared_state->exchanger->_free_block_limit *
94
176k
                                    _shared_state->exchanger->_num_sources) {
95
171k
                    _data_block.clear_column_data();
96
                    // Free blocks are used to improve memory efficiency. Failure during pushing back
97
                    // a free block has no harmful effect, so we just ignore the return value.
98
171k
                    _shared_state->exchanger->_free_blocks.enqueue(std::move(_data_block));
99
171k
                }
100
176k
            };
101
176k
        }
102
215k
        void record_channel_id(int channel_id) {
103
215k
            _channel_ids.push_back(channel_id);
104
215k
            if (_shared_state) {
105
215k
                _shared_state->add_mem_usage(channel_id, _allocated_bytes);
106
215k
            }
107
215k
        }
108
109
    private:
110
        friend class ShuffleExchanger;
111
        friend class BucketShuffleExchanger;
112
        friend class PassthroughExchanger;
113
        friend class BroadcastExchanger;
114
        friend class PassToOneExchanger;
115
        friend class AdaptivePassthroughExchanger;
116
        template <typename BlockType>
117
        friend class Exchanger;
118
119
        Block _data_block;
120
        LocalExchangeSharedState* _shared_state;
121
        std::vector<int> _channel_ids;
122
        const size_t _allocated_bytes;
123
    };
124
    ExchangerBase(int running_sink_operators, int num_partitions, int free_block_limit)
125
79.4k
            : _running_sink_operators(running_sink_operators),
126
79.4k
              _running_source_operators(num_partitions),
127
79.4k
              _num_partitions(num_partitions),
128
79.4k
              _num_senders(running_sink_operators),
129
79.4k
              _num_sources(num_partitions),
130
79.4k
              _free_block_limit(free_block_limit) {}
131
    ExchangerBase(int running_sink_operators, int num_sources, int num_partitions,
132
                  int free_block_limit)
133
22.5k
            : _running_sink_operators(running_sink_operators),
134
22.5k
              _running_source_operators(num_sources),
135
22.5k
              _num_partitions(num_partitions),
136
22.5k
              _num_senders(running_sink_operators),
137
22.5k
              _num_sources(num_sources),
138
22.5k
              _free_block_limit(free_block_limit) {}
139
102k
    virtual ~ExchangerBase() = default;
140
    virtual Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
141
                             SourceInfo&& source_info) = 0;
142
    virtual Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
143
                        SinkInfo& sink_info) = 0;
144
    virtual ExchangeType get_type() const = 0;
145
    // Called if a local exchanger source operator is closed. Frees the unused data blocks in data_queue.
146
    virtual void close(SourceInfo&& source_info) = 0;
147
    // Called if all local exchanger source operators are closed. We free the memory in
148
    // `_free_blocks` here.
149
    virtual void finalize();
150
151
    virtual std::string data_queue_debug_string(int i) = 0;
152
153
0
    void set_low_memory_mode() {
154
0
        _free_block_limit = 0;
155
0
        clear_blocks(_free_blocks);
156
0
    }
157
158
protected:
159
    friend struct LocalExchangeSharedState;
160
    friend class LocalExchangeSourceLocalState;
161
    friend class LocalExchangeSinkOperatorX;
162
    friend class LocalExchangeSinkLocalState;
163
    std::atomic<int> _running_sink_operators = 0;
164
    std::atomic<int> _running_source_operators = 0;
165
    const int _num_partitions;
166
    const int _num_senders;
167
    const int _num_sources;
168
    std::atomic_int _free_block_limit = 0;
169
    moodycamel::ConcurrentQueue<Block> _free_blocks;
170
};
171
172
struct PartitionedRowIdxs {
173
    std::shared_ptr<PODArray<uint32_t>> row_idxs;
174
    uint32_t offset_start;
175
    uint32_t length;
176
};
177
178
using PartitionedBlock =
179
        std::pair<std::shared_ptr<ExchangerBase::BlockWrapper>, PartitionedRowIdxs>;
180
181
struct BroadcastRowRange {
182
    uint32_t offset_start;
183
    size_t length;
184
};
185
using BroadcastBlock = std::pair<std::shared_ptr<ExchangerBase::BlockWrapper>, BroadcastRowRange>;
186
187
template <typename BlockType>
188
struct BlockQueue {
189
    std::atomic<bool> eos = false;
190
    moodycamel::ConcurrentQueue<BlockType> data_queue;
191
    moodycamel::ProducerToken ptok {data_queue};
192
595k
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEEC2Ev
Line
Count
Source
192
116k
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2Ev
Line
Count
Source
192
467k
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2Ev
Line
Count
Source
192
11.3k
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
193
    BlockQueue(BlockQueue<BlockType>&& other)
194
0
            : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}
Unexecuted instantiation: _ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEEC2EOS8_
Unexecuted instantiation: _ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2EOS5_
Unexecuted instantiation: _ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2EOS8_
195
    inline bool enqueue(BlockType const& item) {
196
        if (!eos) {
197
            if (!data_queue.enqueue(ptok, item)) [[unlikely]] {
198
                throw Exception(ErrorCode::INTERNAL_ERROR,
199
                                "Exception occurs in data queue [size = {}] of local exchange.",
200
                                data_queue.size_approx());
201
            }
202
            return true;
203
        }
204
        return false;
205
    }
206
207
215k
    inline bool enqueue(BlockType&& item) {
208
215k
        if (!eos) {
209
215k
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
210
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
211
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
212
0
                                data_queue.size_approx());
213
0
            }
214
215k
            return true;
215
215k
        }
216
43
        return false;
217
215k
    }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE7enqueueEOS7_
Line
Count
Source
207
37.4k
    inline bool enqueue(BlockType&& item) {
208
37.4k
        if (!eos) {
209
37.4k
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
210
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
211
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
212
0
                                data_queue.size_approx());
213
0
            }
214
37.4k
            return true;
215
37.4k
        }
216
8
        return false;
217
37.4k
    }
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE7enqueueEOS4_
Line
Count
Source
207
153k
    inline bool enqueue(BlockType&& item) {
208
153k
        if (!eos) {
209
153k
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
210
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
211
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
212
0
                                data_queue.size_approx());
213
0
            }
214
153k
            return true;
215
153k
        }
216
18
        return false;
217
153k
    }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE7enqueueEOS7_
Line
Count
Source
207
24.0k
    inline bool enqueue(BlockType&& item) {
208
24.0k
        if (!eos) {
209
23.9k
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
210
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
211
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
212
0
                                data_queue.size_approx());
213
0
            }
214
23.9k
            return true;
215
23.9k
        }
216
17
        return false;
217
24.0k
    }
218
219
1.64M
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE11try_dequeueERS7_
Line
Count
Source
219
318k
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE11try_dequeueERS4_
Line
Count
Source
219
1.26M
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE11try_dequeueERS7_
Line
Count
Source
219
60.6k
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
220
221
593k
    void set_eos() { eos = true; }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE7set_eosEv
Line
Count
Source
221
116k
    void set_eos() { eos = true; }
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE7set_eosEv
Line
Count
Source
221
465k
    void set_eos() { eos = true; }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE7set_eosEv
Line
Count
Source
221
11.3k
    void set_eos() { eos = true; }
222
};
223
224
using BlockWrapperSPtr = std::shared_ptr<ExchangerBase::BlockWrapper>;
225
226
template <typename BlockType>
227
class Exchanger : public ExchangerBase {
228
public:
229
    Exchanger(int running_sink_operators, int num_partitions, int free_block_limit)
230
79.4k
            : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {
231
79.4k
        _data_queue.resize(num_partitions);
232
79.4k
        _m.resize(num_partitions);
233
558k
        for (size_t i = 0; i < num_partitions; i++) {
234
478k
            _m[i] = std::make_unique<std::mutex>();
235
478k
        }
236
79.4k
    }
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2Eiii
Line
Count
Source
230
78.1k
            : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {
231
78.1k
        _data_queue.resize(num_partitions);
232
78.1k
        _m.resize(num_partitions);
233
545k
        for (size_t i = 0; i < num_partitions; i++) {
234
467k
            _m[i] = std::make_unique<std::mutex>();
235
467k
        }
236
78.1k
    }
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2Eiii
Line
Count
Source
230
1.35k
            : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {
231
1.35k
        _data_queue.resize(num_partitions);
232
1.35k
        _m.resize(num_partitions);
233
12.7k
        for (size_t i = 0; i < num_partitions; i++) {
234
11.3k
            _m[i] = std::make_unique<std::mutex>();
235
11.3k
        }
236
1.35k
    }
237
    Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit)
238
22.5k
            : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) {
239
22.5k
        _data_queue.resize(num_sources);
240
22.5k
        _m.resize(num_sources);
241
139k
        for (size_t i = 0; i < num_sources; i++) {
242
116k
            _m[i] = std::make_unique<std::mutex>();
243
116k
        }
244
22.5k
    }
245
102k
    ~Exchanger() override = default;
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEED2Ev
Line
Count
Source
245
22.6k
    ~Exchanger() override = default;
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEED2Ev
Line
Count
Source
245
78.2k
    ~Exchanger() override = default;
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEED2Ev
Line
Count
Source
245
1.35k
    ~Exchanger() override = default;
246
146
    std::string data_queue_debug_string(int i) override {
247
146
        return fmt::format("Data Queue {}: [size approx = {}, eos = {}]", i,
248
146
                           _data_queue[i].data_queue.size_approx(), _data_queue[i].eos);
249
146
    }
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE23data_queue_debug_stringB5cxx11Ei
Line
Count
Source
246
12
    std::string data_queue_debug_string(int i) override {
247
12
        return fmt::format("Data Queue {}: [size approx = {}, eos = {}]", i,
248
12
                           _data_queue[i].data_queue.size_approx(), _data_queue[i].eos);
249
12
    }
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE23data_queue_debug_stringB5cxx11Ei
Line
Count
Source
246
134
    std::string data_queue_debug_string(int i) override {
247
134
        return fmt::format("Data Queue {}: [size approx = {}, eos = {}]", i,
248
134
                           _data_queue[i].data_queue.size_approx(), _data_queue[i].eos);
249
134
    }
Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE23data_queue_debug_stringB5cxx11Ei
250
251
protected:
252
    // Enqueue a data block and set the downstream source operator to ready.
253
    void _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState* local_state,
254
                                     BlockType&& block);
255
    bool _dequeue_data(LocalExchangeSourceLocalState* local_state, BlockType& block, bool* eos,
256
                       Block* data_block, int channel_id);
257
258
    void _enqueue_data_and_set_ready(int channel_id, BlockType&& block);
259
    bool _dequeue_data(BlockType& block, bool* eos, Block* data_block, int channel_id);
260
    std::vector<BlockQueue<BlockType>> _data_queue;
261
    std::vector<std::unique_ptr<std::mutex>> _m;
262
};
263
264
class LocalExchangeSourceLocalState;
265
class LocalExchangeSinkLocalState;
266
267
class ShuffleExchanger : public Exchanger<PartitionedBlock> {
268
public:
269
    ENABLE_FACTORY_CREATOR(ShuffleExchanger);
270
    ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
271
                     int free_block_limit)
272
22.5k
            : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions,
273
22.5k
                                          free_block_limit) {
274
22.5k
        DCHECK_GT(num_partitions, 0);
275
22.5k
        DCHECK_GT(num_sources, 0);
276
22.5k
        _partition_rows_histogram.resize(running_sink_operators);
277
22.5k
    }
278
22.6k
    ~ShuffleExchanger() override = default;
279
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
280
                SinkInfo& sink_info) override;
281
282
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
283
                     SourceInfo&& source_info) override;
284
    void close(SourceInfo&& source_info) override;
285
335k
    ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; }
286
287
protected:
288
    Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block,
289
                       int channel_id, LocalExchangeSinkLocalState* local_state,
290
                       std::map<int, int>* shuffle_idx_to_instance_idx);
291
    Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block,
292
                       int channel_id);
293
    std::vector<std::vector<uint32_t>> _partition_rows_histogram;
294
};
295
296
class BucketShuffleExchanger final : public ShuffleExchanger {
297
    ENABLE_FACTORY_CREATOR(BucketShuffleExchanger);
298
    BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
299
                           int free_block_limit)
300
547
            : ShuffleExchanger(running_sink_operators, num_sources, num_partitions,
301
547
                               free_block_limit) {}
302
    ~BucketShuffleExchanger() override = default;
303
19.6k
    ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; }
304
};
305
306
class PassthroughExchanger final : public Exchanger<BlockWrapperSPtr> {
307
public:
308
    ENABLE_FACTORY_CREATOR(PassthroughExchanger);
309
    PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
310
76.3k
            : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions,
311
76.3k
                                          free_block_limit) {}
312
    ~PassthroughExchanger() override = default;
313
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
314
                SinkInfo& sink_info) override;
315
316
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
317
                     SourceInfo&& source_info) override;
318
1.66M
    ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; }
319
    void close(SourceInfo&& source_info) override;
320
};
321
322
class PassToOneExchanger final : public Exchanger<BlockWrapperSPtr> {
323
public:
324
    ENABLE_FACTORY_CREATOR(PassToOneExchanger);
325
    PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
326
863
            : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions,
327
863
                                          free_block_limit) {}
328
    ~PassToOneExchanger() override = default;
329
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
330
                SinkInfo& sink_info) override;
331
332
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
333
                     SourceInfo&& source_info) override;
334
18.4k
    ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; }
335
    void close(SourceInfo&& source_info) override;
336
};
337
class BroadcastExchanger final : public Exchanger<BroadcastBlock> {
338
public:
339
    ENABLE_FACTORY_CREATOR(BroadcastExchanger);
340
    BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
341
1.35k
            : Exchanger<BroadcastBlock>(running_sink_operators, num_partitions, free_block_limit) {}
342
    ~BroadcastExchanger() override = default;
343
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
344
                SinkInfo& sink_info) override;
345
346
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
347
                     SourceInfo&& source_info) override;
348
36.5k
    ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
349
    void close(SourceInfo&& source_info) override;
350
};
351
352
// The code in AdaptivePassthroughExchanger is essentially
353
// a copy of ShuffleExchanger and PassthroughExchanger.
354
class AdaptivePassthroughExchanger : public Exchanger<BlockWrapperSPtr> {
355
public:
356
    ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger);
357
    AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions,
358
                                 int free_block_limit)
359
798
            : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions,
360
798
                                          free_block_limit) {
361
798
        _partition_rows_histogram.resize(running_sink_operators);
362
798
    }
363
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
364
                SinkInfo& sink_info) override;
365
366
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
367
                     SourceInfo&& source_info) override;
368
25.6k
    ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; }
369
370
    void close(SourceInfo&& source_info) override;
371
372
private:
373
    Status _passthrough_sink(RuntimeState* state, Block* in_block, SinkInfo& sink_info);
374
    Status _shuffle_sink(RuntimeState* state, Block* in_block, SinkInfo& sink_info);
375
    Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block,
376
                       SinkInfo& sink_info);
377
378
    std::atomic_bool _is_pass_through = false;
379
    std::atomic_int32_t _total_block = 0;
380
    std::vector<std::vector<uint32_t>> _partition_rows_histogram;
381
};
382
} // namespace doris