be/src/exec/sink/load_stream_stub.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | #include <brpc/controller.h> |
20 | | #include <bthread/condition_variable.h> |
21 | | #include <bthread/mutex.h> |
22 | | #include <bthread/types.h> |
23 | | #include <butil/errno.h> |
24 | | #include <fmt/format.h> |
25 | | #include <gen_cpp/PaloInternalService_types.h> |
26 | | #include <gen_cpp/Types_types.h> |
27 | | #include <gen_cpp/internal_service.pb.h> |
28 | | #include <gen_cpp/types.pb.h> |
29 | | #include <glog/logging.h> |
30 | | #include <google/protobuf/stubs/callback.h> |
31 | | #include <parallel_hashmap/phmap.h> |
32 | | #include <stddef.h> |
33 | | #include <stdint.h> |
34 | | |
35 | | #include <atomic> |
36 | | // IWYU pragma: no_include <bits/chrono.h> |
37 | | #include <chrono> // IWYU pragma: keep |
38 | | #include <functional> |
39 | | #include <initializer_list> |
40 | | #include <map> |
41 | | #include <memory> |
42 | | #include <mutex> |
43 | | #include <ostream> |
44 | | #include <queue> |
45 | | #include <set> |
46 | | #include <span> |
47 | | #include <string> |
48 | | #include <unordered_map> |
49 | | #include <unordered_set> |
50 | | #include <utility> |
51 | | #include <vector> |
52 | | |
53 | | #include "common/config.h" |
54 | | #include "common/status.h" |
55 | | #include "core/allocator.h" |
56 | | #include "core/block/block.h" |
57 | | #include "core/column/column.h" |
58 | | #include "core/data_type/data_type.h" |
59 | | #include "exprs/vexpr_fwd.h" |
60 | | #include "runtime/exec_env.h" |
61 | | #include "runtime/memory/mem_tracker.h" |
62 | | #include "runtime/runtime_profile.h" |
63 | | #include "runtime/thread_context.h" |
64 | | #include "storage/tablet_info.h" |
65 | | #include "util/countdown_latch.h" |
66 | | #include "util/debug_points.h" |
67 | | #include "util/stopwatch.hpp" |
68 | | |
69 | | namespace doris { |
70 | | #include "common/compile_check_begin.h" |
71 | | class TabletSchema; |
72 | | class LoadStreamStub; |
73 | | |
74 | | struct SegmentStatistics; |
75 | | |
76 | | using IndexToTabletSchema = phmap::parallel_flat_hash_map< |
77 | | int64_t, std::shared_ptr<TabletSchema>, std::hash<int64_t>, std::equal_to<int64_t>, |
78 | | std::allocator<phmap::Pair<const int64_t, std::shared_ptr<TabletSchema>>>, 4, std::mutex>; |
79 | | |
80 | | using IndexToEnableMoW = |
81 | | phmap::parallel_flat_hash_map<int64_t, bool, std::hash<int64_t>, std::equal_to<int64_t>, |
82 | | std::allocator<phmap::Pair<const int64_t, bool>>, 4, |
83 | | std::mutex>; |
84 | | |
85 | | class LoadStreamReplyHandler : public brpc::StreamInputHandler { |
86 | | public: |
87 | | LoadStreamReplyHandler(PUniqueId load_id, int64_t dst_id, std::weak_ptr<LoadStreamStub> stub) |
88 | 24 | : _load_id(load_id), _dst_id(dst_id), _stub(stub) {} |
89 | | |
90 | | int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[], |
91 | | size_t size) override; |
92 | | |
93 | 0 | void on_idle_timeout(brpc::StreamId id) override {} |
94 | | |
95 | | void on_closed(brpc::StreamId id) override; |
96 | | |
97 | | friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler); |
98 | | |
99 | | private: |
100 | | PUniqueId _load_id; // for logging |
101 | | int64_t _dst_id = -1; // for logging |
102 | | std::weak_ptr<LoadStreamStub> _stub; |
103 | | }; |
104 | | |
105 | | class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> { |
106 | | friend class LoadStreamReplyHandler; |
107 | | |
108 | | public: |
109 | | // construct new stub |
110 | | LoadStreamStub(PUniqueId load_id, int64_t src_id, |
111 | | std::shared_ptr<IndexToTabletSchema> schema_map, |
112 | | std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false); |
113 | | |
114 | | LoadStreamStub(UniqueId load_id, int64_t src_id, |
115 | | std::shared_ptr<IndexToTabletSchema> schema_map, |
116 | | std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false) |
117 | 75 | : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, incremental) {}; |
118 | | |
119 | | // for mock this class in UT |
120 | | #ifdef BE_TEST |
121 | | virtual |
122 | | #endif |
123 | | ~LoadStreamStub(); |
124 | | |
125 | | // open_load_stream |
126 | | Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info, |
127 | | int64_t txn_id, const OlapTableSchemaParam& schema, |
128 | | const std::vector<PTabletID>& tablets_for_schema, int total_streams, |
129 | | int64_t idle_timeout_ms, bool enable_profile); |
130 | | |
131 | | // for mock this class in UT |
132 | | #ifdef BE_TEST |
133 | | virtual |
134 | | #endif |
135 | | // segment_id is limited by max_segment_num_per_rowset (default value of 1000), |
136 | | // so in practice it will not exceed the range of i16. |
137 | | |
138 | | // APPEND_DATA |
139 | | Status |
140 | | append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, |
141 | | int32_t segment_id, uint64_t offset, std::span<const Slice> data, |
142 | | bool segment_eos = false, FileType file_type = FileType::SEGMENT_FILE); |
143 | | |
144 | | // ADD_SEGMENT |
145 | | Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, |
146 | | int32_t segment_id, const SegmentStatistics& segment_stat); |
147 | | |
148 | | // CLOSE_LOAD |
149 | | Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams); |
150 | | |
151 | | // GET_SCHEMA |
152 | | Status get_schema(const std::vector<PTabletID>& tablets); |
153 | | |
154 | | // wait remote to close stream, |
155 | | // remote will close stream when it receives CLOSE_LOAD |
156 | | Status close_finish_check(RuntimeState* state, bool* is_closed); |
157 | | |
158 | | // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled |
159 | | void cancel(Status reason); |
160 | | |
161 | | Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id, |
162 | | int64_t timeout_ms = 60000); |
163 | | |
164 | 0 | Status wait_for_new_schema(int64_t timeout_ms) { |
165 | 0 | std::unique_lock<bthread::Mutex> lock(_schema_mutex); |
166 | 0 | if (timeout_ms > 0) { |
167 | 0 | int ret = _schema_cv.wait_for(lock, timeout_ms * 1000); |
168 | 0 | return ret == 0 ? Status::OK() : Status::Error<true>(ret, "wait schema update timeout"); |
169 | 0 | } |
170 | 0 | _schema_cv.wait(lock); |
171 | 0 | return Status::OK(); |
172 | 0 | }; |
173 | | |
174 | 112 | std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const { |
175 | 112 | return (*_tablet_schema_for_index)[index_id]; |
176 | 112 | } |
177 | | |
178 | 112 | bool enable_unique_mow(int64_t index_id) const { |
179 | 112 | return _enable_unique_mow_for_index->at(index_id); |
180 | 112 | } |
181 | | |
182 | 57 | std::vector<int64_t> success_tablets() { |
183 | 57 | std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex); |
184 | 57 | return _success_tablets; |
185 | 57 | } |
186 | | |
187 | 57 | std::unordered_map<int64_t, Status> failed_tablets() { |
188 | 57 | std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex); |
189 | 57 | return _failed_tablets; |
190 | 57 | } |
191 | | |
192 | 0 | brpc::StreamId stream_id() const { return _stream_id; } |
193 | | |
194 | 0 | int64_t src_id() const { return _src_id; } |
195 | | |
196 | 0 | int64_t dst_id() const { return _dst_id; } |
197 | | |
198 | 0 | bool is_open() const { return _is_open.load(); } |
199 | | |
200 | 24 | bool is_incremental() const { return _is_incremental; } |
201 | | |
202 | | friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub); |
203 | | |
204 | | std::string to_string(); |
205 | | |
206 | | // for tests only |
207 | | void add_success_tablet(int64_t tablet_id) { |
208 | | std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex); |
209 | | _success_tablets.push_back(tablet_id); |
210 | | } |
211 | | |
212 | 13 | void add_failed_tablet(int64_t tablet_id, Status reason) { |
213 | 13 | std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex); |
214 | 13 | _failed_tablets[tablet_id] = reason; |
215 | 13 | } |
216 | | |
217 | 3.71k | void add_bytes_written(size_t bytes) { |
218 | 3.71k | std::lock_guard<bthread::Mutex> lock(_write_mutex); |
219 | 3.71k | _bytes_written += bytes; |
220 | 3.71k | } |
221 | | |
222 | 0 | int64_t bytes_written() { |
223 | 0 | std::lock_guard<bthread::Mutex> lock(_write_mutex); |
224 | 0 | return _bytes_written; |
225 | 0 | } |
226 | | |
227 | 124 | Status check_cancel() { |
228 | 124 | DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled", |
229 | 124 | { return Status::InternalError("stream cancelled"); }); |
230 | 124 | if (!_is_cancelled.load()) { |
231 | 124 | return Status::OK(); |
232 | 124 | } |
233 | 0 | std::lock_guard<bthread::Mutex> lock(_cancel_mutex); |
234 | 0 | return Status::Cancelled("load_id={}, reason: {}", print_id(_load_id), |
235 | 0 | _cancel_st.to_string_no_stack()); |
236 | 124 | } |
237 | | |
238 | 24 | int64_t get_and_reset_load_back_pressure_version_wait_time_ms() { |
239 | 24 | return _load_back_pressure_version_wait_time_ms.exchange(0); |
240 | 24 | } |
241 | | |
242 | | void _refresh_back_pressure_version_wait_time( |
243 | | const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>& |
244 | | tablet_load_infos); |
245 | | |
246 | | private: |
247 | | Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {}); |
248 | | Status _send_with_buffer(butil::IOBuf& buf, bool sync = false); |
249 | | Status _send_with_retry(butil::IOBuf& buf); |
250 | | void _handle_failure(butil::IOBuf& buf, Status st); |
251 | | |
252 | | protected: |
253 | | std::atomic<bool> _is_init; |
254 | | std::atomic<bool> _is_open; |
255 | | std::atomic<bool> _is_closing; |
256 | | std::atomic<bool> _is_closed; |
257 | | std::atomic<bool> _is_cancelled; |
258 | | std::atomic<bool> _is_eos; |
259 | | |
260 | | PUniqueId _load_id; |
261 | | brpc::StreamId _stream_id; |
262 | | int64_t _src_id = -1; // source backend_id |
263 | | int64_t _dst_id = -1; // destination backend_id |
264 | | Status _status = Status::InternalError<false>("Stream is not open"); |
265 | | Status _cancel_st; |
266 | | |
267 | | bthread::Mutex _open_mutex; |
268 | | bthread::Mutex _cancel_mutex; |
269 | | |
270 | | std::mutex _buffer_mutex; |
271 | | std::mutex _send_mutex; |
272 | | butil::IOBuf _buffer; |
273 | | |
274 | | bthread::Mutex _schema_mutex; |
275 | | bthread::ConditionVariable _schema_cv; |
276 | | std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index; |
277 | | std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index; |
278 | | |
279 | | bthread::Mutex _success_tablets_mutex; |
280 | | bthread::Mutex _failed_tablets_mutex; |
281 | | std::vector<int64_t> _success_tablets; |
282 | | std::unordered_map<int64_t, Status> _failed_tablets; |
283 | | |
284 | | bool _is_incremental = false; |
285 | | |
286 | | bthread::Mutex _write_mutex; |
287 | | size_t _bytes_written = 0; |
288 | | |
289 | | std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0}; |
290 | | }; |
291 | | |
292 | | // a collection of LoadStreams connect to the same node |
293 | | class LoadStreamStubs { |
294 | | public: |
295 | | LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id, |
296 | | std::shared_ptr<IndexToTabletSchema> schema_map, |
297 | | std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false) |
298 | 48 | : _is_incremental(incremental) { |
299 | 48 | _streams.reserve(num_streams); |
300 | 123 | for (size_t i = 0; i < num_streams; i++) { |
301 | 75 | _streams.emplace_back( |
302 | 75 | new LoadStreamStub(load_id, src_id, schema_map, mow_map, incremental)); |
303 | 75 | } |
304 | 48 | } |
305 | | |
306 | | Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info, |
307 | | int64_t txn_id, const OlapTableSchemaParam& schema, |
308 | | const std::vector<PTabletID>& tablets_for_schema, int total_streams, |
309 | | int64_t idle_timeout_ms, bool enable_profile); |
310 | | |
311 | 24 | bool is_incremental() const { return _is_incremental; } |
312 | | |
313 | | size_t size() const { return _streams.size(); } |
314 | | |
315 | | // for UT only |
316 | | void mark_open() { _open_success.store(true); } |
317 | | |
318 | 120 | std::shared_ptr<LoadStreamStub> select_one_stream() { |
319 | 120 | if (!_open_success.load()) { |
320 | 0 | return nullptr; |
321 | 0 | } |
322 | 120 | size_t i = _select_index.fetch_add(1); |
323 | 120 | return _streams[i % _streams.size()]; |
324 | 120 | } |
325 | | |
326 | 0 | void cancel(Status reason) { |
327 | 0 | for (auto& stream : _streams) { |
328 | 0 | stream->cancel(reason); |
329 | 0 | } |
330 | 0 | } |
331 | | |
332 | | Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams); |
333 | | |
334 | 45 | std::unordered_set<int64_t> success_tablets() { |
335 | 45 | std::unordered_set<int64_t> s; |
336 | 57 | for (auto& stream : _streams) { |
337 | 57 | auto v = stream->success_tablets(); |
338 | 57 | std::copy(v.begin(), v.end(), std::inserter(s, s.end())); |
339 | 57 | } |
340 | 45 | return s; |
341 | 45 | } |
342 | | |
343 | 45 | std::unordered_map<int64_t, Status> failed_tablets() { |
344 | 45 | std::unordered_map<int64_t, Status> m; |
345 | 57 | for (auto& stream : _streams) { |
346 | 57 | auto v = stream->failed_tablets(); |
347 | 57 | m.insert(v.begin(), v.end()); |
348 | 57 | } |
349 | 45 | return m; |
350 | 45 | } |
351 | | |
352 | 152 | std::vector<std::shared_ptr<LoadStreamStub>> streams() { return _streams; } |
353 | | |
354 | | private: |
355 | | std::vector<std::shared_ptr<LoadStreamStub>> _streams; |
356 | | std::atomic<bool> _open_success = false; |
357 | | std::atomic<size_t> _select_index = 0; |
358 | | const bool _is_incremental; |
359 | | }; |
360 | | |
361 | | } // namespace doris |
362 | | |
363 | | #include "common/compile_check_end.h" |