Coverage Report

Created: 2026-04-16 14:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/exchange_sink_buffer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <brpc/controller.h>
21
#include <gen_cpp/data.pb.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <gen_cpp/types.pb.h>
24
#include <parallel_hashmap/phmap.h>
25
26
#include <atomic>
27
#include <cstdint>
28
#include <list>
29
#include <memory>
30
#include <mutex>
31
#include <queue>
32
#include <stack>
33
#include <string>
34
35
#include "common/global_types.h"
36
#include "common/status.h"
37
#include "runtime/runtime_state.h"
38
#include "service/backend_options.h"
39
#include "util/brpc_closure.h"
40
41
namespace doris {
42
// Forward declarations for types this header names without needing their definitions here.
class Channel;
class Dependency;
class ExchangeSinkLocalState;
class PTransmitDataParams;
class TUniqueId;

// Integer key used to identify fragment instances throughout this header.
// NOTE(review): the name suggests it is the low part of a unique instance id — confirm.
using InstanceLoId = int64_t;
51
52
// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
53
// will be shared between different channel, so we have to use a ref count to mark if this
54
// PBlock is available for next serialization.
55
class BroadcastPBlockHolderMemLimiter;
56
class BroadcastPBlockHolder {
57
    ENABLE_FACTORY_CREATOR(BroadcastPBlockHolder);
58
59
public:
60
2
    BroadcastPBlockHolder() { _pblock = std::make_unique<PBlock>(); }
61
    ~BroadcastPBlockHolder();
62
63
11
    PBlock* get_block() { return _pblock.get(); }
64
65
0
    void reset_block() { _pblock->Clear(); }
66
67
private:
68
    friend class BroadcastPBlockHolderMemLimiter;
69
    std::unique_ptr<PBlock> _pblock;
70
    std::weak_ptr<BroadcastPBlockHolderMemLimiter> _parent_creator;
71
2
    void set_parent_creator(std::shared_ptr<BroadcastPBlockHolderMemLimiter> parent_creator) {
72
2
        _parent_creator = parent_creator;
73
2
    }
74
};
75
76
class BroadcastPBlockHolderMemLimiter
77
        : public std::enable_shared_from_this<BroadcastPBlockHolderMemLimiter> {
78
    ENABLE_FACTORY_CREATOR(BroadcastPBlockHolderMemLimiter);
79
80
public:
81
    BroadcastPBlockHolderMemLimiter() = delete;
82
83
    BroadcastPBlockHolderMemLimiter(std::shared_ptr<Dependency>& broadcast_dependency)
84
3
            : _total_queue_buffer_size_limit(config::exchg_node_buffer_size_bytes),
85
3
              _total_queue_blocks_count_limit(config::num_broadcast_buffer) {
86
3
        _broadcast_dependency = broadcast_dependency;
87
3
    }
88
89
0
    void set_low_memory_mode() {
90
0
        _total_queue_buffer_size_limit = 1024 * 1024;
91
0
        _total_queue_blocks_count_limit = 8;
92
0
    }
93
94
    void acquire(BroadcastPBlockHolder& holder);
95
    void release(const BroadcastPBlockHolder& holder);
96
97
private:
98
    std::atomic_int64_t _total_queue_buffer_size_limit {0};
99
    std::atomic_int64_t _total_queue_blocks_count_limit {0};
100
    std::atomic_int64_t _total_queue_buffer_size {0};
101
    std::atomic_int64_t _total_queue_blocks_count {0};
102
    std::shared_ptr<Dependency> _broadcast_dependency;
103
    std::mutex _holders_lock;
104
};
105
106
struct TransmitInfo {
107
    std::unique_ptr<PBlock> block;
108
    bool eos;
109
};
110
111
struct BroadcastTransmitInfo {
112
    std::shared_ptr<BroadcastPBlockHolder> block_holder = nullptr;
113
    bool eos;
114
};
115
116
// Counters for the RPCs sent through one RpcInstance: number of RPCs and max/min/sum of
// their times. min_time starts at INT64_MAX, the identity value for taking a minimum.
struct RpcInstanceStatistics {
    int64_t rpc_count {0};
    int64_t max_time {0};
    int64_t min_time {INT64_MAX};
    int64_t sum_time {0};
};
122
123
// Consolidated structure for RPC instance data
124
struct RpcInstance {
125
    // Constructor initializes the instance with the given ID
126
14
    RpcInstance(InstanceLoId id) : id(id) {}
127
128
    // Unique identifier for this RPC instance
129
    InstanceLoId id;
130
131
    // Mutex for thread-safe access to this instance's data
132
    std::unique_ptr<std::mutex> mutex;
133
134
    // Sequence number for RPC packets, incremented for each packet sent
135
    int64_t seq = 0;
136
137
    // Queue for regular data transmission requests
138
    std::unordered_map<Channel*, std::queue<TransmitInfo, std::list<TransmitInfo>>> package_queue;
139
140
    // Queue for broadcast data transmission requests
141
    std::unordered_map<Channel*,
142
                       std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>>
143
            broadcast_package_queue;
144
145
    // RPC request parameters for data transmission
146
    std::shared_ptr<PTransmitDataParams> request;
147
148
    // Flag indicating if the RPC channel is currently idle (no active RPC)
149
    bool rpc_channel_is_idle = true;
150
151
    // Flag indicating if the RPC channel has been turned off (no more RPCs will be sent)
152
    bool rpc_channel_is_turn_off = false;
153
154
    // Statistics for monitoring RPC performance (latency, counts, etc.)
155
    RpcInstanceStatistics stats;
156
157
    // Count of active exchange sinks using this RPC instance
158
    int64_t running_sink_count = 0;
159
};
160
161
template <typename Response>
162
class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
163
    ENABLE_FACTORY_CREATOR(ExchangeSendCallback);
164
165
public:
166
22
    ExchangeSendCallback() = default;
167
168
28
    void init(RpcInstance* ins, bool eos) {
169
28
        _ins = ins;
170
28
        _eos = eos;
171
28
    }
172
173
22
    ~ExchangeSendCallback() override = default;
174
    ExchangeSendCallback(const ExchangeSendCallback& other) = delete;
175
    ExchangeSendCallback& operator=(const ExchangeSendCallback& other) = delete;
176
28
    void addFailedHandler(const std::function<void(RpcInstance*, const std::string&)>& fail_fn) {
177
28
        _fail_fn = fail_fn;
178
28
    }
179
    void addSuccessHandler(const std::function<void(RpcInstance*, const bool&, const Response&,
180
28
                                                    const int64_t&)>& suc_fn) {
181
28
        _suc_fn = suc_fn;
182
28
    }
183
184
28
    void call() noexcept override {
185
28
        try {
186
28
            if (::doris::DummyBrpcCallback<Response>::cntl_->Failed()) {
187
1
                std::string err = fmt::format(
188
1
                        "failed to send brpc when exchange, error={}, error_text={}, client: {}, "
189
1
                        "latency = {}",
190
1
                        berror(::doris::DummyBrpcCallback<Response>::cntl_->ErrorCode()),
191
1
                        ::doris::DummyBrpcCallback<Response>::cntl_->ErrorText(),
192
1
                        BackendOptions::get_localhost(),
193
1
                        ::doris::DummyBrpcCallback<Response>::cntl_->latency_us());
194
1
                _fail_fn(_ins, err);
195
27
            } else {
196
27
                _suc_fn(_ins, _eos, *(::doris::DummyBrpcCallback<Response>::response_),
197
27
                        start_rpc_time);
198
27
            }
199
28
        } catch (const std::exception& exp) {
200
0
            LOG(FATAL) << "brpc callback error: " << exp.what();
201
0
        } catch (...) {
202
0
            LOG(FATAL) << "brpc callback error.";
203
0
            __builtin_unreachable();
204
0
        }
205
28
    }
206
    int64_t start_rpc_time;
207
208
private:
209
    std::function<void(RpcInstance*, const std::string&)> _fail_fn;
210
    std::function<void(RpcInstance*, const bool&, const Response&, const int64_t&)> _suc_fn;
211
    RpcInstance* _ins;
212
    bool _eos;
213
};
214
215
// ExchangeSinkBuffer can either be shared among multiple ExchangeSinkLocalState instances
// or be individually owned by each ExchangeSinkLocalState.
// The following describes the scenario where ExchangeSinkBuffer is shared among multiple ExchangeSinkLocalState instances.
// Individual ownership is simply the special case in which only one ExchangeSinkLocalState uses the buffer.
219
220
// A sink buffer contains multiple rpc_channels.
221
// Each rpc_channel corresponds to a target instance on the receiving side.
222
// Data is sent using a ping-pong mode within each rpc_channel,
223
// meaning that at most one RPC can exist in a single rpc_channel at a time.
224
// The next RPC can only be sent after the previous one has completed.
225
//
226
// Each exchange sink sends data to all target instances on the receiving side.
227
// If the concurrency is 3, a single rpc_channel will be used simultaneously by three exchange sinks.
228
229
/*                                                                                                                                                                                                                                                                                                                          
230
                          +-----------+          +-----------+        +-----------+      
231
                          |dest ins id|          |dest ins id|        |dest ins id|      
232
                          |           |          |           |        |           |      
233
                          +----+------+          +-----+-----+        +------+----+      
234
                               |                       |                     |           
235
                               |                       |                     |           
236
                      +----------------+      +----------------+     +----------------+  
237
                      |                |      |                |     |                |  
238
 sink buffer -------- |   rpc_channel  |      |  rpc_channel   |     |  rpc_channel   |  
239
                      |                |      |                |     |                |  
240
                      +-------+--------+      +----------------+     +----------------+  
241
                              |                        |                      |          
242
                              |------------------------+----------------------+          
243
                              |                        |                      |          
244
                              |                        |                      |          
245
                     +-----------------+       +-------+---------+    +-------+---------+
246
                     |                 |       |                 |    |                 |
247
                     |  exchange sink  |       |  exchange sink  |    |  exchange sink  |
248
                     |                 |       |                 |    |                 |
249
                     +-----------------+       +-----------------+    +-----------------+
250
*/
251
252
#if defined(BE_TEST) && !defined(BE_BENCHMARK)
// Declared only in BE_TEST builds that are not benchmarks; the definition lives elsewhere.
// Takes ownership of the closure (request + ExchangeSendCallback) passed by unique_ptr.
// NOTE(review): presumably a test hook standing in for the real brpc send — confirm.
void transmit_blockv2(
        PBackendService_Stub* stub,
        std::unique_ptr<
                AutoReleaseClosure<PTransmitDataParams, ExchangeSendCallback<PTransmitDataResult>>>
                closure);
#endif
258
class ExchangeSinkBuffer : public HasTaskExecutionCtx {
259
public:
260
    ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, PlanNodeId node_id,
261
                       RuntimeState* state, const std::vector<InstanceLoId>& sender_ins_ids);
262
#ifdef BE_TEST
263
    ExchangeSinkBuffer(RuntimeState* state, int64_t sinknum)
264
4
            : HasTaskExecutionCtx(state), _state(state), _exchange_sink_num(sinknum) {};
265
#endif
266
267
10
    ~ExchangeSinkBuffer() override = default;
268
269
    void construct_request(TUniqueId);
270
271
    Status add_block(Channel* channel, TransmitInfo&& request);
272
    Status add_block(Channel* channel, BroadcastTransmitInfo&& request);
273
    void close();
274
    void update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, int64_t receive_rpc_time);
275
    void update_profile(RuntimeProfile* profile);
276
277
    void set_dependency(InstanceLoId sender_ins_id, std::shared_ptr<Dependency> queue_dependency,
278
3
                        ExchangeSinkLocalState* local_state) {
279
3
        std::lock_guard l(_m);
280
3
        _queue_deps.push_back(queue_dependency);
281
3
        _parents.push_back(local_state);
282
3
    }
283
284
0
    void set_low_memory_mode() { _queue_capacity = 8; }
285
    std::string debug_each_instance_queue_size();
286
#ifdef BE_TEST
287
public:
288
#else
289
private:
290
#endif
291
    friend class ExchangeSinkLocalState;
292
293
    // Single map to store all RPC instance data
294
    phmap::flat_hash_map<InstanceLoId, std::unique_ptr<RpcInstance>> _rpc_instances;
295
    std::atomic<size_t> _queue_capacity;
296
297
    // It is set to true only when an RPC fails. Currently, we do not have an error retry mechanism.
298
    // If an RPC error occurs, the query will be canceled.
299
    std::atomic<bool> _is_failed;
300
    PUniqueId _query_id;
301
    PlanNodeId _dest_node_id;
302
303
    PlanNodeId _node_id;
304
    std::atomic<int64_t> _rpc_count = 0;
305
    // The state may be from PipelineFragmentContext if it is shared among multi instances.
306
    RuntimeState* _state = nullptr;
307
    QueryContext* _context = nullptr;
308
309
    Status _send_rpc(RpcInstance& ins);
310
311
#ifndef BE_TEST
312
    inline void _ended(RpcInstance& ins);
313
    inline void _failed(InstanceLoId id, const std::string& err);
314
    inline void _set_receiver_eof(RpcInstance& ins);
315
    inline void _turn_off_channel(RpcInstance& ins, std::unique_lock<std::mutex>& with_lock);
316
317
#else
318
    virtual void _ended(RpcInstance& ins);
319
    virtual void _failed(InstanceLoId id, const std::string& err);
320
    virtual void _set_receiver_eof(RpcInstance& ins);
321
    virtual void _turn_off_channel(RpcInstance& ins, std::unique_lock<std::mutex>& with_lock);
322
#endif
323
324
    void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
325
    int64_t get_sum_rpc_time();
326
327
    // _total_queue_size is the sum of the sizes of all instance_to_package_queues.
328
    // Any modification to instance_to_package_queue requires a corresponding modification to _total_queue_size.
329
    std::atomic<int> _total_queue_size = 0;
330
331
    // protected the `_queue_deps` and `_parents`
332
    std::mutex _m;
333
    // _queue_deps is used for memory control.
334
    std::vector<std::shared_ptr<Dependency>> _queue_deps;
335
    // The ExchangeSinkLocalState in _parents is only used in _turn_off_channel.
336
    std::vector<ExchangeSinkLocalState*> _parents;
337
    const int64_t _exchange_sink_num;
338
    bool _send_multi_blocks = false;
339
    int _send_multi_blocks_byte_size = 256 * 1024;
340
};
341
342
} // namespace doris