Coverage Report

Created: 2026-07-01 22:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/vdata_stream_recvr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/exchange/vdata_stream_recvr.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/data.pb.h>
24
25
#include <algorithm>
26
#include <functional>
27
#include <string>
28
29
#include "common/logging.h"
30
#include "core/block/block.h"
31
#include "core/block/materialize_block.h"
32
#include "exec/exchange/vdata_stream_mgr.h"
33
#include "exec/operator/exchange_sink_operator.h"
34
#include "exec/operator/exchange_source_operator.h"
35
#include "exec/sort/sort_cursor.h"
36
#include "exec/sort/vsorted_run_merger.h"
37
#include "runtime/memory/mem_tracker.h"
38
#include "runtime/runtime_state.h"
39
#include "runtime/thread_context.h"
40
#include "util/defer_op.h"
41
#include "util/uid_util.h"
42
43
namespace doris {
44
45
VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders,
46
                                           std::shared_ptr<Dependency> local_channel_dependency)
47
27
        : _recvr(parent_recvr),
48
27
          _is_cancelled(false),
49
27
          _num_remaining_senders(num_senders),
50
27
          _local_channel_dependency(local_channel_dependency) {
51
27
    _cancel_status = Status::OK();
52
27
    _queue_mem_tracker = std::make_unique<MemTracker>("local data queue mem tracker");
53
27
}
54
55
27
VDataStreamRecvr::SenderQueue::~SenderQueue() {
56
27
    run_block_queue_done_callbacks(_block_queue);
57
27
    _block_queue.clear();
58
27
}
59
60
923
Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
61
923
    BlockItem block_item;
62
923
    {
63
923
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
64
923
#ifndef NDEBUG
65
923
        if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
66
0
            throw doris::Exception(ErrorCode::INTERNAL_ERROR,
67
0
                                   "_is_cancelled: {}, _block_queue_empty: {}, "
68
0
                                   "_num_remaining_senders: {}",
69
0
                                   _is_cancelled, _block_queue.empty(), _num_remaining_senders);
70
0
        }
71
923
#endif
72
        //check and get block_item from data_queue
73
923
        if (_is_cancelled) {
74
3
            RETURN_IF_ERROR(_cancel_status);
75
2
            return Status::Cancelled("Cancelled");
76
3
        }
77
78
920
        if (_block_queue.empty()) {
79
9
            if (_num_remaining_senders != 0) {
80
0
                return Status::InternalError(
81
0
                        "Data queue is empty but there are still remaining senders. "
82
0
                        "_num_remaining_senders: {}",
83
0
                        _num_remaining_senders);
84
0
            }
85
9
            *eos = true;
86
9
            return Status::OK();
87
9
        }
88
89
920
        DCHECK(!_block_queue.empty());
90
911
        block_item = std::move(_block_queue.front());
91
911
        _block_queue.pop_front();
92
911
    }
93
0
    BlockUPtr next_block;
94
911
    RETURN_IF_ERROR(block_item.get_block(next_block));
95
911
    size_t block_byte_size = block_item.block_byte_size();
96
911
    COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, block_item.deserialize_time());
97
911
    COUNTER_UPDATE(_recvr->_decompress_timer, block_item.decompress_time());
98
911
    COUNTER_UPDATE(_recvr->_decompress_bytes, block_item.decompress_bytes());
99
911
    _recvr->_memory_used_counter->update(-(int64_t)block_byte_size);
100
911
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
101
911
    sub_blocks_memory_usage(block_byte_size);
102
911
    if (_block_queue.empty() && _source_dependency) {
103
391
        if (!_is_cancelled && _num_remaining_senders > 0) {
104
387
            _source_dependency->block();
105
387
        }
106
391
    }
107
108
911
    block_item.call_done(_recvr);
109
110
911
    DCHECK(block->empty());
111
911
    block->swap(*next_block);
112
911
    *eos = false;
113
911
    return Status::OK();
114
911
}
115
116
0
bool VDataStreamRecvr::SenderQueue::has_data_or_finished() {
117
0
    std::lock_guard<std::mutex> l(_lock);
118
0
    return _is_cancelled || !_block_queue.empty() || _num_remaining_senders == 0;
119
0
}
120
121
978
void VDataStreamRecvr::SenderQueue::set_source_ready(std::lock_guard<std::mutex>&) {
122
    // Here, it is necessary to check if _source_dependency is not nullptr.
123
    // This is because the queue might be closed before setting the source dependency.
124
978
    if (!_source_dependency) {
125
47
        return;
126
47
    }
127
931
    const bool should_wait = !_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0;
128
931
    if (!should_wait) {
129
931
        _source_dependency->set_ready();
130
931
    }
131
931
}
132
133
void VDataStreamRecvr::SenderQueue::run_block_queue_done_callbacks(
134
43
        std::list<BlockItem>& block_queue) {
135
43
    for (auto& block_item : block_queue) {
136
40
        block_item.call_done(_recvr);
137
40
    }
138
43
}
139
140
0
std::string VDataStreamRecvr::SenderQueue::debug_string() {
141
0
    std::lock_guard<std::mutex> l(_lock);
142
0
    fmt::memory_buffer debug_string_buffer;
143
0
    fmt::format_to(debug_string_buffer,
144
0
                   "_num_remaining_senders = {}, block_queue size = {}, _is_cancelled: {}, "
145
0
                   "_cancel_status: {}, _sender_eos_set: (",
146
0
                   _num_remaining_senders, _block_queue.size(), _is_cancelled,
147
0
                   _cancel_status.to_string());
148
0
    for (auto& i : _sender_eos_set) {
149
0
        fmt::format_to(debug_string_buffer, "{}, ", i);
150
0
    }
151
0
    fmt::format_to(debug_string_buffer, ")");
152
0
    return fmt::to_string(debug_string_buffer);
153
0
}
154
155
Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock, int be_number,
156
                                                int64_t packet_seq,
157
                                                ::google::protobuf::Closure** done,
158
                                                const int64_t wait_for_worker,
159
2.09k
                                                const uint64_t time_to_find_recvr) {
160
2.09k
    {
161
2.09k
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
162
2.09k
        if (_is_cancelled) {
163
1.56k
            return Status::OK();
164
1.56k
        }
165
533
        auto iter = _packet_seq_map.find(be_number);
166
533
        if (iter != _packet_seq_map.end()) {
167
523
            if (iter->second >= packet_seq) {
168
0
                return Status::InternalError(
169
0
                        "packet already exist [cur_packet_id= {} receive_packet_id={}]",
170
0
                        iter->second, packet_seq);
171
0
            }
172
523
            iter->second = packet_seq;
173
523
        } else {
174
10
            _packet_seq_map.emplace(be_number, packet_seq);
175
10
        }
176
177
533
        DCHECK(_num_remaining_senders >= 0);
178
533
        if (_num_remaining_senders == 0) {
179
0
            DCHECK(_sender_eos_set.contains(be_number));
180
0
            return Status::OK();
181
0
        }
182
533
    }
183
184
533
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
185
533
    if (_is_cancelled) {
186
3
        return Status::OK();
187
3
    }
188
189
530
    const auto block_byte_size = pblock->ByteSizeLong();
190
530
    COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
191
530
    if (_recvr->_max_wait_worker_time->value() < wait_for_worker) {
192
0
        _recvr->_max_wait_worker_time->set(wait_for_worker);
193
0
    }
194
195
530
    if (_recvr->_max_find_recvr_time->value() < time_to_find_recvr) {
196
0
        _recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr);
197
0
    }
198
199
530
    _block_queue.emplace_back(std::move(pblock), block_byte_size);
200
530
    COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
201
530
    set_source_ready(l);
202
203
    // if done is nullptr, this function can't delay this response
204
535
    if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
205
34
        _block_queue.back().set_done(*done);
206
34
        *done = nullptr;
207
34
    }
208
530
    _recvr->_memory_used_counter->update(block_byte_size);
209
530
    add_blocks_memory_usage(block_byte_size);
210
530
    return Status::OK();
211
533
}
212
213
Status VDataStreamRecvr::SenderQueue::add_blocks(const PTransmitDataParams* request,
214
                                                 ::google::protobuf::Closure** done,
215
                                                 const int64_t wait_for_worker,
216
1
                                                 const uint64_t time_to_find_recvr) {
217
1
    {
218
1
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
219
1
        if (_is_cancelled) {
220
0
            return Status::OK();
221
0
        }
222
1
        const int be_number = request->be_number();
223
        // In the request, the packet_seq for blocks is [request->packet_seq() - blocks_size(), request->packet_seq())
224
        // Note this is a left-closed, right-open interval; the packet_seq of the last block is request->packet_seq() - 1
225
        // We store the packet_seq of the last block in _packet_seq_map so we can compare it with the packet_seq of the next received packet
226
1
        const int64_t packet_seq = request->packet_seq() - 1;
227
1
        auto iter = _packet_seq_map.find(be_number);
228
1
        if (iter != _packet_seq_map.end()) {
229
0
            if (iter->second > (packet_seq - request->blocks_size())) {
230
0
                return Status::InternalError(
231
0
                        "packet already exist [cur_packet_id= {} receive_packet_id={}]",
232
0
                        iter->second, packet_seq);
233
0
            }
234
0
            iter->second = packet_seq;
235
1
        } else {
236
1
            _packet_seq_map.emplace(be_number, packet_seq);
237
1
        }
238
239
1
        DCHECK(_num_remaining_senders >= 0);
240
1
        if (_num_remaining_senders == 0) {
241
0
            DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
242
0
            return Status::OK();
243
0
        }
244
1
    }
245
246
1
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
247
1
    if (_is_cancelled) {
248
0
        return Status::OK();
249
0
    }
250
251
1
    int64_t total_block_byte_size = 0;
252
3
    for (int i = 0; i < request->blocks_size(); i++) {
253
2
        std::unique_ptr<PBlock> pblock = std::make_unique<PBlock>();
254
2
        pblock->CopyFrom(request->blocks(i));
255
256
2
        const auto block_byte_size = pblock->ByteSizeLong();
257
2
        COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
258
2
        if (_recvr->_max_wait_worker_time->value() < wait_for_worker) {
259
1
            _recvr->_max_wait_worker_time->set(wait_for_worker);
260
1
        }
261
262
2
        if (_recvr->_max_find_recvr_time->value() < time_to_find_recvr) {
263
1
            _recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr);
264
1
        }
265
266
2
        _block_queue.emplace_back(std::move(pblock), block_byte_size);
267
2
        COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
268
2
        total_block_byte_size += block_byte_size;
269
2
    }
270
271
1
    set_source_ready(l);
272
273
    // if done is nullptr, this function can't delay this response
274
1
    if (done != nullptr && _recvr->exceeds_limit(total_block_byte_size)) {
275
1
        _block_queue.back().set_done(*done);
276
1
        *done = nullptr;
277
1
    }
278
1
    _recvr->_memory_used_counter->update(total_block_byte_size);
279
1
    add_blocks_memory_usage(total_block_byte_size);
280
1
    return Status::OK();
281
1
}
282
283
708
void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
284
708
    if (block->rows() == 0) {
285
0
        return;
286
0
    }
287
708
    {
288
708
        INJECT_MOCK_SLEEP(std::unique_lock<std::mutex> l(_lock));
289
708
        if (_is_cancelled) {
290
297
            return;
291
297
        }
292
708
        DCHECK(_num_remaining_senders >= 0);
293
411
        if (_num_remaining_senders == 0) {
294
1
            return;
295
1
        }
296
411
    }
297
410
    BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name());
298
299
    // local exchange should copy the block contented if use move == false
300
410
    if (use_move) {
301
10
        block->clear();
302
400
    } else {
303
400
        auto rows = block->rows();
304
801
        for (int i = 0; i < nblock->columns(); ++i) {
305
401
            nblock->get_by_position(i).column =
306
401
                    nblock->get_by_position(i).column->clone_resized(rows);
307
401
        }
308
400
    }
309
410
    materialize_block_inplace(*nblock);
310
311
410
    auto block_mem_size = nblock->allocated_bytes();
312
410
    {
313
410
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
314
410
        if (_is_cancelled) {
315
0
            return;
316
0
        }
317
410
        _block_queue.emplace_back(std::move(nblock), block_mem_size);
318
410
        set_source_ready(l);
319
410
        COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
320
410
        _recvr->_memory_used_counter->update(block_mem_size);
321
410
        add_blocks_memory_usage(block_mem_size);
322
410
    }
323
410
}
324
325
23
void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
326
23
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
327
23
    if (_sender_eos_set.end() != _sender_eos_set.find(be_number)) {
328
0
        return;
329
0
    }
330
23
    _sender_eos_set.insert(be_number);
331
23
    DCHECK_GT(_num_remaining_senders, 0);
332
23
    _num_remaining_senders--;
333
23
    VLOG_FILE << "decremented senders: fragment_instance_id="
334
0
              << print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id()
335
0
              << " #senders=" << _num_remaining_senders;
336
23
    if (_num_remaining_senders == 0) {
337
12
        set_source_ready(l);
338
12
    }
339
23
}
340
341
8
void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
342
8
    std::list<BlockItem> block_queue;
343
8
    {
344
8
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
345
8
        if (_is_cancelled) {
346
7
            return;
347
7
        }
348
1
        _is_cancelled = true;
349
1
        _cancel_status = cancel_status;
350
1
        set_source_ready(l);
351
1
        VLOG_QUERY << "cancelled stream: _fragment_instance_id="
352
0
                   << print_id(_recvr->fragment_instance_id())
353
0
                   << " node_id=" << _recvr->dest_node_id();
354
1
        block_queue.splice(block_queue.end(), _block_queue);
355
1
    }
356
0
    run_block_queue_done_callbacks(block_queue);
357
1
}
358
359
15
void VDataStreamRecvr::SenderQueue::close() {
360
    // If _is_cancelled is not set to true, there may be concurrent send
361
    // which add batch to _block_queue. The batch added after _block_queue
362
    // is clear will be memory leak
363
15
    std::list<BlockItem> block_queue;
364
15
    {
365
15
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
366
15
        _is_cancelled = true;
367
15
        set_source_ready(l);
368
15
        block_queue.splice(block_queue.end(), _block_queue);
369
15
    }
370
    // Release delayed RPC callbacks after the queue state is fully closed.
371
15
    run_block_queue_done_callbacks(block_queue);
372
15
}
373
374
VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr,
375
                                   RuntimeProfile::HighWaterMarkCounter* memory_used_counter,
376
                                   RuntimeState* state, const TUniqueId& fragment_instance_id,
377
                                   PlanNodeId dest_node_id, int num_senders, bool is_merging,
378
                                   RuntimeProfile* profile, size_t data_queue_capacity)
379
23
        : HasTaskExecutionCtx(state),
380
23
          _mgr(stream_mgr),
381
23
          _memory_used_counter(memory_used_counter),
382
23
          _resource_ctx(state->get_query_ctx()->resource_ctx()),
383
23
          _query_context(state->get_query_ctx()->shared_from_this()),
384
23
          _fragment_instance_id(fragment_instance_id),
385
23
          _dest_node_id(dest_node_id),
386
23
          _is_merging(is_merging),
387
23
          _is_closed(false),
388
23
          _sender_queue_mem_limit(data_queue_capacity),
389
23
          _profile(profile) {
390
    // DataStreamRecvr may be destructed after the instance execution thread ends.
391
23
    _mem_tracker =
392
23
            std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id));
393
23
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
394
395
    // Create one queue per sender if is_merging is true.
396
23
    int num_queues = is_merging ? num_senders : 1;
397
23
    _sender_to_local_channel_dependency.resize(num_queues);
398
50
    for (size_t i = 0; i < num_queues; i++) {
399
27
        _sender_to_local_channel_dependency[i] = Dependency::create_shared(
400
27
                _dest_node_id, _dest_node_id, fmt::format("LocalExchangeChannelDependency_{}", i),
401
27
                true);
402
27
    }
403
23
    _sender_queues.reserve(num_queues);
404
23
    int num_sender_per_queue = is_merging ? 1 : num_senders;
405
50
    for (int i = 0; i < num_queues; ++i) {
406
27
        SenderQueue* queue = nullptr;
407
27
        queue = _sender_queue_pool.add(new SenderQueue(this, num_sender_per_queue,
408
27
                                                       _sender_to_local_channel_dependency[i]));
409
27
        _sender_queues.push_back(queue);
410
27
    }
411
412
    // Initialize the counters
413
23
    _remote_bytes_received_counter = ADD_COUNTER(_profile, "RemoteBytesReceived", TUnit::BYTES);
414
23
    _local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES);
415
416
23
    _deserialize_row_batch_timer = ADD_TIMER(_profile, "DeserializeRowBatchTimer");
417
23
    _data_arrival_timer = ADD_TIMER(_profile, "DataArrivalWaitTime");
418
23
    _buffer_full_total_timer = ADD_TIMER(_profile, "SendersBlockedTotalTimer(*)");
419
23
    _first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime");
420
23
    _decompress_timer = ADD_TIMER(_profile, "DecompressTime");
421
23
    _decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
422
23
    _blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", TUnit::UNIT);
423
23
    _max_wait_worker_time = ADD_COUNTER(_profile, "MaxWaitForWorkerTime", TUnit::UNIT);
424
23
    _max_wait_to_process_time = ADD_COUNTER(_profile, "MaxWaitToProcessTime", TUnit::UNIT);
425
23
    _max_find_recvr_time = ADD_COUNTER(_profile, "MaxFindRecvrCpuTime(NS)", TUnit::UNIT);
426
23
}
427
428
23
VDataStreamRecvr::~VDataStreamRecvr() {
429
23
    DCHECK(_mgr == nullptr) << "Must call close()";
430
23
}
431
432
Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr,
433
                                       const std::vector<bool>& is_asc_order,
434
                                       const std::vector<bool>& nulls_first, size_t batch_size,
435
0
                                       int64_t limit, size_t offset) {
436
0
    DCHECK(_is_merging);
437
0
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
438
0
    std::vector<BlockSupplier> child_block_suppliers;
439
0
    std::vector<BlockSupplierReadyChecker> child_block_supplier_ready_checkers;
440
    // Create the merger that will a single stream of sorted rows.
441
0
    _merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, limit,
442
0
                                       offset, _profile));
443
444
0
    child_block_suppliers.reserve(_sender_queues.size());
445
0
    child_block_supplier_ready_checkers.reserve(_sender_queues.size());
446
0
    for (int i = 0; i < _sender_queues.size(); ++i) {
447
0
        child_block_suppliers.emplace_back(std::bind(std::mem_fn(&SenderQueue::get_batch),
448
0
                                                     _sender_queues[i], std::placeholders::_1,
449
0
                                                     std::placeholders::_2));
450
0
        child_block_supplier_ready_checkers.emplace_back(
451
0
                std::bind(std::mem_fn(&SenderQueue::has_data_or_finished), _sender_queues[i]));
452
0
    }
453
0
    RETURN_IF_ERROR(_merger->prepare(child_block_suppliers, child_block_supplier_ready_checkers));
454
0
    return Status::OK();
455
0
}
456
457
Status VDataStreamRecvr::add_block(std::unique_ptr<PBlock> pblock, int sender_id, int be_number,
458
                                   int64_t packet_seq, ::google::protobuf::Closure** done,
459
                                   const int64_t wait_for_worker,
460
0
                                   const uint64_t time_to_find_recvr) {
461
0
    SCOPED_ATTACH_TASK(_resource_ctx);
462
0
    if (_query_context->low_memory_mode()) {
463
0
        set_low_memory_mode();
464
0
    }
465
466
0
    int use_sender_id = _is_merging ? sender_id : 0;
467
0
    return _sender_queues[use_sender_id]->add_block(std::move(pblock), be_number, packet_seq, done,
468
0
                                                    wait_for_worker, time_to_find_recvr);
469
0
}
470
471
Status VDataStreamRecvr::add_blocks(const PTransmitDataParams* request,
472
                                    ::google::protobuf::Closure** done,
473
                                    const int64_t wait_for_worker,
474
1
                                    const uint64_t time_to_find_recvr) {
475
1
    SCOPED_ATTACH_TASK(_resource_ctx);
476
1
    if (_query_context->low_memory_mode()) {
477
0
        set_low_memory_mode();
478
0
    }
479
1
    int use_sender_id = _is_merging ? request->sender_id() : 0;
480
1
    return _sender_queues[use_sender_id]->add_blocks(request, done, wait_for_worker,
481
1
                                                     time_to_find_recvr);
482
1
}
483
484
4
void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
485
4
    if (_query_context->low_memory_mode()) {
486
0
        set_low_memory_mode();
487
0
    }
488
4
    int use_sender_id = _is_merging ? sender_id : 0;
489
4
    _sender_queues[use_sender_id]->add_block(block, use_move);
490
4
}
491
492
0
std::string VDataStreamRecvr::debug_string() {
493
0
    fmt::memory_buffer debug_string_buffer;
494
0
    fmt::format_to(debug_string_buffer,
495
0
                   "fragment_instance_id: {}, _dest_node_id: {}, _is_merging: {}, _is_closed: {}",
496
0
                   print_id(_fragment_instance_id), _dest_node_id, _is_merging, _is_closed);
497
0
    for (size_t i = 0; i < _sender_queues.size(); i++) {
498
0
        fmt::format_to(debug_string_buffer, "No. {} queue: {}", i,
499
0
                       _sender_queues[i]->debug_string());
500
0
    }
501
0
    return fmt::to_string(debug_string_buffer);
502
0
}
503
504
3
std::shared_ptr<Dependency> VDataStreamRecvr::get_local_channel_dependency(int sender_id) {
505
3
    DCHECK(_sender_to_local_channel_dependency[_is_merging ? sender_id : 0] != nullptr);
506
3
    return _sender_to_local_channel_dependency[_is_merging ? sender_id : 0];
507
3
}
508
509
11
Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
510
11
    if (!_is_merging) {
511
11
        block->clear();
512
11
        return _sender_queues[0]->get_batch(block, eos);
513
11
    } else {
514
0
        return _merger->get_next(block, eos);
515
0
    }
516
11
}
517
518
3
void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_status) {
519
3
    if (!exec_status.ok()) {
520
0
        cancel_stream(exec_status);
521
0
        return;
522
0
    }
523
3
    int use_sender_id = _is_merging ? sender_id : 0;
524
3
    _sender_queues[use_sender_id]->decrement_senders(be_number);
525
3
}
526
527
7
void VDataStreamRecvr::cancel_stream(Status exec_status) {
528
7
    VLOG_QUERY << "cancel_stream: fragment_instance_id=" << print_id(_fragment_instance_id)
529
0
               << exec_status;
530
531
14
    for (int i = 0; i < _sender_queues.size(); ++i) {
532
7
        _sender_queues[i]->cancel(exec_status);
533
7
    }
534
7
}
535
536
950
void VDataStreamRecvr::SenderQueue::add_blocks_memory_usage(int64_t size) {
537
950
    DCHECK(size >= 0);
538
950
    _recvr->_mem_tracker->consume(size);
539
950
    _queue_mem_tracker->consume(size);
540
950
    if (_local_channel_dependency && exceeds_limit()) {
541
942
        _local_channel_dependency->block();
542
942
    }
543
950
}
544
545
911
void VDataStreamRecvr::SenderQueue::sub_blocks_memory_usage(int64_t size) {
546
911
    DCHECK(size >= 0);
547
911
    _recvr->_mem_tracker->release(size);
548
911
    _queue_mem_tracker->release(size);
549
911
    if (_local_channel_dependency && (!exceeds_limit())) {
550
392
        _local_channel_dependency->set_ready();
551
392
    }
552
911
}
553
554
1.86k
bool VDataStreamRecvr::SenderQueue::exceeds_limit() {
555
1.86k
    const size_t queue_byte_size = _queue_mem_tracker->consumption();
556
1.86k
    return _recvr->queue_exceeds_limit(queue_byte_size);
557
1.86k
}
558
559
502
bool VDataStreamRecvr::exceeds_limit(size_t block_byte_size) {
560
502
    return _mem_tracker->consumption() + block_byte_size > config::exchg_node_buffer_size_bytes;
561
502
}
562
563
1.86k
bool VDataStreamRecvr::queue_exceeds_limit(size_t queue_byte_size) const {
564
1.86k
    return queue_byte_size >= _sender_queue_mem_limit;
565
1.86k
}
566
567
15
void VDataStreamRecvr::close() {
568
15
    if (_is_closed) {
569
5
        return;
570
5
    }
571
10
    _is_closed = true;
572
10
    for (auto& it : _sender_to_local_channel_dependency) {
573
10
        it->set_always_ready();
574
10
    }
575
20
    for (int i = 0; i < _sender_queues.size(); ++i) {
576
10
        _sender_queues[i]->close();
577
10
    }
578
    // Remove this receiver from the DataStreamMgr that created it.
579
    // TODO: log error msg
580
10
    if (_mgr) {
581
7
        static_cast<void>(_mgr->deregister_recvr(fragment_instance_id(), dest_node_id()));
582
7
    }
583
10
    _mgr = nullptr;
584
585
10
    _merger.reset();
586
10
}
587
588
7
void VDataStreamRecvr::set_sink_dep_always_ready() const {
589
9
    for (auto dep : _sender_to_local_channel_dependency) {
590
9
        dep->set_always_ready();
591
9
    }
592
7
}
593
594
} // namespace doris