/root/doris/be/src/runtime/load_stream.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/load_stream.h" |
19 | | |
20 | | #include <brpc/stream.h> |
21 | | #include <bthread/bthread.h> |
22 | | #include <bthread/condition_variable.h> |
23 | | #include <bthread/mutex.h> |
24 | | #include <olap/rowset/rowset_factory.h> |
25 | | #include <olap/rowset/rowset_meta.h> |
26 | | #include <olap/storage_engine.h> |
27 | | #include <olap/tablet_manager.h> |
28 | | #include <runtime/exec_env.h> |
29 | | |
30 | | #include <memory> |
31 | | #include <sstream> |
32 | | |
33 | | #include "bvar/bvar.h" |
34 | | #include "cloud/config.h" |
35 | | #include "common/signal_handler.h" |
36 | | #include "exec/tablet_info.h" |
37 | | #include "gutil/ref_counted.h" |
38 | | #include "olap/tablet.h" |
39 | | #include "olap/tablet_fwd.h" |
40 | | #include "olap/tablet_schema.h" |
41 | | #include "runtime/exec_env.h" |
42 | | #include "runtime/fragment_mgr.h" |
43 | | #include "runtime/load_channel.h" |
44 | | #include "runtime/load_stream_mgr.h" |
45 | | #include "runtime/load_stream_writer.h" |
46 | | #include "runtime/workload_group/workload_group_manager.h" |
47 | | #include "util/debug_points.h" |
48 | | #include "util/runtime_profile.h" |
49 | | #include "util/thrift_util.h" |
50 | | #include "util/uid_util.h" |
51 | | |
52 | | #define UNKNOWN_ID_FOR_TEST 0x7c00 |
53 | | |
54 | | namespace doris { |
55 | | |
56 | | bvar::Adder<int64_t> g_load_stream_cnt("load_stream_count"); |
57 | | bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms"); |
58 | | bvar::Adder<int> g_load_stream_flush_running_threads("load_stream_flush_wait_threads"); |
59 | | |
60 | | TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, |
61 | | LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile) |
62 | | : _id(id), |
63 | | _next_segid(0), |
64 | | _load_id(load_id), |
65 | | _txn_id(txn_id), |
66 | 14 | _load_stream_mgr(load_stream_mgr) { |
67 | 14 | load_stream_mgr->create_tokens(_flush_tokens); |
68 | 14 | _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true); |
69 | 14 | _append_data_timer = ADD_TIMER(_profile, "AppendDataTime"); |
70 | 14 | _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime"); |
71 | 14 | _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime"); |
72 | 14 | } |
73 | | |
74 | 10 | inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) { |
75 | 10 | ostr << "load_id=" << tablet_stream._load_id << ", txn_id=" << tablet_stream._txn_id |
76 | 10 | << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status(); |
77 | 10 | return ostr; |
78 | 10 | } |
79 | | |
80 | | Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id, |
81 | 14 | int64_t partition_id) { |
82 | 14 | WriteRequest req { |
83 | 14 | .tablet_id = _id, |
84 | 14 | .txn_id = _txn_id, |
85 | 14 | .index_id = index_id, |
86 | 14 | .partition_id = partition_id, |
87 | 14 | .load_id = _load_id, |
88 | 14 | .table_schema_param = schema, |
89 | | // TODO(plat1ko): write_file_cache |
90 | 14 | .storage_vault_id {}, |
91 | 14 | }; |
92 | | |
93 | 14 | _load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile); |
94 | 14 | DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", { |
95 | 14 | _status.update(Status::Uninitialized("fault injection")); |
96 | 14 | return _status.status(); |
97 | 14 | }); |
98 | 14 | _status.update(_load_stream_writer->init()); |
99 | 14 | if (!_status.ok()) { |
100 | 1 | LOG(INFO) << "failed to init rowset builder due to " << *this; |
101 | 1 | } |
102 | 14 | return _status.status(); |
103 | 14 | } |
104 | | |
105 | 27 | Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { |
106 | 27 | if (!_status.ok()) { |
107 | 1 | return _status.status(); |
108 | 1 | } |
109 | | |
110 | | // dispatch add_segment request |
111 | 26 | if (header.opcode() == PStreamHeader::ADD_SEGMENT) { |
112 | 0 | return add_segment(header, data); |
113 | 0 | } |
114 | | |
115 | 26 | SCOPED_TIMER(_append_data_timer); |
116 | | |
117 | 26 | int64_t src_id = header.src_id(); |
118 | 26 | uint32_t segid = header.segment_id(); |
119 | | // Ensure there are enough space and mapping are built. |
120 | 26 | SegIdMapping* mapping = nullptr; |
121 | 26 | { |
122 | 26 | std::lock_guard lock_guard(_lock); |
123 | 26 | if (!_segids_mapping.contains(src_id)) { |
124 | 14 | _segids_mapping[src_id] = std::make_unique<SegIdMapping>(); |
125 | 14 | } |
126 | 26 | mapping = _segids_mapping[src_id].get(); |
127 | 26 | } |
128 | 26 | if (segid + 1 > mapping->size()) { |
129 | | // TODO: Each sender lock is enough. |
130 | 14 | std::lock_guard lock_guard(_lock); |
131 | 14 | ssize_t origin_size = mapping->size(); |
132 | 15 | if (segid + 1 > origin_size) { |
133 | 15 | mapping->resize(segid + 1, std::numeric_limits<uint32_t>::max()); |
134 | 39 | for (size_t index = origin_size; index <= segid; index++) { |
135 | 24 | mapping->at(index) = _next_segid; |
136 | 24 | _next_segid++; |
137 | 24 | VLOG_DEBUG << "src_id=" << src_id << ", segid=" << index << " to " |
138 | 0 | << " segid=" << _next_segid - 1 << ", " << *this; |
139 | 24 | } |
140 | 15 | } |
141 | 14 | } |
142 | | |
143 | | // Each sender sends data in one segment sequential, so we also do not |
144 | | // need a lock here. |
145 | 26 | bool eos = header.segment_eos(); |
146 | 26 | FileType file_type = header.file_type(); |
147 | 26 | uint32_t new_segid = mapping->at(segid); |
148 | 26 | DCHECK(new_segid != std::numeric_limits<uint32_t>::max()); |
149 | 26 | butil::IOBuf buf = data->movable(); |
150 | 26 | auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable { |
151 | 26 | signal::set_signal_task_id(_load_id); |
152 | 26 | g_load_stream_flush_running_threads << -1; |
153 | 26 | auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); |
154 | 26 | if (!st.ok() && !config::is_cloud_mode()) { |
155 | 1 | auto res = ExecEnv::get_tablet(_id); |
156 | 1 | TabletSharedPtr tablet = |
157 | 1 | res.has_value() ? std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr; |
158 | 1 | if (tablet) { |
159 | 1 | tablet->report_error(st); |
160 | 1 | } |
161 | 1 | } |
162 | 26 | if (eos && st.ok()) { |
163 | 20 | DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type", |
164 | 20 | { file_type = static_cast<FileType>(-1); }); |
165 | 20 | if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) { |
166 | 20 | st = _load_stream_writer->close_writer(new_segid, file_type); |
167 | 20 | } else { |
168 | 0 | st = Status::InternalError( |
169 | 0 | "appent data failed, file type error, file type = {}, " |
170 | 0 | "segment_id={}", |
171 | 0 | file_type, new_segid); |
172 | 0 | } |
173 | 20 | } |
174 | 26 | DBUG_EXECUTE_IF("TabletStream.append_data.append_failed", |
175 | 26 | { st = Status::InternalError("fault injection"); }); |
176 | 26 | if (!st.ok()) { |
177 | 2 | _status.update(st); |
178 | 2 | LOG(WARNING) << "write data failed " << st << ", " << *this; |
179 | 2 | } |
180 | 26 | }; |
181 | 26 | auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()]; |
182 | 26 | auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks; |
183 | 26 | auto load_stream_max_wait_flush_token_time_ms = |
184 | 26 | config::load_stream_max_wait_flush_token_time_ms; |
185 | 26 | DBUG_EXECUTE_IF("TabletStream.append_data.long_wait", { |
186 | 26 | load_stream_flush_token_max_tasks = 0; |
187 | 26 | load_stream_max_wait_flush_token_time_ms = 1000; |
188 | 26 | }); |
189 | 26 | MonotonicStopWatch timer; |
190 | 26 | timer.start(); |
191 | 26 | while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) { |
192 | 0 | if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) { |
193 | 0 | _status.update( |
194 | 0 | Status::Error<true>("wait flush token back pressure time is more than " |
195 | 0 | "load_stream_max_wait_flush_token_time {}", |
196 | 0 | load_stream_max_wait_flush_token_time_ms)); |
197 | 0 | return _status.status(); |
198 | 0 | } |
199 | 0 | bthread_usleep(2 * 1000); // 2ms |
200 | 0 | } |
201 | 26 | timer.stop(); |
202 | 26 | int64_t time_ms = timer.elapsed_time() / 1000 / 1000; |
203 | 26 | g_load_stream_flush_wait_ms << time_ms; |
204 | 26 | g_load_stream_flush_running_threads << 1; |
205 | 26 | Status st = Status::OK(); |
206 | 26 | DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed", |
207 | 26 | { st = Status::InternalError("fault injection"); }); |
208 | 26 | if (st.ok()) { |
209 | 26 | st = flush_token->submit_func(flush_func); |
210 | 26 | } |
211 | 26 | if (!st.ok()) { |
212 | 0 | _status.update(st); |
213 | 0 | } |
214 | 26 | return _status.status(); |
215 | 26 | } |
216 | | |
217 | 0 | Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) { |
218 | 0 | if (!_status.ok()) { |
219 | 0 | return _status.status(); |
220 | 0 | } |
221 | | |
222 | 0 | SCOPED_TIMER(_add_segment_timer); |
223 | 0 | DCHECK(header.has_segment_statistics()); |
224 | 0 | SegmentStatistics stat(header.segment_statistics()); |
225 | 0 | TabletSchemaSPtr flush_schema; |
226 | 0 | if (header.has_flush_schema()) { |
227 | 0 | flush_schema = std::make_shared<TabletSchema>(); |
228 | 0 | flush_schema->init_from_pb(header.flush_schema()); |
229 | 0 | } |
230 | |
|
231 | 0 | int64_t src_id = header.src_id(); |
232 | 0 | uint32_t segid = header.segment_id(); |
233 | 0 | uint32_t new_segid; |
234 | 0 | DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_segid", { segid = UNKNOWN_ID_FOR_TEST; }); |
235 | 0 | { |
236 | 0 | std::lock_guard lock_guard(_lock); |
237 | 0 | if (!_segids_mapping.contains(src_id)) { |
238 | 0 | _status.update(Status::InternalError( |
239 | 0 | "add segment failed, no segment written by this src be yet, src_id={}, " |
240 | 0 | "segment_id={}", |
241 | 0 | src_id, segid)); |
242 | 0 | return _status.status(); |
243 | 0 | } |
244 | 0 | DBUG_EXECUTE_IF("TabletStream.add_segment.segid_never_written", |
245 | 0 | { segid = _segids_mapping[src_id]->size(); }); |
246 | 0 | if (segid >= _segids_mapping[src_id]->size()) { |
247 | 0 | _status.update(Status::InternalError( |
248 | 0 | "add segment failed, segment is never written, src_id={}, segment_id={}", |
249 | 0 | src_id, segid)); |
250 | 0 | return _status.status(); |
251 | 0 | } |
252 | 0 | new_segid = _segids_mapping[src_id]->at(segid); |
253 | 0 | } |
254 | 0 | DCHECK(new_segid != std::numeric_limits<uint32_t>::max()); |
255 | |
|
256 | 0 | auto add_segment_func = [this, new_segid, stat, flush_schema]() { |
257 | 0 | signal::set_signal_task_id(_load_id); |
258 | 0 | auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema); |
259 | 0 | DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed", |
260 | 0 | { st = Status::InternalError("fault injection"); }); |
261 | 0 | if (!st.ok()) { |
262 | 0 | _status.update(st); |
263 | 0 | LOG(INFO) << "add segment failed " << *this; |
264 | 0 | } |
265 | 0 | }; |
266 | 0 | auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()]; |
267 | 0 | Status st = Status::OK(); |
268 | 0 | DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed", |
269 | 0 | { st = Status::InternalError("fault injection"); }); |
270 | 0 | if (st.ok()) { |
271 | 0 | st = flush_token->submit_func(add_segment_func); |
272 | 0 | } |
273 | 0 | if (!st.ok()) { |
274 | 0 | _status.update(st); |
275 | 0 | } |
276 | 0 | return _status.status(); |
277 | 0 | } |
278 | | |
279 | 30 | Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) { |
280 | 30 | bthread::Mutex mu; |
281 | 30 | std::unique_lock<bthread::Mutex> lock(mu); |
282 | 30 | bthread::ConditionVariable cv; |
283 | 30 | auto st = Status::OK(); |
284 | 30 | auto func = [this, &mu, &cv, &st, &fn] { |
285 | 30 | signal::set_signal_task_id(_load_id); |
286 | 30 | st = fn(); |
287 | 30 | std::lock_guard<bthread::Mutex> lock(mu); |
288 | 30 | cv.notify_one(); |
289 | 30 | }; |
290 | 30 | bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func); |
291 | 30 | if (!ret) { |
292 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
293 | 0 | "there is not enough thread resource for close load"); |
294 | 0 | } |
295 | 30 | cv.wait(lock); |
296 | 30 | return st; |
297 | 30 | } |
298 | | |
299 | 14 | void TabletStream::pre_close() { |
300 | 14 | if (!_status.ok()) { |
301 | 1 | return; |
302 | 1 | } |
303 | | |
304 | 13 | SCOPED_TIMER(_close_wait_timer); |
305 | 13 | _status.update(_run_in_heavy_work_pool([this]() { |
306 | 104 | for (auto& token : _flush_tokens) { |
307 | 104 | token->wait(); |
308 | 104 | } |
309 | 13 | return Status::OK(); |
310 | 13 | })); |
311 | | // it is necessary to check status after wait_func, |
312 | | // for create_rowset could fail during add_segment when loading to MOW table, |
313 | | // in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump. |
314 | 13 | if (!_status.ok()) { |
315 | 2 | return; |
316 | 2 | } |
317 | | |
318 | 11 | DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; }); |
319 | 11 | if (_check_num_segments && (_next_segid.load() != _num_segments)) { |
320 | 2 | _status.update(Status::Corruption( |
321 | 2 | "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id, |
322 | 2 | _num_segments, _next_segid.load(), print_id(_load_id))); |
323 | 2 | return; |
324 | 2 | } |
325 | | |
326 | 9 | _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); })); |
327 | 9 | } |
328 | | |
329 | 14 | Status TabletStream::close() { |
330 | 14 | if (!_status.ok()) { |
331 | 6 | return _status.status(); |
332 | 6 | } |
333 | | |
334 | 8 | SCOPED_TIMER(_close_wait_timer); |
335 | 8 | _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); })); |
336 | 8 | return _status.status(); |
337 | 14 | } |
338 | | |
339 | | IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id, |
340 | | std::shared_ptr<OlapTableSchemaParam> schema, |
341 | | LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile) |
342 | | : _id(id), |
343 | | _load_id(load_id), |
344 | | _txn_id(txn_id), |
345 | | _schema(schema), |
346 | 28 | _load_stream_mgr(load_stream_mgr) { |
347 | 28 | _profile = profile->create_child(fmt::format("IndexStream {}", id), true, true); |
348 | 28 | _append_data_timer = ADD_TIMER(_profile, "AppendDataTime"); |
349 | 28 | _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime"); |
350 | 28 | } |
351 | | |
352 | 27 | Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { |
353 | 27 | SCOPED_TIMER(_append_data_timer); |
354 | 27 | int64_t tablet_id = header.tablet_id(); |
355 | 27 | TabletStreamSharedPtr tablet_stream; |
356 | 27 | { |
357 | 27 | std::lock_guard lock_guard(_lock); |
358 | 27 | auto it = _tablet_streams_map.find(tablet_id); |
359 | 27 | if (it == _tablet_streams_map.end()) { |
360 | 13 | _init_tablet_stream(tablet_stream, tablet_id, header.partition_id()); |
361 | 14 | } else { |
362 | 14 | tablet_stream = it->second; |
363 | 14 | } |
364 | 27 | } |
365 | | |
366 | 27 | return tablet_stream->append_data(header, data); |
367 | 27 | } |
368 | | |
369 | | void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, |
370 | 14 | int64_t partition_id) { |
371 | 14 | tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr, |
372 | 14 | _profile); |
373 | 14 | _tablet_streams_map[tablet_id] = tablet_stream; |
374 | 14 | auto st = tablet_stream->init(_schema, _id, partition_id); |
375 | 14 | if (!st.ok()) { |
376 | 1 | LOG(WARNING) << "tablet stream init failed " << *tablet_stream; |
377 | 1 | } |
378 | 14 | } |
379 | | |
380 | | void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit, |
381 | 28 | std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) { |
382 | 28 | std::lock_guard lock_guard(_lock); |
383 | 28 | SCOPED_TIMER(_close_wait_timer); |
384 | | // open all need commit tablets |
385 | 34 | for (const auto& tablet : tablets_to_commit) { |
386 | 34 | if (_id != tablet.index_id()) { |
387 | 18 | continue; |
388 | 18 | } |
389 | 16 | TabletStreamSharedPtr tablet_stream; |
390 | 16 | auto it = _tablet_streams_map.find(tablet.tablet_id()); |
391 | 16 | if (it == _tablet_streams_map.end()) { |
392 | 1 | _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id()); |
393 | 15 | } else { |
394 | 15 | tablet_stream = it->second; |
395 | 15 | } |
396 | 16 | if (tablet.has_num_segments()) { |
397 | 16 | tablet_stream->add_num_segments(tablet.num_segments()); |
398 | 16 | } else { |
399 | | // for compatibility reasons (sink from old version BE) |
400 | 0 | tablet_stream->disable_num_segments_check(); |
401 | 0 | } |
402 | 16 | } |
403 | | |
404 | 28 | for (auto& [_, tablet_stream] : _tablet_streams_map) { |
405 | 14 | tablet_stream->pre_close(); |
406 | 14 | } |
407 | | |
408 | 28 | for (auto& [_, tablet_stream] : _tablet_streams_map) { |
409 | 14 | auto st = tablet_stream->close(); |
410 | 14 | if (st.ok()) { |
411 | 8 | success_tablet_ids->push_back(tablet_stream->id()); |
412 | 8 | } else { |
413 | 6 | LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st; |
414 | 6 | failed_tablets->emplace_back(tablet_stream->id(), st); |
415 | 6 | } |
416 | 14 | } |
417 | 28 | } |
418 | | |
419 | | // TODO: Profile is temporary disabled, because: |
420 | | // 1. It's not being processed by the upstream for now |
421 | | // 2. There are some problems in _profile->to_thrift() |
422 | | LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile) |
423 | 14 | : _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) { |
424 | 14 | g_load_stream_cnt << 1; |
425 | 14 | _profile = std::make_unique<RuntimeProfile>("LoadStream"); |
426 | 14 | _append_data_timer = ADD_TIMER(_profile, "AppendDataTime"); |
427 | 14 | _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime"); |
428 | 14 | TUniqueId load_tid = ((UniqueId)load_id).to_thrift(); |
429 | | #ifndef BE_TEST |
430 | | std::shared_ptr<QueryContext> query_context = |
431 | | ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock(load_tid); |
432 | | if (query_context != nullptr) { |
433 | | _query_thread_context = {load_tid, query_context->query_mem_tracker, |
434 | | query_context->workload_group()}; |
435 | | } else { |
436 | | _query_thread_context = {load_tid, MemTrackerLimiter::create_shared( |
437 | | MemTrackerLimiter::Type::LOAD, |
438 | | fmt::format("(FromLoadStream)Load#Id={}", |
439 | | ((UniqueId)load_id).to_string()))}; |
440 | | } |
441 | | #else |
442 | 14 | _query_thread_context = {load_tid, MemTrackerLimiter::create_shared( |
443 | 14 | MemTrackerLimiter::Type::LOAD, |
444 | 14 | fmt::format("(FromLoadStream)Load#Id={}", |
445 | 14 | ((UniqueId)load_id).to_string()))}; |
446 | 14 | #endif |
447 | 14 | } |
448 | | |
449 | 14 | LoadStream::~LoadStream() { |
450 | 14 | g_load_stream_cnt << -1; |
451 | 14 | LOG(INFO) << "load stream is deconstructed " << *this; |
452 | 14 | } |
453 | | |
454 | 14 | Status LoadStream::init(const POpenLoadStreamRequest* request) { |
455 | 14 | _txn_id = request->txn_id(); |
456 | 14 | _total_streams = request->total_streams(); |
457 | 14 | _is_incremental = (_total_streams == 0); |
458 | | |
459 | 14 | _schema = std::make_shared<OlapTableSchemaParam>(); |
460 | 14 | RETURN_IF_ERROR(_schema->init(request->schema())); |
461 | 28 | for (auto& index : request->schema().indexes()) { |
462 | 28 | _index_streams_map[index.id()] = std::make_shared<IndexStream>( |
463 | 28 | _load_id, index.id(), _txn_id, _schema, _load_stream_mgr, _profile.get()); |
464 | 28 | } |
465 | 14 | LOG(INFO) << "succeed to init load stream " << *this; |
466 | 14 | return Status::OK(); |
467 | 14 | } |
468 | | |
469 | | void LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit, |
470 | 16 | std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) { |
471 | 16 | std::lock_guard<bthread::Mutex> lock_guard(_lock); |
472 | 16 | SCOPED_TIMER(_close_wait_timer); |
473 | | |
474 | | // we do nothing until recv CLOSE_LOAD from all stream to ensure all data are handled before ack |
475 | 16 | _open_streams[src_id]--; |
476 | 16 | if (_open_streams[src_id] == 0) { |
477 | 16 | _open_streams.erase(src_id); |
478 | 16 | } |
479 | 16 | _close_load_cnt++; |
480 | 16 | LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining " |
481 | 16 | << _total_streams - _close_load_cnt << " senders, " << *this; |
482 | | |
483 | 16 | _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(), |
484 | 16 | tablets_to_commit.end()); |
485 | | |
486 | 16 | if (_close_load_cnt < _total_streams) { |
487 | | // do not return commit info if there is remaining streams. |
488 | 2 | return; |
489 | 2 | } |
490 | | |
491 | 28 | for (auto& [_, index_stream] : _index_streams_map) { |
492 | 28 | index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets); |
493 | 28 | } |
494 | 14 | LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size() |
495 | 14 | << ", failed_tablet_num=" << failed_tablets->size(); |
496 | 14 | } |
497 | | |
498 | | void LoadStream::_report_result(StreamId stream, const Status& status, |
499 | | const std::vector<int64_t>& success_tablet_ids, |
500 | 36 | const FailedTablets& failed_tablets, bool eos) { |
501 | 36 | LOG(INFO) << "report result " << *this << ", success tablet num " << success_tablet_ids.size() |
502 | 36 | << ", failed tablet num " << failed_tablets.size(); |
503 | 36 | butil::IOBuf buf; |
504 | 36 | PLoadStreamResponse response; |
505 | 36 | response.set_eos(eos); |
506 | 36 | status.to_protobuf(response.mutable_status()); |
507 | 36 | for (auto& id : success_tablet_ids) { |
508 | 8 | response.add_success_tablet_ids(id); |
509 | 8 | } |
510 | 36 | for (auto& [id, st] : failed_tablets) { |
511 | 10 | auto pb = response.add_failed_tablets(); |
512 | 10 | pb->set_id(id); |
513 | 10 | st.to_protobuf(pb->mutable_status()); |
514 | 10 | } |
515 | | |
516 | 36 | if (_enable_profile && _close_load_cnt == _total_streams) { |
517 | 0 | TRuntimeProfileTree tprofile; |
518 | 0 | ThriftSerializer ser(false, 4096); |
519 | 0 | uint8_t* buf = nullptr; |
520 | 0 | uint32_t len = 0; |
521 | 0 | std::unique_lock<bthread::Mutex> l(_lock); |
522 | |
|
523 | 0 | _profile->to_thrift(&tprofile); |
524 | 0 | auto st = ser.serialize(&tprofile, &len, &buf); |
525 | 0 | if (st.ok()) { |
526 | 0 | response.set_load_stream_profile(buf, len); |
527 | 0 | } else { |
528 | 0 | LOG(WARNING) << "TRuntimeProfileTree serialize failed, errmsg=" << st << ", " << *this; |
529 | 0 | } |
530 | 0 | } |
531 | | |
532 | 36 | buf.append(response.SerializeAsString()); |
533 | 36 | auto wst = _write_stream(stream, buf); |
534 | 36 | if (!wst.ok()) { |
535 | 0 | LOG(WARNING) << " report result failed with " << wst << ", " << *this; |
536 | 0 | } |
537 | 36 | } |
538 | | |
539 | 0 | void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) { |
540 | 0 | butil::IOBuf buf; |
541 | 0 | PLoadStreamResponse response; |
542 | 0 | Status st = Status::OK(); |
543 | 0 | for (const auto& req : hdr.tablets()) { |
544 | 0 | BaseTabletSPtr tablet; |
545 | 0 | if (auto res = ExecEnv::get_tablet(req.tablet_id()); res.has_value()) { |
546 | 0 | tablet = std::move(res).value(); |
547 | 0 | } else { |
548 | 0 | st = std::move(res).error(); |
549 | 0 | break; |
550 | 0 | } |
551 | 0 | auto* resp = response.add_tablet_schemas(); |
552 | 0 | resp->set_index_id(req.index_id()); |
553 | 0 | resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write()); |
554 | 0 | tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema()); |
555 | 0 | } |
556 | 0 | st.to_protobuf(response.mutable_status()); |
557 | |
|
558 | 0 | buf.append(response.SerializeAsString()); |
559 | 0 | auto wst = _write_stream(stream, buf); |
560 | 0 | if (!wst.ok()) { |
561 | 0 | LOG(WARNING) << " report result failed with " << wst << ", " << *this; |
562 | 0 | } |
563 | 0 | } |
564 | | |
565 | 36 | Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) { |
566 | 36 | for (;;) { |
567 | 36 | int ret = 0; |
568 | 36 | DBUG_EXECUTE_IF("LoadStream._write_stream.EAGAIN", { ret = EAGAIN; }); |
569 | 36 | if (ret == 0) { |
570 | 36 | ret = brpc::StreamWrite(stream, buf); |
571 | 36 | } |
572 | 36 | switch (ret) { |
573 | 36 | case 0: |
574 | 36 | return Status::OK(); |
575 | 0 | case EAGAIN: { |
576 | 0 | const timespec time = butil::seconds_from_now(config::load_stream_eagain_wait_seconds); |
577 | 0 | int wait_ret = brpc::StreamWait(stream, &time); |
578 | 0 | if (wait_ret != 0) { |
579 | 0 | return Status::InternalError("StreamWait failed, err={}", wait_ret); |
580 | 0 | } |
581 | 0 | break; |
582 | 0 | } |
583 | 0 | default: |
584 | 0 | return Status::InternalError("StreamWrite failed, err={}", ret); |
585 | 36 | } |
586 | 36 | } |
587 | 0 | return Status::OK(); |
588 | 36 | } |
589 | | |
590 | 62 | void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& hdr) { |
591 | 62 | butil::IOBufAsZeroCopyInputStream wrapper(*message); |
592 | 62 | hdr.ParseFromZeroCopyStream(&wrapper); |
593 | 62 | VLOG_DEBUG << "header parse result: " << hdr.DebugString(); |
594 | 62 | } |
595 | | |
596 | 28 | Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) { |
597 | 28 | SCOPED_TIMER(_append_data_timer); |
598 | 28 | IndexStreamSharedPtr index_stream; |
599 | | |
600 | 28 | int64_t index_id = header.index_id(); |
601 | 28 | DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid", |
602 | 28 | { index_id = UNKNOWN_ID_FOR_TEST; }); |
603 | 28 | auto it = _index_streams_map.find(index_id); |
604 | 28 | if (it == _index_streams_map.end()) { |
605 | 1 | return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id {}", index_id); |
606 | 27 | } else { |
607 | 27 | index_stream = it->second; |
608 | 27 | } |
609 | | |
610 | 27 | return index_stream->append_data(header, data); |
611 | 28 | } |
612 | | |
613 | 49 | int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) { |
614 | 49 | VLOG_DEBUG << "on_received_messages " << id << " " << size; |
615 | 111 | for (size_t i = 0; i < size; ++i) { |
616 | 124 | while (messages[i]->size() > 0) { |
617 | | // step 1: parse header |
618 | 62 | size_t hdr_len = 0; |
619 | 62 | messages[i]->cutn((void*)&hdr_len, sizeof(size_t)); |
620 | 62 | butil::IOBuf hdr_buf; |
621 | 62 | PStreamHeader hdr; |
622 | 62 | messages[i]->cutn(&hdr_buf, hdr_len); |
623 | 62 | _parse_header(&hdr_buf, hdr); |
624 | | |
625 | | // step 2: cut data |
626 | 62 | size_t data_len = 0; |
627 | 62 | messages[i]->cutn((void*)&data_len, sizeof(size_t)); |
628 | 62 | butil::IOBuf data_buf; |
629 | 62 | PStreamHeader data; |
630 | 62 | messages[i]->cutn(&data_buf, data_len); |
631 | | |
632 | | // step 3: dispatch |
633 | 62 | _dispatch(id, hdr, &data_buf); |
634 | 62 | } |
635 | 62 | } |
636 | 49 | return 0; |
637 | 49 | } |
638 | | |
639 | 62 | void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) { |
640 | 62 | VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id() |
641 | 0 | << " with tablet " << hdr.tablet_id(); |
642 | 62 | SCOPED_ATTACH_TASK(_query_thread_context); |
643 | | // CLOSE_LOAD message should not be fault injected, |
644 | | // otherwise the message will be ignored and causing close wait timeout |
645 | 62 | if (hdr.opcode() != PStreamHeader::CLOSE_LOAD) { |
646 | 30 | DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", { |
647 | 30 | PUniqueId& load_id = const_cast<PUniqueId&>(hdr.load_id()); |
648 | 30 | load_id.set_hi(UNKNOWN_ID_FOR_TEST); |
649 | 30 | load_id.set_lo(UNKNOWN_ID_FOR_TEST); |
650 | 30 | }); |
651 | 30 | DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", { |
652 | 30 | PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr); |
653 | 30 | t_hdr.set_src_id(UNKNOWN_ID_FOR_TEST); |
654 | 30 | }); |
655 | 30 | } |
656 | 62 | if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) { |
657 | 1 | Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>( |
658 | 1 | "invalid load id {}, expected {}", print_id(hdr.load_id()), print_id(_load_id)); |
659 | 1 | _report_failure(id, st, hdr); |
660 | 1 | return; |
661 | 1 | } |
662 | | |
663 | 61 | { |
664 | 61 | std::lock_guard lock_guard(_lock); |
665 | 61 | if (!_open_streams.contains(hdr.src_id())) { |
666 | 17 | Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("no open stream from source {}", |
667 | 17 | hdr.src_id()); |
668 | 17 | _report_failure(id, st, hdr); |
669 | 17 | return; |
670 | 17 | } |
671 | 61 | } |
672 | | |
673 | 44 | switch (hdr.opcode()) { |
674 | 0 | case PStreamHeader::ADD_SEGMENT: // ADD_SEGMENT will be dispatched inside TabletStream |
675 | 28 | case PStreamHeader::APPEND_DATA: { |
676 | 28 | auto st = _append_data(hdr, data); |
677 | 28 | if (!st.ok()) { |
678 | 2 | _report_failure(id, st, hdr); |
679 | 2 | } |
680 | 28 | } break; |
681 | 16 | case PStreamHeader::CLOSE_LOAD: { |
682 | 16 | std::vector<int64_t> success_tablet_ids; |
683 | 16 | FailedTablets failed_tablets; |
684 | 16 | std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end()); |
685 | 16 | close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets); |
686 | 16 | _report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true); |
687 | 16 | brpc::StreamClose(id); |
688 | 16 | } break; |
689 | 0 | case PStreamHeader::GET_SCHEMA: { |
690 | 0 | _report_schema(id, hdr); |
691 | 0 | } break; |
692 | 0 | default: |
693 | 0 | LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this; |
694 | 0 | DCHECK(false); |
695 | 44 | } |
696 | 44 | } |
697 | | |
698 | 0 | void LoadStream::on_idle_timeout(StreamId id) { |
699 | 0 | LOG(WARNING) << "closing load stream on idle timeout, " << *this; |
700 | 0 | brpc::StreamClose(id); |
701 | 0 | } |
702 | | |
703 | 16 | void LoadStream::on_closed(StreamId id) { |
704 | | // `this` may be freed by other threads after increasing `_close_rpc_cnt`, |
705 | | // format string first to prevent use-after-free |
706 | 16 | std::stringstream ss; |
707 | 16 | ss << *this; |
708 | 16 | auto remaining_streams = _total_streams - _close_rpc_cnt.fetch_add(1) - 1; |
709 | 16 | LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams << ", " |
710 | 16 | << ss.str(); |
711 | 16 | if (remaining_streams == 0) { |
712 | 14 | _load_stream_mgr->clear_load(_load_id); |
713 | 14 | } |
714 | 16 | } |
715 | | |
716 | 110 | inline std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream) { |
717 | 110 | ostr << "load_id=" << print_id(load_stream._load_id) << ", txn_id=" << load_stream._txn_id; |
718 | 110 | return ostr; |
719 | 110 | } |
720 | | |
721 | | } // namespace doris |