be/src/exec/sink/load_stream_stub.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | #include <brpc/controller.h> |
20 | | #include <bthread/condition_variable.h> |
21 | | #include <bthread/mutex.h> |
22 | | #include <bthread/types.h> |
23 | | #include <butil/errno.h> |
24 | | #include <fmt/format.h> |
25 | | #include <gen_cpp/PaloInternalService_types.h> |
26 | | #include <gen_cpp/Types_types.h> |
27 | | #include <gen_cpp/internal_service.pb.h> |
28 | | #include <gen_cpp/types.pb.h> |
29 | | #include <glog/logging.h> |
30 | | #include <google/protobuf/stubs/callback.h> |
31 | | #include <parallel_hashmap/phmap.h> |
32 | | #include <stddef.h> |
33 | | #include <stdint.h> |
34 | | |
35 | | #include <atomic> |
36 | | // IWYU pragma: no_include <bits/chrono.h> |
37 | | #include <chrono> // IWYU pragma: keep |
38 | | #include <functional> |
39 | | #include <initializer_list> |
40 | | #include <map> |
41 | | #include <memory> |
42 | | #include <mutex> |
43 | | #include <ostream> |
44 | | #include <queue> |
45 | | #include <set> |
46 | | #include <span> |
47 | | #include <string> |
48 | | #include <unordered_map> |
49 | | #include <unordered_set> |
50 | | #include <utility> |
51 | | #include <vector> |
52 | | |
53 | | #include "common/config.h" |
54 | | #include "common/status.h" |
55 | | #include "core/allocator.h" |
56 | | #include "core/block/block.h" |
57 | | #include "core/column/column.h" |
58 | | #include "core/data_type/data_type.h" |
59 | | #include "exprs/vexpr_fwd.h" |
60 | | #include "runtime/exec_env.h" |
61 | | #include "runtime/memory/mem_tracker.h" |
62 | | #include "runtime/runtime_profile.h" |
63 | | #include "runtime/thread_context.h" |
64 | | #include "storage/tablet_info.h" |
65 | | #include "util/countdown_latch.h" |
66 | | #include "util/debug_points.h" |
67 | | #include "util/stopwatch.hpp" |
68 | | |
69 | | namespace doris { |
70 | | class TabletSchema; |
71 | | class LoadStreamStub; |
72 | | |
73 | | struct SegmentStatistics; |
74 | | |
75 | | using IndexToTabletSchema = phmap::parallel_flat_hash_map< |
76 | | int64_t, std::shared_ptr<TabletSchema>, std::hash<int64_t>, std::equal_to<int64_t>, |
77 | | std::allocator<phmap::Pair<const int64_t, std::shared_ptr<TabletSchema>>>, 4, std::mutex>; |
78 | | |
79 | | using IndexToEnableMoW = |
80 | | phmap::parallel_flat_hash_map<int64_t, bool, std::hash<int64_t>, std::equal_to<int64_t>, |
81 | | std::allocator<phmap::Pair<const int64_t, bool>>, 4, |
82 | | std::mutex>; |
83 | | |
84 | | class LoadStreamReplyHandler : public brpc::StreamInputHandler { |
85 | | public: |
86 | | LoadStreamReplyHandler(PUniqueId load_id, int64_t dst_id, std::weak_ptr<LoadStreamStub> stub) |
87 | 4.06k | : _load_id(load_id), _dst_id(dst_id), _stub(stub) {} |
88 | | |
89 | | int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[], |
90 | | size_t size) override; |
91 | | |
92 | 0 | void on_idle_timeout(brpc::StreamId id) override {} |
93 | | |
94 | | void on_closed(brpc::StreamId id) override; |
95 | | |
96 | | friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler); |
97 | | |
98 | | private: |
99 | | PUniqueId _load_id; // for logging |
100 | | int64_t _dst_id = -1; // for logging |
101 | | std::weak_ptr<LoadStreamStub> _stub; |
102 | | }; |
103 | | |
104 | | class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> { |
105 | | friend class LoadStreamReplyHandler; |
106 | | |
107 | | public: |
108 | | // construct new stub |
109 | | LoadStreamStub(PUniqueId load_id, int64_t src_id, |
110 | | std::shared_ptr<IndexToTabletSchema> schema_map, |
111 | | std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false); |
112 | | |
113 | | LoadStreamStub(UniqueId load_id, int64_t src_id, |
114 | | std::shared_ptr<IndexToTabletSchema> schema_map, |
115 | | std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false) |
116 | 4.11k | : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, incremental) {}; |
117 | | |
118 | | // for mock this class in UT |
119 | | #ifdef BE_TEST |
120 | | virtual |
121 | | #endif |
122 | | ~LoadStreamStub(); |
123 | | |
124 | | // open_load_stream |
125 | | Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info, |
126 | | int64_t txn_id, const OlapTableSchemaParam& schema, |
127 | | const std::vector<PTabletID>& tablets_for_schema, int total_streams, |
128 | | int64_t idle_timeout_ms, bool enable_profile); |
129 | | |
130 | | // for mock this class in UT |
131 | | #ifdef BE_TEST |
132 | | virtual |
133 | | #endif |
134 | | // segment_id is limited by max_segment_num_per_rowset (default value of 1000), |
135 | | // so in practice it will not exceed the range of i16. |
136 | | |
137 | | // APPEND_DATA |
138 | | Status |
139 | | append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, |
140 | | int32_t segment_id, uint64_t offset, std::span<const Slice> data, |
141 | | bool segment_eos = false, FileType file_type = FileType::SEGMENT_FILE); |
142 | | |
143 | | // ADD_SEGMENT |
144 | | Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, |
145 | | int32_t segment_id, const SegmentStatistics& segment_stat); |
146 | | |
147 | | // CLOSE_LOAD |
148 | | Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams); |
149 | | |
150 | | // GET_SCHEMA |
151 | | Status get_schema(const std::vector<PTabletID>& tablets); |
152 | | |
153 | | // wait remote to close stream, |
154 | | // remote will close stream when it receives CLOSE_LOAD |
155 | | Status close_finish_check(RuntimeState* state, bool* is_closed); |
156 | | |
157 | | // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled |
158 | | void cancel(Status reason); |
159 | | |
160 | | Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id, |
161 | | int64_t timeout_ms = 60000); |
162 | | |
163 | 4 | Status wait_for_new_schema(int64_t timeout_ms) { |
164 | 4 | std::unique_lock<bthread::Mutex> lock(_schema_mutex); |
165 | 4 | if (timeout_ms > 0) { |
166 | 4 | int ret = _schema_cv.wait_for(lock, timeout_ms * 1000); |
167 | 4 | return ret == 0 ? Status::OK() : Status::Error<true>(ret, "wait schema update timeout"); |
168 | 4 | } |
169 | 0 | _schema_cv.wait(lock); |
170 | 0 | return Status::OK(); |
171 | 4 | }; |
172 | | |
173 | 7.50k | std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const { |
174 | 7.50k | return (*_tablet_schema_for_index)[index_id]; |
175 | 7.50k | } |
176 | | |
177 | 7.50k | bool enable_unique_mow(int64_t index_id) const { |
178 | 7.50k | return _enable_unique_mow_for_index->at(index_id); |
179 | 7.50k | } |
180 | | |
181 | 4.09k | std::vector<int64_t> success_tablets() { |
182 | 4.09k | std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex); |
183 | 4.09k | return _success_tablets; |
184 | 4.09k | } |
185 | | |
186 | 4.09k | std::unordered_map<int64_t, Status> failed_tablets() { |
187 | 4.09k | std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex); |
188 | 4.09k | return _failed_tablets; |
189 | 4.09k | } |
190 | | |
191 | 0 | brpc::StreamId stream_id() const { return _stream_id; } |
192 | | |
193 | 0 | int64_t src_id() const { return _src_id; } |
194 | | |
195 | 0 | int64_t dst_id() const { return _dst_id; } |
196 | | |
197 | 0 | bool is_open() const { return _is_open.load(); } |
198 | | |
199 | 10.8k | bool is_incremental() const { return _is_incremental; } |
200 | | |
201 | | friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub); |
202 | | |
203 | | std::string to_string(); |
204 | | |
205 | | // for tests only |
206 | | void add_success_tablet(int64_t tablet_id) { |
207 | | std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex); |
208 | | _success_tablets.push_back(tablet_id); |
209 | | } |
210 | | |
211 | 13 | void add_failed_tablet(int64_t tablet_id, Status reason) { |
212 | 13 | std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex); |
213 | 13 | _failed_tablets[tablet_id] = reason; |
214 | 13 | } |
215 | | |
216 | 116k | void add_bytes_written(size_t bytes) { |
217 | 116k | std::lock_guard<bthread::Mutex> lock(_write_mutex); |
218 | 116k | _bytes_written += bytes; |
219 | 116k | } |
220 | | |
221 | 0 | int64_t bytes_written() { |
222 | 0 | std::lock_guard<bthread::Mutex> lock(_write_mutex); |
223 | 0 | return _bytes_written; |
224 | 0 | } |
225 | | |
226 | 35.7k | Status check_cancel() { |
227 | 35.7k | DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled", |
228 | 35.7k | { return Status::InternalError("stream cancelled"); }); |
229 | 35.7k | if (!_is_cancelled.load()) { |
230 | 35.7k | return Status::OK(); |
231 | 35.7k | } |
232 | 18.4E | std::lock_guard<bthread::Mutex> lock(_cancel_mutex); |
233 | 18.4E | return Status::Cancelled("load_id={}, reason: {}", print_id(_load_id), |
234 | 18.4E | _cancel_st.to_string_no_stack()); |
235 | 35.7k | } |
236 | | |
237 | 4.88k | int64_t get_and_reset_load_back_pressure_version_wait_time_ms() { |
238 | 4.88k | return _load_back_pressure_version_wait_time_ms.exchange(0); |
239 | 4.88k | } |
240 | | |
241 | | void _refresh_back_pressure_version_wait_time( |
242 | | const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>& |
243 | | tablet_load_infos); |
244 | | |
245 | | private: |
246 | | Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {}); |
247 | | Status _send_with_buffer(butil::IOBuf& buf, bool sync = false); |
248 | | Status _send_with_retry(butil::IOBuf& buf); |
249 | | void _handle_failure(butil::IOBuf& buf, Status st); |
250 | | |
251 | | protected: |
252 | | std::atomic<bool> _is_init; |
253 | | std::atomic<bool> _is_open; |
254 | | std::atomic<bool> _is_closing; |
255 | | std::atomic<bool> _is_closed; |
256 | | std::atomic<bool> _is_cancelled; |
257 | | std::atomic<bool> _is_eos; |
258 | | |
259 | | PUniqueId _load_id; |
260 | | brpc::StreamId _stream_id; |
261 | | int64_t _src_id = -1; // source backend_id |
262 | | int64_t _dst_id = -1; // destination backend_id |
263 | | Status _status = Status::InternalError<false>("Stream is not open"); |
264 | | Status _cancel_st; |
265 | | |
266 | | bthread::Mutex _open_mutex; |
267 | | bthread::Mutex _cancel_mutex; |
268 | | |
269 | | std::mutex _buffer_mutex; |
270 | | std::mutex _send_mutex; |
271 | | butil::IOBuf _buffer; |
272 | | |
273 | | bthread::Mutex _schema_mutex; |
274 | | bthread::ConditionVariable _schema_cv; |
275 | | std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index; |
276 | | std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index; |
277 | | |
278 | | bthread::Mutex _success_tablets_mutex; |
279 | | bthread::Mutex _failed_tablets_mutex; |
280 | | std::vector<int64_t> _success_tablets; |
281 | | std::unordered_map<int64_t, Status> _failed_tablets; |
282 | | |
283 | | bool _is_incremental = false; |
284 | | |
285 | | bthread::Mutex _write_mutex; |
286 | | size_t _bytes_written = 0; |
287 | | |
288 | | std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0}; |
289 | | }; |
290 | | |
291 | | // a collection of LoadStreams connect to the same node |
292 | | class LoadStreamStubs { |
293 | | public: |
294 | | LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id, |
295 | | std::shared_ptr<IndexToTabletSchema> schema_map, |
296 | | std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false) |
297 | 2.13k | : _is_incremental(incremental) { |
298 | 2.13k | _streams.reserve(num_streams); |
299 | 6.25k | for (size_t i = 0; i < num_streams; i++) { |
300 | 4.11k | _streams.emplace_back( |
301 | 4.11k | new LoadStreamStub(load_id, src_id, schema_map, mow_map, incremental)); |
302 | 4.11k | } |
303 | 2.13k | } |
304 | | |
305 | | Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info, |
306 | | int64_t txn_id, const OlapTableSchemaParam& schema, |
307 | | const std::vector<PTabletID>& tablets_for_schema, int total_streams, |
308 | | int64_t idle_timeout_ms, bool enable_profile); |
309 | | |
310 | 4.18k | bool is_incremental() const { return _is_incremental; } |
311 | | |
312 | | size_t size() const { return _streams.size(); } |
313 | | |
314 | | // for UT only |
315 | | void mark_open() { _open_success.store(true); } |
316 | | |
317 | 3.81k | std::shared_ptr<LoadStreamStub> select_one_stream() { |
318 | 3.81k | if (!_open_success.load()) { |
319 | 0 | return nullptr; |
320 | 0 | } |
321 | 3.81k | size_t i = _select_index.fetch_add(1); |
322 | 3.81k | return _streams[i % _streams.size()]; |
323 | 3.81k | } |
324 | | |
325 | 32 | void cancel(Status reason) { |
326 | 64 | for (auto& stream : _streams) { |
327 | 64 | stream->cancel(reason); |
328 | 64 | } |
329 | 32 | } |
330 | | |
331 | | Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams); |
332 | | |
333 | 2.12k | std::unordered_set<int64_t> success_tablets() { |
334 | 2.12k | std::unordered_set<int64_t> s; |
335 | 4.09k | for (auto& stream : _streams) { |
336 | 4.09k | auto v = stream->success_tablets(); |
337 | 4.09k | std::copy(v.begin(), v.end(), std::inserter(s, s.end())); |
338 | 4.09k | } |
339 | 2.12k | return s; |
340 | 2.12k | } |
341 | | |
342 | 2.12k | std::unordered_map<int64_t, Status> failed_tablets() { |
343 | 2.12k | std::unordered_map<int64_t, Status> m; |
344 | 4.09k | for (auto& stream : _streams) { |
345 | 4.09k | auto v = stream->failed_tablets(); |
346 | 4.09k | m.insert(v.begin(), v.end()); |
347 | 4.09k | } |
348 | 2.12k | return m; |
349 | 2.12k | } |
350 | | |
351 | 64.3k | std::vector<std::shared_ptr<LoadStreamStub>> streams() { return _streams; } |
352 | | |
353 | | private: |
354 | | std::vector<std::shared_ptr<LoadStreamStub>> _streams; |
355 | | std::atomic<bool> _open_success = false; |
356 | | std::atomic<size_t> _select_index = 0; |
357 | | const bool _is_incremental; |
358 | | }; |
359 | | |
360 | | } // namespace doris |