Coverage Report

Created: 2026-05-14 22:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/exchange_sink_buffer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/operator/exchange_sink_buffer.h"
19
20
#include <brpc/controller.h>
21
#include <butil/errno.h>
22
#include <butil/iobuf_inl.h>
23
#include <fmt/format.h>
24
#include <gen_cpp/Types_types.h>
25
#include <gen_cpp/types.pb.h>
26
#include <glog/logging.h>
27
#include <google/protobuf/stubs/callback.h>
28
#include <pdqsort.h>
29
#include <stddef.h>
30
31
#include <atomic>
32
#include <cstdint>
33
#include <exception>
34
#include <functional>
35
#include <memory>
36
#include <ostream>
37
#include <utility>
38
39
#include "common/status.h"
40
#include "exec/exchange/vdata_stream_sender.h"
41
#include "exec/operator/exchange_sink_operator.h"
42
#include "exec/pipeline/pipeline_fragment_context.h"
43
#include "runtime/exec_env.h"
44
#include "runtime/thread_context.h"
45
#include "service/backend_options.h"
46
#include "util/defer_op.h"
47
#include "util/proto_util.h"
48
#include "util/time.h"
49
50
namespace doris {
51
52
2
BroadcastPBlockHolder::~BroadcastPBlockHolder() {
53
    // lock the parent queue, if the queue could lock success, then return the block
54
    // to the queue, to reuse the block
55
2
    std::shared_ptr<BroadcastPBlockHolderMemLimiter> limiter = _parent_creator.lock();
56
2
    if (limiter != nullptr) {
57
2
        limiter->release(*this);
58
2
    }
59
    // If the queue is already destroyed, then release pblock automatically since it
60
    // is a unique ptr.
61
2
}
62
63
2
void BroadcastPBlockHolderMemLimiter::acquire(BroadcastPBlockHolder& holder) {
64
2
    std::unique_lock l(_holders_lock);
65
2
    DCHECK(_broadcast_dependency != nullptr);
66
2
    holder.set_parent_creator(shared_from_this());
67
2
    auto size = holder._pblock->column_values().size();
68
2
    _total_queue_buffer_size += size;
69
2
    _total_queue_blocks_count++;
70
2
    if (_total_queue_buffer_size >= _total_queue_buffer_size_limit ||
71
2
        _total_queue_blocks_count >= _total_queue_blocks_count_limit) {
72
0
        _broadcast_dependency->block();
73
0
    }
74
2
}
75
76
2
void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holder) {
77
2
    std::unique_lock l(_holders_lock);
78
2
    DCHECK(_broadcast_dependency != nullptr);
79
2
    auto size = holder._pblock->column_values().size();
80
2
    _total_queue_buffer_size -= size;
81
2
    _total_queue_blocks_count--;
82
2
    if (_total_queue_buffer_size <= 0) {
83
2
        _broadcast_dependency->set_ready();
84
2
    }
85
2
}
86
87
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
88
                                       PlanNodeId node_id, RuntimeState* state,
89
                                       const std::vector<InstanceLoId>& sender_ins_ids)
90
6
        : HasTaskExecutionCtx(state),
91
6
          _queue_capacity(0),
92
6
          _is_failed(false),
93
6
          _query_id(std::move(query_id)),
94
6
          _dest_node_id(dest_node_id),
95
6
          _node_id(node_id),
96
6
          _state(state),
97
6
          _context(state->get_query_ctx()),
98
6
          _exchange_sink_num(sender_ins_ids.size()),
99
6
          _send_multi_blocks(state->query_options().__isset.exchange_multi_blocks_byte_size &&
100
6
                             state->query_options().exchange_multi_blocks_byte_size > 0) {
101
6
    if (_send_multi_blocks) {
102
6
        _send_multi_blocks_byte_size = state->query_options().exchange_multi_blocks_byte_size;
103
6
    }
104
6
}
105
106
0
void ExchangeSinkBuffer::close() {
107
    // Could not clear the queue here, because there maybe a running rpc want to
108
    // get a request from the queue, and clear method will release the request
109
    // and it will core.
110
    //_instance_to_broadcast_package_queue.clear();
111
    //_instance_to_package_queue.clear();
112
    //_instance_to_request.clear();
113
0
}
114
115
14
void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) {
116
14
    if (_is_failed) {
117
0
        return;
118
0
    }
119
14
    auto low_id = fragment_instance_id.lo;
120
14
    if (_rpc_instances.contains(low_id)) {
121
0
        return;
122
0
    }
123
124
    // Initialize the instance data
125
14
    auto instance_data = std::make_unique<RpcInstance>(low_id);
126
14
    instance_data->mutex = std::make_unique<std::mutex>();
127
14
    instance_data->seq = 0;
128
14
    instance_data->package_queue =
129
14
            std::unordered_map<Channel*, std::queue<TransmitInfo, std::list<TransmitInfo>>>();
130
14
    instance_data->broadcast_package_queue = std::unordered_map<
131
14
            Channel*, std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>>();
132
14
    _queue_capacity = config::exchg_buffer_queue_capacity_factor * _rpc_instances.size();
133
134
14
    PUniqueId finst_id;
135
14
    finst_id.set_hi(fragment_instance_id.hi);
136
14
    finst_id.set_lo(fragment_instance_id.lo);
137
138
14
    instance_data->rpc_channel_is_idle = true;
139
14
    instance_data->rpc_channel_is_turn_off = false;
140
141
    // Initialize request
142
14
    instance_data->request = std::make_shared<PTransmitDataParams>();
143
14
    instance_data->request->mutable_finst_id()->CopyFrom(finst_id);
144
14
    instance_data->request->mutable_query_id()->CopyFrom(_query_id);
145
14
    instance_data->request->set_node_id(_dest_node_id);
146
14
    instance_data->running_sink_count = _exchange_sink_num;
147
148
14
    _rpc_instances[low_id] = std::move(instance_data);
149
14
}
150
151
48
Status ExchangeSinkBuffer::add_block(Channel* channel, TransmitInfo&& request) {
152
48
    if (_is_failed) {
153
9
        return Status::OK();
154
9
    }
155
39
    auto ins_id = channel->dest_ins_id();
156
39
    if (!_rpc_instances.contains(ins_id)) {
157
0
        return Status::InternalError("fragment_instance_id {} not do register_sink",
158
0
                                     print_id(channel->_fragment_instance_id));
159
0
    }
160
39
    auto& instance_data = *_rpc_instances[ins_id];
161
39
    bool send_now = false;
162
39
    {
163
39
        std::unique_lock<std::mutex> lock(*instance_data.mutex);
164
39
        if (instance_data.rpc_channel_is_turn_off) {
165
1
            return Status::EndOfFile("receiver eof");
166
1
        }
167
        // Do not have in process rpc, directly send
168
38
        if (instance_data.rpc_channel_is_idle) {
169
14
            send_now = true;
170
14
            instance_data.rpc_channel_is_idle = false;
171
14
        }
172
38
        if (request.block) {
173
38
            RETURN_IF_ERROR(
174
38
                    BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
175
38
            COUNTER_UPDATE(channel->_parent->memory_used_counter(), request.block->ByteSizeLong());
176
38
        }
177
38
        instance_data.package_queue[channel].emplace(std::move(request));
178
38
        _total_queue_size++;
179
38
        if (_total_queue_size > _queue_capacity) {
180
0
            for (auto& dep : _queue_deps) {
181
0
                dep->block();
182
0
            }
183
0
        }
184
38
    }
185
38
    if (send_now) {
186
14
        RETURN_IF_ERROR(_send_rpc(instance_data));
187
14
    }
188
189
38
    return Status::OK();
190
38
}
191
192
0
Status ExchangeSinkBuffer::add_block(Channel* channel, BroadcastTransmitInfo&& request) {
193
0
    if (_is_failed) {
194
0
        return Status::OK();
195
0
    }
196
0
    auto ins_id = channel->dest_ins_id();
197
0
    if (!_rpc_instances.contains(ins_id)) {
198
0
        return Status::InternalError("fragment_instance_id {} not do register_sink",
199
0
                                     print_id(channel->_fragment_instance_id));
200
0
    }
201
0
    auto& instance_data = *_rpc_instances[ins_id];
202
0
    bool send_now = false;
203
0
    {
204
0
        std::unique_lock<std::mutex> lock(*instance_data.mutex);
205
0
        if (instance_data.rpc_channel_is_turn_off) {
206
0
            return Status::EndOfFile("receiver eof");
207
0
        }
208
        // Do not have in process rpc, directly send
209
0
        if (instance_data.rpc_channel_is_idle) {
210
0
            send_now = true;
211
0
            instance_data.rpc_channel_is_idle = false;
212
0
        }
213
0
        if (request.block_holder->get_block()) {
214
0
            RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
215
0
                    request.block_holder->get_block()->be_exec_version()));
216
0
        }
217
0
        instance_data.broadcast_package_queue[channel].emplace(request);
218
0
    }
219
0
    if (send_now) {
220
0
        RETURN_IF_ERROR(_send_rpc(instance_data));
221
0
    }
222
223
0
    return Status::OK();
224
0
}
225
226
41
Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
227
41
    std::unique_lock<std::mutex> lock(*(instance_data.mutex));
228
229
41
    auto& q_map = instance_data.package_queue;
230
41
    auto& broadcast_q_map = instance_data.broadcast_package_queue;
231
232
82
    auto find_max_size_queue = [](Channel*& channel, auto& ptr, auto& map) {
233
82
        for (auto& [chan, lists] : map) {
234
81
            if (!ptr) {
235
62
                if (!lists.empty()) {
236
30
                    channel = chan;
237
30
                    ptr = &lists;
238
30
                }
239
62
            } else {
240
19
                if (ptr->size() < lists.size()) {
241
0
                    channel = chan;
242
0
                    ptr = &lists;
243
0
                }
244
19
            }
245
81
        }
246
82
    };
exchange_sink_buffer.cpp:_ZZN5doris18ExchangeSinkBuffer9_send_rpcERNS_11RpcInstanceEENK3$_0clIPSt5queueINS_12TransmitInfoENSt7__cxx114listIS6_SaIS6_EEEESt13unordered_mapIPNS_7ChannelESB_St4hashISF_ESt8equal_toISF_ESaISt4pairIKSF_SB_EEEEEDaRSF_RT_RT0_
Line
Count
Source
232
41
    auto find_max_size_queue = [](Channel*& channel, auto& ptr, auto& map) {
233
81
        for (auto& [chan, lists] : map) {
234
81
            if (!ptr) {
235
62
                if (!lists.empty()) {
236
30
                    channel = chan;
237
30
                    ptr = &lists;
238
30
                }
239
62
            } else {
240
19
                if (ptr->size() < lists.size()) {
241
0
                    channel = chan;
242
0
                    ptr = &lists;
243
0
                }
244
19
            }
245
81
        }
246
41
    };
exchange_sink_buffer.cpp:_ZZN5doris18ExchangeSinkBuffer9_send_rpcERNS_11RpcInstanceEENK3$_0clIPSt5queueINS_21BroadcastTransmitInfoENSt7__cxx114listIS6_SaIS6_EEEESt13unordered_mapIPNS_7ChannelESB_St4hashISF_ESt8equal_toISF_ESaISt4pairIKSF_SB_EEEEEDaRSF_RT_RT0_
Line
Count
Source
232
41
    auto find_max_size_queue = [](Channel*& channel, auto& ptr, auto& map) {
233
41
        for (auto& [chan, lists] : map) {
234
0
            if (!ptr) {
235
0
                if (!lists.empty()) {
236
0
                    channel = chan;
237
0
                    ptr = &lists;
238
0
                }
239
0
            } else {
240
0
                if (ptr->size() < lists.size()) {
241
0
                    channel = chan;
242
0
                    ptr = &lists;
243
0
                }
244
0
            }
245
0
        }
246
41
    };
247
248
41
    Channel* channel = nullptr;
249
250
41
    std::queue<TransmitInfo, std::list<TransmitInfo>>* q_ptr = nullptr;
251
41
    find_max_size_queue(channel, q_ptr, q_map);
252
41
    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>* broadcast_q_ptr = nullptr;
253
41
    find_max_size_queue(channel, broadcast_q_ptr, broadcast_q_map);
254
255
41
    if (_is_failed) {
256
2
        _turn_off_channel(instance_data, lock);
257
2
        return Status::OK();
258
2
    }
259
39
    if (instance_data.rpc_channel_is_turn_off) {
260
6
        return Status::OK();
261
6
    }
262
263
33
    auto mem_byte = 0;
264
33
    if (q_ptr && !q_ptr->empty()) {
265
28
        auto& q = *q_ptr;
266
267
28
        std::vector<TransmitInfo> requests(_send_multi_blocks ? q.size() : 1);
268
56
        for (int i = 0; i < requests.size(); i++) {
269
28
            requests[i] = std::move(q.front());
270
28
            q.pop();
271
272
28
            if (requests[i].block) {
273
                // make sure rpc byte size under the _send_multi_blocks_bytes_size
274
28
                mem_byte += requests[i].block->ByteSizeLong();
275
28
                if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) {
276
0
                    requests.resize(i + 1);
277
0
                    break;
278
0
                }
279
28
            }
280
28
        }
281
282
        // If we have data to shuffle which is not broadcasted
283
28
        auto& request = requests[0];
284
28
        auto& brpc_request = instance_data.request;
285
28
        brpc_request->set_sender_id(channel->_parent->sender_id());
286
28
        brpc_request->set_be_number(channel->_parent->be_number());
287
288
28
        if (_send_multi_blocks) {
289
0
            for (auto& req : requests) {
290
0
                if (req.block && !req.block->column_metas().empty()) {
291
0
                    auto add_block = brpc_request->add_blocks();
292
0
                    add_block->Swap(req.block.get());
293
0
                }
294
0
            }
295
28
        } else {
296
28
            if (request.block && !request.block->column_metas().empty()) {
297
0
                brpc_request->set_allocated_block(request.block.get());
298
0
            }
299
28
        }
300
28
        Defer release_block([&]() {
301
28
            if (!_send_multi_blocks && request.block) {
302
28
                static_cast<void>(brpc_request->release_block());
303
28
            } else {
304
0
                brpc_request->clear_blocks();
305
0
            }
306
28
        });
307
308
28
        instance_data.seq += requests.size();
309
28
        brpc_request->set_packet_seq(instance_data.seq);
310
28
        brpc_request->set_eos(requests.back().eos);
311
28
        auto send_callback = channel->get_send_callback(&instance_data, requests.back().eos);
312
28
        send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms);
313
28
        if (config::execution_ignore_eovercrowded) {
314
28
            send_callback->cntl_->ignore_eovercrowded();
315
28
        }
316
28
        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
317
28
                                                RpcInstance* ins, const std::string& err) {
318
1
            auto task_lock = weak_task_ctx.lock();
319
1
            if (task_lock == nullptr) {
320
                // This means the ExchangeSinkBuffer object was already destroyed; no need to run the failure handler any more.
321
0
                return;
322
0
            }
323
            // attach task for memory tracker and query id when core
324
1
            SCOPED_ATTACH_TASK(_state);
325
1
            _failed(ins->id, err);
326
1
        });
327
28
        send_callback->start_rpc_time = GetCurrentTimeNanos();
328
28
        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
329
28
                                                 RpcInstance* ins_ptr, const bool& eos,
330
28
                                                 const PTransmitDataResult& result,
331
28
                                                 const int64_t& start_rpc_time) {
332
27
            auto task_lock = weak_task_ctx.lock();
333
27
            if (task_lock == nullptr) {
334
                // This means the ExchangeSinkBuffer object was already destroyed; no need to run the failure handler any more.
335
0
                return;
336
0
            }
337
            // attach task for memory tracker and query id when core
338
27
            SCOPED_ATTACH_TASK(_state);
339
340
27
            auto& ins = *ins_ptr;
341
27
            auto end_rpc_time = GetCurrentTimeNanos();
342
27
            update_rpc_time(ins, start_rpc_time, end_rpc_time);
343
344
27
            Status s(Status::create(result.status()));
345
27
            if (s.is<ErrorCode::END_OF_FILE>()) {
346
2
                _set_receiver_eof(ins);
347
25
            } else if (!s.ok()) {
348
0
                _failed(ins.id,
349
0
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
350
0
                return;
351
25
            } else if (eos) {
352
14
                _ended(ins);
353
14
            }
354
            // The eos here only indicates that the current exchange sink has reached eos.
355
            // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
356
            // `_send_rpc` must be the LAST operation in this function, because it may reuse the callback!
357
27
            s = _send_rpc(ins);
358
27
            if (!s) {
359
0
                _failed(ins.id,
360
0
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
361
0
            }
362
27
        });
363
28
        {
364
28
            auto send_remote_block_closure = AutoReleaseClosure<
365
28
                    PTransmitDataParams,
366
28
                    ExchangeSendCallback<PTransmitDataResult>>::create_unique(brpc_request,
367
28
                                                                              send_callback);
368
28
            if (enable_http_send_block(*brpc_request)) {
369
0
                RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
370
0
                                                      std::move(send_remote_block_closure),
371
0
                                                      channel->_brpc_dest_addr));
372
28
            } else {
373
28
                transmit_blockv2(channel->_brpc_stub.get(), std::move(send_remote_block_closure));
374
28
            }
375
28
        }
376
377
28
        if (mem_byte) {
378
0
            COUNTER_UPDATE(channel->_parent->memory_used_counter(), -mem_byte);
379
0
        }
380
28
        DCHECK_GE(_total_queue_size, requests.size());
381
28
        _total_queue_size -= (int)requests.size();
382
28
        if (_total_queue_size <= _queue_capacity) {
383
28
            for (auto& dep : _queue_deps) {
384
0
                dep->set_ready();
385
0
            }
386
28
        }
387
28
    } else if (broadcast_q_ptr && !broadcast_q_ptr->empty()) {
388
0
        auto& broadcast_q = *broadcast_q_ptr;
389
        // If we have data to shuffle which is broadcasted
390
0
        std::vector<BroadcastTransmitInfo> requests(_send_multi_blocks ? broadcast_q.size() : 1);
391
0
        for (int i = 0; i < requests.size(); i++) {
392
0
            requests[i] = broadcast_q.front();
393
0
            broadcast_q.pop();
394
395
0
            if (requests[i].block_holder->get_block()) {
396
                // make sure rpc byte size under the _send_multi_blocks_bytes_size
397
0
                mem_byte += requests[i].block_holder->get_block()->ByteSizeLong();
398
0
                if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) {
399
0
                    requests.resize(i + 1);
400
0
                    break;
401
0
                }
402
0
            }
403
0
        }
404
405
0
        auto& request = requests[0];
406
0
        auto& brpc_request = instance_data.request;
407
0
        brpc_request->set_sender_id(channel->_parent->sender_id());
408
0
        brpc_request->set_be_number(channel->_parent->be_number());
409
410
0
        if (_send_multi_blocks) {
411
0
            for (int i = 0; i < requests.size(); i++) {
412
0
                auto& req = requests[i];
413
0
                if (auto block = req.block_holder->get_block();
414
0
                    block && !block->column_metas().empty()) {
415
0
                    auto add_block = brpc_request->add_blocks();
416
0
                    for (int j = 0; j < block->column_metas_size(); ++j) {
417
0
                        add_block->add_column_metas()->CopyFrom(block->column_metas(j));
418
0
                    }
419
0
                    add_block->set_be_exec_version(block->be_exec_version());
420
0
                    add_block->set_compressed(block->compressed());
421
0
                    add_block->set_compression_type(block->compression_type());
422
0
                    add_block->set_uncompressed_size(block->uncompressed_size());
423
0
                    add_block->set_allocated_column_values(block->mutable_column_values());
424
0
                }
425
0
            }
426
0
        } else {
427
0
            if (request.block_holder->get_block() &&
428
0
                !request.block_holder->get_block()->column_metas().empty()) {
429
0
                brpc_request->set_allocated_block(request.block_holder->get_block());
430
0
            }
431
0
        }
432
0
        Defer release_block([&]() {
433
0
            if (!_send_multi_blocks && request.block_holder->get_block()) {
434
0
                static_cast<void>(brpc_request->release_block());
435
0
            } else {
436
0
                for (int i = 0; i < brpc_request->mutable_blocks()->size(); ++i) {
437
0
                    static_cast<void>(brpc_request->mutable_blocks(i)->release_column_values());
438
0
                }
439
0
                brpc_request->clear_blocks();
440
0
            }
441
0
        });
442
0
        instance_data.seq += requests.size();
443
0
        brpc_request->set_packet_seq(instance_data.seq);
444
0
        brpc_request->set_eos(requests.back().eos);
445
0
        auto send_callback = channel->get_send_callback(&instance_data, requests.back().eos);
446
447
0
        send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms);
448
0
        if (config::execution_ignore_eovercrowded) {
449
0
            send_callback->cntl_->ignore_eovercrowded();
450
0
        }
451
0
        send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
452
0
                                                RpcInstance* ins, const std::string& err) {
453
0
            auto task_lock = weak_task_ctx.lock();
454
0
            if (task_lock == nullptr) {
455
                // This means the ExchangeSinkBuffer object was already destroyed; no need to run the failure handler any more.
456
0
                return;
457
0
            }
458
            // attach task for memory tracker and query id when core
459
0
            SCOPED_ATTACH_TASK(_state);
460
0
            _failed(ins->id, err);
461
0
        });
462
0
        send_callback->start_rpc_time = GetCurrentTimeNanos();
463
0
        send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
464
0
                                                 RpcInstance* ins_ptr, const bool& eos,
465
0
                                                 const PTransmitDataResult& result,
466
0
                                                 const int64_t& start_rpc_time) {
467
0
            auto task_lock = weak_task_ctx.lock();
468
0
            if (task_lock == nullptr) {
469
                // This means the ExchangeSinkBuffer object was already destroyed; no need to run the failure handler any more.
470
0
                return;
471
0
            }
472
            // attach task for memory tracker and query id when core
473
0
            SCOPED_ATTACH_TASK(_state);
474
0
            auto& ins = *ins_ptr;
475
0
            auto end_rpc_time = GetCurrentTimeNanos();
476
0
            update_rpc_time(ins, start_rpc_time, end_rpc_time);
477
478
0
            Status s(Status::create(result.status()));
479
0
            if (s.is<ErrorCode::END_OF_FILE>()) {
480
0
                _set_receiver_eof(ins);
481
0
            } else if (!s.ok()) {
482
0
                _failed(ins.id,
483
0
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
484
0
                return;
485
0
            } else if (eos) {
486
0
                _ended(ins);
487
0
            }
488
            // The eos here only indicates that the current exchange sink has reached eos.
489
            // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
490
            // `_send_rpc` must be the LAST operation in this function, because it may reuse the callback!
491
0
            s = _send_rpc(ins);
492
0
            if (!s) {
493
0
                _failed(ins.id,
494
0
                        fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
495
0
            }
496
0
        });
497
0
        {
498
0
            auto send_remote_block_closure = AutoReleaseClosure<
499
0
                    PTransmitDataParams,
500
0
                    ExchangeSendCallback<PTransmitDataResult>>::create_unique(brpc_request,
501
0
                                                                              send_callback);
502
0
            if (enable_http_send_block(*brpc_request)) {
503
0
                RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
504
0
                                                      std::move(send_remote_block_closure),
505
0
                                                      channel->_brpc_dest_addr));
506
0
            } else {
507
0
                transmit_blockv2(channel->_brpc_stub.get(), std::move(send_remote_block_closure));
508
0
            }
509
0
        }
510
5
    } else {
511
5
        instance_data.rpc_channel_is_idle = true;
512
5
    }
513
514
33
    return Status::OK();
515
33
}
516
517
14
void ExchangeSinkBuffer::_ended(RpcInstance& ins) {
518
14
    std::unique_lock<std::mutex> lock(*ins.mutex);
519
14
    ins.running_sink_count--;
520
14
    if (ins.running_sink_count == 0) {
521
4
        _turn_off_channel(ins, lock);
522
4
    }
523
14
}
524
525
0
void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
526
0
    _is_failed = true;
527
0
    LOG(INFO) << "send rpc failed, instance id: " << id << ", _dest_node_id: " << _dest_node_id
528
0
              << ", node id: " << _node_id << ", err: " << err;
529
0
    _context->cancel(Status::Cancelled(err));
530
0
}
531
532
2
void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) {
533
2
    std::unique_lock<std::mutex> lock(*ins.mutex);
534
    // When the receiving side reaches eof, it means the receiver has finished early.
535
    // The remaining data in the current rpc_channel does not need to be sent,
536
    // and the rpc_channel should be turned off immediately.
537
2
    Defer turn_off([&]() { _turn_off_channel(ins, lock); });
538
539
2
    auto& broadcast_q_map = ins.broadcast_package_queue;
540
2
    for (auto& [channel, broadcast_q] : broadcast_q_map) {
541
0
        for (; !broadcast_q.empty(); broadcast_q.pop()) {
542
0
            if (broadcast_q.front().block_holder->get_block()) {
543
0
                COUNTER_UPDATE(channel->_parent->memory_used_counter(),
544
0
                               -broadcast_q.front().block_holder->get_block()->ByteSizeLong());
545
0
            }
546
0
        }
547
0
    }
548
2
    broadcast_q_map.clear();
549
550
2
    auto& q_map = ins.package_queue;
551
4
    for (auto& [channel, q] : q_map) {
552
8
        for (; !q.empty(); q.pop()) {
553
            // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF,
554
            // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked
555
4
            _total_queue_size--;
556
4
            if (q.front().block) {
557
4
                COUNTER_UPDATE(channel->_parent->memory_used_counter(),
558
4
                               -q.front().block->ByteSizeLong());
559
4
            }
560
4
        }
561
4
    }
562
563
    // Try to wake up pipeline after clearing the queue
564
2
    if (_total_queue_size <= _queue_capacity) {
565
2
        for (auto& dep : _queue_deps) {
566
0
            dep->set_ready();
567
0
        }
568
2
    }
569
570
2
    q_map.clear();
571
2
}
572
573
// The unused parameter `with_lock` is to ensure that the function is called when the lock is held.
574
void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins,
575
8
                                           std::unique_lock<std::mutex>& /*with_lock*/) {
576
8
    if (!ins.rpc_channel_is_idle) {
577
8
        ins.rpc_channel_is_idle = true;
578
8
    }
579
    // Ensure that each RPC is turned off only once.
580
8
    if (ins.rpc_channel_is_turn_off) {
581
0
        return;
582
0
    }
583
8
    ins.rpc_channel_is_turn_off = true;
584
8
    auto weak_task_ctx = weak_task_exec_ctx();
585
8
    if (auto pip_ctx = weak_task_ctx.lock()) {
586
8
        for (auto& parent : _parents) {
587
0
            parent->on_channel_finished(ins.id);
588
0
        }
589
8
    } else {
590
        // Task execution context is already gone. The pipeline fragment context is being
591
        // destroyed, so on_channel_finished is skipped. This is normally safe because
592
        // unblock_all_dependencies() should have already set finish_dependency to always_ready.
593
0
        LOG(INFO) << "ExchangeSinkBuffer::_turn_off_channel: task context is null, "
594
0
                  << "skipping on_channel_finished for instance " << ins.id
595
0
                  << ", dest_node_id=" << _dest_node_id << ", node_id=" << _node_id;
596
0
    }
597
8
}
598
599
0
void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) {
600
0
    int64_t local_max_time = 0;
601
0
    int64_t local_min_time = INT64_MAX;
602
0
    for (auto& [_, ins] : _rpc_instances) {
603
0
        if (ins->stats.sum_time != 0) {
604
0
            local_max_time = std::max(local_max_time, ins->stats.sum_time);
605
0
            local_min_time = std::min(local_min_time, ins->stats.sum_time);
606
0
        }
607
0
    }
608
0
    *max_time = local_max_time;
609
0
    *min_time = local_min_time == INT64_MAX ? 0 : local_min_time;
610
0
}
611
612
0
int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
613
0
    int64_t sum_time = 0;
614
0
    for (auto& [_, ins] : _rpc_instances) {
615
0
        sum_time += ins->stats.sum_time;
616
0
    }
617
0
    return sum_time;
618
0
}
619
620
void ExchangeSinkBuffer::update_rpc_time(RpcInstance& ins, int64_t start_rpc_time,
621
27
                                         int64_t receive_rpc_time) {
622
27
    _rpc_count++;
623
27
    int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
624
27
    if (rpc_spend_time > 0) {
625
27
        auto& stats = ins.stats;
626
27
        ++stats.rpc_count;
627
27
        stats.sum_time += rpc_spend_time;
628
27
        stats.max_time = std::max(stats.max_time, rpc_spend_time);
629
27
        stats.min_time = std::min(stats.min_time, rpc_spend_time);
630
27
    }
631
27
}
632
633
0
void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
634
0
    auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1);
635
0
    auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
636
0
    auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
637
0
    auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
638
0
    auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");
639
640
0
    int64_t max_rpc_time = 0, min_rpc_time = 0;
641
0
    get_max_min_rpc_time(&max_rpc_time, &min_rpc_time);
642
0
    _max_rpc_timer->set(max_rpc_time);
643
0
    _min_rpc_timer->set(min_rpc_time);
644
645
0
    _count_rpc->set(_rpc_count);
646
0
    int64_t sum_time = get_sum_rpc_time();
647
0
    _sum_rpc_timer->set(sum_time);
648
0
    _avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), _rpc_count.load()));
649
650
0
    auto max_count = _state->rpc_verbose_profile_max_instance_count();
651
    // This counter will lead to performance degradation.
652
    // So only collect this information when the profile level is greater than 3.
653
0
    if (_state->profile_level() > 3 && max_count > 0) {
654
0
        std::vector<std::pair<InstanceLoId, RpcInstanceStatistics>> tmp_rpc_stats_vec;
655
0
        for (const auto& [id, ins] : _rpc_instances) {
656
0
            tmp_rpc_stats_vec.emplace_back(id, ins->stats);
657
0
        }
658
0
        pdqsort(tmp_rpc_stats_vec.begin(), tmp_rpc_stats_vec.end(),
659
0
                [](const auto& a, const auto& b) { return a.second.max_time > b.second.max_time; });
660
0
        auto count = std::min((size_t)max_count, tmp_rpc_stats_vec.size());
661
0
        int i = 0;
662
0
        auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true);
663
0
        for (const auto& [id, stats] : tmp_rpc_stats_vec) {
664
0
            if (0 == stats.rpc_count) {
665
0
                continue;
666
0
            }
667
0
            std::stringstream out;
668
0
            out << "Instance " << std::hex << id;
669
0
            auto stats_str = fmt::format(
670
0
                    "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, SumTime: {}",
671
0
                    stats.rpc_count, PrettyPrinter::print(stats.max_time, TUnit::TIME_NS),
672
0
                    PrettyPrinter::print(stats.min_time, TUnit::TIME_NS),
673
0
                    PrettyPrinter::print(
674
0
                            stats.sum_time / std::max(static_cast<int64_t>(1), stats.rpc_count),
675
0
                            TUnit::TIME_NS),
676
0
                    PrettyPrinter::print(stats.sum_time, TUnit::TIME_NS));
677
0
            detail_profile->add_info_string(out.str(), stats_str);
678
0
            if (++i == count) {
679
0
                break;
680
0
            }
681
0
        }
682
0
    }
683
0
}
684
685
2
std::string ExchangeSinkBuffer::debug_each_instance_queue_size() {
686
2
    fmt::memory_buffer debug_string_buffer;
687
6
    for (auto& [id, instance_data] : _rpc_instances) {
688
6
        std::unique_lock<std::mutex> lock(*instance_data->mutex);
689
6
        auto queue_size = 0;
690
6
        for (auto& [_, list] : instance_data->package_queue) {
691
5
            queue_size += list.size();
692
5
        }
693
6
        fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}, is turn off: {}\n",
694
                       fmt::format(FMT_COMPILE("{:x}"), static_cast<uint64_t>(id)), queue_size,
695
6
                       instance_data->rpc_channel_is_turn_off);
696
6
    }
697
2
    return fmt::to_string(debug_string_buffer);
698
2
}
699
700
} // namespace doris