Coverage Report

Created: 2026-03-16 21:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/exchange_sink_buffer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <brpc/controller.h>
21
#include <gen_cpp/data.pb.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <gen_cpp/types.pb.h>
24
#include <parallel_hashmap/phmap.h>
25
26
#include <atomic>
27
#include <cstdint>
28
#include <list>
29
#include <memory>
30
#include <mutex>
31
#include <queue>
32
#include <stack>
33
#include <string>
34
35
#include "common/global_types.h"
36
#include "common/status.h"
37
#include "runtime/runtime_state.h"
38
#include "service/backend_options.h"
39
#include "util/brpc_closure.h"
40
41
namespace doris {
42
#include "common/compile_check_begin.h"
43
class PTransmitDataParams;
44
class TUniqueId;
45
46
using InstanceLoId = int64_t;
47
48
class Dependency;
49
class ExchangeSinkLocalState;
50
51
class Channel;
52
53
// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
54
// will be shared between different channels, so we have to use a ref count to mark if this
55
// PBlock is available for next serialization.
56
class BroadcastPBlockHolderMemLimiter;
57
class BroadcastPBlockHolder {
58
    ENABLE_FACTORY_CREATOR(BroadcastPBlockHolder);
59
60
public:
61
2
    BroadcastPBlockHolder() { _pblock = std::make_unique<PBlock>(); }
62
    ~BroadcastPBlockHolder();
63
64
11
    PBlock* get_block() { return _pblock.get(); }
65
66
0
    void reset_block() { _pblock->Clear(); }
67
68
private:
69
    friend class BroadcastPBlockHolderMemLimiter;
70
    std::unique_ptr<PBlock> _pblock;
71
    std::weak_ptr<BroadcastPBlockHolderMemLimiter> _parent_creator;
72
2
    void set_parent_creator(std::shared_ptr<BroadcastPBlockHolderMemLimiter> parent_creator) {
73
2
        _parent_creator = parent_creator;
74
2
    }
75
};
76
77
class BroadcastPBlockHolderMemLimiter
78
        : public std::enable_shared_from_this<BroadcastPBlockHolderMemLimiter> {
79
    ENABLE_FACTORY_CREATOR(BroadcastPBlockHolderMemLimiter);
80
81
public:
82
    BroadcastPBlockHolderMemLimiter() = delete;
83
84
    BroadcastPBlockHolderMemLimiter(std::shared_ptr<Dependency>& broadcast_dependency)
85
3
            : _total_queue_buffer_size_limit(config::exchg_node_buffer_size_bytes),
86
3
              _total_queue_blocks_count_limit(config::num_broadcast_buffer) {
87
3
        _broadcast_dependency = broadcast_dependency;
88
3
    }
89
90
0
    void set_low_memory_mode() {
91
0
        _total_queue_buffer_size_limit = 1024 * 1024;
92
0
        _total_queue_blocks_count_limit = 8;
93
0
    }
94
95
    void acquire(BroadcastPBlockHolder& holder);
96
    void release(const BroadcastPBlockHolder& holder);
97
98
private:
99
    std::atomic_int64_t _total_queue_buffer_size_limit {0};
100
    std::atomic_int64_t _total_queue_blocks_count_limit {0};
101
    std::atomic_int64_t _total_queue_buffer_size {0};
102
    std::atomic_int64_t _total_queue_blocks_count {0};
103
    std::shared_ptr<Dependency> _broadcast_dependency;
104
    std::mutex _holders_lock;
105
};
106
107
struct TransmitInfo {
108
    std::unique_ptr<PBlock> block;
109
    bool eos;
110
};
111
112
struct BroadcastTransmitInfo {
113
    std::shared_ptr<BroadcastPBlockHolder> block_holder = nullptr;
114
    bool eos;
115
};
116
117
// Per-instance RPC timing counters. A fresh object has zero count and sum, a maximum
// of 0 and a minimum of INT64_MAX.
struct RpcInstanceStatistics {
    // Number of RPCs recorded.
    int64_t rpc_count {0};
    // Largest recorded time; starts at 0.
    int64_t max_time {0};
    // Smallest recorded time; starts at INT64_MAX.
    int64_t min_time {INT64_MAX};
    // Sum of recorded times.
    int64_t sum_time {0};
};
123
124
// Consolidated structure for RPC instance data
125
struct RpcInstance {
126
    // Constructor initializes the instance with the given ID
127
14
    RpcInstance(InstanceLoId id) : id(id) {}
128
129
    // Unique identifier for this RPC instance
130
    InstanceLoId id;
131
132
    // Mutex for thread-safe access to this instance's data
133
    std::unique_ptr<std::mutex> mutex;
134
135
    // Sequence number for RPC packets, incremented for each packet sent
136
    int64_t seq = 0;
137
138
    // Queue for regular data transmission requests
139
    std::unordered_map<Channel*, std::queue<TransmitInfo, std::list<TransmitInfo>>> package_queue;
140
141
    // Queue for broadcast data transmission requests
142
    std::unordered_map<Channel*,
143
                       std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>>
144
            broadcast_package_queue;
145
146
    // RPC request parameters for data transmission
147
    std::shared_ptr<PTransmitDataParams> request;
148
149
    // Flag indicating if the RPC channel is currently idle (no active RPC)
150
    bool rpc_channel_is_idle = true;
151
152
    // Flag indicating if the RPC channel has been turned off (no more RPCs will be sent)
153
    bool rpc_channel_is_turn_off = false;
154
155
    // Statistics for monitoring RPC performance (latency, counts, etc.)
156
    RpcInstanceStatistics stats;
157
158
    // Count of active exchange sinks using this RPC instance
159
    int64_t running_sink_count = 0;
160
};
161
162
template <typename Response>
163
class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
164
    ENABLE_FACTORY_CREATOR(ExchangeSendCallback);
165
166
public:
167
22
    ExchangeSendCallback() = default;
168
169
28
    void init(RpcInstance* ins, bool eos) {
170
28
        _ins = ins;
171
28
        _eos = eos;
172
28
    }
173
174
22
    ~ExchangeSendCallback() override = default;
175
    ExchangeSendCallback(const ExchangeSendCallback& other) = delete;
176
    ExchangeSendCallback& operator=(const ExchangeSendCallback& other) = delete;
177
28
    void addFailedHandler(const std::function<void(RpcInstance*, const std::string&)>& fail_fn) {
178
28
        _fail_fn = fail_fn;
179
28
    }
180
    void addSuccessHandler(const std::function<void(RpcInstance*, const bool&, const Response&,
181
28
                                                    const int64_t&)>& suc_fn) {
182
28
        _suc_fn = suc_fn;
183
28
    }
184
185
28
    void call() noexcept override {
186
28
        try {
187
28
            if (::doris::DummyBrpcCallback<Response>::cntl_->Failed()) {
188
1
                std::string err = fmt::format(
189
1
                        "failed to send brpc when exchange, error={}, error_text={}, client: {}, "
190
1
                        "latency = {}",
191
1
                        berror(::doris::DummyBrpcCallback<Response>::cntl_->ErrorCode()),
192
1
                        ::doris::DummyBrpcCallback<Response>::cntl_->ErrorText(),
193
1
                        BackendOptions::get_localhost(),
194
1
                        ::doris::DummyBrpcCallback<Response>::cntl_->latency_us());
195
1
                _fail_fn(_ins, err);
196
27
            } else {
197
27
                _suc_fn(_ins, _eos, *(::doris::DummyBrpcCallback<Response>::response_),
198
27
                        start_rpc_time);
199
27
            }
200
28
        } catch (const std::exception& exp) {
201
0
            LOG(FATAL) << "brpc callback error: " << exp.what();
202
0
        } catch (...) {
203
0
            LOG(FATAL) << "brpc callback error.";
204
0
            __builtin_unreachable();
205
0
        }
206
28
    }
207
    int64_t start_rpc_time;
208
209
private:
210
    std::function<void(RpcInstance*, const std::string&)> _fail_fn;
211
    std::function<void(RpcInstance*, const bool&, const Response&, const int64_t&)> _suc_fn;
212
    RpcInstance* _ins;
213
    bool _eos;
214
};
215
216
// ExchangeSinkBuffer can either be shared among multiple ExchangeSinkLocalState instances
217
// or be individually owned by each ExchangeSinkLocalState.
218
// The following describes the scenario where ExchangeSinkBuffer is shared among multiple ExchangeSinkLocalState instances.
219
// Of course, individual ownership can be seen as a special case where only one ExchangeSinkLocalState shares the buffer.
220
221
// A sink buffer contains multiple rpc_channels.
222
// Each rpc_channel corresponds to a target instance on the receiving side.
223
// Data is sent using a ping-pong mode within each rpc_channel,
224
// meaning that at most one RPC can exist in a single rpc_channel at a time.
225
// The next RPC can only be sent after the previous one has completed.
226
//
227
// Each exchange sink sends data to all target instances on the receiving side.
228
// If the concurrency is 3, a single rpc_channel will be used simultaneously by three exchange sinks.
229
230
/*                                                                                                                                                                                                                                                                                                                          
231
                          +-----------+          +-----------+        +-----------+      
232
                          |dest ins id|          |dest ins id|        |dest ins id|      
233
                          |           |          |           |        |           |      
234
                          +----+------+          +-----+-----+        +------+----+      
235
                               |                       |                     |           
236
                               |                       |                     |           
237
                      +----------------+      +----------------+     +----------------+  
238
                      |                |      |                |     |                |  
239
 sink buffer -------- |   rpc_channel  |      |  rpc_channel   |     |  rpc_channel   |  
240
                      |                |      |                |     |                |  
241
                      +-------+--------+      +----------------+     +----------------+  
242
                              |                        |                      |          
243
                              |------------------------+----------------------+          
244
                              |                        |                      |          
245
                              |                        |                      |          
246
                     +-----------------+       +-------+---------+    +-------+---------+
247
                     |                 |       |                 |    |                 |
248
                     |  exchange sink  |       |  exchange sink  |    |  exchange sink  |
249
                     |                 |       |                 |    |                 |
250
                     +-----------------+       +-----------------+    +-----------------+
251
*/
252
253
#if defined(BE_TEST) && !defined(BE_BENCHMARK)
254
void transmit_blockv2(PBackendService_Stub* stub,
255
                      std::unique_ptr<AutoReleaseClosure<PTransmitDataParams,
256
                                                         ExchangeSendCallback<PTransmitDataResult>>>
257
                              closure);
258
#endif
259
class ExchangeSinkBuffer : public HasTaskExecutionCtx {
260
public:
261
    ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, PlanNodeId node_id,
262
                       RuntimeState* state, const std::vector<InstanceLoId>& sender_ins_ids);
263
#ifdef BE_TEST
264
    ExchangeSinkBuffer(RuntimeState* state, int64_t sinknum)
265
4
            : HasTaskExecutionCtx(state), _state(state), _exchange_sink_num(sinknum) {};
266
#endif
267
268
10
    ~ExchangeSinkBuffer() override = default;
269
270
    void construct_request(TUniqueId);
271
272
    Status add_block(Channel* channel, TransmitInfo&& request);
273
    Status add_block(Channel* channel, BroadcastTransmitInfo&& request);
274
    void close();
275
    void update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, int64_t receive_rpc_time);
276
    void update_profile(RuntimeProfile* profile);
277
278
    void set_dependency(InstanceLoId sender_ins_id, std::shared_ptr<Dependency> queue_dependency,
279
3
                        ExchangeSinkLocalState* local_state) {
280
3
        std::lock_guard l(_m);
281
3
        _queue_deps.push_back(queue_dependency);
282
3
        _parents.push_back(local_state);
283
3
    }
284
285
0
    void set_low_memory_mode() { _queue_capacity = 8; }
286
    std::string debug_each_instance_queue_size();
287
#ifdef BE_TEST
288
public:
289
#else
290
private:
291
#endif
292
    friend class ExchangeSinkLocalState;
293
294
    // Single map to store all RPC instance data
295
    phmap::flat_hash_map<InstanceLoId, std::unique_ptr<RpcInstance>> _rpc_instances;
296
    std::atomic<size_t> _queue_capacity;
297
298
    // It is set to true only when an RPC fails. Currently, we do not have an error retry mechanism.
299
    // If an RPC error occurs, the query will be canceled.
300
    std::atomic<bool> _is_failed;
301
    PUniqueId _query_id;
302
    PlanNodeId _dest_node_id;
303
304
    PlanNodeId _node_id;
305
    std::atomic<int64_t> _rpc_count = 0;
306
    // The state may be from PipelineFragmentContext if it is shared among multi instances.
307
    RuntimeState* _state = nullptr;
308
    QueryContext* _context = nullptr;
309
310
    Status _send_rpc(RpcInstance& ins);
311
312
#ifndef BE_TEST
313
    inline void _ended(RpcInstance& ins);
314
    inline void _failed(InstanceLoId id, const std::string& err);
315
    inline void _set_receiver_eof(RpcInstance& ins);
316
    inline void _turn_off_channel(RpcInstance& ins, std::unique_lock<std::mutex>& with_lock);
317
318
#else
319
    virtual void _ended(RpcInstance& ins);
320
    virtual void _failed(InstanceLoId id, const std::string& err);
321
    virtual void _set_receiver_eof(RpcInstance& ins);
322
    virtual void _turn_off_channel(RpcInstance& ins, std::unique_lock<std::mutex>& with_lock);
323
#endif
324
325
    void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
326
    int64_t get_sum_rpc_time();
327
328
    // _total_queue_size is the sum of the sizes of all instance_to_package_queues.
329
    // Any modification to instance_to_package_queue requires a corresponding modification to _total_queue_size.
330
    std::atomic<int> _total_queue_size = 0;
331
332
    // protected the `_queue_deps` and `_parents`
333
    std::mutex _m;
334
    // _queue_deps is used for memory control.
335
    std::vector<std::shared_ptr<Dependency>> _queue_deps;
336
    // The ExchangeSinkLocalState in _parents is only used in _turn_off_channel.
337
    std::vector<ExchangeSinkLocalState*> _parents;
338
    const int64_t _exchange_sink_num;
339
    bool _send_multi_blocks = false;
340
    int _send_multi_blocks_byte_size = 256 * 1024;
341
};
342
343
#include "common/compile_check_end.h"
344
} // namespace doris