be/src/exec/exchange/vdata_stream_sender.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/exchange/vdata_stream_sender.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <fmt/ranges.h> // IWYU pragma: keep |
22 | | #include <gen_cpp/DataSinks_types.h> |
23 | | #include <gen_cpp/Metrics_types.h> |
24 | | #include <gen_cpp/Types_types.h> |
25 | | #include <gen_cpp/data.pb.h> |
26 | | #include <gen_cpp/internal_service.pb.h> |
27 | | #include <glog/logging.h> |
28 | | #include <stddef.h> |
29 | | |
30 | | #include <algorithm> |
31 | | #include <cstdint> |
32 | | #include <functional> |
33 | | #include <map> |
34 | | #include <memory> |
35 | | #include <random> |
36 | | |
37 | | #include "common/object_pool.h" |
38 | | #include "common/status.h" |
39 | | #include "core/column/column_const.h" |
40 | | #include "exec/common/sip_hash.h" |
41 | | #include "exec/exchange/vdata_stream_mgr.h" |
42 | | #include "exec/exchange/vdata_stream_recvr.h" |
43 | | #include "exec/operator/exchange_sink_operator.h" |
44 | | #include "exec/operator/result_file_sink_operator.h" |
45 | | #include "exec/sink/vrow_distribution.h" |
46 | | #include "exec/sink/writer/vtablet_writer_v2.h" |
47 | | #include "exprs/vexpr.h" |
48 | | #include "runtime/descriptors.h" |
49 | | #include "runtime/runtime_state.h" |
50 | | #include "runtime/thread_context.h" |
51 | | #include "storage/tablet_info.h" |
52 | | #include "util/proto_util.h" |
53 | | |
54 | | namespace doris { |
55 | | |
56 | 3.90M | Status Channel::init(RuntimeState* state) { |
57 | | // only enable_local_exchange() is true and the destination address is localhost, then the channel is local |
58 | 3.90M | _is_local &= state->enable_local_exchange(); |
59 | | |
60 | 3.90M | if (_is_local) { |
61 | 1.21M | return Status::OK(); |
62 | 1.21M | } |
63 | | |
64 | 2.68M | RETURN_IF_ERROR(_init_brpc_stub(state)); |
65 | | |
66 | 2.68M | return Status::OK(); |
67 | 2.68M | } |
68 | | |
69 | 2.69M | Status Channel::_init_brpc_stub(RuntimeState* state) { |
70 | 2.69M | if (_brpc_dest_addr.hostname.empty()) { |
71 | 0 | LOG(WARNING) << "there is no brpc destination address's hostname" |
72 | 0 | ", maybe version is not compatible."; |
73 | 0 | return Status::InternalError("no brpc destination"); |
74 | 0 | } |
75 | | |
76 | 2.69M | auto network_address = _brpc_dest_addr; |
77 | 2.69M | if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) { |
78 | 2.69M | _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client( |
79 | 2.69M | "127.0.0.1", _brpc_dest_addr.port); |
80 | 2.69M | network_address.hostname = "127.0.0.1"; |
81 | 18.4E | } else { |
82 | 18.4E | _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client(_brpc_dest_addr); |
83 | 18.4E | } |
84 | | |
85 | 2.69M | if (!_brpc_stub) { |
86 | 0 | std::string msg = fmt::format("Get rpc stub failed, dest_addr={}:{}", |
87 | 0 | _brpc_dest_addr.hostname, _brpc_dest_addr.port); |
88 | 0 | LOG(WARNING) << msg; |
89 | 0 | return Status::InternalError(msg); |
90 | 0 | } |
91 | | |
92 | 2.71M | if (config::enable_brpc_connection_check) { |
93 | 2.71M | state->get_query_ctx()->add_using_brpc_stub(_brpc_dest_addr, _brpc_stub); |
94 | 2.71M | } |
95 | 2.69M | return Status::OK(); |
96 | 2.69M | } |
97 | | |
98 | 3.78M | Status Channel::open(RuntimeState* state) { |
99 | 3.78M | if (_is_local) { |
100 | 1.24M | RETURN_IF_ERROR(_find_local_recvr(state)); |
101 | 1.24M | } |
102 | 3.78M | _be_number = state->be_number(); |
103 | 3.78M | _brpc_timeout_ms = get_execution_rpc_timeout_ms(state->execution_timeout()); |
104 | 3.78M | _serializer.set_is_local(_is_local); |
105 | | |
106 | | // In bucket shuffle join will set fragment_instance_id (-1, -1) |
107 | | // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0" |
108 | | // so the empty channel not need call function close_internal() |
109 | 3.78M | _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1); |
110 | | |
111 | 3.78M | return Status::OK(); |
112 | 3.78M | } |
113 | | |
114 | 1.23M | Status Channel::_find_local_recvr(RuntimeState* state) { |
115 | 1.23M | auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(_fragment_instance_id, |
116 | 1.23M | _dest_node_id, &_local_recvr); |
117 | 1.23M | if (!st.ok()) { |
118 | | // If could not find local receiver, then it means the channel is EOF. |
119 | | // Maybe downstream task is finished already. |
120 | | //if (_receiver_status.ok()) { |
121 | | // _receiver_status = Status::EndOfFile("local data stream receiver is deconstructed"); |
122 | | //} |
123 | 10.0k | LOG(INFO) << "Query: " << print_id(state->query_id()) |
124 | 10.0k | << " recvr is not found, maybe downstream task is finished. error st is: " |
125 | 10.0k | << st.to_string(); |
126 | 10.0k | } |
127 | 1.23M | return Status::OK(); |
128 | 1.23M | } |
129 | | Status Channel::add_rows(Block* block, const uint32_t* data, const uint32_t offset, |
130 | 3.43M | const uint32_t size, bool eos) { |
131 | 3.43M | if (_fragment_instance_id.lo == -1) { |
132 | 366 | return Status::OK(); |
133 | 366 | } |
134 | | |
135 | 3.43M | bool serialized = false; |
136 | 3.43M | if (_pblock == nullptr) { |
137 | 3.37M | _pblock = std::make_unique<PBlock>(); |
138 | 3.37M | } |
139 | 3.43M | RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, eos, |
140 | 3.43M | data, offset, size)); |
141 | 3.43M | if (serialized) { |
142 | 3.35M | RETURN_IF_ERROR(_send_current_block(eos)); |
143 | 3.35M | } |
144 | | |
145 | 3.33M | return Status::OK(); |
146 | 3.43M | } |
147 | | |
148 | 1.24M | std::shared_ptr<Dependency> Channel::get_local_channel_dependency() { |
149 | 1.24M | if (!_local_recvr) { |
150 | 10.0k | return nullptr; |
151 | 10.0k | } |
152 | 1.23M | return _local_recvr->get_local_channel_dependency(_parent->sender_id()); |
153 | 1.24M | } |
154 | | |
155 | 8.43M | int64_t Channel::mem_usage() const { |
156 | 8.43M | return _serializer.mem_usage(); |
157 | 8.43M | } |
158 | | |
159 | 5.07M | Status Channel::send_remote_block(std::unique_ptr<PBlock>&& block, bool eos) { |
160 | 5.07M | COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); |
161 | | |
162 | 5.10M | if (eos) { |
163 | 5.10M | if (_eos_send) { |
164 | 2.57M | return Status::OK(); |
165 | 2.57M | } else { |
166 | 2.53M | _eos_send = true; |
167 | 2.53M | } |
168 | 5.10M | } |
169 | 2.56M | if (eos || block->column_metas_size()) { |
170 | 2.56M | RETURN_IF_ERROR(_buffer->add_block(this, {std::move(block), eos})); |
171 | 2.56M | } |
172 | 2.41M | return Status::OK(); |
173 | 2.49M | } |
174 | | |
175 | 147k | Status Channel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos) { |
176 | 147k | COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); |
177 | 147k | if (eos) { |
178 | 147k | if (_eos_send) { |
179 | 0 | return Status::OK(); |
180 | 0 | } |
181 | 147k | _eos_send = true; |
182 | 147k | } |
183 | 148k | if (eos || block->get_block()->column_metas_size()) { |
184 | 148k | RETURN_IF_ERROR(_buffer->add_block(this, {block, eos})); |
185 | 148k | } |
186 | 147k | return Status::OK(); |
187 | 147k | } |
188 | | |
189 | 7.10M | Status Channel::_send_current_block(bool eos) { |
190 | 7.10M | if (is_local()) { |
191 | 2.07M | return _send_local_block(eos); |
192 | 2.07M | } |
193 | | // here _pblock maybe nullptr , but we must send the eos to the receiver |
194 | 5.02M | return send_remote_block(std::move(_pblock), eos); |
195 | 7.10M | } |
196 | | |
197 | 2.07M | Status Channel::_send_local_block(bool eos) { |
198 | 2.07M | Block block; |
199 | 2.07M | if (_serializer.get_block() != nullptr) { |
200 | 1.69M | block = _serializer.get_block()->to_block(); |
201 | 1.69M | _serializer.get_block()->set_mutable_columns(block.clone_empty_columns()); |
202 | 1.69M | } |
203 | | |
204 | 2.08M | if (!block.empty() || eos) { // if eos is true, we MUST to send an empty block |
205 | 2.08M | RETURN_IF_ERROR(send_local_block(&block, eos, true)); |
206 | 2.08M | } |
207 | 2.06M | return Status::OK(); |
208 | 2.07M | } |
209 | | |
210 | 2.44M | Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) { |
211 | 2.44M | SCOPED_TIMER(_parent->local_send_timer()); |
212 | | |
213 | 2.44M | if (eos) { |
214 | 2.35M | if (_eos_send) { |
215 | 1.11M | return Status::OK(); |
216 | 1.24M | } else { |
217 | 1.24M | _eos_send = true; |
218 | 1.24M | } |
219 | 2.35M | } |
220 | | |
221 | 1.33M | if (is_receiver_eof()) { |
222 | 0 | return _receiver_status; |
223 | 0 | } |
224 | 1.33M | auto receiver_status = _recvr_status(); |
225 | | // _local_recvr depdend on ExchangeLocalState* _parent to do some memory counter settings |
226 | | // but it only owns a raw pointer, so that the ExchangeLocalState object may be deconstructed. |
227 | | // Lock the fragment context to ensure the runtime state and other objects are not deconstructed |
228 | 1.33M | TaskExecutionContextSPtr ctx_lock = nullptr; |
229 | 1.33M | if (receiver_status.ok() && _local_recvr != nullptr) { |
230 | 1.32M | ctx_lock = _local_recvr->task_exec_ctx(); |
231 | | // Do not return internal error, because when query finished, the downstream node |
232 | | // may finish before upstream node. And the object maybe deconstructed. If return error |
233 | | // then the upstream node may report error status to FE, the query is failed. |
234 | 1.32M | if (ctx_lock == nullptr) { |
235 | 0 | receiver_status = Status::EndOfFile("local data stream receiver is deconstructed"); |
236 | 0 | } |
237 | 1.32M | } |
238 | 1.33M | if (receiver_status.ok()) { |
239 | 1.32M | COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes()); |
240 | 1.32M | COUNTER_UPDATE(_parent->local_sent_rows(), block->rows()); |
241 | 1.32M | COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); |
242 | | |
243 | 1.32M | const auto sender_id = _parent->sender_id(); |
244 | 1.32M | if (!block->empty()) [[likely]] { |
245 | 163k | _local_recvr->add_block(block, sender_id, can_be_moved); |
246 | 163k | } |
247 | | |
248 | 1.32M | if (eos) [[unlikely]] { |
249 | 1.23M | _local_recvr->remove_sender(sender_id, _be_number, Status::OK()); |
250 | 1.23M | _parent->on_channel_finished(_fragment_instance_id.lo); |
251 | 1.23M | } |
252 | 1.32M | return Status::OK(); |
253 | 1.32M | } else { |
254 | 4.25k | _receiver_status = std::move(receiver_status); |
255 | 4.25k | _parent->on_channel_finished(_fragment_instance_id.lo); |
256 | 4.25k | return _receiver_status; |
257 | 4.25k | } |
258 | 1.33M | } |
259 | | |
260 | 0 | std::string Channel::debug_string() const { |
261 | 0 | fmt::memory_buffer debug_string_buffer; |
262 | 0 | fmt::format_to(debug_string_buffer, |
263 | 0 | "fragment_instance_id: {}, _dest_node_id: {}, _is_local: {}, _receiver_status: " |
264 | 0 | "{}, _closed: {}, _need_close: {}, _be_number: {}, _eos_send: {}", |
265 | 0 | print_id(_fragment_instance_id), _dest_node_id, _is_local, |
266 | 0 | _receiver_status.to_string(), _closed, _need_close, _be_number, _eos_send); |
267 | 0 | if (_is_local) { |
268 | 0 | fmt::format_to(debug_string_buffer, "_local_recvr: {}", _local_recvr->debug_string()); |
269 | 0 | } |
270 | 0 | return fmt::to_string(debug_string_buffer); |
271 | 0 | } |
272 | | |
273 | 3.99M | Status Channel::close(RuntimeState* state) { |
274 | 3.99M | if (_closed) { |
275 | 99.8k | return Status::OK(); |
276 | 99.8k | } |
277 | 3.89M | _closed = true; |
278 | 3.89M | if (!_need_close) { |
279 | 7 | return Status::OK(); |
280 | 7 | } |
281 | | |
282 | 3.89M | if (is_receiver_eof()) { |
283 | 99.0k | _serializer.reset_block(); |
284 | 99.0k | return Status::OK(); |
285 | 3.79M | } else { |
286 | 3.79M | return _send_current_block(true); |
287 | 3.79M | } |
288 | 3.89M | } |
289 | | |
290 | | BlockSerializer::BlockSerializer(ExchangeSinkLocalState* parent, bool is_local) |
291 | 4.50M | : _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {} |
292 | | |
293 | | Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t num_receivers, |
294 | | bool* serialized, bool eos, const uint32_t* data, |
295 | 3.60M | const uint32_t offset, const uint32_t size) { |
296 | 3.60M | if (_mutable_block == nullptr) { |
297 | 3.49M | _mutable_block = MutableBlock::create_unique(block->clone_empty()); |
298 | 3.49M | } |
299 | | |
300 | 3.60M | { |
301 | 3.60M | SCOPED_TIMER(_parent->merge_block_timer()); |
302 | 3.60M | if (data) { |
303 | 3.44M | if (size > 0) { |
304 | 70.4k | RETURN_IF_ERROR( |
305 | 70.4k | _mutable_block->add_rows(block, data + offset, data + offset + size)); |
306 | 70.4k | } |
307 | 3.44M | } else if (!block->empty()) { |
308 | 60.7k | RETURN_IF_ERROR(_mutable_block->merge(*block)); |
309 | 60.7k | } |
310 | 3.60M | } |
311 | | |
312 | 3.63M | if (_mutable_block->rows() >= _batch_size || eos || |
313 | 3.60M | (_mutable_block->rows() > 0 && _mutable_block->allocated_bytes() > _buffer_mem_limit)) { |
314 | 3.52M | if (!_is_local) { |
315 | 2.52M | RETURN_IF_ERROR(_serialize_block(dest, num_receivers)); |
316 | 2.52M | } |
317 | 3.52M | *serialized = true; |
318 | 3.52M | } else { |
319 | 84.1k | *serialized = false; |
320 | 84.1k | } |
321 | 3.60M | return Status::OK(); |
322 | 3.60M | } |
323 | | |
324 | 2.51M | Status BlockSerializer::_serialize_block(PBlock* dest, size_t num_receivers) { |
325 | 2.51M | if (_mutable_block && _mutable_block->rows() > 0) { |
326 | 26.1k | auto block = _mutable_block->to_block(); |
327 | 26.1k | RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers)); |
328 | 26.1k | if (_parent->state()->low_memory_mode()) { |
329 | 0 | reset_block(); |
330 | 26.1k | } else { |
331 | 26.1k | block.clear_column_data(); |
332 | 26.1k | _mutable_block->set_mutable_columns(block.mutate_columns()); |
333 | 26.1k | } |
334 | 26.1k | } |
335 | | |
336 | 2.51M | return Status::OK(); |
337 | 2.51M | } |
338 | | |
339 | 67.4k | Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, size_t num_receivers) { |
340 | 67.4k | SCOPED_TIMER(_parent->_serialize_batch_timer); |
341 | 67.4k | dest->Clear(); |
342 | 67.4k | size_t uncompressed_bytes = 0, compressed_bytes = 0; |
343 | 67.4k | int64_t compress_time = 0; |
344 | 67.4k | RETURN_IF_ERROR(src->serialize(_parent->_state->be_exec_version(), dest, &uncompressed_bytes, |
345 | 67.4k | &compressed_bytes, &compress_time, _parent->compression_type(), |
346 | 67.4k | _parent->transfer_large_data_by_brpc())); |
347 | 67.4k | COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers); |
348 | 67.4k | COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers); |
349 | 67.4k | COUNTER_UPDATE(_parent->_compress_timer, compress_time); |
350 | 67.4k | #ifndef BE_TEST |
351 | 67.4k | _parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_bytes( |
352 | 67.4k | compressed_bytes * num_receivers); |
353 | 67.4k | _parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_rows( |
354 | 67.4k | src->rows() * num_receivers); |
355 | 67.4k | #endif |
356 | | |
357 | 67.4k | return Status::OK(); |
358 | 67.4k | } |
359 | | |
360 | | } // namespace doris |