be/src/exec/sink/load_stream_stub.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | #include <brpc/controller.h> |
20 | | #include <bthread/condition_variable.h> |
21 | | #include <bthread/mutex.h> |
22 | | #include <bthread/types.h> |
23 | | #include <butil/errno.h> |
24 | | #include <fmt/format.h> |
25 | | #include <gen_cpp/PaloInternalService_types.h> |
26 | | #include <gen_cpp/Types_types.h> |
27 | | #include <gen_cpp/internal_service.pb.h> |
28 | | #include <gen_cpp/types.pb.h> |
29 | | #include <glog/logging.h> |
30 | | #include <google/protobuf/stubs/callback.h> |
31 | | #include <parallel_hashmap/phmap.h> |
32 | | #include <stddef.h> |
33 | | #include <stdint.h> |
34 | | |
35 | | #include <atomic> |
36 | | // IWYU pragma: no_include <bits/chrono.h> |
37 | | #include <chrono> // IWYU pragma: keep |
38 | | #include <functional> |
39 | | #include <initializer_list> |
40 | | #include <map> |
41 | | #include <memory> |
42 | | #include <mutex> |
43 | | #include <ostream> |
44 | | #include <queue> |
45 | | #include <set> |
46 | | #include <span> |
47 | | #include <string> |
48 | | #include <unordered_map> |
49 | | #include <unordered_set> |
50 | | #include <utility> |
51 | | #include <vector> |
52 | | |
53 | | #include "common/config.h" |
54 | | #include "common/status.h" |
55 | | #include "core/allocator.h" |
56 | | #include "core/block/block.h" |
57 | | #include "core/column/column.h" |
58 | | #include "core/data_type/data_type.h" |
59 | | #include "exprs/vexpr_fwd.h" |
60 | | #include "runtime/exec_env.h" |
61 | | #include "runtime/memory/mem_tracker.h" |
62 | | #include "runtime/runtime_profile.h" |
63 | | #include "runtime/thread_context.h" |
64 | | #include "storage/tablet_info.h" |
65 | | #include "util/countdown_latch.h" |
66 | | #include "util/debug_points.h" |
67 | | #include "util/stopwatch.hpp" |
68 | | |
69 | | namespace doris { |
// Forward declarations: these types are only used by pointer / reference /
// shared_ptr in this header, so their full definitions are not required here.
class TabletSchema;
class LoadStreamStub;

struct SegmentStatistics;

// Map from index id to the TabletSchema of that index.
// The trailing `4, std::mutex` template arguments are phmap's submap-count
// parameter and the mutex type used for its internal locking.
using IndexToTabletSchema = phmap::parallel_flat_hash_map<
        int64_t, std::shared_ptr<TabletSchema>, std::hash<int64_t>, std::equal_to<int64_t>,
        std::allocator<phmap::Pair<const int64_t, std::shared_ptr<TabletSchema>>>, 4, std::mutex>;

// Map from index id to a bool flag (read through LoadStreamStub::enable_unique_mow()).
// NOTE(review): "MoW" presumably stands for merge-on-write -- confirm.
using IndexToEnableMoW =
        phmap::parallel_flat_hash_map<int64_t, bool, std::hash<int64_t>, std::equal_to<int64_t>,
                                      std::allocator<phmap::Pair<const int64_t, bool>>, 4,
                                      std::mutex>;
83 | | |
84 | | class CloseWaitNotifier { |
85 | | public: |
86 | | int64_t close_wait_version() const; |
87 | | |
88 | | void wait_for_close_event(int64_t observed_version, int64_t timeout_ms); |
89 | | |
90 | | void notify_close_wait(); |
91 | | |
92 | | private: |
93 | | std::atomic<int64_t> _close_wait_version {0}; |
94 | | bthread::Mutex _close_wait_mutex; |
95 | | bthread::ConditionVariable _close_wait_cv; |
96 | | }; |
97 | | |
98 | | class LoadStreamReplyHandler : public brpc::StreamInputHandler { |
99 | | public: |
100 | | LoadStreamReplyHandler(PUniqueId load_id, int64_t dst_id, std::weak_ptr<LoadStreamStub> stub) |
101 | 5.74k | : _load_id(load_id), _dst_id(dst_id), _stub(stub) {} |
102 | | |
103 | | int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[], |
104 | | size_t size) override; |
105 | | |
106 | 0 | void on_idle_timeout(brpc::StreamId id) override {} |
107 | | |
108 | | void on_closed(brpc::StreamId id) override; |
109 | | |
110 | | friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler); |
111 | | |
112 | | private: |
113 | | PUniqueId _load_id; // for logging |
114 | | int64_t _dst_id = -1; // for logging |
115 | | std::weak_ptr<LoadStreamStub> _stub; |
116 | | }; |
117 | | |
118 | | class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> { |
119 | | friend class LoadStreamReplyHandler; |
120 | | |
121 | | public: |
122 | | // construct new stub |
123 | | LoadStreamStub(PUniqueId load_id, int64_t src_id, |
124 | | std::shared_ptr<IndexToTabletSchema> schema_map, |
125 | | std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false, |
126 | | std::shared_ptr<CloseWaitNotifier> close_wait_notifier = |
127 | | std::make_shared<CloseWaitNotifier>()); |
128 | | |
129 | | LoadStreamStub(UniqueId load_id, int64_t src_id, |
130 | | std::shared_ptr<IndexToTabletSchema> schema_map, |
131 | | std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false, |
132 | | std::shared_ptr<CloseWaitNotifier> close_wait_notifier = |
133 | | std::make_shared<CloseWaitNotifier>()) |
134 | 5.80k | : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, incremental, |
135 | 5.80k | std::move(close_wait_notifier)) {}; |
136 | | |
137 | | // for mock this class in UT |
138 | | #ifdef BE_TEST |
139 | | virtual |
140 | | #endif |
141 | | ~LoadStreamStub(); |
142 | | |
143 | | // open_load_stream |
144 | | Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info, |
145 | | int64_t txn_id, const OlapTableSchemaParam& schema, |
146 | | const std::vector<PTabletID>& tablets_for_schema, int total_streams, |
147 | | int64_t idle_timeout_ms, bool enable_profile); |
148 | | |
149 | | // for mock this class in UT |
150 | | #ifdef BE_TEST |
151 | | virtual |
152 | | #endif |
153 | | // segment_id is limited by max_segment_num_per_rowset (default value of 1000), |
154 | | // so in practice it will not exceed the range of i16. |
155 | | |
156 | | // APPEND_DATA |
157 | | Status |
158 | | append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, |
159 | | int32_t segment_id, uint64_t offset, std::span<const Slice> data, |
160 | | bool segment_eos = false, FileType file_type = FileType::SEGMENT_FILE); |
161 | | |
162 | | // ADD_SEGMENT |
163 | | Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, |
164 | | int32_t segment_id, const SegmentStatistics& segment_stat); |
165 | | |
166 | | // CLOSE_LOAD |
167 | | Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams); |
168 | | |
169 | | // GET_SCHEMA |
170 | | Status get_schema(const std::vector<PTabletID>& tablets); |
171 | | |
172 | | // wait remote to close stream, |
173 | | // remote will close stream when it receives CLOSE_LOAD |
174 | | Status close_finish_check(RuntimeState* state, bool* is_closed); |
175 | | |
176 | | // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled |
177 | | void cancel(Status reason); |
178 | | |
179 | | Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id, |
180 | | int64_t timeout_ms = 60000); |
181 | | |
182 | 4 | Status wait_for_new_schema(int64_t timeout_ms) { |
183 | 4 | std::unique_lock<bthread::Mutex> lock(_schema_mutex); |
184 | 4 | if (timeout_ms > 0) { |
185 | 4 | int ret = _schema_cv.wait_for(lock, timeout_ms * 1000); |
186 | 4 | return ret == 0 ? Status::OK() : Status::Error<true>(ret, "wait schema update timeout"); |
187 | 4 | } |
188 | 0 | _schema_cv.wait(lock); |
189 | 0 | return Status::OK(); |
190 | 4 | }; |
191 | | |
192 | 13.2k | std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const { |
193 | 13.2k | return (*_tablet_schema_for_index)[index_id]; |
194 | 13.2k | } |
195 | | |
196 | 13.2k | bool enable_unique_mow(int64_t index_id) const { |
197 | 13.2k | return _enable_unique_mow_for_index->at(index_id); |
198 | 13.2k | } |
199 | | |
200 | 5.75k | std::vector<int64_t> success_tablets() { |
201 | 5.75k | std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex); |
202 | 5.75k | return _success_tablets; |
203 | 5.75k | } |
204 | | |
205 | 5.75k | std::unordered_map<int64_t, Status> failed_tablets() { |
206 | 5.75k | std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex); |
207 | 5.75k | return _failed_tablets; |
208 | 5.75k | } |
209 | | |
210 | 0 | brpc::StreamId stream_id() const { return _stream_id; } |
211 | | |
212 | 0 | int64_t src_id() const { return _src_id; } |
213 | | |
214 | 0 | int64_t dst_id() const { return _dst_id; } |
215 | | |
216 | 0 | bool is_open() const { return _is_open.load(); } |
217 | | |
218 | 18.3k | bool is_incremental() const { return _is_incremental; } |
219 | | |
220 | | friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub); |
221 | | |
222 | | std::string to_string(); |
223 | | |
224 | | // for tests only |
225 | | void add_success_tablet(int64_t tablet_id) { |
226 | | std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex); |
227 | | _success_tablets.push_back(tablet_id); |
228 | | } |
229 | | |
230 | 13 | void add_failed_tablet(int64_t tablet_id, Status reason) { |
231 | 13 | std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex); |
232 | 13 | _failed_tablets[tablet_id] = reason; |
233 | 13 | } |
234 | | |
235 | 172k | void add_bytes_written(size_t bytes) { |
236 | 172k | std::lock_guard<bthread::Mutex> lock(_write_mutex); |
237 | 172k | _bytes_written += bytes; |
238 | 172k | } |
239 | | |
240 | 0 | int64_t bytes_written() { |
241 | 0 | std::lock_guard<bthread::Mutex> lock(_write_mutex); |
242 | 0 | return _bytes_written; |
243 | 0 | } |
244 | | |
245 | 55.8k | Status check_cancel() { |
246 | 55.8k | DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled", |
247 | 55.8k | { return Status::InternalError("stream cancelled"); }); |
248 | 55.9k | if (!_is_cancelled.load()) { |
249 | 55.9k | return Status::OK(); |
250 | 55.9k | } |
251 | 18.4E | std::lock_guard<bthread::Mutex> lock(_cancel_mutex); |
252 | 18.4E | return Status::Cancelled("load_id={}, reason: {}", print_id(_load_id), |
253 | 18.4E | _cancel_st.to_string_no_stack()); |
254 | 55.8k | } |
255 | | |
256 | 6.69k | int64_t get_and_reset_load_back_pressure_version_wait_time_ms() { |
257 | 6.69k | return _load_back_pressure_version_wait_time_ms.exchange(0); |
258 | 6.69k | } |
259 | | |
260 | | void _refresh_back_pressure_version_wait_time( |
261 | | const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>& |
262 | | tablet_load_infos); |
263 | | |
264 | | private: |
265 | | void notify_close_wait(); |
266 | | |
267 | | Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {}); |
268 | | Status _send_with_buffer(butil::IOBuf& buf, bool sync = false); |
269 | | Status _send_with_retry(butil::IOBuf& buf); |
270 | | void _handle_failure(butil::IOBuf& buf, Status st); |
271 | | |
272 | | protected: |
273 | | std::atomic<bool> _is_init; |
274 | | std::atomic<bool> _is_open; |
275 | | std::atomic<bool> _is_closing; |
276 | | std::atomic<bool> _is_closed; |
277 | | std::atomic<bool> _is_cancelled; |
278 | | std::atomic<bool> _is_eos; |
279 | | |
280 | | PUniqueId _load_id; |
281 | | brpc::StreamId _stream_id; |
282 | | int64_t _src_id = -1; // source backend_id |
283 | | int64_t _dst_id = -1; // destination backend_id |
284 | | Status _status = Status::InternalError<false>("Stream is not open"); |
285 | | Status _cancel_st; |
286 | | |
287 | | bthread::Mutex _open_mutex; |
288 | | bthread::Mutex _cancel_mutex; |
289 | | |
290 | | std::mutex _buffer_mutex; |
291 | | std::mutex _send_mutex; |
292 | | butil::IOBuf _buffer; |
293 | | |
294 | | bthread::Mutex _schema_mutex; |
295 | | bthread::ConditionVariable _schema_cv; |
296 | | std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index; |
297 | | std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index; |
298 | | |
299 | | bthread::Mutex _success_tablets_mutex; |
300 | | bthread::Mutex _failed_tablets_mutex; |
301 | | std::vector<int64_t> _success_tablets; |
302 | | std::unordered_map<int64_t, Status> _failed_tablets; |
303 | | |
304 | | bool _is_incremental = false; |
305 | | std::shared_ptr<CloseWaitNotifier> _close_wait_notifier; |
306 | | |
307 | | bthread::Mutex _write_mutex; |
308 | | size_t _bytes_written = 0; |
309 | | |
310 | | std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0}; |
311 | | }; |
312 | | |
313 | | // a collection of LoadStreams connect to the same node |
314 | | class LoadStreamStubs { |
315 | | public: |
316 | | LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id, |
317 | | std::shared_ptr<IndexToTabletSchema> schema_map, |
318 | | std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false, |
319 | | std::shared_ptr<CloseWaitNotifier> close_wait_notifier = |
320 | | std::make_shared<CloseWaitNotifier>()) |
321 | 2.97k | : _is_incremental(incremental) { |
322 | 2.97k | _streams.reserve(num_streams); |
323 | 8.77k | for (size_t i = 0; i < num_streams; i++) { |
324 | 5.80k | _streams.emplace_back(new LoadStreamStub(load_id, src_id, schema_map, mow_map, |
325 | 5.80k | incremental, close_wait_notifier)); |
326 | 5.80k | } |
327 | 2.97k | } |
328 | | |
329 | | Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info, |
330 | | int64_t txn_id, const OlapTableSchemaParam& schema, |
331 | | const std::vector<PTabletID>& tablets_for_schema, int total_streams, |
332 | | int64_t idle_timeout_ms, bool enable_profile); |
333 | | |
334 | 5.84k | bool is_incremental() const { return _is_incremental; } |
335 | | |
336 | | size_t size() const { return _streams.size(); } |
337 | | |
338 | | // for UT only |
339 | | void mark_open() { _open_success.store(true); } |
340 | | |
341 | 6.68k | std::shared_ptr<LoadStreamStub> select_one_stream() { |
342 | 6.68k | if (!_open_success.load()) { |
343 | 0 | return nullptr; |
344 | 0 | } |
345 | 6.68k | size_t i = _select_index.fetch_add(1); |
346 | 6.68k | return _streams[i % _streams.size()]; |
347 | 6.68k | } |
348 | | |
349 | 97 | void cancel(Status reason) { |
350 | 193 | for (auto& stream : _streams) { |
351 | 193 | stream->cancel(reason); |
352 | 193 | } |
353 | 97 | } |
354 | | |
355 | | Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams); |
356 | | |
357 | 2.95k | std::unordered_set<int64_t> success_tablets() { |
358 | 2.95k | std::unordered_set<int64_t> s; |
359 | 5.75k | for (auto& stream : _streams) { |
360 | 5.75k | auto v = stream->success_tablets(); |
361 | 5.75k | std::copy(v.begin(), v.end(), std::inserter(s, s.end())); |
362 | 5.75k | } |
363 | 2.95k | return s; |
364 | 2.95k | } |
365 | | |
366 | 2.95k | std::unordered_map<int64_t, Status> failed_tablets() { |
367 | 2.95k | std::unordered_map<int64_t, Status> m; |
368 | 5.75k | for (auto& stream : _streams) { |
369 | 5.75k | auto v = stream->failed_tablets(); |
370 | 5.75k | m.insert(v.begin(), v.end()); |
371 | 5.75k | } |
372 | 2.95k | return m; |
373 | 2.95k | } |
374 | | |
375 | 71.2k | std::vector<std::shared_ptr<LoadStreamStub>> streams() { return _streams; } |
376 | | |
377 | | private: |
378 | | std::vector<std::shared_ptr<LoadStreamStub>> _streams; |
379 | | std::atomic<bool> _open_success = false; |
380 | | std::atomic<size_t> _select_index = 0; |
381 | | const bool _is_incremental; |
382 | | }; |
383 | | |
384 | | } // namespace doris |