be/src/exec/exchange/vdata_stream_sender.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <brpc/controller.h> |
21 | | #include <butil/errno.h> |
22 | | #include <fmt/format.h> |
23 | | #include <gen_cpp/Partitions_types.h> |
24 | | #include <gen_cpp/Types_types.h> |
25 | | #include <gen_cpp/data.pb.h> |
26 | | #include <gen_cpp/internal_service.pb.h> |
27 | | #include <gen_cpp/types.pb.h> |
28 | | |
29 | | #include <atomic> |
30 | | #include <cstddef> |
31 | | #include <cstdint> |
32 | | #include <memory> |
33 | | #include <ostream> |
34 | | #include <string> |
35 | | #include <utility> |
36 | | #include <vector> |
37 | | |
38 | | #include "common/config.h" |
39 | | #include "common/global_types.h" |
40 | | #include "common/logging.h" |
41 | | #include "common/status.h" |
42 | | #include "core/block/block.h" |
43 | | #include "exec/exchange/vdata_stream_recvr.h" |
44 | | #include "exec/operator/exchange_sink_buffer.h" |
45 | | #include "exec/partitioner/partitioner.h" |
46 | | #include "exec/sink/vrow_distribution.h" |
47 | | #include "exec/sink/vtablet_finder.h" |
48 | | #include "exprs/vexpr_context.h" |
49 | | #include "runtime/runtime_profile.h" |
50 | | #include "service/backend_options.h" |
51 | | #include "storage/tablet_info.h" |
52 | | #include "util/brpc_closure.h" |
53 | | #include "util/uid_util.h" |
54 | | |
55 | | namespace doris { |
56 | | #include "common/compile_check_begin.h" |
57 | | class ObjectPool; |
58 | | class RuntimeState; |
59 | | class RowDescriptor; |
60 | | class TDataSink; |
61 | | class TDataStreamSink; |
62 | | class TPlanFragmentDestination; |
63 | | |
64 | | namespace segment_v2 { |
65 | | enum CompressionTypePB : int; |
66 | | } // namespace segment_v2 |
67 | | |
68 | | class ExchangeSinkOperatorX; |
69 | | class Dependency; |
70 | | class ExchangeSinkLocalState; |
71 | | |
72 | | class BlockSerializer { |
73 | | public: |
74 | | BlockSerializer(ExchangeSinkLocalState* parent, bool is_local = true); |
75 | | #ifdef BE_TEST |
76 | | BlockSerializer() : _batch_size(0) {}; |
77 | | #endif |
78 | | Status next_serialized_block(Block* src, PBlock* dest, size_t num_receivers, bool* serialized, |
79 | | bool eos, const uint32_t* data = nullptr, |
80 | | const uint32_t offset = 0, const uint32_t size = 0); |
81 | | Status serialize_block(const Block* src, PBlock* dest, size_t num_receivers = 1); |
82 | | |
83 | 18.5M | MutableBlock* get_block() const { return _mutable_block.get(); } |
84 | | |
85 | 15.5M | size_t mem_usage() const { return _mutable_block ? _mutable_block->allocated_bytes() : 0; } |
86 | | |
87 | 781k | void reset_block() { _mutable_block.reset(); } |
88 | | |
89 | 6.32M | void set_is_local(bool is_local) { _is_local = is_local; } |
90 | 697 | bool is_local() const { return _is_local; } |
91 | | |
92 | 7.64k | void set_low_memory_mode(RuntimeState* state) { _buffer_mem_limit = 4 * 1024 * 1024; } |
93 | | |
94 | | private: |
95 | | Status _serialize_block(PBlock* dest, size_t num_receivers = 1); |
96 | | |
97 | | ExchangeSinkLocalState* _parent; |
98 | | std::unique_ptr<MutableBlock> _mutable_block; |
99 | | |
100 | | bool _is_local; |
101 | | const int _batch_size; |
102 | | std::atomic<size_t> _buffer_mem_limit = UINT64_MAX; |
103 | | }; |
104 | | |
105 | | class Channel { |
106 | | public: |
107 | | friend class ExchangeSinkBuffer; |
108 | | // Create a channel that sends data to one destination, identified by its brpc |
109 | | // address, fragment instance id and destination plan node id. |
110 | | // NOTE(review): a buffer_size soft limit (in bytes, on tuple data accumulated before being sent, applying to |
111 | | // add_row() but not send_batch()) is not a parameter here, and neither method is declared in this class - confirm. |
112 | | Channel(ExchangeSinkLocalState* parent, TNetworkAddress brpc_dest, |
113 | | TUniqueId fragment_instance_id, PlanNodeId dest_node_id) |
114 | 6.28M | : _parent(parent), |
115 | 6.28M | _fragment_instance_id(std::move(fragment_instance_id)), |
116 | 6.28M | _dest_node_id(dest_node_id), |
117 | 6.28M | _brpc_dest_addr(std::move(brpc_dest)), |
118 | 6.28M | _is_local((_brpc_dest_addr.hostname == BackendOptions::get_localhost()) && |
119 | 6.34M | (_brpc_dest_addr.port == config::brpc_port)), |
120 | 6.28M | _serializer(_parent, _is_local) {} |
121 | | |
122 | 6.56M | virtual ~Channel() = default; |
123 | | |
124 | | // Initialize channel. |
125 | | // Returns OK if successful, error indication otherwise. |
126 | | Status init(RuntimeState* state); |
127 | | Status open(RuntimeState* state); |
128 | | std::string debug_string() const; |
129 | | |
130 | | MOCK_FUNCTION Status send_local_block(Block* block, bool eos, bool can_be_moved); |
131 | | // Flush buffered rows and close the channel. This function does not wait for the response |
132 | | // to the close operation; the client should call close_wait() to finish closing the channel. |
133 | | // The close operation is split into two phases so that multiple channels |
134 | | // can be closed in parallel. |
135 | | Status close(RuntimeState* state); |
136 | | |
137 | 0 | std::string get_fragment_instance_id_str() { |
138 | 0 | UniqueId uid(_fragment_instance_id); |
139 | 0 | return uid.to_string(); |
140 | 0 | } |
141 | | |
142 | 22.3M | bool is_local() const { return _is_local; } |
143 | | |
144 | 23.3M | bool is_receiver_eof() const { return _receiver_status.is<ErrorCode::END_OF_FILE>(); } |
145 | | |
146 | 17.6k | void set_receiver_eof(Status st) { _receiver_status = st; } |
147 | | |
148 | | int64_t mem_usage() const; |
149 | | |
150 | | // Asynchronously sends a block. |
151 | | // Returns the status of the most recently finished transmit_data |
152 | | // rpc (or OK if there is no finished rpc whose status has not been reported yet). |
153 | | // If block is nullptr, send the eof packet. |
154 | | MOCK_FUNCTION Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = false); |
155 | | MOCK_FUNCTION Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, |
156 | | bool eos = false); |
157 | | |
158 | | Status add_rows(Block* block, const uint32_t* data, const uint32_t offset, const uint32_t size, |
159 | | bool eos); |
160 | | |
161 | 3.08M | void set_exchange_buffer(ExchangeSinkBuffer* buffer) { _buffer = buffer; } |
162 | | |
163 | 3.07M | InstanceLoId dest_ins_id() const { return _fragment_instance_id.lo; } |
164 | | |
165 | | std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> get_send_callback(RpcInstance* ins, |
166 | 3.06M | bool eos) { |
167 | 3.06M | if (!_send_callback) { |
168 | 3.05M | _send_callback = ExchangeSendCallback<PTransmitDataResult>::create_shared(); |
169 | 3.05M | } else { |
170 | 8.80k | _send_callback->cntl_->Reset(); |
171 | 8.80k | } |
172 | 3.06M | _send_callback->init(ins, eos); |
173 | 3.06M | return _send_callback; |
174 | 3.06M | } |
175 | | |
176 | | std::shared_ptr<Dependency> get_local_channel_dependency(); |
177 | | |
178 | 3.82k | void set_low_memory_mode(RuntimeState* state) { _serializer.set_low_memory_mode(state); } |
179 | | |
180 | | private: |
181 | | Status _send_local_block(bool eos); |
182 | | Status _send_current_block(bool eos); |
183 | | |
184 | | MOCK_FUNCTION Status _init_brpc_stub(RuntimeState* state); |
185 | | MOCK_FUNCTION Status _find_local_recvr(RuntimeState* state); |
186 | | |
187 | 3.54M | Status _recvr_status() const { |
188 | 3.54M | if (_local_recvr && !_local_recvr->is_closed()) { |
189 | 3.53M | return Status::OK(); |
190 | 3.53M | } |
191 | 4.10k | return Status::EndOfFile( |
192 | 4.10k | "local data stream receiver closed"); // local data stream receiver closed |
193 | 3.54M | } |
194 | | |
195 | | ExchangeSinkLocalState* _parent = nullptr; |
196 | | |
197 | | const TUniqueId _fragment_instance_id; |
198 | | PlanNodeId _dest_node_id; |
199 | | bool _closed {false}; |
200 | | bool _need_close {false}; |
201 | | int _be_number; |
202 | | |
203 | | TNetworkAddress _brpc_dest_addr; |
204 | | |
205 | | std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr; |
206 | | Status _receiver_status; |
207 | | int32_t _brpc_timeout_ms = 500; |
208 | | |
209 | | bool _is_local; |
210 | | std::shared_ptr<VDataStreamRecvr> _local_recvr; |
211 | | |
212 | | BlockSerializer _serializer; |
213 | | |
214 | | ExchangeSinkBuffer* _buffer = nullptr; |
215 | | bool _eos_send = false; |
216 | | std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> _send_callback; |
217 | | std::unique_ptr<PBlock> _pblock; |
218 | | }; |
219 | | |
220 | | #define HANDLE_CHANNEL_STATUS(state, channel, status) \ |
221 | 6.53M | do { \ |
222 | 6.53M | if (status.is<ErrorCode::END_OF_FILE>()) { \ |
223 | 17.6k | RETURN_IF_ERROR(_handle_eof_channel(state, channel, status)); \ |
224 | 6.51M | } else { \ |
225 | 6.51M | RETURN_IF_ERROR(status); \ |
226 | 6.51M | } \ |
227 | 6.53M | } while (0) |
228 | | |
229 | | } // namespace doris |
230 | | |
231 | | #include "common/compile_check_end.h" |