Coverage Report

Created: 2026-04-15 09:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/vdata_stream_recvr.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Types_types.h>
21
#include <gen_cpp/data.pb.h>
22
#include <glog/logging.h>
23
#include <google/protobuf/stubs/callback.h>
24
25
#include <atomic>
26
#include <condition_variable>
27
#include <cstddef>
28
#include <cstdint>
29
#include <deque>
30
#include <list>
31
#include <memory>
32
#include <mutex>
33
#include <ostream>
34
#include <sstream>
35
#include <thread>
36
#include <unordered_map>
37
#include <unordered_set>
38
#include <utility>
39
#include <vector>
40
41
#include "common/config.h"
42
#include "common/global_types.h"
43
#include "common/object_pool.h"
44
#include "common/status.h"
45
#include "core/block/block.h"
46
#include "exprs/vexpr_fwd.h"
47
#include "runtime/descriptors.h"
48
#include "runtime/runtime_profile.h"
49
#include "runtime/task_execution_context.h"
50
#include "runtime/thread_context.h"
51
#include "runtime/workload_group/workload_group.h"
52
#include "util/stopwatch.hpp"
53
54
namespace doris {
55
// Forward declarations of project types whose definitions live in other headers.
class MemTracker;
55
class PBlock;
56
class MemTrackerLimiter;
57
class RuntimeState;
58
59
class Dependency;
60
class ExchangeLocalState;
61
62
class VDataStreamMgr;
63
class VSortedRunMerger;
64
65
// Redundant with the class definition that follows directly below, but harmless.
class VDataStreamRecvr;
67
68
class VDataStreamRecvr : public HasTaskExecutionCtx {
69
public:
70
    class SenderQueue;
71
    // Constructor; only the declaration is visible in this header.
    // NOTE(review): judging by names alone, `stream_mgr` and `counter` presumably
    // initialize `_mgr` and `_memory_used_counter`, and `fragment_instance_id`,
    // `dest_node_id`, `is_merging` and `profile` the like-named members — confirm
    // against the out-of-line definition.
    VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeProfile::HighWaterMarkCounter* counter,
72
                     RuntimeState* state, const TUniqueId& fragment_instance_id,
73
                     PlanNodeId dest_node_id, int num_senders, bool is_merging,
74
                     RuntimeProfile* profile, size_t data_queue_capacity);
75
76
    // Overrides the destructor inherited from HasTaskExecutionCtx; defined out of line.
    ~VDataStreamRecvr() override;
77
78
    // Declared with MOCK_FUNCTION (macro defined elsewhere; presumably makes the
    // method overridable in unit tests — TODO confirm). Defined out of line.
    // Takes the ordering expressions, per-key ascending / nulls-first flags, a batch
    // size, a row limit and an offset.
    // NOTE(review): presumably builds `_merger` for the merging case — verify
    // against the definition.
    MOCK_FUNCTION Status create_merger(const VExprContextSPtrs& ordering_expr,
79
                                       const std::vector<bool>& is_asc_order,
80
                                       const std::vector<bool>& nulls_first, size_t batch_size,
81
                                       int64_t limit, size_t offset);
82
83
807k
    std::vector<SenderQueue*> sender_queues() const { return _sender_queues; }
84
85
    // Accepts one serialized block (`PBlock`) attributed to `sender_id` / `be_number`
    // with sequence number `packet_seq`. `done` points at the RPC closure pointer.
    // NOTE(review): presumably forwarded to the matching SenderQueue, which may keep
    // the closure to delay the RPC response while its buffer is full — confirm
    // against the definition.
    Status add_block(std::unique_ptr<PBlock> pblock, int sender_id, int be_number,
86
                     int64_t packet_seq, ::google::protobuf::Closure** done,
87
                     const int64_t wait_for_worker, const uint64_t time_to_find_recvr);
88
89
    // Variant that receives the whole transmit request instead of a single block.
    Status add_blocks(const PTransmitDataParams* request, ::google::protobuf::Closure** done,
90
                      const int64_t wait_for_worker, const uint64_t time_to_find_recvr);
91
92
    // Overload taking an in-memory Block rather than a serialized one.
    // NOTE(review): `use_move` presumably selects moving the block's contents
    // instead of copying them — confirm against the definition.
    void add_block(Block* block, int sender_id, bool use_move);
93
    // Returns a human-readable description of this receiver (defined out of line).
    std::string debug_string();
94
95
    // Produces the next block into `block` and reports end-of-stream through `eos`.
    // Declared with MOCK_FUNCTION (macro defined elsewhere); defined out of line.
    MOCK_FUNCTION Status get_next(Block* block, bool* eos);
96
97
5.22M
    // Returns `_fragment_instance_id` by const reference (no copy).
    const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
98
5.21M
    // Returns `_dest_node_id` by value.
    PlanNodeId dest_node_id() const { return _dest_node_id; }
99
100
    // Indicate that a particular sender is done. Delegated to the appropriate
101
    // sender queue. Called from DataStreamMgr.
102
    void remove_sender(int sender_id, int be_number, Status exec_status);
103
104
    // Cancels the stream with the given status (defined out of line).
    // NOTE(review): presumably propagates `exec_status` to every sender queue via
    // SenderQueue::cancel — confirm against the definition.
    void cancel_stream(Status exec_status);
105
106
    // Closes the receiver. Declared with MOCK_FUNCTION (macro defined elsewhere);
    // defined out of line.
    MOCK_FUNCTION void close();
107
108
    // When the source reaches eos = true
109
    void set_sink_dep_always_ready() const;
110
111
    // Careful: stream sender will call this function for a local receiver,
112
    // accessing members of receiver that are allocated by Object pool
113
    // in this function is not safe.
114
    MOCK_FUNCTION bool exceeds_limit(size_t block_byte_size);
115
    // NOTE(review): presumably compares `byte_size` against `_sender_queue_mem_limit`
    // — confirm against the definition.
    bool queue_exceeds_limit(size_t byte_size) const;
116
2.08M
    // Returns `_is_closed`, which is a plain (non-atomic) bool.
    // NOTE(review): if this is polled from a thread other than the one that closes
    // the receiver, the read is unsynchronized — confirm the threading model.
    bool is_closed() const { return _is_closed; }
117
118
    // Returns a dependency for `sender_id` (defined out of line).
    // NOTE(review): presumably looked up in `_sender_to_local_channel_dependency`.
    std::shared_ptr<Dependency> get_local_channel_dependency(int sender_id);
119
120
6
    // Switches to low-memory mode by overwriting `_sender_queue_mem_limit` (an
    // atomic, so the store itself needs no lock) with 1012 * 1024 (presumably bytes).
    // NOTE(review): 1012 looks like a typo for 1024 (exactly 1 MiB) — confirm the
    // intended limit before changing it, since that alters runtime behavior.
    void set_low_memory_mode() { _sender_queue_mem_limit = 1012 * 1024; }
121
122
private:
123
    // Grants BlockSupplierSortCursorImpl (declared elsewhere) access to private members.
    friend struct BlockSupplierSortCursorImpl;
124
125
    // DataStreamMgr instance used to create this recvr. (Not owned)
126
    VDataStreamMgr* _mgr = nullptr;
127
128
    // High-water-mark counter supplied by the creator (raw pointer).
    RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr;
129
130
    // Shared ownership of the resource context.
    std::shared_ptr<ResourceContext> _resource_ctx;
131
132
    // Shared ownership of the query context.
    std::shared_ptr<QueryContext> _query_context;
133
134
    // Fragment and node id of the destination exchange node this receiver is used by.
135
    TUniqueId _fragment_instance_id;
136
    PlanNodeId _dest_node_id;
137
138
    // Row schema, copied from the caller of CreateRecvr().
139
    RowDescriptor _row_desc;
140
141
    // True if this receiver merges incoming rows from different senders. Per-sender
142
    // row batch queues are maintained in this case.
143
    // Note: `_is_merging` and `_is_closed` have no in-class initializer, so they
    // must be set by the constructor (defined out of line).
    bool _is_merging;
144
    bool _is_closed;
145
146
    std::unique_ptr<MemTracker> _mem_tracker;
147
    // Managed by object pool
148
    std::vector<SenderQueue*> _sender_queues;
149
150
    // Atomic memory limit; overwritten by set_low_memory_mode().
    std::atomic<size_t> _sender_queue_mem_limit;
151
152
    // Owned merger; see create_merger().
    std::unique_ptr<VSortedRunMerger> _merger;
153
154
    // NOTE(review): presumably the pool that owns the queues in `_sender_queues`.
    ObjectPool _sender_queue_pool;
155
    RuntimeProfile* _profile = nullptr;
156
157
    // Profile counters, held as raw pointers and null until assigned (the assignment
    // is not visible in this header).
    RuntimeProfile::Counter* _remote_bytes_received_counter = nullptr;
158
    RuntimeProfile::Counter* _local_bytes_received_counter = nullptr;
159
    RuntimeProfile::Counter* _deserialize_row_batch_timer = nullptr;
160
    RuntimeProfile::Counter* _first_batch_wait_total_timer = nullptr;
161
    // Updated in BlockItem::call_done() with the time a deferred closure waited.
    RuntimeProfile::Counter* _buffer_full_total_timer = nullptr;
162
    RuntimeProfile::Counter* _data_arrival_timer = nullptr;
163
    RuntimeProfile::Counter* _decompress_timer = nullptr;
164
    RuntimeProfile::Counter* _decompress_bytes = nullptr;
165
166
    // Number of blocks received
167
    RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
168
    RuntimeProfile::Counter* _max_wait_worker_time = nullptr;
169
    // Raised in BlockItem::call_done() when a deferred closure waited longer than
    // the current value.
    RuntimeProfile::Counter* _max_wait_to_process_time = nullptr;
170
    RuntimeProfile::Counter* _max_find_recvr_time = nullptr;
171
172
    // NOTE(review): presumably indexed by sender id; see get_local_channel_dependency().
    std::vector<std::shared_ptr<Dependency>> _sender_to_local_channel_dependency;
173
};
174
175
class VDataStreamRecvr::SenderQueue {
176
public:
177
    // Constructor; only the declaration is visible in this header.
    // NOTE(review): `parent_recvr` and `local_channel_dependency` presumably
    // initialize `_recvr` and `_local_channel_dependency`, and `num_senders`
    // seeds `_num_remaining_senders` — confirm against the definition.
    SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders,
178
                std::shared_ptr<Dependency> local_channel_dependency);
179
180
    ~SenderQueue();
181
182
    // Produces the next queued block into `next_block` and reports end-of-stream
    // through `eos` (defined out of line).
    Status get_batch(Block* next_block, bool* eos);
183
184
    // Enqueues one serialized block from backend `be_number` with sequence number
    // `packet_seq`. `done` points at the RPC closure pointer; see BlockItem::set_done().
    Status add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t packet_seq,
185
                     ::google::protobuf::Closure** done, const int64_t wait_for_worker,
186
                     const uint64_t time_to_find_recvr);
187
188
    // Variant that receives the whole transmit request instead of a single block.
    Status add_blocks(const PTransmitDataParams* request, ::google::protobuf::Closure** done,
189
                      const int64_t wait_for_worker, const uint64_t time_to_find_recvr);
190
191
    // Returns a human-readable description of this queue (defined out of line).
    std::string debug_string();
192
193
    // Overload taking an in-memory Block rather than a serialized one.
    // NOTE(review): `use_move` presumably selects moving instead of copying.
    void add_block(Block* block, bool use_move);
194
195
    // NOTE(review): presumably records `sender_id` in `_sender_eos_set` and lowers
    // `_num_remaining_senders` — confirm against the definition.
    void decrement_senders(int sender_id);
196
197
    // NOTE(review): presumably sets `_is_cancelled` and stores `cancel_status` in
    // `_cancel_status` — confirm against the definition.
    void cancel(Status cancel_status);
198
199
    void close();
200
201
605k
    // Stores `dependency` in `_source_dependency`, replacing any previous value.
    // The argument is taken by value, so it is moved into the member instead of
    // being copied a second time; this avoids an extra atomic reference-count
    // increment/decrement on every call.
    void set_dependency(std::shared_ptr<Dependency> dependency) {
        _source_dependency = std::move(dependency);
    }
202
203
protected:
204
    // Memory accounting helpers (defined out of line).
    // NOTE(review): presumably adjust the tracked memory of queued blocks by `size`.
    void add_blocks_memory_usage(int64_t size);
205
206
    void sub_blocks_memory_usage(int64_t size);
207
208
    bool exceeds_limit();
209
    // Grants ExchangeLocalState access to the protected members of this queue.
    friend class ExchangeLocalState;
210
211
    // Takes a reference to a lock_guard over a std::mutex, so a caller must already
    // hold a mutex to call this.
    // NOTE(review): presumably that mutex is `_lock` — confirm.
    void set_source_ready(std::lock_guard<std::mutex>&);
212
213
    // Not managed by this class
214
    VDataStreamRecvr* _recvr = nullptr;
215
    std::mutex _lock;
216
    // Note: `_is_cancelled` and `_num_remaining_senders` have no in-class
    // initializer, so they must be set by the constructor (defined out of line).
    bool _is_cancelled;
217
    Status _cancel_status;
218
    int _num_remaining_senders;
219
    std::unique_ptr<MemTracker> _queue_mem_tracker;
220
221
    // `BlockItem` is used in `_block_queue` to handle both local and remote exchange blocks.
222
    // For local exchange blocks, `BlockUPtr` is used directly without any modification.
223
    // For remote exchange blocks, the `pblock` is stored in `BlockItem`.
224
    // When `get_block` is called, the `pblock` is deserialized into a usable block.
225
    // One queue entry. Holds either a ready-to-use block (`_block`) or a serialized
    // one (`_pblock`) that is deserialized on first access, together with size and
    // timing statistics and an optional deferred completion closure.
    struct BlockItem {
226
221k
        // Hands the held block to the caller through `block`, deserializing `_pblock`
        // first when no in-memory block is present. Returns Status::OK() on success.
        Status get_block(BlockUPtr& block) {
227
221k
            // No in-memory block yet: this item was built from a PBlock.
            if (!_block) {
228
71.6k
                DCHECK(_pblock);
229
71.6k
                // Macro defined elsewhere; presumably adds the time spent in this
                // scope to `_deserialize_time`.
                SCOPED_RAW_TIMER(&_deserialize_time);
230
71.6k
                _block = Block::create_unique();
231
71.6k
                // NOTE(review): if deserialization fails, `_block` stays non-null, so a
                // later call would skip this branch and hand out that block — confirm
                // callers never retry after an error.
                RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
232
71.6k
                        _block->deserialize(*_pblock, &_decompress_bytes, &_decompress_time));
233
71.6k
            }
234
221k
            // Transfer ownership to the caller; whatever `block` held before ends up
            // in `_block` and is released by the reset() below.
            block.swap(_block);
235
221k
            _block.reset();
236
221k
            return Status::OK();
237
221k
        }
238
239
221k
        // Byte size recorded at construction time.
        size_t block_byte_size() const { return _block_byte_size; }
240
221k
        // Statistics filled in by get_block(); they stay 0 if no deserialization ran.
        int64_t deserialize_time() const { return _deserialize_time; }
241
221k
        int64_t decompress_time() const { return _decompress_time; }
242
221k
        size_t decompress_bytes() const { return _decompress_bytes; }
243
810k
        // Creates an empty item holding neither a block nor a pblock.
        BlockItem() = default;
244
        // Wraps an already materialized block; no deserialization will be needed.
        BlockItem(BlockUPtr&& block, size_t block_byte_size)
245
150k
                : _block(std::move(block)), _block_byte_size(block_byte_size) {}
246
247
        // Wraps a serialized block; it is deserialized lazily in get_block().
        BlockItem(std::unique_ptr<PBlock>&& pblock, size_t block_byte_size)
248
72.3k
                : _block(nullptr), _pblock(std::move(pblock)), _block_byte_size(block_byte_size) {}
249
250
3
        // Records the closure to run later and starts measuring how long it waits.
        void set_done(google::protobuf::Closure* done) {
251
            // The done callback is only set when the queue memory limit is exceeded.
252
3
            _done_cb = done;
253
3
            _wait_timer.start();
254
3
        }
255
256
223k
        // Runs the recorded closure, if any, exactly once and reports the time it
        // waited to the receiver's counters. Does nothing when no closure was set.
        void call_done(VDataStreamRecvr* recvr) {
257
223k
            if (_done_cb != nullptr) {
258
3
                _done_cb->Run();
259
3
                // Clear the pointer so the closure cannot be run a second time.
                _done_cb = nullptr;
260
3
                _wait_timer.stop();
261
3
                int64_t elapse_time = _wait_timer.elapsed_time();
262
3
                // Keep the counter at the largest wait observed so far.
                if (recvr->_max_wait_to_process_time->value() < elapse_time) {
263
3
                    recvr->_max_wait_to_process_time->set(elapse_time);
264
3
                }
265
3
                recvr->_buffer_full_total_timer->update(elapse_time);
266
3
            }
267
223k
        }
268
269
    private:
270
        // In-memory block; null until deserialized when the item was built from a PBlock.
        BlockUPtr _block;
271
        // Serialized block; set only by the PBlock constructor.
        std::unique_ptr<PBlock> _pblock;
272
        size_t _block_byte_size = 0;
273
        int64_t _deserialize_time = 0;
274
        int64_t _decompress_time = 0;
275
        size_t _decompress_bytes = 0;
276
277
        // Raw pointer to the deferred closure; null when none is pending.
        google::protobuf::Closure* _done_cb = nullptr;
278
        // Started in set_done() and stopped in call_done().
        MonotonicStopWatch _wait_timer;
279
    };
280
281
    // Queue of pending entries.
    // NOTE(review): presumably guarded by `_lock` — confirm against the definitions.
    std::list<BlockItem> _block_queue;
282
283
    // sender_id
284
    std::unordered_set<int> _sender_eos_set;
285
    // be_number => packet_seq
286
    std::unordered_map<int, int64_t> _packet_seq_map;
287
288
    // Assigned through set_dependency().
    std::shared_ptr<Dependency> _source_dependency;
289
    // NOTE(review): presumably the `local_channel_dependency` constructor argument.
    std::shared_ptr<Dependency> _local_channel_dependency;
290
};
291
292
} // namespace doris