/root/doris/be/src/runtime/load_stream.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/load_stream.h" |
19 | | |
20 | | #include <brpc/stream.h> |
21 | | #include <bthread/bthread.h> |
22 | | #include <bthread/condition_variable.h> |
23 | | #include <bthread/mutex.h> |
24 | | #include <olap/rowset/rowset_factory.h> |
25 | | #include <olap/rowset/rowset_meta.h> |
26 | | #include <olap/storage_engine.h> |
27 | | #include <olap/tablet_manager.h> |
28 | | #include <runtime/exec_env.h> |
29 | | |
30 | | #include <memory> |
31 | | #include <sstream> |
32 | | |
33 | | #include "bvar/bvar.h" |
34 | | #include "cloud/config.h" |
35 | | #include "common/signal_handler.h" |
36 | | #include "exec/tablet_info.h" |
37 | | #include "olap/delta_writer.h" |
38 | | #include "olap/tablet.h" |
39 | | #include "olap/tablet_fwd.h" |
40 | | #include "olap/tablet_schema.h" |
41 | | #include "runtime/exec_env.h" |
42 | | #include "runtime/fragment_mgr.h" |
43 | | #include "runtime/load_channel.h" |
44 | | #include "runtime/load_stream_mgr.h" |
45 | | #include "runtime/load_stream_writer.h" |
46 | | #include "runtime/workload_group/workload_group_manager.h" |
47 | | #include "util/debug_points.h" |
48 | | #include "util/runtime_profile.h" |
49 | | #include "util/thrift_util.h" |
50 | | #include "util/uid_util.h" |
51 | | |
52 | | #define UNKNOWN_ID_FOR_TEST 0x7c00 |
53 | | |
54 | | namespace doris { |
55 | | #include "common/compile_check_begin.h" |
56 | | |
// Process-wide load-stream metrics exported through bvar.
// Number of LoadStream instances currently alive (inc in ctor, dec in dtor).
bvar::Adder<int64_t> g_load_stream_cnt("load_stream_count");
// Time spent blocked on flush-token back pressure, in milliseconds.
bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms");
// Flush tasks submitted but not yet started executing: incremented right
// before submit_func(), decremented at the top of the flush task itself.
bvar::Adder<int> g_load_stream_flush_running_threads("load_stream_flush_wait_threads");
61 | | TabletStream::TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, |
62 | | LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile) |
63 | 14 | : _id(id), |
64 | 14 | _next_segid(0), |
65 | 14 | _load_id(load_id), |
66 | 14 | _txn_id(txn_id), |
67 | 14 | _load_stream_mgr(load_stream_mgr) { |
68 | 14 | load_stream_mgr->create_token(_flush_token); |
69 | 14 | _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true); |
70 | 14 | _append_data_timer = ADD_TIMER(_profile, "AppendDataTime"); |
71 | 14 | _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime"); |
72 | 14 | _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime"); |
73 | 14 | } |
74 | | |
75 | 10 | inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) { |
76 | 10 | ostr << "load_id=" << print_id(tablet_stream._load_id) << ", txn_id=" << tablet_stream._txn_id |
77 | 10 | << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status(); |
78 | 10 | return ostr; |
79 | 10 | } |
80 | | |
// Create and initialize the LoadStreamWriter that persists this tablet's data.
// Any failure is folded into _status so later append_data()/close() calls
// observe it and fail fast; the (possibly failed) status is returned.
Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
                          int64_t partition_id) {
    WriteRequest req {
            .tablet_id = _id,
            .txn_id = _txn_id,
            .index_id = index_id,
            .partition_id = partition_id,
            .load_id = _load_id,
            .table_schema_param = schema,
            // TODO(plat1ko): write_file_cache
            .storage_vault_id {},
    };

    _load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
    // Fault injection: simulate a writer that was never initialized.
    DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
        _status.update(Status::Uninitialized("fault injection"));
        return _status.status();
    });
    _status.update(_load_stream_writer->init());
    if (!_status.ok()) {
        LOG(INFO) << "failed to init rowset builder due to " << *this;
    }
    return _status.status();
}
105 | | |
106 | 27 | Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { |
107 | 27 | if (!_status.ok()) { |
108 | 1 | return _status.status(); |
109 | 1 | } |
110 | | |
111 | | // dispatch add_segment request |
112 | 26 | if (header.opcode() == PStreamHeader::ADD_SEGMENT) { |
113 | 0 | return add_segment(header, data); |
114 | 0 | } |
115 | | |
116 | 26 | SCOPED_TIMER(_append_data_timer); |
117 | | |
118 | 26 | int64_t src_id = header.src_id(); |
119 | 26 | uint32_t segid = header.segment_id(); |
120 | | // Ensure there are enough space and mapping are built. |
121 | 26 | SegIdMapping* mapping = nullptr; |
122 | 26 | { |
123 | 26 | std::lock_guard lock_guard(_lock); |
124 | 26 | if (!_segids_mapping.contains(src_id)) { |
125 | 14 | _segids_mapping[src_id] = std::make_unique<SegIdMapping>(); |
126 | 14 | } |
127 | 26 | mapping = _segids_mapping[src_id].get(); |
128 | 26 | } |
129 | 26 | if (segid + 1 > mapping->size()) { |
130 | | // TODO: Each sender lock is enough. |
131 | 15 | std::lock_guard lock_guard(_lock); |
132 | 15 | ssize_t origin_size = mapping->size(); |
133 | 15 | if (segid + 1 > origin_size) { |
134 | 15 | mapping->resize(segid + 1, std::numeric_limits<uint32_t>::max()); |
135 | 39 | for (size_t index = origin_size; index <= segid; index++) { |
136 | 24 | mapping->at(index) = _next_segid; |
137 | 24 | _next_segid++; |
138 | 24 | VLOG_DEBUG << "src_id=" << src_id << ", segid=" << index << " to " |
139 | 0 | << " segid=" << _next_segid - 1 << ", " << *this; |
140 | 24 | } |
141 | 15 | } |
142 | 15 | } |
143 | | |
144 | | // Each sender sends data in one segment sequential, so we also do not |
145 | | // need a lock here. |
146 | 26 | bool eos = header.segment_eos(); |
147 | 26 | FileType file_type = header.file_type(); |
148 | 26 | uint32_t new_segid = mapping->at(segid); |
149 | 26 | DCHECK(new_segid != std::numeric_limits<uint32_t>::max()); |
150 | 26 | butil::IOBuf buf = data->movable(); |
151 | 26 | auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable { |
152 | 26 | signal::set_signal_task_id(_load_id); |
153 | 26 | g_load_stream_flush_running_threads << -1; |
154 | 26 | auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); |
155 | 26 | if (!st.ok() && !config::is_cloud_mode()) { |
156 | 1 | auto res = ExecEnv::get_tablet(_id); |
157 | 1 | TabletSharedPtr tablet = |
158 | 1 | res.has_value() ? std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr; |
159 | 1 | if (tablet) { |
160 | 1 | tablet->report_error(st); |
161 | 1 | } |
162 | 1 | } |
163 | 26 | if (eos && st.ok()) { |
164 | 21 | DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type", |
165 | 21 | { file_type = static_cast<FileType>(-1); }); |
166 | 21 | if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) { |
167 | 21 | st = _load_stream_writer->close_writer(new_segid, file_type); |
168 | 21 | } else { |
169 | 0 | st = Status::InternalError( |
170 | 0 | "appent data failed, file type error, file type = {}, " |
171 | 0 | "segment_id={}", |
172 | 0 | file_type, new_segid); |
173 | 0 | } |
174 | 21 | } |
175 | 26 | DBUG_EXECUTE_IF("TabletStream.append_data.append_failed", |
176 | 26 | { st = Status::InternalError("fault injection"); }); |
177 | 26 | if (!st.ok()) { |
178 | 2 | _status.update(st); |
179 | 2 | LOG(WARNING) << "write data failed " << st << ", " << *this; |
180 | 2 | } |
181 | 26 | }; |
182 | 26 | auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks; |
183 | 26 | auto load_stream_max_wait_flush_token_time_ms = |
184 | 26 | config::load_stream_max_wait_flush_token_time_ms; |
185 | 26 | DBUG_EXECUTE_IF("TabletStream.append_data.long_wait", { |
186 | 26 | load_stream_flush_token_max_tasks = 0; |
187 | 26 | load_stream_max_wait_flush_token_time_ms = 1000; |
188 | 26 | }); |
189 | 26 | MonotonicStopWatch timer; |
190 | 26 | timer.start(); |
191 | 26 | while (_flush_token->num_tasks() >= load_stream_flush_token_max_tasks) { |
192 | 0 | if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) { |
193 | 0 | _status.update( |
194 | 0 | Status::Error<true>("wait flush token back pressure time is more than " |
195 | 0 | "load_stream_max_wait_flush_token_time {}", |
196 | 0 | load_stream_max_wait_flush_token_time_ms)); |
197 | 0 | return _status.status(); |
198 | 0 | } |
199 | 0 | bthread_usleep(2 * 1000); // 2ms |
200 | 0 | } |
201 | 26 | timer.stop(); |
202 | 26 | int64_t time_ms = timer.elapsed_time() / 1000 / 1000; |
203 | 26 | g_load_stream_flush_wait_ms << time_ms; |
204 | 26 | g_load_stream_flush_running_threads << 1; |
205 | 26 | Status st = Status::OK(); |
206 | 26 | DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed", |
207 | 26 | { st = Status::InternalError("fault injection"); }); |
208 | 26 | if (st.ok()) { |
209 | 26 | st = _flush_token->submit_func(flush_func); |
210 | 26 | } |
211 | 26 | if (!st.ok()) { |
212 | 0 | _status.update(st); |
213 | 0 | } |
214 | 26 | return _status.status(); |
215 | 26 | } |
216 | | |
// Handle an ADD_SEGMENT message: record segment statistics for a segment that
// has already been written by append_data(). Validates that the (src_id,
// segid) pair was previously mapped, then submits the actual add_segment work
// asynchronously on the flush token. Errors are folded into _status.
Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
    if (!_status.ok()) {
        return _status.status();
    }

    SCOPED_TIMER(_add_segment_timer);
    DCHECK(header.has_segment_statistics());
    SegmentStatistics stat(header.segment_statistics());

    int64_t src_id = header.src_id();
    uint32_t segid = header.segment_id();
    uint32_t new_segid;
    DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_segid", { segid = UNKNOWN_ID_FOR_TEST; });
    {
        // The mapping table is shared with append_data(), so read it under _lock.
        std::lock_guard lock_guard(_lock);
        if (!_segids_mapping.contains(src_id)) {
            _status.update(Status::InternalError(
                    "add segment failed, no segment written by this src be yet, src_id={}, "
                    "segment_id={}",
                    src_id, segid));
            return _status.status();
        }
        DBUG_EXECUTE_IF("TabletStream.add_segment.segid_never_written",
                        { segid = static_cast<uint32_t>(_segids_mapping[src_id]->size()); });
        if (segid >= _segids_mapping[src_id]->size()) {
            _status.update(Status::InternalError(
                    "add segment failed, segment is never written, src_id={}, segment_id={}",
                    src_id, segid));
            return _status.status();
        }
        new_segid = _segids_mapping[src_id]->at(segid);
    }
    DCHECK(new_segid != std::numeric_limits<uint32_t>::max());

    // Run the writer call on the flush token so it is ordered with append tasks.
    auto add_segment_func = [this, new_segid, stat]() {
        signal::set_signal_task_id(_load_id);
        auto st = _load_stream_writer->add_segment(new_segid, stat);
        DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
                        { st = Status::InternalError("fault injection"); });
        if (!st.ok()) {
            _status.update(st);
            LOG(INFO) << "add segment failed " << *this;
        }
    };
    Status st = Status::OK();
    DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed",
                    { st = Status::InternalError("fault injection"); });
    if (st.ok()) {
        st = _flush_token->submit_func(add_segment_func);
    }
    if (!st.ok()) {
        _status.update(st);
    }
    return _status.status();
}
272 | | |
273 | 30 | Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) { |
274 | 30 | bthread::Mutex mu; |
275 | 30 | std::unique_lock<bthread::Mutex> lock(mu); |
276 | 30 | bthread::ConditionVariable cv; |
277 | 30 | auto st = Status::OK(); |
278 | 30 | auto func = [this, &mu, &cv, &st, &fn] { |
279 | 30 | signal::set_signal_task_id(_load_id); |
280 | 30 | st = fn(); |
281 | 30 | std::lock_guard<bthread::Mutex> lock(mu); |
282 | 30 | cv.notify_one(); |
283 | 30 | }; |
284 | 30 | bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func); |
285 | 30 | if (!ret) { |
286 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
287 | 0 | "there is not enough thread resource for close load"); |
288 | 0 | } |
289 | 30 | cv.wait(lock); |
290 | 30 | return st; |
291 | 30 | } |
292 | | |
// Wait for (or cancel) all outstanding flush tasks on this tablet stream.
// Idempotent: a _flush_tasks_done flag under _lock ensures the wait/shutdown
// is performed at most once, since this is called both from pre_close() and
// from IndexStream::~IndexStream().
void TabletStream::wait_for_flush_tasks() {
    {
        std::lock_guard lock_guard(_lock);
        if (_flush_tasks_done) {
            // Another caller already waited/shut down the token.
            return;
        }
        _flush_tasks_done = true;
    }

    if (!_status.ok()) {
        // Already failed: cancel pending tasks instead of waiting for them.
        _flush_token->shutdown();
        return;
    }

    // Note: Do not use SCOPED_TIMER here because this function may be called
    // from IndexStream::~IndexStream() during LoadStream destruction, at which
    // point the RuntimeProfile (and _close_wait_timer) may already be destroyed.
    // Use heavy_work_pool to avoid blocking bthread
    auto st = _run_in_heavy_work_pool([this]() {
        _flush_token->wait();
        return Status::OK();
    });
    if (!st.ok()) {
        // If heavy_work_pool is unavailable, fall back to shutdown
        // which will cancel pending tasks and wait for running tasks
        _flush_token->shutdown();
        _status.update(st);
    }
}
322 | | |
323 | 14 | void TabletStream::pre_close() { |
324 | 14 | SCOPED_TIMER(_close_wait_timer); |
325 | 14 | wait_for_flush_tasks(); |
326 | | |
327 | 14 | if (!_status.ok()) { |
328 | 3 | return; |
329 | 3 | } |
330 | | |
331 | 11 | DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; }); |
332 | 11 | if (_check_num_segments && (_next_segid.load() != _num_segments)) { |
333 | 2 | _status.update(Status::Corruption( |
334 | 2 | "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id, |
335 | 2 | _num_segments, _next_segid.load(), print_id(_load_id))); |
336 | 2 | return; |
337 | 2 | } |
338 | | |
339 | 9 | _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); })); |
340 | 9 | } |
341 | | |
342 | 14 | Status TabletStream::close() { |
343 | 14 | if (!_status.ok()) { |
344 | 6 | return _status.status(); |
345 | 6 | } |
346 | | |
347 | 8 | SCOPED_TIMER(_close_wait_timer); |
348 | 8 | _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); })); |
349 | 8 | return _status.status(); |
350 | 14 | } |
351 | | |
352 | | IndexStream::IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, |
353 | | std::shared_ptr<OlapTableSchemaParam> schema, |
354 | | LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile) |
355 | 28 | : _id(id), |
356 | 28 | _load_id(load_id), |
357 | 28 | _txn_id(txn_id), |
358 | 28 | _schema(schema), |
359 | 28 | _load_stream_mgr(load_stream_mgr) { |
360 | 28 | _profile = profile->create_child(fmt::format("IndexStream {}", id), true, true); |
361 | 28 | _append_data_timer = ADD_TIMER(_profile, "AppendDataTime"); |
362 | 28 | _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime"); |
363 | 28 | } |
364 | | |
IndexStream::~IndexStream() {
    // Ensure all TabletStreams have their flush tokens properly handled before destruction.
    // In normal flow, close() should have called pre_close() on all tablet streams.
    // But if IndexStream is destroyed without close() being called (e.g., on_idle_timeout),
    // we need to wait for flush tasks here to ensure flush tokens are properly shut down.
    // wait_for_flush_tasks() is idempotent, so this is a no-op after a normal close().
    for (auto& [_, tablet_stream] : _tablet_streams_map) {
        tablet_stream->wait_for_flush_tasks();
    }
}
374 | | |
375 | 27 | Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { |
376 | 27 | SCOPED_TIMER(_append_data_timer); |
377 | 27 | int64_t tablet_id = header.tablet_id(); |
378 | 27 | TabletStreamSharedPtr tablet_stream; |
379 | 27 | { |
380 | 27 | std::lock_guard lock_guard(_lock); |
381 | 27 | auto it = _tablet_streams_map.find(tablet_id); |
382 | 27 | if (it == _tablet_streams_map.end()) { |
383 | 13 | _init_tablet_stream(tablet_stream, tablet_id, header.partition_id()); |
384 | 14 | } else { |
385 | 14 | tablet_stream = it->second; |
386 | 14 | } |
387 | 27 | } |
388 | | |
389 | 27 | return tablet_stream->append_data(header, data); |
390 | 27 | } |
391 | | |
392 | | void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, |
393 | 14 | int64_t partition_id) { |
394 | 14 | tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr, |
395 | 14 | _profile); |
396 | 14 | _tablet_streams_map[tablet_id] = tablet_stream; |
397 | 14 | auto st = tablet_stream->init(_schema, _id, partition_id); |
398 | 14 | if (!st.ok()) { |
399 | 1 | LOG(WARNING) << "tablet stream init failed " << *tablet_stream; |
400 | 1 | } |
401 | 14 | } |
402 | | |
403 | 0 | void IndexStream::get_all_write_tablet_ids(std::vector<int64_t>* tablet_ids) { |
404 | 0 | std::lock_guard lock_guard(_lock); |
405 | 0 | for (const auto& [tablet_id, _] : _tablet_streams_map) { |
406 | 0 | tablet_ids->push_back(tablet_id); |
407 | 0 | } |
408 | 0 | } |
409 | | |
// Close every tablet stream of this index. Tablets listed in
// tablets_to_commit get their expected segment count recorded (or the check
// disabled for old senders); tablets never written to are created here so
// they still produce an (empty) rowset. Results are partitioned into
// success_tablet_ids and failed_tablets for the commit ack.
void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
                        std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
    std::lock_guard lock_guard(_lock);
    SCOPED_TIMER(_close_wait_timer);
    // open all need commit tablets
    for (const auto& tablet : tablets_to_commit) {
        if (_id != tablet.index_id()) {
            // Entry belongs to a different index; skip.
            continue;
        }
        TabletStreamSharedPtr tablet_stream;
        auto it = _tablet_streams_map.find(tablet.tablet_id());
        if (it == _tablet_streams_map.end()) {
            // Tablet was never written: create its stream so it can be closed
            // (and committed) like the others.
            _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id());
        } else {
            tablet_stream = it->second;
        }
        if (tablet.has_num_segments()) {
            tablet_stream->add_num_segments(tablet.num_segments());
        } else {
            // for compatibility reasons (sink from old version BE)
            tablet_stream->disable_num_segments_check();
        }
    }

    // Phase 1: drain flush tasks + pre-close all writers.
    for (auto& [_, tablet_stream] : _tablet_streams_map) {
        tablet_stream->pre_close();
    }

    // Phase 2: final close; collect per-tablet outcomes.
    for (auto& [_, tablet_stream] : _tablet_streams_map) {
        auto st = tablet_stream->close();
        if (st.ok()) {
            success_tablet_ids->push_back(tablet_stream->id());
        } else {
            LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st;
            failed_tablets->emplace_back(tablet_stream->id(), st);
        }
    }
}
448 | | |
449 | | // TODO: Profile is temporary disabled, because: |
450 | | // 1. It's not being processed by the upstream for now |
451 | | // 2. There are some problems in _profile->to_thrift() |
// TODO: Profile is temporary disabled, because:
// 1. It's not being processed by the upstream for now
// 2. There are some problems in _profile->to_thrift()
// Note: enable_profile is deliberately ignored — _enable_profile is forced to
// false (see TODO above).
LoadStream::LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr,
                       bool enable_profile)
        : _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) {
    g_load_stream_cnt << 1;
    _profile = std::make_unique<RuntimeProfile>("LoadStream");
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
    TUniqueId load_tid = ((UniqueId)load_id).to_thrift();
#ifndef BE_TEST
    // Prefer the resource context of the running query/load, if it is still
    // registered with the fragment manager; otherwise build a standalone one.
    std::shared_ptr<QueryContext> query_context =
            ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid);
    if (query_context != nullptr) {
        _resource_ctx = query_context->resource_ctx();
    } else {
        _resource_ctx = ResourceContext::create_shared();
        _resource_ctx->task_controller()->set_task_id(load_tid);
        std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
                MemTrackerLimiter::Type::LOAD,
                fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string()));
        _resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
    }
#else
    // Unit tests have no fragment manager; always build a standalone context.
    _resource_ctx = ResourceContext::create_shared();
    _resource_ctx->task_controller()->set_task_id(load_tid);
    std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
            MemTrackerLimiter::Type::LOAD,
            fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string()));
    _resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
#endif
}
482 | | |
LoadStream::~LoadStream() {
    // Keep the liveness metric balanced with the increment in the constructor.
    g_load_stream_cnt << -1;
    LOG(INFO) << "load stream is deconstructed " << *this;
}
487 | | |
488 | 14 | Status LoadStream::init(const POpenLoadStreamRequest* request) { |
489 | 14 | _txn_id = request->txn_id(); |
490 | 14 | _total_streams = static_cast<int32_t>(request->total_streams()); |
491 | 14 | _is_incremental = (_total_streams == 0); |
492 | | |
493 | 14 | _schema = std::make_shared<OlapTableSchemaParam>(); |
494 | 14 | RETURN_IF_ERROR(_schema->init(request->schema())); |
495 | 28 | for (auto& index : request->schema().indexes()) { |
496 | 28 | _index_streams_map[index.id()] = std::make_shared<IndexStream>( |
497 | 28 | _load_id, index.id(), _txn_id, _schema, _load_stream_mgr, _profile.get()); |
498 | 28 | } |
499 | 14 | LOG(INFO) << "succeed to init load stream " << *this; |
500 | 14 | return Status::OK(); |
501 | 14 | } |
502 | | |
503 | | bool LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit, |
504 | 16 | std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) { |
505 | 16 | std::lock_guard<bthread::Mutex> lock_guard(_lock); |
506 | 16 | SCOPED_TIMER(_close_wait_timer); |
507 | | |
508 | | // we do nothing until recv CLOSE_LOAD from all stream to ensure all data are handled before ack |
509 | 16 | _open_streams[src_id]--; |
510 | 16 | if (_open_streams[src_id] == 0) { |
511 | 16 | _open_streams.erase(src_id); |
512 | 16 | } |
513 | 16 | _close_load_cnt++; |
514 | 16 | LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining " |
515 | 16 | << _total_streams - _close_load_cnt << " senders, " << *this; |
516 | | |
517 | 16 | _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(), |
518 | 16 | tablets_to_commit.end()); |
519 | | |
520 | 16 | if (_close_load_cnt < _total_streams) { |
521 | | // do not return commit info if there is remaining streams. |
522 | 2 | return false; |
523 | 2 | } |
524 | | |
525 | 28 | for (auto& [_, index_stream] : _index_streams_map) { |
526 | 28 | index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets); |
527 | 28 | } |
528 | 14 | LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size() |
529 | 14 | << ", failed_tablet_num=" << failed_tablets->size(); |
530 | 14 | return true; |
531 | 16 | } |
532 | | |
// Serialize and send a PLoadStreamResponse (status + per-tablet results, and
// optionally the runtime profile) back over `stream`.
void LoadStream::_report_result(StreamId stream, const Status& status,
                                const std::vector<int64_t>& success_tablet_ids,
                                const FailedTablets& failed_tablets, bool eos) {
    LOG(INFO) << "report result " << *this << ", success tablet num " << success_tablet_ids.size()
              << ", failed tablet num " << failed_tablets.size();
    butil::IOBuf buf;
    PLoadStreamResponse response;
    response.set_eos(eos);
    status.to_protobuf(response.mutable_status());
    for (auto& id : success_tablet_ids) {
        response.add_success_tablet_ids(id);
    }
    for (auto& [id, st] : failed_tablets) {
        auto pb = response.add_failed_tablets();
        pb->set_id(id);
        st.to_protobuf(pb->mutable_status());
    }

    // Profile is attached only on the final report (all senders closed) and
    // only when profiling is enabled (currently always false, see ctor TODO).
    if (_enable_profile && _close_load_cnt == _total_streams) {
        TRuntimeProfileTree tprofile;
        ThriftSerializer ser(false, 4096);
        uint8_t* profile_buf = nullptr;
        uint32_t len = 0;
        // _lock guards the profile tree against concurrent updates while
        // it is being converted to thrift.
        std::unique_lock<bthread::Mutex> l(_lock);

        _profile->to_thrift(&tprofile);
        auto st = ser.serialize(&tprofile, &len, &profile_buf);
        if (st.ok()) {
            response.set_load_stream_profile(profile_buf, len);
        } else {
            LOG(WARNING) << "TRuntimeProfileTree serialize failed, errmsg=" << st << ", " << *this;
        }
    }

    buf.append(response.SerializeAsString());
    auto wst = _write_stream(stream, buf);
    if (!wst.ok()) {
        // Best effort: the peer may already be gone; just log.
        LOG(WARNING) << " report result failed with " << wst << ", " << *this;
    }
}
573 | | |
574 | 0 | void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) { |
575 | 0 | butil::IOBuf buf; |
576 | 0 | PLoadStreamResponse response; |
577 | 0 | Status st = Status::OK(); |
578 | 0 | for (const auto& req : hdr.tablets()) { |
579 | 0 | BaseTabletSPtr tablet; |
580 | 0 | if (auto res = ExecEnv::get_tablet(req.tablet_id()); res.has_value()) { |
581 | 0 | tablet = std::move(res).value(); |
582 | 0 | } else { |
583 | 0 | st = std::move(res).error(); |
584 | 0 | break; |
585 | 0 | } |
586 | 0 | auto* resp = response.add_tablet_schemas(); |
587 | 0 | resp->set_index_id(req.index_id()); |
588 | 0 | resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write()); |
589 | 0 | tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema()); |
590 | 0 | } |
591 | 0 | st.to_protobuf(response.mutable_status()); |
592 | |
|
593 | 0 | buf.append(response.SerializeAsString()); |
594 | 0 | auto wst = _write_stream(stream, buf); |
595 | 0 | if (!wst.ok()) { |
596 | 0 | LOG(WARNING) << " report result failed with " << wst << ", " << *this; |
597 | 0 | } |
598 | 0 | } |
599 | | |
600 | 0 | void LoadStream::_report_tablet_load_info(StreamId stream, int64_t index_id) { |
601 | 0 | std::vector<int64_t> write_tablet_ids; |
602 | 0 | auto it = _index_streams_map.find(index_id); |
603 | 0 | if (it != _index_streams_map.end()) { |
604 | 0 | it->second->get_all_write_tablet_ids(&write_tablet_ids); |
605 | 0 | } |
606 | |
|
607 | 0 | if (!write_tablet_ids.empty()) { |
608 | 0 | butil::IOBuf buf; |
609 | 0 | PLoadStreamResponse response; |
610 | 0 | auto* tablet_load_infos = response.mutable_tablet_load_rowset_num_infos(); |
611 | 0 | _collect_tablet_load_info_from_tablets(write_tablet_ids, tablet_load_infos); |
612 | 0 | if (tablet_load_infos->empty()) { |
613 | 0 | return; |
614 | 0 | } |
615 | 0 | buf.append(response.SerializeAsString()); |
616 | 0 | auto wst = _write_stream(stream, buf); |
617 | 0 | if (!wst.ok()) { |
618 | 0 | LOG(WARNING) << "report tablet load info failed with " << wst << ", " << *this; |
619 | 0 | } |
620 | 0 | } |
621 | 0 | } |
622 | | |
623 | | void LoadStream::_collect_tablet_load_info_from_tablets( |
624 | | const std::vector<int64_t>& tablet_ids, |
625 | 0 | google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_load_infos) { |
626 | 0 | for (auto tablet_id : tablet_ids) { |
627 | 0 | BaseTabletSPtr tablet; |
628 | 0 | if (auto res = ExecEnv::get_tablet(tablet_id); res.has_value()) { |
629 | 0 | tablet = std::move(res).value(); |
630 | 0 | } else { |
631 | 0 | continue; |
632 | 0 | } |
633 | 0 | BaseDeltaWriter::collect_tablet_load_rowset_num_info(tablet.get(), tablet_load_infos); |
634 | 0 | } |
635 | 0 | } |
636 | | |
637 | 36 | Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) { |
638 | 36 | for (;;) { |
639 | 36 | int ret = 0; |
640 | 36 | DBUG_EXECUTE_IF("LoadStream._write_stream.EAGAIN", { ret = EAGAIN; }); |
641 | 36 | if (ret == 0) { |
642 | 36 | ret = brpc::StreamWrite(stream, buf); |
643 | 36 | } |
644 | 36 | switch (ret) { |
645 | 36 | case 0: |
646 | 36 | return Status::OK(); |
647 | 0 | case EAGAIN: { |
648 | 0 | const timespec time = butil::seconds_from_now(config::load_stream_eagain_wait_seconds); |
649 | 0 | int wait_ret = brpc::StreamWait(stream, &time); |
650 | 0 | if (wait_ret != 0) { |
651 | 0 | return Status::InternalError("StreamWait failed, err={}", wait_ret); |
652 | 0 | } |
653 | 0 | break; |
654 | 0 | } |
655 | 0 | default: |
656 | 0 | return Status::InternalError("StreamWrite failed, err={}", ret); |
657 | 36 | } |
658 | 36 | } |
659 | 0 | return Status::OK(); |
660 | 36 | } |
661 | | |
662 | 62 | void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& hdr) { |
663 | 62 | butil::IOBufAsZeroCopyInputStream wrapper(*message); |
664 | 62 | hdr.ParseFromZeroCopyStream(&wrapper); |
665 | 62 | VLOG_DEBUG << "header parse result: " << hdr.DebugString(); |
666 | 62 | } |
667 | | |
668 | 28 | Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) { |
669 | 28 | SCOPED_TIMER(_append_data_timer); |
670 | 28 | IndexStreamSharedPtr index_stream; |
671 | | |
672 | 28 | int64_t index_id = header.index_id(); |
673 | 28 | DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid", |
674 | 28 | { index_id = UNKNOWN_ID_FOR_TEST; }); |
675 | 28 | auto it = _index_streams_map.find(index_id); |
676 | 28 | if (it == _index_streams_map.end()) { |
677 | 1 | return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id {}", index_id); |
678 | 27 | } else { |
679 | 27 | index_stream = it->second; |
680 | 27 | } |
681 | | |
682 | 27 | return index_stream->append_data(header, data); |
683 | 28 | } |
684 | | |
// brpc StreamInputHandler callback: each message may carry several frames,
// each framed as [sizeof(size_t) header-length][header][sizeof(size_t)
// data-length][data]. Frames are parsed and dispatched until the message is
// drained.
// NOTE(review): hdr_len/data_len are read as raw host-endian size_t and are
// not validated against messages[i]->size(); presumably sender and receiver
// are same-architecture BEs — confirm before reusing this framing elsewhere.
int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) {
    VLOG_DEBUG << "on_received_messages " << id << " " << size;
    for (size_t i = 0; i < size; ++i) {
        while (messages[i]->size() > 0) {
            // step 1: parse header
            size_t hdr_len = 0;
            messages[i]->cutn((void*)&hdr_len, sizeof(size_t));
            butil::IOBuf hdr_buf;
            PStreamHeader hdr;
            messages[i]->cutn(&hdr_buf, hdr_len);
            _parse_header(&hdr_buf, hdr);

            // step 2: cut data
            size_t data_len = 0;
            messages[i]->cutn((void*)&data_len, sizeof(size_t));
            butil::IOBuf data_buf;
            PStreamHeader data; // unused; kept as in original
            messages[i]->cutn(&data_buf, data_len);

            // step 3: dispatch
            _dispatch(id, hdr, &data_buf);
        }
    }
    return 0;
}
710 | | |
// Routes one decoded stream message to the matching handler after validating
// its load id and source stream. Failures are reported back to the sender via
// _report_failure(); this function itself never returns an error.
void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) {
    VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id()
               << " with tablet " << hdr.tablet_id();
    SCOPED_ATTACH_TASK(_resource_ctx);
    // CLOSE_LOAD message should not be fault injected,
    // otherwise the message will be ignored and causing close wait timeout
    if (hdr.opcode() != PStreamHeader::CLOSE_LOAD) {
        // Test-only fault injection: const_cast is used deliberately here to
        // corrupt the (normally immutable) header in place.
        DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", {
            PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr);
            PUniqueId* load_id = t_hdr.mutable_load_id();
            load_id->set_hi(UNKNOWN_ID_FOR_TEST);
            load_id->set_lo(UNKNOWN_ID_FOR_TEST);
        });
        DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", {
            PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr);
            t_hdr.set_src_id(UNKNOWN_ID_FOR_TEST);
        });
    }
    // Reject messages that belong to a different load.
    if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
        Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>(
                "invalid load id {}, expected {}", print_id(hdr.load_id()), print_id(_load_id));
        _report_failure(id, st, hdr);
        return;
    }

    {
        // Check the source registered via open() under _lock; the lock is
        // released before dispatching so handlers can take it themselves.
        std::lock_guard lock_guard(_lock);
        if (!_open_streams.contains(hdr.src_id())) {
            Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("no open stream from source {}",
                                                                   hdr.src_id());
            _report_failure(id, st, hdr);
            return;
        }
    }

    switch (hdr.opcode()) {
    case PStreamHeader::ADD_SEGMENT: {
        auto st = _append_data(hdr, data);
        if (!st.ok()) {
            _report_failure(id, st, hdr);
        } else {
            // Report tablet load info only on ADD_SEGMENT to reduce frequency.
            // ADD_SEGMENT is sent once per segment, while APPEND_DATA is sent
            // for every data batch. This reduces unnecessary writes and avoids
            // potential stream write failures when the sender is closing.
            _report_tablet_load_info(id, hdr.index_id());
        }
    } break;
    case PStreamHeader::APPEND_DATA: {
        auto st = _append_data(hdr, data);
        if (!st.ok()) {
            _report_failure(id, st, hdr);
        }
    } break;
    case PStreamHeader::CLOSE_LOAD: {
        DBUG_EXECUTE_IF("LoadStream.close_load.block", DBUG_BLOCK);
        std::vector<int64_t> success_tablet_ids;
        FailedTablets failed_tablets;
        std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
        bool all_closed =
                close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
        // Report the per-tablet close result back to the sender before
        // actually closing the brpc stream below.
        _report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true);
        std::lock_guard<bthread::Mutex> lock_guard(_lock);
        // if incremental stream, we need to wait for all non-incremental streams to be closed
        // before closing incremental streams. We need a fencing mechanism to avoid use after closing
        // across different be.
        if (hdr.has_num_incremental_streams() && hdr.num_incremental_streams() > 0) {
            _closing_stream_ids.push_back(id);
        } else {
            brpc::StreamClose(id);
        }

        // Once every source has closed, flush any streams that were deferred
        // above.
        if (all_closed) {
            for (auto& closing_id : _closing_stream_ids) {
                brpc::StreamClose(closing_id);
            }
            _closing_stream_ids.clear();
        }
    } break;
    case PStreamHeader::GET_SCHEMA: {
        _report_schema(id, hdr);
    } break;
    default:
        // Unknown opcode: log and assert in debug builds, ignore in release.
        LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this;
        DCHECK(false);
    }
}
798 | | |
// brpc idle-timeout callback: force-close the stream so its resources are
// reclaimed; the on_closed() callback below then handles the bookkeeping.
void LoadStream::on_idle_timeout(StreamId id) {
    LOG(WARNING) << "closing load stream on idle timeout, " << *this;
    brpc::StreamClose(id);
}
803 | | |
804 | 16 | void LoadStream::on_closed(StreamId id) { |
805 | | // `this` may be freed by other threads after increasing `_close_rpc_cnt`, |
806 | | // format string first to prevent use-after-free |
807 | 16 | std::stringstream ss; |
808 | 16 | ss << *this; |
809 | 16 | auto remaining_streams = _total_streams - _close_rpc_cnt.fetch_add(1) - 1; |
810 | 16 | LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams << ", " |
811 | 16 | << ss.str(); |
812 | 16 | if (remaining_streams == 0) { |
813 | 14 | _load_stream_mgr->clear_load(_load_id); |
814 | 14 | } |
815 | 16 | } |
816 | | |
817 | 110 | inline std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream) { |
818 | 110 | ostr << "load_id=" << print_id(load_stream._load_id) << ", txn_id=" << load_stream._txn_id; |
819 | 110 | return ostr; |
820 | 110 | } |
821 | | |
822 | | #include "common/compile_check_end.h" |
823 | | } // namespace doris |