be/src/exec/operator/exchange_sink_buffer.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <brpc/controller.h>
#include <gen_cpp/data.pb.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <parallel_hashmap/phmap.h>

#include <atomic>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <stack>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/global_types.h"
#include "common/status.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
#include "util/brpc_closure.h"
40 | | |
41 | | namespace doris { |
42 | | class PTransmitDataParams; |
43 | | class TUniqueId; |
44 | | |
45 | | using InstanceLoId = int64_t; |
46 | | |
47 | | class Dependency; |
48 | | class ExchangeSinkLocalState; |
49 | | |
50 | | class Channel; |
51 | | |
52 | | // We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock |
53 | | // will be shared between different channel, so we have to use a ref count to mark if this |
54 | | // PBlock is available for next serialization. |
55 | | class BroadcastPBlockHolderMemLimiter; |
56 | | class BroadcastPBlockHolder { |
57 | | ENABLE_FACTORY_CREATOR(BroadcastPBlockHolder); |
58 | | |
59 | | public: |
60 | 2 | BroadcastPBlockHolder() { _pblock = std::make_unique<PBlock>(); } |
61 | | ~BroadcastPBlockHolder(); |
62 | | |
63 | 11 | PBlock* get_block() { return _pblock.get(); } |
64 | | |
65 | 0 | void reset_block() { _pblock->Clear(); } |
66 | | |
67 | | private: |
68 | | friend class BroadcastPBlockHolderMemLimiter; |
69 | | std::unique_ptr<PBlock> _pblock; |
70 | | std::weak_ptr<BroadcastPBlockHolderMemLimiter> _parent_creator; |
71 | 2 | void set_parent_creator(std::shared_ptr<BroadcastPBlockHolderMemLimiter> parent_creator) { |
72 | 2 | _parent_creator = parent_creator; |
73 | 2 | } |
74 | | }; |
75 | | |
76 | | class BroadcastPBlockHolderMemLimiter |
77 | | : public std::enable_shared_from_this<BroadcastPBlockHolderMemLimiter> { |
78 | | ENABLE_FACTORY_CREATOR(BroadcastPBlockHolderMemLimiter); |
79 | | |
80 | | public: |
81 | | BroadcastPBlockHolderMemLimiter() = delete; |
82 | | |
83 | | BroadcastPBlockHolderMemLimiter(std::shared_ptr<Dependency>& broadcast_dependency) |
84 | 3 | : _total_queue_buffer_size_limit(config::exchg_node_buffer_size_bytes), |
85 | 3 | _total_queue_blocks_count_limit(config::num_broadcast_buffer) { |
86 | 3 | _broadcast_dependency = broadcast_dependency; |
87 | 3 | } |
88 | | |
89 | 0 | void set_low_memory_mode() { |
90 | 0 | _total_queue_buffer_size_limit = 1024 * 1024; |
91 | 0 | _total_queue_blocks_count_limit = 8; |
92 | 0 | } |
93 | | |
94 | | void acquire(BroadcastPBlockHolder& holder); |
95 | | void release(const BroadcastPBlockHolder& holder); |
96 | | |
97 | | private: |
98 | | std::atomic_int64_t _total_queue_buffer_size_limit {0}; |
99 | | std::atomic_int64_t _total_queue_blocks_count_limit {0}; |
100 | | std::atomic_int64_t _total_queue_buffer_size {0}; |
101 | | std::atomic_int64_t _total_queue_blocks_count {0}; |
102 | | std::shared_ptr<Dependency> _broadcast_dependency; |
103 | | std::mutex _holders_lock; |
104 | | }; |
105 | | |
106 | | struct TransmitInfo { |
107 | | std::unique_ptr<PBlock> block; |
108 | | bool eos; |
109 | | }; |
110 | | |
111 | | struct BroadcastTransmitInfo { |
112 | | std::shared_ptr<BroadcastPBlockHolder> block_holder = nullptr; |
113 | | bool eos; |
114 | | }; |
115 | | |
// Per-instance RPC timing counters.
// NOTE(review): the time unit is not visible in this header — confirm against the code that
// updates these fields.
struct RpcInstanceStatistics {
    // Number of RPCs recorded.
    int64_t rpc_count {0};
    // Largest recorded time.
    int64_t max_time {0};
    // Smallest recorded time; starts at the maximum representable value.
    int64_t min_time {INT64_MAX};
    // Sum of all recorded times.
    int64_t sum_time {0};
};
122 | | |
123 | | // Consolidated structure for RPC instance data |
124 | | struct RpcInstance { |
125 | | // Constructor initializes the instance with the given ID |
126 | 14 | RpcInstance(InstanceLoId id) : id(id) {} |
127 | | |
128 | | // Unique identifier for this RPC instance |
129 | | InstanceLoId id; |
130 | | |
131 | | // Mutex for thread-safe access to this instance's data |
132 | | std::unique_ptr<std::mutex> mutex; |
133 | | |
134 | | // Sequence number for RPC packets, incremented for each packet sent |
135 | | int64_t seq = 0; |
136 | | |
137 | | // Queue for regular data transmission requests |
138 | | std::unordered_map<Channel*, std::queue<TransmitInfo, std::list<TransmitInfo>>> package_queue; |
139 | | |
140 | | // Queue for broadcast data transmission requests |
141 | | std::unordered_map<Channel*, |
142 | | std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>> |
143 | | broadcast_package_queue; |
144 | | |
145 | | // RPC request parameters for data transmission |
146 | | std::shared_ptr<PTransmitDataParams> request; |
147 | | |
148 | | // Flag indicating if the RPC channel is currently idle (no active RPC) |
149 | | bool rpc_channel_is_idle = true; |
150 | | |
151 | | // Flag indicating if the RPC channel has been turned off (no more RPCs will be sent) |
152 | | bool rpc_channel_is_turn_off = false; |
153 | | |
154 | | // Statistics for monitoring RPC performance (latency, counts, etc.) |
155 | | RpcInstanceStatistics stats; |
156 | | |
157 | | // Count of active exchange sinks using this RPC instance |
158 | | int64_t running_sink_count = 0; |
159 | | }; |
160 | | |
161 | | template <typename Response> |
162 | | class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> { |
163 | | ENABLE_FACTORY_CREATOR(ExchangeSendCallback); |
164 | | |
165 | | public: |
166 | 22 | ExchangeSendCallback() = default; |
167 | | |
168 | 28 | void init(RpcInstance* ins, bool eos) { |
169 | 28 | _ins = ins; |
170 | 28 | _eos = eos; |
171 | 28 | } |
172 | | |
173 | 22 | ~ExchangeSendCallback() override = default; |
174 | | ExchangeSendCallback(const ExchangeSendCallback& other) = delete; |
175 | | ExchangeSendCallback& operator=(const ExchangeSendCallback& other) = delete; |
176 | 28 | void addFailedHandler(const std::function<void(RpcInstance*, const std::string&)>& fail_fn) { |
177 | 28 | _fail_fn = fail_fn; |
178 | 28 | } |
179 | | void addSuccessHandler(const std::function<void(RpcInstance*, const bool&, const Response&, |
180 | 28 | const int64_t&)>& suc_fn) { |
181 | 28 | _suc_fn = suc_fn; |
182 | 28 | } |
183 | | |
184 | 28 | void call() noexcept override { |
185 | 28 | try { |
186 | 28 | if (::doris::DummyBrpcCallback<Response>::cntl_->Failed()) { |
187 | 1 | std::string err = fmt::format( |
188 | 1 | "failed to send brpc when exchange, error={}, error_text={}, client: {}, " |
189 | 1 | "latency = {}", |
190 | 1 | berror(::doris::DummyBrpcCallback<Response>::cntl_->ErrorCode()), |
191 | 1 | ::doris::DummyBrpcCallback<Response>::cntl_->ErrorText(), |
192 | 1 | BackendOptions::get_localhost(), |
193 | 1 | ::doris::DummyBrpcCallback<Response>::cntl_->latency_us()); |
194 | 1 | _fail_fn(_ins, err); |
195 | 27 | } else { |
196 | 27 | _suc_fn(_ins, _eos, *(::doris::DummyBrpcCallback<Response>::response_), |
197 | 27 | start_rpc_time); |
198 | 27 | } |
199 | 28 | } catch (const std::exception& exp) { |
200 | 0 | LOG(FATAL) << "brpc callback error: " << exp.what(); |
201 | 0 | } catch (...) { |
202 | 0 | LOG(FATAL) << "brpc callback error."; |
203 | 0 | __builtin_unreachable(); |
204 | 0 | } |
205 | 28 | } |
206 | | int64_t start_rpc_time; |
207 | | |
208 | | private: |
209 | | std::function<void(RpcInstance*, const std::string&)> _fail_fn; |
210 | | std::function<void(RpcInstance*, const bool&, const Response&, const int64_t&)> _suc_fn; |
211 | | RpcInstance* _ins; |
212 | | bool _eos; |
213 | | }; |
214 | | |
215 | | // ExchangeSinkBuffer can either be shared among multiple ExchangeSinkLocalState instances |
216 | | // or be individually owned by each ExchangeSinkLocalState. |
217 | | // The following describes the scenario where ExchangeSinkBuffer is shared among multiple ExchangeSinkLocalState instances. |
218 | | // Of course, individual ownership can be seen as a special case where only one ExchangeSinkLocalState shares the buffer. |
219 | | |
220 | | // A sink buffer contains multiple rpc_channels. |
221 | | // Each rpc_channel corresponds to a target instance on the receiving side. |
222 | | // Data is sent using a ping-pong mode within each rpc_channel, |
223 | | // meaning that at most one RPC can exist in a single rpc_channel at a time. |
224 | | // The next RPC can only be sent after the previous one has completed. |
225 | | // |
226 | | // Each exchange sink sends data to all target instances on the receiving side. |
227 | | // If the concurrency is 3, a single rpc_channel will be used simultaneously by three exchange sinks. |
228 | | |
229 | | /* |
230 | | +-----------+ +-----------+ +-----------+ |
231 | | |dest ins id| |dest ins id| |dest ins id| |
232 | | | | | | | | |
233 | | +----+------+ +-----+-----+ +------+----+ |
234 | | | | | |
235 | | | | | |
236 | | +----------------+ +----------------+ +----------------+ |
237 | | | | | | | | |
238 | | sink buffer -------- | rpc_channel | | rpc_channel | | rpc_channel | |
239 | | | | | | | | |
240 | | +-------+--------+ +----------------+ +----------------+ |
241 | | | | | |
242 | | |------------------------+----------------------+ |
243 | | | | | |
244 | | | | | |
245 | | +-----------------+ +-------+---------+ +-------+---------+ |
246 | | | | | | | | |
247 | | | exchange sink | | exchange sink | | exchange sink | |
248 | | | | | | | | |
249 | | +-----------------+ +-----------------+ +-----------------+ |
250 | | */ |
251 | | |
#if defined(BE_TEST) && !defined(BE_BENCHMARK)
// Test-only declaration: visible only when BE_TEST is defined and BE_BENCHMARK is not.
// Takes a backend service stub and ownership of the closure that pairs the transmit request
// with its ExchangeSendCallback. No definition is visible in this header.
// NOTE(review): presumably stands in for the real transmit RPC in unit tests — confirm.
void transmit_blockv2(PBackendService_Stub* stub,
                      std::unique_ptr<AutoReleaseClosure<PTransmitDataParams,
                                                         ExchangeSendCallback<PTransmitDataResult>>>
                              closure);
#endif
258 | | class ExchangeSinkBuffer : public HasTaskExecutionCtx { |
259 | | public: |
260 | | ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, PlanNodeId node_id, |
261 | | RuntimeState* state, const std::vector<InstanceLoId>& sender_ins_ids); |
262 | | #ifdef BE_TEST |
263 | | ExchangeSinkBuffer(RuntimeState* state, int64_t sinknum) |
264 | 4 | : HasTaskExecutionCtx(state), _state(state), _exchange_sink_num(sinknum) {}; |
265 | | #endif |
266 | | |
267 | 10 | ~ExchangeSinkBuffer() override = default; |
268 | | |
269 | | void construct_request(TUniqueId); |
270 | | |
271 | | Status add_block(Channel* channel, TransmitInfo&& request); |
272 | | Status add_block(Channel* channel, BroadcastTransmitInfo&& request); |
273 | | void close(); |
274 | | void update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, int64_t receive_rpc_time); |
275 | | void update_profile(RuntimeProfile* profile); |
276 | | |
277 | | void set_dependency(InstanceLoId sender_ins_id, std::shared_ptr<Dependency> queue_dependency, |
278 | 3 | ExchangeSinkLocalState* local_state) { |
279 | 3 | std::lock_guard l(_m); |
280 | 3 | _queue_deps.push_back(queue_dependency); |
281 | 3 | _parents.push_back(local_state); |
282 | 3 | } |
283 | | |
284 | 0 | void set_low_memory_mode() { _queue_capacity = 8; } |
285 | | std::string debug_each_instance_queue_size(); |
286 | | #ifdef BE_TEST |
287 | | public: |
288 | | #else |
289 | | private: |
290 | | #endif |
291 | | friend class ExchangeSinkLocalState; |
292 | | |
293 | | // Single map to store all RPC instance data |
294 | | phmap::flat_hash_map<InstanceLoId, std::unique_ptr<RpcInstance>> _rpc_instances; |
295 | | std::atomic<size_t> _queue_capacity; |
296 | | |
297 | | // It is set to true only when an RPC fails. Currently, we do not have an error retry mechanism. |
298 | | // If an RPC error occurs, the query will be canceled. |
299 | | std::atomic<bool> _is_failed; |
300 | | PUniqueId _query_id; |
301 | | PlanNodeId _dest_node_id; |
302 | | |
303 | | PlanNodeId _node_id; |
304 | | std::atomic<int64_t> _rpc_count = 0; |
305 | | // The state may be from PipelineFragmentContext if it is shared among multi instances. |
306 | | RuntimeState* _state = nullptr; |
307 | | QueryContext* _context = nullptr; |
308 | | |
309 | | Status _send_rpc(RpcInstance& ins); |
310 | | |
311 | | #ifndef BE_TEST |
312 | | inline void _ended(RpcInstance& ins); |
313 | | inline void _failed(InstanceLoId id, const std::string& err); |
314 | | inline void _set_receiver_eof(RpcInstance& ins); |
315 | | inline void _turn_off_channel(RpcInstance& ins, std::unique_lock<std::mutex>& with_lock); |
316 | | |
317 | | #else |
318 | | virtual void _ended(RpcInstance& ins); |
319 | | virtual void _failed(InstanceLoId id, const std::string& err); |
320 | | virtual void _set_receiver_eof(RpcInstance& ins); |
321 | | virtual void _turn_off_channel(RpcInstance& ins, std::unique_lock<std::mutex>& with_lock); |
322 | | #endif |
323 | | |
324 | | void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time); |
325 | | int64_t get_sum_rpc_time(); |
326 | | |
327 | | // _total_queue_size is the sum of the sizes of all instance_to_package_queues. |
328 | | // Any modification to instance_to_package_queue requires a corresponding modification to _total_queue_size. |
329 | | std::atomic<int> _total_queue_size = 0; |
330 | | |
331 | | // protected the `_queue_deps` and `_parents` |
332 | | std::mutex _m; |
333 | | // _queue_deps is used for memory control. |
334 | | std::vector<std::shared_ptr<Dependency>> _queue_deps; |
335 | | // The ExchangeSinkLocalState in _parents is only used in _turn_off_channel. |
336 | | std::vector<ExchangeSinkLocalState*> _parents; |
337 | | const int64_t _exchange_sink_num; |
338 | | bool _send_multi_blocks = false; |
339 | | int _send_multi_blocks_byte_size = 256 * 1024; |
340 | | }; |
341 | | |
342 | | } // namespace doris |