be/src/load/channel/load_stream.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/channel/load_stream.h" |
19 | | |
20 | | #include <brpc/stream.h> |
21 | | #include <bthread/bthread.h> |
22 | | #include <bthread/condition_variable.h> |
23 | | #include <bthread/mutex.h> |
24 | | |
25 | | #include <memory> |
26 | | #include <sstream> |
27 | | |
28 | | #include "bvar/bvar.h" |
29 | | #include "cloud/config.h" |
30 | | #include "common/signal_handler.h" |
31 | | #include "load/channel/load_channel.h" |
32 | | #include "load/channel/load_stream_mgr.h" |
33 | | #include "load/channel/load_stream_writer.h" |
34 | | #include "load/delta_writer/delta_writer.h" |
35 | | #include "runtime/exec_env.h" |
36 | | #include "runtime/fragment_mgr.h" |
37 | | #include "runtime/runtime_profile.h" |
38 | | #include "runtime/workload_group/workload_group_manager.h" |
39 | | #include "storage/rowset/rowset_factory.h" |
40 | | #include "storage/rowset/rowset_meta.h" |
41 | | #include "storage/storage_engine.h" |
42 | | #include "storage/tablet/tablet.h" |
43 | | #include "storage/tablet/tablet_fwd.h" |
44 | | #include "storage/tablet/tablet_manager.h" |
45 | | #include "storage/tablet/tablet_schema.h" |
46 | | #include "storage/tablet_info.h" |
47 | | #include "util/debug_points.h" |
48 | | #include "util/thrift_util.h" |
49 | | #include "util/uid_util.h" |
50 | | |
51 | | #define UNKNOWN_ID_FOR_TEST 0x7c00 |
52 | | |
53 | | namespace doris { |
54 | | #include "common/compile_check_begin.h" |
55 | | |
// Process-wide metrics exported via bvar:
// - number of live LoadStream instances
bvar::Adder<int64_t> g_load_stream_cnt("load_stream_count");
// - latency distribution of waiting on flush-token back pressure (ms)
bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms");
// - number of flush tasks currently submitted but not yet finished
bvar::Adder<int> g_load_stream_flush_running_threads("load_stream_flush_wait_threads");
59 | | |
// Construct a per-tablet stream: acquires a flush token from the manager
// (used to serialize/back-pressure segment flush tasks) and registers a
// child profile with per-phase timers.
TabletStream::TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
                           LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
        : _id(id),
          _next_segid(0), // next local segment id to assign; see append_data()
          _load_id(load_id),
          _txn_id(txn_id),
          _load_stream_mgr(load_stream_mgr) {
    load_stream_mgr->create_token(_flush_token);
    _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
    _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
}
73 | | |
74 | 10 | inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) { |
75 | 10 | ostr << "load_id=" << print_id(tablet_stream._load_id) << ", txn_id=" << tablet_stream._txn_id |
76 | 10 | << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status(); |
77 | 10 | return ostr; |
78 | 10 | } |
79 | | |
// Initialize the underlying LoadStreamWriter for this tablet.
// Any failure is folded into _status so later calls (append_data/close)
// short-circuit with the same error.
Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
                          int64_t partition_id) {
    WriteRequest req {
            .tablet_id = _id,
            .txn_id = _txn_id,
            .index_id = index_id,
            .partition_id = partition_id,
            .load_id = _load_id,
            .table_schema_param = schema,
            // TODO(plat1ko): write_file_cache
            .storage_vault_id {},
    };

    _load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
    // Fault injection: simulate a writer that never got initialized.
    // Note the macro body contains a `return`, so it exits this function.
    DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
        _status.update(Status::Uninitialized("fault injection"));
        return _status.status();
    });
    _status.update(_load_stream_writer->init());
    if (!_status.ok()) {
        LOG(INFO) << "failed to init rowset builder due to " << *this;
    }
    return _status.status();
}
104 | | |
105 | 110k | Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { |
106 | 110k | if (!_status.ok()) { |
107 | 1 | return _status.status(); |
108 | 1 | } |
109 | | |
110 | | // dispatch add_segment request |
111 | 110k | if (header.opcode() == PStreamHeader::ADD_SEGMENT) { |
112 | 3.63k | return add_segment(header, data); |
113 | 3.63k | } |
114 | | |
115 | 106k | SCOPED_TIMER(_append_data_timer); |
116 | | |
117 | 106k | int64_t src_id = header.src_id(); |
118 | 106k | uint32_t segid = header.segment_id(); |
119 | | // Ensure there are enough space and mapping are built. |
120 | 106k | SegIdMapping* mapping = nullptr; |
121 | 106k | { |
122 | 106k | std::lock_guard lock_guard(_lock); |
123 | 106k | if (!_segids_mapping.contains(src_id)) { |
124 | 3.65k | _segids_mapping[src_id] = std::make_unique<SegIdMapping>(); |
125 | 3.65k | } |
126 | 106k | mapping = _segids_mapping[src_id].get(); |
127 | 106k | } |
128 | 106k | if (segid + 1 > mapping->size()) { |
129 | | // TODO: Each sender lock is enough. |
130 | 3.65k | std::lock_guard lock_guard(_lock); |
131 | 3.65k | ssize_t origin_size = mapping->size(); |
132 | 3.65k | if (segid + 1 > origin_size) { |
133 | 3.65k | mapping->resize(segid + 1, std::numeric_limits<uint32_t>::max()); |
134 | 7.31k | for (size_t index = origin_size; index <= segid; index++) { |
135 | 3.66k | mapping->at(index) = _next_segid; |
136 | 3.66k | _next_segid++; |
137 | 3.66k | VLOG_DEBUG << "src_id=" << src_id << ", segid=" << index << " to " |
138 | 0 | << " segid=" << _next_segid - 1 << ", " << *this; |
139 | 3.66k | } |
140 | 3.65k | } |
141 | 3.65k | } |
142 | | |
143 | | // Each sender sends data in one segment sequential, so we also do not |
144 | | // need a lock here. |
145 | 106k | bool eos = header.segment_eos(); |
146 | 106k | FileType file_type = header.file_type(); |
147 | 106k | uint32_t new_segid = mapping->at(segid); |
148 | 106k | DCHECK(new_segid != std::numeric_limits<uint32_t>::max()); |
149 | 106k | butil::IOBuf buf = data->movable(); |
150 | 106k | auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable { |
151 | 106k | signal::set_signal_task_id(_load_id); |
152 | 106k | g_load_stream_flush_running_threads << -1; |
153 | 106k | auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); |
154 | 106k | if (!st.ok() && !config::is_cloud_mode()) { |
155 | 1 | auto res = ExecEnv::get_tablet(_id); |
156 | 1 | TabletSharedPtr tablet = |
157 | 1 | res.has_value() ? std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr; |
158 | 1 | if (tablet) { |
159 | 1 | tablet->report_error(st); |
160 | 1 | } |
161 | 1 | } |
162 | 106k | if (eos && st.ok()) { |
163 | 3.66k | DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type", |
164 | 3.66k | { file_type = static_cast<FileType>(-1); }); |
165 | 3.66k | if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) { |
166 | 3.66k | st = _load_stream_writer->close_writer(new_segid, file_type); |
167 | 3.66k | } else { |
168 | 0 | st = Status::InternalError( |
169 | 0 | "appent data failed, file type error, file type = {}, " |
170 | 0 | "segment_id={}", |
171 | 0 | file_type, new_segid); |
172 | 0 | } |
173 | 3.66k | } |
174 | 106k | DBUG_EXECUTE_IF("TabletStream.append_data.append_failed", |
175 | 106k | { st = Status::InternalError("fault injection"); }); |
176 | 106k | if (!st.ok()) { |
177 | 2 | _status.update(st); |
178 | 2 | LOG(WARNING) << "write data failed " << st << ", " << *this; |
179 | 2 | } |
180 | 106k | }; |
181 | 106k | auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks; |
182 | 106k | auto load_stream_max_wait_flush_token_time_ms = |
183 | 106k | config::load_stream_max_wait_flush_token_time_ms; |
184 | 106k | DBUG_EXECUTE_IF("TabletStream.append_data.long_wait", { |
185 | 106k | load_stream_flush_token_max_tasks = 0; |
186 | 106k | load_stream_max_wait_flush_token_time_ms = 1000; |
187 | 106k | }); |
188 | 106k | MonotonicStopWatch timer; |
189 | 106k | timer.start(); |
190 | 107k | while (_flush_token->num_tasks() >= load_stream_flush_token_max_tasks) { |
191 | 708 | if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) { |
192 | 0 | _status.update( |
193 | 0 | Status::Error<true>("wait flush token back pressure time is more than " |
194 | 0 | "load_stream_max_wait_flush_token_time {}", |
195 | 0 | load_stream_max_wait_flush_token_time_ms)); |
196 | 0 | return _status.status(); |
197 | 0 | } |
198 | 708 | bthread_usleep(2 * 1000); // 2ms |
199 | 708 | } |
200 | 106k | timer.stop(); |
201 | 106k | int64_t time_ms = timer.elapsed_time() / 1000 / 1000; |
202 | 106k | g_load_stream_flush_wait_ms << time_ms; |
203 | 106k | g_load_stream_flush_running_threads << 1; |
204 | 106k | Status st = Status::OK(); |
205 | 106k | DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed", |
206 | 106k | { st = Status::InternalError("fault injection"); }); |
207 | 106k | if (st.ok()) { |
208 | 106k | st = _flush_token->submit_func(flush_func); |
209 | 106k | } |
210 | 106k | if (!st.ok()) { |
211 | 0 | _status.update(st); |
212 | 0 | } |
213 | 106k | return _status.status(); |
214 | 106k | } |
215 | | |
// Handle an ADD_SEGMENT request: translate the sender-local segment id to
// the tablet-local id (established by append_data) and submit the segment
// registration to the flush token so it is ordered after the data writes.
Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
    // Fail fast if a previous task already failed this tablet stream.
    if (!_status.ok()) {
        return _status.status();
    }

    SCOPED_TIMER(_add_segment_timer);
    DCHECK(header.has_segment_statistics());
    SegmentStatistics stat(header.segment_statistics());

    int64_t src_id = header.src_id();
    uint32_t segid = header.segment_id();
    uint32_t new_segid;
    DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_segid", { segid = UNKNOWN_ID_FOR_TEST; });
    {
        std::lock_guard lock_guard(_lock);
        // The mapping is created by append_data; an ADD_SEGMENT for a sender
        // that never wrote data is a protocol violation.
        if (!_segids_mapping.contains(src_id)) {
            _status.update(Status::InternalError(
                    "add segment failed, no segment written by this src be yet, src_id={}, "
                    "segment_id={}",
                    src_id, segid));
            return _status.status();
        }
        DBUG_EXECUTE_IF("TabletStream.add_segment.segid_never_written",
                        { segid = static_cast<uint32_t>(_segids_mapping[src_id]->size()); });
        if (segid >= _segids_mapping[src_id]->size()) {
            _status.update(Status::InternalError(
                    "add segment failed, segment is never written, src_id={}, segment_id={}",
                    src_id, segid));
            return _status.status();
        }
        new_segid = _segids_mapping[src_id]->at(segid);
    }
    DCHECK(new_segid != std::numeric_limits<uint32_t>::max());

    auto add_segment_func = [this, new_segid, stat]() {
        signal::set_signal_task_id(_load_id);
        auto st = _load_stream_writer->add_segment(new_segid, stat);
        DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
                        { st = Status::InternalError("fault injection"); });
        if (!st.ok()) {
            _status.update(st);
            LOG(INFO) << "add segment failed " << *this;
        }
    };
    Status st = Status::OK();
    DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed",
                    { st = Status::InternalError("fault injection"); });
    if (st.ok()) {
        st = _flush_token->submit_func(add_segment_func);
    }
    if (!st.ok()) {
        _status.update(st);
    }
    return _status.status();
}
271 | | |
// Run `fn` on the heavy work pool and block the current bthread until it
// finishes, returning fn's status. The caller locks `mu` BEFORE offering the
// task and the worker takes `mu` before notifying, so the notification
// cannot be lost even if the worker finishes before cv.wait() is reached.
// NOTE(review): cv.wait() has no predicate/done flag, so a spurious wakeup
// would return before fn completes — confirm bthread's ConditionVariable
// guarantees here, or guard with a bool.
Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
    bthread::Mutex mu;
    std::unique_lock<bthread::Mutex> lock(mu);
    bthread::ConditionVariable cv;
    auto st = Status::OK();
    auto func = [this, &mu, &cv, &st, &fn] {
        signal::set_signal_task_id(_load_id);
        st = fn();
        std::lock_guard<bthread::Mutex> lock(mu);
        cv.notify_one();
    };
    bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func);
    if (!ret) {
        return Status::Error<ErrorCode::INTERNAL_ERROR>(
                "there is not enough thread resource for close load");
    }
    cv.wait(lock);
    return st;
}
291 | | |
// Wait until all tasks submitted to the flush token have completed.
// Idempotent: only the first caller does the work (guarded by
// _flush_tasks_done under _lock); later calls return immediately.
void TabletStream::wait_for_flush_tasks() {
    {
        std::lock_guard lock_guard(_lock);
        if (_flush_tasks_done) {
            return;
        }
        _flush_tasks_done = true;
    }

    // On a failed stream, cancel pending tasks instead of draining them.
    if (!_status.ok()) {
        _flush_token->shutdown();
        return;
    }

    // Use heavy_work_pool to avoid blocking bthread
    auto st = _run_in_heavy_work_pool([this]() {
        _flush_token->wait();
        return Status::OK();
    });
    if (!st.ok()) {
        // If heavy_work_pool is unavailable, fall back to shutdown
        // which will cancel pending tasks and wait for running tasks
        _flush_token->shutdown();
        _status.update(st);
    }
}
318 | | |
// First phase of closing: drain flush tasks, verify the segment count
// matches what the sender committed, then pre-close the writer.
// Errors are accumulated in _status; close() reports them.
void TabletStream::pre_close() {
    SCOPED_TIMER(_close_wait_timer);
    wait_for_flush_tasks();

    if (!_status.ok()) {
        return;
    }

    DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
    // _next_segid counts segments actually written; _num_segments is the
    // expected count reported via CLOSE_LOAD (check may be disabled for
    // old-version senders, see disable_num_segments_check()).
    if (_check_num_segments && (_next_segid.load() != _num_segments)) {
        _status.update(Status::Corruption(
                "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
                _num_segments, _next_segid.load(), print_id(_load_id)));
        return;
    }

    _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); }));
}
337 | | |
338 | 12.7k | Status TabletStream::close() { |
339 | 12.7k | if (!_status.ok()) { |
340 | 6 | return _status.status(); |
341 | 6 | } |
342 | | |
343 | 12.7k | SCOPED_TIMER(_close_wait_timer); |
344 | 12.7k | _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); })); |
345 | 12.7k | return _status.status(); |
346 | 12.7k | } |
347 | | |
// Construct a per-index stream; tablet streams under this index are created
// lazily (see _init_tablet_stream). Registers a child profile with timers.
IndexStream::IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
                         std::shared_ptr<OlapTableSchemaParam> schema,
                         LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
        : _id(id),
          _load_id(load_id),
          _txn_id(txn_id),
          _schema(schema),
          _load_stream_mgr(load_stream_mgr) {
    _profile = profile->create_child(fmt::format("IndexStream {}", id), true, true);
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
}
360 | | |
361 | 2.01k | IndexStream::~IndexStream() { |
362 | | // Ensure all TabletStreams have their flush tokens properly handled before destruction. |
363 | | // In normal flow, close() should have called pre_close() on all tablet streams. |
364 | | // But if IndexStream is destroyed without close() being called (e.g., on_idle_timeout), |
365 | | // we need to wait for flush tasks here to ensure flush tokens are properly shut down. |
366 | 12.7k | for (auto& [_, tablet_stream] : _tablet_streams_map) { |
367 | 12.7k | tablet_stream->wait_for_flush_tasks(); |
368 | 12.7k | } |
369 | 2.01k | } |
370 | | |
371 | 110k | Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { |
372 | 110k | SCOPED_TIMER(_append_data_timer); |
373 | 110k | int64_t tablet_id = header.tablet_id(); |
374 | 110k | TabletStreamSharedPtr tablet_stream; |
375 | 110k | { |
376 | 110k | std::lock_guard lock_guard(_lock); |
377 | 110k | auto it = _tablet_streams_map.find(tablet_id); |
378 | 110k | if (it == _tablet_streams_map.end()) { |
379 | 3.65k | _init_tablet_stream(tablet_stream, tablet_id, header.partition_id()); |
380 | 106k | } else { |
381 | 106k | tablet_stream = it->second; |
382 | 106k | } |
383 | 110k | } |
384 | | |
385 | 110k | return tablet_stream->append_data(header, data); |
386 | 110k | } |
387 | | |
388 | | void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, |
389 | 12.7k | int64_t partition_id) { |
390 | 12.7k | tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr, |
391 | 12.7k | _profile); |
392 | 12.7k | _tablet_streams_map[tablet_id] = tablet_stream; |
393 | 12.7k | auto st = tablet_stream->init(_schema, _id, partition_id); |
394 | 12.7k | if (!st.ok()) { |
395 | 1 | LOG(WARNING) << "tablet stream init failed " << *tablet_stream; |
396 | 1 | } |
397 | 12.7k | } |
398 | | |
399 | 3.63k | void IndexStream::get_all_write_tablet_ids(std::vector<int64_t>* tablet_ids) { |
400 | 3.63k | std::lock_guard lock_guard(_lock); |
401 | 11.5k | for (const auto& [tablet_id, _] : _tablet_streams_map) { |
402 | 11.5k | tablet_ids->push_back(tablet_id); |
403 | 11.5k | } |
404 | 3.63k | } |
405 | | |
// Close all tablet streams under this index and report per-tablet results.
// Tablets listed in tablets_to_commit that never received data are opened
// here (empty tablets still need a rowset committed). The two-phase order
// matters: pre_close() ALL streams first (drain + verify), then close()
// each one; results go to success_tablet_ids / failed_tablets.
void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
                        std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
    std::lock_guard lock_guard(_lock);
    SCOPED_TIMER(_close_wait_timer);
    // open all need commit tablets
    for (const auto& tablet : tablets_to_commit) {
        if (_id != tablet.index_id()) {
            continue;
        }
        TabletStreamSharedPtr tablet_stream;
        auto it = _tablet_streams_map.find(tablet.tablet_id());
        if (it == _tablet_streams_map.end()) {
            _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id());
        } else {
            tablet_stream = it->second;
        }
        if (tablet.has_num_segments()) {
            // Expected segment count, checked against actual in pre_close().
            tablet_stream->add_num_segments(tablet.num_segments());
        } else {
            // for compatibility reasons (sink from old version BE)
            tablet_stream->disable_num_segments_check();
        }
    }

    for (auto& [_, tablet_stream] : _tablet_streams_map) {
        tablet_stream->pre_close();
    }

    for (auto& [_, tablet_stream] : _tablet_streams_map) {
        auto st = tablet_stream->close();
        if (st.ok()) {
            success_tablet_ids->push_back(tablet_stream->id());
        } else {
            LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st;
            failed_tablets->emplace_back(tablet_stream->id(), st);
        }
    }
}
444 | | |
445 | | // TODO: Profile is temporary disabled, because: |
446 | | // 1. It's not being processed by the upstream for now |
447 | | // 2. There are some problems in _profile->to_thrift() |
// Construct a load stream for one load id. Profile reporting is forced off
// here regardless of `enable_profile` (see the TODO comment above).
// Attaches to the query's resource context when the fragment is found;
// otherwise (or in BE_TEST builds) creates a standalone resource context
// with its own LOAD mem tracker.
LoadStream::LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr,
                       bool enable_profile)
        : _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) {
    g_load_stream_cnt << 1;
    _profile = std::make_unique<RuntimeProfile>("LoadStream");
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
    TUniqueId load_tid = ((UniqueId)load_id).to_thrift();
#ifndef BE_TEST
    std::shared_ptr<QueryContext> query_context =
            ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid);
    if (query_context != nullptr) {
        _resource_ctx = query_context->resource_ctx();
    } else {
        _resource_ctx = ResourceContext::create_shared();
        _resource_ctx->task_controller()->set_task_id(load_tid);
        std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
                MemTrackerLimiter::Type::LOAD,
                fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string()));
        _resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
    }
#else
    _resource_ctx = ResourceContext::create_shared();
    _resource_ctx->task_controller()->set_task_id(load_tid);
    std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
            MemTrackerLimiter::Type::LOAD,
            fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string()));
    _resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
#endif
}
478 | | |
479 | 1.99k | LoadStream::~LoadStream() { |
480 | 1.99k | g_load_stream_cnt << -1; |
481 | 1.99k | LOG(INFO) << "load stream is deconstructed " << *this; |
482 | 1.99k | } |
483 | | |
484 | 1.99k | Status LoadStream::init(const POpenLoadStreamRequest* request) { |
485 | 1.99k | _txn_id = request->txn_id(); |
486 | 1.99k | _total_streams = static_cast<int32_t>(request->total_streams()); |
487 | 1.99k | _is_incremental = (_total_streams == 0); |
488 | | |
489 | 1.99k | _schema = std::make_shared<OlapTableSchemaParam>(); |
490 | 1.99k | RETURN_IF_ERROR(_schema->init(request->schema())); |
491 | 2.01k | for (auto& index : request->schema().indexes()) { |
492 | 2.01k | _index_streams_map[index.id()] = std::make_shared<IndexStream>( |
493 | 2.01k | _load_id, index.id(), _txn_id, _schema, _load_stream_mgr, _profile.get()); |
494 | 2.01k | } |
495 | 1.99k | LOG(INFO) << "succeed to init load stream " << *this; |
496 | 1.99k | return Status::OK(); |
497 | 1.99k | } |
498 | | |
// Handle a CLOSE_LOAD from one sender. Tablet commit info is accumulated
// until every sender has sent CLOSE_LOAD; only then are the index streams
// actually closed. Returns true when commit info is final (last sender),
// false while other senders are still open.
bool LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
                       std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
    std::lock_guard<bthread::Mutex> lock_guard(_lock);
    SCOPED_TIMER(_close_wait_timer);

    // we do nothing until recv CLOSE_LOAD from all stream to ensure all data are handled before ack
    _open_streams[src_id]--;
    if (_open_streams[src_id] == 0) {
        _open_streams.erase(src_id);
    }
    _close_load_cnt++;
    LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
              << _total_streams - _close_load_cnt << " senders, " << *this;

    _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
                              tablets_to_commit.end());

    if (_close_load_cnt < _total_streams) {
        // do not return commit info if there is remaining streams.
        return false;
    }

    for (auto& [_, index_stream] : _index_streams_map) {
        index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets);
    }
    LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
              << ", failed_tablet_num=" << failed_tablets->size();
    return true;
}
528 | | |
// Send a PLoadStreamResponse back over the brpc stream carrying the overall
// status plus per-tablet success/failure lists. When profiling is enabled
// and all senders have closed, the serialized runtime profile is attached
// (currently dead code since _enable_profile is forced false in the ctor).
void LoadStream::_report_result(StreamId stream, const Status& status,
                                const std::vector<int64_t>& success_tablet_ids,
                                const FailedTablets& failed_tablets, bool eos) {
    LOG(INFO) << "report result " << *this << ", success tablet num " << success_tablet_ids.size()
              << ", failed tablet num " << failed_tablets.size();
    butil::IOBuf buf;
    PLoadStreamResponse response;
    response.set_eos(eos);
    status.to_protobuf(response.mutable_status());
    for (auto& id : success_tablet_ids) {
        response.add_success_tablet_ids(id);
    }
    for (auto& [id, st] : failed_tablets) {
        auto pb = response.add_failed_tablets();
        pb->set_id(id);
        st.to_protobuf(pb->mutable_status());
    }

    if (_enable_profile && _close_load_cnt == _total_streams) {
        TRuntimeProfileTree tprofile;
        ThriftSerializer ser(false, 4096);
        uint8_t* profile_buf = nullptr;
        uint32_t len = 0;
        // Lock while converting the profile; children may still be mutated.
        std::unique_lock<bthread::Mutex> l(_lock);

        _profile->to_thrift(&tprofile);
        auto st = ser.serialize(&tprofile, &len, &profile_buf);
        if (st.ok()) {
            response.set_load_stream_profile(profile_buf, len);
        } else {
            // Serialization failure only drops the profile, not the result.
            LOG(WARNING) << "TRuntimeProfileTree serialize failed, errmsg=" << st << ", " << *this;
        }
    }

    buf.append(response.SerializeAsString());
    auto wst = _write_stream(stream, buf);
    if (!wst.ok()) {
        LOG(WARNING) << " report result failed with " << wst << ", " << *this;
    }
}
569 | | |
570 | 4 | void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) { |
571 | 4 | butil::IOBuf buf; |
572 | 4 | PLoadStreamResponse response; |
573 | 4 | Status st = Status::OK(); |
574 | 4 | for (const auto& req : hdr.tablets()) { |
575 | 4 | BaseTabletSPtr tablet; |
576 | 4 | if (auto res = ExecEnv::get_tablet(req.tablet_id()); res.has_value()) { |
577 | 4 | tablet = std::move(res).value(); |
578 | 4 | } else { |
579 | 0 | st = std::move(res).error(); |
580 | 0 | break; |
581 | 0 | } |
582 | 4 | auto* resp = response.add_tablet_schemas(); |
583 | 4 | resp->set_index_id(req.index_id()); |
584 | 4 | resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write()); |
585 | 4 | tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema()); |
586 | 4 | } |
587 | 4 | st.to_protobuf(response.mutable_status()); |
588 | | |
589 | 4 | buf.append(response.SerializeAsString()); |
590 | 4 | auto wst = _write_stream(stream, buf); |
591 | 4 | if (!wst.ok()) { |
592 | 0 | LOG(WARNING) << " report result failed with " << wst << ", " << *this; |
593 | 0 | } |
594 | 4 | } |
595 | | |
596 | 3.63k | void LoadStream::_report_tablet_load_info(StreamId stream, int64_t index_id) { |
597 | 3.63k | std::vector<int64_t> write_tablet_ids; |
598 | 3.63k | auto it = _index_streams_map.find(index_id); |
599 | 3.63k | if (it != _index_streams_map.end()) { |
600 | 3.63k | it->second->get_all_write_tablet_ids(&write_tablet_ids); |
601 | 3.63k | } |
602 | | |
603 | 3.63k | if (!write_tablet_ids.empty()) { |
604 | 3.63k | butil::IOBuf buf; |
605 | 3.63k | PLoadStreamResponse response; |
606 | 3.63k | auto* tablet_load_infos = response.mutable_tablet_load_rowset_num_infos(); |
607 | 3.63k | _collect_tablet_load_info_from_tablets(write_tablet_ids, tablet_load_infos); |
608 | 3.63k | if (tablet_load_infos->empty()) { |
609 | 3.63k | return; |
610 | 3.63k | } |
611 | 0 | buf.append(response.SerializeAsString()); |
612 | 0 | auto wst = _write_stream(stream, buf); |
613 | 0 | if (!wst.ok()) { |
614 | 0 | LOG(WARNING) << "report tablet load info failed with " << wst << ", " << *this; |
615 | 0 | } |
616 | 0 | } |
617 | 3.63k | } |
618 | | |
619 | | void LoadStream::_collect_tablet_load_info_from_tablets( |
620 | | const std::vector<int64_t>& tablet_ids, |
621 | 3.63k | google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_load_infos) { |
622 | 11.5k | for (auto tablet_id : tablet_ids) { |
623 | 11.5k | BaseTabletSPtr tablet; |
624 | 11.5k | if (auto res = ExecEnv::get_tablet(tablet_id); res.has_value()) { |
625 | 11.5k | tablet = std::move(res).value(); |
626 | 18.4E | } else { |
627 | 18.4E | continue; |
628 | 18.4E | } |
629 | 11.5k | BaseDeltaWriter::collect_tablet_load_rowset_num_info(tablet.get(), tablet_load_infos); |
630 | 11.5k | } |
631 | 3.63k | } |
632 | | |
633 | 3.88k | Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) { |
634 | 3.88k | for (;;) { |
635 | 3.88k | int ret = 0; |
636 | 3.88k | DBUG_EXECUTE_IF("LoadStream._write_stream.EAGAIN", { ret = EAGAIN; }); |
637 | 3.88k | if (ret == 0) { |
638 | 3.88k | ret = brpc::StreamWrite(stream, buf); |
639 | 3.88k | } |
640 | 3.88k | switch (ret) { |
641 | 3.88k | case 0: |
642 | 3.88k | return Status::OK(); |
643 | 0 | case EAGAIN: { |
644 | 0 | const timespec time = butil::seconds_from_now(config::load_stream_eagain_wait_seconds); |
645 | 0 | int wait_ret = brpc::StreamWait(stream, &time); |
646 | 0 | if (wait_ret != 0) { |
647 | 0 | return Status::InternalError("StreamWait failed, err={}", wait_ret); |
648 | 0 | } |
649 | 0 | break; |
650 | 0 | } |
651 | 0 | default: |
652 | 0 | return Status::InternalError("StreamWrite failed, err={}", ret); |
653 | 3.88k | } |
654 | 3.88k | } |
655 | 0 | return Status::OK(); |
656 | 3.88k | } |
657 | | |
658 | 114k | void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& hdr) { |
659 | 114k | butil::IOBufAsZeroCopyInputStream wrapper(*message); |
660 | 114k | hdr.ParseFromZeroCopyStream(&wrapper); |
661 | 18.4E | VLOG_DEBUG << "header parse result: " << hdr.DebugString(); |
662 | 114k | } |
663 | | |
664 | 110k | Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) { |
665 | 110k | SCOPED_TIMER(_append_data_timer); |
666 | 110k | IndexStreamSharedPtr index_stream; |
667 | | |
668 | 110k | int64_t index_id = header.index_id(); |
669 | 110k | DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid", |
670 | 110k | { index_id = UNKNOWN_ID_FOR_TEST; }); |
671 | 110k | auto it = _index_streams_map.find(index_id); |
672 | 110k | if (it == _index_streams_map.end()) { |
673 | 1 | return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id {}", index_id); |
674 | 110k | } else { |
675 | 110k | index_stream = it->second; |
676 | 110k | } |
677 | | |
678 | 110k | return index_stream->append_data(header, data); |
679 | 110k | } |
680 | | |
681 | 3.88k | int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) { |
682 | 18.4E | VLOG_DEBUG << "on_received_messages " << id << " " << size; |
683 | 7.78k | for (size_t i = 0; i < size; ++i) { |
684 | 118k | while (messages[i]->size() > 0) { |
685 | | // step 1: parse header |
686 | 114k | size_t hdr_len = 0; |
687 | 114k | messages[i]->cutn((void*)&hdr_len, sizeof(size_t)); |
688 | 114k | butil::IOBuf hdr_buf; |
689 | 114k | PStreamHeader hdr; |
690 | 114k | messages[i]->cutn(&hdr_buf, hdr_len); |
691 | 114k | _parse_header(&hdr_buf, hdr); |
692 | | |
693 | | // step 2: cut data |
694 | 114k | size_t data_len = 0; |
695 | 114k | messages[i]->cutn((void*)&data_len, sizeof(size_t)); |
696 | 114k | butil::IOBuf data_buf; |
697 | 114k | PStreamHeader data; |
698 | 114k | messages[i]->cutn(&data_buf, data_len); |
699 | | |
700 | | // step 3: dispatch |
701 | 114k | _dispatch(id, hdr, &data_buf); |
702 | 114k | } |
703 | 3.90k | } |
704 | 3.88k | return 0; |
705 | 3.88k | } |
706 | | |
707 | 114k | void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) { |
708 | 114k | VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id() |
709 | 2 | << " with tablet " << hdr.tablet_id(); |
710 | 114k | SCOPED_ATTACH_TASK(_resource_ctx); |
711 | | // CLOSE_LOAD message should not be fault injected, |
712 | | // otherwise the message will be ignored and causing close wait timeout |
713 | 114k | if (hdr.opcode() != PStreamHeader::CLOSE_LOAD) { |
714 | 110k | DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", { |
715 | 110k | PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr); |
716 | 110k | PUniqueId* load_id = t_hdr.mutable_load_id(); |
717 | 110k | load_id->set_hi(UNKNOWN_ID_FOR_TEST); |
718 | 110k | load_id->set_lo(UNKNOWN_ID_FOR_TEST); |
719 | 110k | }); |
720 | 110k | DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", { |
721 | 110k | PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr); |
722 | 110k | t_hdr.set_src_id(UNKNOWN_ID_FOR_TEST); |
723 | 110k | }); |
724 | 110k | } |
725 | 114k | if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) { |
726 | 1 | Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>( |
727 | 1 | "invalid load id {}, expected {}", print_id(hdr.load_id()), print_id(_load_id)); |
728 | 1 | _report_failure(id, st, hdr); |
729 | 1 | return; |
730 | 1 | } |
731 | | |
732 | 114k | { |
733 | 114k | std::lock_guard lock_guard(_lock); |
734 | 114k | if (!_open_streams.contains(hdr.src_id())) { |
735 | 17 | Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("no open stream from source {}", |
736 | 17 | hdr.src_id()); |
737 | 17 | _report_failure(id, st, hdr); |
738 | 17 | return; |
739 | 17 | } |
740 | 114k | } |
741 | | |
742 | 114k | switch (hdr.opcode()) { |
743 | 3.63k | case PStreamHeader::ADD_SEGMENT: { |
744 | 3.63k | auto st = _append_data(hdr, data); |
745 | 3.63k | if (!st.ok()) { |
746 | 0 | _report_failure(id, st, hdr); |
747 | 3.63k | } else { |
748 | | // Report tablet load info only on ADD_SEGMENT to reduce frequency. |
749 | | // ADD_SEGMENT is sent once per segment, while APPEND_DATA is sent |
750 | | // for every data batch. This reduces unnecessary writes and avoids |
751 | | // potential stream write failures when the sender is closing. |
752 | 3.63k | _report_tablet_load_info(id, hdr.index_id()); |
753 | 3.63k | } |
754 | 3.63k | } break; |
755 | 106k | case PStreamHeader::APPEND_DATA: { |
756 | 106k | auto st = _append_data(hdr, data); |
757 | 106k | if (!st.ok()) { |
758 | 2 | _report_failure(id, st, hdr); |
759 | 2 | } |
760 | 106k | } break; |
761 | 3.85k | case PStreamHeader::CLOSE_LOAD: { |
762 | 3.85k | DBUG_EXECUTE_IF("LoadStream.close_load.block", DBUG_BLOCK); |
763 | 3.85k | std::vector<int64_t> success_tablet_ids; |
764 | 3.85k | FailedTablets failed_tablets; |
765 | 3.85k | std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end()); |
766 | 3.85k | bool all_closed = |
767 | 3.85k | close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets); |
768 | 3.85k | _report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true); |
769 | 3.85k | std::lock_guard<bthread::Mutex> lock_guard(_lock); |
770 | | // if incremental stream, we need to wait for all non-incremental streams to be closed |
771 | | // before closing incremental streams. We need a fencing mechanism to avoid use after closing |
772 | | // across different be. |
773 | 3.85k | if (hdr.has_num_incremental_streams() && hdr.num_incremental_streams() > 0) { |
774 | 0 | _closing_stream_ids.push_back(id); |
775 | 3.85k | } else { |
776 | 3.85k | brpc::StreamClose(id); |
777 | 3.85k | } |
778 | | |
779 | 3.85k | if (all_closed) { |
780 | 1.99k | for (auto& closing_id : _closing_stream_ids) { |
781 | 0 | brpc::StreamClose(closing_id); |
782 | 0 | } |
783 | 1.99k | _closing_stream_ids.clear(); |
784 | 1.99k | } |
785 | 3.85k | } break; |
786 | 4 | case PStreamHeader::GET_SCHEMA: { |
787 | 4 | _report_schema(id, hdr); |
788 | 4 | } break; |
789 | 0 | default: |
790 | 0 | LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this; |
791 | 0 | DCHECK(false); |
792 | 114k | } |
793 | 114k | } |
794 | | |
795 | 0 | void LoadStream::on_idle_timeout(StreamId id) { |
796 | 0 | LOG(WARNING) << "closing load stream on idle timeout, " << *this; |
797 | 0 | brpc::StreamClose(id); |
798 | 0 | } |
799 | | |
800 | 3.85k | void LoadStream::on_closed(StreamId id) { |
801 | | // `this` may be freed by other threads after increasing `_close_rpc_cnt`, |
802 | | // format string first to prevent use-after-free |
803 | 3.85k | std::stringstream ss; |
804 | 3.85k | ss << *this; |
805 | 3.85k | auto remaining_streams = _total_streams - _close_rpc_cnt.fetch_add(1) - 1; |
806 | 3.85k | LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams << ", " |
807 | 3.85k | << ss.str(); |
808 | 3.85k | if (remaining_streams == 0) { |
809 | 1.99k | _load_stream_mgr->clear_load(_load_id); |
810 | 1.99k | } |
811 | 3.85k | } |
812 | | |
813 | 17.5k | inline std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream) { |
814 | 17.5k | ostr << "load_id=" << print_id(load_stream._load_id) << ", txn_id=" << load_stream._txn_id; |
815 | 17.5k | return ostr; |
816 | 17.5k | } |
817 | | |
818 | | #include "common/compile_check_end.h" |
819 | | } // namespace doris |