Coverage Report

Created: 2025-04-15 11:51

/root/doris/be/src/runtime/load_stream.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/load_stream.h"
19
20
#include <brpc/stream.h>
21
#include <bthread/bthread.h>
22
#include <bthread/condition_variable.h>
23
#include <bthread/mutex.h>
24
#include <olap/rowset/rowset_factory.h>
25
#include <olap/rowset/rowset_meta.h>
26
#include <olap/storage_engine.h>
27
#include <olap/tablet_manager.h>
28
#include <runtime/exec_env.h>
29
30
#include <memory>
31
#include <sstream>
32
33
#include "bvar/bvar.h"
34
#include "common/signal_handler.h"
35
#include "exec/tablet_info.h"
36
#include "gutil/ref_counted.h"
37
#include "olap/tablet_fwd.h"
38
#include "olap/tablet_schema.h"
39
#include "runtime/fragment_mgr.h"
40
#include "runtime/load_channel.h"
41
#include "runtime/load_stream_mgr.h"
42
#include "runtime/load_stream_writer.h"
43
#include "runtime/workload_group/workload_group_manager.h"
44
#include "util/debug_points.h"
45
#include "util/runtime_profile.h"
46
#include "util/thrift_util.h"
47
#include "util/uid_util.h"
48
49
#define UNKNOWN_ID_FOR_TEST 0x7c00
50
51
namespace doris {
52
53
// Number of live LoadStream objects: +1 in the LoadStream constructor, -1 in
// its destructor.
bvar::Adder<int64_t> g_load_stream_cnt {"load_stream_count"};
// Milliseconds TabletStream::append_data spent waiting for the flush token's
// task count to drop below the configured limit before submitting.
bvar::LatencyRecorder g_load_stream_flush_wait_ms {"load_stream_flush_wait_ms"};
// +1 right before a flush task is submitted, -1 when that task starts running.
bvar::Adder<int> g_load_stream_flush_running_threads {"load_stream_flush_wait_threads"};
56
57
// Creates the stream for one tablet: asks the load stream manager for a flush
// token and registers a child profile carrying the timers used by
// append_data(), add_segment() and pre_close()/close().
TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
                           LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
        : _id(id),
          _next_segid(0),
          _load_id(load_id),
          _txn_id(txn_id),
          _load_stream_mgr(load_stream_mgr) {
    _load_stream_mgr->create_token(_flush_token);

    const auto profile_name = fmt::format("TabletStream {}", id);
    _profile = profile->create_child(profile_name, true, true);

    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
    _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
}
70
71
10
inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) {
72
10
    ostr << "load_id=" << tablet_stream._load_id << ", txn_id=" << tablet_stream._txn_id
73
10
         << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status();
74
10
    return ostr;
75
10
}
76
77
// Creates and initializes the LoadStreamWriter for this tablet.
// The outcome is folded into _status and the resulting status is returned.
Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
                          int64_t partition_id) {
    WriteRequest req {
            .tablet_id = _id,
            .txn_id = _txn_id,
            .index_id = index_id,
            .partition_id = partition_id,
            .load_id = _load_id,
            .table_schema_param = schema,
            // TODO(plat1ko): write_file_cache
    };

    _load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
    DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
        _status.update(Status::Uninitialized("fault injection"));
        return _status.status();
    });

    _status.update(_load_stream_writer->init());
    if (_status.ok()) {
        return _status.status();
    }

    LOG(INFO) << "failed to init rowset builder due to " << *this;
    return _status.status();
}
100
101
27
// Appends one chunk of segment data coming from sender `header.src_id()`.
//
// ADD_SEGMENT headers are forwarded to add_segment(). For everything else the
// sender-local segment id is translated into a tablet-wide segment id
// (allocated on first sight), and the write is queued on the flush token.
// If the token already has too many queued tasks, the call waits (polling
// every 2ms) up to the configured limit and fails the stream on timeout.
//
// Returns the accumulated stream status; once the stream has failed, every
// later call returns that failure without doing any work.
Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
    if (!_status.ok()) {
        return _status.status();
    }

    // dispatch add_segment request
    if (header.opcode() == PStreamHeader::ADD_SEGMENT) {
        return add_segment(header, data);
    }

    SCOPED_TIMER(_append_data_timer);

    int64_t src_id = header.src_id();
    uint32_t segid = header.segment_id();
    // Ensure there are enough space and mapping are built.
    SegIdMapping* mapping = nullptr;
    {
        std::lock_guard lock_guard(_lock);
        if (!_segids_mapping.contains(src_id)) {
            _segids_mapping[src_id] = std::make_unique<SegIdMapping>();
        }
        mapping = _segids_mapping[src_id].get();
    }
    if (segid + 1 > mapping->size()) {
        // TODO: Each sender lock is enough.
        std::lock_guard lock_guard(_lock);
        ssize_t origin_size = mapping->size();
        if (segid + 1 > origin_size) {
            mapping->resize(segid + 1, std::numeric_limits<uint32_t>::max());
            for (size_t index = origin_size; index <= segid; index++) {
                mapping->at(index) = _next_segid;
                _next_segid++;
                VLOG_DEBUG << "src_id=" << src_id << ", segid=" << index << " to "
                           << " segid=" << _next_segid - 1 << ", " << *this;
            }
        }
    }

    // Each sender sends data in one segment sequential, so we also do not
    // need a lock here.
    bool eos = header.segment_eos();
    uint32_t new_segid = mapping->at(segid);
    DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
    butil::IOBuf buf = data->movable();
    // Only the offset is needed inside the task; capturing it alone avoids
    // copying the whole protobuf header into the closure.
    const auto offset = header.offset();
    auto flush_func = [this, new_segid, eos, buf, offset]() {
        signal::set_signal_task_id(_load_id);
        g_load_stream_flush_running_threads << -1;
        auto st = _load_stream_writer->append_data(new_segid, offset, buf);
        if (eos && st.ok()) {
            st = _load_stream_writer->close_segment(new_segid);
        }
        if (!st.ok()) {
            _status.update(st);
            LOG(WARNING) << "write data failed " << st << ", " << *this;
        }
    };
    auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks;
    auto load_stream_max_wait_flush_token_time_ms =
            config::load_stream_max_wait_flush_token_time_ms;
    DBUG_EXECUTE_IF("TabletStream.append_data.long_wait", {
        load_stream_flush_token_max_tasks = 0;
        load_stream_max_wait_flush_token_time_ms = 1000;
    });
    // Back pressure: wait until the token has room for another task.
    MonotonicStopWatch timer;
    timer.start();
    while (_flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
        if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
            _status.update(
                    Status::Error<true>("wait flush token back pressure time is more than "
                                        "load_stream_max_wait_flush_token_time {}",
                                        load_stream_max_wait_flush_token_time_ms));
            return _status.status();
        }
        bthread_usleep(2 * 1000); // 2ms
    }
    timer.stop();
    int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
    g_load_stream_flush_wait_ms << time_ms;
    g_load_stream_flush_running_threads << 1;
    auto st = _flush_token->submit_func(flush_func);
    if (!st.ok()) {
        // The task was rejected, so it is not expected to run and perform the
        // matching decrement; undo the increment here to keep the gauge balanced.
        g_load_stream_flush_running_threads << -1;
        _status.update(st);
    }
    return _status.status();
}
186
187
0
// Handles an ADD_SEGMENT request: resolves the sender-local segment id to the
// tablet-wide id that append_data() allocated earlier, then queues the
// writer's add_segment call on the flush token.
// Fails the stream if the sender or the segment has never been seen.
Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
    if (!_status.ok()) {
        return _status.status();
    }

    SCOPED_TIMER(_add_segment_timer);
    DCHECK(header.has_segment_statistics());
    SegmentStatistics stat(header.segment_statistics());

    TabletSchemaSPtr flush_schema;
    if (header.has_flush_schema()) {
        flush_schema = std::make_shared<TabletSchema>();
        flush_schema->init_from_pb(header.flush_schema());
    }

    int64_t src_id = header.src_id();
    uint32_t segid = header.segment_id();
    uint32_t new_segid;
    DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_segid", { segid = UNKNOWN_ID_FOR_TEST; });
    {
        std::lock_guard lock_guard(_lock);
        auto mapping_it = _segids_mapping.find(src_id);
        if (mapping_it == _segids_mapping.end()) {
            _status.update(Status::InternalError(
                    "add segment failed, no segment written by this src be yet, src_id={}, "
                    "segment_id={}",
                    src_id, segid));
            return _status.status();
        }
        const auto& sender_mapping = mapping_it->second;
        if (segid >= sender_mapping->size()) {
            _status.update(Status::InternalError(
                    "add segment failed, segment is never written, src_id={}, segment_id={}",
                    src_id, segid));
            return _status.status();
        }
        new_segid = sender_mapping->at(segid);
    }
    DCHECK(new_segid != std::numeric_limits<uint32_t>::max());

    auto add_segment_func = [this, new_segid, stat, flush_schema]() {
        signal::set_signal_task_id(_load_id);
        auto add_st = _load_stream_writer->add_segment(new_segid, stat, flush_schema);
        if (!add_st.ok()) {
            _status.update(add_st);
            LOG(INFO) << "add segment failed " << *this;
        }
    };

    auto submit_st = _flush_token->submit_func(add_segment_func);
    if (!submit_st.ok()) {
        _status.update(submit_st);
    }
    return _status.status();
}
238
239
30
// Runs `fn` on the load stream manager's heavy work pool and blocks the
// caller until it has finished, returning fn's status.
// Returns an INTERNAL_ERROR status if the pool refuses the task.
//
// The pooled task references this function's stack locals, so the function
// must not return before the task has signalled completion. The `done` flag
// makes the wait robust against spurious condition-variable wakeups, which
// would otherwise let the caller return (and destroy those locals) while the
// task is still running.
Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
    bthread::Mutex mu;
    // Held from before the task is offered until wait() releases it, so the
    // task cannot signal completion before the caller is waiting.
    std::unique_lock<bthread::Mutex> lock(mu);
    bthread::ConditionVariable cv;
    auto st = Status::OK();
    bool done = false; // guarded by mu
    auto func = [this, &mu, &cv, &st, &fn, &done] {
        signal::set_signal_task_id(_load_id);
        st = fn();
        std::lock_guard<bthread::Mutex> lock(mu);
        done = true;
        cv.notify_one();
    };
    bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func);
    if (!ret) {
        return Status::Error<ErrorCode::INTERNAL_ERROR>(
                "there is not enough thread resource for close load");
    }
    while (!done) {
        cv.wait(lock);
    }
    return st;
}
258
259
14
void TabletStream::pre_close() {
260
14
    if (!_status.ok()) {
261
1
        return;
262
1
    }
263
264
13
    SCOPED_TIMER(_close_wait_timer);
265
13
    _status.update(_run_in_heavy_work_pool([this]() {
266
13
        _flush_token->wait();
267
13
        return Status::OK();
268
13
    }));
269
    // it is necessary to check status after wait_func,
270
    // for create_rowset could fail during add_segment when loading to MOW table,
271
    // in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
272
13
    if (!_status.ok()) {
273
2
        return;
274
2
    }
275
276
11
    if (_check_num_segments && (_next_segid.load() != _num_segments)) {
277
2
        _status.update(Status::Corruption(
278
2
                "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
279
2
                _num_segments, _next_segid.load(), print_id(_load_id)));
280
2
        return;
281
2
    }
282
283
9
    _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); }));
284
9
}
285
286
14
// Second phase of closing: runs the writer's close on the heavy work pool
// unless the stream has already failed, and returns the final stream status.
Status TabletStream::close() {
    if (_status.ok()) {
        SCOPED_TIMER(_close_wait_timer);
        auto writer_close = [this]() { return _load_stream_writer->close(); };
        _status.update(_run_in_heavy_work_pool(writer_close));
    }
    return _status.status();
}
295
296
// Creates the stream for one index and registers a child profile with the
// timers used by append_data() and close().
IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
                         std::shared_ptr<OlapTableSchemaParam> schema,
                         LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
        : _id(id),
          _load_id(load_id),
          _txn_id(txn_id),
          _schema(schema),
          _load_stream_mgr(load_stream_mgr) {
    const auto profile_name = fmt::format("IndexStream {}", id);
    _profile = profile->create_child(profile_name, true, true);

    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
}
308
309
27
// Routes a data message to the TabletStream of `header.tablet_id()`, creating
// that tablet stream on first use. The lookup/creation happens under _lock;
// the append itself runs outside of it.
Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
    SCOPED_TIMER(_append_data_timer);

    const int64_t tablet_id = header.tablet_id();
    TabletStreamSharedPtr tablet_stream;
    {
        std::lock_guard lock_guard(_lock);
        if (auto found = _tablet_streams_map.find(tablet_id);
            found != _tablet_streams_map.end()) {
            tablet_stream = found->second;
        } else {
            _init_tablet_stream(tablet_stream, tablet_id, header.partition_id());
        }
    }

    return tablet_stream->append_data(header, data);
}
325
326
void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
327
14
                                      int64_t partition_id) {
328
14
    tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr,
329
14
                                                   _profile);
330
14
    _tablet_streams_map[tablet_id] = tablet_stream;
331
14
    auto st = tablet_stream->init(_schema, _id, partition_id);
332
14
    if (!st.ok()) {
333
1
        LOG(WARNING) << "tablet stream init failed " << *tablet_stream;
334
1
    }
335
14
}
336
337
void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
338
28
                        std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
339
28
    std::lock_guard lock_guard(_lock);
340
28
    SCOPED_TIMER(_close_wait_timer);
341
    // open all need commit tablets
342
34
    for (const auto& tablet : tablets_to_commit) {
343
34
        if (_id != tablet.index_id()) {
344
18
            continue;
345
18
        }
346
16
        TabletStreamSharedPtr tablet_stream;
347
16
        auto it = _tablet_streams_map.find(tablet.tablet_id());
348
16
        if (it == _tablet_streams_map.end()) {
349
1
            _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id());
350
15
        } else {
351
15
            tablet_stream = it->second;
352
15
        }
353
16
        if (tablet.has_num_segments()) {
354
16
            tablet_stream->add_num_segments(tablet.num_segments());
355
16
        } else {
356
            // for compatibility reasons (sink from old version BE)
357
0
            tablet_stream->disable_num_segments_check();
358
0
        }
359
16
    }
360
361
28
    for (auto& [_, tablet_stream] : _tablet_streams_map) {
362
14
        tablet_stream->pre_close();
363
14
    }
364
365
28
    for (auto& [_, tablet_stream] : _tablet_streams_map) {
366
14
        auto st = tablet_stream->close();
367
14
        if (st.ok()) {
368
8
            success_tablet_ids->push_back(tablet_stream->id());
369
8
        } else {
370
6
            LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st;
371
6
            failed_tablets->emplace_back(tablet_stream->id(), st);
372
6
        }
373
14
    }
374
28
}
375
376
// TODO: Profile is temporary disabled, because:
377
// 1. It's not being processed by the upstream for now
378
// 2. There are some problems in _profile->to_thrift()
379
// Creates the load stream, its root profile and the thread context used to
// attribute memory for this load.
// `enable_profile` is currently ignored: _enable_profile is always false.
// Outside of BE_TEST builds an existing query context for the load id is
// reused when the fragment manager has one; otherwise (and always under
// BE_TEST) a dedicated LOAD mem tracker is created.
LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile)
        : _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) {
    g_load_stream_cnt << 1;

    _profile = std::make_unique<RuntimeProfile>("LoadStream");
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");

    TUniqueId load_tid = UniqueId(load_id).to_thrift();
#ifndef BE_TEST
    std::shared_ptr<QueryContext> query_context =
            ExecEnv::GetInstance()->fragment_mgr()->get_query_context(load_tid);
    if (query_context != nullptr) {
        _query_thread_context = {load_tid, query_context->query_mem_tracker,
                                 query_context->workload_group()};
        return;
    }
#endif
    _query_thread_context = {load_tid, MemTrackerLimiter::create_shared(
                                               MemTrackerLimiter::Type::LOAD,
                                               fmt::format("(FromLoadStream)Load#Id={}",
                                                           UniqueId(load_id).to_string()))};
}
405
406
14
// Drops this stream from the live-stream counter and logs its destruction.
LoadStream::~LoadStream() {
    constexpr int64_t kOneStreamGone = -1;
    g_load_stream_cnt << kOneStreamGone;
    LOG(INFO) << "load stream is deconstructed " << *this;
}
410
411
14
// Initializes the load stream from an open request: records txn id and the
// expected number of streams (0 marks the stream as incremental), parses the
// table schema and creates one IndexStream per index in that schema.
// Returns the schema init error, if any.
Status LoadStream::init(const POpenLoadStreamRequest* request) {
    _txn_id = request->txn_id();
    _total_streams = request->total_streams();
    _is_incremental = (_total_streams == 0);

    _schema = std::make_shared<OlapTableSchemaParam>();
    RETURN_IF_ERROR(_schema->init(request->schema()));

    for (auto& index : request->schema().indexes()) {
        const auto index_id = index.id();
        _index_streams_map[index_id] = std::make_shared<IndexStream>(
                _load_id, index_id, _txn_id, _schema, _load_stream_mgr, _profile.get());
    }

    LOG(INFO) << "succeed to init load stream " << *this;
    return Status::OK();
}
425
426
void LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
427
16
                       std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
428
16
    std::lock_guard<bthread::Mutex> lock_guard(_lock);
429
16
    SCOPED_TIMER(_close_wait_timer);
430
431
    // we do nothing until recv CLOSE_LOAD from all stream to ensure all data are handled before ack
432
16
    _open_streams[src_id]--;
433
16
    if (_open_streams[src_id] == 0) {
434
16
        _open_streams.erase(src_id);
435
16
    }
436
16
    _close_load_cnt++;
437
16
    LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
438
16
              << _total_streams - _close_load_cnt << " senders, " << *this;
439
440
16
    _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
441
16
                              tablets_to_commit.end());
442
443
16
    if (_close_load_cnt < _total_streams) {
444
        // do not return commit info if there is remaining streams.
445
2
        return;
446
2
    }
447
448
28
    for (auto& [_, index_stream] : _index_streams_map) {
449
28
        index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets);
450
28
    }
451
14
    LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
452
14
              << ", failed_tablet_num=" << failed_tablets->size();
453
14
}
454
455
// Builds a PLoadStreamResponse from the given status and tablet lists and
// writes it to `stream`. When profiling is enabled and every sender has sent
// CLOSE_LOAD, the serialized runtime profile is attached as well.
// A failed write is only logged.
void LoadStream::_report_result(StreamId stream, const Status& status,
                                const std::vector<int64_t>& success_tablet_ids,
                                const FailedTablets& failed_tablets, bool eos) {
    LOG(INFO) << "report result " << *this << ", success tablet num " << success_tablet_ids.size()
              << ", failed tablet num " << failed_tablets.size();

    PLoadStreamResponse response;
    response.set_eos(eos);
    status.to_protobuf(response.mutable_status());
    for (const auto& tablet_id : success_tablet_ids) {
        response.add_success_tablet_ids(tablet_id);
    }
    for (const auto& [tablet_id, tablet_st] : failed_tablets) {
        auto* failed_pb = response.add_failed_tablets();
        failed_pb->set_id(tablet_id);
        tablet_st.to_protobuf(failed_pb->mutable_status());
    }

    if (_enable_profile && _close_load_cnt == _total_streams) {
        TRuntimeProfileTree tprofile;
        ThriftSerializer ser(false, 4096);
        uint8_t* profile_buf = nullptr;
        uint32_t profile_len = 0;
        std::unique_lock<bthread::Mutex> l(_lock);

        _profile->to_thrift(&tprofile);
        auto ser_st = ser.serialize(&tprofile, &profile_len, &profile_buf);
        if (ser_st.ok()) {
            response.set_load_stream_profile(profile_buf, profile_len);
        } else {
            LOG(WARNING) << "TRuntimeProfileTree serialize failed, errmsg=" << ser_st << ", "
                         << *this;
        }
    }

    butil::IOBuf buf;
    buf.append(response.SerializeAsString());
    auto wst = _write_stream(stream, buf);
    if (!wst.ok()) {
        LOG(WARNING) << " report result failed with " << wst << ", " << *this;
    }
}
495
496
0
// Answers a GET_SCHEMA request: for every tablet listed in `hdr`, looks the
// tablet up in the storage engine and adds its index id, merge-on-write flag
// and tablet schema to the response. Stops at the first tablet that cannot be
// found and reports NotFound in the response status (schemas collected before
// that point are still sent). A failed write is only logged.
void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) {
    butil::IOBuf buf;
    PLoadStreamResponse response;
    Status st = Status::OK();
    for (const auto& req : hdr.tablets()) {
        TabletManager* tablet_mgr = StorageEngine::instance()->tablet_manager();
        TabletSharedPtr tablet = tablet_mgr->get_tablet(req.tablet_id());
        if (tablet == nullptr) {
            st = Status::NotFound("Tablet {} not found", req.tablet_id());
            break;
        }
        auto resp = response.add_tablet_schemas();
        resp->set_index_id(req.index_id());
        resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write());
        tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
    }
    st.to_protobuf(response.mutable_status());

    buf.append(response.SerializeAsString());
    auto wst = _write_stream(stream, buf);
    if (!wst.ok()) {
        // Name the schema report here so this failure is not confused with the
        // one logged by _report_result.
        LOG(WARNING) << " report schema failed with " << wst << ", " << *this;
    }
}
520
521
36
// Writes `buf` to the brpc stream, retrying for as long as StreamWrite reports
// EAGAIN: each retry first waits on the stream with a deadline of
// config::load_stream_eagain_wait_seconds from now.
// Returns InternalError if the wait fails or StreamWrite returns any other
// error code.
Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) {
    for (;;) {
        const int write_ret = brpc::StreamWrite(stream, buf);
        if (write_ret == 0) {
            return Status::OK();
        }
        if (write_ret != EAGAIN) {
            return Status::InternalError("StreamWrite failed, err={}", write_ret);
        }

        const timespec deadline = butil::seconds_from_now(config::load_stream_eagain_wait_seconds);
        const int wait_ret = brpc::StreamWait(stream, &deadline);
        if (wait_ret != 0) {
            return Status::InternalError("StreamWait failed, err={}", wait_ret);
        }
    }
}
541
542
62
// Parses a PStreamHeader out of `message` into `hdr`.
// The signature cannot report errors, so a parse failure is logged instead of
// being silently dropped; `hdr` is still handed back to the caller as-is.
void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& hdr) {
    butil::IOBufAsZeroCopyInputStream wrapper(*message);
    if (!hdr.ParseFromZeroCopyStream(&wrapper)) {
        LOG(WARNING) << "failed to parse stream header, " << *this;
    }
    VLOG_DEBUG << "header parse result: " << hdr.DebugString();
}
547
548
28
// Forwards a data message to the IndexStream named by `header.index_id()`.
// Returns INVALID_ARGUMENT when no such index stream exists.
Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) {
    SCOPED_TIMER(_append_data_timer);

    int64_t index_id = header.index_id();
    DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid",
                    { index_id = UNKNOWN_ID_FOR_TEST; });

    const auto found = _index_streams_map.find(index_id);
    if (found == _index_streams_map.end()) {
        return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id {}", index_id);
    }

    IndexStreamSharedPtr index_stream = found->second;
    return index_stream->append_data(header, data);
}
564
565
49
// brpc stream callback: splits every received buffer into frames and
// dispatches each one. A frame is laid out as
//   [size_t hdr_len][hdr_len bytes of PStreamHeader][size_t data_len][data_len bytes]
// and one buffer may carry several frames back to back.
// NOTE(review): the length prefixes are used without validation; a truncated
// frame yields a short or empty header/data buffer rather than an error here.
// Always returns 0.
int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) {
    VLOG_DEBUG << "on_received_messages " << id << " " << size;
    for (size_t i = 0; i < size; ++i) {
        while (messages[i]->size() > 0) {
            // step 1: parse header
            size_t hdr_len = 0;
            messages[i]->cutn(static_cast<void*>(&hdr_len), sizeof(size_t));
            butil::IOBuf hdr_buf;
            PStreamHeader hdr;
            messages[i]->cutn(&hdr_buf, hdr_len);
            _parse_header(&hdr_buf, hdr);

            // step 2: cut data
            size_t data_len = 0;
            messages[i]->cutn(static_cast<void*>(&data_len), sizeof(size_t));
            butil::IOBuf data_buf;
            messages[i]->cutn(&data_buf, data_len);

            // step 3: dispatch
            _dispatch(id, hdr, &data_buf);
        }
    }
    return 0;
}
590
591
62
// Validates one parsed message and routes it by opcode.
//
// Messages whose load id differs from this stream's, or whose sender has no
// open stream, are answered with a failure report and dropped. Otherwise:
//   - APPEND_DATA / ADD_SEGMENT go to _append_data (failures are reported),
//   - CLOSE_LOAD closes the sender, reports the result with eos=true and
//     closes the brpc stream,
//   - GET_SCHEMA is answered via _report_schema.
void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) {
    VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id()
               << " with tablet " << hdr.tablet_id();
    SCOPED_ATTACH_TASK(_query_thread_context);
    // CLOSE_LOAD message should not be fault injected,
    // otherwise the message will be ignored and causing close wait timeout
    if (hdr.opcode() != PStreamHeader::CLOSE_LOAD) {
        DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", {
            PUniqueId& load_id = const_cast<PUniqueId&>(hdr.load_id());
            load_id.set_hi(UNKNOWN_ID_FOR_TEST);
            load_id.set_lo(UNKNOWN_ID_FOR_TEST);
        });
        DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", {
            PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr);
            t_hdr.set_src_id(UNKNOWN_ID_FOR_TEST);
        });
    }
    if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
        Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>(
                "invalid load id {}, expected {}", print_id(hdr.load_id()), print_id(_load_id));
        _report_failure(id, st, hdr);
        return;
    }

    // Only the membership test needs _lock. The failure report is sent after
    // the lock is released so that other users of _lock are not blocked on
    // stream I/O; _report_result also takes _lock in its profile branch.
    bool sender_is_open = false;
    {
        std::lock_guard lock_guard(_lock);
        sender_is_open = _open_streams.contains(hdr.src_id());
    }
    if (!sender_is_open) {
        Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("no open stream from source {}",
                                                               hdr.src_id());
        _report_failure(id, st, hdr);
        return;
    }

    switch (hdr.opcode()) {
    case PStreamHeader::ADD_SEGMENT: // ADD_SEGMENT will be dispatched inside TabletStream
    case PStreamHeader::APPEND_DATA: {
        auto st = _append_data(hdr, data);
        if (!st.ok()) {
            _report_failure(id, st, hdr);
        }
    } break;
    case PStreamHeader::CLOSE_LOAD: {
        std::vector<int64_t> success_tablet_ids;
        FailedTablets failed_tablets;
        std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
        close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
        _report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true);
        brpc::StreamClose(id);
    } break;
    case PStreamHeader::GET_SCHEMA: {
        _report_schema(id, hdr);
    } break;
    default:
        LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this;
        DCHECK(false);
    }
}
649
650
0
// brpc stream callback for an idle timeout: logs a warning describing this
// load stream, then closes the brpc stream.
void LoadStream::on_idle_timeout(StreamId id) {
    std::stringstream self_desc;
    self_desc << *this;
    LOG(WARNING) << "closing load stream on idle timeout, " << self_desc.str();
    brpc::StreamClose(id);
}
654
655
16
// brpc stream callback for a closed stream: counts the close and, when this
// was the last expected stream, asks the manager to clear this load.
void LoadStream::on_closed(StreamId id) {
    // `this` may be freed by other threads after increasing `_close_rpc_cnt`,
    // format string first to prevent use-after-free
    std::stringstream ss;
    ss << *this;
    // For the same reason `_total_streams` is copied before the increment:
    // inside a single expression the operands of `-` may be evaluated in
    // either order, so the member could otherwise be read after fetch_add.
    const auto total_streams = _total_streams;
    auto remaining_streams = total_streams - _close_rpc_cnt.fetch_add(1) - 1;
    LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams << ", "
              << ss.str();
    if (remaining_streams == 0) {
        _load_stream_mgr->clear_load(_load_id);
    }
}
667
668
110
inline std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream) {
669
110
    ostr << "load_id=" << print_id(load_stream._load_id) << ", txn_id=" << load_stream._txn_id;
670
110
    return ostr;
671
110
}
672
673
} // namespace doris