Coverage Report

Created: 2026-03-25 10:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/vdata_stream_recvr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/exchange/vdata_stream_recvr.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/data.pb.h>
24
25
#include <algorithm>
26
#include <functional>
27
#include <string>
28
29
#include "common/logging.h"
30
#include "core/block/block.h"
31
#include "core/block/materialize_block.h"
32
#include "exec/exchange/vdata_stream_mgr.h"
33
#include "exec/operator/exchange_sink_operator.h"
34
#include "exec/operator/exchange_source_operator.h"
35
#include "exec/sort/sort_cursor.h"
36
#include "exec/sort/vsorted_run_merger.h"
37
#include "runtime/memory/mem_tracker.h"
38
#include "runtime/runtime_state.h"
39
#include "runtime/thread_context.h"
40
#include "util/defer_op.h"
41
#include "util/uid_util.h"
42
43
namespace doris {
44
#include "common/compile_check_begin.h"
45
46
VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders,
47
                                           std::shared_ptr<Dependency> local_channel_dependency)
48
5.18k
        : _recvr(parent_recvr),
49
5.18k
          _is_cancelled(false),
50
5.18k
          _num_remaining_senders(num_senders),
51
5.18k
          _local_channel_dependency(local_channel_dependency) {
52
5.18k
    _cancel_status = Status::OK();
53
5.18k
    _queue_mem_tracker = std::make_unique<MemTracker>("local data queue mem tracker");
54
5.18k
}
55
56
5.18k
VDataStreamRecvr::SenderQueue::~SenderQueue() {
57
5.18k
    for (auto& block_item : _block_queue) {
58
0
        block_item.call_done(_recvr);
59
0
    }
60
5.18k
    _block_queue.clear();
61
5.18k
}
62
63
18.1k
Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
64
18.1k
#ifndef NDEBUG
65
18.1k
    if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
66
0
        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
67
0
                               "_is_cancelled: {}, _block_queue_empty: {}, "
68
0
                               "_num_remaining_senders: {}",
69
0
                               _is_cancelled, _block_queue.empty(), _num_remaining_senders);
70
0
    }
71
18.1k
#endif
72
18.1k
    BlockItem block_item;
73
18.1k
    {
74
18.1k
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
75
        //check and get block_item from data_queue
76
18.1k
        if (_is_cancelled) {
77
1
            RETURN_IF_ERROR(_cancel_status);
78
1
            return Status::Cancelled("Cancelled");
79
1
        }
80
81
18.1k
        if (_block_queue.empty()) {
82
5.00k
            if (_num_remaining_senders != 0) {
83
0
                return Status::InternalError(
84
0
                        "Data queue is empty but there are still remaining senders. "
85
0
                        "_num_remaining_senders: {}",
86
0
                        _num_remaining_senders);
87
0
            }
88
5.00k
            *eos = true;
89
5.00k
            return Status::OK();
90
5.00k
        }
91
92
18.1k
        DCHECK(!_block_queue.empty());
93
13.1k
        block_item = std::move(_block_queue.front());
94
13.1k
        _block_queue.pop_front();
95
13.1k
    }
96
0
    BlockUPtr next_block;
97
13.1k
    RETURN_IF_ERROR(block_item.get_block(next_block));
98
13.1k
    size_t block_byte_size = block_item.block_byte_size();
99
13.1k
    COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, block_item.deserialize_time());
100
13.1k
    COUNTER_UPDATE(_recvr->_decompress_timer, block_item.decompress_time());
101
13.1k
    COUNTER_UPDATE(_recvr->_decompress_bytes, block_item.decompress_bytes());
102
13.1k
    _recvr->_memory_used_counter->update(-(int64_t)block_byte_size);
103
13.1k
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
104
13.1k
    sub_blocks_memory_usage(block_byte_size);
105
13.1k
    if (_block_queue.empty() && _source_dependency) {
106
8.11k
        if (!_is_cancelled && _num_remaining_senders > 0) {
107
3.00k
            _source_dependency->block();
108
3.00k
        }
109
8.11k
    }
110
111
13.1k
    block_item.call_done(_recvr);
112
113
13.1k
    DCHECK(block->empty());
114
13.1k
    block->swap(*next_block);
115
13.1k
    *eos = false;
116
13.1k
    return Status::OK();
117
13.1k
}
118
119
23.5k
void VDataStreamRecvr::SenderQueue::set_source_ready(std::lock_guard<std::mutex>&) {
120
    // Here, it is necessary to check if _source_dependency is not nullptr.
121
    // This is because the queue might be closed before setting the source dependency.
122
23.5k
    if (!_source_dependency) {
123
11
        return;
124
11
    }
125
23.5k
    const bool should_wait = !_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0;
126
23.5k
    if (!should_wait) {
127
23.5k
        _source_dependency->set_ready();
128
23.5k
    }
129
23.5k
}
130
131
0
std::string VDataStreamRecvr::SenderQueue::debug_string() {
132
0
    fmt::memory_buffer debug_string_buffer;
133
0
    fmt::format_to(debug_string_buffer,
134
0
                   "_num_remaining_senders = {}, block_queue size = {}, _is_cancelled: {}, "
135
0
                   "_cancel_status: {}, _sender_eos_set: (",
136
0
                   _num_remaining_senders, _block_queue.size(), _is_cancelled,
137
0
                   _cancel_status.to_string());
138
0
    std::lock_guard<std::mutex> l(_lock);
139
0
    for (auto& i : _sender_eos_set) {
140
0
        fmt::format_to(debug_string_buffer, "{}, ", i);
141
0
    }
142
0
    fmt::format_to(debug_string_buffer, ")");
143
0
    return fmt::to_string(debug_string_buffer);
144
0
}
145
146
Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock, int be_number,
147
                                                int64_t packet_seq,
148
                                                ::google::protobuf::Closure** done,
149
                                                const int64_t wait_for_worker,
150
487
                                                const uint64_t time_to_find_recvr) {
151
487
    {
152
487
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
153
487
        if (_is_cancelled) {
154
0
            return Status::OK();
155
0
        }
156
487
        auto iter = _packet_seq_map.find(be_number);
157
495
        if (iter != _packet_seq_map.end()) {
158
495
            if (iter->second >= packet_seq) {
159
0
                return Status::InternalError(
160
0
                        "packet already exist [cur_packet_id= {} receive_packet_id={}]",
161
0
                        iter->second, packet_seq);
162
0
            }
163
495
            iter->second = packet_seq;
164
18.4E
        } else {
165
18.4E
            _packet_seq_map.emplace(be_number, packet_seq);
166
18.4E
        }
167
168
487
        DCHECK(_num_remaining_senders >= 0);
169
487
        if (_num_remaining_senders == 0) {
170
0
            DCHECK(_sender_eos_set.contains(be_number));
171
0
            return Status::OK();
172
0
        }
173
487
    }
174
175
487
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
176
487
    if (_is_cancelled) {
177
0
        return Status::OK();
178
0
    }
179
180
487
    const auto block_byte_size = pblock->ByteSizeLong();
181
487
    COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
182
487
    if (_recvr->_max_wait_worker_time->value() < wait_for_worker) {
183
0
        _recvr->_max_wait_worker_time->set(wait_for_worker);
184
0
    }
185
186
487
    if (_recvr->_max_find_recvr_time->value() < time_to_find_recvr) {
187
0
        _recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr);
188
0
    }
189
190
487
    _block_queue.emplace_back(std::move(pblock), block_byte_size);
191
487
    COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
192
487
    set_source_ready(l);
193
194
    // if done is nullptr, this function can't delay this response
195
502
    if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
196
1
        _block_queue.back().set_done(*done);
197
1
        *done = nullptr;
198
1
    }
199
487
    _recvr->_memory_used_counter->update(block_byte_size);
200
487
    add_blocks_memory_usage(block_byte_size);
201
487
    return Status::OK();
202
487
}
203
204
Status VDataStreamRecvr::SenderQueue::add_blocks(const PTransmitDataParams* request,
205
                                                 ::google::protobuf::Closure** done,
206
                                                 const int64_t wait_for_worker,
207
5.87k
                                                 const uint64_t time_to_find_recvr) {
208
5.87k
    {
209
5.87k
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
210
5.87k
        if (_is_cancelled) {
211
0
            return Status::OK();
212
0
        }
213
5.87k
        const int be_number = request->be_number();
214
        // In the request, the packet_seq for blocks is [request->packet_seq() - blocks_size(), request->packet_seq())
215
        // Note this is a left-closed, right-open interval; the packet_seq of the last block is request->packet_seq() - 1
216
        // We store the packet_seq of the last block in _packet_seq_map so we can compare it with the packet_seq of the next received packet
217
5.87k
        const int64_t packet_seq = request->packet_seq() - 1;
218
5.87k
        auto iter = _packet_seq_map.find(be_number);
219
5.87k
        if (iter != _packet_seq_map.end()) {
220
3.36k
            if (iter->second > (packet_seq - request->blocks_size())) {
221
0
                return Status::InternalError(
222
0
                        "packet already exist [cur_packet_id= {} receive_packet_id={}]",
223
0
                        iter->second, packet_seq);
224
0
            }
225
3.36k
            iter->second = packet_seq;
226
3.36k
        } else {
227
2.50k
            _packet_seq_map.emplace(be_number, packet_seq);
228
2.50k
        }
229
230
5.87k
        DCHECK(_num_remaining_senders >= 0);
231
5.87k
        if (_num_remaining_senders == 0) {
232
0
            DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
233
0
            return Status::OK();
234
0
        }
235
5.87k
    }
236
237
5.87k
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
238
5.87k
    if (_is_cancelled) {
239
0
        return Status::OK();
240
0
    }
241
242
5.87k
    int64_t total_block_byte_size = 0;
243
11.7k
    for (int i = 0; i < request->blocks_size(); i++) {
244
5.87k
        std::unique_ptr<PBlock> pblock = std::make_unique<PBlock>();
245
5.87k
        pblock->CopyFrom(request->blocks(i));
246
247
5.87k
        const auto block_byte_size = pblock->ByteSizeLong();
248
5.87k
        COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
249
5.87k
        if (_recvr->_max_wait_worker_time->value() < wait_for_worker) {
250
1
            _recvr->_max_wait_worker_time->set(wait_for_worker);
251
1
        }
252
253
5.87k
        if (_recvr->_max_find_recvr_time->value() < time_to_find_recvr) {
254
2.71k
            _recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr);
255
2.71k
        }
256
257
5.87k
        _block_queue.emplace_back(std::move(pblock), block_byte_size);
258
5.87k
        COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
259
5.87k
        total_block_byte_size += block_byte_size;
260
5.87k
    }
261
262
5.87k
    set_source_ready(l);
263
264
    // if done is nullptr, this function can't delay this response
265
5.87k
    if (done != nullptr && _recvr->exceeds_limit(total_block_byte_size)) {
266
1
        _block_queue.back().set_done(*done);
267
1
        *done = nullptr;
268
1
    }
269
5.87k
    _recvr->_memory_used_counter->update(total_block_byte_size);
270
5.87k
    add_blocks_memory_usage(total_block_byte_size);
271
5.87k
    return Status::OK();
272
5.87k
}
273
274
7.08k
void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
275
7.08k
    if (block->rows() == 0) {
276
0
        return;
277
0
    }
278
7.08k
    {
279
7.08k
        INJECT_MOCK_SLEEP(std::unique_lock<std::mutex> l(_lock));
280
7.08k
        if (_is_cancelled) {
281
298
            return;
282
298
        }
283
7.08k
        DCHECK(_num_remaining_senders >= 0);
284
6.78k
        if (_num_remaining_senders == 0) {
285
1
            return;
286
1
        }
287
6.78k
    }
288
6.78k
    BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name());
289
290
    // local exchange should copy the block contented if use move == false
291
6.78k
    if (use_move) {
292
6.39k
        block->clear();
293
6.39k
    } else {
294
386
        auto rows = block->rows();
295
784
        for (int i = 0; i < nblock->columns(); ++i) {
296
398
            nblock->get_by_position(i).column =
297
398
                    nblock->get_by_position(i).column->clone_resized(rows);
298
398
        }
299
386
    }
300
6.78k
    materialize_block_inplace(*nblock);
301
302
6.78k
    auto block_mem_size = nblock->allocated_bytes();
303
6.78k
    {
304
6.78k
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
305
6.78k
        if (_is_cancelled) {
306
1
            return;
307
1
        }
308
6.78k
        _block_queue.emplace_back(std::move(nblock), block_mem_size);
309
6.78k
        set_source_ready(l);
310
6.78k
        COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
311
6.78k
        _recvr->_memory_used_counter->update(block_mem_size);
312
6.78k
        add_blocks_memory_usage(block_mem_size);
313
6.78k
    }
314
6.78k
}
315
316
5.38k
void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
317
5.38k
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
318
5.38k
    if (_sender_eos_set.end() != _sender_eos_set.find(be_number)) {
319
0
        return;
320
0
    }
321
5.38k
    _sender_eos_set.insert(be_number);
322
5.38k
    DCHECK_GT(_num_remaining_senders, 0);
323
5.38k
    _num_remaining_senders--;
324
5.38k
    VLOG_FILE << "decremented senders: fragment_instance_id="
325
1
              << print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id()
326
1
              << " #senders=" << _num_remaining_senders;
327
5.38k
    if (_num_remaining_senders == 0) {
328
5.17k
        set_source_ready(l);
329
5.17k
    }
330
5.38k
}
331
332
5.17k
void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
333
5.17k
    {
334
5.17k
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
335
5.17k
        if (_is_cancelled) {
336
5.17k
            return;
337
5.17k
        }
338
0
        _is_cancelled = true;
339
0
        _cancel_status = cancel_status;
340
0
        set_source_ready(l);
341
0
        VLOG_QUERY << "cancelled stream: _fragment_instance_id="
342
0
                   << print_id(_recvr->fragment_instance_id())
343
0
                   << " node_id=" << _recvr->dest_node_id();
344
0
    }
345
0
    {
346
0
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
347
0
        for (auto& block_item : _block_queue) {
348
0
            block_item.call_done(_recvr);
349
0
        }
350
0
        _block_queue.clear();
351
0
    }
352
0
}
353
354
5.17k
void VDataStreamRecvr::SenderQueue::close() {
355
    // If _is_cancelled is not set to true, there may be concurrent send
356
    // which add batch to _block_queue. The batch added after _block_queue
357
    // is clear will be memory leak
358
5.17k
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
359
5.17k
    _is_cancelled = true;
360
5.17k
    set_source_ready(l);
361
362
5.17k
    for (auto& block_item : _block_queue) {
363
6
        block_item.call_done(_recvr);
364
6
    }
365
    // Delete any batches queued in _block_queue
366
5.17k
    _block_queue.clear();
367
5.17k
}
368
369
VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr,
370
                                   RuntimeProfile::HighWaterMarkCounter* memory_used_counter,
371
                                   RuntimeState* state, const TUniqueId& fragment_instance_id,
372
                                   PlanNodeId dest_node_id, int num_senders, bool is_merging,
373
                                   RuntimeProfile* profile, size_t data_queue_capacity)
374
5.18k
        : HasTaskExecutionCtx(state),
375
5.18k
          _mgr(stream_mgr),
376
5.18k
          _memory_used_counter(memory_used_counter),
377
5.18k
          _resource_ctx(state->get_query_ctx()->resource_ctx()),
378
5.18k
          _query_context(state->get_query_ctx()->shared_from_this()),
379
5.18k
          _fragment_instance_id(fragment_instance_id),
380
5.18k
          _dest_node_id(dest_node_id),
381
5.18k
          _is_merging(is_merging),
382
5.18k
          _is_closed(false),
383
5.18k
          _sender_queue_mem_limit(data_queue_capacity),
384
5.18k
          _profile(profile) {
385
    // DataStreamRecvr may be destructed after the instance execution thread ends.
386
5.18k
    _mem_tracker =
387
5.18k
            std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id));
388
5.18k
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
389
390
    // Create one queue per sender if is_merging is true.
391
5.18k
    int num_queues = is_merging ? num_senders : 1;
392
5.18k
    _sender_to_local_channel_dependency.resize(num_queues);
393
10.3k
    for (size_t i = 0; i < num_queues; i++) {
394
5.18k
        _sender_to_local_channel_dependency[i] = Dependency::create_shared(
395
5.18k
                _dest_node_id, _dest_node_id, fmt::format("LocalExchangeChannelDependency_{}", i),
396
5.18k
                true);
397
5.18k
    }
398
5.18k
    _sender_queues.reserve(num_queues);
399
5.18k
    int num_sender_per_queue = is_merging ? 1 : num_senders;
400
10.3k
    for (int i = 0; i < num_queues; ++i) {
401
5.18k
        SenderQueue* queue = nullptr;
402
5.18k
        queue = _sender_queue_pool.add(new SenderQueue(this, num_sender_per_queue,
403
5.18k
                                                       _sender_to_local_channel_dependency[i]));
404
5.18k
        _sender_queues.push_back(queue);
405
5.18k
    }
406
407
    // Initialize the counters
408
5.18k
    _remote_bytes_received_counter = ADD_COUNTER(_profile, "RemoteBytesReceived", TUnit::BYTES);
409
5.18k
    _local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES);
410
411
5.18k
    _deserialize_row_batch_timer = ADD_TIMER(_profile, "DeserializeRowBatchTimer");
412
5.18k
    _data_arrival_timer = ADD_TIMER(_profile, "DataArrivalWaitTime");
413
5.18k
    _buffer_full_total_timer = ADD_TIMER(_profile, "SendersBlockedTotalTimer(*)");
414
5.18k
    _first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime");
415
5.18k
    _decompress_timer = ADD_TIMER(_profile, "DecompressTime");
416
5.18k
    _decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
417
5.18k
    _blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", TUnit::UNIT);
418
5.18k
    _max_wait_worker_time = ADD_COUNTER(_profile, "MaxWaitForWorkerTime", TUnit::UNIT);
419
5.18k
    _max_wait_to_process_time = ADD_COUNTER(_profile, "MaxWaitToProcessTime", TUnit::UNIT);
420
5.18k
    _max_find_recvr_time = ADD_COUNTER(_profile, "MaxFindRecvrTime(NS)", TUnit::UNIT);
421
5.18k
}
422
423
5.18k
VDataStreamRecvr::~VDataStreamRecvr() {
424
5.18k
    DCHECK(_mgr == nullptr) << "Must call close()";
425
5.18k
}
426
427
Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr,
428
                                       const std::vector<bool>& is_asc_order,
429
                                       const std::vector<bool>& nulls_first, size_t batch_size,
430
508
                                       int64_t limit, size_t offset) {
431
508
    DCHECK(_is_merging);
432
508
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
433
508
    std::vector<BlockSupplier> child_block_suppliers;
434
    // Create the merger that will a single stream of sorted rows.
435
508
    _merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, limit,
436
508
                                       offset, _profile));
437
438
1.01k
    for (int i = 0; i < _sender_queues.size(); ++i) {
439
508
        child_block_suppliers.emplace_back(std::bind(std::mem_fn(&SenderQueue::get_batch),
440
508
                                                     _sender_queues[i], std::placeholders::_1,
441
508
                                                     std::placeholders::_2));
442
508
    }
443
508
    RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
444
508
    return Status::OK();
445
508
}
446
447
Status VDataStreamRecvr::add_block(std::unique_ptr<PBlock> pblock, int sender_id, int be_number,
448
                                   int64_t packet_seq, ::google::protobuf::Closure** done,
449
                                   const int64_t wait_for_worker,
450
0
                                   const uint64_t time_to_find_recvr) {
451
0
    SCOPED_ATTACH_TASK(_resource_ctx);
452
0
    if (_query_context->low_memory_mode()) {
453
0
        set_low_memory_mode();
454
0
    }
455
456
0
    int use_sender_id = _is_merging ? sender_id : 0;
457
0
    return _sender_queues[use_sender_id]->add_block(std::move(pblock), be_number, packet_seq, done,
458
0
                                                    wait_for_worker, time_to_find_recvr);
459
0
}
460
461
Status VDataStreamRecvr::add_blocks(const PTransmitDataParams* request,
462
                                    ::google::protobuf::Closure** done,
463
                                    const int64_t wait_for_worker,
464
5.87k
                                    const uint64_t time_to_find_recvr) {
465
5.87k
    SCOPED_ATTACH_TASK(_resource_ctx);
466
5.87k
    if (_query_context->low_memory_mode()) {
467
0
        set_low_memory_mode();
468
0
    }
469
5.87k
    int use_sender_id = _is_merging ? request->sender_id() : 0;
470
5.87k
    return _sender_queues[use_sender_id]->add_blocks(request, done, wait_for_worker,
471
5.87k
                                                     time_to_find_recvr);
472
5.87k
}
473
474
6.39k
void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
475
6.39k
    if (_query_context->low_memory_mode()) {
476
604
        set_low_memory_mode();
477
604
    }
478
6.39k
    int use_sender_id = _is_merging ? sender_id : 0;
479
6.39k
    _sender_queues[use_sender_id]->add_block(block, use_move);
480
6.39k
}
481
482
0
std::string VDataStreamRecvr::debug_string() {
483
0
    fmt::memory_buffer debug_string_buffer;
484
0
    fmt::format_to(debug_string_buffer,
485
0
                   "fragment_instance_id: {}, _dest_node_id: {}, _is_merging: {}, _is_closed: {}",
486
0
                   print_id(_fragment_instance_id), _dest_node_id, _is_merging, _is_closed);
487
0
    for (size_t i = 0; i < _sender_queues.size(); i++) {
488
0
        fmt::format_to(debug_string_buffer, "No. {} queue: {}", i,
489
0
                       _sender_queues[i]->debug_string());
490
0
    }
491
0
    return fmt::to_string(debug_string_buffer);
492
0
}
493
494
2.73k
std::shared_ptr<Dependency> VDataStreamRecvr::get_local_channel_dependency(int sender_id) {
495
2.73k
    DCHECK(_sender_to_local_channel_dependency[_is_merging ? sender_id : 0] != nullptr);
496
2.73k
    return _sender_to_local_channel_dependency[_is_merging ? sender_id : 0];
497
2.73k
}
498
499
17.2k
Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
500
17.2k
    if (!_is_merging) {
501
16.4k
        block->clear();
502
16.4k
        return _sender_queues[0]->get_batch(block, eos);
503
16.4k
    } else {
504
842
        return _merger->get_next(block, eos);
505
842
    }
506
17.2k
}
507
508
5.36k
void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_status) {
509
5.36k
    if (!exec_status.ok()) {
510
0
        cancel_stream(exec_status);
511
0
        return;
512
0
    }
513
5.36k
    int use_sender_id = _is_merging ? sender_id : 0;
514
5.36k
    _sender_queues[use_sender_id]->decrement_senders(be_number);
515
5.36k
}
516
517
5.17k
void VDataStreamRecvr::cancel_stream(Status exec_status) {
518
5.17k
    VLOG_QUERY << "cancel_stream: fragment_instance_id=" << print_id(_fragment_instance_id)
519
0
               << exec_status;
520
521
10.3k
    for (int i = 0; i < _sender_queues.size(); ++i) {
522
5.17k
        _sender_queues[i]->cancel(exec_status);
523
5.17k
    }
524
5.17k
}
525
526
13.1k
void VDataStreamRecvr::SenderQueue::add_blocks_memory_usage(int64_t size) {
527
13.1k
    DCHECK(size >= 0);
528
13.1k
    _recvr->_mem_tracker->consume(size);
529
13.1k
    _queue_mem_tracker->consume(size);
530
13.1k
    if (_local_channel_dependency && exceeds_limit()) {
531
907
        _local_channel_dependency->block();
532
907
    }
533
13.1k
}
534
535
13.1k
void VDataStreamRecvr::SenderQueue::sub_blocks_memory_usage(int64_t size) {
536
13.1k
    DCHECK(size >= 0);
537
13.1k
    _recvr->_mem_tracker->release(size);
538
13.1k
    _queue_mem_tracker->release(size);
539
13.1k
    if (_local_channel_dependency && (!exceeds_limit())) {
540
12.6k
        _local_channel_dependency->set_ready();
541
12.6k
    }
542
13.1k
}
543
544
26.3k
bool VDataStreamRecvr::SenderQueue::exceeds_limit() {
545
26.3k
    const size_t queue_byte_size = _queue_mem_tracker->consumption();
546
26.3k
    return _recvr->queue_exceeds_limit(queue_byte_size);
547
26.3k
}
548
549
6.37k
bool VDataStreamRecvr::exceeds_limit(size_t block_byte_size) {
550
6.37k
    return _mem_tracker->consumption() + block_byte_size > config::exchg_node_buffer_size_bytes;
551
6.37k
}
552
553
26.3k
bool VDataStreamRecvr::queue_exceeds_limit(size_t queue_byte_size) const {
554
26.3k
    return queue_byte_size >= _sender_queue_mem_limit;
555
26.3k
}
556
557
10.3k
void VDataStreamRecvr::close() {
558
10.3k
    if (_is_closed) {
559
5.17k
        return;
560
5.17k
    }
561
5.17k
    _is_closed = true;
562
5.17k
    for (auto& it : _sender_to_local_channel_dependency) {
563
5.17k
        it->set_always_ready();
564
5.17k
    }
565
10.3k
    for (int i = 0; i < _sender_queues.size(); ++i) {
566
5.17k
        _sender_queues[i]->close();
567
5.17k
    }
568
    // Remove this receiver from the DataStreamMgr that created it.
569
    // TODO: log error msg
570
5.17k
    if (_mgr) {
571
5.17k
        static_cast<void>(_mgr->deregister_recvr(fragment_instance_id(), dest_node_id()));
572
5.17k
    }
573
5.17k
    _mgr = nullptr;
574
575
5.17k
    _merger.reset();
576
5.17k
}
577
578
5.17k
void VDataStreamRecvr::set_sink_dep_always_ready() const {
579
5.17k
    for (auto dep : _sender_to_local_channel_dependency) {
580
5.17k
        dep->set_always_ready();
581
5.17k
    }
582
5.17k
}
583
584
} // namespace doris