Coverage Report

Created: 2026-03-13 09:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/vdata_stream_sender.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <brpc/controller.h>
21
#include <butil/errno.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/Partitions_types.h>
24
#include <gen_cpp/Types_types.h>
25
#include <gen_cpp/data.pb.h>
26
#include <gen_cpp/internal_service.pb.h>
27
#include <gen_cpp/types.pb.h>
28
29
#include <atomic>
30
#include <cstddef>
31
#include <cstdint>
32
#include <memory>
33
#include <ostream>
34
#include <string>
35
#include <utility>
36
#include <vector>
37
38
#include "common/config.h"
39
#include "common/global_types.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "core/block/block.h"
43
#include "exec/exchange/vdata_stream_recvr.h"
44
#include "exec/operator/exchange_sink_buffer.h"
45
#include "exec/partitioner/partitioner.h"
46
#include "exec/sink/vrow_distribution.h"
47
#include "exec/sink/vtablet_finder.h"
48
#include "exprs/vexpr_context.h"
49
#include "runtime/runtime_profile.h"
50
#include "service/backend_options.h"
51
#include "storage/tablet_info.h"
52
#include "util/brpc_closure.h"
53
#include "util/uid_util.h"
54
55
namespace doris {
56
#include "common/compile_check_begin.h"
57
class ObjectPool;
58
class RuntimeState;
59
class RowDescriptor;
60
class TDataSink;
61
class TDataStreamSink;
62
class TPlanFragmentDestination;
63
64
namespace segment_v2 {
65
enum CompressionTypePB : int;
66
} // namespace segment_v2
67
68
class ExchangeSinkOperatorX;
69
class Dependency;
70
class ExchangeSinkLocalState;
71
72
class BlockSerializer {
73
public:
74
    BlockSerializer(ExchangeSinkLocalState* parent, bool is_local = true);
75
#ifdef BE_TEST
76
    // Test-only default constructor: no parent, treated as local, zero batch size.
    // Every scalar member is initialised here so that accessors such as is_local()
    // never read an indeterminate value.
    BlockSerializer() : _parent(nullptr), _is_local(true), _batch_size(0) {}
77
#endif
78
    Status next_serialized_block(Block* src, PBlock* dest, size_t num_receivers, bool* serialized,
79
                                 bool eos, const uint32_t* data = nullptr,
80
                                 const uint32_t offset = 0, const uint32_t size = 0);
81
    Status serialize_block(const Block* src, PBlock* dest, size_t num_receivers = 1);
82
83
18.5M
    // Returns the raw pointer held by _mutable_block (nullptr when it is empty).
    // Ownership stays with this serializer.
    MutableBlock* get_block() const {
        MutableBlock* const held = _mutable_block.get();
        return held;
    }
84
85
15.5M
    // Bytes reported by the held block's allocated_bytes(), or 0 when no block is held.
    size_t mem_usage() const {
        if (!_mutable_block) {
            return 0;
        }
        return _mutable_block->allocated_bytes();
    }
86
87
781k
    // Destroys the held block (if any) and leaves _mutable_block empty.
    void reset_block() {
        _mutable_block = nullptr;
    }
88
89
6.32M
    void set_is_local(bool is_local) { _is_local = is_local; }
90
697
    // Reports the current value of the local/remote flag.
    bool is_local() const {
        const bool local_flag = _is_local;
        return local_flag;
    }
91
92
7.64k
    // Sets _buffer_mem_limit to 4 MiB. `state` is not used by this implementation.
    void set_low_memory_mode(RuntimeState* state) {
        constexpr size_t low_memory_limit_bytes = 4 * 1024 * 1024;
        _buffer_mem_limit.store(low_memory_limit_bytes);
    }
93
94
private:
95
    Status _serialize_block(PBlock* dest, size_t num_receivers = 1);
96
97
    ExchangeSinkLocalState* _parent;
98
    std::unique_ptr<MutableBlock> _mutable_block;
99
100
    bool _is_local;
101
    const int _batch_size;
102
    std::atomic<size_t> _buffer_mem_limit = UINT64_MAX;
103
};
104
105
class Channel {
106
public:
107
    friend class ExchangeSinkBuffer;
108
    // Create a channel to send data to a particular ipaddress/port/query/node
109
    // combination. buffer_size is specified in bytes and is a soft limit on
110
    // how much tuple data is accumulated before being sent; it only applies
111
    // when data is added via add_row() and not sent directly via send_batch().
112
    // Stores the parent, destination fragment instance id, destination node id and
    // brpc address, then derives _is_local and constructs the serializer from them.
    Channel(ExchangeSinkLocalState* parent, TNetworkAddress brpc_dest,
113
            TUniqueId fragment_instance_id, PlanNodeId dest_node_id)
114
6.28M
            : _parent(parent),
115
6.28M
              _fragment_instance_id(std::move(fragment_instance_id)),
116
6.28M
              _dest_node_id(dest_node_id),
117
6.28M
              _brpc_dest_addr(std::move(brpc_dest)),
118
6.28M
              // Local when the destination hostname equals BackendOptions::get_localhost()
              // and the destination port equals config::brpc_port. Reads the member
              // _brpc_dest_addr because the brpc_dest parameter has already been moved from.
              _is_local((_brpc_dest_addr.hostname == BackendOptions::get_localhost()) &&
119
6.34M
                        (_brpc_dest_addr.port == config::brpc_port)),
120
6.28M
              // _parent and _is_local are declared before _serializer in the class, so
              // both are already initialised when they are passed here.
              _serializer(_parent, _is_local) {}
121
122
6.56M
    virtual ~Channel() = default;
123
124
    // Initialize channel.
125
    // Returns OK if successful, error indication otherwise.
126
    Status init(RuntimeState* state);
127
    Status open(RuntimeState* state);
128
    std::string debug_string() const;
129
130
    MOCK_FUNCTION Status send_local_block(Block* block, bool eos, bool can_be_moved);
131
    // Flush buffered rows and close the channel. This function doesn't wait for the response
132
    // of the close operation; the client should call close_wait() to finish the channel's close.
133
    // We split one close operation into two phases so that multiple channels
134
    // can run in parallel.
135
    Status close(RuntimeState* state);
136
137
0
    // Returns the destination fragment instance id as a string, produced by wrapping
    // _fragment_instance_id in a UniqueId and calling to_string() on it.
    std::string get_fragment_instance_id_str() {
138
0
        UniqueId uid(_fragment_instance_id);
139
0
        return uid.to_string();
140
0
    }
141
142
22.3M
    // Reports the _is_local flag computed in the constructor.
    bool is_local() const {
        const bool local_flag = _is_local;
        return local_flag;
    }
143
144
23.3M
    // True when the stored receiver status carries ErrorCode::END_OF_FILE.
    bool is_receiver_eof() const {
        const bool receiver_at_eof = _receiver_status.is<ErrorCode::END_OF_FILE>();
        return receiver_at_eof;
    }
145
146
17.6k
    // Records `st` as the receiver status. `st` is taken by value as a sink
    // parameter and moved into the member to avoid a second copy of the Status.
    void set_receiver_eof(Status st) { _receiver_status = std::move(st); }
147
148
    int64_t mem_usage() const;
149
150
    // Asynchronously sends a block.
151
    // Returns the status of the most recently finished transmit_data
152
    // rpc (or OK if there wasn't one that hasn't been reported yet).
153
    // If block is nullptr, send the eof packet.
154
    MOCK_FUNCTION Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = false);
155
    MOCK_FUNCTION Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
156
                                              bool eos = false);
157
158
    Status add_rows(Block* block, const uint32_t* data, const uint32_t offset, const uint32_t size,
159
                    bool eos);
160
161
3.08M
    // Stores a non-owning pointer to the exchange sink buffer used by this channel.
    void set_exchange_buffer(ExchangeSinkBuffer* buffer) {
        this->_buffer = buffer;
    }
162
163
3.07M
    // Returns the `lo` field of the destination fragment instance id.
    InstanceLoId dest_ins_id() const {
        const InstanceLoId lo_part = _fragment_instance_id.lo;
        return lo_part;
    }
164
165
    // Returns this channel's send callback, creating it on first use. The same
    // shared_ptr is handed out on every call: when one already exists its cntl_
    // (presumably the brpc controller -- confirm in ExchangeSendCallback) is Reset()
    // instead of allocating a new callback. In both cases the callback is then
    // (re)initialised with `ins` and `eos` before being returned.
    std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> get_send_callback(RpcInstance* ins,
166
3.06M
                                                                                 bool eos) {
167
3.06M
        if (!_send_callback) {
168
3.05M
            _send_callback = ExchangeSendCallback<PTransmitDataResult>::create_shared();
169
3.05M
        } else {
170
8.80k
            _send_callback->cntl_->Reset();
171
8.80k
        }
172
3.06M
        _send_callback->init(ins, eos);
173
3.06M
        return _send_callback;
174
3.06M
    }
175
176
    std::shared_ptr<Dependency> get_local_channel_dependency();
177
178
3.82k
    // Forwards the low-memory-mode request to this channel's serializer.
    void set_low_memory_mode(RuntimeState* state) {
        _serializer.set_low_memory_mode(state);
    }
179
180
private:
181
    Status _send_local_block(bool eos);
182
    Status _send_current_block(bool eos);
183
184
    MOCK_FUNCTION Status _init_brpc_stub(RuntimeState* state);
185
    MOCK_FUNCTION Status _find_local_recvr(RuntimeState* state);
186
187
3.54M
    // Status of the local receiver: OK while _local_recvr is set and not closed,
    // otherwise an EndOfFile status.
    Status _recvr_status() const {
188
3.54M
        if (_local_recvr && !_local_recvr->is_closed()) {
189
3.53M
            return Status::OK();
190
3.53M
        }
191
4.10k
        return Status::EndOfFile(
192
4.10k
                "local data stream receiver closed"); // also returned when _local_recvr is null
193
3.54M
    }
194
195
    ExchangeSinkLocalState* _parent = nullptr;
196
197
    const TUniqueId _fragment_instance_id;
198
    PlanNodeId _dest_node_id;
199
    bool _closed {false};
200
    bool _need_close {false};
201
    int _be_number;
202
203
    TNetworkAddress _brpc_dest_addr;
204
205
    std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
206
    Status _receiver_status;
207
    int32_t _brpc_timeout_ms = 500;
208
209
    bool _is_local;
210
    std::shared_ptr<VDataStreamRecvr> _local_recvr;
211
212
    BlockSerializer _serializer;
213
214
    ExchangeSinkBuffer* _buffer = nullptr;
215
    bool _eos_send = false;
216
    std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> _send_callback;
217
    std::unique_ptr<PBlock> _pblock;
218
};
219
220
// Dispatches on a channel's send status: an END_OF_FILE status is routed to
// _handle_eof_channel(state, channel, status); any other status goes straight
// through RETURN_IF_ERROR. Notes for callers:
//  - _handle_eof_channel must be visible at the expansion site (it is not
//    declared in this header).
//  - `status` is not parenthesised and appears in both the condition and the
//    taken branch, so pass a plain variable rather than an expression with
//    side effects.
//  - presumably RETURN_IF_ERROR returns from the enclosing function on error,
//    so use this only inside functions returning Status -- confirm against
//    common/status.h.
#define HANDLE_CHANNEL_STATUS(state, channel, status)                     \
221
6.53M
    do {                                                                  \
222
6.53M
        if (status.is<ErrorCode::END_OF_FILE>()) {                        \
223
17.6k
            RETURN_IF_ERROR(_handle_eof_channel(state, channel, status)); \
224
6.51M
        } else {                                                          \
225
6.51M
            RETURN_IF_ERROR(status);                                      \
226
6.51M
        }                                                                 \
227
6.53M
    } while (0)
228
229
} // namespace doris
230
231
#include "common/compile_check_end.h"