Coverage Report

Created: 2026-04-15 20:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/vdata_stream_recvr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/exchange/vdata_stream_recvr.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/data.pb.h>
24
25
#include <algorithm>
26
#include <functional>
27
#include <string>
28
29
#include "common/logging.h"
30
#include "core/block/block.h"
31
#include "core/block/materialize_block.h"
32
#include "exec/exchange/vdata_stream_mgr.h"
33
#include "exec/operator/exchange_sink_operator.h"
34
#include "exec/operator/exchange_source_operator.h"
35
#include "exec/sort/sort_cursor.h"
36
#include "exec/sort/vsorted_run_merger.h"
37
#include "runtime/memory/mem_tracker.h"
38
#include "runtime/runtime_state.h"
39
#include "runtime/thread_context.h"
40
#include "util/defer_op.h"
41
#include "util/uid_util.h"
42
43
namespace doris {
44
45
VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders,
46
                                           std::shared_ptr<Dependency> local_channel_dependency)
47
24
        : _recvr(parent_recvr),
48
24
          _is_cancelled(false),
49
24
          _num_remaining_senders(num_senders),
50
24
          _local_channel_dependency(local_channel_dependency) {
51
24
    _cancel_status = Status::OK();
52
24
    _queue_mem_tracker = std::make_unique<MemTracker>("local data queue mem tracker");
53
24
}
54
55
24
VDataStreamRecvr::SenderQueue::~SenderQueue() {
56
24
    for (auto& block_item : _block_queue) {
57
0
        block_item.call_done(_recvr);
58
0
    }
59
24
    _block_queue.clear();
60
24
}
61
62
920
Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
63
920
#ifndef NDEBUG
64
920
    if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
65
0
        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
66
0
                               "_is_cancelled: {}, _block_queue_empty: {}, "
67
0
                               "_num_remaining_senders: {}",
68
0
                               _is_cancelled, _block_queue.empty(), _num_remaining_senders);
69
0
    }
70
920
#endif
71
920
    BlockItem block_item;
72
920
    {
73
920
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
74
        //check and get block_item from data_queue
75
920
        if (_is_cancelled) {
76
1
            RETURN_IF_ERROR(_cancel_status);
77
1
            return Status::Cancelled("Cancelled");
78
1
        }
79
80
919
        if (_block_queue.empty()) {
81
9
            if (_num_remaining_senders != 0) {
82
0
                return Status::InternalError(
83
0
                        "Data queue is empty but there are still remaining senders. "
84
0
                        "_num_remaining_senders: {}",
85
0
                        _num_remaining_senders);
86
0
            }
87
9
            *eos = true;
88
9
            return Status::OK();
89
9
        }
90
91
919
        DCHECK(!_block_queue.empty());
92
910
        block_item = std::move(_block_queue.front());
93
910
        _block_queue.pop_front();
94
910
    }
95
0
    BlockUPtr next_block;
96
910
    RETURN_IF_ERROR(block_item.get_block(next_block));
97
910
    size_t block_byte_size = block_item.block_byte_size();
98
910
    COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, block_item.deserialize_time());
99
910
    COUNTER_UPDATE(_recvr->_decompress_timer, block_item.decompress_time());
100
910
    COUNTER_UPDATE(_recvr->_decompress_bytes, block_item.decompress_bytes());
101
910
    _recvr->_memory_used_counter->update(-(int64_t)block_byte_size);
102
910
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
103
910
    sub_blocks_memory_usage(block_byte_size);
104
910
    if (_block_queue.empty() && _source_dependency) {
105
408
        if (!_is_cancelled && _num_remaining_senders > 0) {
106
405
            _source_dependency->block();
107
405
        }
108
408
    }
109
110
910
    block_item.call_done(_recvr);
111
112
910
    DCHECK(block->empty());
113
910
    block->swap(*next_block);
114
910
    *eos = false;
115
910
    return Status::OK();
116
910
}
117
118
939
void VDataStreamRecvr::SenderQueue::set_source_ready(std::lock_guard<std::mutex>&) {
119
    // Here, it is necessary to check if _source_dependency is not nullptr.
120
    // This is because the queue might be closed before setting the source dependency.
121
939
    if (!_source_dependency) {
122
11
        return;
123
11
    }
124
928
    const bool should_wait = !_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0;
125
928
    if (!should_wait) {
126
928
        _source_dependency->set_ready();
127
928
    }
128
928
}
129
130
0
std::string VDataStreamRecvr::SenderQueue::debug_string() {
131
0
    fmt::memory_buffer debug_string_buffer;
132
0
    fmt::format_to(debug_string_buffer,
133
0
                   "_num_remaining_senders = {}, block_queue size = {}, _is_cancelled: {}, "
134
0
                   "_cancel_status: {}, _sender_eos_set: (",
135
0
                   _num_remaining_senders, _block_queue.size(), _is_cancelled,
136
0
                   _cancel_status.to_string());
137
0
    std::lock_guard<std::mutex> l(_lock);
138
0
    for (auto& i : _sender_eos_set) {
139
0
        fmt::format_to(debug_string_buffer, "{}, ", i);
140
0
    }
141
0
    fmt::format_to(debug_string_buffer, ")");
142
0
    return fmt::to_string(debug_string_buffer);
143
0
}
144
145
Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock, int be_number,
146
                                                int64_t packet_seq,
147
                                                ::google::protobuf::Closure** done,
148
                                                const int64_t wait_for_worker,
149
490
                                                const uint64_t time_to_find_recvr) {
150
490
    {
151
490
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
152
490
        if (_is_cancelled) {
153
0
            return Status::OK();
154
0
        }
155
490
        auto iter = _packet_seq_map.find(be_number);
156
495
        if (iter != _packet_seq_map.end()) {
157
495
            if (iter->second >= packet_seq) {
158
0
                return Status::InternalError(
159
0
                        "packet already exist [cur_packet_id= {} receive_packet_id={}]",
160
0
                        iter->second, packet_seq);
161
0
            }
162
495
            iter->second = packet_seq;
163
18.4E
        } else {
164
18.4E
            _packet_seq_map.emplace(be_number, packet_seq);
165
18.4E
        }
166
167
490
        DCHECK(_num_remaining_senders >= 0);
168
490
        if (_num_remaining_senders == 0) {
169
0
            DCHECK(_sender_eos_set.contains(be_number));
170
0
            return Status::OK();
171
0
        }
172
490
    }
173
174
490
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
175
490
    if (_is_cancelled) {
176
0
        return Status::OK();
177
0
    }
178
179
490
    const auto block_byte_size = pblock->ByteSizeLong();
180
490
    COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
181
490
    if (_recvr->_max_wait_worker_time->value() < wait_for_worker) {
182
0
        _recvr->_max_wait_worker_time->set(wait_for_worker);
183
0
    }
184
185
490
    if (_recvr->_max_find_recvr_time->value() < time_to_find_recvr) {
186
0
        _recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr);
187
0
    }
188
189
490
    _block_queue.emplace_back(std::move(pblock), block_byte_size);
190
490
    COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
191
490
    set_source_ready(l);
192
193
    // if done is nullptr, this function can't delay this response
194
502
    if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
195
1
        _block_queue.back().set_done(*done);
196
1
        *done = nullptr;
197
1
    }
198
490
    _recvr->_memory_used_counter->update(block_byte_size);
199
490
    add_blocks_memory_usage(block_byte_size);
200
490
    return Status::OK();
201
490
}
202
203
Status VDataStreamRecvr::SenderQueue::add_blocks(const PTransmitDataParams* request,
204
                                                 ::google::protobuf::Closure** done,
205
                                                 const int64_t wait_for_worker,
206
1
                                                 const uint64_t time_to_find_recvr) {
207
1
    {
208
1
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
209
1
        if (_is_cancelled) {
210
0
            return Status::OK();
211
0
        }
212
1
        const int be_number = request->be_number();
213
        // In the request, the packet_seq for blocks is [request->packet_seq() - blocks_size(), request->packet_seq())
214
        // Note this is a left-closed, right-open interval; the packet_seq of the last block is request->packet_seq() - 1
215
        // We store the packet_seq of the last block in _packet_seq_map so we can compare it with the packet_seq of the next received packet
216
1
        const int64_t packet_seq = request->packet_seq() - 1;
217
1
        auto iter = _packet_seq_map.find(be_number);
218
1
        if (iter != _packet_seq_map.end()) {
219
0
            if (iter->second > (packet_seq - request->blocks_size())) {
220
0
                return Status::InternalError(
221
0
                        "packet already exist [cur_packet_id= {} receive_packet_id={}]",
222
0
                        iter->second, packet_seq);
223
0
            }
224
0
            iter->second = packet_seq;
225
1
        } else {
226
1
            _packet_seq_map.emplace(be_number, packet_seq);
227
1
        }
228
229
1
        DCHECK(_num_remaining_senders >= 0);
230
1
        if (_num_remaining_senders == 0) {
231
0
            DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
232
0
            return Status::OK();
233
0
        }
234
1
    }
235
236
1
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
237
1
    if (_is_cancelled) {
238
0
        return Status::OK();
239
0
    }
240
241
1
    int64_t total_block_byte_size = 0;
242
3
    for (int i = 0; i < request->blocks_size(); i++) {
243
2
        std::unique_ptr<PBlock> pblock = std::make_unique<PBlock>();
244
2
        pblock->CopyFrom(request->blocks(i));
245
246
2
        const auto block_byte_size = pblock->ByteSizeLong();
247
2
        COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
248
2
        if (_recvr->_max_wait_worker_time->value() < wait_for_worker) {
249
1
            _recvr->_max_wait_worker_time->set(wait_for_worker);
250
1
        }
251
252
2
        if (_recvr->_max_find_recvr_time->value() < time_to_find_recvr) {
253
1
            _recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr);
254
1
        }
255
256
2
        _block_queue.emplace_back(std::move(pblock), block_byte_size);
257
2
        COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
258
2
        total_block_byte_size += block_byte_size;
259
2
    }
260
261
1
    set_source_ready(l);
262
263
    // if done is nullptr, this function can't delay this response
264
1
    if (done != nullptr && _recvr->exceeds_limit(total_block_byte_size)) {
265
1
        _block_queue.back().set_done(*done);
266
1
        *done = nullptr;
267
1
    }
268
1
    _recvr->_memory_used_counter->update(total_block_byte_size);
269
1
    add_blocks_memory_usage(total_block_byte_size);
270
1
    return Status::OK();
271
1
}
272
273
698
void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
274
698
    if (block->rows() == 0) {
275
0
        return;
276
0
    }
277
698
    {
278
698
        INJECT_MOCK_SLEEP(std::unique_lock<std::mutex> l(_lock));
279
698
        if (_is_cancelled) {
280
297
            return;
281
297
        }
282
698
        DCHECK(_num_remaining_senders >= 0);
283
401
        if (_num_remaining_senders == 0) {
284
1
            return;
285
1
        }
286
401
    }
287
400
    BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name());
288
289
    // local exchange should copy the block contented if use move == false
290
400
    if (use_move) {
291
10
        block->clear();
292
390
    } else {
293
390
        auto rows = block->rows();
294
794
        for (int i = 0; i < nblock->columns(); ++i) {
295
404
            nblock->get_by_position(i).column =
296
404
                    nblock->get_by_position(i).column->clone_resized(rows);
297
404
        }
298
390
    }
299
400
    materialize_block_inplace(*nblock);
300
301
400
    auto block_mem_size = nblock->allocated_bytes();
302
400
    {
303
400
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
304
400
        if (_is_cancelled) {
305
3
            return;
306
3
        }
307
397
        _block_queue.emplace_back(std::move(nblock), block_mem_size);
308
397
        set_source_ready(l);
309
397
        COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
310
397
        _recvr->_memory_used_counter->update(block_mem_size);
311
397
        add_blocks_memory_usage(block_mem_size);
312
397
    }
313
397
}
314
315
23
void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
316
23
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
317
23
    if (_sender_eos_set.end() != _sender_eos_set.find(be_number)) {
318
0
        return;
319
0
    }
320
23
    _sender_eos_set.insert(be_number);
321
23
    DCHECK_GT(_num_remaining_senders, 0);
322
23
    _num_remaining_senders--;
323
23
    VLOG_FILE << "decremented senders: fragment_instance_id="
324
0
              << print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id()
325
0
              << " #senders=" << _num_remaining_senders;
326
23
    if (_num_remaining_senders == 0) {
327
12
        set_source_ready(l);
328
12
    }
329
23
}
330
331
7
void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
332
7
    {
333
7
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
334
7
        if (_is_cancelled) {
335
7
            return;
336
7
        }
337
0
        _is_cancelled = true;
338
0
        _cancel_status = cancel_status;
339
0
        set_source_ready(l);
340
0
        VLOG_QUERY << "cancelled stream: _fragment_instance_id="
341
0
                   << print_id(_recvr->fragment_instance_id())
342
0
                   << " node_id=" << _recvr->dest_node_id();
343
0
    }
344
0
    {
345
0
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
346
0
        for (auto& block_item : _block_queue) {
347
0
            block_item.call_done(_recvr);
348
0
        }
349
0
        _block_queue.clear();
350
0
    }
351
0
}
352
353
13
void VDataStreamRecvr::SenderQueue::close() {
354
    // If _is_cancelled is not set to true, there may be concurrent send
355
    // which add batch to _block_queue. The batch added after _block_queue
356
    // is clear will be memory leak
357
13
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
358
13
    _is_cancelled = true;
359
13
    set_source_ready(l);
360
361
13
    for (auto& block_item : _block_queue) {
362
5
        block_item.call_done(_recvr);
363
5
    }
364
    // Delete any batches queued in _block_queue
365
13
    _block_queue.clear();
366
13
}
367
368
VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr,
369
                                   RuntimeProfile::HighWaterMarkCounter* memory_used_counter,
370
                                   RuntimeState* state, const TUniqueId& fragment_instance_id,
371
                                   PlanNodeId dest_node_id, int num_senders, bool is_merging,
372
                                   RuntimeProfile* profile, size_t data_queue_capacity)
373
20
        : HasTaskExecutionCtx(state),
374
20
          _mgr(stream_mgr),
375
20
          _memory_used_counter(memory_used_counter),
376
20
          _resource_ctx(state->get_query_ctx()->resource_ctx()),
377
20
          _query_context(state->get_query_ctx()->shared_from_this()),
378
20
          _fragment_instance_id(fragment_instance_id),
379
20
          _dest_node_id(dest_node_id),
380
20
          _is_merging(is_merging),
381
20
          _is_closed(false),
382
20
          _sender_queue_mem_limit(data_queue_capacity),
383
20
          _profile(profile) {
384
    // DataStreamRecvr may be destructed after the instance execution thread ends.
385
20
    _mem_tracker =
386
20
            std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id));
387
20
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
388
389
    // Create one queue per sender if is_merging is true.
390
20
    int num_queues = is_merging ? num_senders : 1;
391
20
    _sender_to_local_channel_dependency.resize(num_queues);
392
44
    for (size_t i = 0; i < num_queues; i++) {
393
24
        _sender_to_local_channel_dependency[i] = Dependency::create_shared(
394
24
                _dest_node_id, _dest_node_id, fmt::format("LocalExchangeChannelDependency_{}", i),
395
24
                true);
396
24
    }
397
20
    _sender_queues.reserve(num_queues);
398
20
    int num_sender_per_queue = is_merging ? 1 : num_senders;
399
44
    for (int i = 0; i < num_queues; ++i) {
400
24
        SenderQueue* queue = nullptr;
401
24
        queue = _sender_queue_pool.add(new SenderQueue(this, num_sender_per_queue,
402
24
                                                       _sender_to_local_channel_dependency[i]));
403
24
        _sender_queues.push_back(queue);
404
24
    }
405
406
    // Initialize the counters
407
20
    _remote_bytes_received_counter = ADD_COUNTER(_profile, "RemoteBytesReceived", TUnit::BYTES);
408
20
    _local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES);
409
410
20
    _deserialize_row_batch_timer = ADD_TIMER(_profile, "DeserializeRowBatchTimer");
411
20
    _data_arrival_timer = ADD_TIMER(_profile, "DataArrivalWaitTime");
412
20
    _buffer_full_total_timer = ADD_TIMER(_profile, "SendersBlockedTotalTimer(*)");
413
20
    _first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime");
414
20
    _decompress_timer = ADD_TIMER(_profile, "DecompressTime");
415
20
    _decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
416
20
    _blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", TUnit::UNIT);
417
20
    _max_wait_worker_time = ADD_COUNTER(_profile, "MaxWaitForWorkerTime", TUnit::UNIT);
418
20
    _max_wait_to_process_time = ADD_COUNTER(_profile, "MaxWaitToProcessTime", TUnit::UNIT);
419
20
    _max_find_recvr_time = ADD_COUNTER(_profile, "MaxFindRecvrTime(NS)", TUnit::UNIT);
420
20
}
421
422
20
VDataStreamRecvr::~VDataStreamRecvr() {
423
20
    DCHECK(_mgr == nullptr) << "Must call close()";
424
20
}
425
426
Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr,
427
                                       const std::vector<bool>& is_asc_order,
428
                                       const std::vector<bool>& nulls_first, size_t batch_size,
429
0
                                       int64_t limit, size_t offset) {
430
0
    DCHECK(_is_merging);
431
0
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
432
0
    std::vector<BlockSupplier> child_block_suppliers;
433
    // Create the merger that will a single stream of sorted rows.
434
0
    _merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, limit,
435
0
                                       offset, _profile));
436
437
0
    for (int i = 0; i < _sender_queues.size(); ++i) {
438
0
        child_block_suppliers.emplace_back(std::bind(std::mem_fn(&SenderQueue::get_batch),
439
0
                                                     _sender_queues[i], std::placeholders::_1,
440
0
                                                     std::placeholders::_2));
441
0
    }
442
0
    RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
443
0
    return Status::OK();
444
0
}
445
446
Status VDataStreamRecvr::add_block(std::unique_ptr<PBlock> pblock, int sender_id, int be_number,
447
                                   int64_t packet_seq, ::google::protobuf::Closure** done,
448
                                   const int64_t wait_for_worker,
449
0
                                   const uint64_t time_to_find_recvr) {
450
0
    SCOPED_ATTACH_TASK(_resource_ctx);
451
0
    if (_query_context->low_memory_mode()) {
452
0
        set_low_memory_mode();
453
0
    }
454
455
0
    int use_sender_id = _is_merging ? sender_id : 0;
456
0
    return _sender_queues[use_sender_id]->add_block(std::move(pblock), be_number, packet_seq, done,
457
0
                                                    wait_for_worker, time_to_find_recvr);
458
0
}
459
460
Status VDataStreamRecvr::add_blocks(const PTransmitDataParams* request,
461
                                    ::google::protobuf::Closure** done,
462
                                    const int64_t wait_for_worker,
463
1
                                    const uint64_t time_to_find_recvr) {
464
1
    SCOPED_ATTACH_TASK(_resource_ctx);
465
1
    if (_query_context->low_memory_mode()) {
466
0
        set_low_memory_mode();
467
0
    }
468
1
    int use_sender_id = _is_merging ? request->sender_id() : 0;
469
1
    return _sender_queues[use_sender_id]->add_blocks(request, done, wait_for_worker,
470
1
                                                     time_to_find_recvr);
471
1
}
472
473
4
void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
474
4
    if (_query_context->low_memory_mode()) {
475
0
        set_low_memory_mode();
476
0
    }
477
4
    int use_sender_id = _is_merging ? sender_id : 0;
478
4
    _sender_queues[use_sender_id]->add_block(block, use_move);
479
4
}
480
481
0
std::string VDataStreamRecvr::debug_string() {
482
0
    fmt::memory_buffer debug_string_buffer;
483
0
    fmt::format_to(debug_string_buffer,
484
0
                   "fragment_instance_id: {}, _dest_node_id: {}, _is_merging: {}, _is_closed: {}",
485
0
                   print_id(_fragment_instance_id), _dest_node_id, _is_merging, _is_closed);
486
0
    for (size_t i = 0; i < _sender_queues.size(); i++) {
487
0
        fmt::format_to(debug_string_buffer, "No. {} queue: {}", i,
488
0
                       _sender_queues[i]->debug_string());
489
0
    }
490
0
    return fmt::to_string(debug_string_buffer);
491
0
}
492
493
3
std::shared_ptr<Dependency> VDataStreamRecvr::get_local_channel_dependency(int sender_id) {
494
3
    DCHECK(_sender_to_local_channel_dependency[_is_merging ? sender_id : 0] != nullptr);
495
3
    return _sender_to_local_channel_dependency[_is_merging ? sender_id : 0];
496
3
}
497
498
11
Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
499
11
    if (!_is_merging) {
500
11
        block->clear();
501
11
        return _sender_queues[0]->get_batch(block, eos);
502
11
    } else {
503
0
        return _merger->get_next(block, eos);
504
0
    }
505
11
}
506
507
3
void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_status) {
508
3
    if (!exec_status.ok()) {
509
0
        cancel_stream(exec_status);
510
0
        return;
511
0
    }
512
3
    int use_sender_id = _is_merging ? sender_id : 0;
513
3
    _sender_queues[use_sender_id]->decrement_senders(be_number);
514
3
}
515
516
7
void VDataStreamRecvr::cancel_stream(Status exec_status) {
517
7
    VLOG_QUERY << "cancel_stream: fragment_instance_id=" << print_id(_fragment_instance_id)
518
0
               << exec_status;
519
520
14
    for (int i = 0; i < _sender_queues.size(); ++i) {
521
7
        _sender_queues[i]->cancel(exec_status);
522
7
    }
523
7
}
524
525
914
void VDataStreamRecvr::SenderQueue::add_blocks_memory_usage(int64_t size) {
526
914
    DCHECK(size >= 0);
527
914
    _recvr->_mem_tracker->consume(size);
528
914
    _queue_mem_tracker->consume(size);
529
914
    if (_local_channel_dependency && exceeds_limit()) {
530
906
        _local_channel_dependency->block();
531
906
    }
532
914
}
533
534
910
void VDataStreamRecvr::SenderQueue::sub_blocks_memory_usage(int64_t size) {
535
910
    DCHECK(size >= 0);
536
910
    _recvr->_mem_tracker->release(size);
537
910
    _queue_mem_tracker->release(size);
538
910
    if (_local_channel_dependency && (!exceeds_limit())) {
539
410
        _local_channel_dependency->set_ready();
540
410
    }
541
910
}
542
543
1.82k
bool VDataStreamRecvr::SenderQueue::exceeds_limit() {
544
1.82k
    const size_t queue_byte_size = _queue_mem_tracker->consumption();
545
1.82k
    return _recvr->queue_exceeds_limit(queue_byte_size);
546
1.82k
}
547
548
502
bool VDataStreamRecvr::exceeds_limit(size_t block_byte_size) {
549
502
    return _mem_tracker->consumption() + block_byte_size > config::exchg_node_buffer_size_bytes;
550
502
}
551
552
1.82k
bool VDataStreamRecvr::queue_exceeds_limit(size_t queue_byte_size) const {
553
1.82k
    return queue_byte_size >= _sender_queue_mem_limit;
554
1.82k
}
555
556
14
void VDataStreamRecvr::close() {
557
14
    if (_is_closed) {
558
5
        return;
559
5
    }
560
9
    _is_closed = true;
561
9
    for (auto& it : _sender_to_local_channel_dependency) {
562
9
        it->set_always_ready();
563
9
    }
564
18
    for (int i = 0; i < _sender_queues.size(); ++i) {
565
9
        _sender_queues[i]->close();
566
9
    }
567
    // Remove this receiver from the DataStreamMgr that created it.
568
    // TODO: log error msg
569
9
    if (_mgr) {
570
7
        static_cast<void>(_mgr->deregister_recvr(fragment_instance_id(), dest_node_id()));
571
7
    }
572
9
    _mgr = nullptr;
573
574
9
    _merger.reset();
575
9
}
576
577
7
void VDataStreamRecvr::set_sink_dep_always_ready() const {
578
9
    for (auto dep : _sender_to_local_channel_dependency) {
579
9
        dep->set_always_ready();
580
9
    }
581
7
}
582
583
} // namespace doris