Coverage Report

Created: 2026-06-24 11:44

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/load_stream_stub.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/load_stream_stub.h"
19
20
#include <sstream>
21
22
#include "common/cast_set.h"
23
#include "runtime/query_context.h"
24
#include "storage/rowset/rowset_writer.h"
25
#include "util/brpc_client_cache.h"
26
#include "util/debug_points.h"
27
#include "util/network_util.h"
28
#include "util/thrift_util.h"
29
#include "util/uid_util.h"
30
31
namespace doris {
32
33
400
int64_t CloseWaitNotifier::close_wait_version() const {
34
400
    return _close_wait_version.load(std::memory_order_acquire);
35
400
}
36
37
132
void CloseWaitNotifier::wait_for_close_event(int64_t observed_version, int64_t timeout_ms) {
38
132
    std::unique_lock<bthread::Mutex> lock(_close_wait_mutex);
39
132
    if (observed_version != close_wait_version()) {
40
0
        return;
41
0
    }
42
132
    static_cast<void>(_close_wait_cv.wait_for(lock, timeout_ms * 1000));
43
132
}
44
45
134
void CloseWaitNotifier::notify_close_wait() {
46
134
    _close_wait_version.fetch_add(1, std::memory_order_acq_rel);
47
134
    std::lock_guard<bthread::Mutex> lock(_close_wait_mutex);
48
134
    _close_wait_cv.notify_all();
49
134
}
50
51
int LoadStreamReplyHandler::on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
52
132
                                                 size_t size) {
53
132
    auto stub = _stub.lock();
54
132
    if (!stub) {
55
0
        LOG(WARNING) << "stub is not exist when on_received_messages, " << *this
56
0
                     << ", stream_id=" << id;
57
0
        return 0;
58
0
    }
59
264
    for (size_t i = 0; i < size; i++) {
60
132
        butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]);
61
132
        PLoadStreamResponse response;
62
132
        response.ParseFromZeroCopyStream(&wrapper);
63
64
132
        if (response.eos()) {
65
132
            stub->_is_eos.store(true);
66
132
        }
67
68
132
        Status st = Status::create<false>(response.status());
69
70
132
        std::stringstream ss;
71
132
        ss << "on_received_messages, " << *this << ", stream_id=" << id;
72
132
        if (response.success_tablet_ids_size() > 0) {
73
66
            ss << ", success tablet ids:";
74
462
            for (auto tablet_id : response.success_tablet_ids()) {
75
462
                ss << " " << tablet_id;
76
462
            }
77
66
            std::lock_guard<bthread::Mutex> lock(stub->_success_tablets_mutex);
78
462
            for (auto tablet_id : response.success_tablet_ids()) {
79
462
                stub->_success_tablets.push_back(tablet_id);
80
462
            }
81
66
        }
82
132
        if (response.failed_tablets_size() > 0) {
83
0
            ss << ", failed tablet ids:";
84
0
            for (auto pb : response.failed_tablets()) {
85
0
                ss << " " << pb.id() << ":" << Status::create(pb.status());
86
0
            }
87
0
            std::lock_guard<bthread::Mutex> lock(stub->_failed_tablets_mutex);
88
0
            for (auto pb : response.failed_tablets()) {
89
0
                stub->_failed_tablets.emplace(pb.id(), Status::create(pb.status()));
90
0
            }
91
0
        }
92
132
        if (response.tablet_schemas_size() > 0) {
93
0
            ss << ", tablet schema num: " << response.tablet_schemas_size();
94
0
            std::lock_guard<bthread::Mutex> lock(stub->_schema_mutex);
95
0
            for (const auto& schema : response.tablet_schemas()) {
96
0
                auto tablet_schema = std::make_unique<TabletSchema>();
97
0
                tablet_schema->init_from_pb(schema.tablet_schema());
98
0
                stub->_tablet_schema_for_index->emplace(schema.index_id(),
99
0
                                                        std::move(tablet_schema));
100
0
                stub->_enable_unique_mow_for_index->emplace(
101
0
                        schema.index_id(), schema.enable_unique_key_merge_on_write());
102
0
            }
103
0
            stub->_schema_cv.notify_all();
104
0
        }
105
132
        ss << ", status: " << st;
106
132
        LOG(INFO) << ss.str();
107
108
132
        if (response.tablet_load_rowset_num_infos_size() > 0) {
109
0
            stub->_refresh_back_pressure_version_wait_time(response.tablet_load_rowset_num_infos());
110
0
        }
111
112
132
        if (response.has_load_stream_profile()) {
113
0
            TRuntimeProfileTree tprofile;
114
0
            const uint8_t* buf =
115
0
                    reinterpret_cast<const uint8_t*>(response.load_stream_profile().data());
116
0
            uint32_t len = cast_set<uint32_t>(response.load_stream_profile().size());
117
0
            auto status = deserialize_thrift_msg(buf, &len, false, &tprofile);
118
0
            if (status.ok()) {
119
                // TODO
120
                //_sink->_state->load_channel_profile()->update(tprofile);
121
0
            } else {
122
0
                LOG(WARNING) << "load stream TRuntimeProfileTree deserialize failed, errmsg="
123
0
                             << status;
124
0
            }
125
0
        }
126
132
    }
127
132
    return 0;
128
132
}
129
130
132
// Stream callback invoked when the stream is closed.
//
// Marks the owning stub (if it still exists) as closed and wakes close-waiters.
// The handler deletes itself when this function returns, on every path.
void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
    Defer self_deleter {[this]() { delete this; }};
    LOG(INFO) << "on_closed, " << *this << ", stream_id=" << id;
    if (auto owner = _stub.lock(); owner) {
        owner->_is_closed.store(true);
        owner->notify_close_wait();
        return;
    }
    LOG(WARNING) << "stub is not exist when on_closed, " << *this;
}
141
142
264
inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler) {
143
264
    ostr << "LoadStreamReplyHandler load_id=" << UniqueId(handler._load_id)
144
264
         << ", dst_id=" << handler._dst_id;
145
264
    return ostr;
146
264
}
147
148
// Constructs an unopened stub.
//
// load_id / src_id:    identify the load and the sending side.
// schema_map, mow_map: shared maps that this stub fills in as tablet schemas arrive.
// incremental:         stored in _is_incremental; open() sends total_streams = 0 when set.
// close_wait_notifier: must be non-null; used to wake close-waiters.
//
// All by-value parameters are moved into the members to avoid an extra protobuf
// copy and extra shared_ptr reference-count updates.
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
                               std::shared_ptr<IndexToTabletSchema> schema_map,
                               std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental,
                               std::shared_ptr<CloseWaitNotifier> close_wait_notifier)
        : _load_id(std::move(load_id)),
          _src_id(src_id),
          _tablet_schema_for_index(std::move(schema_map)),
          _enable_unique_mow_for_index(std::move(mow_map)),
          _is_incremental(incremental),
          _close_wait_notifier(std::move(close_wait_notifier)) {
    DCHECK(_close_wait_notifier != nullptr);
}
160
161
192
// Closes the underlying stream if it was opened and has not been closed yet.
LoadStreamStub::~LoadStreamStub() {
    const bool still_open = _is_open.load() && !_is_closed.load();
    if (!still_open) {
        return;
    }
    const auto close_rc = brpc::StreamClose(_stream_id);
    LOG(INFO) << *this << " is deconstructed, close " << (close_rc == 0 ? "success" : "failed");
}
167
168
// open_load_stream
169
// Opens the load stream to the backend described by `node_info`.
//
// Only the first call does any work; later calls return the status recorded by
// the first one. Every failure path stores its error in _status, so a repeated
// call never reports success for a stub that is not open.
//
// Argument validation and client lookup happen BEFORE the stream is created, so
// those failure paths cannot leak a created stream or its reply handler.
//
// client_cache:       source of the brpc client used for the open RPC.
// node_info:          destination backend (id, host, brpc port).
// txn_id, schema:     forwarded in the open request.
// tablets_for_schema: tablets whose schema the backend should return.
// total_streams:      must be > 0 unless this stub is incremental (then 0 is sent).
// idle_timeout_ms:    stream idle timeout, also forwarded in the request.
// enable_profile:     forwarded in the request.
Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
                            const NodeInfo& node_info, int64_t txn_id,
                            const OlapTableSchemaParam& schema,
                            const std::vector<PTabletID>& tablets_for_schema, int total_streams,
                            int64_t idle_timeout_ms, bool enable_profile) {
    std::unique_lock<bthread::Mutex> lock(_open_mutex);
    if (_is_init.load()) {
        return _status;
    }
    _is_init.store(true);
    _dst_id = node_info.id;

    // Validate arguments before any resource is created.
    if (!_is_incremental && total_streams <= 0) {
        _status = Status::InternalError("total_streams should be greator than 0");
        return _status;
    }

    const auto& stub = client_cache->get_client(node_info.host, node_info.brpc_port);
    if (stub == nullptr) {
        _status = Status::InternalError("failed to init brpc client to {}:{}", node_info.host,
                                        node_info.brpc_port);
        return _status;
    }

    brpc::StreamOptions opt;
    opt.max_buf_size = cast_set<int>(config::load_stream_max_buf_size);
    opt.idle_timeout_ms = idle_timeout_ms;
    opt.messages_in_batch = config::load_stream_messages_in_batch;
    opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, shared_from_this());
    brpc::Controller cntl;
    if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) {
        // The stream was never created, so the handler is still ours to free.
        delete opt.handler;
        _status = Status::Error<true>(ret, "Failed to create stream");
        return _status;
    }
    cntl.set_timeout_ms(config::open_load_stream_timeout_ms);

    POpenLoadStreamRequest request;
    *request.mutable_load_id() = _load_id;
    request.set_src_id(_src_id);
    request.set_txn_id(txn_id);
    request.set_enable_profile(enable_profile);
    request.set_total_streams(_is_incremental ? 0 : total_streams);
    request.set_idle_timeout_ms(idle_timeout_ms);
    schema.to_protobuf(request.mutable_schema());
    for (const auto& tablet : tablets_for_schema) {
        *request.add_tablets() = tablet;
    }

    POpenLoadStreamResponse response;
    stub->open_load_stream(&cntl, &request, &response, nullptr);
    // Check the RPC outcome before looking at the response.
    if (cntl.Failed()) {
        brpc::StreamClose(_stream_id);
        _status = Status::InternalError("Failed to connect to backend {}: {}", _dst_id,
                                        cntl.ErrorText());
        return _status;
    }

    for (const auto& resp : response.tablet_schemas()) {
        auto tablet_schema = std::make_unique<TabletSchema>();
        tablet_schema->init_from_pb(resp.tablet_schema());
        _tablet_schema_for_index->emplace(resp.index_id(), std::move(tablet_schema));
        _enable_unique_mow_for_index->emplace(resp.index_id(),
                                              resp.enable_unique_key_merge_on_write());
    }
    if (response.tablet_load_rowset_num_infos_size() > 0) {
        _refresh_back_pressure_version_wait_time(response.tablet_load_rowset_num_infos());
    }

    LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port
              << ", " << *this;
    _is_open.store(true);
    _status = Status::OK();
    return _status;
}
240
241
// APPEND_DATA
242
// Sends an APPEND_DATA message carrying `data` for one segment of a tablet.
//
// If the stream is not open, the tablet is recorded as failed with the stored
// status and that status is returned. Otherwise the header is filled in and the
// message is passed to _encode_and_send().
Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
                                   int32_t segment_id, uint64_t offset, std::span<const Slice> data,
                                   bool segment_eos, FileType file_type) {
    if (_is_open.load()) {
        DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); });
        PStreamHeader hdr;
        hdr.set_opcode(doris::PStreamHeader::APPEND_DATA);
        hdr.set_src_id(_src_id);
        *hdr.mutable_load_id() = _load_id;
        // Addressing: which tablet / segment / byte offset this payload belongs to.
        hdr.set_partition_id(partition_id);
        hdr.set_index_id(index_id);
        hdr.set_tablet_id(tablet_id);
        hdr.set_segment_id(segment_id);
        hdr.set_offset(offset);
        hdr.set_segment_eos(segment_eos);
        hdr.set_file_type(file_type);
        return _encode_and_send(hdr, data);
    }
    add_failed_tablet(tablet_id, _status);
    return _status;
}
263
264
// ADD_SEGMENT
265
// Sends an ADD_SEGMENT message with the statistics of a finished segment.
//
// If the stream is not open, the tablet is recorded as failed with the stored
// status and that status is returned.
Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
                                   int32_t segment_id, const SegmentStatistics& segment_stat) {
    if (_is_open.load()) {
        DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); });
        PStreamHeader hdr;
        hdr.set_opcode(doris::PStreamHeader::ADD_SEGMENT);
        hdr.set_src_id(_src_id);
        *hdr.mutable_load_id() = _load_id;
        hdr.set_partition_id(partition_id);
        hdr.set_index_id(index_id);
        hdr.set_tablet_id(tablet_id);
        hdr.set_segment_id(segment_id);
        segment_stat.to_pb(hdr.mutable_segment_statistics());
        return _encode_and_send(hdr);
    }
    add_failed_tablet(tablet_id, _status);
    return _status;
}
283
284
// CLOSE_LOAD
285
// Sends a CLOSE_LOAD message listing the tablets to commit.
//
// Returns the stored status if the stream is not open. The send result is
// recorded in _status; on success the stub is marked as closing and OK is
// returned, otherwise the failure is logged and returned.
Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit,
                                  int num_incremental_streams) {
    if (!_is_open.load()) {
        return _status;
    }
    PStreamHeader hdr;
    hdr.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
    hdr.set_src_id(_src_id);
    *hdr.mutable_load_id() = _load_id;
    hdr.set_num_incremental_streams(num_incremental_streams);
    for (const auto& committed : tablets_to_commit) {
        *hdr.add_tablets() = committed;
    }
    _status = _encode_and_send(hdr);
    if (_status.ok()) {
        _is_closing.store(true);
        return Status::OK();
    }
    LOG(WARNING) << "stream " << _stream_id << " close failed: " << _status;
    return _status;
}
306
307
// GET_SCHEMA
308
0
// Sends a GET_SCHEMA request for the given tablets and logs which tablets were
// requested. Returns the stored status if the stream is not open.
Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {
    if (!_is_open.load()) {
        return _status;
    }
    PStreamHeader hdr;
    hdr.set_opcode(doris::PStreamHeader::GET_SCHEMA);
    hdr.set_src_id(_src_id);
    *hdr.mutable_load_id() = _load_id;

    std::ostringstream msg;
    msg << "fetching tablet schema from stream " << _stream_id
        << ", load id: " << print_id(_load_id) << ", tablet id:";
    if (tablets.empty()) {
        msg << " none";
    } else {
        for (const auto& requested : tablets) {
            *hdr.add_tablets() = requested;
            msg << " " << requested.tablet_id();
        }
    }
    LOG(INFO) << msg.str();
    return _encode_and_send(hdr);
}
329
330
// Ensures the tablet schema for `index_id` is available, requesting it from the
// backend and polling for up to `timeout_ms` if it is not yet known.
//
// Returns the stored status if the stream is not open, the cancel status if the
// stub gets cancelled while waiting, TimedOut if the schema did not arrive in
// time, and OK otherwise.
// NOTE(review): the schema map is queried here without taking a lock -- confirm
// this is safe against the reply handler inserting into it concurrently.
Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id,
                                       int64_t timeout_ms) {
    if (!_is_open.load()) {
        return _status;
    }
    if (_tablet_schema_for_index->contains(index_id)) {
        return Status::OK();
    }

    PTabletID wanted;
    wanted.set_partition_id(partition_id);
    wanted.set_index_id(index_id);
    wanted.set_tablet_id(tablet_id);
    RETURN_IF_ERROR(get_schema({wanted}));

    MonotonicStopWatch timer;
    timer.start();
    for (;;) {
        if (_tablet_schema_for_index->contains(index_id)) {
            break;
        }
        // elapsed_time() is divided by 1000 twice before comparing with milliseconds.
        if (!(timer.elapsed_time() / 1000 / 1000 < timeout_ms)) {
            break;
        }
        RETURN_IF_ERROR(check_cancel());
        static_cast<void>(wait_for_new_schema(100));
    }

    if (_tablet_schema_for_index->contains(index_id)) {
        return Status::OK();
    }
    return Status::TimedOut("timeout to get tablet schema for index {}", index_id);
}
357
358
462
// Non-blocking check of whether this stream has finished closing.
//
// *is_closed is set to true unless the stream is open and either close_load()
// has not been issued yet or the close has not been observed yet.
// Returns an error when the stub or the query was cancelled, or when the stream
// closed without having received EOS.
Status LoadStreamStub::close_finish_check(RuntimeState* state, bool* is_closed) {
    DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
    DBUG_EXECUTE_IF("LoadStreamStub::close_finish_check.close_failed",
                    { return Status::InternalError("close failed"); });
    *is_closed = true;
    // A stream that was never opened has nothing to wait for.
    if (!_is_open.load()) {
        return Status::OK();
    }
    // A cancelled stream (e.g. connection failure) will never respond, so it is
    // treated as closed and the cancel reason is reported.
    if (_is_cancelled.load()) {
        return check_cancel();
    }
    if (state->get_query_ctx()->is_cancelled()) {
        return state->get_query_ctx()->exec_status();
    }
    if (!_is_closing.load()) {
        *is_closed = false;
        return _status;
    }
    if (!_is_closed.load()) {
        // Close was requested but has not been observed yet.
        *is_closed = false;
        return Status::OK();
    }
    RETURN_IF_ERROR(check_cancel());
    if (_is_eos.load()) {
        return Status::OK();
    }
    return Status::InternalError("Stream closed without EOS, {}", to_string());
}
389
390
134
void LoadStreamStub::notify_close_wait() {
391
134
    _close_wait_notifier->notify_close_wait();
392
134
}
393
394
2
// Cancels the stub with the given reason.
//
// Closes the stream if it was opened, records the reason under the cancel
// mutex, marks the stub as cancelled and closed, and wakes close-waiters.
void LoadStreamStub::cancel(Status reason) {
    LOG(WARNING) << *this << " is cancelled because of " << reason;
    const bool was_opened = _is_open.load();
    if (was_opened) {
        brpc::StreamClose(_stream_id);
    }
    {
        // The reason and the flag are updated together under the cancel mutex.
        std::lock_guard<bthread::Mutex> cancel_guard(_cancel_mutex);
        _cancel_st = reason;
        _is_cancelled.store(true);
    }
    _is_closed.store(true);
    notify_close_wait();
}
407
408
8.96k
// Frames one message and hands it to the send buffer.
//
// Frame layout, in order:
//   [header_len : size_t][serialized header][data_len : size_t][data slices...]
//
// CLOSE_LOAD and GET_SCHEMA messages are sent synchronously (the buffer is
// flushed immediately); all other opcodes may be batched.
Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) {
    butil::IOBuf buf;
    size_t header_len = header.ByteSizeLong();
    buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
    buf.append(header.SerializeAsString());

    // Sum in size_t. Accumulating into an `int` seed would truncate or overflow
    // once the payload reaches 2 GiB.
    size_t data_len = 0;
    for (const auto& slice : data) {
        data_len += slice.get_size();
    }
    buf.append(reinterpret_cast<uint8_t*>(&data_len), sizeof(data_len));
    for (const auto& slice : data) {
        buf.append(slice.get_data(), slice.get_size());
    }

    bool eos = header.opcode() == doris::PStreamHeader::CLOSE_LOAD;
    bool get_schema = header.opcode() == doris::PStreamHeader::GET_SCHEMA;
    add_bytes_written(buf.size());
    return _send_with_buffer(buf, eos || get_schema);
}
424
425
8.96k
// Appends `buf` to the pending buffer and flushes it when required.
//
// The buffer is flushed when `sync` is set or when it has reached
// config::brpc_streaming_client_batch_bytes; otherwise the data stays buffered
// and OK is returned. On a failed flush, _handle_failure() is invoked with the
// flushed data and the error is returned.
Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) {
    std::unique_lock<decltype(_buffer_mutex)> buffer_guard(_buffer_mutex);
    _buffer.append(buf);
    const bool must_flush = sync || !(_buffer.size() < config::brpc_streaming_client_batch_bytes);
    if (!must_flush) {
        return Status::OK();
    }
    butil::IOBuf pending;
    pending.swap(_buffer);
    // Take the send lock BEFORE releasing the buffer lock so that flushed
    // batches reach the stream in the same order they left the buffer.
    std::lock_guard<decltype(_send_mutex)> send_guard(_send_mutex);
    buffer_guard.unlock();
    VLOG_DEBUG << "send buf size : " << pending.size() << ", sync: " << sync;
    Status send_st = _send_with_retry(pending);
    if (!send_st.ok()) {
        _handle_failure(pending, send_st);
    }
    return send_st;
}
443
444
0
// Walks the framed messages left in `buf` after a failed send and reacts per
// opcode:
//   ADD_SEGMENT / APPEND_DATA -> record the tablet as failed with `st`
//   CLOSE_LOAD                -> close the stream
//   GET_SCHEMA                -> log only; wait_for_schema() will time out
// `buf` is consumed in the process. The debug points return after handling the
// first matching message.
void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) {
    while (buf.size() > 0) {
        // step 1: parse header
        size_t header_bytes = 0;
        buf.cutn(static_cast<void*>(&header_bytes), sizeof(size_t));
        butil::IOBuf header_buf;
        PStreamHeader hdr;
        buf.cutn(&header_buf, header_bytes);
        butil::IOBufAsZeroCopyInputStream header_stream(header_buf);
        hdr.ParseFromZeroCopyStream(&header_stream);

        // step 2: cut data
        size_t payload_bytes = 0;
        buf.cutn(static_cast<void*>(&payload_bytes), sizeof(size_t));
        butil::IOBuf payload_buf;
        buf.cutn(&payload_buf, payload_bytes);

        // Shared by the GET_SCHEMA debug point and the regular GET_SCHEMA path.
        auto log_get_schema_failure = [this, &hdr]() {
            std::ostringstream ids;
            for (const auto& tablet : hdr.tablets()) {
                ids << " " << tablet.tablet_id();
            }
            LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << ids.str() << ", "
                         << *this;
        };

        // step 3: handle failure
        switch (hdr.opcode()) {
        case PStreamHeader::ADD_SEGMENT:
        case PStreamHeader::APPEND_DATA: {
            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.append_data_failed", {
                add_failed_tablet(hdr.tablet_id(), st);
                return;
            });
            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.add_segment_failed", {
                add_failed_tablet(hdr.tablet_id(), st);
                return;
            });
            add_failed_tablet(hdr.tablet_id(), st);
            break;
        }
        case PStreamHeader::CLOSE_LOAD: {
            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.close_load_failed", {
                brpc::StreamClose(_stream_id);
                return;
            });
            brpc::StreamClose(_stream_id);
            break;
        }
        case PStreamHeader::GET_SCHEMA: {
            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.get_schema_failed", {
                log_get_schema_failure();
                return;
            });
            // Just log and let wait_for_schema timeout
            log_get_schema_failure();
            break;
        }
        default:
            LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this;
            DCHECK(false);
        }
    }
}
507
508
132
// Writes `buf` to the stream, retrying for as long as the write reports EAGAIN.
//
// Before every attempt the cancel state is checked. On EAGAIN the call waits on
// the stream for up to config::load_stream_eagain_wait_seconds and then
// retries; a failed wait or any other write error is returned as InternalError.
Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
    while (true) {
        RETURN_IF_ERROR(check_cancel());
        DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.delay_before_send", {
            int64_t delay_ms = dp->param<int64_t>("delay_ms", 1000);
            bthread_usleep(delay_ms * 1000);
        });
        brpc::StreamWriteOptions write_opts;
        write_opts.write_in_background = config::enable_brpc_stream_write_background;
        int write_rc = brpc::StreamWrite(_stream_id, buf, &write_opts);
        DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.stream_write_failed",
                        { write_rc = EPIPE; });

        if (write_rc == 0) {
            return Status::OK();
        }
        if (write_rc != EAGAIN) {
            return Status::InternalError("StreamWrite failed, err={}, {}", write_rc, to_string());
        }
        // EAGAIN: wait on the stream (bounded by the configured timeout), then retry.
        const timespec deadline = butil::seconds_from_now(config::load_stream_eagain_wait_seconds);
        if (int wait_rc = brpc::StreamWait(_stream_id, &deadline); wait_rc != 0) {
            return Status::InternalError("StreamWait failed, err={}, {}", wait_rc, to_string());
        }
    }
}
539
540
void LoadStreamStub::_refresh_back_pressure_version_wait_time(
541
        const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
542
0
                tablet_load_infos) {
543
0
    int64_t max_rowset_num_gap = 0;
544
    // if any one tablet is under high load pressure, we would make the whole procedure
545
    // sleep to prevent the corresponding BE return -235
546
0
    std::for_each(
547
0
            tablet_load_infos.begin(), tablet_load_infos.end(),
548
0
            [&max_rowset_num_gap](auto& load_info) {
549
0
                int64_t cur_rowset_num = load_info.current_rowset_nums();
550
0
                int64_t high_load_point = load_info.max_config_rowset_nums() *
551
0
                                          (config::load_back_pressure_version_threshold / 100);
552
0
                DCHECK(cur_rowset_num > high_load_point);
553
0
                max_rowset_num_gap = std::max(max_rowset_num_gap, cur_rowset_num - high_load_point);
554
0
            });
555
    // to slow down the high load pressure
556
    // we would use the rowset num gap to calculate one sleep time
557
    // for example:
558
    // if the max tablet version is 2000, there are 3 BE
559
    // A: ====================  1800
560
    // B: ===================   1700
561
    // C: ==================    1600
562
    //    ==================    1600
563
    //                      ^
564
    //                      the high load point
565
    // then then max gap is 1800 - (max tablet version * config::load_back_pressure_version_threshold / 100) = 200,
566
    // we would make the whole send procesure sleep
567
    // 1200ms for compaction to be done toe reduce the high pressure
568
0
    auto max_time = config::max_load_back_pressure_version_wait_time_ms;
569
0
    if (UNLIKELY(max_rowset_num_gap > 0)) {
570
0
        _load_back_pressure_version_wait_time_ms.store(
571
0
                std::min(max_rowset_num_gap + 1000, max_time));
572
0
        LOG(INFO) << "try to back pressure version, wait time(ms): "
573
0
                  << _load_back_pressure_version_wait_time_ms << ", load id: " << print_id(_load_id)
574
0
                  << ", max_rowset_num_gap: " << max_rowset_num_gap;
575
0
    }
576
0
}
577
578
0
std::string LoadStreamStub::to_string() {
579
0
    std::ostringstream ss;
580
0
    ss << *this;
581
0
    return ss.str();
582
0
}
583
584
134
inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub) {
585
134
    ostr << "LoadStreamStub load_id=" << print_id(stub._load_id) << ", src_id=" << stub._src_id
586
134
         << ", dst_id=" << stub._dst_id << ", stream_id=" << stub._stream_id;
587
134
    return ostr;
588
134
}
589
590
// Opens every stream in this group.
//
// The tablets whose schema is needed are sent with each open until one stream
// opens successfully; the remaining streams are opened with an empty tablet
// list. A failure does not stop the loop, so later streams can still fetch the
// schema. If any stream failed, the last failure is returned and all streams
// are cancelled; the group is marked open only when every stream succeeded.
Status LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache,
                             const NodeInfo& node_info, int64_t txn_id,
                             const OlapTableSchemaParam& schema,
                             const std::vector<PTabletID>& tablets_for_schema, int total_streams,
                             int64_t idle_timeout_ms, bool enable_profile) {
    const std::vector<PTabletID> no_tablets;
    bool schema_still_needed = true;
    bool is_first = true;
    Status overall = Status::OK();
    for (auto& stream : _streams) {
        const auto& tablets = schema_still_needed ? tablets_for_schema : no_tablets;
        Status open_st = stream->open(client_cache, node_info, txn_id, schema, tablets,
                                      total_streams, idle_timeout_ms, enable_profile);
        // Simulate one stream open failure within LoadStreamStubs.
        // This causes the successfully opened streams to be cancelled,
        // reproducing the bug where cancelled streams cause close_wait timeout.
        DBUG_EXECUTE_IF("LoadStreamStubs.open.fail_one_stream", {
            if (open_st.ok() && !is_first) {
                open_st = Status::InternalError("Injected stream open failure");
            }
        });
        if (!open_st.ok()) {
            LOG(WARNING) << "open stream failed: " << open_st << "; stream: " << *stream;
            // Remember the failure but keep going: the remaining streams may
            // still be able to fetch the schema.
            overall = open_st;
        } else {
            schema_still_needed = false;
        }
        is_first = false;
    }
    // only mark open when all streams open success
    const bool all_opened = overall.ok();
    _open_success.store(all_opened);
    // cancel all streams if open failed
    if (!all_opened) {
        cancel(overall);
    }
    return overall;
}
632
633
// Sends CLOSE_LOAD on every stream of the group.
//
// Only the first stream carries `tablets_to_commit`; the others are closed with
// an empty tablet list. Returns InternalError if the group was never opened
// successfully, otherwise OK.
// NOTE(review): a failing per-stream close_load is only logged and never
// changes the returned status -- confirm this best-effort behavior is intended.
Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_commit,
                                   int num_incremental_streams) {
    if (!_open_success.load()) {
        return Status::InternalError("streams not open");
    }
    const std::vector<PTabletID> no_tablets;
    bool is_first = true;
    for (auto& stream : _streams) {
        const auto& tablets = is_first ? tablets_to_commit : no_tablets;
        is_first = false;
        Status close_st = stream->close_load(tablets, num_incremental_streams);
        if (!close_st.ok()) {
            LOG(WARNING) << "close_load failed: " << close_st << "; stream: " << *stream;
        }
    }
    return Status::OK();
}
654
655
} // namespace doris