Coverage Report

Created: 2026-04-14 07:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/channel/load_stream.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/channel/load_stream.h"
19
20
#include <brpc/stream.h>
21
#include <bthread/bthread.h>
22
#include <bthread/condition_variable.h>
23
#include <bthread/mutex.h>
24
25
#include <memory>
26
#include <sstream>
27
28
#include "bvar/bvar.h"
29
#include "cloud/config.h"
30
#include "common/signal_handler.h"
31
#include "load/channel/load_channel.h"
32
#include "load/channel/load_stream_mgr.h"
33
#include "load/channel/load_stream_writer.h"
34
#include "load/delta_writer/delta_writer.h"
35
#include "runtime/exec_env.h"
36
#include "runtime/fragment_mgr.h"
37
#include "runtime/runtime_profile.h"
38
#include "runtime/workload_group/workload_group_manager.h"
39
#include "storage/rowset/rowset_factory.h"
40
#include "storage/rowset/rowset_meta.h"
41
#include "storage/storage_engine.h"
42
#include "storage/tablet/tablet.h"
43
#include "storage/tablet/tablet_fwd.h"
44
#include "storage/tablet/tablet_manager.h"
45
#include "storage/tablet/tablet_schema.h"
46
#include "storage/tablet_info.h"
47
#include "util/debug_points.h"
48
#include "util/thrift_util.h"
49
#include "util/uid_util.h"
50
51
#define UNKNOWN_ID_FOR_TEST 0x7c00
52
53
namespace doris {
54
55
bvar::Adder<int64_t> g_load_stream_cnt("load_stream_count");
56
bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms");
57
bvar::Adder<int> g_load_stream_flush_running_threads("load_stream_flush_wait_threads");
58
59
TabletStream::TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
60
                           LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
61
13.5k
        : _id(id),
62
13.5k
          _next_segid(0),
63
13.5k
          _load_id(load_id),
64
13.5k
          _txn_id(txn_id),
65
13.5k
          _load_stream_mgr(load_stream_mgr) {
66
13.5k
    load_stream_mgr->create_token(_flush_token);
67
13.5k
    _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
68
13.5k
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
69
13.5k
    _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
70
13.5k
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
71
13.5k
}
72
73
10
inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) {
74
10
    ostr << "load_id=" << print_id(tablet_stream._load_id) << ", txn_id=" << tablet_stream._txn_id
75
10
         << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status();
76
10
    return ostr;
77
10
}
78
79
Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
80
13.5k
                          int64_t partition_id) {
81
13.5k
    WriteRequest req {
82
13.5k
            .tablet_id = _id,
83
13.5k
            .txn_id = _txn_id,
84
13.5k
            .index_id = index_id,
85
13.5k
            .partition_id = partition_id,
86
13.5k
            .load_id = _load_id,
87
13.5k
            .table_schema_param = schema,
88
            // TODO(plat1ko): write_file_cache
89
13.5k
            .storage_vault_id {},
90
13.5k
    };
91
92
13.5k
    _load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
93
13.5k
    DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
94
13.5k
        _status.update(Status::Uninitialized("fault injection"));
95
13.5k
        return _status.status();
96
13.5k
    });
97
13.5k
    _status.update(_load_stream_writer->init());
98
13.5k
    if (!_status.ok()) {
99
1
        LOG(INFO) << "failed to init rowset builder due to " << *this;
100
1
    }
101
13.5k
    return _status.status();
102
13.5k
}
103
104
110k
Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
105
110k
    if (!_status.ok()) {
106
1
        return _status.status();
107
1
    }
108
109
    // dispatch add_segment request
110
110k
    if (header.opcode() == PStreamHeader::ADD_SEGMENT) {
111
3.71k
        return add_segment(header, data);
112
3.71k
    }
113
114
106k
    SCOPED_TIMER(_append_data_timer);
115
116
106k
    int64_t src_id = header.src_id();
117
106k
    uint32_t segid = header.segment_id();
118
    // Ensure there are enough space and mapping are built.
119
106k
    SegIdMapping* mapping = nullptr;
120
106k
    {
121
106k
        std::lock_guard lock_guard(_lock);
122
106k
        if (!_segids_mapping.contains(src_id)) {
123
3.72k
            _segids_mapping[src_id] = std::make_unique<SegIdMapping>();
124
3.72k
        }
125
106k
        mapping = _segids_mapping[src_id].get();
126
106k
    }
127
106k
    if (segid + 1 > mapping->size()) {
128
        // TODO: Each sender lock is enough.
129
3.72k
        std::lock_guard lock_guard(_lock);
130
3.72k
        ssize_t origin_size = mapping->size();
131
3.72k
        if (segid + 1 > origin_size) {
132
3.72k
            mapping->resize(segid + 1, std::numeric_limits<uint32_t>::max());
133
7.46k
            for (size_t index = origin_size; index <= segid; index++) {
134
3.73k
                mapping->at(index) = _next_segid;
135
3.73k
                _next_segid++;
136
3.73k
                VLOG_DEBUG << "src_id=" << src_id << ", segid=" << index << " to "
137
0
                           << " segid=" << _next_segid - 1 << ", " << *this;
138
3.73k
            }
139
3.72k
        }
140
3.72k
    }
141
142
    // Each sender sends data in one segment sequential, so we also do not
143
    // need a lock here.
144
106k
    bool eos = header.segment_eos();
145
106k
    FileType file_type = header.file_type();
146
106k
    uint32_t new_segid = mapping->at(segid);
147
106k
    DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
148
106k
    butil::IOBuf buf = data->movable();
149
106k
    auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable {
150
106k
        signal::set_signal_task_id(_load_id);
151
106k
        g_load_stream_flush_running_threads << -1;
152
106k
        auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type);
153
106k
        if (!st.ok() && !config::is_cloud_mode()) {
154
1
            auto res = ExecEnv::get_tablet(_id);
155
1
            TabletSharedPtr tablet =
156
1
                    res.has_value() ? std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr;
157
1
            if (tablet) {
158
1
                tablet->report_error(st);
159
1
            }
160
1
        }
161
106k
        if (eos && st.ok()) {
162
3.73k
            DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type",
163
3.73k
                            { file_type = static_cast<FileType>(-1); });
164
3.73k
            if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) {
165
3.73k
                st = _load_stream_writer->close_writer(new_segid, file_type);
166
3.73k
            } else {
167
0
                st = Status::InternalError(
168
0
                        "appent data failed, file type error, file type = {}, "
169
0
                        "segment_id={}",
170
0
                        file_type, new_segid);
171
0
            }
172
3.73k
        }
173
106k
        DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
174
106k
                        { st = Status::InternalError("fault injection"); });
175
106k
        if (!st.ok()) {
176
2
            _status.update(st);
177
2
            LOG(WARNING) << "write data failed " << st << ", " << *this;
178
2
        }
179
106k
    };
180
106k
    auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks;
181
106k
    auto load_stream_max_wait_flush_token_time_ms =
182
106k
            config::load_stream_max_wait_flush_token_time_ms;
183
106k
    DBUG_EXECUTE_IF("TabletStream.append_data.long_wait", {
184
106k
        load_stream_flush_token_max_tasks = 0;
185
106k
        load_stream_max_wait_flush_token_time_ms = 1000;
186
106k
    });
187
106k
    MonotonicStopWatch timer;
188
106k
    timer.start();
189
107k
    while (_flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
190
420
        if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
191
0
            _status.update(
192
0
                    Status::Error<true>("wait flush token back pressure time is more than "
193
0
                                        "load_stream_max_wait_flush_token_time {}",
194
0
                                        load_stream_max_wait_flush_token_time_ms));
195
0
            return _status.status();
196
0
        }
197
420
        bthread_usleep(2 * 1000); // 2ms
198
420
    }
199
106k
    timer.stop();
200
106k
    int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
201
106k
    g_load_stream_flush_wait_ms << time_ms;
202
106k
    g_load_stream_flush_running_threads << 1;
203
106k
    Status st = Status::OK();
204
106k
    DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed",
205
106k
                    { st = Status::InternalError("fault injection"); });
206
106k
    if (st.ok()) {
207
106k
        st = _flush_token->submit_func(flush_func);
208
106k
    }
209
106k
    if (!st.ok()) {
210
0
        _status.update(st);
211
0
    }
212
106k
    return _status.status();
213
106k
}
214
215
3.71k
Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
216
3.71k
    if (!_status.ok()) {
217
0
        return _status.status();
218
0
    }
219
220
3.71k
    SCOPED_TIMER(_add_segment_timer);
221
3.71k
    DCHECK(header.has_segment_statistics());
222
3.71k
    SegmentStatistics stat(header.segment_statistics());
223
224
3.71k
    int64_t src_id = header.src_id();
225
3.71k
    uint32_t segid = header.segment_id();
226
3.71k
    uint32_t new_segid;
227
3.71k
    DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_segid", { segid = UNKNOWN_ID_FOR_TEST; });
228
3.71k
    {
229
3.71k
        std::lock_guard lock_guard(_lock);
230
3.71k
        if (!_segids_mapping.contains(src_id)) {
231
0
            _status.update(Status::InternalError(
232
0
                    "add segment failed, no segment written by this src be yet, src_id={}, "
233
0
                    "segment_id={}",
234
0
                    src_id, segid));
235
0
            return _status.status();
236
0
        }
237
3.71k
        DBUG_EXECUTE_IF("TabletStream.add_segment.segid_never_written",
238
3.71k
                        { segid = static_cast<uint32_t>(_segids_mapping[src_id]->size()); });
239
3.71k
        if (segid >= _segids_mapping[src_id]->size()) {
240
0
            _status.update(Status::InternalError(
241
0
                    "add segment failed, segment is never written, src_id={}, segment_id={}",
242
0
                    src_id, segid));
243
0
            return _status.status();
244
0
        }
245
3.71k
        new_segid = _segids_mapping[src_id]->at(segid);
246
3.71k
    }
247
3.71k
    DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
248
249
3.71k
    auto add_segment_func = [this, new_segid, stat]() {
250
3.71k
        signal::set_signal_task_id(_load_id);
251
3.71k
        auto st = _load_stream_writer->add_segment(new_segid, stat);
252
3.71k
        DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
253
3.71k
                        { st = Status::InternalError("fault injection"); });
254
3.71k
        if (!st.ok()) {
255
0
            _status.update(st);
256
0
            LOG(INFO) << "add segment failed " << *this;
257
0
        }
258
3.71k
    };
259
3.71k
    Status st = Status::OK();
260
3.71k
    DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed",
261
3.71k
                    { st = Status::InternalError("fault injection"); });
262
3.71k
    if (st.ok()) {
263
3.71k
        st = _flush_token->submit_func(add_segment_func);
264
3.71k
    }
265
3.71k
    if (!st.ok()) {
266
0
        _status.update(st);
267
0
    }
268
3.71k
    return _status.status();
269
3.71k
}
270
271
40.4k
Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
272
40.4k
    bthread::Mutex mu;
273
40.4k
    std::unique_lock<bthread::Mutex> lock(mu);
274
40.4k
    bthread::ConditionVariable cv;
275
40.4k
    auto st = Status::OK();
276
40.4k
    auto func = [this, &mu, &cv, &st, &fn] {
277
40.4k
        signal::set_signal_task_id(_load_id);
278
40.4k
        st = fn();
279
40.4k
        std::lock_guard<bthread::Mutex> lock(mu);
280
40.4k
        cv.notify_one();
281
40.4k
    };
282
40.4k
    bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func);
283
40.4k
    if (!ret) {
284
0
        return Status::Error<ErrorCode::INTERNAL_ERROR>(
285
0
                "there is not enough thread resource for close load");
286
0
    }
287
40.4k
    cv.wait(lock);
288
40.4k
    return st;
289
40.4k
}
290
291
27.0k
void TabletStream::wait_for_flush_tasks() {
292
27.0k
    {
293
27.0k
        std::lock_guard lock_guard(_lock);
294
27.0k
        if (_flush_tasks_done) {
295
13.5k
            return;
296
13.5k
        }
297
13.5k
        _flush_tasks_done = true;
298
13.5k
    }
299
300
13.5k
    if (!_status.ok()) {
301
2
        _flush_token->shutdown();
302
2
        return;
303
2
    }
304
305
    // Use heavy_work_pool to avoid blocking bthread
306
13.5k
    auto st = _run_in_heavy_work_pool([this]() {
307
13.5k
        _flush_token->wait();
308
13.5k
        return Status::OK();
309
13.5k
    });
310
13.5k
    if (!st.ok()) {
311
        // If heavy_work_pool is unavailable, fall back to shutdown
312
        // which will cancel pending tasks and wait for running tasks
313
0
        _flush_token->shutdown();
314
0
        _status.update(st);
315
0
    }
316
13.5k
}
317
318
13.5k
void TabletStream::pre_close() {
319
13.5k
    SCOPED_TIMER(_close_wait_timer);
320
13.5k
    wait_for_flush_tasks();
321
322
13.5k
    if (!_status.ok()) {
323
3
        return;
324
3
    }
325
326
13.4k
    DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
327
13.4k
    if (_check_num_segments && (_next_segid.load() != _num_segments)) {
328
2
        _status.update(Status::Corruption(
329
2
                "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
330
2
                _num_segments, _next_segid.load(), print_id(_load_id)));
331
2
        return;
332
2
    }
333
334
13.4k
    _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); }));
335
13.4k
}
336
337
13.5k
Status TabletStream::close() {
338
13.5k
    if (!_status.ok()) {
339
6
        return _status.status();
340
6
    }
341
342
13.4k
    SCOPED_TIMER(_close_wait_timer);
343
13.4k
    _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); }));
344
13.4k
    return _status.status();
345
13.5k
}
346
347
IndexStream::IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
348
                         std::shared_ptr<OlapTableSchemaParam> schema,
349
                         LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
350
2.10k
        : _id(id),
351
2.10k
          _load_id(load_id),
352
2.10k
          _txn_id(txn_id),
353
2.10k
          _schema(schema),
354
2.10k
          _load_stream_mgr(load_stream_mgr) {
355
2.10k
    _profile = profile->create_child(fmt::format("IndexStream {}", id), true, true);
356
2.10k
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
357
2.10k
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
358
2.10k
}
359
360
2.10k
IndexStream::~IndexStream() {
361
    // Ensure all TabletStreams have their flush tokens properly handled before destruction.
362
    // In normal flow, close() should have called pre_close() on all tablet streams.
363
    // But if IndexStream is destroyed without close() being called (e.g., on_idle_timeout),
364
    // we need to wait for flush tasks here to ensure flush tokens are properly shut down.
365
13.5k
    for (auto& [_, tablet_stream] : _tablet_streams_map) {
366
13.5k
        tablet_stream->wait_for_flush_tasks();
367
13.5k
    }
368
2.10k
}
369
370
110k
Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
371
110k
    SCOPED_TIMER(_append_data_timer);
372
110k
    int64_t tablet_id = header.tablet_id();
373
110k
    TabletStreamSharedPtr tablet_stream;
374
110k
    {
375
110k
        std::lock_guard lock_guard(_lock);
376
110k
        auto it = _tablet_streams_map.find(tablet_id);
377
110k
        if (it == _tablet_streams_map.end()) {
378
3.72k
            _init_tablet_stream(tablet_stream, tablet_id, header.partition_id());
379
106k
        } else {
380
106k
            tablet_stream = it->second;
381
106k
        }
382
110k
    }
383
384
110k
    return tablet_stream->append_data(header, data);
385
110k
}
386
387
void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
388
13.5k
                                      int64_t partition_id) {
389
13.5k
    tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr,
390
13.5k
                                                   _profile);
391
13.5k
    _tablet_streams_map[tablet_id] = tablet_stream;
392
13.5k
    auto st = tablet_stream->init(_schema, _id, partition_id);
393
13.5k
    if (!st.ok()) {
394
1
        LOG(WARNING) << "tablet stream init failed " << *tablet_stream;
395
1
    }
396
13.5k
}
397
398
3.71k
void IndexStream::get_all_write_tablet_ids(std::vector<int64_t>* tablet_ids) {
399
3.71k
    std::lock_guard lock_guard(_lock);
400
12.1k
    for (const auto& [tablet_id, _] : _tablet_streams_map) {
401
12.1k
        tablet_ids->push_back(tablet_id);
402
12.1k
    }
403
3.71k
}
404
405
void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
406
2.09k
                        std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
407
2.09k
    std::lock_guard lock_guard(_lock);
408
2.09k
    SCOPED_TIMER(_close_wait_timer);
409
    // open all need commit tablets
410
13.5k
    for (const auto& tablet : tablets_to_commit) {
411
13.5k
        if (_id != tablet.index_id()) {
412
18
            continue;
413
18
        }
414
13.5k
        TabletStreamSharedPtr tablet_stream;
415
13.5k
        auto it = _tablet_streams_map.find(tablet.tablet_id());
416
13.5k
        if (it == _tablet_streams_map.end()) {
417
9.77k
            _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id());
418
9.77k
        } else {
419
3.72k
            tablet_stream = it->second;
420
3.72k
        }
421
13.5k
        if (tablet.has_num_segments()) {
422
13.5k
            tablet_stream->add_num_segments(tablet.num_segments());
423
13.5k
        } else {
424
            // for compatibility reasons (sink from old version BE)
425
0
            tablet_stream->disable_num_segments_check();
426
0
        }
427
13.5k
    }
428
429
13.5k
    for (auto& [_, tablet_stream] : _tablet_streams_map) {
430
13.5k
        tablet_stream->pre_close();
431
13.5k
    }
432
433
13.5k
    for (auto& [_, tablet_stream] : _tablet_streams_map) {
434
13.5k
        auto st = tablet_stream->close();
435
13.5k
        if (st.ok()) {
436
13.4k
            success_tablet_ids->push_back(tablet_stream->id());
437
13.4k
        } else {
438
6
            LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st;
439
6
            failed_tablets->emplace_back(tablet_stream->id(), st);
440
6
        }
441
13.5k
    }
442
2.09k
}
443
444
// TODO: Profile is temporary disabled, because:
445
// 1. It's not being processed by the upstream for now
446
// 2. There are some problems in _profile->to_thrift()
447
LoadStream::LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr,
448
                       bool enable_profile)
449
2.07k
        : _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) {
450
2.07k
    g_load_stream_cnt << 1;
451
2.07k
    _profile = std::make_unique<RuntimeProfile>("LoadStream");
452
2.07k
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
453
2.07k
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
454
2.07k
    TUniqueId load_tid = ((UniqueId)load_id).to_thrift();
455
2.07k
#ifndef BE_TEST
456
2.07k
    std::shared_ptr<QueryContext> query_context =
457
2.07k
            ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid);
458
2.07k
    if (query_context != nullptr) {
459
2.07k
        _resource_ctx = query_context->resource_ctx();
460
2.07k
    } else {
461
0
        _resource_ctx = ResourceContext::create_shared();
462
0
        _resource_ctx->task_controller()->set_task_id(load_tid);
463
0
        std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
464
0
                MemTrackerLimiter::Type::LOAD,
465
0
                fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string()));
466
0
        _resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
467
0
    }
468
#else
469
    _resource_ctx = ResourceContext::create_shared();
470
    _resource_ctx->task_controller()->set_task_id(load_tid);
471
    std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
472
            MemTrackerLimiter::Type::LOAD,
473
            fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string()));
474
    _resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
475
#endif
476
2.07k
}
477
478
2.08k
LoadStream::~LoadStream() {
479
2.08k
    g_load_stream_cnt << -1;
480
2.08k
    LOG(INFO) << "load stream is deconstructed " << *this;
481
2.08k
}
482
483
2.08k
Status LoadStream::init(const POpenLoadStreamRequest* request) {
484
2.08k
    _txn_id = request->txn_id();
485
2.08k
    _total_streams = static_cast<int32_t>(request->total_streams());
486
2.08k
    _is_incremental = (_total_streams == 0);
487
488
2.08k
    _schema = std::make_shared<OlapTableSchemaParam>();
489
2.08k
    RETURN_IF_ERROR(_schema->init(request->schema()));
490
2.10k
    for (auto& index : request->schema().indexes()) {
491
2.10k
        _index_streams_map[index.id()] = std::make_shared<IndexStream>(
492
2.10k
                _load_id, index.id(), _txn_id, _schema, _load_stream_mgr, _profile.get());
493
2.10k
    }
494
2.08k
    LOG(INFO) << "succeed to init load stream " << *this;
495
2.08k
    return Status::OK();
496
2.08k
}
497
498
bool LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
499
4.02k
                       std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
500
4.02k
    std::lock_guard<bthread::Mutex> lock_guard(_lock);
501
4.02k
    SCOPED_TIMER(_close_wait_timer);
502
503
    // we do nothing until recv CLOSE_LOAD from all stream to ensure all data are handled before ack
504
4.02k
    _open_streams[src_id]--;
505
4.02k
    if (_open_streams[src_id] == 0) {
506
2.08k
        _open_streams.erase(src_id);
507
2.08k
    }
508
4.02k
    _close_load_cnt++;
509
4.02k
    LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
510
4.02k
              << _total_streams - _close_load_cnt << " senders, " << *this;
511
512
4.02k
    _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
513
4.02k
                              tablets_to_commit.end());
514
515
4.02k
    if (_close_load_cnt < _total_streams) {
516
        // do not return commit info if there is remaining streams.
517
1.94k
        return false;
518
1.94k
    }
519
520
2.09k
    for (auto& [_, index_stream] : _index_streams_map) {
521
2.09k
        index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets);
522
2.09k
    }
523
2.08k
    LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
524
2.08k
              << ", failed_tablet_num=" << failed_tablets->size();
525
2.08k
    return true;
526
4.02k
}
527
528
void LoadStream::_report_result(StreamId stream, const Status& status,
529
                                const std::vector<int64_t>& success_tablet_ids,
530
4.04k
                                const FailedTablets& failed_tablets, bool eos) {
531
4.04k
    LOG(INFO) << "report result " << *this << ", success tablet num " << success_tablet_ids.size()
532
4.04k
              << ", failed tablet num " << failed_tablets.size();
533
4.04k
    butil::IOBuf buf;
534
4.04k
    PLoadStreamResponse response;
535
4.04k
    response.set_eos(eos);
536
4.04k
    status.to_protobuf(response.mutable_status());
537
13.4k
    for (auto& id : success_tablet_ids) {
538
13.4k
        response.add_success_tablet_ids(id);
539
13.4k
    }
540
4.04k
    for (auto& [id, st] : failed_tablets) {
541
10
        auto pb = response.add_failed_tablets();
542
10
        pb->set_id(id);
543
10
        st.to_protobuf(pb->mutable_status());
544
10
    }
545
546
4.04k
    if (_enable_profile && _close_load_cnt == _total_streams) {
547
0
        TRuntimeProfileTree tprofile;
548
0
        ThriftSerializer ser(false, 4096);
549
0
        uint8_t* profile_buf = nullptr;
550
0
        uint32_t len = 0;
551
0
        std::unique_lock<bthread::Mutex> l(_lock);
552
553
0
        _profile->to_thrift(&tprofile);
554
0
        auto st = ser.serialize(&tprofile, &len, &profile_buf);
555
0
        if (st.ok()) {
556
0
            response.set_load_stream_profile(profile_buf, len);
557
0
        } else {
558
0
            LOG(WARNING) << "TRuntimeProfileTree serialize failed, errmsg=" << st << ", " << *this;
559
0
        }
560
0
    }
561
562
4.04k
    buf.append(response.SerializeAsString());
563
4.04k
    auto wst = _write_stream(stream, buf);
564
4.04k
    if (!wst.ok()) {
565
0
        LOG(WARNING) << " report result failed with " << wst << ", " << *this;
566
0
    }
567
4.04k
}
568
569
4
void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) {
570
4
    butil::IOBuf buf;
571
4
    PLoadStreamResponse response;
572
4
    Status st = Status::OK();
573
4
    for (const auto& req : hdr.tablets()) {
574
4
        BaseTabletSPtr tablet;
575
4
        if (auto res = ExecEnv::get_tablet(req.tablet_id()); res.has_value()) {
576
4
            tablet = std::move(res).value();
577
4
        } else {
578
0
            st = std::move(res).error();
579
0
            break;
580
0
        }
581
4
        auto* resp = response.add_tablet_schemas();
582
4
        resp->set_index_id(req.index_id());
583
4
        resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write());
584
4
        tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
585
4
    }
586
4
    st.to_protobuf(response.mutable_status());
587
588
4
    buf.append(response.SerializeAsString());
589
4
    auto wst = _write_stream(stream, buf);
590
4
    if (!wst.ok()) {
591
0
        LOG(WARNING) << " report result failed with " << wst << ", " << *this;
592
0
    }
593
4
}
594
595
3.71k
void LoadStream::_report_tablet_load_info(StreamId stream, int64_t index_id) {
596
3.71k
    std::vector<int64_t> write_tablet_ids;
597
3.71k
    auto it = _index_streams_map.find(index_id);
598
3.71k
    if (it != _index_streams_map.end()) {
599
3.71k
        it->second->get_all_write_tablet_ids(&write_tablet_ids);
600
3.71k
    }
601
602
3.71k
    if (!write_tablet_ids.empty()) {
603
3.71k
        butil::IOBuf buf;
604
3.71k
        PLoadStreamResponse response;
605
3.71k
        auto* tablet_load_infos = response.mutable_tablet_load_rowset_num_infos();
606
3.71k
        _collect_tablet_load_info_from_tablets(write_tablet_ids, tablet_load_infos);
607
3.71k
        if (tablet_load_infos->empty()) {
608
3.71k
            return;
609
3.71k
        }
610
0
        buf.append(response.SerializeAsString());
611
0
        auto wst = _write_stream(stream, buf);
612
0
        if (!wst.ok()) {
613
0
            LOG(WARNING) << "report tablet load info failed with " << wst << ", " << *this;
614
0
        }
615
0
    }
616
3.71k
}
617
618
void LoadStream::_collect_tablet_load_info_from_tablets(
619
        const std::vector<int64_t>& tablet_ids,
620
3.71k
        google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_load_infos) {
621
12.1k
    for (auto tablet_id : tablet_ids) {
622
12.1k
        BaseTabletSPtr tablet;
623
12.1k
        if (auto res = ExecEnv::get_tablet(tablet_id); res.has_value()) {
624
12.1k
            tablet = std::move(res).value();
625
12.1k
        } else {
626
0
            continue;
627
0
        }
628
12.1k
        BaseDeltaWriter::collect_tablet_load_rowset_num_info(tablet.get(), tablet_load_infos);
629
12.1k
    }
630
3.71k
}
631
632
4.05k
Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) {
633
4.05k
    for (;;) {
634
4.05k
        int ret = 0;
635
4.05k
        DBUG_EXECUTE_IF("LoadStream._write_stream.EAGAIN", { ret = EAGAIN; });
636
4.05k
        if (ret == 0) {
637
4.05k
            ret = brpc::StreamWrite(stream, buf);
638
4.05k
        }
639
4.05k
        switch (ret) {
640
4.05k
        case 0:
641
4.05k
            return Status::OK();
642
0
        case EAGAIN: {
643
0
            const timespec time = butil::seconds_from_now(config::load_stream_eagain_wait_seconds);
644
0
            int wait_ret = brpc::StreamWait(stream, &time);
645
0
            if (wait_ret != 0) {
646
0
                return Status::InternalError("StreamWait failed, err={}", wait_ret);
647
0
            }
648
0
            break;
649
0
        }
650
0
        default:
651
0
            return Status::InternalError("StreamWrite failed, err={}", ret);
652
4.05k
        }
653
4.05k
    }
654
0
    return Status::OK();
655
4.05k
}
656
657
114k
void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& hdr) {
658
114k
    butil::IOBufAsZeroCopyInputStream wrapper(*message);
659
114k
    hdr.ParseFromZeroCopyStream(&wrapper);
660
18.4E
    VLOG_DEBUG << "header parse result: " << hdr.DebugString();
661
114k
}
662
663
110k
Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) {
664
110k
    SCOPED_TIMER(_append_data_timer);
665
110k
    IndexStreamSharedPtr index_stream;
666
667
110k
    int64_t index_id = header.index_id();
668
110k
    DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid",
669
110k
                    { index_id = UNKNOWN_ID_FOR_TEST; });
670
110k
    auto it = _index_streams_map.find(index_id);
671
110k
    if (it == _index_streams_map.end()) {
672
1
        return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id {}", index_id);
673
110k
    } else {
674
110k
        index_stream = it->second;
675
110k
    }
676
677
110k
    return index_stream->append_data(header, data);
678
110k
}
679
680
4.05k
int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) {
681
4.05k
    VLOG_DEBUG << "on_received_messages " << id << " " << size;
682
8.10k
    for (size_t i = 0; i < size; ++i) {
683
118k
        while (messages[i]->size() > 0) {
684
            // step 1: parse header
685
114k
            size_t hdr_len = 0;
686
114k
            messages[i]->cutn((void*)&hdr_len, sizeof(size_t));
687
114k
            butil::IOBuf hdr_buf;
688
114k
            PStreamHeader hdr;
689
114k
            messages[i]->cutn(&hdr_buf, hdr_len);
690
114k
            _parse_header(&hdr_buf, hdr);
691
692
            // step 2: cut data
693
114k
            size_t data_len = 0;
694
114k
            messages[i]->cutn((void*)&data_len, sizeof(size_t));
695
114k
            butil::IOBuf data_buf;
696
114k
            PStreamHeader data;
697
114k
            messages[i]->cutn(&data_buf, data_len);
698
699
            // step 3: dispatch
700
114k
            _dispatch(id, hdr, &data_buf);
701
114k
        }
702
4.05k
    }
703
4.05k
    return 0;
704
4.05k
}
705
706
114k
void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) {
707
114k
    VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id()
708
0
               << " with tablet " << hdr.tablet_id();
709
114k
    SCOPED_ATTACH_TASK(_resource_ctx);
710
    // CLOSE_LOAD message should not be fault injected,
711
    // otherwise the message will be ignored and causing close wait timeout
712
114k
    if (hdr.opcode() != PStreamHeader::CLOSE_LOAD) {
713
110k
        DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", {
714
110k
            PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr);
715
110k
            PUniqueId* load_id = t_hdr.mutable_load_id();
716
110k
            load_id->set_hi(UNKNOWN_ID_FOR_TEST);
717
110k
            load_id->set_lo(UNKNOWN_ID_FOR_TEST);
718
110k
        });
719
110k
        DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", {
720
110k
            PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr);
721
110k
            t_hdr.set_src_id(UNKNOWN_ID_FOR_TEST);
722
110k
        });
723
110k
    }
724
114k
    if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
725
1
        Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>(
726
1
                "invalid load id {}, expected {}", print_id(hdr.load_id()), print_id(_load_id));
727
1
        _report_failure(id, st, hdr);
728
1
        return;
729
1
    }
730
731
114k
    {
732
114k
        std::lock_guard lock_guard(_lock);
733
114k
        if (!_open_streams.contains(hdr.src_id())) {
734
17
            Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("no open stream from source {}",
735
17
                                                                   hdr.src_id());
736
17
            _report_failure(id, st, hdr);
737
17
            return;
738
17
        }
739
114k
    }
740
741
114k
    switch (hdr.opcode()) {
742
3.71k
    case PStreamHeader::ADD_SEGMENT: {
743
3.71k
        auto st = _append_data(hdr, data);
744
3.71k
        if (!st.ok()) {
745
0
            _report_failure(id, st, hdr);
746
3.71k
        } else {
747
            // Report tablet load info only on ADD_SEGMENT to reduce frequency.
748
            // ADD_SEGMENT is sent once per segment, while APPEND_DATA is sent
749
            // for every data batch. This reduces unnecessary writes and avoids
750
            // potential stream write failures when the sender is closing.
751
3.71k
            _report_tablet_load_info(id, hdr.index_id());
752
3.71k
        }
753
3.71k
    } break;
754
106k
    case PStreamHeader::APPEND_DATA: {
755
106k
        auto st = _append_data(hdr, data);
756
106k
        if (!st.ok()) {
757
2
            _report_failure(id, st, hdr);
758
2
        }
759
106k
    } break;
760
4.02k
    case PStreamHeader::CLOSE_LOAD: {
761
4.02k
        DBUG_EXECUTE_IF("LoadStream.close_load.block", DBUG_BLOCK);
762
4.02k
        std::vector<int64_t> success_tablet_ids;
763
4.02k
        FailedTablets failed_tablets;
764
4.02k
        std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
765
4.02k
        bool all_closed =
766
4.02k
                close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
767
4.02k
        _report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true);
768
4.02k
        std::lock_guard<bthread::Mutex> lock_guard(_lock);
769
        // if incremental stream, we need to wait for all non-incremental streams to be closed
770
        // before closing incremental streams. We need a fencing mechanism to avoid use after closing
771
        // across different be.
772
4.02k
        if (hdr.has_num_incremental_streams() && hdr.num_incremental_streams() > 0) {
773
0
            _closing_stream_ids.push_back(id);
774
4.02k
        } else {
775
4.02k
            brpc::StreamClose(id);
776
4.02k
        }
777
778
4.02k
        if (all_closed) {
779
2.08k
            for (auto& closing_id : _closing_stream_ids) {
780
0
                brpc::StreamClose(closing_id);
781
0
            }
782
2.08k
            _closing_stream_ids.clear();
783
2.08k
        }
784
4.02k
    } break;
785
4
    case PStreamHeader::GET_SCHEMA: {
786
4
        _report_schema(id, hdr);
787
4
    } break;
788
0
    default:
789
0
        LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this;
790
0
        DCHECK(false);
791
114k
    }
792
114k
}
793
794
0
void LoadStream::on_idle_timeout(StreamId id) {
795
0
    LOG(WARNING) << "closing load stream on idle timeout, " << *this;
796
0
    brpc::StreamClose(id);
797
0
}
798
799
4.03k
void LoadStream::on_closed(StreamId id) {
800
    // `this` may be freed by other threads after increasing `_close_rpc_cnt`,
801
    // format string first to prevent use-after-free
802
4.03k
    std::stringstream ss;
803
4.03k
    ss << *this;
804
4.03k
    auto remaining_streams = _total_streams - _close_rpc_cnt.fetch_add(1) - 1;
805
4.03k
    LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams << ", "
806
4.03k
              << ss.str();
807
4.03k
    if (remaining_streams == 0) {
808
2.08k
        _load_stream_mgr->clear_load(_load_id);
809
2.08k
    }
810
4.03k
}
811
812
18.3k
inline std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream) {
813
18.3k
    ostr << "load_id=" << print_id(load_stream._load_id) << ", txn_id=" << load_stream._txn_id;
814
18.3k
    return ostr;
815
18.3k
}
816
817
} // namespace doris