Coverage Report

Created: 2026-03-18 11:39

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/vdata_stream_sender.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/exchange/vdata_stream_sender.h"
19
20
#include <fmt/format.h>
21
#include <fmt/ranges.h> // IWYU pragma: keep
22
#include <gen_cpp/DataSinks_types.h>
23
#include <gen_cpp/Metrics_types.h>
24
#include <gen_cpp/Types_types.h>
25
#include <gen_cpp/data.pb.h>
26
#include <gen_cpp/internal_service.pb.h>
27
#include <glog/logging.h>
28
#include <stddef.h>
29
30
#include <algorithm>
31
#include <cstdint>
32
#include <functional>
33
#include <map>
34
#include <memory>
35
#include <random>
36
37
#include "common/object_pool.h"
38
#include "common/status.h"
39
#include "core/column/column_const.h"
40
#include "exec/common/sip_hash.h"
41
#include "exec/exchange/vdata_stream_mgr.h"
42
#include "exec/exchange/vdata_stream_recvr.h"
43
#include "exec/operator/exchange_sink_operator.h"
44
#include "exec/operator/result_file_sink_operator.h"
45
#include "exec/sink/vrow_distribution.h"
46
#include "exec/sink/writer/vtablet_writer_v2.h"
47
#include "exprs/vexpr.h"
48
#include "runtime/descriptors.h"
49
#include "runtime/runtime_state.h"
50
#include "runtime/thread_context.h"
51
#include "storage/tablet_info.h"
52
#include "util/proto_util.h"
53
54
namespace doris {
55
#include "common/compile_check_begin.h"
56
57
5.98M
Status Channel::init(RuntimeState* state) {
58
    // only enable_local_exchange() is true and the destination address is localhost, then the channel is local
59
5.98M
    _is_local &= state->enable_local_exchange();
60
61
5.98M
    if (_is_local) {
62
1.77M
        return Status::OK();
63
1.77M
    }
64
65
4.20M
    RETURN_IF_ERROR(_init_brpc_stub(state));
66
67
4.20M
    return Status::OK();
68
4.20M
}
69
70
4.23M
Status Channel::_init_brpc_stub(RuntimeState* state) {
71
4.23M
    if (_brpc_dest_addr.hostname.empty()) {
72
0
        LOG(WARNING) << "there is no brpc destination address's hostname"
73
0
                        ", maybe version is not compatible.";
74
0
        return Status::InternalError("no brpc destination");
75
0
    }
76
77
4.23M
    auto network_address = _brpc_dest_addr;
78
4.23M
    if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
79
4.23M
        _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client(
80
4.23M
                "127.0.0.1", _brpc_dest_addr.port);
81
4.23M
        network_address.hostname = "127.0.0.1";
82
18.4E
    } else {
83
18.4E
        _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client(_brpc_dest_addr);
84
18.4E
    }
85
86
4.23M
    if (!_brpc_stub) {
87
0
        std::string msg = fmt::format("Get rpc stub failed, dest_addr={}:{}",
88
0
                                      _brpc_dest_addr.hostname, _brpc_dest_addr.port);
89
0
        LOG(WARNING) << msg;
90
0
        return Status::InternalError(msg);
91
0
    }
92
93
4.25M
    if (config::enable_brpc_connection_check) {
94
4.25M
        state->get_query_ctx()->add_using_brpc_stub(_brpc_dest_addr, _brpc_stub);
95
4.25M
    }
96
4.23M
    return Status::OK();
97
4.23M
}
98
99
5.96M
Status Channel::open(RuntimeState* state) {
100
5.96M
    if (_is_local) {
101
1.84M
        RETURN_IF_ERROR(_find_local_recvr(state));
102
1.84M
    }
103
5.96M
    _be_number = state->be_number();
104
5.96M
    _brpc_timeout_ms = get_execution_rpc_timeout_ms(state->execution_timeout());
105
5.96M
    _serializer.set_is_local(_is_local);
106
107
    // In bucket shuffle join will set fragment_instance_id (-1, -1)
108
    // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
109
    // so the empty channel not need call function close_internal()
110
5.96M
    _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1);
111
112
5.96M
    return Status::OK();
113
5.96M
}
114
115
1.82M
Status Channel::_find_local_recvr(RuntimeState* state) {
116
1.82M
    auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(_fragment_instance_id,
117
1.82M
                                                                      _dest_node_id, &_local_recvr);
118
1.82M
    if (!st.ok()) {
119
        // If could not find local receiver, then it means the channel is EOF.
120
        // Maybe downstream task is finished already.
121
        //if (_receiver_status.ok()) {
122
        //    _receiver_status = Status::EndOfFile("local data stream receiver is deconstructed");
123
        //}
124
5.50k
        LOG(INFO) << "Query: " << print_id(state->query_id())
125
5.50k
                  << " recvr is not found, maybe downstream task is finished. error st is: "
126
5.50k
                  << st.to_string();
127
5.50k
    }
128
1.82M
    return Status::OK();
129
1.82M
}
130
Status Channel::add_rows(Block* block, const uint32_t* data, const uint32_t offset,
131
5.51M
                         const uint32_t size, bool eos) {
132
5.51M
    if (_fragment_instance_id.lo == -1) {
133
6.99k
        return Status::OK();
134
6.99k
    }
135
136
5.51M
    bool serialized = false;
137
5.51M
    if (_pblock == nullptr) {
138
5.48M
        _pblock = std::make_unique<PBlock>();
139
5.48M
    }
140
5.51M
    RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, eos,
141
5.51M
                                                      data, offset, size));
142
5.51M
    if (serialized) {
143
5.45M
        RETURN_IF_ERROR(_send_current_block(eos));
144
5.45M
    }
145
146
5.48M
    return Status::OK();
147
5.51M
}
148
149
1.84M
std::shared_ptr<Dependency> Channel::get_local_channel_dependency() {
150
1.84M
    if (!_local_recvr) {
151
5.50k
        return nullptr;
152
5.50k
    }
153
1.83M
    return _local_recvr->get_local_channel_dependency(_parent->sender_id());
154
1.84M
}
155
156
12.7M
int64_t Channel::mem_usage() const {
157
12.7M
    return _serializer.mem_usage();
158
12.7M
}
159
160
8.02M
Status Channel::send_remote_block(std::unique_ptr<PBlock>&& block, bool eos) {
161
8.02M
    COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
162
163
8.05M
    if (eos) {
164
8.05M
        if (_eos_send) {
165
4.02M
            return Status::OK();
166
4.03M
        } else {
167
4.03M
            _eos_send = true;
168
4.03M
        }
169
8.05M
    }
170
4.06M
    if (eos || block->column_metas_size()) {
171
4.06M
        RETURN_IF_ERROR(_buffer->add_block(this, {std::move(block), eos}));
172
4.06M
    }
173
3.98M
    return Status::OK();
174
4.00M
}
175
176
192k
Status Channel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos) {
177
192k
    COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
178
193k
    if (eos) {
179
193k
        if (_eos_send) {
180
0
            return Status::OK();
181
0
        }
182
193k
        _eos_send = true;
183
193k
    }
184
193k
    if (eos || block->get_block()->column_metas_size()) {
185
193k
        RETURN_IF_ERROR(_buffer->add_block(this, {block, eos}));
186
193k
    }
187
192k
    return Status::OK();
188
192k
}
189
190
11.4M
Status Channel::_send_current_block(bool eos) {
191
11.4M
    if (is_local()) {
192
3.45M
        return _send_local_block(eos);
193
3.45M
    }
194
    // here _pblock maybe nullptr , but we must send the eos to the receiver
195
7.96M
    return send_remote_block(std::move(_pblock), eos);
196
11.4M
}
197
198
3.45M
Status Channel::_send_local_block(bool eos) {
199
3.45M
    Block block;
200
3.45M
    if (_serializer.get_block() != nullptr) {
201
3.24M
        block = _serializer.get_block()->to_block();
202
3.24M
        _serializer.get_block()->set_mutable_columns(block.clone_empty_columns());
203
3.24M
    }
204
205
3.46M
    if (!block.empty() || eos) { // if eos is true, we MUST to send an empty block
206
3.46M
        RETURN_IF_ERROR(send_local_block(&block, eos, true));
207
3.46M
    }
208
3.44M
    return Status::OK();
209
3.45M
}
210
211
3.67M
Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) {
212
3.67M
    SCOPED_TIMER(_parent->local_send_timer());
213
214
3.67M
    if (eos) {
215
3.63M
        if (_eos_send) {
216
1.78M
            return Status::OK();
217
1.85M
        } else {
218
1.85M
            _eos_send = true;
219
1.85M
        }
220
3.63M
    }
221
222
1.89M
    if (is_receiver_eof()) {
223
0
        return _receiver_status;
224
0
    }
225
1.89M
    auto receiver_status = _recvr_status();
226
    // _local_recvr depdend on ExchangeLocalState* _parent to do some memory counter settings
227
    // but it only owns a raw pointer, so that the ExchangeLocalState object may be deconstructed.
228
    // Lock the fragment context to ensure the runtime state and other objects are not deconstructed
229
1.89M
    TaskExecutionContextSPtr ctx_lock = nullptr;
230
1.89M
    if (receiver_status.ok() && _local_recvr != nullptr) {
231
1.89M
        ctx_lock = _local_recvr->task_exec_ctx();
232
        // Do not return internal error, because when query finished, the downstream node
233
        // may finish before upstream node. And the object maybe deconstructed. If return error
234
        // then the upstream node may report error status to FE, the query is failed.
235
1.89M
        if (ctx_lock == nullptr) {
236
0
            receiver_status = Status::EndOfFile("local data stream receiver is deconstructed");
237
0
        }
238
1.89M
    }
239
1.89M
    if (receiver_status.ok()) {
240
1.89M
        COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
241
1.89M
        COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
242
1.89M
        COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
243
244
1.89M
        const auto sender_id = _parent->sender_id();
245
1.89M
        if (!block->empty()) [[likely]] {
246
87.4k
            _local_recvr->add_block(block, sender_id, can_be_moved);
247
87.4k
        }
248
249
1.89M
        if (eos) [[unlikely]] {
250
1.84M
            _local_recvr->remove_sender(sender_id, _be_number, Status::OK());
251
1.84M
            _parent->on_channel_finished(_fragment_instance_id.lo);
252
1.84M
        }
253
1.89M
        return Status::OK();
254
18.4E
    } else {
255
18.4E
        _receiver_status = std::move(receiver_status);
256
18.4E
        _parent->on_channel_finished(_fragment_instance_id.lo);
257
18.4E
        return _receiver_status;
258
18.4E
    }
259
1.89M
}
260
261
0
std::string Channel::debug_string() const {
262
0
    fmt::memory_buffer debug_string_buffer;
263
0
    fmt::format_to(debug_string_buffer,
264
0
                   "fragment_instance_id: {}, _dest_node_id: {}, _is_local: {}, _receiver_status: "
265
0
                   "{}, _closed: {}, _need_close: {}, _be_number: {}, _eos_send: {}",
266
0
                   print_id(_fragment_instance_id), _dest_node_id, _is_local,
267
0
                   _receiver_status.to_string(), _closed, _need_close, _be_number, _eos_send);
268
0
    if (_is_local) {
269
0
        fmt::format_to(debug_string_buffer, "_local_recvr: {}", _local_recvr->debug_string());
270
0
    }
271
0
    return fmt::to_string(debug_string_buffer);
272
0
}
273
274
6.07M
Status Channel::close(RuntimeState* state) {
275
6.07M
    if (_closed) {
276
33.3k
        return Status::OK();
277
33.3k
    }
278
6.04M
    _closed = true;
279
6.04M
    if (!_need_close) {
280
74
        return Status::OK();
281
74
    }
282
283
6.04M
    if (is_receiver_eof()) {
284
25.9k
        _serializer.reset_block();
285
25.9k
        return Status::OK();
286
6.01M
    } else {
287
6.01M
        return _send_current_block(true);
288
6.01M
    }
289
6.04M
}
290
291
BlockSerializer::BlockSerializer(ExchangeSinkLocalState* parent, bool is_local)
292
6.56M
        : _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {}
293
294
Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t num_receivers,
295
                                              bool* serialized, bool eos, const uint32_t* data,
296
5.72M
                                              const uint32_t offset, const uint32_t size) {
297
5.72M
    if (_mutable_block == nullptr) {
298
5.62M
        _mutable_block = MutableBlock::create_unique(block->clone_empty());
299
5.62M
    }
300
301
5.72M
    {
302
5.72M
        SCOPED_TIMER(_parent->merge_block_timer());
303
5.72M
        if (data) {
304
5.52M
            if (size > 0) {
305
45.9k
                RETURN_IF_ERROR(
306
45.9k
                        _mutable_block->add_rows(block, data + offset, data + offset + size));
307
45.9k
            }
308
5.52M
        } else if (!block->empty()) {
309
68.0k
            RETURN_IF_ERROR(_mutable_block->merge(*block));
310
68.0k
        }
311
5.72M
    }
312
313
5.75M
    if (_mutable_block->rows() >= _batch_size || eos ||
314
5.72M
        (_mutable_block->rows() > 0 && _mutable_block->allocated_bytes() > _buffer_mem_limit)) {
315
5.66M
        if (!_is_local) {
316
3.85M
            RETURN_IF_ERROR(_serialize_block(dest, num_receivers));
317
3.85M
        }
318
5.66M
        *serialized = true;
319
5.66M
    } else {
320
56.4k
        *serialized = false;
321
56.4k
    }
322
5.72M
    return Status::OK();
323
5.72M
}
324
325
3.83M
Status BlockSerializer::_serialize_block(PBlock* dest, size_t num_receivers) {
326
3.84M
    if (_mutable_block && _mutable_block->rows() > 0) {
327
29.2k
        auto block = _mutable_block->to_block();
328
29.2k
        RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers));
329
29.2k
        if (_parent->state()->low_memory_mode()) {
330
0
            reset_block();
331
29.2k
        } else {
332
29.2k
            block.clear_column_data();
333
29.2k
            _mutable_block->set_mutable_columns(block.mutate_columns());
334
29.2k
        }
335
29.2k
    }
336
337
3.83M
    return Status::OK();
338
3.83M
}
339
340
85.6k
Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, size_t num_receivers) {
341
85.6k
    SCOPED_TIMER(_parent->_serialize_batch_timer);
342
85.6k
    dest->Clear();
343
85.6k
    size_t uncompressed_bytes = 0, compressed_bytes = 0;
344
85.6k
    int64_t compress_time = 0;
345
85.6k
    RETURN_IF_ERROR(src->serialize(_parent->_state->be_exec_version(), dest, &uncompressed_bytes,
346
85.6k
                                   &compressed_bytes, &compress_time, _parent->compression_type(),
347
85.6k
                                   _parent->transfer_large_data_by_brpc()));
348
85.6k
    COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers);
349
85.6k
    COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
350
85.6k
    COUNTER_UPDATE(_parent->_compress_timer, compress_time);
351
85.6k
#ifndef BE_TEST
352
85.6k
    _parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_bytes(
353
85.6k
            compressed_bytes * num_receivers);
354
85.6k
    _parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_rows(
355
85.6k
            src->rows() * num_receivers);
356
85.6k
#endif
357
358
85.6k
    return Status::OK();
359
85.6k
}
360
361
} // namespace doris