be/src/exec/exchange/vdata_stream_sender.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/exchange/vdata_stream_sender.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <fmt/ranges.h> // IWYU pragma: keep |
22 | | #include <gen_cpp/DataSinks_types.h> |
23 | | #include <gen_cpp/Metrics_types.h> |
24 | | #include <gen_cpp/Types_types.h> |
25 | | #include <gen_cpp/data.pb.h> |
26 | | #include <gen_cpp/internal_service.pb.h> |
27 | | #include <glog/logging.h> |
28 | | #include <stddef.h> |
29 | | |
30 | | #include <algorithm> |
31 | | #include <cstdint> |
32 | | #include <functional> |
33 | | #include <map> |
34 | | #include <memory> |
35 | | #include <random> |
36 | | |
37 | | #include "common/object_pool.h" |
38 | | #include "common/status.h" |
39 | | #include "core/column/column_const.h" |
40 | | #include "exec/common/sip_hash.h" |
41 | | #include "exec/exchange/vdata_stream_mgr.h" |
42 | | #include "exec/exchange/vdata_stream_recvr.h" |
43 | | #include "exec/operator/exchange_sink_operator.h" |
44 | | #include "exec/operator/result_file_sink_operator.h" |
45 | | #include "exec/sink/vrow_distribution.h" |
46 | | #include "exec/sink/writer/vtablet_writer_v2.h" |
47 | | #include "exprs/vexpr.h" |
48 | | #include "runtime/descriptors.h" |
49 | | #include "runtime/runtime_state.h" |
50 | | #include "runtime/thread_context.h" |
51 | | #include "storage/tablet_info.h" |
52 | | #include "util/proto_util.h" |
53 | | |
54 | | namespace doris { |
55 | | #include "common/compile_check_begin.h" |
56 | | |
57 | 5.96M | Status Channel::init(RuntimeState* state) { |
58 | | // only enable_local_exchange() is true and the destination address is localhost, then the channel is local |
59 | 5.96M | _is_local &= state->enable_local_exchange(); |
60 | | |
61 | 5.96M | if (_is_local) { |
62 | 2.39M | return Status::OK(); |
63 | 2.39M | } |
64 | | |
65 | 3.57M | RETURN_IF_ERROR(_init_brpc_stub(state)); |
66 | | |
67 | 3.57M | return Status::OK(); |
68 | 3.57M | } |
69 | | |
70 | 3.58M | Status Channel::_init_brpc_stub(RuntimeState* state) { |
71 | 3.58M | if (_brpc_dest_addr.hostname.empty()) { |
72 | 0 | LOG(WARNING) << "there is no brpc destination address's hostname" |
73 | 0 | ", maybe version is not compatible."; |
74 | 0 | return Status::InternalError("no brpc destination"); |
75 | 0 | } |
76 | | |
77 | 3.58M | auto network_address = _brpc_dest_addr; |
78 | 3.58M | if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) { |
79 | 3.58M | _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client( |
80 | 3.58M | "127.0.0.1", _brpc_dest_addr.port); |
81 | 3.58M | network_address.hostname = "127.0.0.1"; |
82 | 18.4E | } else { |
83 | 18.4E | _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client(_brpc_dest_addr); |
84 | 18.4E | } |
85 | | |
86 | 3.58M | if (!_brpc_stub) { |
87 | 0 | std::string msg = fmt::format("Get rpc stub failed, dest_addr={}:{}", |
88 | 0 | _brpc_dest_addr.hostname, _brpc_dest_addr.port); |
89 | 0 | LOG(WARNING) << msg; |
90 | 0 | return Status::InternalError(msg); |
91 | 0 | } |
92 | | |
93 | 3.60M | if (config::enable_brpc_connection_check) { |
94 | 3.60M | state->get_query_ctx()->add_using_brpc_stub(_brpc_dest_addr, _brpc_stub); |
95 | 3.60M | } |
96 | 3.58M | return Status::OK(); |
97 | 3.58M | } |
98 | | |
99 | 5.68M | Status Channel::open(RuntimeState* state) { |
100 | 5.68M | if (_is_local) { |
101 | 2.43M | RETURN_IF_ERROR(_find_local_recvr(state)); |
102 | 2.43M | } |
103 | 5.68M | _be_number = state->be_number(); |
104 | 5.68M | _brpc_timeout_ms = get_execution_rpc_timeout_ms(state->execution_timeout()); |
105 | 5.68M | _serializer.set_is_local(_is_local); |
106 | | |
107 | | // In bucket shuffle join will set fragment_instance_id (-1, -1) |
108 | | // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0" |
109 | | // so the empty channel not need call function close_internal() |
110 | 5.71M | _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1); |
111 | | |
112 | 5.68M | return Status::OK(); |
113 | 5.68M | } |
114 | | |
115 | 2.42M | Status Channel::_find_local_recvr(RuntimeState* state) { |
116 | 2.42M | auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(_fragment_instance_id, |
117 | 2.42M | _dest_node_id, &_local_recvr); |
118 | 2.42M | if (!st.ok()) { |
119 | | // If could not find local receiver, then it means the channel is EOF. |
120 | | // Maybe downstream task is finished already. |
121 | | //if (_receiver_status.ok()) { |
122 | | // _receiver_status = Status::EndOfFile("local data stream receiver is deconstructed"); |
123 | | //} |
124 | 8.33k | LOG(INFO) << "Query: " << print_id(state->query_id()) |
125 | 8.33k | << " recvr is not found, maybe downstream task is finished. error st is: " |
126 | 8.33k | << st.to_string(); |
127 | 8.33k | } |
128 | 2.42M | return Status::OK(); |
129 | 2.42M | } |
130 | | Status Channel::add_rows(Block* block, const uint32_t* data, const uint32_t offset, |
131 | 5.34M | const uint32_t size, bool eos) { |
132 | 5.34M | if (_fragment_instance_id.lo == -1) { |
133 | 4.99k | return Status::OK(); |
134 | 4.99k | } |
135 | | |
136 | 5.33M | bool serialized = false; |
137 | 5.33M | if (_pblock == nullptr) { |
138 | 5.26M | _pblock = std::make_unique<PBlock>(); |
139 | 5.26M | } |
140 | 5.33M | RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, eos, |
141 | 5.33M | data, offset, size)); |
142 | 5.33M | if (serialized) { |
143 | 5.16M | RETURN_IF_ERROR(_send_current_block(eos)); |
144 | 5.16M | } |
145 | | |
146 | 5.31M | return Status::OK(); |
147 | 5.33M | } |
148 | | |
149 | 2.44M | std::shared_ptr<Dependency> Channel::get_local_channel_dependency() { |
150 | 2.44M | if (!_local_recvr) { |
151 | 8.31k | return nullptr; |
152 | 8.31k | } |
153 | 2.43M | return _local_recvr->get_local_channel_dependency(_parent->sender_id()); |
154 | 2.44M | } |
155 | | |
156 | 12.8M | int64_t Channel::mem_usage() const { |
157 | 12.8M | return _serializer.mem_usage(); |
158 | 12.8M | } |
159 | | |
160 | 6.61M | Status Channel::send_remote_block(std::unique_ptr<PBlock>&& block, bool eos) { |
161 | 6.61M | COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); |
162 | | |
163 | 6.71M | if (eos) { |
164 | 6.71M | if (_eos_send) { |
165 | 3.32M | return Status::OK(); |
166 | 3.39M | } else { |
167 | 3.39M | _eos_send = true; |
168 | 3.39M | } |
169 | 6.71M | } |
170 | 3.41M | if (eos || block->column_metas_size()) { |
171 | 3.41M | RETURN_IF_ERROR(_buffer->add_block(this, {std::move(block), eos})); |
172 | 3.41M | } |
173 | 3.28M | return Status::OK(); |
174 | 3.28M | } |
175 | | |
176 | 166k | Status Channel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos) { |
177 | 166k | COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); |
178 | 166k | if (eos) { |
179 | 166k | if (_eos_send) { |
180 | 0 | return Status::OK(); |
181 | 0 | } |
182 | 166k | _eos_send = true; |
183 | 166k | } |
184 | 167k | if (eos || block->get_block()->column_metas_size()) { |
185 | 167k | RETURN_IF_ERROR(_buffer->add_block(this, {block, eos})); |
186 | 167k | } |
187 | 166k | return Status::OK(); |
188 | 166k | } |
189 | | |
190 | 11.0M | Status Channel::_send_current_block(bool eos) { |
191 | 11.0M | if (is_local()) { |
192 | 4.51M | return _send_local_block(eos); |
193 | 4.51M | } |
194 | | // here _pblock maybe nullptr , but we must send the eos to the receiver |
195 | 6.51M | return send_remote_block(std::move(_pblock), eos); |
196 | 11.0M | } |
197 | | |
198 | 4.53M | Status Channel::_send_local_block(bool eos) { |
199 | 4.53M | Block block; |
200 | 4.53M | if (_serializer.get_block() != nullptr) { |
201 | 4.19M | block = _serializer.get_block()->to_block(); |
202 | 4.19M | _serializer.get_block()->set_mutable_columns(block.clone_empty_columns()); |
203 | 4.19M | } |
204 | | |
205 | 4.54M | if (!block.empty() || eos) { // if eos is true, we MUST to send an empty block |
206 | 4.54M | RETURN_IF_ERROR(send_local_block(&block, eos, true)); |
207 | 4.54M | } |
208 | 4.52M | return Status::OK(); |
209 | 4.53M | } |
210 | | |
211 | 4.93M | Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) { |
212 | 4.93M | SCOPED_TIMER(_parent->local_send_timer()); |
213 | | |
214 | 4.93M | if (eos) { |
215 | 4.87M | if (_eos_send) { |
216 | 2.41M | return Status::OK(); |
217 | 2.45M | } else { |
218 | 2.45M | _eos_send = true; |
219 | 2.45M | } |
220 | 4.87M | } |
221 | | |
222 | 2.52M | if (is_receiver_eof()) { |
223 | 0 | return _receiver_status; |
224 | 0 | } |
225 | 2.52M | auto receiver_status = _recvr_status(); |
226 | | // _local_recvr depdend on ExchangeLocalState* _parent to do some memory counter settings |
227 | | // but it only owns a raw pointer, so that the ExchangeLocalState object may be deconstructed. |
228 | | // Lock the fragment context to ensure the runtime state and other objects are not deconstructed |
229 | 2.52M | TaskExecutionContextSPtr ctx_lock = nullptr; |
230 | 2.54M | if (receiver_status.ok() && _local_recvr != nullptr) { |
231 | 2.54M | ctx_lock = _local_recvr->task_exec_ctx(); |
232 | | // Do not return internal error, because when query finished, the downstream node |
233 | | // may finish before upstream node. And the object maybe deconstructed. If return error |
234 | | // then the upstream node may report error status to FE, the query is failed. |
235 | 2.54M | if (ctx_lock == nullptr) { |
236 | 0 | receiver_status = Status::EndOfFile("local data stream receiver is deconstructed"); |
237 | 0 | } |
238 | 2.54M | } |
239 | 2.55M | if (receiver_status.ok()) { |
240 | 2.55M | COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes()); |
241 | 2.55M | COUNTER_UPDATE(_parent->local_sent_rows(), block->rows()); |
242 | 2.55M | COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); |
243 | | |
244 | 2.55M | const auto sender_id = _parent->sender_id(); |
245 | 2.55M | if (!block->empty()) [[likely]] { |
246 | 174k | _local_recvr->add_block(block, sender_id, can_be_moved); |
247 | 174k | } |
248 | | |
249 | 2.55M | if (eos) [[unlikely]] { |
250 | 2.44M | _local_recvr->remove_sender(sender_id, _be_number, Status::OK()); |
251 | 2.44M | _parent->on_channel_finished(_fragment_instance_id.lo); |
252 | 2.44M | } |
253 | 2.55M | return Status::OK(); |
254 | 18.4E | } else { |
255 | 18.4E | _receiver_status = std::move(receiver_status); |
256 | 18.4E | _parent->on_channel_finished(_fragment_instance_id.lo); |
257 | 18.4E | return _receiver_status; |
258 | 18.4E | } |
259 | 2.52M | } |
260 | | |
261 | 0 | std::string Channel::debug_string() const { |
262 | 0 | fmt::memory_buffer debug_string_buffer; |
263 | 0 | fmt::format_to(debug_string_buffer, |
264 | 0 | "fragment_instance_id: {}, _dest_node_id: {}, _is_local: {}, _receiver_status: " |
265 | 0 | "{}, _closed: {}, _need_close: {}, _be_number: {}, _eos_send: {}", |
266 | 0 | print_id(_fragment_instance_id), _dest_node_id, _is_local, |
267 | 0 | _receiver_status.to_string(), _closed, _need_close, _be_number, _eos_send); |
268 | 0 | if (_is_local) { |
269 | 0 | fmt::format_to(debug_string_buffer, "_local_recvr: {}", _local_recvr->debug_string()); |
270 | 0 | } |
271 | 0 | return fmt::to_string(debug_string_buffer); |
272 | 0 | } |
273 | | |
274 | 5.99M | Status Channel::close(RuntimeState* state) { |
275 | 5.99M | if (_closed) { |
276 | 25.3k | return Status::OK(); |
277 | 25.3k | } |
278 | 5.97M | _closed = true; |
279 | 5.97M | if (!_need_close) { |
280 | 53 | return Status::OK(); |
281 | 53 | } |
282 | | |
283 | 5.97M | if (is_receiver_eof()) { |
284 | 17.6k | _serializer.reset_block(); |
285 | 17.6k | return Status::OK(); |
286 | 5.95M | } else { |
287 | 5.95M | return _send_current_block(true); |
288 | 5.95M | } |
289 | 5.97M | } |
290 | | |
291 | | BlockSerializer::BlockSerializer(ExchangeSinkLocalState* parent, bool is_local) |
292 | 6.61M | : _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {} |
293 | | |
294 | | Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t num_receivers, |
295 | | bool* serialized, bool eos, const uint32_t* data, |
296 | 5.48M | const uint32_t offset, const uint32_t size) { |
297 | 5.48M | if (_mutable_block == nullptr) { |
298 | 5.36M | _mutable_block = MutableBlock::create_unique(block->clone_empty()); |
299 | 5.36M | } |
300 | | |
301 | 5.48M | { |
302 | 5.48M | SCOPED_TIMER(_parent->merge_block_timer()); |
303 | 5.48M | if (data) { |
304 | 5.35M | if (size > 0) { |
305 | 91.2k | RETURN_IF_ERROR( |
306 | 91.2k | _mutable_block->add_rows(block, data + offset, data + offset + size)); |
307 | 91.2k | } |
308 | 5.35M | } else if (!block->empty()) { |
309 | 49.7k | RETURN_IF_ERROR(_mutable_block->merge(*block)); |
310 | 49.7k | } |
311 | 5.48M | } |
312 | | |
313 | 5.50M | if (_mutable_block->rows() >= _batch_size || eos || |
314 | 5.48M | (_mutable_block->rows() > 0 && _mutable_block->allocated_bytes() > _buffer_mem_limit)) { |
315 | 5.38M | if (!_is_local) { |
316 | 3.13M | RETURN_IF_ERROR(_serialize_block(dest, num_receivers)); |
317 | 3.13M | } |
318 | 5.38M | *serialized = true; |
319 | 5.38M | } else { |
320 | 106k | *serialized = false; |
321 | 106k | } |
322 | 5.48M | return Status::OK(); |
323 | 5.48M | } |
324 | | |
325 | 3.12M | Status BlockSerializer::_serialize_block(PBlock* dest, size_t num_receivers) { |
326 | 3.13M | if (_mutable_block && _mutable_block->rows() > 0) { |
327 | 24.8k | auto block = _mutable_block->to_block(); |
328 | 24.8k | RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers)); |
329 | 24.8k | if (_parent->state()->low_memory_mode()) { |
330 | 0 | reset_block(); |
331 | 24.8k | } else { |
332 | 24.8k | block.clear_column_data(); |
333 | 24.8k | _mutable_block->set_mutable_columns(block.mutate_columns()); |
334 | 24.8k | } |
335 | 24.8k | } |
336 | | |
337 | 3.12M | return Status::OK(); |
338 | 3.12M | } |
339 | | |
340 | 70.2k | Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, size_t num_receivers) { |
341 | 70.2k | SCOPED_TIMER(_parent->_serialize_batch_timer); |
342 | 70.2k | dest->Clear(); |
343 | 70.2k | size_t uncompressed_bytes = 0, compressed_bytes = 0; |
344 | 70.2k | int64_t compress_time = 0; |
345 | 70.2k | RETURN_IF_ERROR(src->serialize(_parent->_state->be_exec_version(), dest, &uncompressed_bytes, |
346 | 70.2k | &compressed_bytes, &compress_time, _parent->compression_type(), |
347 | 70.2k | _parent->transfer_large_data_by_brpc())); |
348 | 70.2k | COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers); |
349 | 70.2k | COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers); |
350 | 70.2k | COUNTER_UPDATE(_parent->_compress_timer, compress_time); |
351 | 70.2k | #ifndef BE_TEST |
352 | 70.2k | _parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_bytes( |
353 | 70.2k | compressed_bytes * num_receivers); |
354 | 70.2k | _parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_rows( |
355 | 70.2k | src->rows() * num_receivers); |
356 | 70.2k | #endif |
357 | | |
358 | 70.2k | return Status::OK(); |
359 | 70.2k | } |
360 | | |
361 | | } // namespace doris |