/root/doris/be/src/runtime/load_stream.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <bthread/mutex.h> |
21 | | #include <gen_cpp/internal_service.pb.h> |
22 | | |
23 | | #include <condition_variable> |
24 | | #include <memory> |
25 | | #include <mutex> |
26 | | #include <unordered_map> |
27 | | #include <utility> |
28 | | |
29 | | #include "brpc/stream.h" |
30 | | #include "butil/iobuf.h" |
31 | | #include "common/compiler_util.h" // IWYU pragma: keep |
32 | | #include "common/status.h" |
33 | | #include "runtime/load_stream_writer.h" |
34 | | #include "util/runtime_profile.h" |
35 | | |
36 | | namespace doris { |
37 | | |
38 | | class LoadStreamMgr; /* Forward declarations: in this header these three types appear only behind raw or smart pointers. */ |
39 | | class ThreadPoolToken; |
40 | | class OlapTableSchemaParam; |
41 | | |
42 | | // origin_segid(index) -> new_segid(value in vector) |
43 | | using SegIdMapping = std::vector<uint32_t>; |
44 | | using FailedTablets = std::vector<std::pair<int64_t, Status>>; /* (tablet id, failure status) pairs, as built by LoadStream::_report_failure. */ |
45 | | class TabletStream { /* Per-tablet load state: a LoadStreamWriter, flush tokens, segment-id mappings and profile counters (see members). */ |
46 | | public: |
47 | | TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, /* id: presumably the tablet id -- TODO confirm in the .cpp */ |
48 | | RuntimeProfile* profile); |
49 | | |
50 | | Status init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id, /* Defined out of line; presumably must run before append_data/add_segment -- TODO confirm. */ |
51 | | int64_t partition_id); |
52 | | |
53 | | Status append_data(const PStreamHeader& header, butil::IOBuf* data); /* Takes a message header and its payload buffer; defined out of line. */ |
54 | | Status add_segment(const PStreamHeader& header, butil::IOBuf* data); /* Same parameter shape as append_data; defined out of line. */ |
55 | 16 | void add_num_segments(int64_t num_segments) { _num_segments += num_segments; } /* Accumulates into _num_segments. NOTE(review): plain int64_t, no lock taken here -- confirm callers serialize. */ |
56 | 0 | void disable_num_segments_check() { _check_num_segments = false; } /* Clears _check_num_segments; where the flag is read is not visible in this header. */ |
57 | | void pre_close(); |
58 | | Status close(); |
59 | 14 | int64_t id() const { return _id; } /* Accessor for _id. */ |
60 | | |
61 | | friend std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream); /* Stream-output operator, granted access to private members. */ |
62 | | |
63 | | private: |
64 | | Status _run_in_heavy_work_pool(std::function<Status()> fn); /* Takes a callable returning Status; going by the name it is run on a heavy-work pool -- confirm in the .cpp. */ |
65 | | |
66 | | int64_t _id; /* Returned by id(); no in-class initializer. */ |
67 | | LoadStreamWriterSharedPtr _load_stream_writer; |
68 | | std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens; |
69 | | std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping; /* Key meaning is not shown here (presumably a source id -- TODO confirm); value maps original segid (index) to new segid. */ |
70 | | std::atomic<uint32_t> _next_segid; /* NOTE(review): no in-class initializer, unlike most members below; the constructor has to set it. */ |
71 | | int64_t _num_segments = 0; /* Running total fed by add_num_segments(). */ |
72 | | bool _check_num_segments = true; /* Cleared by disable_num_segments_check(). */ |
73 | | bthread::Mutex _lock; |
74 | | AtomicStatus _status; |
75 | | PUniqueId _load_id; |
76 | | int64_t _txn_id; |
77 | | RuntimeProfile* _profile = nullptr; /* Raw pointer taken from the constructor's profile argument presumably; ownership is not expressed here. */ |
78 | | RuntimeProfile::Counter* _append_data_timer = nullptr; |
79 | | RuntimeProfile::Counter* _add_segment_timer = nullptr; |
80 | | RuntimeProfile::Counter* _close_wait_timer = nullptr; |
81 | | LoadStreamMgr* _load_stream_mgr = nullptr; |
82 | | }; |
83 | | |
84 | | using TabletStreamSharedPtr = std::shared_ptr<TabletStream>; /* Shared handle; the mapped value type of IndexStream::_tablet_streams_map. */ |
85 | | |
86 | | class IndexStream { /* Holds a map of TabletStreams keyed by tablet id, plus the schema and load/txn ids they share; presumably one instance per index id -- TODO confirm. */ |
87 | | public: |
88 | | IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id, |
89 | | std::shared_ptr<OlapTableSchemaParam> schema, LoadStreamMgr* load_stream_mgr, |
90 | | RuntimeProfile* profile); |
91 | | |
92 | | Status append_data(const PStreamHeader& header, butil::IOBuf* data); /* Same signature as TabletStream::append_data; defined out of line. */ |
93 | | |
94 | | void close(const std::vector<PTabletID>& tablets_to_commit, /* success_tablet_ids / failed_tablet_ids are non-const pointers, presumably out-parameters -- confirm in the .cpp. */ |
95 | | std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids); |
96 | | |
97 | | private: |
98 | | void _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, /* tablet_stream is a non-const reference to the shared_ptr, so the callee can assign it. */ |
99 | | int64_t partition_id); |
100 | | |
101 | | private: |
102 | | int64_t _id; /* No in-class initializer; presumably the index id -- TODO confirm. */ |
103 | | std::unordered_map<int64_t /*tabletid*/, TabletStreamSharedPtr> _tablet_streams_map; |
104 | | bthread::Mutex _lock; |
105 | | PUniqueId _load_id; |
106 | | int64_t _txn_id; |
107 | | std::shared_ptr<OlapTableSchemaParam> _schema; |
108 | | std::unordered_map<int64_t, int64_t> _tablet_partitions; /* presumably tablet id -> partition id -- TODO confirm. */ |
109 | | RuntimeProfile* _profile = nullptr; |
110 | | RuntimeProfile::Counter* _append_data_timer = nullptr; |
111 | | RuntimeProfile::Counter* _close_wait_timer = nullptr; |
112 | | LoadStreamMgr* _load_stream_mgr = nullptr; |
113 | | }; |
114 | | using IndexStreamSharedPtr = std::shared_ptr<IndexStream>; /* Mapped value type of LoadStream::_index_streams_map. */ |
115 | | |
116 | | using StreamId = brpc::StreamId; /* Local shorthand used in the LoadStream callback and helper signatures. */ |
117 | | class LoadStream : public brpc::StreamInputHandler { /* brpc stream input handler for one load id; overrides the three StreamInputHandler callbacks below and owns a map of IndexStreams. */ |
118 | | public: |
119 | | LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile); |
120 | | ~LoadStream() override; |
121 | | |
122 | | Status init(const POpenLoadStreamRequest* request); |
123 | | |
124 | 16 | void add_source(int64_t src_id) { /* Under _lock: bumps the per-source counter for src_id and, when _is_incremental is set, _total_streams as well. */ |
125 | 16 | std::lock_guard lock_guard(_lock); |
126 | 16 | _open_streams[src_id]++; /* operator[] value-initializes a missing entry to 0 before the increment. */ |
127 | 16 | if (_is_incremental) { |
128 | 0 | _total_streams++; |
129 | 0 | } |
130 | 16 | } |
131 | | |
132 | | void close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit, /* success_tablet_ids / failed_tablet_ids are non-const pointers, presumably out-parameters -- confirm in the .cpp. */ |
133 | | std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids); |
134 | | |
135 | | // callbacks called by brpc |
136 | | int on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) override; |
137 | | void on_idle_timeout(StreamId id) override; |
138 | | void on_closed(StreamId id) override; |
139 | | |
140 | | friend std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream); /* Stream-output operator, granted access to private members. */ |
141 | | |
142 | | private: |
143 | | void _parse_header(butil::IOBuf* const message, PStreamHeader& hdr); /* hdr is a non-const reference, i.e. the output of the parse. */ |
144 | | void _dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data); |
145 | | Status _append_data(const PStreamHeader& header, butil::IOBuf* data); |
146 | | |
147 | | void _report_result(StreamId stream, const Status& status, /* Defined out of line; _report_failure below is a thin wrapper over it. */ |
148 | | const std::vector<int64_t>& success_tablet_ids, |
149 | | const FailedTablets& failed_tablets, bool eos); |
150 | | void _report_schema(StreamId stream, const PStreamHeader& hdr); |
151 | | |
152 | | // report failure for one message |
153 | 20 | void _report_failure(StreamId stream, const Status& status, const PStreamHeader& header) { /* Builds a failed-tablet list of at most one entry and forwards it to _report_result. */ |
154 | 20 | FailedTablets failed_tablets; |
155 | 20 | if (header.has_tablet_id()) { /* Headers without a tablet id yield an empty failed list; status is still reported. */ |
156 | 4 | failed_tablets.emplace_back(header.tablet_id(), status); |
157 | 4 | } |
158 | 20 | _report_result(stream, status, {}, failed_tablets, false); /* {} = no successful tablets; false = eos argument. */ |
159 | 20 | } |
160 | | |
161 | | Status _write_stream(StreamId stream, butil::IOBuf& buf); |
162 | | |
163 | | private: |
164 | | PUniqueId _load_id; |
165 | | std::unordered_map<int64_t, IndexStreamSharedPtr> _index_streams_map; /* Key is presumably the index id -- TODO confirm. */ |
166 | | int32_t _total_streams = 0; /* Incremented by add_source() (under _lock) only when _is_incremental is set; other writers are not visible here. */ |
167 | | int32_t _close_load_cnt = 0; |
168 | | std::atomic<int32_t> _close_rpc_cnt = 0; /* Atomic, unlike the two plain counters above. */ |
169 | | std::vector<PTabletID> _tablets_to_commit; |
170 | | bthread::Mutex _lock; /* Held by add_source(); other users are in the .cpp. */ |
171 | | std::unordered_map<int64_t, int32_t> _open_streams; /* src_id -> counter incremented by add_source(). */ |
172 | | int64_t _txn_id = 0; |
173 | | std::shared_ptr<OlapTableSchemaParam> _schema; |
174 | | bool _enable_profile = false; |
175 | | std::unique_ptr<RuntimeProfile> _profile; /* Owned here, unlike the raw RuntimeProfile* held by TabletStream and IndexStream. */ |
176 | | RuntimeProfile::Counter* _append_data_timer = nullptr; |
177 | | RuntimeProfile::Counter* _close_wait_timer = nullptr; |
178 | | LoadStreamMgr* _load_stream_mgr = nullptr; |
179 | | QueryThreadContext _query_thread_context; |
180 | | bool _is_incremental = false; /* Read by add_source(); where it is set is not visible in this header. */ |
181 | | }; |
182 | | |
183 | | using LoadStreamPtr = std::unique_ptr<LoadStream>; /* Exclusive-ownership handle for a LoadStream. */ |
184 | | |
185 | | } // namespace doris |