Coverage Report

Created: 2026-04-16 21:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/exchange_sink_buffer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/operator/exchange_sink_buffer.h"
19
20
#include <brpc/controller.h>
21
#include <butil/errno.h>
22
#include <butil/iobuf_inl.h>
23
#include <fmt/format.h>
24
#include <gen_cpp/Types_types.h>
25
#include <gen_cpp/types.pb.h>
26
#include <glog/logging.h>
27
#include <google/protobuf/stubs/callback.h>
28
#include <pdqsort.h>
29
#include <stddef.h>
30
31
#include <atomic>
32
#include <cstdint>
33
#include <exception>
34
#include <functional>
35
#include <memory>
36
#include <ostream>
37
#include <utility>
38
39
#include "common/status.h"
40
#include "exec/exchange/vdata_stream_sender.h"
41
#include "exec/operator/exchange_sink_operator.h"
42
#include "exec/pipeline/pipeline_fragment_context.h"
43
#include "runtime/exec_env.h"
44
#include "runtime/thread_context.h"
45
#include "service/backend_options.h"
46
#include "util/defer_op.h"
47
#include "util/proto_util.h"
48
#include "util/time.h"
49
50
namespace doris {
51
52
2
BroadcastPBlockHolder::~BroadcastPBlockHolder() {
53
    // lock the parent queue; if the lock is acquired successfully, then return the block
54
    // to the queue, to reuse the block
55
2
    std::shared_ptr<BroadcastPBlockHolderMemLimiter> limiter = _parent_creator.lock();
56
2
    if (limiter != nullptr) {
57
2
        limiter->release(*this);
58
2
    }
59
    // If the queue has already been deconstructed, then release pblock automatically since it
60
    // is a unique ptr.
61
2
}
62
63
2
void BroadcastPBlockHolderMemLimiter::acquire(BroadcastPBlockHolder& holder) {
64
2
    std::unique_lock l(_holders_lock);
65
2
    DCHECK(_broadcast_dependency != nullptr);
66
2
    holder.set_parent_creator(shared_from_this());
67
2
    auto size = holder._pblock->column_values().size();
68
2
    _total_queue_buffer_size += size;
69
2
    _total_queue_blocks_count++;
70
2
    if (_total_queue_buffer_size >= _total_queue_buffer_size_limit ||
71
2
        _total_queue_blocks_count >= _total_queue_blocks_count_limit) {
72
0
        _broadcast_dependency->block();
73
0
    }
74
2
}
75
76
2
void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holder) {
77
2
    std::unique_lock l(_holders_lock);
78
2
    DCHECK(_broadcast_dependency != nullptr);
79
2
    auto size = holder._pblock->column_values().size();
80
2
    _total_queue_buffer_size -= size;
81
2
    _total_queue_blocks_count--;
82
2
    if (_total_queue_buffer_size <= 0) {
83
2
        _broadcast_dependency->set_ready();
84
2
    }
85
2
}
86
87
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
88
                                       PlanNodeId node_id, RuntimeState* state,
89
                                       const std::vector<InstanceLoId>& sender_ins_ids)
90
6
        : HasTaskExecutionCtx(state),
91
6
          _queue_capacity(0),
92
6
          _is_failed(false),
93
6
          _query_id(std::move(query_id)),
94
6
          _dest_node_id(dest_node_id),
95
6
          _node_id(node_id),
96
6
          _state(state),
97
6
          _context(state->get_query_ctx()),
98
6
          _exchange_sink_num(sender_ins_ids.size()),
99
6
          _send_multi_blocks(state->query_options().__isset.exchange_multi_blocks_byte_size &&
100
6
                             state->query_options().exchange_multi_blocks_byte_size > 0) {
101
6
    if (_send_multi_blocks) {
102
6
        _send_multi_blocks_byte_size = state->query_options().exchange_multi_blocks_byte_size;
103
6
    }
104
6
}
105
106
0
void ExchangeSinkBuffer::close() {
107
    // Could not clear the queue here, because there may be a running rpc that wants to
108
    // get a request from the queue, and the clear method would release the request
109
    // and it will core.
110
    //_instance_to_broadcast_package_queue.clear();
111
    //_instance_to_package_queue.clear();
112
    //_instance_to_request.clear();
113
0
}
114
115
14
void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) {
116
14
    if (_is_failed) {
117
0
        return;
118
0
    }
119
14
    auto low_id = fragment_instance_id.lo;
120
14
    if (_rpc_instances.contains(low_id)) {
121
0
        return;
122
0
    }
123
124
    // Initialize the instance data
125
14
    auto instance_data = std::make_unique<RpcInstance>(low_id);
126
14
    instance_data->mutex = std::make_unique<std::mutex>();
127
14
    instance_data->seq = 0;
128
14
    instance_data->package_queue =
129
14
            std::unordered_map<Channel*, std::queue<TransmitInfo, std::list<TransmitInfo>>>();
130
14
    instance_data->broadcast_package_queue = std::unordered_map<
131
14
            Channel*, std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>>();
132
14
    _queue_capacity = config::exchg_buffer_queue_capacity_factor * _rpc_instances.size();
133
134
14
    PUniqueId finst_id;
135
14
    finst_id.set_hi(fragment_instance_id.hi);
136
14
    finst_id.set_lo(fragment_instance_id.lo);
137
138
14
    instance_data->rpc_channel_is_idle = true;
139
14
    instance_data->rpc_channel_is_turn_off = false;
140
141
    // Initialize request
142
14
    instance_data->request = std::make_shared<PTransmitDataParams>();
143
14
    instance_data->request->mutable_finst_id()->CopyFrom(finst_id);
144
14
    instance_data->request->mutable_query_id()->CopyFrom(_query_id);
145
14
    instance_data->request->set_node_id(_dest_node_id);
146
14
    instance_data->running_sink_count = _exchange_sink_num;
147
148
14
    _rpc_instances[low_id] = std::move(instance_data);
149
14
}
150
151
48
Status ExchangeSinkBuffer::add_block(Channel* channel, TransmitInfo&& request) {
152
48
    if (_is_failed) {
153
9
        return Status::OK();
154
9
    }
155
39
    auto ins_id = channel->dest_ins_id();
156
39
    if (!_rpc_instances.contains(ins_id)) {
157
0
        return Status::InternalError("fragment_instance_id {} not do register_sink",
158
0
                                     print_id(channel->_fragment_instance_id));
159
0
    }
160
39
    auto& instance_data = *_rpc_instances[ins_id];
161
39
    if (instance_data.rpc_channel_is_turn_off) {
162
1
        return Status::EndOfFile("receiver eof");
163
1
    }
164
38
    bool send_now = false;
165
38
    {
166
38
        std::unique_lock<std::mutex> lock(*instance_data.mutex);
167
        // Do not have in process rpc, directly send
168
38
        if (instance_data.rpc_channel_is_idle) {
169
14
            send_now = true;
170
14
            instance_data.rpc_channel_is_idle = false;
171
14
        }
172
38
        if (request.block) {
173
38
            RETURN_IF_ERROR(
174
38
                    BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
175
38
            COUNTER_UPDATE(channel->_parent->memory_used_counter(), request.block->ByteSizeLong());
176
38
        }
177
38
        instance_data.package_queue[channel].emplace(std::move(request));
178
38
        _total_queue_size++;
179
38
        if (_total_queue_size > _queue_capacity) {
180
0
            for (auto& dep : _queue_deps) {
181
0
                dep->block();
182
0
            }
183
0
        }
184
38
    }
185
38
    if (send_now) {
186
14
        RETURN_IF_ERROR(_send_rpc(instance_data));
187
14
    }
188
189
38
    return Status::OK();
190
38
}
191
192
0
Status ExchangeSinkBuffer::add_block(Channel* channel, BroadcastTransmitInfo&& request) {
193
0
    if (_is_failed) {
194
0
        return Status::OK();
195
0
    }
196
0
    auto ins_id = channel->dest_ins_id();
197
0
    if (!_rpc_instances.contains(ins_id)) {
198
0
        return Status::InternalError("fragment_instance_id {} not do register_sink",
199
0
                                     print_id(channel->_fragment_instance_id));
200
0
    }
201
0
    auto& instance_data = *_rpc_instances[ins_id];
202
0
    if (instance_data.rpc_channel_is_turn_off) {
203
0
        return Status::EndOfFile("receiver eof");
204
0
    }
205
0
    bool send_now = false;
206
0
    {
207
0
        std::unique_lock<std::mutex> lock(*instance_data.mutex);
208
        // Do not have in process rpc, directly send
209
0
        if (instance_data.rpc_channel_is_idle) {
210
0
            send_now = true;
211
0
            instance_data.rpc_channel_is_idle = false;
212
0
        }
213
0
        if (request.block_holder->get_block()) {
214
0
            RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
215
0
                    request.block_holder->get_block()->be_exec_version()));
216
0
        }
217
0
        instance_data.broadcast_package_queue[channel].emplace(request);
218
0
    }
219
0
    if (send_now) {
220
0
        RETURN_IF_ERROR(_send_rpc(instance_data));
221
0
    }
222
223
0
    return Status::OK();
224
0
}
225
226
41
Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
227
41
    std::unique_lock<std::mutex> lock(*(instance_data.mutex));
228
229
41
    auto& q_map = instance_data.package_queue;
230
41
    auto& broadcast_q_map = instance_data.broadcast_package_queue;
231
232
82
    auto find_max_size_queue = [](Channel*& channel, auto& ptr, auto& map) {
233
82
        for (auto& [chan, lists] : map) {
234
81
            if (!ptr) {
235
62
                if (!lists.empty()) {
236
30
                    channel = chan;
237
30
                    ptr = &lists;
238
30
                }
239
62
            } else {
240
19
                if (ptr->size() < lists.size()) {
241
0
                    channel = chan;
242
0
                    ptr = &lists;
243
0
                }
244
19
            }
245
81
        }
246
82
    };
exchange_sink_buffer.cpp:_ZZN5doris18ExchangeSinkBuffer9_send_rpcERNS_11RpcInstanceEENK3$_0clIPSt5queueINS_12TransmitInfoENSt7__cxx114listIS6_SaIS6_EEEESt13unordered_mapIPNS_7ChannelESB_St4hashISF_ESt8equal_toISF_ESaISt4pairIKSF_SB_EEEEEDaRSF_RT_RT0_
Line
Count
Source
232
41
    auto find_max_size_queue = [](Channel*& channel, auto& ptr, auto& map) {
233
81
        for (auto& [chan, lists] : map) {
234
81
            if (!ptr) {
235
62
                if (!lists.empty()) {
236
30
                    channel = chan;
237
30
                    ptr = &lists;
238
30
                }
239
62
            } else {
240
19
                if (ptr->size() < lists.size()) {
241
0
                    channel = chan;
242
0
                    ptr = &lists;
243
0
                }
244
19
            }
245
81
        }
246
41
    };
exchange_sink_buffer.cpp:_ZZN5doris18ExchangeSinkBuffer9_send_rpcERNS_11RpcInstanceEENK3$_0clIPSt5queueINS_21BroadcastTransmitInfoENSt7__cxx114listIS6_SaIS6_EEEESt13unordered_mapIPNS_7ChannelESB_St4hashISF_ESt8equal_toISF_ESaISt4pairIKSF_SB_EEEEEDaRSF_RT_RT0_
Line
Count
Source
232
41
    auto find_max_size_queue = [](Channel*& channel, auto& ptr, auto& map) {
233
41
        for (auto& [chan, lists] : map) {
234
0
            if (!ptr) {
235
0
                if (!lists.empty()) {
236
0
                    channel = chan;
237
0
                    ptr = &lists;
238
0
                }
239
0
            } else {
240
0
                if (ptr->size() < lists.size()) {
241
0
                    channel = chan;
242
0
                    ptr = &lists;
243
0
                }
244
0
            }
245
0
        }
246
41
    };
247
248
41
    Channel* channel = nullptr;
249
250
41
    std::queue<TransmitInfo, std::list<TransmitInfo>>* q_ptr = nullptr;
251
41
    find_max_size_queue(channel, q_ptr, q_map);
252
41
    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>* broadcast_q_ptr = nullptr;
253
41
    find_max_size_queue(channel, broadcast_q_ptr, broadcast_q_map);
254
255
41
    if (_is_failed) {
256
2
        _turn_off_channel(instance_data, lock);
257
2
        return Status::OK();
258
2
    }
259
39
    if (instance_data.rpc_channel_is_turn_off) {
260
6
        return Status::OK();
261
6
    }
262
263
33
    auto mem_byte = 0;
264
33
    if (q_ptr && !q_ptr->empty()) {
265
28
        auto& q = *q_ptr;
266
267
28
        std::vector<TransmitInfo> requests(_send_multi_blocks ? q.size() : 1);
268
56
        for (int i = 0; i < requests.size(); i++) {
269
28
            requests[i] = std::move(q.front());
270
28
            q.pop();
271
272
28
            if (requests[i].block) {
273
                // make sure the rpc byte size stays under _send_multi_blocks_byte_size
274
28
                mem_byte += requests[i].block->ByteSizeLong();
275
28
                if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) {
276
0
                    requests.resize(i + 1);
277
0
                    break;
278
0
                }
279
28
            }
280
28
        }
281
282
        // If we have data to shuffle which is not broadcasted
283
28
        auto& request = requests[0];
284
28
        auto& brpc_request = instance_data.request;
285
28
        brpc_request->set_sender_id(channel->_parent->sender_id());
286
28
        brpc_request->set_be_number(channel->_parent->be_number());
287
288
28
        if (_send_multi_blocks) {
289
0
            for (auto& req : requests) {
290
0
                if (req.block && !req.block->column_metas().empty()) {
291
0
                    auto add_block = brpc_request->add_blocks();
292
0
                    add_block->Swap(req.block.get());
293
0
                }
294
0
            }
295
28
        } else {
296
28
            if (request.block && !request.block->column_metas().empty()) {
297
0
                brpc_request->set_allocated_block(request.block.get());
298
0
            }
299
28
        }
300
301
28
        instance_data.seq += requests.size();
302
28
        brpc_request->set_packet_seq(instance_data.seq);
303
28
        brpc_request->set_eos(requests.back().eos);
304
28
        auto send_callback = channel->get_send_callback(&instance_data, requests.back().eos);
305
28
        send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms);
306
28
        if (config::execution_ignore_eovercrowded) {
307
28
            send_callback->cntl_->ignore_eovercrowded();
308
28
        }
309
28
        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
310
28
                                                RpcInstance* ins, const std::string& err) {
311
1
            auto task_lock = weak_task_ctx.lock();
312
1
            if (task_lock == nullptr) {
313
                // This means the ExchangeSinkBuffer object was already destroyed; no need to run this callback anymore.
314
0
                return;
315
0
            }
316
            // attach task for memory tracker and query id when core
317
1
            SCOPED_ATTACH_TASK(_state);
318
1
            _failed(ins->id, err);
319
1
        });
320
28
        send_callback->start_rpc_time = GetCurrentTimeNanos();
321
28
        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
322
28
                                                 RpcInstance* ins_ptr, const bool& eos,
323
28
                                                 const PTransmitDataResult& result,
324
28
                                                 const int64_t& start_rpc_time) {
325
27
            auto task_lock = weak_task_ctx.lock();
326
27
            if (task_lock == nullptr) {
327
                // This means the ExchangeSinkBuffer object was already destroyed; no need to run this callback anymore.
328
0
                return;
329
0
            }
330
            // attach task for memory tracker and query id when core
331
27
            SCOPED_ATTACH_TASK(_state);
332
333
27
            auto& ins = *ins_ptr;
334
27
            auto end_rpc_time = GetCurrentTimeNanos();
335
27
            update_rpc_time(ins, start_rpc_time, end_rpc_time);
336
337
27
            Status s(Status::create(result.status()));
338
27
            if (s.is<ErrorCode::END_OF_FILE>()) {
339
2
                _set_receiver_eof(ins);
340
25
            } else if (!s.ok()) {
341
0
                _failed(ins.id,
342
0
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
343
0
                return;
344
25
            } else if (eos) {
345
14
                _ended(ins);
346
14
            }
347
            // The eos here only indicates that the current exchange sink has reached eos.
348
            // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
349
27
            s = _send_rpc(ins);
350
27
            if (!s) {
351
0
                _failed(ins.id,
352
0
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
353
0
            }
354
27
        });
355
28
        {
356
28
            auto send_remote_block_closure = AutoReleaseClosure<
357
28
                    PTransmitDataParams,
358
28
                    ExchangeSendCallback<PTransmitDataResult>>::create_unique(brpc_request,
359
28
                                                                              send_callback);
360
28
            if (enable_http_send_block(*brpc_request)) {
361
0
                RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
362
0
                                                      std::move(send_remote_block_closure),
363
0
                                                      channel->_brpc_dest_addr));
364
28
            } else {
365
28
                transmit_blockv2(channel->_brpc_stub.get(), std::move(send_remote_block_closure));
366
28
            }
367
28
        }
368
369
28
        if (!_send_multi_blocks && request.block) {
370
28
            static_cast<void>(brpc_request->release_block());
371
28
        } else {
372
0
            brpc_request->clear_blocks();
373
0
        }
374
28
        if (mem_byte) {
375
0
            COUNTER_UPDATE(channel->_parent->memory_used_counter(), -mem_byte);
376
0
        }
377
28
        DCHECK_GE(_total_queue_size, requests.size());
378
28
        _total_queue_size -= (int)requests.size();
379
28
        if (_total_queue_size <= _queue_capacity) {
380
28
            for (auto& dep : _queue_deps) {
381
0
                dep->set_ready();
382
0
            }
383
28
        }
384
28
    } else if (broadcast_q_ptr && !broadcast_q_ptr->empty()) {
385
0
        auto& broadcast_q = *broadcast_q_ptr;
386
        // If we have data to shuffle which is broadcasted
387
0
        std::vector<BroadcastTransmitInfo> requests(_send_multi_blocks ? broadcast_q.size() : 1);
388
0
        for (int i = 0; i < requests.size(); i++) {
389
0
            requests[i] = broadcast_q.front();
390
0
            broadcast_q.pop();
391
392
0
            if (requests[i].block_holder->get_block()) {
393
                // make sure the rpc byte size stays under _send_multi_blocks_byte_size
394
0
                mem_byte += requests[i].block_holder->get_block()->ByteSizeLong();
395
0
                if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) {
396
0
                    requests.resize(i + 1);
397
0
                    break;
398
0
                }
399
0
            }
400
0
        }
401
402
0
        auto& request = requests[0];
403
0
        auto& brpc_request = instance_data.request;
404
0
        brpc_request->set_sender_id(channel->_parent->sender_id());
405
0
        brpc_request->set_be_number(channel->_parent->be_number());
406
407
0
        if (_send_multi_blocks) {
408
0
            for (int i = 0; i < requests.size(); i++) {
409
0
                auto& req = requests[i];
410
0
                if (auto block = req.block_holder->get_block();
411
0
                    block && !block->column_metas().empty()) {
412
0
                    auto add_block = brpc_request->add_blocks();
413
0
                    for (int j = 0; j < block->column_metas_size(); ++j) {
414
0
                        add_block->add_column_metas()->CopyFrom(block->column_metas(j));
415
0
                    }
416
0
                    add_block->set_be_exec_version(block->be_exec_version());
417
0
                    add_block->set_compressed(block->compressed());
418
0
                    add_block->set_compression_type(block->compression_type());
419
0
                    add_block->set_uncompressed_size(block->uncompressed_size());
420
0
                    add_block->set_allocated_column_values(block->mutable_column_values());
421
0
                }
422
0
            }
423
0
        } else {
424
0
            if (request.block_holder->get_block() &&
425
0
                !request.block_holder->get_block()->column_metas().empty()) {
426
0
                brpc_request->set_allocated_block(request.block_holder->get_block());
427
0
            }
428
0
        }
429
0
        instance_data.seq += requests.size();
430
0
        brpc_request->set_packet_seq(instance_data.seq);
431
0
        brpc_request->set_eos(requests.back().eos);
432
0
        auto send_callback = channel->get_send_callback(&instance_data, requests.back().eos);
433
434
0
        send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms);
435
0
        if (config::execution_ignore_eovercrowded) {
436
0
            send_callback->cntl_->ignore_eovercrowded();
437
0
        }
438
0
        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
439
0
                                                RpcInstance* ins, const std::string& err) {
440
0
            auto task_lock = weak_task_ctx.lock();
441
0
            if (task_lock == nullptr) {
442
                // This means the ExchangeSinkBuffer object was already destroyed; no need to run this callback anymore.
443
0
                return;
444
0
            }
445
            // attach task for memory tracker and query id when core
446
0
            SCOPED_ATTACH_TASK(_state);
447
0
            _failed(ins->id, err);
448
0
        });
449
0
        send_callback->start_rpc_time = GetCurrentTimeNanos();
450
0
        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
451
0
                                                 RpcInstance* ins_ptr, const bool& eos,
452
0
                                                 const PTransmitDataResult& result,
453
0
                                                 const int64_t& start_rpc_time) {
454
0
            auto task_lock = weak_task_ctx.lock();
455
0
            if (task_lock == nullptr) {
456
                // This means the ExchangeSinkBuffer object was already destroyed; no need to run this callback anymore.
457
0
                return;
458
0
            }
459
            // attach task for memory tracker and query id when core
460
0
            SCOPED_ATTACH_TASK(_state);
461
0
            auto& ins = *ins_ptr;
462
0
            auto end_rpc_time = GetCurrentTimeNanos();
463
0
            update_rpc_time(ins, start_rpc_time, end_rpc_time);
464
465
0
            Status s(Status::create(result.status()));
466
0
            if (s.is<ErrorCode::END_OF_FILE>()) {
467
0
                _set_receiver_eof(ins);
468
0
            } else if (!s.ok()) {
469
0
                _failed(ins.id,
470
0
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
471
0
                return;
472
0
            } else if (eos) {
473
0
                _ended(ins);
474
0
            }
475
476
            // The eos here only indicates that the current exchange sink has reached eos.
477
            // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
478
0
            s = _send_rpc(ins);
479
0
            if (!s) {
480
0
                _failed(ins.id,
481
0
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
482
0
            }
483
0
        });
484
0
        {
485
0
            auto send_remote_block_closure = AutoReleaseClosure<
486
0
                    PTransmitDataParams,
487
0
                    ExchangeSendCallback<PTransmitDataResult>>::create_unique(brpc_request,
488
0
                                                                              send_callback);
489
0
            if (enable_http_send_block(*brpc_request)) {
490
0
                RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
491
0
                                                      std::move(send_remote_block_closure),
492
0
                                                      channel->_brpc_dest_addr));
493
0
            } else {
494
0
                transmit_blockv2(channel->_brpc_stub.get(), std::move(send_remote_block_closure));
495
0
            }
496
0
        }
497
0
        if (!_send_multi_blocks && request.block_holder->get_block()) {
498
0
            static_cast<void>(brpc_request->release_block());
499
0
        } else {
500
0
            for (int i = 0; i < brpc_request->mutable_blocks()->size(); ++i) {
501
0
                static_cast<void>(brpc_request->mutable_blocks(i)->release_column_values());
502
0
            }
503
0
            brpc_request->clear_blocks();
504
0
        }
505
5
    } else {
506
5
        instance_data.rpc_channel_is_idle = true;
507
5
    }
508
509
33
    return Status::OK();
510
33
}
511
512
14
void ExchangeSinkBuffer::_ended(RpcInstance& ins) {
513
14
    std::unique_lock<std::mutex> lock(*ins.mutex);
514
14
    ins.running_sink_count--;
515
14
    if (ins.running_sink_count == 0) {
516
4
        _turn_off_channel(ins, lock);
517
4
    }
518
14
}
519
520
0
void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
521
0
    _is_failed = true;
522
0
    LOG(INFO) << "send rpc failed, instance id: " << id << ", _dest_node_id: " << _dest_node_id
523
0
              << ", node id: " << _node_id << ", err: " << err;
524
0
    _context->cancel(Status::Cancelled(err));
525
0
}
526
527
2
void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) {
528
2
    std::unique_lock<std::mutex> lock(*ins.mutex);
529
    // When the receiving side reaches eof, it means the receiver has finished early.
530
    // The remaining data in the current rpc_channel does not need to be sent,
531
    // and the rpc_channel should be turned off immediately.
532
2
    Defer turn_off([&]() { _turn_off_channel(ins, lock); });
533
534
2
    auto& broadcast_q_map = ins.broadcast_package_queue;
535
2
    for (auto& [channel, broadcast_q] : broadcast_q_map) {
536
0
        for (; !broadcast_q.empty(); broadcast_q.pop()) {
537
0
            if (broadcast_q.front().block_holder->get_block()) {
538
0
                COUNTER_UPDATE(channel->_parent->memory_used_counter(),
539
0
                               -broadcast_q.front().block_holder->get_block()->ByteSizeLong());
540
0
            }
541
0
        }
542
0
    }
543
2
    broadcast_q_map.clear();
544
545
2
    auto& q_map = ins.package_queue;
546
4
    for (auto& [channel, q] : q_map) {
547
8
        for (; !q.empty(); q.pop()) {
548
            // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF,
549
            // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked
550
4
            _total_queue_size--;
551
4
            if (q.front().block) {
552
4
                COUNTER_UPDATE(channel->_parent->memory_used_counter(),
553
4
                               -q.front().block->ByteSizeLong());
554
4
            }
555
4
        }
556
4
    }
557
558
    // Try to wake up pipeline after clearing the queue
559
2
    if (_total_queue_size <= _queue_capacity) {
560
2
        for (auto& dep : _queue_deps) {
561
0
            dep->set_ready();
562
0
        }
563
2
    }
564
565
2
    q_map.clear();
566
2
}
567
568
// The unused parameter `with_lock` is to ensure that the function is called when the lock is held.
569
void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins,
570
8
                                           std::unique_lock<std::mutex>& /*with_lock*/) {
571
8
    if (!ins.rpc_channel_is_idle) {
572
8
        ins.rpc_channel_is_idle = true;
573
8
    }
574
    // Ensure that each RPC is turned off only once.
575
8
    if (ins.rpc_channel_is_turn_off) {
576
0
        return;
577
0
    }
578
8
    ins.rpc_channel_is_turn_off = true;
579
8
    auto weak_task_ctx = weak_task_exec_ctx();
580
8
    if (auto pip_ctx = weak_task_ctx.lock()) {
581
8
        for (auto& parent : _parents) {
582
0
            parent->on_channel_finished(ins.id);
583
0
        }
584
8
    }
585
8
}
586
587
0
void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) {
588
0
    int64_t local_max_time = 0;
589
0
    int64_t local_min_time = INT64_MAX;
590
0
    for (auto& [_, ins] : _rpc_instances) {
591
0
        if (ins->stats.sum_time != 0) {
592
0
            local_max_time = std::max(local_max_time, ins->stats.sum_time);
593
0
            local_min_time = std::min(local_min_time, ins->stats.sum_time);
594
0
        }
595
0
    }
596
0
    *max_time = local_max_time;
597
0
    *min_time = local_min_time == INT64_MAX ? 0 : local_min_time;
598
0
}
599
600
0
int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
601
0
    int64_t sum_time = 0;
602
0
    for (auto& [_, ins] : _rpc_instances) {
603
0
        sum_time += ins->stats.sum_time;
604
0
    }
605
0
    return sum_time;
606
0
}
607
608
void ExchangeSinkBuffer::update_rpc_time(RpcInstance& ins, int64_t start_rpc_time,
609
27
                                         int64_t receive_rpc_time) {
610
27
    _rpc_count++;
611
27
    int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
612
27
    if (rpc_spend_time > 0) {
613
27
        auto& stats = ins.stats;
614
27
        ++stats.rpc_count;
615
27
        stats.sum_time += rpc_spend_time;
616
27
        stats.max_time = std::max(stats.max_time, rpc_spend_time);
617
27
        stats.min_time = std::min(stats.min_time, rpc_spend_time);
618
27
    }
619
27
}
620
621
0
void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
622
0
    auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1);
623
0
    auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
624
0
    auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
625
0
    auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
626
0
    auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");
627
628
0
    int64_t max_rpc_time = 0, min_rpc_time = 0;
629
0
    get_max_min_rpc_time(&max_rpc_time, &min_rpc_time);
630
0
    _max_rpc_timer->set(max_rpc_time);
631
0
    _min_rpc_timer->set(min_rpc_time);
632
633
0
    _count_rpc->set(_rpc_count);
634
0
    int64_t sum_time = get_sum_rpc_time();
635
0
    _sum_rpc_timer->set(sum_time);
636
0
    _avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), _rpc_count.load()));
637
638
0
    auto max_count = _state->rpc_verbose_profile_max_instance_count();
639
    // This counter will lead to performance degradation.
640
    // So only collect this information when the profile level is greater than 3.
641
0
    if (_state->profile_level() > 3 && max_count > 0) {
642
0
        std::vector<std::pair<InstanceLoId, RpcInstanceStatistics>> tmp_rpc_stats_vec;
643
0
        for (const auto& [id, ins] : _rpc_instances) {
644
0
            tmp_rpc_stats_vec.emplace_back(id, ins->stats);
645
0
        }
646
0
        pdqsort(tmp_rpc_stats_vec.begin(), tmp_rpc_stats_vec.end(),
647
0
                [](const auto& a, const auto& b) { return a.second.max_time > b.second.max_time; });
648
0
        auto count = std::min((size_t)max_count, tmp_rpc_stats_vec.size());
649
0
        int i = 0;
650
0
        auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true);
651
0
        for (const auto& [id, stats] : tmp_rpc_stats_vec) {
652
0
            if (0 == stats.rpc_count) {
653
0
                continue;
654
0
            }
655
0
            std::stringstream out;
656
0
            out << "Instance " << std::hex << id;
657
0
            auto stats_str = fmt::format(
658
0
                    "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, SumTime: {}",
659
0
                    stats.rpc_count, PrettyPrinter::print(stats.max_time, TUnit::TIME_NS),
660
0
                    PrettyPrinter::print(stats.min_time, TUnit::TIME_NS),
661
0
                    PrettyPrinter::print(
662
0
                            stats.sum_time / std::max(static_cast<int64_t>(1), stats.rpc_count),
663
0
                            TUnit::TIME_NS),
664
0
                    PrettyPrinter::print(stats.sum_time, TUnit::TIME_NS));
665
0
            detail_profile->add_info_string(out.str(), stats_str);
666
0
            if (++i == count) {
667
0
                break;
668
0
            }
669
0
        }
670
0
    }
671
0
}
672
673
2
std::string ExchangeSinkBuffer::debug_each_instance_queue_size() {
674
2
    fmt::memory_buffer debug_string_buffer;
675
6
    for (auto& [id, instance_data] : _rpc_instances) {
676
6
        std::unique_lock<std::mutex> lock(*instance_data->mutex);
677
6
        auto queue_size = 0;
678
6
        for (auto& [_, list] : instance_data->package_queue) {
679
5
            queue_size += list.size();
680
5
        }
681
6
        fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, queue_size);
682
6
    }
683
2
    return fmt::to_string(debug_string_buffer);
684
2
}
685
686
} // namespace doris