Coverage Report

Created: 2026-03-12 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/local_exchanger.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include "exec/operator/operator.h"
21
#include "exec/pipeline/dependency.h"
22
23
namespace doris {
24
#include "common/compile_check_begin.h"
25
template <typename T>
26
void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
27
                  RuntimeProfile::Counter* memory_used_counter = nullptr);
28
29
class PartitionerBase;
30
class LocalExchangeSourceLocalState;
31
class LocalExchangeSinkLocalState;
32
33
struct Profile {
34
    RuntimeProfile::Counter* compute_hash_value_timer = nullptr;
35
    RuntimeProfile::Counter* distribute_timer = nullptr;
36
    RuntimeProfile::Counter* copy_data_timer = nullptr;
37
};
38
39
struct SinkInfo {
40
    int* channel_id;
41
    PartitionerBase* partitioner;
42
    LocalExchangeSinkLocalState* local_state;
43
    std::map<int, int>* shuffle_idx_to_instance_idx;
44
};
45
46
struct SourceInfo {
47
    int channel_id;
48
    LocalExchangeSourceLocalState* local_state;
49
};
50
/**
51
 * One exchanger is hold by one `LocalExchangeSharedState`. And one `LocalExchangeSharedState` is
52
 * shared by all local exchange sink operators and source operators with the same id.
53
 *
54
 * In exchanger, two block queues is maintained, one is data block queue and another is free block queue.
55
 *
56
 * In details, data block queue has queues as many as source operators. Each source operator will get
57
 * data block from the corresponding queue. Data blocks is push into the queue by sink operators. One
58
 * sink operator will push blocks into one or more queues.
59
 *
60
 * Free block is used to reuse the allocated memory. To reduce the memory limit, we also use a conf
61
 * to limit the size of free block queue.
62
 */
63
class ExchangerBase {
64
public:
65
    /**
66
     * `BlockWrapper` is used to wrap a data block with a reference count.
67
     *
68
     * In function `unref()`, if `ref_count` decremented to 0, which means this block is not needed by
69
     * operators, so we put it into `_free_blocks` to reuse its memory if needed and refresh memory usage
70
     * in current queue.
71
     *
72
     * Note: `ref_count` will be larger than 1 only if this block is shared between multiple queues in
73
     * shuffle exchanger.
74
     */
75
    class BlockWrapper {
76
    public:
77
        ENABLE_FACTORY_CREATOR(BlockWrapper);
78
        BlockWrapper(Block&& data_block, LocalExchangeSharedState* shared_state, int channel_id)
79
238k
                : _data_block(std::move(data_block)),
80
238k
                  _shared_state(shared_state),
81
238k
                  _allocated_bytes(_data_block.allocated_bytes()) {
82
238k
            if (_shared_state) {
83
238k
                _shared_state->add_total_mem_usage(_allocated_bytes);
84
238k
            }
85
238k
        }
86
238k
        ~BlockWrapper() {
87
238k
            if (_shared_state != nullptr) {
88
238k
                DCHECK_GT(_allocated_bytes, 0);
89
                // `_channel_ids` may be empty if exchanger is shuffled exchanger and channel id is
90
                // not used by `sub_total_mem_usage`. So we just pass -1 here.
91
238k
                _shared_state->sub_total_mem_usage(_allocated_bytes);
92
238k
                if (_shared_state->exchanger->_free_block_limit == 0 ||
93
238k
                    _shared_state->exchanger->_free_blocks.size_approx() <
94
238k
                            _shared_state->exchanger->_free_block_limit *
95
238k
                                    _shared_state->exchanger->_num_sources) {
96
233k
                    _data_block.clear_column_data();
97
                    // Free blocks is used to improve memory efficiency. Failure during pushing back
98
                    // free block will not incur any bad result so just ignore the return value.
99
233k
                    _shared_state->exchanger->_free_blocks.enqueue(std::move(_data_block));
100
233k
                }
101
238k
            };
102
238k
        }
103
275k
        void record_channel_id(int channel_id) {
104
275k
            _channel_ids.push_back(channel_id);
105
275k
            if (_shared_state) {
106
275k
                _shared_state->add_mem_usage(channel_id, _allocated_bytes);
107
275k
            }
108
275k
        }
109
110
    private:
111
        friend class ShuffleExchanger;
112
        friend class BucketShuffleExchanger;
113
        friend class PassthroughExchanger;
114
        friend class BroadcastExchanger;
115
        friend class PassToOneExchanger;
116
        friend class AdaptivePassthroughExchanger;
117
        template <typename BlockType>
118
        friend class Exchanger;
119
120
        Block _data_block;
121
        LocalExchangeSharedState* _shared_state;
122
        std::vector<int> _channel_ids;
123
        const size_t _allocated_bytes;
124
    };
125
    ExchangerBase(int running_sink_operators, int num_partitions, int free_block_limit)
126
103k
            : _running_sink_operators(running_sink_operators),
127
103k
              _running_source_operators(num_partitions),
128
103k
              _num_partitions(num_partitions),
129
103k
              _num_senders(running_sink_operators),
130
103k
              _num_sources(num_partitions),
131
103k
              _free_block_limit(free_block_limit) {}
132
    ExchangerBase(int running_sink_operators, int num_sources, int num_partitions,
133
                  int free_block_limit)
134
20.1k
            : _running_sink_operators(running_sink_operators),
135
20.1k
              _running_source_operators(num_sources),
136
20.1k
              _num_partitions(num_partitions),
137
20.1k
              _num_senders(running_sink_operators),
138
20.1k
              _num_sources(num_sources),
139
20.1k
              _free_block_limit(free_block_limit) {}
140
124k
    virtual ~ExchangerBase() = default;
141
    virtual Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
142
                             SourceInfo&& source_info) = 0;
143
    virtual Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
144
                        SinkInfo& sink_info) = 0;
145
    virtual ExchangeType get_type() const = 0;
146
    // Called if a local exchanger source operator are closed. Free the unused data block in data_queue.
147
    virtual void close(SourceInfo&& source_info) = 0;
148
    // Called if all local exchanger source operators are closed. We free the memory in
149
    // `_free_blocks` here.
150
    virtual void finalize();
151
152
    virtual std::string data_queue_debug_string(int i) = 0;
153
154
134
    void set_low_memory_mode() {
155
134
        _free_block_limit = 0;
156
134
        clear_blocks(_free_blocks);
157
134
    }
158
159
protected:
160
    friend struct LocalExchangeSharedState;
161
    friend class LocalExchangeSourceLocalState;
162
    friend class LocalExchangeSinkOperatorX;
163
    friend class LocalExchangeSinkLocalState;
164
    std::atomic<int> _running_sink_operators = 0;
165
    std::atomic<int> _running_source_operators = 0;
166
    const int _num_partitions;
167
    const int _num_senders;
168
    const int _num_sources;
169
    std::atomic_int _free_block_limit = 0;
170
    moodycamel::ConcurrentQueue<Block> _free_blocks;
171
};
172
173
struct PartitionedRowIdxs {
174
    std::shared_ptr<PODArray<uint32_t>> row_idxs;
175
    uint32_t offset_start;
176
    uint32_t length;
177
};
178
179
using PartitionedBlock =
180
        std::pair<std::shared_ptr<ExchangerBase::BlockWrapper>, PartitionedRowIdxs>;
181
182
struct BroadcastRowRange {
183
    uint32_t offset_start;
184
    size_t length;
185
};
186
using BroadcastBlock = std::pair<std::shared_ptr<ExchangerBase::BlockWrapper>, BroadcastRowRange>;
187
188
template <typename BlockType>
189
struct BlockQueue {
190
    std::atomic<bool> eos = false;
191
    moodycamel::ConcurrentQueue<BlockType> data_queue;
192
    moodycamel::ProducerToken ptok {data_queue};
193
809k
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEEC2Ev
Line
Count
Source
193
119k
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2Ev
Line
Count
Source
193
680k
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2Ev
Line
Count
Source
193
9.57k
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
194
    BlockQueue(BlockQueue<BlockType>&& other)
195
0
            : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}
Unexecuted instantiation: _ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEEC2EOS8_
Unexecuted instantiation: _ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2EOS5_
Unexecuted instantiation: _ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2EOS8_
196
    inline bool enqueue(BlockType const& item) {
197
        if (!eos) {
198
            if (!data_queue.enqueue(ptok, item)) [[unlikely]] {
199
                throw Exception(ErrorCode::INTERNAL_ERROR,
200
                                "Exception occurs in data queue [size = {}] of local exchange.",
201
                                data_queue.size_approx());
202
            }
203
            return true;
204
        }
205
        return false;
206
    }
207
208
275k
    inline bool enqueue(BlockType&& item) {
209
275k
        if (!eos) {
210
275k
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
211
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
212
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
213
0
                                data_queue.size_approx());
214
0
            }
215
275k
            return true;
216
275k
        }
217
80
        return false;
218
275k
    }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE7enqueueEOS7_
Line
Count
Source
208
44.3k
    inline bool enqueue(BlockType&& item) {
209
44.3k
        if (!eos) {
210
44.2k
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
211
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
212
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
213
0
                                data_queue.size_approx());
214
0
            }
215
44.2k
            return true;
216
44.2k
        }
217
86
        return false;
218
44.3k
    }
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE7enqueueEOS4_
Line
Count
Source
208
213k
    inline bool enqueue(BlockType&& item) {
209
213k
        if (!eos) {
210
213k
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
211
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
212
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
213
0
                                data_queue.size_approx());
214
0
            }
215
213k
            return true;
216
213k
        }
217
18.4E
        return false;
218
213k
    }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE7enqueueEOS7_
Line
Count
Source
208
17.5k
    inline bool enqueue(BlockType&& item) {
209
17.5k
        if (!eos) {
210
17.5k
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
211
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
212
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
213
0
                                data_queue.size_approx());
214
0
            }
215
17.5k
            return true;
216
17.5k
        }
217
16
        return false;
218
17.5k
    }
219
220
2.16M
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE11try_dequeueERS7_
Line
Count
Source
220
332k
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE11try_dequeueERS4_
Line
Count
Source
220
1.79M
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE11try_dequeueERS7_
Line
Count
Source
220
44.9k
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
221
222
807k
    void set_eos() { eos = true; }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE7set_eosEv
Line
Count
Source
222
119k
    void set_eos() { eos = true; }
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE7set_eosEv
Line
Count
Source
222
678k
    void set_eos() { eos = true; }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE7set_eosEv
Line
Count
Source
222
9.56k
    void set_eos() { eos = true; }
223
};
224
225
using BlockWrapperSPtr = std::shared_ptr<ExchangerBase::BlockWrapper>;
226
227
template <typename BlockType>
228
class Exchanger : public ExchangerBase {
229
public:
230
    Exchanger(int running_sink_operators, int num_partitions, int free_block_limit)
231
103k
            : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {
232
103k
        _data_queue.resize(num_partitions);
233
103k
        _m.resize(num_partitions);
234
793k
        for (size_t i = 0; i < num_partitions; i++) {
235
690k
            _m[i] = std::make_unique<std::mutex>();
236
690k
        }
237
103k
    }
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2Eiii
Line
Count
Source
231
102k
            : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {
232
102k
        _data_queue.resize(num_partitions);
233
102k
        _m.resize(num_partitions);
234
782k
        for (size_t i = 0; i < num_partitions; i++) {
235
680k
            _m[i] = std::make_unique<std::mutex>();
236
680k
        }
237
102k
    }
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2Eiii
Line
Count
Source
231
1.27k
            : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {
232
1.27k
        _data_queue.resize(num_partitions);
233
1.27k
        _m.resize(num_partitions);
234
10.8k
        for (size_t i = 0; i < num_partitions; i++) {
235
9.57k
            _m[i] = std::make_unique<std::mutex>();
236
9.57k
        }
237
1.27k
    }
238
    Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit)
239
20.1k
            : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) {
240
20.1k
        _data_queue.resize(num_sources);
241
20.1k
        _m.resize(num_sources);
242
139k
        for (size_t i = 0; i < num_sources; i++) {
243
119k
            _m[i] = std::make_unique<std::mutex>();
244
119k
        }
245
20.1k
    }
246
124k
    ~Exchanger() override = default;
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEED2Ev
Line
Count
Source
246
20.2k
    ~Exchanger() override = default;
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEED2Ev
Line
Count
Source
246
102k
    ~Exchanger() override = default;
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEED2Ev
Line
Count
Source
246
1.27k
    ~Exchanger() override = default;
247
60
    std::string data_queue_debug_string(int i) override {
248
60
        return fmt::format("Data Queue {}: [size approx = {}, eos = {}]", i,
249
60
                           _data_queue[i].data_queue.size_approx(), _data_queue[i].eos);
250
60
    }
Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE23data_queue_debug_stringB5cxx11Ei
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE23data_queue_debug_stringB5cxx11Ei
Line
Count
Source
247
60
    std::string data_queue_debug_string(int i) override {
248
60
        return fmt::format("Data Queue {}: [size approx = {}, eos = {}]", i,
249
60
                           _data_queue[i].data_queue.size_approx(), _data_queue[i].eos);
250
60
    }
Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE23data_queue_debug_stringB5cxx11Ei
251
252
protected:
253
    // Enqueue data block and set downstream source operator to read.
254
    void _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState* local_state,
255
                                     BlockType&& block);
256
    bool _dequeue_data(LocalExchangeSourceLocalState* local_state, BlockType& block, bool* eos,
257
                       Block* data_block, int channel_id);
258
259
    void _enqueue_data_and_set_ready(int channel_id, BlockType&& block);
260
    bool _dequeue_data(BlockType& block, bool* eos, Block* data_block, int channel_id);
261
    std::vector<BlockQueue<BlockType>> _data_queue;
262
    std::vector<std::unique_ptr<std::mutex>> _m;
263
};
264
265
class LocalExchangeSourceLocalState;
266
class LocalExchangeSinkLocalState;
267
268
class ShuffleExchanger : public Exchanger<PartitionedBlock> {
269
public:
270
    ENABLE_FACTORY_CREATOR(ShuffleExchanger);
271
    ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
272
                     int free_block_limit)
273
20.1k
            : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions,
274
20.1k
                                          free_block_limit) {
275
20.1k
        DCHECK_GT(num_partitions, 0);
276
20.1k
        DCHECK_GT(num_sources, 0);
277
20.1k
        _partition_rows_histogram.resize(running_sink_operators);
278
20.1k
    }
279
20.2k
    ~ShuffleExchanger() override = default;
280
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
281
                SinkInfo& sink_info) override;
282
283
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
284
                     SourceInfo&& source_info) override;
285
    void close(SourceInfo&& source_info) override;
286
336k
    ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; }
287
288
protected:
289
    Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block,
290
                       int channel_id, LocalExchangeSinkLocalState* local_state,
291
                       std::map<int, int>* shuffle_idx_to_instance_idx);
292
    Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block,
293
                       int channel_id);
294
    std::vector<std::vector<uint32_t>> _partition_rows_histogram;
295
};
296
297
class BucketShuffleExchanger final : public ShuffleExchanger {
298
    ENABLE_FACTORY_CREATOR(BucketShuffleExchanger);
299
    BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
300
                           int free_block_limit)
301
535
            : ShuffleExchanger(running_sink_operators, num_sources, num_partitions,
302
535
                               free_block_limit) {}
303
    ~BucketShuffleExchanger() override = default;
304
33.0k
    ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; }
305
};
306
307
class PassthroughExchanger final : public Exchanger<BlockWrapperSPtr> {
308
public:
309
    ENABLE_FACTORY_CREATOR(PassthroughExchanger);
310
    PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
311
99.1k
            : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions,
312
99.1k
                                          free_block_limit) {}
313
    ~PassthroughExchanger() override = default;
314
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
315
                SinkInfo& sink_info) override;
316
317
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
318
                     SourceInfo&& source_info) override;
319
2.37M
    ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; }
320
    void close(SourceInfo&& source_info) override;
321
};
322
323
class PassToOneExchanger final : public Exchanger<BlockWrapperSPtr> {
324
public:
325
    ENABLE_FACTORY_CREATOR(PassToOneExchanger);
326
    PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
327
2.15k
            : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions,
328
2.15k
                                          free_block_limit) {}
329
    ~PassToOneExchanger() override = default;
330
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
331
                SinkInfo& sink_info) override;
332
333
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
334
                     SourceInfo&& source_info) override;
335
56.9k
    ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; }
336
    void close(SourceInfo&& source_info) override;
337
};
338
class BroadcastExchanger final : public Exchanger<BroadcastBlock> {
339
public:
340
    ENABLE_FACTORY_CREATOR(BroadcastExchanger);
341
    BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
342
1.27k
            : Exchanger<BroadcastBlock>(running_sink_operators, num_partitions, free_block_limit) {}
343
    ~BroadcastExchanger() override = default;
344
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
345
                SinkInfo& sink_info) override;
346
347
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
348
                     SourceInfo&& source_info) override;
349
31.0k
    ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
350
    void close(SourceInfo&& source_info) override;
351
};
352
353
//The code in AdaptivePassthroughExchanger is essentially
354
// a copy of ShuffleExchanger and PassthroughExchanger.
355
class AdaptivePassthroughExchanger : public Exchanger<BlockWrapperSPtr> {
356
public:
357
    ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger);
358
    AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions,
359
                                 int free_block_limit)
360
952
            : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions,
361
952
                                          free_block_limit) {
362
952
        _partition_rows_histogram.resize(running_sink_operators);
363
952
    }
364
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
365
                SinkInfo& sink_info) override;
366
367
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
368
                     SourceInfo&& source_info) override;
369
25.3k
    ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; }
370
371
    void close(SourceInfo&& source_info) override;
372
373
private:
374
    Status _passthrough_sink(RuntimeState* state, Block* in_block, SinkInfo& sink_info);
375
    Status _shuffle_sink(RuntimeState* state, Block* in_block, SinkInfo& sink_info);
376
    Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block,
377
                       SinkInfo& sink_info);
378
379
    std::atomic_bool _is_pass_through = false;
380
    std::atomic_int32_t _total_block = 0;
381
    std::vector<std::vector<uint32_t>> _partition_rows_histogram;
382
};
383
#include "common/compile_check_end.h"
384
} // namespace doris