Coverage Report

Created: 2026-04-14 20:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/vdata_stream_sender.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <brpc/controller.h>
21
#include <butil/errno.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/Partitions_types.h>
24
#include <gen_cpp/Types_types.h>
25
#include <gen_cpp/data.pb.h>
26
#include <gen_cpp/internal_service.pb.h>
27
#include <gen_cpp/types.pb.h>
28
29
#include <atomic>
30
#include <cstddef>
31
#include <cstdint>
32
#include <memory>
33
#include <ostream>
34
#include <string>
35
#include <utility>
36
#include <vector>
37
38
#include "common/config.h"
39
#include "common/global_types.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "core/block/block.h"
43
#include "exec/exchange/vdata_stream_recvr.h"
44
#include "exec/operator/exchange_sink_buffer.h"
45
#include "exec/partitioner/partitioner.h"
46
#include "exec/sink/vrow_distribution.h"
47
#include "exec/sink/vtablet_finder.h"
48
#include "exprs/vexpr_context.h"
49
#include "runtime/runtime_profile.h"
50
#include "service/backend_options.h"
51
#include "storage/tablet_info.h"
52
#include "util/brpc_closure.h"
53
#include "util/uid_util.h"
54
55
namespace doris {
56
class ObjectPool;
57
class RuntimeState;
58
class RowDescriptor;
59
class TDataSink;
60
class TDataStreamSink;
61
class TPlanFragmentDestination;
62
63
namespace segment_v2 {
64
enum CompressionTypePB : int;
65
} // namespace segment_v2
66
67
class ExchangeSinkOperatorX;
68
class Dependency;
69
class ExchangeSinkLocalState;
70
71
class BlockSerializer {
72
public:
73
    BlockSerializer(ExchangeSinkLocalState* parent, bool is_local = true);
74
#ifdef BE_TEST
75
18
    BlockSerializer() : _batch_size(0) {};
76
#endif
77
    Status next_serialized_block(Block* src, PBlock* dest, size_t num_receivers, bool* serialized,
78
                                 bool eos, const uint32_t* data = nullptr,
79
                                 const uint32_t offset = 0, const uint32_t size = 0);
80
    Status serialize_block(const Block* src, PBlock* dest, size_t num_receivers = 1);
81
82
14
    MutableBlock* get_block() const { return _mutable_block.get(); }
83
84
64
    size_t mem_usage() const { return _mutable_block ? _mutable_block->allocated_bytes() : 0; }
85
86
7
    void reset_block() { _mutable_block.reset(); }
87
88
23
    void set_is_local(bool is_local) { _is_local = is_local; }
89
0
    bool is_local() const { return _is_local; }
90
91
0
    void set_low_memory_mode(RuntimeState* state) { _buffer_mem_limit = 4 * 1024 * 1024; }
92
93
private:
94
    Status _serialize_block(PBlock* dest, size_t num_receivers = 1);
95
96
    ExchangeSinkLocalState* _parent;
97
    std::unique_ptr<MutableBlock> _mutable_block;
98
99
    bool _is_local;
100
    const int _batch_size;
101
    std::atomic<size_t> _buffer_mem_limit = UINT64_MAX;
102
};
103
104
class Channel {
105
public:
106
    friend class ExchangeSinkBuffer;
107
    // Create channel to send data to particular ipaddress/port/query/node
108
    // combination. buffer_size is specified in bytes and a soft limit on
109
    // how much tuple data is getting accumulated before being sent; it only applies
110
    // when data is added via add_row() and not sent directly via send_batch().
111
    Channel(ExchangeSinkLocalState* parent, TNetworkAddress brpc_dest,
112
            TUniqueId fragment_instance_id, PlanNodeId dest_node_id)
113
72
            : _parent(parent),
114
72
              _fragment_instance_id(std::move(fragment_instance_id)),
115
72
              _dest_node_id(dest_node_id),
116
72
              _brpc_dest_addr(std::move(brpc_dest)),
117
72
              _is_local((_brpc_dest_addr.hostname == BackendOptions::get_localhost()) &&
118
72
                        (_brpc_dest_addr.port == config::brpc_port)),
119
72
              _serializer(_parent, _is_local) {}
120
121
72
    virtual ~Channel() = default;
122
123
    // Initialize channel.
124
    // Returns OK if successful, error indication otherwise.
125
    Status init(RuntimeState* state);
126
    Status open(RuntimeState* state);
127
    std::string debug_string() const;
128
129
    MOCK_FUNCTION Status send_local_block(Block* block, bool eos, bool can_be_moved);
130
    // Flush buffered rows and close channel. This function don't wait the response
131
    // of close operation, client should call close_wait() to finish channel's close.
132
    // We split one close operation into two phases in order to make multiple channels
133
    // can run parallel.
134
    Status close(RuntimeState* state);
135
136
0
    std::string get_fragment_instance_id_str() {
137
0
        UniqueId uid(_fragment_instance_id);
138
0
        return uid.to_string();
139
0
    }
140
141
74
    bool is_local() const { return _is_local; }
142
143
76
    bool is_receiver_eof() const { return _receiver_status.is<ErrorCode::END_OF_FILE>(); }
144
145
19
    void set_receiver_eof(Status st) { _receiver_status = st; }
146
147
    int64_t mem_usage() const;
148
149
    // Asynchronously sends a block
150
    // Returns the status of the most recently finished transmit_data
151
    // rpc (or OK if there wasn't one that hasn't been reported yet).
152
    // if batch is nullptr, send the eof packet
153
    MOCK_FUNCTION Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = false);
154
    MOCK_FUNCTION Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
155
                                              bool eos = false);
156
157
    Status add_rows(Block* block, const uint32_t* data, const uint32_t offset, const uint32_t size,
158
                    bool eos);
159
160
15
    void set_exchange_buffer(ExchangeSinkBuffer* buffer) { _buffer = buffer; }
161
162
69
    InstanceLoId dest_ins_id() const { return _fragment_instance_id.lo; }
163
164
    std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> get_send_callback(RpcInstance* ins,
165
28
                                                                                 bool eos) {
166
28
        if (!_send_callback) {
167
22
            _send_callback = ExchangeSendCallback<PTransmitDataResult>::create_shared();
168
22
        } else {
169
6
            _send_callback->cntl_->Reset();
170
6
        }
171
28
        _send_callback->init(ins, eos);
172
28
        return _send_callback;
173
28
    }
174
175
    std::shared_ptr<Dependency> get_local_channel_dependency();
176
177
0
    void set_low_memory_mode(RuntimeState* state) { _serializer.set_low_memory_mode(state); }
178
179
private:
180
    Status _send_local_block(bool eos);
181
    Status _send_current_block(bool eos);
182
183
    MOCK_FUNCTION Status _init_brpc_stub(RuntimeState* state);
184
    MOCK_FUNCTION Status _find_local_recvr(RuntimeState* state);
185
186
7
    Status _recvr_status() const {
187
7
        if (_local_recvr && !_local_recvr->is_closed()) {
188
7
            return Status::OK();
189
7
        }
190
0
        return Status::EndOfFile(
191
0
                "local data stream receiver closed"); // local data stream receiver closed
192
7
    }
193
194
    ExchangeSinkLocalState* _parent = nullptr;
195
196
    const TUniqueId _fragment_instance_id;
197
    PlanNodeId _dest_node_id;
198
    bool _closed {false};
199
    bool _need_close {false};
200
    int _be_number;
201
202
    TNetworkAddress _brpc_dest_addr;
203
204
    std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
205
    Status _receiver_status;
206
    int32_t _brpc_timeout_ms = 500;
207
208
    bool _is_local;
209
    std::shared_ptr<VDataStreamRecvr> _local_recvr;
210
211
    BlockSerializer _serializer;
212
213
    ExchangeSinkBuffer* _buffer = nullptr;
214
    bool _eos_send = false;
215
    std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> _send_callback;
216
    std::unique_ptr<PBlock> _pblock;
217
};
218
219
#define HANDLE_CHANNEL_STATUS(state, channel, status)                     \
220
22
    do {                                                                  \
221
22
        if (status.is<ErrorCode::END_OF_FILE>()) {                        \
222
0
            RETURN_IF_ERROR(_handle_eof_channel(state, channel, status)); \
223
22
        } else {                                                          \
224
22
            RETURN_IF_ERROR(status);                                      \
225
22
        }                                                                 \
226
22
    } while (0)
227
228
} // namespace doris