Coverage Report

Created: 2026-01-30 04:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/runtime/load_stream.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bthread/mutex.h>
21
#include <gen_cpp/olap_common.pb.h>
22
23
#include <memory>
24
#include <mutex>
25
#include <unordered_map>
26
#include <utility>
27
28
#include "brpc/stream.h"
29
#include "butil/iobuf.h"
30
#include "common/compiler_util.h" // IWYU pragma: keep
31
#include "common/status.h"
32
#include "runtime/load_stream_writer.h"
33
#include "runtime/workload_management/resource_context.h"
34
#include "util/runtime_profile.h"
35
36
namespace doris {
37
38
// Forward declarations: within this header these types are only used through
// raw pointers or smart pointers, so their full definitions are not needed here.
class LoadStreamMgr;
39
class ThreadPoolToken;
40
class OlapTableSchemaParam;
41
42
// Segment id remapping table: the vector index is the original segment id and
// the element stored at that index is the new segment id.
43
using SegIdMapping = std::vector<uint32_t>;
44
// List of (tablet id, failure status) pairs describing tablets that failed.
using FailedTablets = std::vector<std::pair<int64_t, Status>>;
45
// TabletStream holds the state kept for one tablet of a load stream: an id, the
// load id and txn id, a LoadStreamWriter handle, a flush token, segment id
// mappings, a running segment count and profile counters (see private members).
// NOTE(review): every method without an inline body is defined outside this
// header; descriptions of those methods are inferred from their names and
// signatures and should be confirmed against the implementation.
class TabletStream {
46
public:
47
    // Parameters correspond to the private members of the same name.
    // NOTE(review): `id` is presumably the tablet id — confirm.
    TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
48
                 LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile);
49
50
    // Presumably prepares this stream for the given schema, index and partition;
    // returns a Status describing the outcome — confirm in the implementation.
    Status init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
51
                int64_t partition_id);
52
53
    // Handlers for one received message: `header` is the parsed stream header
    // and `data` the accompanying buffer. Both return a Status.
    // NOTE(review): exact semantics live in the implementation file.
    Status append_data(const PStreamHeader& header, butil::IOBuf* data);
54
    Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
55
16
    // Adds `num_segments` to the running total _num_segments. This is a plain
    // (non-atomic) addition. NOTE(review): confirm callers serialize access.
    void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }
56
0
    // Clears the _check_num_segments flag, which defaults to true.
    void disable_num_segments_check() { _check_num_segments = false; }
57
    // Wait for all pending flush tasks to complete and shut down the flush token.
58
    // Safe to call multiple times.
59
    void wait_for_flush_tasks();
60
    // NOTE(review): pre_close() presumably runs before close(); the division of
    // work between the two is not visible in this header.
    void pre_close();
61
    Status close();
62
14
    // Returns the stored id (_id).
    int64_t id() const { return _id; }
63
64
    // Stream-output operator; declared a friend so it can read private members.
    friend std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream);
65
66
private:
67
    // Presumably executes `fn` on a heavy-work thread pool and returns its
    // Status (inferred from the name) — confirm in the implementation.
    Status _run_in_heavy_work_pool(std::function<Status()> fn);
68
69
    // NOTE(review): _id, _next_segid and _txn_id have no in-class initializer;
    // confirm the constructor initializes all of them.
    int64_t _id;
70
    LoadStreamWriterSharedPtr _load_stream_writer;
71
    std::unique_ptr<ThreadPoolToken> _flush_token;
72
    // Segment id remapping tables, one per key.
    // NOTE(review): the key is presumably the id of the sending source — confirm.
    std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping;
73
    // Atomic counter; presumably the next new segment id to hand out — confirm.
    std::atomic<uint32_t> _next_segid;
74
    // Running total updated by add_num_segments().
    int64_t _num_segments = 0;
75
    // Set to false by disable_num_segments_check().
    bool _check_num_segments = true;
76
    // Presumably records that wait_for_flush_tasks() already ran, which is what
    // would make repeated calls safe — confirm in the implementation.
    bool _flush_tasks_done = false;
77
    // NOTE(review): what this mutex guards is not visible in this header.
    bthread::Mutex _lock;
78
    AtomicStatus _status;
79
    PUniqueId _load_id;
80
    int64_t _txn_id;
81
    // Profile and its counters; all default to nullptr until assigned.
    RuntimeProfile* _profile = nullptr;
82
    RuntimeProfile::Counter* _append_data_timer = nullptr;
83
    RuntimeProfile::Counter* _add_segment_timer = nullptr;
84
    RuntimeProfile::Counter* _close_wait_timer = nullptr;
85
    // Non-owning raw pointer to the manager passed to the constructor.
    LoadStreamMgr* _load_stream_mgr = nullptr;
86
};
87
88
// Shared-ownership handle to a TabletStream.
using TabletStreamSharedPtr = std::shared_ptr<TabletStream>;
89
90
// IndexStream groups the TabletStreams of one load under a single id and keeps
// them in a map keyed by tablet id, together with the load id, txn id, schema
// and profile counters (see private members).
// NOTE(review): all methods are defined outside this header; descriptions are
// inferred from names and signatures and should be confirmed there.
class IndexStream {
91
public:
92
    // Parameters correspond to the private members of the same name.
    // NOTE(review): `id` is presumably the index id — confirm.
    IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
93
                std::shared_ptr<OlapTableSchemaParam> schema, LoadStreamMgr* load_stream_mgr,
94
                RuntimeProfile* profile);
95
    ~IndexStream();
96
97
    // Handles one received message: `header` is the parsed stream header and
    // `data` the accompanying buffer. Returns a Status.
    Status append_data(const PStreamHeader& header, butil::IOBuf* data);
98
99
    // Closes the index for the given tablets to commit.
    // NOTE(review): success_tablet_ids and failed_tablet_ids are non-const
    // pointers and look like out-parameters receiving the results — confirm.
    void close(const std::vector<PTabletID>& tablets_to_commit,
100
               std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);
101
102
    // NOTE(review): `tablet_ids` looks like an out-parameter that receives the
    // ids of the tablets written through this index — confirm.
    void get_all_write_tablet_ids(std::vector<int64_t>* tablet_ids);
103
104
private:
105
    // Presumably creates and initializes `tablet_stream` for the given tablet
    // and partition (it is passed by non-const reference) — confirm.
    void _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
106
                             int64_t partition_id);
107
108
private:
109
    // NOTE(review): _id and _txn_id have no in-class initializer; confirm the
    // constructor initializes them.
    int64_t _id;
110
    // Tablet id -> stream for that tablet.
    std::unordered_map<int64_t /*tabletid*/, TabletStreamSharedPtr> _tablet_streams_map;
111
    // NOTE(review): what this mutex guards is not visible in this header.
    bthread::Mutex _lock;
112
    PUniqueId _load_id;
113
    int64_t _txn_id;
114
    std::shared_ptr<OlapTableSchemaParam> _schema;
115
    // NOTE(review): presumably maps tablet id -> partition id — confirm.
    std::unordered_map<int64_t, int64_t> _tablet_partitions;
116
    // Profile and its counters; all default to nullptr until assigned.
    RuntimeProfile* _profile = nullptr;
117
    RuntimeProfile::Counter* _append_data_timer = nullptr;
118
    RuntimeProfile::Counter* _close_wait_timer = nullptr;
119
    // Non-owning raw pointer to the manager passed to the constructor.
    LoadStreamMgr* _load_stream_mgr = nullptr;
120
};
121
// Shared-ownership handle to an IndexStream.
using IndexStreamSharedPtr = std::shared_ptr<IndexStream>;
122
123
// Local shorthand for the brpc stream identifier type.
using StreamId = brpc::StreamId;
124
// LoadStream is a brpc::StreamInputHandler for one load (see _load_id). It
// overrides the handler callbacks and keeps the IndexStreams of the load, the
// number of open streams per source and close-related counters (see private
// members).
// NOTE(review): methods without an inline body are defined outside this header;
// their descriptions are inferred from names and signatures — confirm there.
class LoadStream : public brpc::StreamInputHandler {
125
public:
126
    LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile);
127
    ~LoadStream() override;
128
129
    // Initializes the stream from the open request; returns a Status.
    Status init(const POpenLoadStreamRequest* request);
130
131
16
    // Records one more open stream for source `src_id`. While holding _lock it
    // increments the per-source count in _open_streams (operator[] creates the
    // entry with value 0 on first use) and, when _is_incremental is set, also
    // increments _total_streams.
    void add_source(int64_t src_id) {
132
16
        std::lock_guard lock_guard(_lock);
133
16
        _open_streams[src_id]++;
134
16
        if (_is_incremental) {
135
0
            _total_streams++;
136
0
        }
137
16
    }
138
139
    // Returns true if all streams are closed, otherwise returns false.
    // NOTE(review): success_tablet_ids and failed_tablet_ids are non-const
    // pointers and look like out-parameters receiving the results — confirm.
140
    bool close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
141
               std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);
142
143
    // Callbacks invoked by brpc (overrides of brpc::StreamInputHandler).
144
    int on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) override;
145
    void on_idle_timeout(StreamId id) override;
146
    void on_closed(StreamId id) override;
147
148
    // Stream-output operator; declared a friend so it can read private members.
    friend std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream);
149
150
private:
151
    // Message handling helpers. NOTE(review): presumably _parse_header() fills
    // `hdr` from `message` and _dispatch() routes the message according to the
    // header — confirm in the implementation.
    void _parse_header(butil::IOBuf* const message, PStreamHeader& hdr);
152
    void _dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data);
153
    Status _append_data(const PStreamHeader& header, butil::IOBuf* data);
154
155
    // Reporting helpers; each takes the stream to report on. _report_result()
    // carries an overall status, the successful tablet ids, the failed tablets
    // and an `eos` flag.
    void _report_result(StreamId stream, const Status& status,
156
                        const std::vector<int64_t>& success_tablet_ids,
157
                        const FailedTablets& failed_tablets, bool eos);
158
    void _report_schema(StreamId stream, const PStreamHeader& hdr);
159
    void _report_tablet_load_info(StreamId stream, int64_t index_id);
160
    void _collect_tablet_load_info_from_tablets(
161
            const std::vector<int64_t>& tablet_ids,
162
            google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_load_infos);
163
164
    // Reports a failure for one message. If the header carries a tablet id,
    // that tablet is recorded as failed with `status`; the result is then
    // forwarded to _report_result() with an empty success list and eos == false.
165
20
    void _report_failure(StreamId stream, const Status& status, const PStreamHeader& header) {
166
20
        FailedTablets failed_tablets;
167
20
        if (header.has_tablet_id()) {
168
4
            failed_tablets.emplace_back(header.tablet_id(), status);
169
4
        }
170
20
        _report_result(stream, status, {}, failed_tablets, false);
171
20
    }
172
173
    // Presumably writes `buf` to `stream` and returns the outcome — confirm.
    Status _write_stream(StreamId stream, butil::IOBuf& buf);
174
175
private:
176
    PUniqueId _load_id;
177
    // NOTE(review): presumably keyed by index id — confirm.
    std::unordered_map<int64_t, IndexStreamSharedPtr> _index_streams_map;
178
    // Incremented by add_source() when _is_incremental is true; other writers
    // are not visible in this header.
    int32_t _total_streams = 0;
179
    int32_t _close_load_cnt = 0;
180
    std::atomic<int32_t> _close_rpc_cnt = 0;
181
    std::vector<PTabletID> _tablets_to_commit;
182
    // Held by add_source() while it updates _open_streams and _total_streams.
    bthread::Mutex _lock;
183
    // Source id -> count incremented once per add_source() call for that source.
    std::unordered_map<int64_t, int32_t> _open_streams;
184
    int64_t _txn_id = 0;
185
    std::shared_ptr<OlapTableSchemaParam> _schema;
186
    bool _enable_profile = false;
187
    // Owned profile and its counters; the counters default to nullptr.
    std::unique_ptr<RuntimeProfile> _profile;
188
    RuntimeProfile::Counter* _append_data_timer = nullptr;
189
    RuntimeProfile::Counter* _close_wait_timer = nullptr;
190
    // Non-owning raw pointer to the manager passed to the constructor.
    LoadStreamMgr* _load_stream_mgr = nullptr;
191
    std::shared_ptr<ResourceContext> _resource_ctx;
192
    std::vector<int64_t> _closing_stream_ids;
193
    // When true, add_source() also increments _total_streams.
    bool _is_incremental = false;
194
};
195
196
// Unique-ownership handle to a LoadStream.
using LoadStreamPtr = std::unique_ptr<LoadStream>;
197
198
} // namespace doris