Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/local_exchanger.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include "exec/operator/operator.h"
21
#include "exec/pipeline/dependency.h"
22
23
namespace doris {
24
#include "common/compile_check_begin.h"
25
template <typename T>
26
void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
27
                  RuntimeProfile::Counter* memory_used_counter = nullptr);
28
29
class PartitionerBase;
30
class LocalExchangeSourceLocalState;
31
class LocalExchangeSinkLocalState;
32
33
struct Profile {
34
    RuntimeProfile::Counter* compute_hash_value_timer = nullptr;
35
    RuntimeProfile::Counter* distribute_timer = nullptr;
36
    RuntimeProfile::Counter* copy_data_timer = nullptr;
37
};
38
39
struct SinkInfo {
40
    int* channel_id;
41
    PartitionerBase* partitioner;
42
    LocalExchangeSinkLocalState* local_state;
43
    std::map<int, int>* shuffle_idx_to_instance_idx;
44
};
45
46
struct SourceInfo {
47
    int channel_id;
48
    LocalExchangeSourceLocalState* local_state;
49
};
50
/**
51
 * One exchanger is hold by one `LocalExchangeSharedState`. And one `LocalExchangeSharedState` is
52
 * shared by all local exchange sink operators and source operators with the same id.
53
 *
54
 * In exchanger, two block queues is maintained, one is data block queue and another is free block queue.
55
 *
56
 * In details, data block queue has queues as many as source operators. Each source operator will get
57
 * data block from the corresponding queue. Data blocks is push into the queue by sink operators. One
58
 * sink operator will push blocks into one or more queues.
59
 *
60
 * Free block is used to reuse the allocated memory. To reduce the memory limit, we also use a conf
61
 * to limit the size of free block queue.
62
 */
63
class ExchangerBase {
64
public:
65
    /**
66
     * `BlockWrapper` is used to wrap a data block with a reference count.
67
     *
68
     * In function `unref()`, if `ref_count` decremented to 0, which means this block is not needed by
69
     * operators, so we put it into `_free_blocks` to reuse its memory if needed and refresh memory usage
70
     * in current queue.
71
     *
72
     * Note: `ref_count` will be larger than 1 only if this block is shared between multiple queues in
73
     * shuffle exchanger.
74
     */
75
    class BlockWrapper {
76
    public:
77
        ENABLE_FACTORY_CREATOR(BlockWrapper);
78
        BlockWrapper(Block&& data_block, LocalExchangeSharedState* shared_state, int channel_id)
79
114
                : _data_block(std::move(data_block)),
80
114
                  _shared_state(shared_state),
81
114
                  _allocated_bytes(_data_block.allocated_bytes()) {
82
114
            if (_shared_state) {
83
114
                _shared_state->add_total_mem_usage(_allocated_bytes);
84
114
            }
85
114
        }
86
114
        ~BlockWrapper() {
87
114
            if (_shared_state != nullptr) {
88
114
                DCHECK_GT(_allocated_bytes, 0);
89
                // `_channel_ids` may be empty if exchanger is shuffled exchanger and channel id is
90
                // not used by `sub_total_mem_usage`. So we just pass -1 here.
91
114
                _shared_state->sub_total_mem_usage(_allocated_bytes);
92
114
                if (_shared_state->exchanger->_free_block_limit == 0 ||
93
114
                    _shared_state->exchanger->_free_blocks.size_approx() <
94
28
                            _shared_state->exchanger->_free_block_limit *
95
98
                                    _shared_state->exchanger->_num_sources) {
96
98
                    _data_block.clear_column_data();
97
                    // Free blocks is used to improve memory efficiency. Failure during pushing back
98
                    // free block will not incur any bad result so just ignore the return value.
99
98
                    _shared_state->exchanger->_free_blocks.enqueue(std::move(_data_block));
100
98
                }
101
114
            };
102
114
        }
103
161
        void record_channel_id(int channel_id) {
104
161
            _channel_ids.push_back(channel_id);
105
161
            if (_shared_state) {
106
161
                _shared_state->add_mem_usage(channel_id, _allocated_bytes);
107
161
            }
108
161
        }
109
110
    private:
111
        friend class ShuffleExchanger;
112
        friend class BucketShuffleExchanger;
113
        friend class PassthroughExchanger;
114
        friend class BroadcastExchanger;
115
        friend class PassToOneExchanger;
116
        friend class AdaptivePassthroughExchanger;
117
        template <typename BlockType>
118
        friend class Exchanger;
119
120
        Block _data_block;
121
        LocalExchangeSharedState* _shared_state;
122
        std::vector<int> _channel_ids;
123
        const size_t _allocated_bytes;
124
    };
125
    ExchangerBase(int running_sink_operators, int num_partitions, int free_block_limit)
126
4
            : _running_sink_operators(running_sink_operators),
127
4
              _running_source_operators(num_partitions),
128
4
              _num_partitions(num_partitions),
129
4
              _num_senders(running_sink_operators),
130
4
              _num_sources(num_partitions),
131
4
              _free_block_limit(free_block_limit) {}
132
    ExchangerBase(int running_sink_operators, int num_sources, int num_partitions,
133
                  int free_block_limit)
134
2
            : _running_sink_operators(running_sink_operators),
135
2
              _running_source_operators(num_sources),
136
2
              _num_partitions(num_partitions),
137
2
              _num_senders(running_sink_operators),
138
2
              _num_sources(num_sources),
139
2
              _free_block_limit(free_block_limit) {}
140
6
    virtual ~ExchangerBase() = default;
141
    virtual Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
142
                             SourceInfo&& source_info) = 0;
143
    virtual Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
144
                        SinkInfo& sink_info) = 0;
145
    virtual ExchangeType get_type() const = 0;
146
    // Called if a local exchanger source operator are closed. Free the unused data block in data_queue.
147
    virtual void close(SourceInfo&& source_info) = 0;
148
    // Called if all local exchanger source operators are closed. We free the memory in
149
    // `_free_blocks` here.
150
    virtual void finalize();
151
152
    virtual std::string data_queue_debug_string(int i) = 0;
153
154
0
    void set_low_memory_mode() {
155
0
        _free_block_limit = 0;
156
0
        clear_blocks(_free_blocks);
157
0
    }
158
159
protected:
160
    friend struct LocalExchangeSharedState;
161
    friend class LocalExchangeSourceLocalState;
162
    friend class LocalExchangeSinkOperatorX;
163
    friend class LocalExchangeSinkLocalState;
164
    std::atomic<int> _running_sink_operators = 0;
165
    std::atomic<int> _running_source_operators = 0;
166
    const int _num_partitions;
167
    const int _num_senders;
168
    const int _num_sources;
169
    std::atomic_int _free_block_limit = 0;
170
    moodycamel::ConcurrentQueue<Block> _free_blocks;
171
};
172
173
struct PartitionedRowIdxs {
174
    std::shared_ptr<PODArray<uint32_t>> row_idxs;
175
    uint32_t offset_start;
176
    uint32_t length;
177
};
178
179
using PartitionedBlock =
180
        std::pair<std::shared_ptr<ExchangerBase::BlockWrapper>, PartitionedRowIdxs>;
181
182
struct BroadcastRowRange {
183
    uint32_t offset_start;
184
    size_t length;
185
};
186
using BroadcastBlock = std::pair<std::shared_ptr<ExchangerBase::BlockWrapper>, BroadcastRowRange>;
187
188
template <typename BlockType>
189
struct BlockQueue {
190
    std::atomic<bool> eos = false;
191
    moodycamel::ConcurrentQueue<BlockType> data_queue;
192
    moodycamel::ProducerToken ptok {data_queue};
193
24
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEEC2Ev
Line
Count
Source
193
8
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2Ev
Line
Count
Source
193
12
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2Ev
Line
Count
Source
193
4
    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
194
    BlockQueue(BlockQueue<BlockType>&& other)
195
0
            : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}
Unexecuted instantiation: _ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEEC2EOS8_
Unexecuted instantiation: _ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2EOS5_
Unexecuted instantiation: _ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2EOS8_
196
    inline bool enqueue(BlockType const& item) {
197
        if (!eos) {
198
            if (!data_queue.enqueue(ptok, item)) [[unlikely]] {
199
                throw Exception(ErrorCode::INTERNAL_ERROR,
200
                                "Exception occurs in data queue [size = {}] of local exchange.",
201
                                data_queue.size_approx());
202
            }
203
            return true;
204
        }
205
        return false;
206
    }
207
208
161
    inline bool enqueue(BlockType&& item) {
209
161
        if (!eos) {
210
129
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
211
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
212
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
213
0
                                data_queue.size_approx());
214
0
            }
215
129
            return true;
216
129
        }
217
32
        return false;
218
161
    }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE7enqueueEOS7_
Line
Count
Source
208
20
    inline bool enqueue(BlockType&& item) {
209
20
        if (!eos) {
210
16
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
211
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
212
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
213
0
                                data_queue.size_approx());
214
0
            }
215
16
            return true;
216
16
        }
217
4
        return false;
218
20
    }
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE7enqueueEOS4_
Line
Count
Source
208
77
    inline bool enqueue(BlockType&& item) {
209
77
        if (!eos) {
210
65
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
211
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
212
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
213
0
                                data_queue.size_approx());
214
0
            }
215
65
            return true;
216
65
        }
217
12
        return false;
218
77
    }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE7enqueueEOS7_
Line
Count
Source
208
64
    inline bool enqueue(BlockType&& item) {
209
64
        if (!eos) {
210
48
            if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
211
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
212
0
                                "Exception occurs in data queue [size = {}] of local exchange.",
213
0
                                data_queue.size_approx());
214
0
            }
215
48
            return true;
216
48
        }
217
16
        return false;
218
64
    }
219
220
230
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE11try_dequeueERS7_
Line
Count
Source
220
36
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE11try_dequeueERS4_
Line
Count
Source
220
122
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE11try_dequeueERS7_
Line
Count
Source
220
72
    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
221
222
20
    void set_eos() { eos = true; }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE7set_eosEv
Line
Count
Source
222
4
    void set_eos() { eos = true; }
_ZN5doris10BlockQueueISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE7set_eosEv
Line
Count
Source
222
12
    void set_eos() { eos = true; }
_ZN5doris10BlockQueueISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE7set_eosEv
Line
Count
Source
222
4
    void set_eos() { eos = true; }
223
};
224
225
using BlockWrapperSPtr = std::shared_ptr<ExchangerBase::BlockWrapper>;
226
227
template <typename BlockType>
228
class Exchanger : public ExchangerBase {
229
public:
230
    Exchanger(int running_sink_operators, int num_partitions, int free_block_limit)
231
4
            : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {
232
4
        _data_queue.resize(num_partitions);
233
4
        _m.resize(num_partitions);
234
20
        for (size_t i = 0; i < num_partitions; i++) {
235
16
            _m[i] = std::make_unique<std::mutex>();
236
16
        }
237
4
    }
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEEC2Eiii
Line
Count
Source
231
3
            : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {
232
3
        _data_queue.resize(num_partitions);
233
3
        _m.resize(num_partitions);
234
15
        for (size_t i = 0; i < num_partitions; i++) {
235
12
            _m[i] = std::make_unique<std::mutex>();
236
12
        }
237
3
    }
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEEC2Eiii
Line
Count
Source
231
1
            : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {
232
1
        _data_queue.resize(num_partitions);
233
1
        _m.resize(num_partitions);
234
5
        for (size_t i = 0; i < num_partitions; i++) {
235
4
            _m[i] = std::make_unique<std::mutex>();
236
4
        }
237
1
    }
238
    Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit)
239
2
            : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) {
240
2
        _data_queue.resize(num_sources);
241
2
        _m.resize(num_sources);
242
10
        for (size_t i = 0; i < num_sources; i++) {
243
8
            _m[i] = std::make_unique<std::mutex>();
244
8
        }
245
2
    }
246
6
    ~Exchanger() override = default;
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEED2Ev
Line
Count
Source
246
2
    ~Exchanger() override = default;
_ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEED2Ev
Line
Count
Source
246
3
    ~Exchanger() override = default;
_ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEED2Ev
Line
Count
Source
246
1
    ~Exchanger() override = default;
247
0
    std::string data_queue_debug_string(int i) override {
248
0
        return fmt::format("Data Queue {}: [size approx = {}, eos = {}]", i,
249
0
                           _data_queue[i].data_queue.size_approx(), _data_queue[i].eos);
250
0
    }
Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_18PartitionedRowIdxsEEE23data_queue_debug_stringB5cxx11Ei
Unexecuted instantiation: _ZN5doris9ExchangerISt10shared_ptrINS_13ExchangerBase12BlockWrapperEEE23data_queue_debug_stringB5cxx11Ei
Unexecuted instantiation: _ZN5doris9ExchangerISt4pairISt10shared_ptrINS_13ExchangerBase12BlockWrapperEENS_17BroadcastRowRangeEEE23data_queue_debug_stringB5cxx11Ei
251
252
protected:
253
    // Enqueue data block and set downstream source operator to read.
254
    void _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState* local_state,
255
                                     BlockType&& block);
256
    bool _dequeue_data(LocalExchangeSourceLocalState* local_state, BlockType& block, bool* eos,
257
                       Block* data_block, int channel_id);
258
259
    void _enqueue_data_and_set_ready(int channel_id, BlockType&& block);
260
    bool _dequeue_data(BlockType& block, bool* eos, Block* data_block, int channel_id);
261
    std::vector<BlockQueue<BlockType>> _data_queue;
262
    std::vector<std::unique_ptr<std::mutex>> _m;
263
};
264
265
class LocalExchangeSourceLocalState;
266
class LocalExchangeSinkLocalState;
267
268
class ShuffleExchanger : public Exchanger<PartitionedBlock> {
269
public:
270
    ENABLE_FACTORY_CREATOR(ShuffleExchanger);
271
    ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
272
                     int free_block_limit)
273
2
            : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions,
274
2
                                          free_block_limit) {
275
2
        DCHECK_GT(num_partitions, 0);
276
2
        DCHECK_GT(num_sources, 0);
277
2
        _partition_rows_histogram.resize(running_sink_operators);
278
2
    }
279
2
    ~ShuffleExchanger() override = default;
280
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
281
                SinkInfo& sink_info) override;
282
283
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
284
                     SourceInfo&& source_info) override;
285
    void close(SourceInfo&& source_info) override;
286
1
    ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; }
287
288
protected:
289
    Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block,
290
                       int channel_id, LocalExchangeSinkLocalState* local_state,
291
                       std::map<int, int>* shuffle_idx_to_instance_idx);
292
    Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block,
293
                       int channel_id);
294
    std::vector<std::vector<uint32_t>> _partition_rows_histogram;
295
};
296
297
class BucketShuffleExchanger final : public ShuffleExchanger {
298
    ENABLE_FACTORY_CREATOR(BucketShuffleExchanger);
299
    BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
300
                           int free_block_limit)
301
0
            : ShuffleExchanger(running_sink_operators, num_sources, num_partitions,
302
0
                               free_block_limit) {}
303
0
    ~BucketShuffleExchanger() override = default;
304
0
    ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; }
305
};
306
307
class PassthroughExchanger final : public Exchanger<BlockWrapperSPtr> {
308
public:
309
    ENABLE_FACTORY_CREATOR(PassthroughExchanger);
310
    PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
311
1
            : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions,
312
1
                                          free_block_limit) {}
313
1
    ~PassthroughExchanger() override = default;
314
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
315
                SinkInfo& sink_info) override;
316
317
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
318
                     SourceInfo&& source_info) override;
319
0
    ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; }
320
    void close(SourceInfo&& source_info) override;
321
};
322
323
class PassToOneExchanger final : public Exchanger<BlockWrapperSPtr> {
324
public:
325
    ENABLE_FACTORY_CREATOR(PassToOneExchanger);
326
    PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
327
1
            : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions,
328
1
                                          free_block_limit) {}
329
1
    ~PassToOneExchanger() override = default;
330
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
331
                SinkInfo& sink_info) override;
332
333
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
334
                     SourceInfo&& source_info) override;
335
0
    ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; }
336
    void close(SourceInfo&& source_info) override;
337
};
338
class BroadcastExchanger final : public Exchanger<BroadcastBlock> {
339
public:
340
    ENABLE_FACTORY_CREATOR(BroadcastExchanger);
341
    BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
342
1
            : Exchanger<BroadcastBlock>(running_sink_operators, num_partitions, free_block_limit) {}
343
1
    ~BroadcastExchanger() override = default;
344
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
345
                SinkInfo& sink_info) override;
346
347
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
348
                     SourceInfo&& source_info) override;
349
0
    ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
350
    void close(SourceInfo&& source_info) override;
351
};
352
353
//The code in AdaptivePassthroughExchanger is essentially
354
// a copy of ShuffleExchanger and PassthroughExchanger.
355
class AdaptivePassthroughExchanger : public Exchanger<BlockWrapperSPtr> {
356
public:
357
    ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger);
358
    AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions,
359
                                 int free_block_limit)
360
1
            : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions,
361
1
                                          free_block_limit) {
362
1
        _partition_rows_histogram.resize(running_sink_operators);
363
1
    }
364
    Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile,
365
                SinkInfo& sink_info) override;
366
367
    Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile,
368
                     SourceInfo&& source_info) override;
369
0
    ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; }
370
371
    void close(SourceInfo&& source_info) override;
372
373
private:
374
    Status _passthrough_sink(RuntimeState* state, Block* in_block, SinkInfo& sink_info);
375
    Status _shuffle_sink(RuntimeState* state, Block* in_block, SinkInfo& sink_info);
376
    Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, Block* block,
377
                       SinkInfo& sink_info);
378
379
    std::atomic_bool _is_pass_through = false;
380
    std::atomic_int32_t _total_block = 0;
381
    std::vector<std::vector<uint32_t>> _partition_rows_histogram;
382
};
383
#include "common/compile_check_end.h"
384
} // namespace doris