Coverage Report

Created: 2026-03-13 10:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/local_exchanger.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include "exec/operator/operator.h"
21
#include "exec/pipeline/dependency.h"
22
23
namespace doris {
24
#include "common/compile_check_begin.h"
25
template <typename T>
26
void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
27
                  RuntimeProfile::Counter* memory_used_counter = nullptr);
28
29
class PartitionerBase;
30
class LocalExchangeSourceLocalState;
31
class LocalExchangeSinkLocalState;
32
33
struct Profile {
34
    RuntimeProfile::Counter* compute_hash_value_timer = nullptr;
35
    RuntimeProfile::Counter* distribute_timer = nullptr;
36
    RuntimeProfile::Counter* copy_data_timer = nullptr;
37
};
38
39
struct SinkInfo {
40
    int* channel_id;
41
    PartitionerBase* partitioner;
42
    LocalExchangeSinkLocalState* local_state;
43
    std::map<int, int>* shuffle_idx_to_instance_idx;
44
};
45
46
struct SourceInfo {
47
    int channel_id;
48
    LocalExchangeSourceLocalState* local_state;
49
};
50
/**
51
 * One exchanger is held by one `LocalExchangeSharedState`, and one `LocalExchangeSharedState` is
52
 * shared by all local exchange sink operators and source operators with the same id.
53
 *
54
 * In the exchanger, two block queues are maintained: one is the data block queue and the other is the free block queue.
55
 *
56
 * In detail, the data block queue has as many queues as there are source operators. Each source operator will get
57
 * data blocks from the corresponding queue. Data blocks are pushed into the queues by sink operators. One
58
 * sink operator will push blocks into one or more queues.
59
 *
60
 * Free blocks are used to reuse the allocated memory. To bound the memory usage, we also use a conf
61
 * to limit the size of the free block queue.
62
 */
63
class ExchangerBase {
64
public:
65
    /**
66
     * `BlockWrapper` is used to wrap a data block with a reference count.
67
     *
68
     * In function `unref()`, if `ref_count` is decremented to 0, this block is no longer needed by
69
     * operators, so we put it into `_free_blocks` to reuse its memory if needed and refresh memory usage
70
     * in current queue.
71
     *
72
     * Note: `ref_count` will be larger than 1 only if this block is shared between multiple queues in
73
     * shuffle exchanger.
74
     */
75
    class BlockWrapper {
76
    public:
77
        ENABLE_FACTORY_CREATOR(BlockWrapper);
78
        BlockWrapper(Block&& data_block, LocalExchangeSharedState* shared_state, int channel_id)
79
229k
                : _data_block(std::move(data_block)),
80
229k
                  _shared_state(shared_state),
81
229k
                  _allocated_bytes(_data_block.allocated_bytes()) {
82
229k
            if (_shared_state) {
83
229k
                _shared_state->add_total_mem_usage(_allocated_bytes);
84
229k
            }
85
229k
        }
86
228k
        ~BlockWrapper() {
87
228k
            if (_shared_state != nullptr) {
88
228k
                DCHECK_GT(_allocated_bytes, 0);
89
                // `_channel_ids` may be empty if the exchanger is a shuffle exchanger, and the channel id is
90
                // not used by `sub_total_mem_usage`, so only the allocated byte count is passed here.
91
228k
                _shared_state->sub_total_mem_usage(_allocated_bytes);
92
228k
                if (_shared_state->exchanger->_free_block_limit == 0 ||
93
228k
                    _shared_state->exchanger->_free_blocks.size_approx() <
94
228k
                            _shared_state->exchanger->_free_block_limit *
95
228k
                                    _shared_state->exchanger->_num_sources) {
96
225k
                    _data_block.clear_column_data();
97
                    // Free blocks are used to improve memory efficiency. A failure while pushing back
98
                    // a free block has no harmful effect, so the return value is simply ignored.
99
225k
                    _shared_state->exchanger->_free_blocks.enqueue(std::move(_data_block));
100
225k
                }
101
228k
            };
102
228k
        }
103
266k
        void record_channel_id(int channel_id) {
104
266k
            _channel_ids.push_back(channel_id);
105
266k
            if (_shared_state) {
106
266k
                _shared_state->add_mem_usage(channel_id, _allocated_bytes);
107
266k
            }
108
266k
        }
109
110
    private:
111
        friend class ShuffleExchanger;
112
        friend class BucketShuffleExchanger;
113
        friend class PassthroughExchanger;
114
        friend class BroadcastExchanger;
115
        friend class PassToOneExchanger;
116
        friend class AdaptivePassthroughExchanger;
117
        template <typename BlockType>
118
        friend class Exchanger;
119
120
        Block _data_block;
121
        LocalExchangeSharedState* _shared_state;
122
        std::vector<int> _channel_ids;
123
        const size_t _allocated_bytes;
124
    };
125
    ExchangerBase(int running_sink_operators, int num_partitions, int free_block_limit)
126
103k
            : _running_sink_operators(running_sink_operators),
127
103k
              _running_source_operators(num_partitions),
128
103k
              _num_partitions(num_partitions),
129
103k
              _num_senders(running_sink_operators),
130
103k
              _num_sources(num_partitions),
131
103k
              _free_block_limit(free_block_limit) {}
132
    ExchangerBase(int running_sink_operators, int num_sources, int num_partitions,
133
                  int free_block_limit)
134
15.0k
            : _running_sink_operators(running_sink_operators),
135
15.0k
              _running_source_operators(num_sources),
136
15.0k
              _num_partitions(num_partitions),
137
15.0k
              _num_senders(running_sink_operators),
138
15.0k
              _num_sources(num_sources),
139
15.0k
              _free_block_limit(free_block_limit) {}
140
119k
    virtual ~ExchangerBase() = default;
141
    virtual Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
142
                             SourceInfo&& source_info) = 0;
143
    virtual Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
144
                        SinkInfo& sink_info) = 0;
145
    virtual ExchangeType get_type() const = 0;
146
    // Called if a local exchanger source operator is closed. Frees the unused data blocks in data_queue.
147
    virtual void close(SourceInfo&& source_info) = 0;
148
    // Called if all local exchanger source operators are closed. We free the memory in
149
    // `_free_blocks` here.
150
    virtual void finalize();
151
152
    virtual std::string data_queue_debug_string(int i) = 0;
153
154
132
    void set_low_memory_mode() {
155
132
        _free_block_limit = 0;
156
132
        clear_blocks(_free_blocks);
157
132
    }
158
159
protected:
160
    friend struct LocalExchangeSharedState;
161
    friend class LocalExchangeSourceLocalState;
162
    friend class LocalExchangeSinkOperatorX;
163
    friend class LocalExchangeSinkLocalState;
164
    std::atomic<int> _running_sink_operators = 0;
165
    std::atomic<int> _running_source_operators = 0;
166
    const int _num_partitions;
167
    const int _num_senders;
168
    const int _num_sources;
169
    std::atomic_int _free_block_limit = 0;
170
    moodycamel::ConcurrentQueue<Block> _free_blocks;
171
};
172
173
struct PartitionedRowIdxs {
174
    std::shared_ptr<PODArray<uint32_t>> row_idxs;
175
    uint32_t offset_start;
176
    uint32_t length;
177
};
178
179
using PartitionedBlock =
180
        std::pair<std::shared_ptr<ExchangerBase::BlockWrapper>, PartitionedRowIdxs>;
181
182
struct BroadcastRowRange {
183
    uint32_t offset_start;
184
    size_t length;
185
};
186
using BroadcastBlock = std::pair<std::shared_ptr<ExchangerBase::BlockWrapper>, BroadcastRowRange>;
187
188
template <typename BlockType>
189
struct BlockQueue {
190
    std::atomic<bool> eos = false;
191
    moodycamel::ConcurrentQueue<BlockType> data_queue;
192
    moodycamel::ProducerToken ptok {data_queue};
193
860k
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEEC2Ev
Line
Count
Source
193
127k
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2Ev
Line
Count
Source
193
726k
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2Ev
Line
Count
Source
193
6.88k
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
194
    BlockQueue(BlockQueue<BlockType>&& other)
195
0
            : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}
Unexecuted instantiation: _ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEEC2EOS8_
Unexecuted instantiation: _ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2EOS5_
Unexecuted instantiation: _ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2EOS8_
196
    inline bool enqueue(BlockType const& item) {
197
        if (!eos) {
198
            if (!data_queue.enqueue(ptok, item)) [[unlikely]] {
199
                throw Exception(ErrorCode::INTERNAL_ERROR,
200
                                "Exception occurs in data queue [size = {}] of local exchange.",
201
                                data_queue.size_approx());
202
            }
203
            return true;
204
        }
205
        return false;
206
    }
207
208
266k
    inline bool enqueue(BlockType&& item) {
209
266k
        if (!eos) {
210
266k
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
211
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
212
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
213
0
                                data_queue.size_approx());
214
0
            }
215
266k
            return true;
216
266k
        }
217
128
        return false;
218
266k
    }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE7enqueueEOS7_
Line
Count
Source
208
45.7k
    inline bool enqueue(BlockType&& item) {
209
45.7k
        if (!eos) {
210
45.5k
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
211
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
212
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
213
0
                                data_queue.size_approx());
214
0
            }
215
45.5k
            return true;
216
45.5k
        }
217
122
        return false;
218
45.7k
    }
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE7enqueueEOS4_
Line
Count
Source
208
207k
    inline bool enqueue(BlockType&& item) {
209
207k
        if (!eos) {
210
207k
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
211
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
212
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
213
0
                                data_queue.size_approx());
214
0
            }
215
207k
            return true;
216
207k
        }
217
18.4E
        return false;
218
207k
    }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE7enqueueEOS7_
Line
Count
Source
208
12.9k
    inline bool enqueue(BlockType&& item) {
209
12.9k
        if (!eos) {
210
12.9k
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
211
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
212
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
213
0
                                data_queue.size_approx());
214
0
            }
215
12.9k
            return true;
216
12.9k
        }
217
15
        return false;
218
12.9k
    }
219
220
2.25M
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE11try_dequeueERS7_
Line
Count
Source
220
345k
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE11try_dequeueERS4_
Line
Count
Source
220
1.87M
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE11try_dequeueERS7_
Line
Count
Source
220
32.3k
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
221
222
858k
    void set_eos() { eos = true; }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE7set_eosEv
Line
Count
Source
222
127k
    void set_eos() { eos = true; }
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE7set_eosEv
Line
Count
Source
222
724k
    void set_eos() { eos = true; }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE7set_eosEv
Line
Count
Source
222
6.87k
    void set_eos() { eos = true; }
223
};
224
225
using BlockWrapperSPtr = std::shared_ptr<ExchangerBase::BlockWrapper>;
226
227
template <typename BlockType>
228
class Exchanger : public ExchangerBase {
229
public:
230
    Exchanger(int running_sink_operators, int num_partitions, int free_block_limit)
231
103k
            : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {
232
103k
        _data_queue.resize(num_partitions);
233
103k
        _m.resize(num_partitions);
234
837k
        for (size_t i = 0; i < num_partitions; i++) {
235
733k
            _m[i] = std::make_unique<std::mutex>();
236
733k
        }
237
103k
    }
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2Eiii
Line
Count
Source
231
102k
            : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {
232
102k
        _data_queue.resize(num_partitions);
233
102k
        _m.resize(num_partitions);
234
829k
        for (size_t i = 0; i < num_partitions; i++) {
235
726k
            _m[i] = std::make_unique<std::mutex>();
236
726k
        }
237
102k
    }
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2Eiii
Line
Count
Source
231
996
            : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {
232
996
        _data_queue.resize(num_partitions);
233
996
        _m.resize(num_partitions);
234
7.88k
        for (size_t i = 0; i < num_partitions; i++) {
235
6.88k
            _m[i] = std::make_unique<std::mutex>();
236
6.88k
        }
237
996
    }
238
    Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit)
239
15.0k
            : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) {
240
15.0k
        _data_queue.resize(num_sources);
241
15.0k
        _m.resize(num_sources);
242
142k
        for (size_t i = 0; i < num_sources; i++) {
243
127k
            _m[i] = std::make_unique<std::mutex>();
244
127k
        }
245
15.0k
    }
246
119k
    ~Exchanger() override = default;
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEED2Ev
Line
Count
Source
246
15.1k
    ~Exchanger() override = default;
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEED2Ev
Line
Count
Source
246
103k
    ~Exchanger() override = default;
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEED2Ev
Line
Count
Source
246
996
    ~Exchanger() override = default;
247
163
    std::string data_queue_debug_string(int i) override {
248
163
        return fmt::format("Data Queue {}: [size approx = {}, eos = {}]", i,
249
163
                           _data_queue[i].data_queue.size_approx(), _data_queue[i].eos);
250
163
    }
Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE23data_queue_debug_stringB5cxx11Ei
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE23data_queue_debug_stringB5cxx11Ei
Line
Count
Source
247
163
    std::string data_queue_debug_string(int i) override {
248
163
        return fmt::format("Data Queue {}: [size approx = {}, eos = {}]", i,
249
163
                           _data_queue[i].data_queue.size_approx(), _data_queue[i].eos);
250
163
    }
Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE23data_queue_debug_stringB5cxx11Ei
251
252
protected:
253
    // Enqueue data block and set downstream source operator to read.
254
    void _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState* local_state,
255
                                     BlockType&& block);
256
    bool _dequeue_data(LocalExchangeSourceLocalState* local_state, BlockType& block, bool* eos,
257
                       Block* data_block, int channel_id);
258
259
    void _enqueue_data_and_set_ready(int channel_id, BlockType&& block);
260
    bool _dequeue_data(BlockType& block, bool* eos, Block* data_block, int channel_id);
261
    std::vector<BlockQueue<BlockType>> _data_queue;
262
    std::vector<std::unique_ptr<std::mutex>> _m;
263
};
264
265
class LocalExchangeSourceLocalState;
266
class LocalExchangeSinkLocalState;
267
268
class ShuffleExchanger : public Exchanger<PartitionedBlock> {
269
public:
270
    ENABLE_FACTORY_CREATOR(ShuffleExchanger);
271
    ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
272
                     int free_block_limit)
273
15.0k
            : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions,
274
15.0k
                                          free_block_limit) {
275
15.0k
        DCHECK_GT(num_partitions, 0);
276
15.0k
        DCHECK_GT(num_sources, 0);
277
15.0k
        _partition_rows_histogram.resize(running_sink_operators);
278
15.0k
    }
279
15.1k
    ~ShuffleExchanger() override = default;
280
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
281
                SinkInfo& sink_info) override;
282
283
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
284
                     SourceInfo&& source_info) override;
285
    void close(SourceInfo&& source_info) override;
286
365k
    ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; }
287
288
protected:
289
    Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block,
290
                       int channel_id, LocalExchangeSinkLocalState* local_state,
291
                       std::map<int, int>* shuffle_idx_to_instance_idx);
292
    Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block,
293
                       int channel_id);
294
    std::vector<std::vector<uint32_t>> _partition_rows_histogram;
295
};
296
297
class BucketShuffleExchanger final : public ShuffleExchanger {
298
    ENABLE_FACTORY_CREATOR(BucketShuffleExchanger);
299
    BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
300
                           int free_block_limit)
301
361
            : ShuffleExchanger(running_sink_operators, num_sources, num_partitions,
302
361
                               free_block_limit) {}
303
    ~BucketShuffleExchanger() override = default;
304
23.6k
    ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; }
305
};
306
307
class PassthroughExchanger final : public Exchanger<BlockWrapperSPtr> {
308
public:
309
    ENABLE_FACTORY_CREATOR(PassthroughExchanger);
310
    PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
311
100k
            : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions,
312
100k
                                          free_block_limit) {}
313
    ~PassthroughExchanger() override = default;
314
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
315
                SinkInfo& sink_info) override;
316
317
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
318
                     SourceInfo&& source_info) override;
319
2.51M
    ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; }
320
    void close(SourceInfo&& source_info) override;
321
};
322
323
class PassToOneExchanger final : public Exchanger<BlockWrapperSPtr> {
324
public:
325
    ENABLE_FACTORY_CREATOR(PassToOneExchanger);
326
    PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
327
1.65k
            : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions,
328
1.65k
                                          free_block_limit) {}
329
    ~PassToOneExchanger() override = default;
330
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
331
                SinkInfo& sink_info) override;
332
333
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
334
                     SourceInfo&& source_info) override;
335
43.8k
    ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; }
336
    void close(SourceInfo&& source_info) override;
337
};
338
class BroadcastExchanger final : public Exchanger<BroadcastBlock> {
339
public:
340
    ENABLE_FACTORY_CREATOR(BroadcastExchanger);
341
    BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
342
996
            : Exchanger<BroadcastBlock>(running_sink_operators, num_partitions, free_block_limit) {}
343
    ~BroadcastExchanger() override = default;
344
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
345
                SinkInfo& sink_info) override;
346
347
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
348
                     SourceInfo&& source_info) override;
349
22.5k
    ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
350
    void close(SourceInfo&& source_info) override;
351
};
352
353
// The code in AdaptivePassthroughExchanger is essentially
354
// a copy of ShuffleExchanger and PassthroughExchanger.
355
class AdaptivePassthroughExchanger : public Exchanger<BlockWrapperSPtr> {
356
public:
357
    ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger);
358
    AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions,
359
                                 int free_block_limit)
360
910
            : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions,
361
910
                                          free_block_limit) {
362
910
        _partition_rows_histogram.resize(running_sink_operators);
363
910
    }
364
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
365
                SinkInfo& sink_info) override;
366
367
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
368
                     SourceInfo&& source_info) override;
369
21.9k
    ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; }
370
371
    void close(SourceInfo&& source_info) override;
372
373
private:
374
    Status _passthrough_sink(RuntimeState* state, Block* in_block, SinkInfo& sink_info);
375
    Status _shuffle_sink(RuntimeState* state, Block* in_block, SinkInfo& sink_info);
376
    Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block,
377
                       SinkInfo& sink_info);
378
379
    std::atomic_bool _is_pass_through = false;
380
    std::atomic_int32_t _total_block = 0;
381
    std::vector<std::vector<uint32_t>> _partition_rows_histogram;
382
};
383
#include "common/compile_check_end.h"
384
} // namespace doris