be/src/exec/operator/exchange_sink_buffer.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <brpc/controller.h> |
21 | | #include <gen_cpp/data.pb.h> |
22 | | #include <gen_cpp/internal_service.pb.h> |
23 | | #include <gen_cpp/types.pb.h> |
24 | | #include <parallel_hashmap/phmap.h> |
25 | | |
#include <atomic>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <stack>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
34 | | |
35 | | #include "common/global_types.h" |
36 | | #include "common/status.h" |
37 | | #include "runtime/runtime_state.h" |
38 | | #include "service/backend_options.h" |
39 | | #include "util/brpc_closure.h" |
40 | | |
41 | | namespace doris { |
42 | | #include "common/compile_check_begin.h" |
// Forward declarations of classes referenced by the declarations below.
class PTransmitDataParams;
class TUniqueId;
class Dependency;
class ExchangeSinkLocalState;
class Channel;

// Integer id used as the key of ExchangeSinkBuffer::_rpc_instances and stored in RpcInstance::id.
using InstanceLoId = int64_t;
52 | | |
53 | | // We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock |
54 | | // will be shared between different channel, so we have to use a ref count to mark if this |
55 | | // PBlock is available for next serialization. |
56 | | class BroadcastPBlockHolderMemLimiter; |
57 | | class BroadcastPBlockHolder { |
58 | | ENABLE_FACTORY_CREATOR(BroadcastPBlockHolder); |
59 | | |
60 | | public: |
61 | 2 | BroadcastPBlockHolder() { _pblock = std::make_unique<PBlock>(); } |
62 | | ~BroadcastPBlockHolder(); |
63 | | |
64 | 11 | PBlock* get_block() { return _pblock.get(); } |
65 | | |
66 | 0 | void reset_block() { _pblock->Clear(); } |
67 | | |
68 | | private: |
69 | | friend class BroadcastPBlockHolderMemLimiter; |
70 | | std::unique_ptr<PBlock> _pblock; |
71 | | std::weak_ptr<BroadcastPBlockHolderMemLimiter> _parent_creator; |
72 | 2 | void set_parent_creator(std::shared_ptr<BroadcastPBlockHolderMemLimiter> parent_creator) { |
73 | 2 | _parent_creator = parent_creator; |
74 | 2 | } |
75 | | }; |
76 | | |
77 | | class BroadcastPBlockHolderMemLimiter |
78 | | : public std::enable_shared_from_this<BroadcastPBlockHolderMemLimiter> { |
79 | | ENABLE_FACTORY_CREATOR(BroadcastPBlockHolderMemLimiter); |
80 | | |
81 | | public: |
82 | | BroadcastPBlockHolderMemLimiter() = delete; |
83 | | |
84 | | BroadcastPBlockHolderMemLimiter(std::shared_ptr<Dependency>& broadcast_dependency) |
85 | 3 | : _total_queue_buffer_size_limit(config::exchg_node_buffer_size_bytes), |
86 | 3 | _total_queue_blocks_count_limit(config::num_broadcast_buffer) { |
87 | 3 | _broadcast_dependency = broadcast_dependency; |
88 | 3 | } |
89 | | |
90 | 0 | void set_low_memory_mode() { |
91 | 0 | _total_queue_buffer_size_limit = 1024 * 1024; |
92 | 0 | _total_queue_blocks_count_limit = 8; |
93 | 0 | } |
94 | | |
95 | | void acquire(BroadcastPBlockHolder& holder); |
96 | | void release(const BroadcastPBlockHolder& holder); |
97 | | |
98 | | private: |
99 | | std::atomic_int64_t _total_queue_buffer_size_limit {0}; |
100 | | std::atomic_int64_t _total_queue_blocks_count_limit {0}; |
101 | | std::atomic_int64_t _total_queue_buffer_size {0}; |
102 | | std::atomic_int64_t _total_queue_blocks_count {0}; |
103 | | std::shared_ptr<Dependency> _broadcast_dependency; |
104 | | std::mutex _holders_lock; |
105 | | }; |
106 | | |
107 | | struct TransmitInfo { |
108 | | std::unique_ptr<PBlock> block; |
109 | | bool eos; |
110 | | }; |
111 | | |
112 | | struct BroadcastTransmitInfo { |
113 | | std::shared_ptr<BroadcastPBlockHolder> block_holder = nullptr; |
114 | | bool eos; |
115 | | }; |
116 | | |
// Counters for the RPCs issued through one RpcInstance.
// min_time starts at INT64_MAX and max_time at 0 so that any recorded sample replaces them
// (assuming the updater takes min/max — see update_rpc_time).
struct RpcInstanceStatistics {
    int64_t rpc_count {0};
    int64_t max_time {0};
    int64_t min_time {INT64_MAX};
    int64_t sum_time {0};
};
123 | | |
124 | | // Consolidated structure for RPC instance data |
125 | | struct RpcInstance { |
126 | | // Constructor initializes the instance with the given ID |
127 | 14 | RpcInstance(InstanceLoId id) : id(id) {} |
128 | | |
129 | | // Unique identifier for this RPC instance |
130 | | InstanceLoId id; |
131 | | |
132 | | // Mutex for thread-safe access to this instance's data |
133 | | std::unique_ptr<std::mutex> mutex; |
134 | | |
135 | | // Sequence number for RPC packets, incremented for each packet sent |
136 | | int64_t seq = 0; |
137 | | |
138 | | // Queue for regular data transmission requests |
139 | | std::unordered_map<Channel*, std::queue<TransmitInfo, std::list<TransmitInfo>>> package_queue; |
140 | | |
141 | | // Queue for broadcast data transmission requests |
142 | | std::unordered_map<Channel*, |
143 | | std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>> |
144 | | broadcast_package_queue; |
145 | | |
146 | | // RPC request parameters for data transmission |
147 | | std::shared_ptr<PTransmitDataParams> request; |
148 | | |
149 | | // Flag indicating if the RPC channel is currently idle (no active RPC) |
150 | | bool rpc_channel_is_idle = true; |
151 | | |
152 | | // Flag indicating if the RPC channel has been turned off (no more RPCs will be sent) |
153 | | bool rpc_channel_is_turn_off = false; |
154 | | |
155 | | // Statistics for monitoring RPC performance (latency, counts, etc.) |
156 | | RpcInstanceStatistics stats; |
157 | | |
158 | | // Count of active exchange sinks using this RPC instance |
159 | | int64_t running_sink_count = 0; |
160 | | }; |
161 | | |
162 | | template <typename Response> |
163 | | class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> { |
164 | | ENABLE_FACTORY_CREATOR(ExchangeSendCallback); |
165 | | |
166 | | public: |
167 | 22 | ExchangeSendCallback() = default; |
168 | | |
169 | 28 | void init(RpcInstance* ins, bool eos) { |
170 | 28 | _ins = ins; |
171 | 28 | _eos = eos; |
172 | 28 | } |
173 | | |
174 | 22 | ~ExchangeSendCallback() override = default; |
175 | | ExchangeSendCallback(const ExchangeSendCallback& other) = delete; |
176 | | ExchangeSendCallback& operator=(const ExchangeSendCallback& other) = delete; |
177 | 28 | void addFailedHandler(const std::function<void(RpcInstance*, const std::string&)>& fail_fn) { |
178 | 28 | _fail_fn = fail_fn; |
179 | 28 | } |
180 | | void addSuccessHandler(const std::function<void(RpcInstance*, const bool&, const Response&, |
181 | 28 | const int64_t&)>& suc_fn) { |
182 | 28 | _suc_fn = suc_fn; |
183 | 28 | } |
184 | | |
185 | 28 | void call() noexcept override { |
186 | 28 | try { |
187 | 28 | if (::doris::DummyBrpcCallback<Response>::cntl_->Failed()) { |
188 | 1 | std::string err = fmt::format( |
189 | 1 | "failed to send brpc when exchange, error={}, error_text={}, client: {}, " |
190 | 1 | "latency = {}", |
191 | 1 | berror(::doris::DummyBrpcCallback<Response>::cntl_->ErrorCode()), |
192 | 1 | ::doris::DummyBrpcCallback<Response>::cntl_->ErrorText(), |
193 | 1 | BackendOptions::get_localhost(), |
194 | 1 | ::doris::DummyBrpcCallback<Response>::cntl_->latency_us()); |
195 | 1 | _fail_fn(_ins, err); |
196 | 27 | } else { |
197 | 27 | _suc_fn(_ins, _eos, *(::doris::DummyBrpcCallback<Response>::response_), |
198 | 27 | start_rpc_time); |
199 | 27 | } |
200 | 28 | } catch (const std::exception& exp) { |
201 | 0 | LOG(FATAL) << "brpc callback error: " << exp.what(); |
202 | 0 | } catch (...) { |
203 | 0 | LOG(FATAL) << "brpc callback error."; |
204 | 0 | __builtin_unreachable(); |
205 | 0 | } |
206 | 28 | } |
207 | | int64_t start_rpc_time; |
208 | | |
209 | | private: |
210 | | std::function<void(RpcInstance*, const std::string&)> _fail_fn; |
211 | | std::function<void(RpcInstance*, const bool&, const Response&, const int64_t&)> _suc_fn; |
212 | | RpcInstance* _ins; |
213 | | bool _eos; |
214 | | }; |
215 | | |
216 | | // ExchangeSinkBuffer can either be shared among multiple ExchangeSinkLocalState instances |
217 | | // or be individually owned by each ExchangeSinkLocalState. |
218 | | // The following describes the scenario where ExchangeSinkBuffer is shared among multiple ExchangeSinkLocalState instances. |
219 | | // Of course, individual ownership can be seen as a special case where only one ExchangeSinkLocalState shares the buffer. |
220 | | |
221 | | // A sink buffer contains multiple rpc_channels. |
222 | | // Each rpc_channel corresponds to a target instance on the receiving side. |
223 | | // Data is sent using a ping-pong mode within each rpc_channel, |
224 | | // meaning that at most one RPC can exist in a single rpc_channel at a time. |
225 | | // The next RPC can only be sent after the previous one has completed. |
226 | | // |
227 | | // Each exchange sink sends data to all target instances on the receiving side. |
228 | | // If the concurrency is 3, a single rpc_channel will be used simultaneously by three exchange sinks. |
229 | | |
230 | | /* |
231 | | +-----------+ +-----------+ +-----------+ |
232 | | |dest ins id| |dest ins id| |dest ins id| |
233 | | | | | | | | |
234 | | +----+------+ +-----+-----+ +------+----+ |
235 | | | | | |
236 | | | | | |
237 | | +----------------+ +----------------+ +----------------+ |
238 | | | | | | | | |
239 | | sink buffer -------- | rpc_channel | | rpc_channel | | rpc_channel | |
240 | | | | | | | | |
241 | | +-------+--------+ +----------------+ +----------------+ |
242 | | | | | |
243 | | |------------------------+----------------------+ |
244 | | | | | |
245 | | | | | |
246 | | +-----------------+ +-------+---------+ +-------+---------+ |
247 | | | | | | | | |
248 | | | exchange sink | | exchange sink | | exchange sink | |
249 | | | | | | | | |
250 | | +-----------------+ +-----------------+ +-----------------+ |
251 | | */ |
252 | | |
// Test-only declaration: compiled only when BE_TEST is defined and BE_BENCHMARK is not.
// The definition lives outside this header; presumably unit tests provide it to intercept
// the transmit RPC issued with `closure` — confirm against the test sources.
#if defined(BE_TEST) && !defined(BE_BENCHMARK)
void transmit_blockv2(PBackendService_Stub* stub,
                      std::unique_ptr<AutoReleaseClosure<PTransmitDataParams,
                                                         ExchangeSendCallback<PTransmitDataResult>>>
                              closure);
#endif
259 | | class ExchangeSinkBuffer : public HasTaskExecutionCtx { |
260 | | public: |
261 | | ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, PlanNodeId node_id, |
262 | | RuntimeState* state, const std::vector<InstanceLoId>& sender_ins_ids); |
263 | | #ifdef BE_TEST |
264 | | ExchangeSinkBuffer(RuntimeState* state, int64_t sinknum) |
265 | 4 | : HasTaskExecutionCtx(state), _state(state), _exchange_sink_num(sinknum) {}; |
266 | | #endif |
267 | | |
268 | 10 | ~ExchangeSinkBuffer() override = default; |
269 | | |
270 | | void construct_request(TUniqueId); |
271 | | |
272 | | Status add_block(Channel* channel, TransmitInfo&& request); |
273 | | Status add_block(Channel* channel, BroadcastTransmitInfo&& request); |
274 | | void close(); |
275 | | void update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, int64_t receive_rpc_time); |
276 | | void update_profile(RuntimeProfile* profile); |
277 | | |
278 | | void set_dependency(InstanceLoId sender_ins_id, std::shared_ptr<Dependency> queue_dependency, |
279 | 3 | ExchangeSinkLocalState* local_state) { |
280 | 3 | std::lock_guard l(_m); |
281 | 3 | _queue_deps.push_back(queue_dependency); |
282 | 3 | _parents.push_back(local_state); |
283 | 3 | } |
284 | | |
285 | 0 | void set_low_memory_mode() { _queue_capacity = 8; } |
286 | | std::string debug_each_instance_queue_size(); |
287 | | #ifdef BE_TEST |
288 | | public: |
289 | | #else |
290 | | private: |
291 | | #endif |
292 | | friend class ExchangeSinkLocalState; |
293 | | |
294 | | // Single map to store all RPC instance data |
295 | | phmap::flat_hash_map<InstanceLoId, std::unique_ptr<RpcInstance>> _rpc_instances; |
296 | | std::atomic<size_t> _queue_capacity; |
297 | | |
298 | | // It is set to true only when an RPC fails. Currently, we do not have an error retry mechanism. |
299 | | // If an RPC error occurs, the query will be canceled. |
300 | | std::atomic<bool> _is_failed; |
301 | | PUniqueId _query_id; |
302 | | PlanNodeId _dest_node_id; |
303 | | |
304 | | PlanNodeId _node_id; |
305 | | std::atomic<int64_t> _rpc_count = 0; |
306 | | // The state may be from PipelineFragmentContext if it is shared among multi instances. |
307 | | RuntimeState* _state = nullptr; |
308 | | QueryContext* _context = nullptr; |
309 | | |
310 | | Status _send_rpc(RpcInstance& ins); |
311 | | |
312 | | #ifndef BE_TEST |
313 | | inline void _ended(RpcInstance& ins); |
314 | | inline void _failed(InstanceLoId id, const std::string& err); |
315 | | inline void _set_receiver_eof(RpcInstance& ins); |
316 | | inline void _turn_off_channel(RpcInstance& ins, std::unique_lock<std::mutex>& with_lock); |
317 | | |
318 | | #else |
319 | | virtual void _ended(RpcInstance& ins); |
320 | | virtual void _failed(InstanceLoId id, const std::string& err); |
321 | | virtual void _set_receiver_eof(RpcInstance& ins); |
322 | | virtual void _turn_off_channel(RpcInstance& ins, std::unique_lock<std::mutex>& with_lock); |
323 | | #endif |
324 | | |
325 | | void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time); |
326 | | int64_t get_sum_rpc_time(); |
327 | | |
328 | | // _total_queue_size is the sum of the sizes of all instance_to_package_queues. |
329 | | // Any modification to instance_to_package_queue requires a corresponding modification to _total_queue_size. |
330 | | std::atomic<int> _total_queue_size = 0; |
331 | | |
332 | | // protected the `_queue_deps` and `_parents` |
333 | | std::mutex _m; |
334 | | // _queue_deps is used for memory control. |
335 | | std::vector<std::shared_ptr<Dependency>> _queue_deps; |
336 | | // The ExchangeSinkLocalState in _parents is only used in _turn_off_channel. |
337 | | std::vector<ExchangeSinkLocalState*> _parents; |
338 | | const int64_t _exchange_sink_num; |
339 | | bool _send_multi_blocks = false; |
340 | | int _send_multi_blocks_byte_size = 256 * 1024; |
341 | | }; |
342 | | |
343 | | #include "common/compile_check_end.h" |
344 | | } // namespace doris |