be/src/exec/exchange/vdata_stream_sender.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <brpc/controller.h> |
21 | | #include <butil/errno.h> |
22 | | #include <fmt/format.h> |
23 | | #include <gen_cpp/Partitions_types.h> |
24 | | #include <gen_cpp/Types_types.h> |
25 | | #include <gen_cpp/data.pb.h> |
26 | | #include <gen_cpp/internal_service.pb.h> |
27 | | #include <gen_cpp/types.pb.h> |
28 | | |
29 | | #include <atomic> |
30 | | #include <cstddef> |
31 | | #include <cstdint> |
32 | | #include <memory> |
33 | | #include <ostream> |
34 | | #include <string> |
35 | | #include <utility> |
36 | | #include <vector> |
37 | | |
38 | | #include "common/config.h" |
39 | | #include "common/global_types.h" |
40 | | #include "common/logging.h" |
41 | | #include "common/status.h" |
42 | | #include "core/block/block.h" |
43 | | #include "exec/exchange/vdata_stream_recvr.h" |
44 | | #include "exec/operator/exchange_sink_buffer.h" |
45 | | #include "exec/partitioner/partitioner.h" |
46 | | #include "exec/sink/vrow_distribution.h" |
47 | | #include "exec/sink/vtablet_finder.h" |
48 | | #include "exprs/vexpr_context.h" |
49 | | #include "runtime/runtime_profile.h" |
50 | | #include "service/backend_options.h" |
51 | | #include "storage/tablet_info.h" |
52 | | #include "util/brpc_closure.h" |
53 | | #include "util/uid_util.h" |
54 | | |
55 | | namespace doris { |
56 | | class ObjectPool; |
57 | | class RuntimeState; |
58 | | class RowDescriptor; |
59 | | class TDataSink; |
60 | | class TDataStreamSink; |
61 | | class TPlanFragmentDestination; |
62 | | |
63 | | namespace segment_v2 { |
64 | | enum CompressionTypePB : int; |
65 | | } // namespace segment_v2 |
66 | | |
67 | | class ExchangeSinkOperatorX; |
68 | | class Dependency; |
69 | | class ExchangeSinkLocalState; |
70 | | |
71 | | class BlockSerializer { |
72 | | public: |
73 | | BlockSerializer(ExchangeSinkLocalState* parent, bool is_local = true); |
74 | | #ifdef BE_TEST |
75 | 18 | BlockSerializer() : _batch_size(0) {}; |
76 | | #endif |
77 | | Status next_serialized_block(Block* src, PBlock* dest, size_t num_receivers, bool* serialized, |
78 | | bool eos, const uint32_t* data = nullptr, |
79 | | const uint32_t offset = 0, const uint32_t size = 0); |
80 | | Status serialize_block(const Block* src, PBlock* dest, size_t num_receivers = 1); |
81 | | |
82 | 14 | MutableBlock* get_block() const { return _mutable_block.get(); } |
83 | | |
84 | 64 | size_t mem_usage() const { return _mutable_block ? _mutable_block->allocated_bytes() : 0; } |
85 | | |
86 | 7 | void reset_block() { _mutable_block.reset(); } |
87 | | |
88 | 23 | void set_is_local(bool is_local) { _is_local = is_local; } |
89 | 0 | bool is_local() const { return _is_local; } |
90 | | |
91 | 0 | void set_low_memory_mode(RuntimeState* state) { _buffer_mem_limit = 4 * 1024 * 1024; } |
92 | | |
93 | | private: |
94 | | Status _serialize_block(PBlock* dest, size_t num_receivers = 1); |
95 | | |
96 | | ExchangeSinkLocalState* _parent; |
97 | | std::unique_ptr<MutableBlock> _mutable_block; |
98 | | |
99 | | bool _is_local; |
100 | | const int _batch_size; |
101 | | std::atomic<size_t> _buffer_mem_limit = UINT64_MAX; |
102 | | }; |
103 | | |
104 | | class Channel { |
105 | | public: |
106 | | friend class ExchangeSinkBuffer; |
107 | | // Create a channel to send data to a particular ipaddress/port/query/node |
108 | | // combination. buffer_size is specified in bytes and is a soft limit on |
109 | | // how much tuple data is accumulated before being sent; it only applies |
110 | | // when data is added via add_row() and not sent directly via send_batch(). |
111 | | Channel(ExchangeSinkLocalState* parent, TNetworkAddress brpc_dest, |
112 | | TUniqueId fragment_instance_id, PlanNodeId dest_node_id) |
113 | 72 | : _parent(parent), |
114 | 72 | _fragment_instance_id(std::move(fragment_instance_id)), |
115 | 72 | _dest_node_id(dest_node_id), |
116 | 72 | _brpc_dest_addr(std::move(brpc_dest)), |
117 | 72 | _is_local((_brpc_dest_addr.hostname == BackendOptions::get_localhost()) && |
118 | 72 | (_brpc_dest_addr.port == config::brpc_port)), |
119 | 72 | _serializer(_parent, _is_local) {} |
120 | | |
121 | 72 | virtual ~Channel() = default; |
122 | | |
123 | | // Initialize channel. |
124 | | // Returns OK if successful, error indication otherwise. |
125 | | Status init(RuntimeState* state); |
126 | | Status open(RuntimeState* state); |
127 | | std::string debug_string() const; |
128 | | |
129 | | MOCK_FUNCTION Status send_local_block(Block* block, bool eos, bool can_be_moved); |
130 | | // Flush buffered rows and close the channel. This function doesn't wait for the |
131 | | // response of the close operation; the client should call close_wait() to finish |
132 | | // the channel's close. We split one close operation into two phases so that |
133 | | // multiple channels can run in parallel. |
134 | | Status close(RuntimeState* state); |
135 | | |
136 | 0 | std::string get_fragment_instance_id_str() { |
137 | 0 | UniqueId uid(_fragment_instance_id); |
138 | 0 | return uid.to_string(); |
139 | 0 | } |
140 | | |
141 | 74 | bool is_local() const { return _is_local; } |
142 | | |
143 | 76 | bool is_receiver_eof() const { return _receiver_status.is<ErrorCode::END_OF_FILE>(); } |
144 | | |
145 | 19 | void set_receiver_eof(Status st) { _receiver_status = st; } |
146 | | |
147 | | int64_t mem_usage() const; |
148 | | |
149 | | // Asynchronously sends a block. |
150 | | // Returns the status of the most recently finished transmit_data |
151 | | // rpc (or OK if there wasn't one that hasn't been reported yet). |
152 | | // If batch is nullptr, sends the eof packet. |
153 | | MOCK_FUNCTION Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = false); |
154 | | MOCK_FUNCTION Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, |
155 | | bool eos = false); |
156 | | |
157 | | Status add_rows(Block* block, const uint32_t* data, const uint32_t offset, const uint32_t size, |
158 | | bool eos); |
159 | | |
160 | 15 | void set_exchange_buffer(ExchangeSinkBuffer* buffer) { _buffer = buffer; } |
161 | | |
162 | 69 | InstanceLoId dest_ins_id() const { return _fragment_instance_id.lo; } |
163 | | |
164 | | std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> get_send_callback(RpcInstance* ins, |
165 | 28 | bool eos) { |
166 | 28 | if (!_send_callback) { |
167 | 22 | _send_callback = ExchangeSendCallback<PTransmitDataResult>::create_shared(); |
168 | 22 | } else { |
169 | 6 | _send_callback->cntl_->Reset(); |
170 | 6 | } |
171 | 28 | _send_callback->init(ins, eos); |
172 | 28 | return _send_callback; |
173 | 28 | } |
174 | | |
175 | | std::shared_ptr<Dependency> get_local_channel_dependency(); |
176 | | |
177 | 0 | void set_low_memory_mode(RuntimeState* state) { _serializer.set_low_memory_mode(state); } |
178 | | |
179 | | private: |
180 | | Status _send_local_block(bool eos); |
181 | | Status _send_current_block(bool eos); |
182 | | |
183 | | MOCK_FUNCTION Status _init_brpc_stub(RuntimeState* state); |
184 | | MOCK_FUNCTION Status _find_local_recvr(RuntimeState* state); |
185 | | |
186 | 7 | Status _recvr_status() const { |
187 | 7 | if (_local_recvr && !_local_recvr->is_closed()) { |
188 | 7 | return Status::OK(); |
189 | 7 | } |
190 | 0 | return Status::EndOfFile( |
191 | 0 | "local data stream receiver closed"); // local data stream receiver closed |
192 | 7 | } |
193 | | |
194 | | ExchangeSinkLocalState* _parent = nullptr; |
195 | | |
196 | | const TUniqueId _fragment_instance_id; |
197 | | PlanNodeId _dest_node_id; |
198 | | bool _closed {false}; |
199 | | bool _need_close {false}; |
200 | | int _be_number; |
201 | | |
202 | | TNetworkAddress _brpc_dest_addr; |
203 | | |
204 | | std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr; |
205 | | Status _receiver_status; |
206 | | int32_t _brpc_timeout_ms = 500; |
207 | | |
208 | | bool _is_local; |
209 | | std::shared_ptr<VDataStreamRecvr> _local_recvr; |
210 | | |
211 | | BlockSerializer _serializer; |
212 | | |
213 | | ExchangeSinkBuffer* _buffer = nullptr; |
214 | | bool _eos_send = false; |
215 | | std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> _send_callback; |
216 | | std::unique_ptr<PBlock> _pblock; |
217 | | }; |
218 | | |
219 | | #define HANDLE_CHANNEL_STATUS(state, channel, status) \ |
220 | 22 | do { \ |
221 | 22 | if (status.is<ErrorCode::END_OF_FILE>()) { \ |
222 | 0 | RETURN_IF_ERROR(_handle_eof_channel(state, channel, status)); \ |
223 | 22 | } else { \ |
224 | 22 | RETURN_IF_ERROR(status); \ |
225 | 22 | } \ |
226 | 22 | } while (0) |
227 | | |
228 | | } // namespace doris |