Coverage Report

Created: 2026-01-24 22:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/runtime/load_stream.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/load_stream.h"
19
20
#include <brpc/stream.h>
21
#include <bthread/bthread.h>
22
#include <bthread/condition_variable.h>
23
#include <bthread/mutex.h>
24
#include <olap/rowset/rowset_factory.h>
25
#include <olap/rowset/rowset_meta.h>
26
#include <olap/storage_engine.h>
27
#include <olap/tablet_manager.h>
28
#include <runtime/exec_env.h>
29
30
#include <memory>
31
#include <sstream>
32
33
#include "bvar/bvar.h"
34
#include "cloud/config.h"
35
#include "common/signal_handler.h"
36
#include "exec/tablet_info.h"
37
#include "olap/delta_writer.h"
38
#include "olap/tablet.h"
39
#include "olap/tablet_fwd.h"
40
#include "olap/tablet_schema.h"
41
#include "runtime/exec_env.h"
42
#include "runtime/fragment_mgr.h"
43
#include "runtime/load_channel.h"
44
#include "runtime/load_stream_mgr.h"
45
#include "runtime/load_stream_writer.h"
46
#include "runtime/workload_group/workload_group_manager.h"
47
#include "util/debug_points.h"
48
#include "util/runtime_profile.h"
49
#include "util/thrift_util.h"
50
#include "util/uid_util.h"
51
52
#define UNKNOWN_ID_FOR_TEST 0x7c00
53
54
namespace doris {
55
#include "common/compile_check_begin.h"
56
57
bvar::Adder<int64_t> g_load_stream_cnt("load_stream_count");
58
bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms");
59
bvar::Adder<int> g_load_stream_flush_running_threads("load_stream_flush_wait_threads");
60
61
TabletStream::TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
62
                           LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
63
14
        : _id(id),
64
14
          _next_segid(0),
65
14
          _load_id(load_id),
66
14
          _txn_id(txn_id),
67
14
          _load_stream_mgr(load_stream_mgr) {
68
14
    load_stream_mgr->create_token(_flush_token);
69
14
    _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
70
14
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
71
14
    _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
72
14
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
73
14
}
74
75
10
inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) {
76
10
    ostr << "load_id=" << print_id(tablet_stream._load_id) << ", txn_id=" << tablet_stream._txn_id
77
10
         << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status();
78
10
    return ostr;
79
10
}
80
81
Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
82
14
                          int64_t partition_id) {
83
14
    WriteRequest req {
84
14
            .tablet_id = _id,
85
14
            .txn_id = _txn_id,
86
14
            .index_id = index_id,
87
14
            .partition_id = partition_id,
88
14
            .load_id = _load_id,
89
14
            .table_schema_param = schema,
90
            // TODO(plat1ko): write_file_cache
91
14
            .storage_vault_id {},
92
14
    };
93
94
14
    _load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
95
14
    DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
96
14
        _status.update(Status::Uninitialized("fault injection"));
97
14
        return _status.status();
98
14
    });
99
14
    _status.update(_load_stream_writer->init());
100
14
    if (!_status.ok()) {
101
1
        LOG(INFO) << "failed to init rowset builder due to " << *this;
102
1
    }
103
14
    return _status.status();
104
14
}
105
106
27
Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
107
27
    if (!_status.ok()) {
108
1
        return _status.status();
109
1
    }
110
111
    // dispatch add_segment request
112
26
    if (header.opcode() == PStreamHeader::ADD_SEGMENT) {
113
0
        return add_segment(header, data);
114
0
    }
115
116
26
    SCOPED_TIMER(_append_data_timer);
117
118
26
    int64_t src_id = header.src_id();
119
26
    uint32_t segid = header.segment_id();
120
    // Ensure there are enough space and mapping are built.
121
26
    SegIdMapping* mapping = nullptr;
122
26
    {
123
26
        std::lock_guard lock_guard(_lock);
124
26
        if (!_segids_mapping.contains(src_id)) {
125
14
            _segids_mapping[src_id] = std::make_unique<SegIdMapping>();
126
14
        }
127
26
        mapping = _segids_mapping[src_id].get();
128
26
    }
129
26
    if (segid + 1 > mapping->size()) {
130
        // TODO: Each sender lock is enough.
131
15
        std::lock_guard lock_guard(_lock);
132
15
        ssize_t origin_size = mapping->size();
133
15
        if (segid + 1 > origin_size) {
134
15
            mapping->resize(segid + 1, std::numeric_limits<uint32_t>::max());
135
39
            for (size_t index = origin_size; index <= segid; index++) {
136
24
                mapping->at(index) = _next_segid;
137
24
                _next_segid++;
138
24
                VLOG_DEBUG << "src_id=" << src_id << ", segid=" << index << " to "
139
0
                           << " segid=" << _next_segid - 1 << ", " << *this;
140
24
            }
141
15
        }
142
15
    }
143
144
    // Each sender sends data in one segment sequential, so we also do not
145
    // need a lock here.
146
26
    bool eos = header.segment_eos();
147
26
    FileType file_type = header.file_type();
148
26
    uint32_t new_segid = mapping->at(segid);
149
26
    DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
150
26
    butil::IOBuf buf = data->movable();
151
26
    auto self = shared_from_this();
152
26
    auto flush_func = [self, new_segid, eos, buf, header, file_type]() mutable {
153
26
        signal::set_signal_task_id(self->_load_id);
154
26
        g_load_stream_flush_running_threads << -1;
155
26
        auto st =
156
26
                self->_load_stream_writer->append_data(new_segid, header.offset(), buf, file_type);
157
26
        if (!st.ok() && !config::is_cloud_mode()) {
158
1
            auto res = ExecEnv::get_tablet(self->_id);
159
1
            TabletSharedPtr tablet =
160
1
                    res.has_value() ? std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr;
161
1
            if (tablet) {
162
1
                tablet->report_error(st);
163
1
            }
164
1
        }
165
26
        if (eos && st.ok()) {
166
21
            DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type",
167
21
                            { file_type = static_cast<FileType>(-1); });
168
21
            if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) {
169
21
                st = self->_load_stream_writer->close_writer(new_segid, file_type);
170
21
            } else {
171
0
                st = Status::InternalError(
172
0
                        "appent data failed, file type error, file type = {}, "
173
0
                        "segment_id={}",
174
0
                        file_type, new_segid);
175
0
            }
176
21
        }
177
26
        DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
178
26
                        { st = Status::InternalError("fault injection"); });
179
26
        if (!st.ok()) {
180
2
            self->_status.update(st);
181
2
            LOG(WARNING) << "write data failed " << st << ", " << *self;
182
2
        }
183
26
    };
184
26
    auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks;
185
26
    auto load_stream_max_wait_flush_token_time_ms =
186
26
            config::load_stream_max_wait_flush_token_time_ms;
187
26
    DBUG_EXECUTE_IF("TabletStream.append_data.long_wait", {
188
26
        load_stream_flush_token_max_tasks = 0;
189
26
        load_stream_max_wait_flush_token_time_ms = 1000;
190
26
    });
191
26
    MonotonicStopWatch timer;
192
26
    timer.start();
193
26
    while (_flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
194
0
        if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
195
0
            _status.update(
196
0
                    Status::Error<true>("wait flush token back pressure time is more than "
197
0
                                        "load_stream_max_wait_flush_token_time {}",
198
0
                                        load_stream_max_wait_flush_token_time_ms));
199
0
            return _status.status();
200
0
        }
201
0
        bthread_usleep(2 * 1000); // 2ms
202
0
    }
203
26
    timer.stop();
204
26
    int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
205
26
    g_load_stream_flush_wait_ms << time_ms;
206
26
    g_load_stream_flush_running_threads << 1;
207
26
    Status st = Status::OK();
208
26
    DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed",
209
26
                    { st = Status::InternalError("fault injection"); });
210
26
    if (st.ok()) {
211
26
        st = _flush_token->submit_func(flush_func);
212
26
    }
213
26
    if (!st.ok()) {
214
0
        _status.update(st);
215
0
    }
216
26
    return _status.status();
217
26
}
218
219
0
Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
220
0
    if (!_status.ok()) {
221
0
        return _status.status();
222
0
    }
223
224
0
    SCOPED_TIMER(_add_segment_timer);
225
0
    DCHECK(header.has_segment_statistics());
226
0
    SegmentStatistics stat(header.segment_statistics());
227
228
0
    int64_t src_id = header.src_id();
229
0
    uint32_t segid = header.segment_id();
230
0
    uint32_t new_segid;
231
0
    DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_segid", { segid = UNKNOWN_ID_FOR_TEST; });
232
0
    {
233
0
        std::lock_guard lock_guard(_lock);
234
0
        if (!_segids_mapping.contains(src_id)) {
235
0
            _status.update(Status::InternalError(
236
0
                    "add segment failed, no segment written by this src be yet, src_id={}, "
237
0
                    "segment_id={}",
238
0
                    src_id, segid));
239
0
            return _status.status();
240
0
        }
241
0
        DBUG_EXECUTE_IF("TabletStream.add_segment.segid_never_written",
242
0
                        { segid = static_cast<uint32_t>(_segids_mapping[src_id]->size()); });
243
0
        if (segid >= _segids_mapping[src_id]->size()) {
244
0
            _status.update(Status::InternalError(
245
0
                    "add segment failed, segment is never written, src_id={}, segment_id={}",
246
0
                    src_id, segid));
247
0
            return _status.status();
248
0
        }
249
0
        new_segid = _segids_mapping[src_id]->at(segid);
250
0
    }
251
0
    DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
252
253
0
    auto self = shared_from_this();
254
0
    auto add_segment_func = [self, new_segid, stat]() {
255
0
        signal::set_signal_task_id(self->_load_id);
256
0
        auto st = self->_load_stream_writer->add_segment(new_segid, stat);
257
0
        DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
258
0
                        { st = Status::InternalError("fault injection"); });
259
0
        if (!st.ok()) {
260
0
            self->_status.update(st);
261
0
            LOG(INFO) << "add segment failed " << *self;
262
0
        }
263
0
    };
264
0
    Status st = Status::OK();
265
0
    DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed",
266
0
                    { st = Status::InternalError("fault injection"); });
267
0
    if (st.ok()) {
268
0
        st = _flush_token->submit_func(add_segment_func);
269
0
    }
270
0
    if (!st.ok()) {
271
0
        _status.update(st);
272
0
    }
273
0
    return _status.status();
274
0
}
275
276
29
Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
277
29
    bthread::Mutex mu;
278
29
    std::unique_lock<bthread::Mutex> lock(mu);
279
29
    bthread::ConditionVariable cv;
280
29
    auto st = Status::OK();
281
29
    auto self = shared_from_this();
282
29
    auto func = [self, &mu, &cv, &st, &fn] {
283
29
        signal::set_signal_task_id(self->_load_id);
284
29
        st = fn();
285
29
        std::lock_guard<bthread::Mutex> lock(mu);
286
29
        cv.notify_one();
287
29
    };
288
29
    bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func);
289
29
    if (!ret) {
290
0
        return Status::Error<ErrorCode::INTERNAL_ERROR>(
291
0
                "there is not enough thread resource for close load");
292
0
    }
293
29
    cv.wait(lock);
294
29
    return st;
295
29
}
296
297
14
void TabletStream::pre_close() {
298
14
    if (!_status.ok()) {
299
        // cancel all pending tasks, wait all running tasks to finish
300
2
        _flush_token->shutdown();
301
2
        return;
302
2
    }
303
304
12
    SCOPED_TIMER(_close_wait_timer);
305
12
    _status.update(_run_in_heavy_work_pool([this]() {
306
12
        _flush_token->wait();
307
12
        return Status::OK();
308
12
    }));
309
    // it is necessary to check status after wait_func,
310
    // for create_rowset could fail during add_segment when loading to MOW table,
311
    // in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
312
12
    if (!_status.ok()) {
313
1
        return;
314
1
    }
315
316
11
    DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
317
11
    if (_check_num_segments && (_next_segid.load() != _num_segments)) {
318
2
        _status.update(Status::Corruption(
319
2
                "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
320
2
                _num_segments, _next_segid.load(), print_id(_load_id)));
321
2
        return;
322
2
    }
323
324
9
    _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); }));
325
9
}
326
327
14
Status TabletStream::close() {
328
14
    if (!_status.ok()) {
329
6
        return _status.status();
330
6
    }
331
332
8
    SCOPED_TIMER(_close_wait_timer);
333
8
    _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); }));
334
8
    return _status.status();
335
14
}
336
337
IndexStream::IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
338
                         std::shared_ptr<OlapTableSchemaParam> schema,
339
                         LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
340
28
        : _id(id),
341
28
          _load_id(load_id),
342
28
          _txn_id(txn_id),
343
28
          _schema(schema),
344
28
          _load_stream_mgr(load_stream_mgr) {
345
28
    _profile = profile->create_child(fmt::format("IndexStream {}", id), true, true);
346
28
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
347
28
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
348
28
}
349
350
27
Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
351
27
    SCOPED_TIMER(_append_data_timer);
352
27
    int64_t tablet_id = header.tablet_id();
353
27
    TabletStreamSharedPtr tablet_stream;
354
27
    {
355
27
        std::lock_guard lock_guard(_lock);
356
27
        auto it = _tablet_streams_map.find(tablet_id);
357
27
        if (it == _tablet_streams_map.end()) {
358
13
            _init_tablet_stream(tablet_stream, tablet_id, header.partition_id());
359
14
        } else {
360
14
            tablet_stream = it->second;
361
14
        }
362
27
    }
363
364
27
    return tablet_stream->append_data(header, data);
365
27
}
366
367
void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
368
14
                                      int64_t partition_id) {
369
14
    tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr,
370
14
                                                   _profile);
371
14
    _tablet_streams_map[tablet_id] = tablet_stream;
372
14
    auto st = tablet_stream->init(_schema, _id, partition_id);
373
14
    if (!st.ok()) {
374
1
        LOG(WARNING) << "tablet stream init failed " << *tablet_stream;
375
1
    }
376
14
}
377
378
0
void IndexStream::get_all_write_tablet_ids(std::vector<int64_t>* tablet_ids) {
379
0
    std::lock_guard lock_guard(_lock);
380
0
    for (const auto& [tablet_id, _] : _tablet_streams_map) {
381
0
        tablet_ids->push_back(tablet_id);
382
0
    }
383
0
}
384
385
void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
386
28
                        std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
387
28
    std::lock_guard lock_guard(_lock);
388
28
    SCOPED_TIMER(_close_wait_timer);
389
    // open all need commit tablets
390
34
    for (const auto& tablet : tablets_to_commit) {
391
34
        if (_id != tablet.index_id()) {
392
18
            continue;
393
18
        }
394
16
        TabletStreamSharedPtr tablet_stream;
395
16
        auto it = _tablet_streams_map.find(tablet.tablet_id());
396
16
        if (it == _tablet_streams_map.end()) {
397
1
            _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id());
398
15
        } else {
399
15
            tablet_stream = it->second;
400
15
        }
401
16
        if (tablet.has_num_segments()) {
402
16
            tablet_stream->add_num_segments(tablet.num_segments());
403
16
        } else {
404
            // for compatibility reasons (sink from old version BE)
405
0
            tablet_stream->disable_num_segments_check();
406
0
        }
407
16
    }
408
409
28
    for (auto& [_, tablet_stream] : _tablet_streams_map) {
410
14
        tablet_stream->pre_close();
411
14
    }
412
413
28
    for (auto& [_, tablet_stream] : _tablet_streams_map) {
414
14
        auto st = tablet_stream->close();
415
14
        if (st.ok()) {
416
8
            success_tablet_ids->push_back(tablet_stream->id());
417
8
        } else {
418
6
            LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st;
419
6
            failed_tablets->emplace_back(tablet_stream->id(), st);
420
6
        }
421
14
    }
422
28
}
423
424
// TODO: Profile is temporary disabled, because:
425
// 1. It's not being processed by the upstream for now
426
// 2. There are some problems in _profile->to_thrift()
427
LoadStream::LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr,
428
                       bool enable_profile)
429
14
        : _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) {
430
14
    g_load_stream_cnt << 1;
431
14
    _profile = std::make_unique<RuntimeProfile>("LoadStream");
432
14
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
433
14
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
434
14
    TUniqueId load_tid = ((UniqueId)load_id).to_thrift();
435
#ifndef BE_TEST
436
    std::shared_ptr<QueryContext> query_context =
437
            ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid);
438
    if (query_context != nullptr) {
439
        _resource_ctx = query_context->resource_ctx();
440
    } else {
441
        _resource_ctx = ResourceContext::create_shared();
442
        _resource_ctx->task_controller()->set_task_id(load_tid);
443
        std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
444
                MemTrackerLimiter::Type::LOAD,
445
                fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string()));
446
        _resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
447
    }
448
#else
449
14
    _resource_ctx = ResourceContext::create_shared();
450
14
    _resource_ctx->task_controller()->set_task_id(load_tid);
451
14
    std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
452
14
            MemTrackerLimiter::Type::LOAD,
453
14
            fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string()));
454
14
    _resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
455
14
#endif
456
14
}
457
458
14
LoadStream::~LoadStream() {
459
14
    g_load_stream_cnt << -1;
460
14
    LOG(INFO) << "load stream is deconstructed " << *this;
461
14
}
462
463
14
Status LoadStream::init(const POpenLoadStreamRequest* request) {
464
14
    _txn_id = request->txn_id();
465
14
    _total_streams = static_cast<int32_t>(request->total_streams());
466
14
    _is_incremental = (_total_streams == 0);
467
468
14
    _schema = std::make_shared<OlapTableSchemaParam>();
469
14
    RETURN_IF_ERROR(_schema->init(request->schema()));
470
28
    for (auto& index : request->schema().indexes()) {
471
28
        _index_streams_map[index.id()] = std::make_shared<IndexStream>(
472
28
                _load_id, index.id(), _txn_id, _schema, _load_stream_mgr, _profile.get());
473
28
    }
474
14
    LOG(INFO) << "succeed to init load stream " << *this;
475
14
    return Status::OK();
476
14
}
477
478
bool LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
479
16
                       std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
480
16
    std::lock_guard<bthread::Mutex> lock_guard(_lock);
481
16
    SCOPED_TIMER(_close_wait_timer);
482
483
    // we do nothing until recv CLOSE_LOAD from all stream to ensure all data are handled before ack
484
16
    _open_streams[src_id]--;
485
16
    if (_open_streams[src_id] == 0) {
486
16
        _open_streams.erase(src_id);
487
16
    }
488
16
    _close_load_cnt++;
489
16
    LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
490
16
              << _total_streams - _close_load_cnt << " senders, " << *this;
491
492
16
    _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
493
16
                              tablets_to_commit.end());
494
495
16
    if (_close_load_cnt < _total_streams) {
496
        // do not return commit info if there is remaining streams.
497
2
        return false;
498
2
    }
499
500
28
    for (auto& [_, index_stream] : _index_streams_map) {
501
28
        index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets);
502
28
    }
503
14
    LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
504
14
              << ", failed_tablet_num=" << failed_tablets->size();
505
14
    return true;
506
16
}
507
508
void LoadStream::_report_result(StreamId stream, const Status& status,
509
                                const std::vector<int64_t>& success_tablet_ids,
510
36
                                const FailedTablets& failed_tablets, bool eos) {
511
36
    LOG(INFO) << "report result " << *this << ", success tablet num " << success_tablet_ids.size()
512
36
              << ", failed tablet num " << failed_tablets.size();
513
36
    butil::IOBuf buf;
514
36
    PLoadStreamResponse response;
515
36
    response.set_eos(eos);
516
36
    status.to_protobuf(response.mutable_status());
517
36
    for (auto& id : success_tablet_ids) {
518
8
        response.add_success_tablet_ids(id);
519
8
    }
520
36
    for (auto& [id, st] : failed_tablets) {
521
10
        auto pb = response.add_failed_tablets();
522
10
        pb->set_id(id);
523
10
        st.to_protobuf(pb->mutable_status());
524
10
    }
525
526
36
    if (_enable_profile && _close_load_cnt == _total_streams) {
527
0
        TRuntimeProfileTree tprofile;
528
0
        ThriftSerializer ser(false, 4096);
529
0
        uint8_t* profile_buf = nullptr;
530
0
        uint32_t len = 0;
531
0
        std::unique_lock<bthread::Mutex> l(_lock);
532
533
0
        _profile->to_thrift(&tprofile);
534
0
        auto st = ser.serialize(&tprofile, &len, &profile_buf);
535
0
        if (st.ok()) {
536
0
            response.set_load_stream_profile(profile_buf, len);
537
0
        } else {
538
0
            LOG(WARNING) << "TRuntimeProfileTree serialize failed, errmsg=" << st << ", " << *this;
539
0
        }
540
0
    }
541
542
36
    buf.append(response.SerializeAsString());
543
36
    auto wst = _write_stream(stream, buf);
544
36
    if (!wst.ok()) {
545
0
        LOG(WARNING) << " report result failed with " << wst << ", " << *this;
546
0
    }
547
36
}
548
549
0
void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) {
550
0
    butil::IOBuf buf;
551
0
    PLoadStreamResponse response;
552
0
    Status st = Status::OK();
553
0
    for (const auto& req : hdr.tablets()) {
554
0
        BaseTabletSPtr tablet;
555
0
        if (auto res = ExecEnv::get_tablet(req.tablet_id()); res.has_value()) {
556
0
            tablet = std::move(res).value();
557
0
        } else {
558
0
            st = std::move(res).error();
559
0
            break;
560
0
        }
561
0
        auto* resp = response.add_tablet_schemas();
562
0
        resp->set_index_id(req.index_id());
563
0
        resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write());
564
0
        tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
565
0
    }
566
0
    st.to_protobuf(response.mutable_status());
567
568
0
    buf.append(response.SerializeAsString());
569
0
    auto wst = _write_stream(stream, buf);
570
0
    if (!wst.ok()) {
571
0
        LOG(WARNING) << " report result failed with " << wst << ", " << *this;
572
0
    }
573
0
}
574
575
0
void LoadStream::_report_tablet_load_info(StreamId stream, int64_t index_id) {
576
0
    std::vector<int64_t> write_tablet_ids;
577
0
    auto it = _index_streams_map.find(index_id);
578
0
    if (it != _index_streams_map.end()) {
579
0
        it->second->get_all_write_tablet_ids(&write_tablet_ids);
580
0
    }
581
582
0
    if (!write_tablet_ids.empty()) {
583
0
        butil::IOBuf buf;
584
0
        PLoadStreamResponse response;
585
0
        auto* tablet_load_infos = response.mutable_tablet_load_rowset_num_infos();
586
0
        _collect_tablet_load_info_from_tablets(write_tablet_ids, tablet_load_infos);
587
0
        buf.append(response.SerializeAsString());
588
0
        auto wst = _write_stream(stream, buf);
589
0
        if (!wst.ok()) {
590
0
            LOG(WARNING) << "report tablet load info failed with " << wst << ", " << *this;
591
0
        }
592
0
    }
593
0
}
594
595
void LoadStream::_collect_tablet_load_info_from_tablets(
596
        const std::vector<int64_t>& tablet_ids,
597
0
        google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_load_infos) {
598
0
    for (auto tablet_id : tablet_ids) {
599
0
        BaseTabletSPtr tablet;
600
0
        if (auto res = ExecEnv::get_tablet(tablet_id); res.has_value()) {
601
0
            tablet = std::move(res).value();
602
0
        } else {
603
0
            continue;
604
0
        }
605
0
        BaseDeltaWriter::collect_tablet_load_rowset_num_info(tablet.get(), tablet_load_infos);
606
0
    }
607
0
}
608
609
36
Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) {
610
36
    for (;;) {
611
36
        int ret = 0;
612
36
        DBUG_EXECUTE_IF("LoadStream._write_stream.EAGAIN", { ret = EAGAIN; });
613
36
        if (ret == 0) {
614
36
            ret = brpc::StreamWrite(stream, buf);
615
36
        }
616
36
        switch (ret) {
617
36
        case 0:
618
36
            return Status::OK();
619
0
        case EAGAIN: {
620
0
            const timespec time = butil::seconds_from_now(config::load_stream_eagain_wait_seconds);
621
0
            int wait_ret = brpc::StreamWait(stream, &time);
622
0
            if (wait_ret != 0) {
623
0
                return Status::InternalError("StreamWait failed, err={}", wait_ret);
624
0
            }
625
0
            break;
626
0
        }
627
0
        default:
628
0
            return Status::InternalError("StreamWrite failed, err={}", ret);
629
36
        }
630
36
    }
631
0
    return Status::OK();
632
36
}
633
634
62
void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& hdr) {
635
62
    butil::IOBufAsZeroCopyInputStream wrapper(*message);
636
62
    hdr.ParseFromZeroCopyStream(&wrapper);
637
62
    VLOG_DEBUG << "header parse result: " << hdr.DebugString();
638
62
}
639
640
28
Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) {
641
28
    SCOPED_TIMER(_append_data_timer);
642
28
    IndexStreamSharedPtr index_stream;
643
644
28
    int64_t index_id = header.index_id();
645
28
    DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid",
646
28
                    { index_id = UNKNOWN_ID_FOR_TEST; });
647
28
    auto it = _index_streams_map.find(index_id);
648
28
    if (it == _index_streams_map.end()) {
649
1
        return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id {}", index_id);
650
27
    } else {
651
27
        index_stream = it->second;
652
27
    }
653
654
27
    return index_stream->append_data(header, data);
655
28
}
656
657
50
int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) {
658
50
    VLOG_DEBUG << "on_received_messages " << id << " " << size;
659
112
    for (size_t i = 0; i < size; ++i) {
660
124
        while (messages[i]->size() > 0) {
661
            // step 1: parse header
662
62
            size_t hdr_len = 0;
663
62
            messages[i]->cutn((void*)&hdr_len, sizeof(size_t));
664
62
            butil::IOBuf hdr_buf;
665
62
            PStreamHeader hdr;
666
62
            messages[i]->cutn(&hdr_buf, hdr_len);
667
62
            _parse_header(&hdr_buf, hdr);
668
669
            // step 2: cut data
670
62
            size_t data_len = 0;
671
62
            messages[i]->cutn((void*)&data_len, sizeof(size_t));
672
62
            butil::IOBuf data_buf;
673
62
            PStreamHeader data;
674
62
            messages[i]->cutn(&data_buf, data_len);
675
676
            // step 3: dispatch
677
62
            _dispatch(id, hdr, &data_buf);
678
62
        }
679
62
    }
680
50
    return 0;
681
50
}
682
683
62
// Validates one decoded stream message and executes the action selected by its opcode.
//
// The message is rejected via _report_failure() when its load id differs from this
// stream's load id, or when its source id has no open stream. Otherwise:
//   ADD_SEGMENT - append the payload; on success also report tablet load info.
//   APPEND_DATA - append the payload.
//   CLOSE_LOAD  - close the source, report the result, then close brpc stream(s).
//   GET_SCHEMA  - report the schema.
// Any other opcode is logged and trips a DCHECK.
void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) {
    VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id()
               << " with tablet " << hdr.tablet_id();
    SCOPED_ATTACH_TASK(_resource_ctx);

    // Fault injection deliberately skips CLOSE_LOAD: a corrupted close message would be
    // ignored and the sender would hit a close-wait timeout.
    if (hdr.opcode() != PStreamHeader::CLOSE_LOAD) {
        DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", {
            PUniqueId* injected_load_id = const_cast<PStreamHeader&>(hdr).mutable_load_id();
            injected_load_id->set_hi(UNKNOWN_ID_FOR_TEST);
            injected_load_id->set_lo(UNKNOWN_ID_FOR_TEST);
        });
        DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", {
            const_cast<PStreamHeader&>(hdr).set_src_id(UNKNOWN_ID_FOR_TEST);
        });
    }

    // Reject messages addressed to a different load.
    if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
        Status mismatch = Status::Error<ErrorCode::INVALID_ARGUMENT>(
                "invalid load id {}, expected {}", print_id(hdr.load_id()), print_id(_load_id));
        _report_failure(id, mismatch, hdr);
        return;
    }

    // Reject messages from a source that has no open stream.
    {
        std::lock_guard<bthread::Mutex> guard(_lock);
        if (!_open_streams.contains(hdr.src_id())) {
            Status not_open = Status::Error<ErrorCode::INVALID_ARGUMENT>(
                    "no open stream from source {}", hdr.src_id());
            _report_failure(id, not_open, hdr);
            return;
        }
    }

    switch (hdr.opcode()) {
    case PStreamHeader::ADD_SEGMENT: {
        auto append_status = _append_data(hdr, data);
        if (append_status.ok()) {
            // Tablet load info is reported only here, not on APPEND_DATA: ADD_SEGMENT
            // arrives once per segment whereas APPEND_DATA arrives for every data batch,
            // so this keeps the write frequency down and avoids potential stream write
            // failures while the sender is closing.
            _report_tablet_load_info(id, hdr.index_id());
        } else {
            _report_failure(id, append_status, hdr);
        }
    } break;
    case PStreamHeader::APPEND_DATA: {
        auto append_status = _append_data(hdr, data);
        if (!append_status.ok()) {
            _report_failure(id, append_status, hdr);
        }
    } break;
    case PStreamHeader::CLOSE_LOAD: {
        std::vector<PTabletID> commit_tablets(hdr.tablets().begin(), hdr.tablets().end());
        std::vector<int64_t> succeeded_tablet_ids;
        FailedTablets failed;
        const bool everything_closed =
                close(hdr.src_id(), commit_tablets, &succeeded_tablet_ids, &failed);
        _report_result(id, Status::OK(), succeeded_tablet_ids, failed, true);

        std::lock_guard<bthread::Mutex> guard(_lock);
        // A close that announces incremental streams is parked rather than closed right
        // away: incremental streams must wait until all non-incremental streams are
        // closed. This is the fencing that avoids use-after-close across different BEs.
        const bool announces_incremental =
                hdr.has_num_incremental_streams() && hdr.num_incremental_streams() > 0;
        if (!announces_incremental) {
            brpc::StreamClose(id);
        } else {
            _closing_stream_ids.push_back(id);
        }

        // Once close() reports that everything is closed, release the parked streams.
        if (everything_closed) {
            for (auto& parked_id : _closing_stream_ids) {
                brpc::StreamClose(parked_id);
            }
            _closing_stream_ids.clear();
        }
    } break;
    case PStreamHeader::GET_SCHEMA: {
        _report_schema(id, hdr);
    } break;
    default:
        LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this;
        DCHECK(false);
    }
}
769
770
0
// Idle-timeout callback for a stream: logs which load is affected, then closes the
// stream identified by `id`.
void LoadStream::on_idle_timeout(StreamId id) {
    LOG(WARNING) << "closing load stream on idle timeout, " << *this;
    brpc::StreamClose(id);
}
774
775
16
// Stream-closed callback: counts the closed stream and, when it was the last one,
// asks the load stream manager to clear this load.
//
// `this` may be freed by other threads once `_close_rpc_cnt` has been increased, so
// everything that is read from `this` unconditionally (the description used for logging
// and the total stream count) is captured BEFORE the increment. The operands of `-` are
// not sequenced relative to each other, so reading `_total_streams` in the same
// expression as the fetch_add would not guarantee that ordering.
void LoadStream::on_closed(StreamId id) {
    std::stringstream ss;
    ss << *this;
    const int64_t total_streams = _total_streams;

    const auto remaining_streams = total_streams - _close_rpc_cnt.fetch_add(1) - 1;
    LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams << ", "
              << ss.str();
    // Only the caller that observes zero remaining streams touches `this` again.
    if (remaining_streams == 0) {
        _load_stream_mgr->clear_load(_load_id);
    }
}
787
788
110
inline std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream) {
789
110
    ostr << "load_id=" << print_id(load_stream._load_id) << ", txn_id=" << load_stream._txn_id;
790
110
    return ostr;
791
110
}
792
793
#include "common/compile_check_end.h"
794
} // namespace doris