Coverage Report

Created: 2026-03-15 08:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/exchange_sink_buffer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/operator/exchange_sink_buffer.h"
19
20
#include <brpc/controller.h>
21
#include <butil/errno.h>
22
#include <butil/iobuf_inl.h>
23
#include <fmt/format.h>
24
#include <gen_cpp/Types_types.h>
25
#include <gen_cpp/types.pb.h>
26
#include <glog/logging.h>
27
#include <google/protobuf/stubs/callback.h>
28
#include <pdqsort.h>
29
#include <stddef.h>
30
31
#include <atomic>
32
#include <cstdint>
33
#include <exception>
34
#include <functional>
35
#include <memory>
36
#include <ostream>
37
#include <utility>
38
39
#include "common/status.h"
40
#include "exec/exchange/vdata_stream_sender.h"
41
#include "exec/operator/exchange_sink_operator.h"
42
#include "exec/pipeline/pipeline_fragment_context.h"
43
#include "runtime/exec_env.h"
44
#include "runtime/thread_context.h"
45
#include "service/backend_options.h"
46
#include "util/defer_op.h"
47
#include "util/proto_util.h"
48
#include "util/time.h"
49
50
namespace doris {
51
#include "common/compile_check_begin.h"
52
53
2
BroadcastPBlockHolder::~BroadcastPBlockHolder() {
54
    // lock the parent queue, if the queue could lock success, then return the block
55
    // to the queue, to reuse the block
56
2
    std::shared_ptr<BroadcastPBlockHolderMemLimiter> limiter = _parent_creator.lock();
57
2
    if (limiter != nullptr) {
58
2
        limiter->release(*this);
59
2
    }
60
    // If the queue was already destructed, then release pblock automatically since it
61
    // is a unique ptr.
62
2
}
63
64
2
void BroadcastPBlockHolderMemLimiter::acquire(BroadcastPBlockHolder& holder) {
65
2
    std::unique_lock l(_holders_lock);
66
2
    DCHECK(_broadcast_dependency != nullptr);
67
2
    holder.set_parent_creator(shared_from_this());
68
2
    auto size = holder._pblock->column_values().size();
69
2
    _total_queue_buffer_size += size;
70
2
    _total_queue_blocks_count++;
71
2
    if (_total_queue_buffer_size >= _total_queue_buffer_size_limit ||
72
2
        _total_queue_blocks_count >= _total_queue_blocks_count_limit) {
73
0
        _broadcast_dependency->block();
74
0
    }
75
2
}
76
77
2
void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holder) {
78
2
    std::unique_lock l(_holders_lock);
79
2
    DCHECK(_broadcast_dependency != nullptr);
80
2
    auto size = holder._pblock->column_values().size();
81
2
    _total_queue_buffer_size -= size;
82
2
    _total_queue_blocks_count--;
83
2
    if (_total_queue_buffer_size <= 0) {
84
2
        _broadcast_dependency->set_ready();
85
2
    }
86
2
}
87
88
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
89
                                       PlanNodeId node_id, RuntimeState* state,
90
                                       const std::vector<InstanceLoId>& sender_ins_ids)
91
6
        : HasTaskExecutionCtx(state),
92
6
          _queue_capacity(0),
93
6
          _is_failed(false),
94
6
          _query_id(std::move(query_id)),
95
6
          _dest_node_id(dest_node_id),
96
6
          _node_id(node_id),
97
6
          _state(state),
98
6
          _context(state->get_query_ctx()),
99
6
          _exchange_sink_num(sender_ins_ids.size()),
100
6
          _send_multi_blocks(state->query_options().__isset.exchange_multi_blocks_byte_size &&
101
6
                             state->query_options().exchange_multi_blocks_byte_size > 0) {
102
6
    if (_send_multi_blocks) {
103
6
        _send_multi_blocks_byte_size = state->query_options().exchange_multi_blocks_byte_size;
104
6
    }
105
6
}
106
107
0
void ExchangeSinkBuffer::close() {
108
    // Could not clear the queue here, because there maybe a running rpc want to
109
    // get a request from the queue, and clear method will release the request
110
    // and it will core.
111
    //_instance_to_broadcast_package_queue.clear();
112
    //_instance_to_package_queue.clear();
113
    //_instance_to_request.clear();
114
0
}
115
116
14
void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) {
117
14
    if (_is_failed) {
118
0
        return;
119
0
    }
120
14
    auto low_id = fragment_instance_id.lo;
121
14
    if (_rpc_instances.contains(low_id)) {
122
0
        return;
123
0
    }
124
125
    // Initialize the instance data
126
14
    auto instance_data = std::make_unique<RpcInstance>(low_id);
127
14
    instance_data->mutex = std::make_unique<std::mutex>();
128
14
    instance_data->seq = 0;
129
14
    instance_data->package_queue =
130
14
            std::unordered_map<Channel*, std::queue<TransmitInfo, std::list<TransmitInfo>>>();
131
14
    instance_data->broadcast_package_queue = std::unordered_map<
132
14
            Channel*, std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>>();
133
14
    _queue_capacity = config::exchg_buffer_queue_capacity_factor * _rpc_instances.size();
134
135
14
    PUniqueId finst_id;
136
14
    finst_id.set_hi(fragment_instance_id.hi);
137
14
    finst_id.set_lo(fragment_instance_id.lo);
138
139
14
    instance_data->rpc_channel_is_idle = true;
140
14
    instance_data->rpc_channel_is_turn_off = false;
141
142
    // Initialize request
143
14
    instance_data->request = std::make_shared<PTransmitDataParams>();
144
14
    instance_data->request->mutable_finst_id()->CopyFrom(finst_id);
145
14
    instance_data->request->mutable_query_id()->CopyFrom(_query_id);
146
14
    instance_data->request->set_node_id(_dest_node_id);
147
14
    instance_data->running_sink_count = _exchange_sink_num;
148
149
14
    _rpc_instances[low_id] = std::move(instance_data);
150
14
}
151
152
48
Status ExchangeSinkBuffer::add_block(Channel* channel, TransmitInfo&& request) {
153
48
    if (_is_failed) {
154
9
        return Status::OK();
155
9
    }
156
39
    auto ins_id = channel->dest_ins_id();
157
39
    if (!_rpc_instances.contains(ins_id)) {
158
0
        return Status::InternalError("fragment_instance_id {} not do register_sink",
159
0
                                     print_id(channel->_fragment_instance_id));
160
0
    }
161
39
    auto& instance_data = *_rpc_instances[ins_id];
162
39
    if (instance_data.rpc_channel_is_turn_off) {
163
1
        return Status::EndOfFile("receiver eof");
164
1
    }
165
38
    bool send_now = false;
166
38
    {
167
38
        std::unique_lock<std::mutex> lock(*instance_data.mutex);
168
        // Do not have in process rpc, directly send
169
38
        if (instance_data.rpc_channel_is_idle) {
170
14
            send_now = true;
171
14
            instance_data.rpc_channel_is_idle = false;
172
14
        }
173
38
        if (request.block) {
174
38
            RETURN_IF_ERROR(
175
38
                    BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
176
38
            COUNTER_UPDATE(channel->_parent->memory_used_counter(), request.block->ByteSizeLong());
177
38
        }
178
38
        instance_data.package_queue[channel].emplace(std::move(request));
179
38
        _total_queue_size++;
180
38
        if (_total_queue_size > _queue_capacity) {
181
0
            for (auto& dep : _queue_deps) {
182
0
                dep->block();
183
0
            }
184
0
        }
185
38
    }
186
38
    if (send_now) {
187
14
        RETURN_IF_ERROR(_send_rpc(instance_data));
188
14
    }
189
190
38
    return Status::OK();
191
38
}
192
193
0
Status ExchangeSinkBuffer::add_block(Channel* channel, BroadcastTransmitInfo&& request) {
194
0
    if (_is_failed) {
195
0
        return Status::OK();
196
0
    }
197
0
    auto ins_id = channel->dest_ins_id();
198
0
    if (!_rpc_instances.contains(ins_id)) {
199
0
        return Status::InternalError("fragment_instance_id {} not do register_sink",
200
0
                                     print_id(channel->_fragment_instance_id));
201
0
    }
202
0
    auto& instance_data = *_rpc_instances[ins_id];
203
0
    if (instance_data.rpc_channel_is_turn_off) {
204
0
        return Status::EndOfFile("receiver eof");
205
0
    }
206
0
    bool send_now = false;
207
0
    {
208
0
        std::unique_lock<std::mutex> lock(*instance_data.mutex);
209
        // Do not have in process rpc, directly send
210
0
        if (instance_data.rpc_channel_is_idle) {
211
0
            send_now = true;
212
0
            instance_data.rpc_channel_is_idle = false;
213
0
        }
214
0
        if (request.block_holder->get_block()) {
215
0
            RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
216
0
                    request.block_holder->get_block()->be_exec_version()));
217
0
        }
218
0
        instance_data.broadcast_package_queue[channel].emplace(request);
219
0
    }
220
0
    if (send_now) {
221
0
        RETURN_IF_ERROR(_send_rpc(instance_data));
222
0
    }
223
224
0
    return Status::OK();
225
0
}
226
227
41
Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
228
41
    std::unique_lock<std::mutex> lock(*(instance_data.mutex));
229
230
41
    auto& q_map = instance_data.package_queue;
231
41
    auto& broadcast_q_map = instance_data.broadcast_package_queue;
232
233
82
    auto find_max_size_queue = [](Channel*& channel, auto& ptr, auto& map) {
234
82
        for (auto& [chan, lists] : map) {
235
81
            if (!ptr) {
236
62
                if (!lists.empty()) {
237
30
                    channel = chan;
238
30
                    ptr = &lists;
239
30
                }
240
62
            } else {
241
19
                if (ptr->size() < lists.size()) {
242
0
                    channel = chan;
243
0
                    ptr = &lists;
244
0
                }
245
19
            }
246
81
        }
247
82
    };
exchange_sink_buffer.cpp:_ZZN5doris18ExchangeSinkBuffer9_send_rpcERNS_11RpcInstanceEENK3$_0clIPSt5queueINS_12TransmitInfoENSt7__cxx114listIS6_SaIS6_EEEESt13unordered_mapIPNS_7ChannelESB_St4hashISF_ESt8equal_toISF_ESaISt4pairIKSF_SB_EEEEEDaRSF_RT_RT0_
Line
Count
Source
233
41
    auto find_max_size_queue = [](Channel*& channel, auto& ptr, auto& map) {
234
81
        for (auto& [chan, lists] : map) {
235
81
            if (!ptr) {
236
62
                if (!lists.empty()) {
237
30
                    channel = chan;
238
30
                    ptr = &lists;
239
30
                }
240
62
            } else {
241
19
                if (ptr->size() < lists.size()) {
242
0
                    channel = chan;
243
0
                    ptr = &lists;
244
0
                }
245
19
            }
246
81
        }
247
41
    };
exchange_sink_buffer.cpp:_ZZN5doris18ExchangeSinkBuffer9_send_rpcERNS_11RpcInstanceEENK3$_0clIPSt5queueINS_21BroadcastTransmitInfoENSt7__cxx114listIS6_SaIS6_EEEESt13unordered_mapIPNS_7ChannelESB_St4hashISF_ESt8equal_toISF_ESaISt4pairIKSF_SB_EEEEEDaRSF_RT_RT0_
Line
Count
Source
233
41
    auto find_max_size_queue = [](Channel*& channel, auto& ptr, auto& map) {
234
41
        for (auto& [chan, lists] : map) {
235
0
            if (!ptr) {
236
0
                if (!lists.empty()) {
237
0
                    channel = chan;
238
0
                    ptr = &lists;
239
0
                }
240
0
            } else {
241
0
                if (ptr->size() < lists.size()) {
242
0
                    channel = chan;
243
0
                    ptr = &lists;
244
0
                }
245
0
            }
246
0
        }
247
41
    };
248
249
41
    Channel* channel = nullptr;
250
251
41
    std::queue<TransmitInfo, std::list<TransmitInfo>>* q_ptr = nullptr;
252
41
    find_max_size_queue(channel, q_ptr, q_map);
253
41
    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>* broadcast_q_ptr = nullptr;
254
41
    find_max_size_queue(channel, broadcast_q_ptr, broadcast_q_map);
255
256
41
    if (_is_failed) {
257
2
        _turn_off_channel(instance_data, lock);
258
2
        return Status::OK();
259
2
    }
260
39
    if (instance_data.rpc_channel_is_turn_off) {
261
6
        return Status::OK();
262
6
    }
263
264
33
    auto mem_byte = 0;
265
33
    if (q_ptr && !q_ptr->empty()) {
266
28
        auto& q = *q_ptr;
267
268
28
        std::vector<TransmitInfo> requests(_send_multi_blocks ? q.size() : 1);
269
56
        for (int i = 0; i < requests.size(); i++) {
270
28
            requests[i] = std::move(q.front());
271
28
            q.pop();
272
273
28
            if (requests[i].block) {
274
                // make sure rpc byte size under the _send_multi_blocks_bytes_size
275
28
                mem_byte += requests[i].block->ByteSizeLong();
276
28
                if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) {
277
0
                    requests.resize(i + 1);
278
0
                    break;
279
0
                }
280
28
            }
281
28
        }
282
283
        // If we have data to shuffle which is not broadcasted
284
28
        auto& request = requests[0];
285
28
        auto& brpc_request = instance_data.request;
286
28
        brpc_request->set_sender_id(channel->_parent->sender_id());
287
28
        brpc_request->set_be_number(channel->_parent->be_number());
288
289
28
        if (_send_multi_blocks) {
290
0
            for (auto& req : requests) {
291
0
                if (req.block && !req.block->column_metas().empty()) {
292
0
                    auto add_block = brpc_request->add_blocks();
293
0
                    add_block->Swap(req.block.get());
294
0
                }
295
0
            }
296
28
        } else {
297
28
            if (request.block && !request.block->column_metas().empty()) {
298
0
                brpc_request->set_allocated_block(request.block.get());
299
0
            }
300
28
        }
301
302
28
        instance_data.seq += requests.size();
303
28
        brpc_request->set_packet_seq(instance_data.seq);
304
28
        brpc_request->set_eos(requests.back().eos);
305
28
        auto send_callback = channel->get_send_callback(&instance_data, requests.back().eos);
306
28
        send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms);
307
28
        if (config::execution_ignore_eovercrowded) {
308
28
            send_callback->cntl_->ignore_eovercrowded();
309
28
        }
310
28
        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
311
28
                                                RpcInstance* ins, const std::string& err) {
312
1
            auto task_lock = weak_task_ctx.lock();
313
1
            if (task_lock == nullptr) {
314
                // This means the ExchangeSinkBuffer object was already destroyed; no need to run the failure handler any more.
315
0
                return;
316
0
            }
317
            // attach task for memory tracker and query id when core
318
1
            SCOPED_ATTACH_TASK(_state);
319
1
            _failed(ins->id, err);
320
1
        });
321
28
        send_callback->start_rpc_time = GetCurrentTimeNanos();
322
28
        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
323
28
                                                 RpcInstance* ins_ptr, const bool& eos,
324
28
                                                 const PTransmitDataResult& result,
325
28
                                                 const int64_t& start_rpc_time) {
326
27
            auto task_lock = weak_task_ctx.lock();
327
27
            if (task_lock == nullptr) {
328
                // This means the ExchangeSinkBuffer object was already destroyed; no need to run the handler any more.
329
0
                return;
330
0
            }
331
            // attach task for memory tracker and query id when core
332
27
            SCOPED_ATTACH_TASK(_state);
333
334
27
            auto& ins = *ins_ptr;
335
27
            auto end_rpc_time = GetCurrentTimeNanos();
336
27
            update_rpc_time(ins, start_rpc_time, end_rpc_time);
337
338
27
            Status s(Status::create(result.status()));
339
27
            if (s.is<ErrorCode::END_OF_FILE>()) {
340
2
                _set_receiver_eof(ins);
341
25
            } else if (!s.ok()) {
342
0
                _failed(ins.id,
343
0
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
344
0
                return;
345
25
            } else if (eos) {
346
14
                _ended(ins);
347
14
            }
348
            // The eos here only indicates that the current exchange sink has reached eos.
349
            // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
350
27
            s = _send_rpc(ins);
351
27
            if (!s) {
352
0
                _failed(ins.id,
353
0
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
354
0
            }
355
27
        });
356
28
        {
357
28
            auto send_remote_block_closure = AutoReleaseClosure<
358
28
                    PTransmitDataParams,
359
28
                    ExchangeSendCallback<PTransmitDataResult>>::create_unique(brpc_request,
360
28
                                                                              send_callback);
361
28
            if (enable_http_send_block(*brpc_request)) {
362
0
                RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
363
0
                                                      std::move(send_remote_block_closure),
364
0
                                                      channel->_brpc_dest_addr));
365
28
            } else {
366
28
                transmit_blockv2(channel->_brpc_stub.get(), std::move(send_remote_block_closure));
367
28
            }
368
28
        }
369
370
28
        if (!_send_multi_blocks && request.block) {
371
28
            static_cast<void>(brpc_request->release_block());
372
28
        } else {
373
0
            brpc_request->clear_blocks();
374
0
        }
375
28
        if (mem_byte) {
376
0
            COUNTER_UPDATE(channel->_parent->memory_used_counter(), -mem_byte);
377
0
        }
378
28
        DCHECK_GE(_total_queue_size, requests.size());
379
28
        _total_queue_size -= (int)requests.size();
380
28
        if (_total_queue_size <= _queue_capacity) {
381
28
            for (auto& dep : _queue_deps) {
382
0
                dep->set_ready();
383
0
            }
384
28
        }
385
28
    } else if (broadcast_q_ptr && !broadcast_q_ptr->empty()) {
386
0
        auto& broadcast_q = *broadcast_q_ptr;
387
        // If we have data to shuffle which is broadcasted
388
0
        std::vector<BroadcastTransmitInfo> requests(_send_multi_blocks ? broadcast_q.size() : 1);
389
0
        for (int i = 0; i < requests.size(); i++) {
390
0
            requests[i] = broadcast_q.front();
391
0
            broadcast_q.pop();
392
393
0
            if (requests[i].block_holder->get_block()) {
394
                // make sure rpc byte size under the _send_multi_blocks_bytes_size
395
0
                mem_byte += requests[i].block_holder->get_block()->ByteSizeLong();
396
0
                if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) {
397
0
                    requests.resize(i + 1);
398
0
                    break;
399
0
                }
400
0
            }
401
0
        }
402
403
0
        auto& request = requests[0];
404
0
        auto& brpc_request = instance_data.request;
405
0
        brpc_request->set_sender_id(channel->_parent->sender_id());
406
0
        brpc_request->set_be_number(channel->_parent->be_number());
407
408
0
        if (_send_multi_blocks) {
409
0
            for (int i = 0; i < requests.size(); i++) {
410
0
                auto& req = requests[i];
411
0
                if (auto block = req.block_holder->get_block();
412
0
                    block && !block->column_metas().empty()) {
413
0
                    auto add_block = brpc_request->add_blocks();
414
0
                    for (int j = 0; j < block->column_metas_size(); ++j) {
415
0
                        add_block->add_column_metas()->CopyFrom(block->column_metas(j));
416
0
                    }
417
0
                    add_block->set_be_exec_version(block->be_exec_version());
418
0
                    add_block->set_compressed(block->compressed());
419
0
                    add_block->set_compression_type(block->compression_type());
420
0
                    add_block->set_uncompressed_size(block->uncompressed_size());
421
0
                    add_block->set_allocated_column_values(block->mutable_column_values());
422
0
                }
423
0
            }
424
0
        } else {
425
0
            if (request.block_holder->get_block() &&
426
0
                !request.block_holder->get_block()->column_metas().empty()) {
427
0
                brpc_request->set_allocated_block(request.block_holder->get_block());
428
0
            }
429
0
        }
430
0
        instance_data.seq += requests.size();
431
0
        brpc_request->set_packet_seq(instance_data.seq);
432
0
        brpc_request->set_eos(requests.back().eos);
433
0
        auto send_callback = channel->get_send_callback(&instance_data, requests.back().eos);
434
435
0
        send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms);
436
0
        if (config::execution_ignore_eovercrowded) {
437
0
            send_callback->cntl_->ignore_eovercrowded();
438
0
        }
439
0
        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
440
0
                                                RpcInstance* ins, const std::string& err) {
441
0
            auto task_lock = weak_task_ctx.lock();
442
0
            if (task_lock == nullptr) {
443
                // This means the ExchangeSinkBuffer object was already destroyed; no need to run the failure handler any more.
444
0
                return;
445
0
            }
446
            // attach task for memory tracker and query id when core
447
0
            SCOPED_ATTACH_TASK(_state);
448
0
            _failed(ins->id, err);
449
0
        });
450
0
        send_callback->start_rpc_time = GetCurrentTimeNanos();
451
0
        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
452
0
                                                 RpcInstance* ins_ptr, const bool& eos,
453
0
                                                 const PTransmitDataResult& result,
454
0
                                                 const int64_t& start_rpc_time) {
455
0
            auto task_lock = weak_task_ctx.lock();
456
0
            if (task_lock == nullptr) {
457
                // This means the ExchangeSinkBuffer object was already destroyed; no need to run the handler any more.
458
0
                return;
459
0
            }
460
            // attach task for memory tracker and query id when core
461
0
            SCOPED_ATTACH_TASK(_state);
462
0
            auto& ins = *ins_ptr;
463
0
            auto end_rpc_time = GetCurrentTimeNanos();
464
0
            update_rpc_time(ins, start_rpc_time, end_rpc_time);
465
466
0
            Status s(Status::create(result.status()));
467
0
            if (s.is<ErrorCode::END_OF_FILE>()) {
468
0
                _set_receiver_eof(ins);
469
0
            } else if (!s.ok()) {
470
0
                _failed(ins.id,
471
0
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
472
0
                return;
473
0
            } else if (eos) {
474
0
                _ended(ins);
475
0
            }
476
477
            // The eos here only indicates that the current exchange sink has reached eos.
478
            // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
479
0
            s = _send_rpc(ins);
480
0
            if (!s) {
481
0
                _failed(ins.id,
482
0
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
483
0
            }
484
0
        });
485
0
        {
486
0
            auto send_remote_block_closure = AutoReleaseClosure<
487
0
                    PTransmitDataParams,
488
0
                    ExchangeSendCallback<PTransmitDataResult>>::create_unique(brpc_request,
489
0
                                                                              send_callback);
490
0
            if (enable_http_send_block(*brpc_request)) {
491
0
                RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
492
0
                                                      std::move(send_remote_block_closure),
493
0
                                                      channel->_brpc_dest_addr));
494
0
            } else {
495
0
                transmit_blockv2(channel->_brpc_stub.get(), std::move(send_remote_block_closure));
496
0
            }
497
0
        }
498
0
        if (!_send_multi_blocks && request.block_holder->get_block()) {
499
0
            static_cast<void>(brpc_request->release_block());
500
0
        } else {
501
0
            for (int i = 0; i < brpc_request->mutable_blocks()->size(); ++i) {
502
0
                static_cast<void>(brpc_request->mutable_blocks(i)->release_column_values());
503
0
            }
504
0
            brpc_request->clear_blocks();
505
0
        }
506
5
    } else {
507
5
        instance_data.rpc_channel_is_idle = true;
508
5
    }
509
510
33
    return Status::OK();
511
33
}
512
513
14
void ExchangeSinkBuffer::_ended(RpcInstance& ins) {
514
14
    std::unique_lock<std::mutex> lock(*ins.mutex);
515
14
    ins.running_sink_count--;
516
14
    if (ins.running_sink_count == 0) {
517
4
        _turn_off_channel(ins, lock);
518
4
    }
519
14
}
520
521
0
void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
522
0
    _is_failed = true;
523
0
    LOG(INFO) << "send rpc failed, instance id: " << id << ", _dest_node_id: " << _dest_node_id
524
0
              << ", node id: " << _node_id << ", err: " << err;
525
0
    _context->cancel(Status::Cancelled(err));
526
0
}
527
528
2
void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) {
529
2
    std::unique_lock<std::mutex> lock(*ins.mutex);
530
    // When the receiving side reaches eof, it means the receiver has finished early.
531
    // The remaining data in the current rpc_channel does not need to be sent,
532
    // and the rpc_channel should be turned off immediately.
533
2
    Defer turn_off([&]() { _turn_off_channel(ins, lock); });
534
535
2
    auto& broadcast_q_map = ins.broadcast_package_queue;
536
2
    for (auto& [channel, broadcast_q] : broadcast_q_map) {
537
0
        for (; !broadcast_q.empty(); broadcast_q.pop()) {
538
0
            if (broadcast_q.front().block_holder->get_block()) {
539
0
                COUNTER_UPDATE(channel->_parent->memory_used_counter(),
540
0
                               -broadcast_q.front().block_holder->get_block()->ByteSizeLong());
541
0
            }
542
0
        }
543
0
    }
544
2
    broadcast_q_map.clear();
545
546
2
    auto& q_map = ins.package_queue;
547
4
    for (auto& [channel, q] : q_map) {
548
8
        for (; !q.empty(); q.pop()) {
549
            // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF,
550
            // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked
551
4
            _total_queue_size--;
552
4
            if (q.front().block) {
553
4
                COUNTER_UPDATE(channel->_parent->memory_used_counter(),
554
4
                               -q.front().block->ByteSizeLong());
555
4
            }
556
4
        }
557
4
    }
558
559
    // Try to wake up pipeline after clearing the queue
560
2
    if (_total_queue_size <= _queue_capacity) {
561
2
        for (auto& dep : _queue_deps) {
562
0
            dep->set_ready();
563
0
        }
564
2
    }
565
566
2
    q_map.clear();
567
2
}
568
569
// The unused parameter `with_lock` is to ensure that the function is called when the lock is held.
570
void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins,
571
8
                                           std::unique_lock<std::mutex>& /*with_lock*/) {
572
8
    if (!ins.rpc_channel_is_idle) {
573
8
        ins.rpc_channel_is_idle = true;
574
8
    }
575
    // Ensure that each RPC is turned off only once.
576
8
    if (ins.rpc_channel_is_turn_off) {
577
0
        return;
578
0
    }
579
8
    ins.rpc_channel_is_turn_off = true;
580
8
    auto weak_task_ctx = weak_task_exec_ctx();
581
8
    if (auto pip_ctx = weak_task_ctx.lock()) {
582
8
        for (auto& parent : _parents) {
583
0
            parent->on_channel_finished(ins.id);
584
0
        }
585
8
    }
586
8
}
587
588
0
void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) {
589
0
    int64_t local_max_time = 0;
590
0
    int64_t local_min_time = INT64_MAX;
591
0
    for (auto& [_, ins] : _rpc_instances) {
592
0
        if (ins->stats.sum_time != 0) {
593
0
            local_max_time = std::max(local_max_time, ins->stats.sum_time);
594
0
            local_min_time = std::min(local_min_time, ins->stats.sum_time);
595
0
        }
596
0
    }
597
0
    *max_time = local_max_time;
598
0
    *min_time = local_min_time == INT64_MAX ? 0 : local_min_time;
599
0
}
600
601
0
int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
602
0
    int64_t sum_time = 0;
603
0
    for (auto& [_, ins] : _rpc_instances) {
604
0
        sum_time += ins->stats.sum_time;
605
0
    }
606
0
    return sum_time;
607
0
}
608
609
void ExchangeSinkBuffer::update_rpc_time(RpcInstance& ins, int64_t start_rpc_time,
610
27
                                         int64_t receive_rpc_time) {
611
27
    _rpc_count++;
612
27
    int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
613
27
    if (rpc_spend_time > 0) {
614
27
        auto& stats = ins.stats;
615
27
        ++stats.rpc_count;
616
27
        stats.sum_time += rpc_spend_time;
617
27
        stats.max_time = std::max(stats.max_time, rpc_spend_time);
618
27
        stats.min_time = std::min(stats.min_time, rpc_spend_time);
619
27
    }
620
27
}
621
622
0
void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
623
0
    auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1);
624
0
    auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
625
0
    auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
626
0
    auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
627
0
    auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");
628
629
0
    int64_t max_rpc_time = 0, min_rpc_time = 0;
630
0
    get_max_min_rpc_time(&max_rpc_time, &min_rpc_time);
631
0
    _max_rpc_timer->set(max_rpc_time);
632
0
    _min_rpc_timer->set(min_rpc_time);
633
634
0
    _count_rpc->set(_rpc_count);
635
0
    int64_t sum_time = get_sum_rpc_time();
636
0
    _sum_rpc_timer->set(sum_time);
637
0
    _avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), _rpc_count.load()));
638
639
0
    auto max_count = _state->rpc_verbose_profile_max_instance_count();
640
    // This counter will lead to performance degradation.
641
    // So only collect this information when the profile level is greater than 3.
642
0
    if (_state->profile_level() > 3 && max_count > 0) {
643
0
        std::vector<std::pair<InstanceLoId, RpcInstanceStatistics>> tmp_rpc_stats_vec;
644
0
        for (const auto& [id, ins] : _rpc_instances) {
645
0
            tmp_rpc_stats_vec.emplace_back(id, ins->stats);
646
0
        }
647
0
        pdqsort(tmp_rpc_stats_vec.begin(), tmp_rpc_stats_vec.end(),
648
0
                [](const auto& a, const auto& b) { return a.second.max_time > b.second.max_time; });
649
0
        auto count = std::min((size_t)max_count, tmp_rpc_stats_vec.size());
650
0
        int i = 0;
651
0
        auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true);
652
0
        for (const auto& [id, stats] : tmp_rpc_stats_vec) {
653
0
            if (0 == stats.rpc_count) {
654
0
                continue;
655
0
            }
656
0
            std::stringstream out;
657
0
            out << "Instance " << std::hex << id;
658
0
            auto stats_str = fmt::format(
659
0
                    "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, SumTime: {}",
660
0
                    stats.rpc_count, PrettyPrinter::print(stats.max_time, TUnit::TIME_NS),
661
0
                    PrettyPrinter::print(stats.min_time, TUnit::TIME_NS),
662
0
                    PrettyPrinter::print(
663
0
                            stats.sum_time / std::max(static_cast<int64_t>(1), stats.rpc_count),
664
0
                            TUnit::TIME_NS),
665
0
                    PrettyPrinter::print(stats.sum_time, TUnit::TIME_NS));
666
0
            detail_profile->add_info_string(out.str(), stats_str);
667
0
            if (++i == count) {
668
0
                break;
669
0
            }
670
0
        }
671
0
    }
672
0
}
673
674
2
std::string ExchangeSinkBuffer::debug_each_instance_queue_size() {
675
2
    fmt::memory_buffer debug_string_buffer;
676
6
    for (auto& [id, instance_data] : _rpc_instances) {
677
6
        std::unique_lock<std::mutex> lock(*instance_data->mutex);
678
6
        auto queue_size = 0;
679
6
        for (auto& [_, list] : instance_data->package_queue) {
680
5
            queue_size += list.size();
681
5
        }
682
6
        fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, queue_size);
683
6
    }
684
2
    return fmt::to_string(debug_string_buffer);
685
2
}
686
687
#include "common/compile_check_end.h"
688
} // namespace doris