Coverage Report

Created: 2026-03-16 08:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/load_stream_stub.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/load_stream_stub.h"
19
20
#include <sstream>
21
22
#include "common/cast_set.h"
23
#include "runtime/query_context.h"
24
#include "storage/rowset/rowset_writer.h"
25
#include "util/brpc_client_cache.h"
26
#include "util/debug_points.h"
27
#include "util/network_util.h"
28
#include "util/thrift_util.h"
29
#include "util/uid_util.h"
30
31
namespace doris {
32
#include "common/compile_check_begin.h"
33
34
int LoadStreamReplyHandler::on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
35
3.90k
                                                 size_t size) {
36
3.90k
    auto stub = _stub.lock();
37
3.90k
    if (!stub) {
38
0
        LOG(WARNING) << "stub is not exist when on_received_messages, " << *this
39
0
                     << ", stream_id=" << id;
40
0
        return 0;
41
0
    }
42
7.81k
    for (size_t i = 0; i < size; i++) {
43
3.90k
        butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]);
44
3.90k
        PLoadStreamResponse response;
45
3.90k
        response.ParseFromZeroCopyStream(&wrapper);
46
47
3.90k
        if (response.eos()) {
48
3.90k
            stub->_is_eos.store(true);
49
3.90k
        }
50
51
3.90k
        Status st = Status::create<false>(response.status());
52
53
3.90k
        std::stringstream ss;
54
3.90k
        ss << "on_received_messages, " << *this << ", stream_id=" << id;
55
3.90k
        if (response.success_tablet_ids_size() > 0) {
56
1.98k
            ss << ", success tablet ids:";
57
12.8k
            for (auto tablet_id : response.success_tablet_ids()) {
58
12.8k
                ss << " " << tablet_id;
59
12.8k
            }
60
1.98k
            std::lock_guard<bthread::Mutex> lock(stub->_success_tablets_mutex);
61
12.8k
            for (auto tablet_id : response.success_tablet_ids()) {
62
12.8k
                stub->_success_tablets.push_back(tablet_id);
63
12.8k
            }
64
1.98k
        }
65
3.90k
        if (response.failed_tablets_size() > 0) {
66
0
            ss << ", failed tablet ids:";
67
0
            for (auto pb : response.failed_tablets()) {
68
0
                ss << " " << pb.id() << ":" << Status::create(pb.status());
69
0
            }
70
0
            std::lock_guard<bthread::Mutex> lock(stub->_failed_tablets_mutex);
71
0
            for (auto pb : response.failed_tablets()) {
72
0
                stub->_failed_tablets.emplace(pb.id(), Status::create(pb.status()));
73
0
            }
74
0
        }
75
3.90k
        if (response.tablet_schemas_size() > 0) {
76
4
            ss << ", tablet schema num: " << response.tablet_schemas_size();
77
4
            std::lock_guard<bthread::Mutex> lock(stub->_schema_mutex);
78
4
            for (const auto& schema : response.tablet_schemas()) {
79
4
                auto tablet_schema = std::make_unique<TabletSchema>();
80
4
                tablet_schema->init_from_pb(schema.tablet_schema());
81
4
                stub->_tablet_schema_for_index->emplace(schema.index_id(),
82
4
                                                        std::move(tablet_schema));
83
4
                stub->_enable_unique_mow_for_index->emplace(
84
4
                        schema.index_id(), schema.enable_unique_key_merge_on_write());
85
4
            }
86
4
            stub->_schema_cv.notify_all();
87
4
        }
88
3.90k
        ss << ", status: " << st;
89
3.90k
        LOG(INFO) << ss.str();
90
91
3.90k
        if (response.tablet_load_rowset_num_infos_size() > 0) {
92
0
            stub->_refresh_back_pressure_version_wait_time(response.tablet_load_rowset_num_infos());
93
0
        }
94
95
3.90k
        if (response.has_load_stream_profile()) {
96
0
            TRuntimeProfileTree tprofile;
97
0
            const uint8_t* buf =
98
0
                    reinterpret_cast<const uint8_t*>(response.load_stream_profile().data());
99
0
            uint32_t len = cast_set<uint32_t>(response.load_stream_profile().size());
100
0
            auto status = deserialize_thrift_msg(buf, &len, false, &tprofile);
101
0
            if (status.ok()) {
102
                // TODO
103
                //_sink->_state->load_channel_profile()->update(tprofile);
104
0
            } else {
105
0
                LOG(WARNING) << "load stream TRuntimeProfileTree deserialize failed, errmsg="
106
0
                             << status;
107
0
            }
108
0
        }
109
3.90k
    }
110
3.90k
    return 0;
111
3.90k
}
112
113
3.90k
void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
114
3.90k
    Defer defer {[this]() { delete this; }};
115
3.90k
    LOG(INFO) << "on_closed, " << *this << ", stream_id=" << id;
116
3.90k
    auto stub = _stub.lock();
117
3.90k
    if (!stub) {
118
0
        LOG(WARNING) << "stub is not exist when on_closed, " << *this;
119
0
        return;
120
0
    }
121
3.90k
    stub->_is_closed.store(true);
122
3.90k
}
123
124
7.81k
inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler) {
125
7.81k
    ostr << "LoadStreamReplyHandler load_id=" << UniqueId(handler._load_id)
126
7.81k
         << ", dst_id=" << handler._dst_id;
127
7.81k
    return ostr;
128
7.81k
}
129
130
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
131
                               std::shared_ptr<IndexToTabletSchema> schema_map,
132
                               std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental)
133
3.96k
        : _load_id(load_id),
134
3.96k
          _src_id(src_id),
135
3.96k
          _tablet_schema_for_index(schema_map),
136
3.96k
          _enable_unique_mow_for_index(mow_map),
137
3.96k
          _is_incremental(incremental) {};
138
139
3.96k
LoadStreamStub::~LoadStreamStub() {
140
3.96k
    if (_is_open.load() && !_is_closed.load()) {
141
0
        auto ret = brpc::StreamClose(_stream_id);
142
0
        LOG(INFO) << *this << " is deconstructed, close " << (ret == 0 ? "success" : "failed");
143
0
    }
144
3.96k
}
145
146
// open_load_stream
147
Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
148
                            const NodeInfo& node_info, int64_t txn_id,
149
                            const OlapTableSchemaParam& schema,
150
                            const std::vector<PTabletID>& tablets_for_schema, int total_streams,
151
10.8k
                            int64_t idle_timeout_ms, bool enable_profile) {
152
10.8k
    std::unique_lock<bthread::Mutex> lock(_open_mutex);
153
10.8k
    if (_is_init.load()) {
154
6.92k
        return _status;
155
6.92k
    }
156
3.89k
    _is_init.store(true);
157
3.89k
    _dst_id = node_info.id;
158
3.89k
    brpc::StreamOptions opt;
159
3.89k
    opt.max_buf_size = cast_set<int>(config::load_stream_max_buf_size);
160
3.89k
    opt.idle_timeout_ms = idle_timeout_ms;
161
3.89k
    opt.messages_in_batch = config::load_stream_messages_in_batch;
162
3.89k
    opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, shared_from_this());
163
3.89k
    brpc::Controller cntl;
164
3.89k
    if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) {
165
0
        delete opt.handler;
166
0
        _status = Status::Error<true>(ret, "Failed to create stream");
167
0
        return _status;
168
0
    }
169
3.89k
    cntl.set_timeout_ms(config::open_load_stream_timeout_ms);
170
3.89k
    POpenLoadStreamRequest request;
171
3.89k
    *request.mutable_load_id() = _load_id;
172
3.89k
    request.set_src_id(_src_id);
173
3.89k
    request.set_txn_id(txn_id);
174
3.89k
    request.set_enable_profile(enable_profile);
175
3.89k
    if (_is_incremental) {
176
0
        request.set_total_streams(0);
177
3.90k
    } else if (total_streams > 0) {
178
3.90k
        request.set_total_streams(total_streams);
179
18.4E
    } else {
180
18.4E
        _status = Status::InternalError("total_streams should be greator than 0");
181
18.4E
        return _status;
182
18.4E
    }
183
3.90k
    request.set_idle_timeout_ms(idle_timeout_ms);
184
3.90k
    schema.to_protobuf(request.mutable_schema());
185
3.90k
    for (auto& tablet : tablets_for_schema) {
186
2.00k
        *request.add_tablets() = tablet;
187
2.00k
    }
188
3.90k
    POpenLoadStreamResponse response;
189
    // set connection_group "streaming" to distinguish with non-streaming connections
190
3.90k
    const auto& stub = client_cache->get_client(node_info.host, node_info.brpc_port);
191
3.90k
    if (stub == nullptr) {
192
0
        return Status::InternalError("failed to init brpc client to {}:{}", node_info.host,
193
0
                                     node_info.brpc_port);
194
0
    }
195
3.90k
    stub->open_load_stream(&cntl, &request, &response, nullptr);
196
3.90k
    for (const auto& resp : response.tablet_schemas()) {
197
2.00k
        auto tablet_schema = std::make_unique<TabletSchema>();
198
2.00k
        tablet_schema->init_from_pb(resp.tablet_schema());
199
2.00k
        _tablet_schema_for_index->emplace(resp.index_id(), std::move(tablet_schema));
200
2.00k
        _enable_unique_mow_for_index->emplace(resp.index_id(),
201
2.00k
                                              resp.enable_unique_key_merge_on_write());
202
2.00k
    }
203
3.90k
    if (response.tablet_load_rowset_num_infos_size() > 0) {
204
0
        _refresh_back_pressure_version_wait_time(response.tablet_load_rowset_num_infos());
205
0
    }
206
3.90k
    if (cntl.Failed()) {
207
0
        brpc::StreamClose(_stream_id);
208
0
        _status = Status::InternalError("Failed to connect to backend {}: {}", _dst_id,
209
0
                                        cntl.ErrorText());
210
0
        return _status;
211
0
    }
212
3.90k
    LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port
213
3.90k
              << ", " << *this;
214
3.90k
    _is_open.store(true);
215
3.90k
    _status = Status::OK();
216
3.90k
    return _status;
217
3.90k
}
218
219
// APPEND_DATA
220
Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
221
                                   int32_t segment_id, uint64_t offset, std::span<const Slice> data,
222
110k
                                   bool segment_eos, FileType file_type) {
223
110k
    if (!_is_open.load()) {
224
0
        add_failed_tablet(tablet_id, _status);
225
0
        return _status;
226
0
    }
227
110k
    DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); });
228
110k
    PStreamHeader header;
229
110k
    header.set_src_id(_src_id);
230
110k
    *header.mutable_load_id() = _load_id;
231
110k
    header.set_partition_id(partition_id);
232
110k
    header.set_index_id(index_id);
233
110k
    header.set_tablet_id(tablet_id);
234
110k
    header.set_segment_id(segment_id);
235
110k
    header.set_segment_eos(segment_eos);
236
110k
    header.set_offset(offset);
237
110k
    header.set_opcode(doris::PStreamHeader::APPEND_DATA);
238
110k
    header.set_file_type(file_type);
239
110k
    return _encode_and_send(header, data);
240
110k
}
241
242
// ADD_SEGMENT
243
Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
244
3.72k
                                   int32_t segment_id, const SegmentStatistics& segment_stat) {
245
3.72k
    if (!_is_open.load()) {
246
0
        add_failed_tablet(tablet_id, _status);
247
0
        return _status;
248
0
    }
249
3.72k
    DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); });
250
3.72k
    PStreamHeader header;
251
3.72k
    header.set_src_id(_src_id);
252
3.72k
    *header.mutable_load_id() = _load_id;
253
3.72k
    header.set_partition_id(partition_id);
254
3.72k
    header.set_index_id(index_id);
255
3.72k
    header.set_tablet_id(tablet_id);
256
3.72k
    header.set_segment_id(segment_id);
257
3.72k
    header.set_opcode(doris::PStreamHeader::ADD_SEGMENT);
258
3.72k
    segment_stat.to_pb(header.mutable_segment_statistics());
259
3.72k
    return _encode_and_send(header);
260
3.72k
}
261
262
// CLOSE_LOAD
263
Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit,
264
3.90k
                                  int num_incremental_streams) {
265
3.90k
    if (!_is_open.load()) {
266
0
        return _status;
267
0
    }
268
3.90k
    PStreamHeader header;
269
3.90k
    *header.mutable_load_id() = _load_id;
270
3.90k
    header.set_src_id(_src_id);
271
3.90k
    header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
272
12.8k
    for (const auto& tablet : tablets_to_commit) {
273
12.8k
        *header.add_tablets() = tablet;
274
12.8k
    }
275
3.90k
    header.set_num_incremental_streams(num_incremental_streams);
276
3.90k
    _status = _encode_and_send(header);
277
3.90k
    if (!_status.ok()) {
278
0
        LOG(WARNING) << "stream " << _stream_id << " close failed: " << _status;
279
0
        return _status;
280
0
    }
281
3.90k
    _is_closing.store(true);
282
3.90k
    return Status::OK();
283
3.90k
}
284
285
// GET_SCHEMA
286
4
Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {
287
4
    if (!_is_open.load()) {
288
0
        return _status;
289
0
    }
290
4
    PStreamHeader header;
291
4
    *header.mutable_load_id() = _load_id;
292
4
    header.set_src_id(_src_id);
293
4
    header.set_opcode(doris::PStreamHeader::GET_SCHEMA);
294
4
    std::ostringstream oss;
295
4
    oss << "fetching tablet schema from stream " << _stream_id
296
4
        << ", load id: " << print_id(_load_id) << ", tablet id:";
297
4
    for (const auto& tablet : tablets) {
298
4
        *header.add_tablets() = tablet;
299
4
        oss << " " << tablet.tablet_id();
300
4
    }
301
4
    if (tablets.size() == 0) {
302
0
        oss << " none";
303
0
    }
304
4
    LOG(INFO) << oss.str();
305
4
    return _encode_and_send(header);
306
4
}
307
308
Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id,
309
3.73k
                                       int64_t timeout_ms) {
310
3.73k
    if (!_is_open.load()) {
311
0
        return _status;
312
0
    }
313
3.73k
    if (_tablet_schema_for_index->contains(index_id)) {
314
3.72k
        return Status::OK();
315
3.72k
    }
316
4
    PTabletID tablet;
317
4
    tablet.set_partition_id(partition_id);
318
4
    tablet.set_index_id(index_id);
319
4
    tablet.set_tablet_id(tablet_id);
320
4
    RETURN_IF_ERROR(get_schema({tablet}));
321
322
4
    MonotonicStopWatch watch;
323
4
    watch.start();
324
8
    while (!_tablet_schema_for_index->contains(index_id) &&
325
8
           watch.elapsed_time() / 1000 / 1000 < timeout_ms) {
326
4
        RETURN_IF_ERROR(check_cancel());
327
4
        static_cast<void>(wait_for_new_schema(100));
328
4
    }
329
330
4
    if (!_tablet_schema_for_index->contains(index_id)) {
331
0
        return Status::TimedOut("timeout to get tablet schema for index {}", index_id);
332
0
    }
333
4
    return Status::OK();
334
4
}
335
336
64.0k
Status LoadStreamStub::close_finish_check(RuntimeState* state, bool* is_closed) {
337
64.0k
    DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
338
64.0k
    DBUG_EXECUTE_IF("LoadStreamStub::close_finish_check.close_failed",
339
64.0k
                    { return Status::InternalError("close failed"); });
340
64.0k
    *is_closed = true;
341
64.0k
    if (!_is_open.load()) {
342
        // we don't need to close wait on non-open streams
343
0
        return Status::OK();
344
0
    }
345
    // If stream is cancelled (e.g., due to connection failure), treat it as closed
346
    // to avoid waiting indefinitely for a stream that will never respond.
347
64.0k
    if (_is_cancelled.load()) {
348
0
        return check_cancel();
349
0
    }
350
64.0k
    if (state->get_query_ctx()->is_cancelled()) {
351
0
        return state->get_query_ctx()->exec_status();
352
0
    }
353
64.0k
    if (!_is_closing.load()) {
354
29.3k
        *is_closed = false;
355
29.3k
        return _status;
356
29.3k
    }
357
34.6k
    if (_is_closed.load()) {
358
21.6k
        RETURN_IF_ERROR(check_cancel());
359
21.6k
        if (!_is_eos.load()) {
360
0
            return Status::InternalError("Stream closed without EOS, {}", to_string());
361
0
        }
362
21.6k
        return Status::OK();
363
21.6k
    }
364
13.0k
    *is_closed = false;
365
13.0k
    return Status::OK();
366
34.6k
}
367
368
0
void LoadStreamStub::cancel(Status reason) {
369
0
    LOG(WARNING) << *this << " is cancelled because of " << reason;
370
0
    if (_is_open.load()) {
371
0
        brpc::StreamClose(_stream_id);
372
0
    }
373
0
    {
374
0
        std::lock_guard<bthread::Mutex> lock(_cancel_mutex);
375
0
        _cancel_st = reason;
376
0
        _is_cancelled.store(true);
377
0
    }
378
0
    _is_closed.store(true);
379
0
}
380
381
118k
Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) {
382
118k
    butil::IOBuf buf;
383
118k
    size_t header_len = header.ByteSizeLong();
384
118k
    buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
385
118k
    buf.append(header.SerializeAsString());
386
118k
    size_t data_len = std::transform_reduce(data.begin(), data.end(), 0, std::plus(),
387
360k
                                            [](const Slice& s) { return s.get_size(); });
388
118k
    buf.append(reinterpret_cast<uint8_t*>(&data_len), sizeof(data_len));
389
360k
    for (const auto& slice : data) {
390
360k
        buf.append(slice.get_data(), slice.get_size());
391
360k
    }
392
118k
    bool eos = header.opcode() == doris::PStreamHeader::CLOSE_LOAD;
393
118k
    bool get_schema = header.opcode() == doris::PStreamHeader::GET_SCHEMA;
394
118k
    add_bytes_written(buf.size());
395
118k
    return _send_with_buffer(buf, eos || get_schema);
396
118k
}
397
398
118k
Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) {
399
118k
    butil::IOBuf output;
400
118k
    std::unique_lock<decltype(_buffer_mutex)> buffer_lock(_buffer_mutex);
401
118k
    _buffer.append(buf);
402
118k
    if (!sync && _buffer.size() < config::brpc_streaming_client_batch_bytes) {
403
114k
        return Status::OK();
404
114k
    }
405
3.86k
    output.swap(_buffer);
406
    // acquire send lock while holding buffer lock, to ensure the message order
407
3.86k
    std::lock_guard<decltype(_send_mutex)> send_lock(_send_mutex);
408
3.86k
    buffer_lock.unlock();
409
18.4E
    VLOG_DEBUG << "send buf size : " << output.size() << ", sync: " << sync;
410
3.86k
    auto st = _send_with_retry(output);
411
3.86k
    if (!st.ok()) {
412
0
        _handle_failure(output, st);
413
0
    }
414
3.86k
    return st;
415
118k
}
416
417
0
void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) {
418
0
    while (buf.size() > 0) {
419
        // step 1: parse header
420
0
        size_t hdr_len = 0;
421
0
        buf.cutn((void*)&hdr_len, sizeof(size_t));
422
0
        butil::IOBuf hdr_buf;
423
0
        PStreamHeader hdr;
424
0
        buf.cutn(&hdr_buf, hdr_len);
425
0
        butil::IOBufAsZeroCopyInputStream wrapper(hdr_buf);
426
0
        hdr.ParseFromZeroCopyStream(&wrapper);
427
428
        // step 2: cut data
429
0
        size_t data_len = 0;
430
0
        buf.cutn((void*)&data_len, sizeof(size_t));
431
0
        butil::IOBuf data_buf;
432
0
        buf.cutn(&data_buf, data_len);
433
434
        // step 3: handle failure
435
0
        switch (hdr.opcode()) {
436
0
        case PStreamHeader::ADD_SEGMENT:
437
0
        case PStreamHeader::APPEND_DATA: {
438
0
            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.append_data_failed", {
439
0
                add_failed_tablet(hdr.tablet_id(), st);
440
0
                return;
441
0
            });
442
0
            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.add_segment_failed", {
443
0
                add_failed_tablet(hdr.tablet_id(), st);
444
0
                return;
445
0
            });
446
0
            add_failed_tablet(hdr.tablet_id(), st);
447
0
        } break;
448
0
        case PStreamHeader::CLOSE_LOAD: {
449
0
            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.close_load_failed", {
450
0
                brpc::StreamClose(_stream_id);
451
0
                return;
452
0
            });
453
0
            brpc::StreamClose(_stream_id);
454
0
        } break;
455
0
        case PStreamHeader::GET_SCHEMA: {
456
0
            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.get_schema_failed", {
457
                // Just log and let wait_for_schema timeout
458
0
                std::ostringstream oss;
459
0
                for (const auto& tablet : hdr.tablets()) {
460
0
                    oss << " " << tablet.tablet_id();
461
0
                }
462
0
                LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << oss.str() << ", "
463
0
                             << *this;
464
0
                return;
465
0
            });
466
            // Just log and let wait_for_schema timeout
467
0
            std::ostringstream oss;
468
0
            for (const auto& tablet : hdr.tablets()) {
469
0
                oss << " " << tablet.tablet_id();
470
0
            }
471
0
            LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << oss.str() << ", "
472
0
                         << *this;
473
0
        } break;
474
0
        default:
475
0
            LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this;
476
0
            DCHECK(false);
477
0
        }
478
0
    }
479
0
}
480
481
3.90k
Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
482
3.90k
    for (;;) {
483
3.90k
        RETURN_IF_ERROR(check_cancel());
484
3.90k
        int ret;
485
3.90k
        {
486
3.90k
            DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.delay_before_send", {
487
3.90k
                int64_t delay_ms = dp->param<int64_t>("delay_ms", 1000);
488
3.90k
                bthread_usleep(delay_ms * 1000);
489
3.90k
            });
490
3.90k
            brpc::StreamWriteOptions options;
491
3.90k
            options.write_in_background = config::enable_brpc_stream_write_background;
492
3.90k
            ret = brpc::StreamWrite(_stream_id, buf, &options);
493
3.90k
        }
494
3.90k
        DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.stream_write_failed", { ret = EPIPE; });
495
3.90k
        switch (ret) {
496
3.90k
        case 0:
497
3.90k
            return Status::OK();
498
0
        case EAGAIN: {
499
0
            const timespec time = butil::seconds_from_now(config::load_stream_eagain_wait_seconds);
500
0
            int wait_ret = brpc::StreamWait(_stream_id, &time);
501
0
            if (wait_ret != 0) {
502
0
                return Status::InternalError("StreamWait failed, err={}, {}", wait_ret,
503
0
                                             to_string());
504
0
            }
505
0
            break;
506
0
        }
507
0
        default:
508
0
            return Status::InternalError("StreamWrite failed, err={}, {}", ret, to_string());
509
3.90k
        }
510
3.90k
    }
511
3.90k
}
512
513
void LoadStreamStub::_refresh_back_pressure_version_wait_time(
514
        const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
515
0
                tablet_load_infos) {
516
0
    int64_t max_rowset_num_gap = 0;
517
    // if any one tablet is under high load pressure, we would make the whole procedure
518
    // sleep to prevent the corresponding BE return -235
519
0
    std::for_each(
520
0
            tablet_load_infos.begin(), tablet_load_infos.end(),
521
0
            [&max_rowset_num_gap](auto& load_info) {
522
0
                int64_t cur_rowset_num = load_info.current_rowset_nums();
523
0
                int64_t high_load_point = load_info.max_config_rowset_nums() *
524
0
                                          (config::load_back_pressure_version_threshold / 100);
525
0
                DCHECK(cur_rowset_num > high_load_point);
526
0
                max_rowset_num_gap = std::max(max_rowset_num_gap, cur_rowset_num - high_load_point);
527
0
            });
528
    // to slow down the high load pressure
529
    // we would use the rowset num gap to calculate one sleep time
530
    // for example:
531
    // if the max tablet version is 2000, there are 3 BE
532
    // A: ====================  1800
533
    // B: ===================   1700
534
    // C: ==================    1600
535
    //    ==================    1600
536
    //                      ^
537
    //                      the high load point
538
    // then then max gap is 1800 - (max tablet version * config::load_back_pressure_version_threshold / 100) = 200,
539
    // we would make the whole send procesure sleep
540
    // 1200ms for compaction to be done toe reduce the high pressure
541
0
    auto max_time = config::max_load_back_pressure_version_wait_time_ms;
542
0
    if (UNLIKELY(max_rowset_num_gap > 0)) {
543
0
        _load_back_pressure_version_wait_time_ms.store(
544
0
                std::min(max_rowset_num_gap + 1000, max_time));
545
0
        LOG(INFO) << "try to back pressure version, wait time(ms): "
546
0
                  << _load_back_pressure_version_wait_time_ms << ", load id: " << print_id(_load_id)
547
0
                  << ", max_rowset_num_gap: " << max_rowset_num_gap;
548
0
    }
549
0
}
550
551
0
std::string LoadStreamStub::to_string() {
552
0
    std::ostringstream ss;
553
0
    ss << *this;
554
0
    return ss.str();
555
0
}
556
557
3.90k
inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub) {
558
3.90k
    ostr << "LoadStreamStub load_id=" << print_id(stub._load_id) << ", src_id=" << stub._src_id
559
3.90k
         << ", dst_id=" << stub._dst_id << ", stream_id=" << stub._stream_id;
560
3.90k
    return ostr;
561
3.90k
}
562
563
Status LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache,
564
                             const NodeInfo& node_info, int64_t txn_id,
565
                             const OlapTableSchemaParam& schema,
566
                             const std::vector<PTabletID>& tablets_for_schema, int total_streams,
567
5.46k
                             int64_t idle_timeout_ms, bool enable_profile) {
568
5.46k
    bool get_schema = true;
569
5.46k
    auto status = Status::OK();
570
5.46k
    bool first_stream = true;
571
10.8k
    for (auto& stream : _streams) {
572
10.8k
        Status st;
573
10.8k
        if (get_schema) {
574
5.47k
            st = stream->open(client_cache, node_info, txn_id, schema, tablets_for_schema,
575
5.47k
                              total_streams, idle_timeout_ms, enable_profile);
576
5.47k
        } else {
577
5.35k
            st = stream->open(client_cache, node_info, txn_id, schema, {}, total_streams,
578
5.35k
                              idle_timeout_ms, enable_profile);
579
5.35k
        }
580
        // Simulate one stream open failure within LoadStreamStubs.
581
        // This causes the successfully opened streams to be cancelled,
582
        // reproducing the bug where cancelled streams cause close_wait timeout.
583
10.8k
        DBUG_EXECUTE_IF("LoadStreamStubs.open.fail_one_stream", {
584
10.8k
            if (st.ok() && !first_stream) {
585
10.8k
                st = Status::InternalError("Injected stream open failure");
586
10.8k
            }
587
10.8k
        });
588
10.8k
        if (st.ok()) {
589
10.8k
            get_schema = false;
590
18.4E
        } else {
591
18.4E
            LOG(WARNING) << "open stream failed: " << st << "; stream: " << *stream;
592
18.4E
            status = st;
593
            // no break here to try get schema from the rest streams
594
18.4E
        }
595
10.8k
        first_stream = false;
596
10.8k
    }
597
    // only mark open when all streams open success
598
5.46k
    _open_success.store(status.ok());
599
    // cancel all streams if open failed
600
5.46k
    if (!status.ok()) {
601
0
        cancel(status);
602
0
    }
603
5.46k
    return status;
604
5.46k
}
605
606
Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_commit,
607
2.01k
                                   int num_incremental_streams) {
608
2.01k
    if (!_open_success.load()) {
609
0
        return Status::InternalError("streams not open");
610
0
    }
611
2.01k
    bool first = true;
612
2.01k
    auto status = Status::OK();
613
3.90k
    for (auto& stream : _streams) {
614
3.90k
        Status st;
615
3.90k
        if (first) {
616
2.01k
            st = stream->close_load(tablets_to_commit, num_incremental_streams);
617
2.01k
            first = false;
618
2.01k
        } else {
619
1.89k
            st = stream->close_load({}, num_incremental_streams);
620
1.89k
        }
621
3.90k
        if (!st.ok()) {
622
            LOG(WARNING) << "close_load failed: " << st << "; stream: " << *stream;
623
0
        }
624
3.90k
    }
625
2.01k
    return status;
626
2.01k
}
627
628
} // namespace doris