Coverage Report

Created: 2026-03-24 20:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/channel/load_stream.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/channel/load_stream.h"
19
20
#include <brpc/stream.h>
21
#include <bthread/bthread.h>
22
#include <bthread/condition_variable.h>
23
#include <bthread/mutex.h>
24
25
#include <memory>
26
#include <sstream>
27
28
#include "bvar/bvar.h"
29
#include "cloud/config.h"
30
#include "common/signal_handler.h"
31
#include "load/channel/load_channel.h"
32
#include "load/channel/load_stream_mgr.h"
33
#include "load/channel/load_stream_writer.h"
34
#include "load/delta_writer/delta_writer.h"
35
#include "runtime/exec_env.h"
36
#include "runtime/fragment_mgr.h"
37
#include "runtime/runtime_profile.h"
38
#include "runtime/workload_group/workload_group_manager.h"
39
#include "storage/rowset/rowset_factory.h"
40
#include "storage/rowset/rowset_meta.h"
41
#include "storage/storage_engine.h"
42
#include "storage/tablet/tablet.h"
43
#include "storage/tablet/tablet_fwd.h"
44
#include "storage/tablet/tablet_manager.h"
45
#include "storage/tablet/tablet_schema.h"
46
#include "storage/tablet_info.h"
47
#include "util/debug_points.h"
48
#include "util/thrift_util.h"
49
#include "util/uid_util.h"
50
51
#define UNKNOWN_ID_FOR_TEST 0x7c00
52
53
namespace doris {
54
#include "common/compile_check_begin.h"
55
56
bvar::Adder<int64_t> g_load_stream_cnt("load_stream_count");
57
bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms");
58
bvar::Adder<int> g_load_stream_flush_running_threads("load_stream_flush_wait_threads");
59
60
TabletStream::TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
61
                           LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
62
28
        : _id(id),
63
28
          _next_segid(0),
64
28
          _load_id(load_id),
65
28
          _txn_id(txn_id),
66
28
          _load_stream_mgr(load_stream_mgr) {
67
28
    load_stream_mgr->create_token(_flush_token);
68
28
    _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
69
28
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
70
28
    _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
71
28
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
72
28
}
73
74
20
inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) {
75
20
    ostr << "load_id=" << print_id(tablet_stream._load_id) << ", txn_id=" << tablet_stream._txn_id
76
20
         << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status();
77
20
    return ostr;
78
20
}
79
80
Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
81
28
                          int64_t partition_id) {
82
28
    WriteRequest req {
83
28
            .tablet_id = _id,
84
28
            .txn_id = _txn_id,
85
28
            .index_id = index_id,
86
28
            .partition_id = partition_id,
87
28
            .load_id = _load_id,
88
28
            .table_schema_param = schema,
89
            // TODO(plat1ko): write_file_cache
90
28
            .storage_vault_id {},
91
28
    };
92
93
28
    _load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
94
28
    DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
95
28
        _status.update(Status::Uninitialized("fault injection"));
96
28
        return _status.status();
97
28
    });
98
28
    _status.update(_load_stream_writer->init());
99
28
    if (!_status.ok()) {
100
2
        LOG(INFO) << "failed to init rowset builder due to " << *this;
101
2
    }
102
28
    return _status.status();
103
28
}
104
105
54
Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
106
54
    if (!_status.ok()) {
107
2
        return _status.status();
108
2
    }
109
110
    // dispatch add_segment request
111
52
    if (header.opcode() == PStreamHeader::ADD_SEGMENT) {
112
0
        return add_segment(header, data);
113
0
    }
114
115
52
    SCOPED_TIMER(_append_data_timer);
116
117
52
    int64_t src_id = header.src_id();
118
52
    uint32_t segid = header.segment_id();
119
    // Ensure there are enough space and mapping are built.
120
52
    SegIdMapping* mapping = nullptr;
121
52
    {
122
52
        std::lock_guard lock_guard(_lock);
123
52
        if (!_segids_mapping.contains(src_id)) {
124
28
            _segids_mapping[src_id] = std::make_unique<SegIdMapping>();
125
28
        }
126
52
        mapping = _segids_mapping[src_id].get();
127
52
    }
128
52
    if (segid + 1 > mapping->size()) {
129
        // TODO: Each sender lock is enough.
130
30
        std::lock_guard lock_guard(_lock);
131
30
        ssize_t origin_size = mapping->size();
132
30
        if (segid + 1 > origin_size) {
133
30
            mapping->resize(segid + 1, std::numeric_limits<uint32_t>::max());
134
78
            for (size_t index = origin_size; index <= segid; index++) {
135
48
                mapping->at(index) = _next_segid;
136
48
                _next_segid++;
137
48
                VLOG_DEBUG << "src_id=" << src_id << ", segid=" << index << " to "
138
0
                           << " segid=" << _next_segid - 1 << ", " << *this;
139
48
            }
140
30
        }
141
30
    }
142
143
    // Each sender sends data in one segment sequential, so we also do not
144
    // need a lock here.
145
52
    bool eos = header.segment_eos();
146
52
    FileType file_type = header.file_type();
147
52
    uint32_t new_segid = mapping->at(segid);
148
52
    DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
149
52
    butil::IOBuf buf = data->movable();
150
52
    auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable {
151
52
        signal::set_signal_task_id(_load_id);
152
52
        g_load_stream_flush_running_threads << -1;
153
52
        auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type);
154
52
        if (!st.ok() && !config::is_cloud_mode()) {
155
2
            auto res = ExecEnv::get_tablet(_id);
156
2
            TabletSharedPtr tablet =
157
2
                    res.has_value() ? std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr;
158
2
            if (tablet) {
159
2
                tablet->report_error(st);
160
2
            }
161
2
        }
162
52
        if (eos && st.ok()) {
163
42
            DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type",
164
42
                            { file_type = static_cast<FileType>(-1); });
165
42
            if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) {
166
42
                st = _load_stream_writer->close_writer(new_segid, file_type);
167
42
            } else {
168
0
                st = Status::InternalError(
169
0
                        "appent data failed, file type error, file type = {}, "
170
0
                        "segment_id={}",
171
0
                        file_type, new_segid);
172
0
            }
173
42
        }
174
52
        DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
175
52
                        { st = Status::InternalError("fault injection"); });
176
52
        if (!st.ok()) {
177
4
            _status.update(st);
178
4
            LOG(WARNING) << "write data failed " << st << ", " << *this;
179
4
        }
180
52
    };
181
52
    auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks;
182
52
    auto load_stream_max_wait_flush_token_time_ms =
183
52
            config::load_stream_max_wait_flush_token_time_ms;
184
52
    DBUG_EXECUTE_IF("TabletStream.append_data.long_wait", {
185
52
        load_stream_flush_token_max_tasks = 0;
186
52
        load_stream_max_wait_flush_token_time_ms = 1000;
187
52
    });
188
52
    MonotonicStopWatch timer;
189
52
    timer.start();
190
52
    while (_flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
191
0
        if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
192
0
            _status.update(
193
0
                    Status::Error<true>("wait flush token back pressure time is more than "
194
0
                                        "load_stream_max_wait_flush_token_time {}",
195
0
                                        load_stream_max_wait_flush_token_time_ms));
196
0
            return _status.status();
197
0
        }
198
0
        bthread_usleep(2 * 1000); // 2ms
199
0
    }
200
52
    timer.stop();
201
52
    int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
202
52
    g_load_stream_flush_wait_ms << time_ms;
203
52
    g_load_stream_flush_running_threads << 1;
204
52
    Status st = Status::OK();
205
52
    DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed",
206
52
                    { st = Status::InternalError("fault injection"); });
207
52
    if (st.ok()) {
208
52
        st = _flush_token->submit_func(flush_func);
209
52
    }
210
52
    if (!st.ok()) {
211
0
        _status.update(st);
212
0
    }
213
52
    return _status.status();
214
52
}
215
216
0
Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
217
0
    if (!_status.ok()) {
218
0
        return _status.status();
219
0
    }
220
221
0
    SCOPED_TIMER(_add_segment_timer);
222
0
    DCHECK(header.has_segment_statistics());
223
0
    SegmentStatistics stat(header.segment_statistics());
224
225
0
    int64_t src_id = header.src_id();
226
0
    uint32_t segid = header.segment_id();
227
0
    uint32_t new_segid;
228
0
    DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_segid", { segid = UNKNOWN_ID_FOR_TEST; });
229
0
    {
230
0
        std::lock_guard lock_guard(_lock);
231
0
        if (!_segids_mapping.contains(src_id)) {
232
0
            _status.update(Status::InternalError(
233
0
                    "add segment failed, no segment written by this src be yet, src_id={}, "
234
0
                    "segment_id={}",
235
0
                    src_id, segid));
236
0
            return _status.status();
237
0
        }
238
0
        DBUG_EXECUTE_IF("TabletStream.add_segment.segid_never_written",
239
0
                        { segid = static_cast<uint32_t>(_segids_mapping[src_id]->size()); });
240
0
        if (segid >= _segids_mapping[src_id]->size()) {
241
0
            _status.update(Status::InternalError(
242
0
                    "add segment failed, segment is never written, src_id={}, segment_id={}",
243
0
                    src_id, segid));
244
0
            return _status.status();
245
0
        }
246
0
        new_segid = _segids_mapping[src_id]->at(segid);
247
0
    }
248
0
    DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
249
250
0
    auto add_segment_func = [this, new_segid, stat]() {
251
0
        signal::set_signal_task_id(_load_id);
252
0
        auto st = _load_stream_writer->add_segment(new_segid, stat);
253
0
        DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
254
0
                        { st = Status::InternalError("fault injection"); });
255
0
        if (!st.ok()) {
256
0
            _status.update(st);
257
0
            LOG(INFO) << "add segment failed " << *this;
258
0
        }
259
0
    };
260
0
    Status st = Status::OK();
261
0
    DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed",
262
0
                    { st = Status::InternalError("fault injection"); });
263
0
    if (st.ok()) {
264
0
        st = _flush_token->submit_func(add_segment_func);
265
0
    }
266
0
    if (!st.ok()) {
267
0
        _status.update(st);
268
0
    }
269
0
    return _status.status();
270
0
}
271
272
59
Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
273
59
    bthread::Mutex mu;
274
59
    std::unique_lock<bthread::Mutex> lock(mu);
275
59
    bthread::ConditionVariable cv;
276
59
    auto st = Status::OK();
277
59
    auto func = [this, &mu, &cv, &st, &fn] {
278
59
        signal::set_signal_task_id(_load_id);
279
59
        st = fn();
280
59
        std::lock_guard<bthread::Mutex> lock(mu);
281
59
        cv.notify_one();
282
59
    };
283
59
    bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func);
284
59
    if (!ret) {
285
0
        return Status::Error<ErrorCode::INTERNAL_ERROR>(
286
0
                "there is not enough thread resource for close load");
287
0
    }
288
59
    cv.wait(lock);
289
59
    return st;
290
59
}
291
292
56
void TabletStream::wait_for_flush_tasks() {
293
56
    {
294
56
        std::lock_guard lock_guard(_lock);
295
56
        if (_flush_tasks_done) {
296
28
            return;
297
28
        }
298
28
        _flush_tasks_done = true;
299
28
    }
300
301
28
    if (!_status.ok()) {
302
3
        _flush_token->shutdown();
303
3
        return;
304
3
    }
305
306
    // Use heavy_work_pool to avoid blocking bthread
307
25
    auto st = _run_in_heavy_work_pool([this]() {
308
25
        _flush_token->wait();
309
25
        return Status::OK();
310
25
    });
311
25
    if (!st.ok()) {
312
        // If heavy_work_pool is unavailable, fall back to shutdown
313
        // which will cancel pending tasks and wait for running tasks
314
0
        _flush_token->shutdown();
315
0
        _status.update(st);
316
0
    }
317
25
}
318
319
28
void TabletStream::pre_close() {
320
28
    SCOPED_TIMER(_close_wait_timer);
321
28
    wait_for_flush_tasks();
322
323
28
    if (!_status.ok()) {
324
6
        return;
325
6
    }
326
327
22
    DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
328
22
    if (_check_num_segments && (_next_segid.load() != _num_segments)) {
329
4
        _status.update(Status::Corruption(
330
4
                "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
331
4
                _num_segments, _next_segid.load(), print_id(_load_id)));
332
4
        return;
333
4
    }
334
335
18
    _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); }));
336
18
}
337
338
28
Status TabletStream::close() {
339
28
    if (!_status.ok()) {
340
12
        return _status.status();
341
12
    }
342
343
16
    SCOPED_TIMER(_close_wait_timer);
344
16
    _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); }));
345
16
    return _status.status();
346
28
}
347
348
IndexStream::IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
349
                         std::shared_ptr<OlapTableSchemaParam> schema,
350
                         LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
351
56
        : _id(id),
352
56
          _load_id(load_id),
353
56
          _txn_id(txn_id),
354
56
          _schema(schema),
355
56
          _load_stream_mgr(load_stream_mgr) {
356
56
    _profile = profile->create_child(fmt::format("IndexStream {}", id), true, true);
357
56
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
358
56
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
359
56
}
360
361
56
IndexStream::~IndexStream() {
362
    // Ensure all TabletStreams have their flush tokens properly handled before destruction.
363
    // In normal flow, close() should have called pre_close() on all tablet streams.
364
    // But if IndexStream is destroyed without close() being called (e.g., on_idle_timeout),
365
    // we need to wait for flush tasks here to ensure flush tokens are properly shut down.
366
56
    for (auto& [_, tablet_stream] : _tablet_streams_map) {
367
28
        tablet_stream->wait_for_flush_tasks();
368
28
    }
369
56
}
370
371
54
Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
372
54
    SCOPED_TIMER(_append_data_timer);
373
54
    int64_t tablet_id = header.tablet_id();
374
54
    TabletStreamSharedPtr tablet_stream;
375
54
    {
376
54
        std::lock_guard lock_guard(_lock);
377
54
        auto it = _tablet_streams_map.find(tablet_id);
378
54
        if (it == _tablet_streams_map.end()) {
379
26
            _init_tablet_stream(tablet_stream, tablet_id, header.partition_id());
380
28
        } else {
381
28
            tablet_stream = it->second;
382
28
        }
383
54
    }
384
385
54
    return tablet_stream->append_data(header, data);
386
54
}
387
388
void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
389
28
                                      int64_t partition_id) {
390
28
    tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr,
391
28
                                                   _profile);
392
28
    _tablet_streams_map[tablet_id] = tablet_stream;
393
28
    auto st = tablet_stream->init(_schema, _id, partition_id);
394
28
    if (!st.ok()) {
395
2
        LOG(WARNING) << "tablet stream init failed " << *tablet_stream;
396
2
    }
397
28
}
398
399
0
void IndexStream::get_all_write_tablet_ids(std::vector<int64_t>* tablet_ids) {
400
0
    std::lock_guard lock_guard(_lock);
401
0
    for (const auto& [tablet_id, _] : _tablet_streams_map) {
402
0
        tablet_ids->push_back(tablet_id);
403
0
    }
404
0
}
405
406
void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
407
56
                        std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
408
56
    std::lock_guard lock_guard(_lock);
409
56
    SCOPED_TIMER(_close_wait_timer);
410
    // open all need commit tablets
411
68
    for (const auto& tablet : tablets_to_commit) {
412
68
        if (_id != tablet.index_id()) {
413
36
            continue;
414
36
        }
415
32
        TabletStreamSharedPtr tablet_stream;
416
32
        auto it = _tablet_streams_map.find(tablet.tablet_id());
417
32
        if (it == _tablet_streams_map.end()) {
418
2
            _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id());
419
30
        } else {
420
30
            tablet_stream = it->second;
421
30
        }
422
32
        if (tablet.has_num_segments()) {
423
32
            tablet_stream->add_num_segments(tablet.num_segments());
424
32
        } else {
425
            // for compatibility reasons (sink from old version BE)
426
0
            tablet_stream->disable_num_segments_check();
427
0
        }
428
32
    }
429
430
56
    for (auto& [_, tablet_stream] : _tablet_streams_map) {
431
28
        tablet_stream->pre_close();
432
28
    }
433
434
56
    for (auto& [_, tablet_stream] : _tablet_streams_map) {
435
28
        auto st = tablet_stream->close();
436
28
        if (st.ok()) {
437
16
            success_tablet_ids->push_back(tablet_stream->id());
438
16
        } else {
439
12
            LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st;
440
12
            failed_tablets->emplace_back(tablet_stream->id(), st);
441
12
        }
442
28
    }
443
56
}
444
445
// TODO: Profile is temporary disabled, because:
446
// 1. It's not being processed by the upstream for now
447
// 2. There are some problems in _profile->to_thrift()
448
LoadStream::LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr,
449
                       bool enable_profile)
450
28
        : _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) {
451
28
    g_load_stream_cnt << 1;
452
28
    _profile = std::make_unique<RuntimeProfile>("LoadStream");
453
28
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
454
28
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
455
28
    TUniqueId load_tid = ((UniqueId)load_id).to_thrift();
456
#ifndef BE_TEST
457
    std::shared_ptr<QueryContext> query_context =
458
            ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid);
459
    if (query_context != nullptr) {
460
        _resource_ctx = query_context->resource_ctx();
461
    } else {
462
        _resource_ctx = ResourceContext::create_shared();
463
        _resource_ctx->task_controller()->set_task_id(load_tid);
464
        std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
465
                MemTrackerLimiter::Type::LOAD,
466
                fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string()));
467
        _resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
468
    }
469
#else
470
28
    _resource_ctx = ResourceContext::create_shared();
471
28
    _resource_ctx->task_controller()->set_task_id(load_tid);
472
28
    std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
473
28
            MemTrackerLimiter::Type::LOAD,
474
28
            fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string()));
475
28
    _resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
476
28
#endif
477
28
}
478
479
28
LoadStream::~LoadStream() {
480
28
    g_load_stream_cnt << -1;
481
28
    LOG(INFO) << "load stream is deconstructed " << *this;
482
28
}
483
484
28
Status LoadStream::init(const POpenLoadStreamRequest* request) {
485
28
    _txn_id = request->txn_id();
486
28
    _total_streams = static_cast<int32_t>(request->total_streams());
487
28
    _is_incremental = (_total_streams == 0);
488
489
28
    _schema = std::make_shared<OlapTableSchemaParam>();
490
28
    RETURN_IF_ERROR(_schema->init(request->schema()));
491
56
    for (auto& index : request->schema().indexes()) {
492
56
        _index_streams_map[index.id()] = std::make_shared<IndexStream>(
493
56
                _load_id, index.id(), _txn_id, _schema, _load_stream_mgr, _profile.get());
494
56
    }
495
28
    LOG(INFO) << "succeed to init load stream " << *this;
496
28
    return Status::OK();
497
28
}
498
499
bool LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
500
32
                       std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
501
32
    std::lock_guard<bthread::Mutex> lock_guard(_lock);
502
32
    SCOPED_TIMER(_close_wait_timer);
503
504
    // we do nothing until recv CLOSE_LOAD from all stream to ensure all data are handled before ack
505
32
    _open_streams[src_id]--;
506
32
    if (_open_streams[src_id] == 0) {
507
32
        _open_streams.erase(src_id);
508
32
    }
509
32
    _close_load_cnt++;
510
32
    LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
511
32
              << _total_streams - _close_load_cnt << " senders, " << *this;
512
513
32
    _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
514
32
                              tablets_to_commit.end());
515
516
32
    if (_close_load_cnt < _total_streams) {
517
        // do not return commit info if there is remaining streams.
518
4
        return false;
519
4
    }
520
521
56
    for (auto& [_, index_stream] : _index_streams_map) {
522
56
        index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets);
523
56
    }
524
28
    LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
525
28
              << ", failed_tablet_num=" << failed_tablets->size();
526
28
    return true;
527
32
}
528
529
void LoadStream::_report_result(StreamId stream, const Status& status,
530
                                const std::vector<int64_t>& success_tablet_ids,
531
72
                                const FailedTablets& failed_tablets, bool eos) {
532
72
    LOG(INFO) << "report result " << *this << ", success tablet num " << success_tablet_ids.size()
533
72
              << ", failed tablet num " << failed_tablets.size();
534
72
    butil::IOBuf buf;
535
72
    PLoadStreamResponse response;
536
72
    response.set_eos(eos);
537
72
    status.to_protobuf(response.mutable_status());
538
72
    for (auto& id : success_tablet_ids) {
539
16
        response.add_success_tablet_ids(id);
540
16
    }
541
72
    for (auto& [id, st] : failed_tablets) {
542
20
        auto pb = response.add_failed_tablets();
543
20
        pb->set_id(id);
544
20
        st.to_protobuf(pb->mutable_status());
545
20
    }
546
547
72
    if (_enable_profile && _close_load_cnt == _total_streams) {
548
0
        TRuntimeProfileTree tprofile;
549
0
        ThriftSerializer ser(false, 4096);
550
0
        uint8_t* profile_buf = nullptr;
551
0
        uint32_t len = 0;
552
0
        std::unique_lock<bthread::Mutex> l(_lock);
553
554
0
        _profile->to_thrift(&tprofile);
555
0
        auto st = ser.serialize(&tprofile, &len, &profile_buf);
556
0
        if (st.ok()) {
557
0
            response.set_load_stream_profile(profile_buf, len);
558
0
        } else {
559
0
            LOG(WARNING) << "TRuntimeProfileTree serialize failed, errmsg=" << st << ", " << *this;
560
0
        }
561
0
    }
562
563
72
    buf.append(response.SerializeAsString());
564
72
    auto wst = _write_stream(stream, buf);
565
72
    if (!wst.ok()) {
566
0
        LOG(WARNING) << " report result failed with " << wst << ", " << *this;
567
0
    }
568
72
}
569
570
0
void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) {
571
0
    butil::IOBuf buf;
572
0
    PLoadStreamResponse response;
573
0
    Status st = Status::OK();
574
0
    for (const auto& req : hdr.tablets()) {
575
0
        BaseTabletSPtr tablet;
576
0
        if (auto res = ExecEnv::get_tablet(req.tablet_id()); res.has_value()) {
577
0
            tablet = std::move(res).value();
578
0
        } else {
579
0
            st = std::move(res).error();
580
0
            break;
581
0
        }
582
0
        auto* resp = response.add_tablet_schemas();
583
0
        resp->set_index_id(req.index_id());
584
0
        resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write());
585
0
        tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
586
0
    }
587
0
    st.to_protobuf(response.mutable_status());
588
589
0
    buf.append(response.SerializeAsString());
590
0
    auto wst = _write_stream(stream, buf);
591
0
    if (!wst.ok()) {
592
0
        LOG(WARNING) << " report result failed with " << wst << ", " << *this;
593
0
    }
594
0
}
595
596
0
void LoadStream::_report_tablet_load_info(StreamId stream, int64_t index_id) {
597
0
    std::vector<int64_t> write_tablet_ids;
598
0
    auto it = _index_streams_map.find(index_id);
599
0
    if (it != _index_streams_map.end()) {
600
0
        it->second->get_all_write_tablet_ids(&write_tablet_ids);
601
0
    }
602
603
0
    if (!write_tablet_ids.empty()) {
604
0
        butil::IOBuf buf;
605
0
        PLoadStreamResponse response;
606
0
        auto* tablet_load_infos = response.mutable_tablet_load_rowset_num_infos();
607
0
        _collect_tablet_load_info_from_tablets(write_tablet_ids, tablet_load_infos);
608
0
        if (tablet_load_infos->empty()) {
609
0
            return;
610
0
        }
611
0
        buf.append(response.SerializeAsString());
612
0
        auto wst = _write_stream(stream, buf);
613
0
        if (!wst.ok()) {
614
0
            LOG(WARNING) << "report tablet load info failed with " << wst << ", " << *this;
615
0
        }
616
0
    }
617
0
}
618
619
void LoadStream::_collect_tablet_load_info_from_tablets(
620
        const std::vector<int64_t>& tablet_ids,
621
0
        google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_load_infos) {
622
0
    for (auto tablet_id : tablet_ids) {
623
0
        BaseTabletSPtr tablet;
624
0
        if (auto res = ExecEnv::get_tablet(tablet_id); res.has_value()) {
625
0
            tablet = std::move(res).value();
626
0
        } else {
627
0
            continue;
628
0
        }
629
0
        BaseDeltaWriter::collect_tablet_load_rowset_num_info(tablet.get(), tablet_load_infos);
630
0
    }
631
0
}
632
633
72
Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) {
634
72
    for (;;) {
635
72
        int ret = 0;
636
72
        DBUG_EXECUTE_IF("LoadStream._write_stream.EAGAIN", { ret = EAGAIN; });
637
72
        if (ret == 0) {
638
72
            ret = brpc::StreamWrite(stream, buf);
639
72
        }
640
72
        switch (ret) {
641
72
        case 0:
642
72
            return Status::OK();
643
0
        case EAGAIN: {
644
0
            const timespec time = butil::seconds_from_now(config::load_stream_eagain_wait_seconds);
645
0
            int wait_ret = brpc::StreamWait(stream, &time);
646
0
            if (wait_ret != 0) {
647
0
                return Status::InternalError("StreamWait failed, err={}", wait_ret);
648
0
            }
649
0
            break;
650
0
        }
651
0
        default:
652
0
            return Status::InternalError("StreamWrite failed, err={}", ret);
653
72
        }
654
72
    }
655
0
    return Status::OK();
656
72
}
657
658
124
void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& hdr) {
659
124
    butil::IOBufAsZeroCopyInputStream wrapper(*message);
660
124
    hdr.ParseFromZeroCopyStream(&wrapper);
661
124
    VLOG_DEBUG << "header parse result: " << hdr.DebugString();
662
124
}
663
664
56
Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) {
665
56
    SCOPED_TIMER(_append_data_timer);
666
56
    IndexStreamSharedPtr index_stream;
667
668
56
    int64_t index_id = header.index_id();
669
56
    DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid",
670
56
                    { index_id = UNKNOWN_ID_FOR_TEST; });
671
56
    auto it = _index_streams_map.find(index_id);
672
56
    if (it == _index_streams_map.end()) {
673
2
        return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id {}", index_id);
674
54
    } else {
675
54
        index_stream = it->second;
676
54
    }
677
678
54
    return index_stream->append_data(header, data);
679
56
}
680
681
98
int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) {
682
98
    VLOG_DEBUG << "on_received_messages " << id << " " << size;
683
222
    for (size_t i = 0; i < size; ++i) {
684
248
        while (messages[i]->size() > 0) {
685
            // step 1: parse header
686
124
            size_t hdr_len = 0;
687
124
            messages[i]->cutn((void*)&hdr_len, sizeof(size_t));
688
124
            butil::IOBuf hdr_buf;
689
124
            PStreamHeader hdr;
690
124
            messages[i]->cutn(&hdr_buf, hdr_len);
691
124
            _parse_header(&hdr_buf, hdr);
692
693
            // step 2: cut data
694
124
            size_t data_len = 0;
695
124
            messages[i]->cutn((void*)&data_len, sizeof(size_t));
696
124
            butil::IOBuf data_buf;
697
124
            PStreamHeader data;
698
124
            messages[i]->cutn(&data_buf, data_len);
699
700
            // step 3: dispatch
701
124
            _dispatch(id, hdr, &data_buf);
702
124
        }
703
124
    }
704
98
    return 0;
705
98
}
706
707
124
// Validates one decoded frame and runs the handler for its opcode.
//
// A frame is rejected through _report_failure() when its load id differs from this
// LoadStream's `_load_id`, or when its src_id is not in `_open_streams`. Otherwise:
//   ADD_SEGMENT / APPEND_DATA : forward the payload via _append_data()
//   CLOSE_LOAD                : close() the source, report the result, close stream(s)
//   GET_SCHEMA                : _report_schema()
//
// @param id    stream the frame arrived on; used for replies and for StreamClose
// @param hdr   decoded header (debug points may overwrite its load id / src id in place)
// @param data  payload that followed the header
void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) {
    const auto opcode = hdr.opcode();
    VLOG_DEBUG << PStreamHeader_Opcode_Name(opcode) << " from " << hdr.src_id()
               << " with tablet " << hdr.tablet_id();
    SCOPED_ATTACH_TASK(_resource_ctx);

    // Fault injection must skip CLOSE_LOAD: a corrupted CLOSE_LOAD would be ignored,
    // which would end in a close-wait timeout.
    if (opcode != PStreamHeader::CLOSE_LOAD) {
        DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", {
            PUniqueId* injected_id = const_cast<PStreamHeader&>(hdr).mutable_load_id();
            injected_id->set_hi(UNKNOWN_ID_FOR_TEST);
            injected_id->set_lo(UNKNOWN_ID_FOR_TEST);
        });
        DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", {
            const_cast<PStreamHeader&>(hdr).set_src_id(UNKNOWN_ID_FOR_TEST);
        });
    }

    // Reject frames that belong to a different load.
    if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
        Status mismatch = Status::Error<ErrorCode::INVALID_ARGUMENT>(
                "invalid load id {}, expected {}", print_id(hdr.load_id()), print_id(_load_id));
        _report_failure(id, mismatch, hdr);
        return;
    }

    // Reject frames from a source that has no open stream. The failure is reported
    // while `_lock` is still held, as before.
    {
        std::lock_guard<bthread::Mutex> guard(_lock);
        if (!_open_streams.contains(hdr.src_id())) {
            Status not_open = Status::Error<ErrorCode::INVALID_ARGUMENT>(
                    "no open stream from source {}", hdr.src_id());
            _report_failure(id, not_open, hdr);
            return;
        }
    }

    switch (opcode) {
    case PStreamHeader::ADD_SEGMENT:
    case PStreamHeader::APPEND_DATA: {
        auto append_status = _append_data(hdr, data);
        if (!append_status.ok()) {
            _report_failure(id, append_status, hdr);
        } else if (opcode == PStreamHeader::ADD_SEGMENT) {
            // Tablet load info is reported for ADD_SEGMENT only, to keep the frequency
            // low: ADD_SEGMENT arrives once per segment whereas APPEND_DATA arrives for
            // every data batch. Fewer writes also means fewer chances of a stream write
            // failing while the sender is closing.
            _report_tablet_load_info(id, hdr.index_id());
        }
        break;
    }
    case PStreamHeader::CLOSE_LOAD: {
        DBUG_EXECUTE_IF("LoadStream.close_load.block", DBUG_BLOCK);
        std::vector<int64_t> committed_tablet_ids;
        FailedTablets failures;
        std::vector<PTabletID> commit_list(hdr.tablets().begin(), hdr.tablets().end());
        const bool every_source_closed =
                close(hdr.src_id(), commit_list, &committed_tablet_ids, &failures);
        _report_result(id, Status::OK(), committed_tablet_ids, failures, true);

        std::lock_guard<bthread::Mutex> guard(_lock);
        // When the header announces incremental streams, this stream is parked instead
        // of being closed right away, and is closed only once every source has closed.
        // This acts as a fence against use-after-close across different BEs.
        const bool park_until_all_closed =
                hdr.has_num_incremental_streams() && hdr.num_incremental_streams() > 0;
        if (park_until_all_closed) {
            _closing_stream_ids.push_back(id);
        } else {
            brpc::StreamClose(id);
        }

        if (every_source_closed) {
            // Release every parked stream now that all sources are closed.
            for (const auto& parked_id : _closing_stream_ids) {
                brpc::StreamClose(parked_id);
            }
            _closing_stream_ids.clear();
        }
        break;
    }
    case PStreamHeader::GET_SCHEMA: {
        _report_schema(id, hdr);
        break;
    }
    default:
        LOG(WARNING) << "unexpected stream message " << opcode << ", " << *this;
        DCHECK(false);
        break;
    }
}
794
795
0
// Idle-timeout handler for stream `id`: logs a warning that identifies this load,
// then closes the stream.
void LoadStream::on_idle_timeout(StreamId id) {
    LOG(WARNING) << "closing load stream on idle timeout, " << *this;
    brpc::StreamClose(id);
}
799
800
32
// Handler run when stream `id` has been closed.
//
// Bumps `_close_rpc_cnt` and, when this was the last of `_total_streams` streams,
// asks the load stream manager to clear this load.
//
// `this` may be freed by other threads as soon as `_close_rpc_cnt` has been increased,
// so everything that is needed from `this` for logging and for the remaining-stream
// computation is captured into locals BEFORE the increment.
void LoadStream::on_closed(StreamId id) {
    // Format the description first to prevent use-after-free in the log statement.
    std::stringstream ss;
    ss << *this;
    // Read `_total_streams` in its own statement: inside a single expression the order in
    // which it and fetch_add() are evaluated is unspecified, so the member could otherwise
    // be read after the increment.
    const auto total_streams = _total_streams;
    auto remaining_streams = total_streams - _close_rpc_cnt.fetch_add(1) - 1;
    LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams << ", "
              << ss.str();
    // Only the caller that observes zero remaining streams touches `this` again.
    if (remaining_streams == 0) {
        _load_stream_mgr->clear_load(_load_id);
    }
}
812
813
220
inline std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream) {
814
220
    ostr << "load_id=" << print_id(load_stream._load_id) << ", txn_id=" << load_stream._txn_id;
815
220
    return ostr;
816
220
}
817
818
#include "common/compile_check_end.h"
819
} // namespace doris