be/src/load/channel/load_stream.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/channel/load_stream.h" |
19 | | |
20 | | #include <brpc/stream.h> |
21 | | #include <bthread/bthread.h> |
22 | | #include <bthread/condition_variable.h> |
23 | | #include <bthread/mutex.h> |
24 | | |
25 | | #include <memory> |
26 | | #include <sstream> |
27 | | |
28 | | #include "bvar/bvar.h" |
29 | | #include "cloud/config.h" |
30 | | #include "common/signal_handler.h" |
31 | | #include "load/channel/load_channel.h" |
32 | | #include "load/channel/load_stream_mgr.h" |
33 | | #include "load/channel/load_stream_writer.h" |
34 | | #include "load/delta_writer/delta_writer.h" |
35 | | #include "runtime/exec_env.h" |
36 | | #include "runtime/fragment_mgr.h" |
37 | | #include "runtime/runtime_profile.h" |
38 | | #include "runtime/workload_group/workload_group_manager.h" |
39 | | #include "storage/rowset/rowset_factory.h" |
40 | | #include "storage/rowset/rowset_meta.h" |
41 | | #include "storage/storage_engine.h" |
42 | | #include "storage/tablet/tablet.h" |
43 | | #include "storage/tablet/tablet_fwd.h" |
44 | | #include "storage/tablet/tablet_manager.h" |
45 | | #include "storage/tablet/tablet_schema.h" |
46 | | #include "storage/tablet_info.h" |
47 | | #include "util/debug_points.h" |
48 | | #include "util/thrift_util.h" |
49 | | #include "util/uid_util.h" |
50 | | |
51 | | #define UNKNOWN_ID_FOR_TEST 0x7c00 |
52 | | |
53 | | namespace doris { |
54 | | |
55 | | bvar::Adder<int64_t> g_load_stream_cnt("load_stream_count"); |
56 | | bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms"); |
57 | | bvar::Adder<int> g_load_stream_flush_running_threads("load_stream_flush_wait_threads"); |
58 | | |
// Constructs the per-tablet stream state for one (load_id, txn_id, tablet_id).
// Acquires a flush token from the LoadStreamMgr (used to serialize async flush
// tasks for this tablet) and registers a child RuntimeProfile with timers for
// the append/add-segment/close phases.
TabletStream::TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
                           LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
        : _id(id),
          _next_segid(0),
          _load_id(load_id),
          _txn_id(txn_id),
          _load_stream_mgr(load_stream_mgr) {
    load_stream_mgr->create_token(_flush_token);
    _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
    _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
}
72 | | |
73 | 10 | inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) { |
74 | 10 | ostr << "load_id=" << print_id(tablet_stream._load_id) << ", txn_id=" << tablet_stream._txn_id |
75 | 10 | << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status(); |
76 | 10 | return ostr; |
77 | 10 | } |
78 | | |
// Initializes the underlying LoadStreamWriter for this tablet.
// Builds a WriteRequest from the member ids plus the given schema/index/
// partition, then creates and inits the writer. Any failure is folded into
// _status so later calls observe it; returns the accumulated status.
Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
                          int64_t partition_id) {
    WriteRequest req {
            .tablet_id = _id,
            .txn_id = _txn_id,
            .index_id = index_id,
            .partition_id = partition_id,
            .load_id = _load_id,
            .table_schema_param = schema,
            // TODO(plat1ko): write_file_cache
            .storage_vault_id {},
    };

    _load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
    // Fault injection: simulate a writer that never got initialized.
    DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
        _status.update(Status::Uninitialized("fault injection"));
        return _status.status();
    });
    _status.update(_load_stream_writer->init());
    if (!_status.ok()) {
        LOG(INFO) << "failed to init rowset builder due to " << *this;
    }
    return _status.status();
}
103 | | |
104 | 110k | Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { |
105 | 110k | if (!_status.ok()) { |
106 | 1 | return _status.status(); |
107 | 1 | } |
108 | | |
109 | | // dispatch add_segment request |
110 | 110k | if (header.opcode() == PStreamHeader::ADD_SEGMENT) { |
111 | 3.71k | return add_segment(header, data); |
112 | 3.71k | } |
113 | | |
114 | 106k | SCOPED_TIMER(_append_data_timer); |
115 | | |
116 | 106k | int64_t src_id = header.src_id(); |
117 | 106k | uint32_t segid = header.segment_id(); |
118 | | // Ensure there are enough space and mapping are built. |
119 | 106k | SegIdMapping* mapping = nullptr; |
120 | 106k | { |
121 | 106k | std::lock_guard lock_guard(_lock); |
122 | 106k | if (!_segids_mapping.contains(src_id)) { |
123 | 3.72k | _segids_mapping[src_id] = std::make_unique<SegIdMapping>(); |
124 | 3.72k | } |
125 | 106k | mapping = _segids_mapping[src_id].get(); |
126 | 106k | } |
127 | 106k | if (segid + 1 > mapping->size()) { |
128 | | // TODO: Each sender lock is enough. |
129 | 3.72k | std::lock_guard lock_guard(_lock); |
130 | 3.72k | ssize_t origin_size = mapping->size(); |
131 | 3.72k | if (segid + 1 > origin_size) { |
132 | 3.72k | mapping->resize(segid + 1, std::numeric_limits<uint32_t>::max()); |
133 | 7.46k | for (size_t index = origin_size; index <= segid; index++) { |
134 | 3.73k | mapping->at(index) = _next_segid; |
135 | 3.73k | _next_segid++; |
136 | 3.73k | VLOG_DEBUG << "src_id=" << src_id << ", segid=" << index << " to " |
137 | 0 | << " segid=" << _next_segid - 1 << ", " << *this; |
138 | 3.73k | } |
139 | 3.72k | } |
140 | 3.72k | } |
141 | | |
142 | | // Each sender sends data in one segment sequential, so we also do not |
143 | | // need a lock here. |
144 | 106k | bool eos = header.segment_eos(); |
145 | 106k | FileType file_type = header.file_type(); |
146 | 106k | uint32_t new_segid = mapping->at(segid); |
147 | 106k | DCHECK(new_segid != std::numeric_limits<uint32_t>::max()); |
148 | 106k | butil::IOBuf buf = data->movable(); |
149 | 106k | auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable { |
150 | 106k | signal::set_signal_task_id(_load_id); |
151 | 106k | g_load_stream_flush_running_threads << -1; |
152 | 106k | auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); |
153 | 106k | if (!st.ok() && !config::is_cloud_mode()) { |
154 | 1 | auto res = ExecEnv::get_tablet(_id); |
155 | 1 | TabletSharedPtr tablet = |
156 | 1 | res.has_value() ? std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr; |
157 | 1 | if (tablet) { |
158 | 1 | tablet->report_error(st); |
159 | 1 | } |
160 | 1 | } |
161 | 106k | if (eos && st.ok()) { |
162 | 3.73k | DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type", |
163 | 3.73k | { file_type = static_cast<FileType>(-1); }); |
164 | 3.73k | if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) { |
165 | 3.73k | st = _load_stream_writer->close_writer(new_segid, file_type); |
166 | 3.73k | } else { |
167 | 0 | st = Status::InternalError( |
168 | 0 | "appent data failed, file type error, file type = {}, " |
169 | 0 | "segment_id={}", |
170 | 0 | file_type, new_segid); |
171 | 0 | } |
172 | 3.73k | } |
173 | 106k | DBUG_EXECUTE_IF("TabletStream.append_data.append_failed", |
174 | 106k | { st = Status::InternalError("fault injection"); }); |
175 | 106k | if (!st.ok()) { |
176 | 2 | _status.update(st); |
177 | 2 | LOG(WARNING) << "write data failed " << st << ", " << *this; |
178 | 2 | } |
179 | 106k | }; |
180 | 106k | auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks; |
181 | 106k | auto load_stream_max_wait_flush_token_time_ms = |
182 | 106k | config::load_stream_max_wait_flush_token_time_ms; |
183 | 106k | DBUG_EXECUTE_IF("TabletStream.append_data.long_wait", { |
184 | 106k | load_stream_flush_token_max_tasks = 0; |
185 | 106k | load_stream_max_wait_flush_token_time_ms = 1000; |
186 | 106k | }); |
187 | 106k | MonotonicStopWatch timer; |
188 | 106k | timer.start(); |
189 | 107k | while (_flush_token->num_tasks() >= load_stream_flush_token_max_tasks) { |
190 | 420 | if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) { |
191 | 0 | _status.update( |
192 | 0 | Status::Error<true>("wait flush token back pressure time is more than " |
193 | 0 | "load_stream_max_wait_flush_token_time {}", |
194 | 0 | load_stream_max_wait_flush_token_time_ms)); |
195 | 0 | return _status.status(); |
196 | 0 | } |
197 | 420 | bthread_usleep(2 * 1000); // 2ms |
198 | 420 | } |
199 | 106k | timer.stop(); |
200 | 106k | int64_t time_ms = timer.elapsed_time() / 1000 / 1000; |
201 | 106k | g_load_stream_flush_wait_ms << time_ms; |
202 | 106k | g_load_stream_flush_running_threads << 1; |
203 | 106k | Status st = Status::OK(); |
204 | 106k | DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed", |
205 | 106k | { st = Status::InternalError("fault injection"); }); |
206 | 106k | if (st.ok()) { |
207 | 106k | st = _flush_token->submit_func(flush_func); |
208 | 106k | } |
209 | 106k | if (!st.ok()) { |
210 | 0 | _status.update(st); |
211 | 0 | } |
212 | 106k | return _status.status(); |
213 | 106k | } |
214 | | |
// Handles an ADD_SEGMENT request: records the segment statistics for a
// segment that was fully streamed earlier. The sender-local segment id is
// translated (under _lock) to the tablet-global id assigned by append_data;
// the actual add_segment call runs asynchronously on _flush_token so it is
// ordered after the pending data flushes. Returns the accumulated status.
Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
    if (!_status.ok()) {
        return _status.status();
    }

    SCOPED_TIMER(_add_segment_timer);
    DCHECK(header.has_segment_statistics());
    SegmentStatistics stat(header.segment_statistics());

    int64_t src_id = header.src_id();
    uint32_t segid = header.segment_id();
    uint32_t new_segid;
    DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_segid", { segid = UNKNOWN_ID_FOR_TEST; });
    {
        std::lock_guard lock_guard(_lock);
        // Reject ADD_SEGMENT for a sender that never wrote any data: there is
        // no segid mapping to translate through.
        if (!_segids_mapping.contains(src_id)) {
            _status.update(Status::InternalError(
                    "add segment failed, no segment written by this src be yet, src_id={}, "
                    "segment_id={}",
                    src_id, segid));
            return _status.status();
        }
        DBUG_EXECUTE_IF("TabletStream.add_segment.segid_never_written",
                        { segid = static_cast<uint32_t>(_segids_mapping[src_id]->size()); });
        // Likewise reject a segment id beyond what this sender has written.
        if (segid >= _segids_mapping[src_id]->size()) {
            _status.update(Status::InternalError(
                    "add segment failed, segment is never written, src_id={}, segment_id={}",
                    src_id, segid));
            return _status.status();
        }
        new_segid = _segids_mapping[src_id]->at(segid);
    }
    DCHECK(new_segid != std::numeric_limits<uint32_t>::max());

    auto add_segment_func = [this, new_segid, stat]() {
        signal::set_signal_task_id(_load_id);
        auto st = _load_stream_writer->add_segment(new_segid, stat);
        DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
                        { st = Status::InternalError("fault injection"); });
        if (!st.ok()) {
            _status.update(st);
            LOG(INFO) << "add segment failed " << *this;
        }
    };
    Status st = Status::OK();
    DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed",
                    { st = Status::InternalError("fault injection"); });
    if (st.ok()) {
        st = _flush_token->submit_func(add_segment_func);
    }
    if (!st.ok()) {
        _status.update(st);
    }
    return _status.status();
}
270 | | |
271 | 40.4k | Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) { |
272 | 40.4k | bthread::Mutex mu; |
273 | 40.4k | std::unique_lock<bthread::Mutex> lock(mu); |
274 | 40.4k | bthread::ConditionVariable cv; |
275 | 40.4k | auto st = Status::OK(); |
276 | 40.4k | auto func = [this, &mu, &cv, &st, &fn] { |
277 | 40.4k | signal::set_signal_task_id(_load_id); |
278 | 40.4k | st = fn(); |
279 | 40.4k | std::lock_guard<bthread::Mutex> lock(mu); |
280 | 40.4k | cv.notify_one(); |
281 | 40.4k | }; |
282 | 40.4k | bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func); |
283 | 40.4k | if (!ret) { |
284 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
285 | 0 | "there is not enough thread resource for close load"); |
286 | 0 | } |
287 | 40.4k | cv.wait(lock); |
288 | 40.4k | return st; |
289 | 40.4k | } |
290 | | |
// Waits for all pending flush tasks of this tablet to finish. Idempotent:
// the _flush_tasks_done flag (checked and set under _lock) makes repeated
// calls (e.g. from pre_close and from ~IndexStream) no-ops after the first.
void TabletStream::wait_for_flush_tasks() {
    {
        std::lock_guard lock_guard(_lock);
        if (_flush_tasks_done) {
            return;
        }
        _flush_tasks_done = true;
    }

    // If the stream already failed, cancel pending tasks instead of waiting
    // for them; shutdown() still waits for tasks that are mid-flight.
    if (!_status.ok()) {
        _flush_token->shutdown();
        return;
    }

    // Use heavy_work_pool to avoid blocking bthread
    auto st = _run_in_heavy_work_pool([this]() {
        _flush_token->wait();
        return Status::OK();
    });
    if (!st.ok()) {
        // If heavy_work_pool is unavailable, fall back to shutdown
        // which will cancel pending tasks and wait for running tasks
        _flush_token->shutdown();
        _status.update(st);
    }
}
317 | | |
// First phase of closing a tablet stream: drain flush tasks, validate the
// segment count against what the sender committed, then pre-close the
// writer. Errors are folded into _status for close() to return.
void TabletStream::pre_close() {
    SCOPED_TIMER(_close_wait_timer);
    wait_for_flush_tasks();

    if (!_status.ok()) {
        return;
    }

    DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
    // _num_segments is the total announced via add_num_segments(); the check
    // is skipped when disable_num_segments_check() was called (old-version
    // senders that do not report segment counts).
    if (_check_num_segments && (_next_segid.load() != _num_segments)) {
        _status.update(Status::Corruption(
                "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
                _num_segments, _next_segid.load(), print_id(_load_id)));
        return;
    }

    _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); }));
}
336 | | |
337 | 13.5k | Status TabletStream::close() { |
338 | 13.5k | if (!_status.ok()) { |
339 | 6 | return _status.status(); |
340 | 6 | } |
341 | | |
342 | 13.4k | SCOPED_TIMER(_close_wait_timer); |
343 | 13.4k | _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); })); |
344 | 13.4k | return _status.status(); |
345 | 13.5k | } |
346 | | |
// Constructs the per-index stream state. Tablet streams under this index are
// created lazily (see _init_tablet_stream). Registers a child RuntimeProfile
// with append/close timers.
IndexStream::IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
                         std::shared_ptr<OlapTableSchemaParam> schema,
                         LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
        : _id(id),
          _load_id(load_id),
          _txn_id(txn_id),
          _schema(schema),
          _load_stream_mgr(load_stream_mgr) {
    _profile = profile->create_child(fmt::format("IndexStream {}", id), true, true);
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
}
359 | | |
360 | 2.10k | IndexStream::~IndexStream() { |
361 | | // Ensure all TabletStreams have their flush tokens properly handled before destruction. |
362 | | // In normal flow, close() should have called pre_close() on all tablet streams. |
363 | | // But if IndexStream is destroyed without close() being called (e.g., on_idle_timeout), |
364 | | // we need to wait for flush tasks here to ensure flush tokens are properly shut down. |
365 | 13.5k | for (auto& [_, tablet_stream] : _tablet_streams_map) { |
366 | 13.5k | tablet_stream->wait_for_flush_tasks(); |
367 | 13.5k | } |
368 | 2.10k | } |
369 | | |
370 | 110k | Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { |
371 | 110k | SCOPED_TIMER(_append_data_timer); |
372 | 110k | int64_t tablet_id = header.tablet_id(); |
373 | 110k | TabletStreamSharedPtr tablet_stream; |
374 | 110k | { |
375 | 110k | std::lock_guard lock_guard(_lock); |
376 | 110k | auto it = _tablet_streams_map.find(tablet_id); |
377 | 110k | if (it == _tablet_streams_map.end()) { |
378 | 3.72k | _init_tablet_stream(tablet_stream, tablet_id, header.partition_id()); |
379 | 106k | } else { |
380 | 106k | tablet_stream = it->second; |
381 | 106k | } |
382 | 110k | } |
383 | | |
384 | 110k | return tablet_stream->append_data(header, data); |
385 | 110k | } |
386 | | |
387 | | void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, |
388 | 13.5k | int64_t partition_id) { |
389 | 13.5k | tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr, |
390 | 13.5k | _profile); |
391 | 13.5k | _tablet_streams_map[tablet_id] = tablet_stream; |
392 | 13.5k | auto st = tablet_stream->init(_schema, _id, partition_id); |
393 | 13.5k | if (!st.ok()) { |
394 | 1 | LOG(WARNING) << "tablet stream init failed " << *tablet_stream; |
395 | 1 | } |
396 | 13.5k | } |
397 | | |
398 | 3.71k | void IndexStream::get_all_write_tablet_ids(std::vector<int64_t>* tablet_ids) { |
399 | 3.71k | std::lock_guard lock_guard(_lock); |
400 | 12.1k | for (const auto& [tablet_id, _] : _tablet_streams_map) { |
401 | 12.1k | tablet_ids->push_back(tablet_id); |
402 | 12.1k | } |
403 | 3.71k | } |
404 | | |
// Closes every tablet stream under this index in three ordered phases:
//   1. ensure a TabletStream exists for every tablet the FE wants committed
//      (tablets with no data still need an empty rowset), and record the
//      expected segment count per tablet;
//   2. pre_close() all streams (drain flushes, verify segment counts);
//   3. close() all streams, partitioning results into success/failed lists.
void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
                        std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
    std::lock_guard lock_guard(_lock);
    SCOPED_TIMER(_close_wait_timer);
    // open all need commit tablets
    for (const auto& tablet : tablets_to_commit) {
        // tablets_to_commit covers all indexes; only handle our own.
        if (_id != tablet.index_id()) {
            continue;
        }
        TabletStreamSharedPtr tablet_stream;
        auto it = _tablet_streams_map.find(tablet.tablet_id());
        if (it == _tablet_streams_map.end()) {
            _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id());
        } else {
            tablet_stream = it->second;
        }
        if (tablet.has_num_segments()) {
            tablet_stream->add_num_segments(tablet.num_segments());
        } else {
            // for compatibility reasons (sink from old version BE)
            tablet_stream->disable_num_segments_check();
        }
    }

    for (auto& [_, tablet_stream] : _tablet_streams_map) {
        tablet_stream->pre_close();
    }

    for (auto& [_, tablet_stream] : _tablet_streams_map) {
        auto st = tablet_stream->close();
        if (st.ok()) {
            success_tablet_ids->push_back(tablet_stream->id());
        } else {
            LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st;
            failed_tablets->emplace_back(tablet_stream->id(), st);
        }
    }
}
443 | | |
// TODO: Profile is temporary disabled, because:
// 1. It's not being processed by the upstream for now
// 2. There are some problems in _profile->to_thrift()
//
// Constructs the per-load stream. Note the enable_profile parameter is
// deliberately ignored (_enable_profile is forced to false, see TODO above).
// Binds a ResourceContext: reuse the query's context when the fragment is
// still registered, otherwise create a standalone one with its own load
// memtracker (also the path taken in BE_TEST builds).
LoadStream::LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr,
                       bool enable_profile)
        : _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) {
    g_load_stream_cnt << 1;
    _profile = std::make_unique<RuntimeProfile>("LoadStream");
    _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
    TUniqueId load_tid = ((UniqueId)load_id).to_thrift();
#ifndef BE_TEST
    std::shared_ptr<QueryContext> query_context =
            ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid);
    if (query_context != nullptr) {
        _resource_ctx = query_context->resource_ctx();
    } else {
        _resource_ctx = ResourceContext::create_shared();
        _resource_ctx->task_controller()->set_task_id(load_tid);
        std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
                MemTrackerLimiter::Type::LOAD,
                fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string()));
        _resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
    }
#else
    _resource_ctx = ResourceContext::create_shared();
    _resource_ctx->task_controller()->set_task_id(load_tid);
    std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
            MemTrackerLimiter::Type::LOAD,
            fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string()));
    _resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
#endif
}
477 | | |
// Decrements the global live-stream counter and logs teardown for tracing.
LoadStream::~LoadStream() {
    g_load_stream_cnt << -1;
    LOG(INFO) << "load stream is deconstructed " << *this;
}
482 | | |
483 | 2.08k | Status LoadStream::init(const POpenLoadStreamRequest* request) { |
484 | 2.08k | _txn_id = request->txn_id(); |
485 | 2.08k | _total_streams = static_cast<int32_t>(request->total_streams()); |
486 | 2.08k | _is_incremental = (_total_streams == 0); |
487 | | |
488 | 2.08k | _schema = std::make_shared<OlapTableSchemaParam>(); |
489 | 2.08k | RETURN_IF_ERROR(_schema->init(request->schema())); |
490 | 2.10k | for (auto& index : request->schema().indexes()) { |
491 | 2.10k | _index_streams_map[index.id()] = std::make_shared<IndexStream>( |
492 | 2.10k | _load_id, index.id(), _txn_id, _schema, _load_stream_mgr, _profile.get()); |
493 | 2.10k | } |
494 | 2.08k | LOG(INFO) << "succeed to init load stream " << *this; |
495 | 2.08k | return Status::OK(); |
496 | 2.08k | } |
497 | | |
// Handles a CLOSE_LOAD from one sender. Commit info is only produced once
// every expected sender has closed; until then the per-sender tablet lists
// are accumulated and `false` is returned. Returns `true` (with results
// filled) only on the final close.
bool LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
                       std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
    std::lock_guard<bthread::Mutex> lock_guard(_lock);
    SCOPED_TIMER(_close_wait_timer);

    // we do nothing until recv CLOSE_LOAD from all stream to ensure all data are handled before ack
    // NOTE(review): operator[] default-inserts 0 for an unknown src_id, so an
    // unexpected sender would leave a -1 entry here — presumably callers only
    // pass src_ids that opened a stream; verify against _dispatch's checks.
    _open_streams[src_id]--;
    if (_open_streams[src_id] == 0) {
        _open_streams.erase(src_id);
    }
    _close_load_cnt++;
    LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
              << _total_streams - _close_load_cnt << " senders, " << *this;

    _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
                              tablets_to_commit.end());

    if (_close_load_cnt < _total_streams) {
        // do not return commit info if there is remaining streams.
        return false;
    }

    for (auto& [_, index_stream] : _index_streams_map) {
        index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets);
    }
    LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
              << ", failed_tablet_num=" << failed_tablets->size();
    return true;
}
527 | | |
// Sends a PLoadStreamResponse back over the brpc stream carrying the overall
// status plus per-tablet success/failure lists; optionally attaches the
// serialized runtime profile on the final close (currently dead code since
// _enable_profile is forced to false in the constructor).
void LoadStream::_report_result(StreamId stream, const Status& status,
                                const std::vector<int64_t>& success_tablet_ids,
                                const FailedTablets& failed_tablets, bool eos) {
    LOG(INFO) << "report result " << *this << ", success tablet num " << success_tablet_ids.size()
              << ", failed tablet num " << failed_tablets.size();
    butil::IOBuf buf;
    PLoadStreamResponse response;
    response.set_eos(eos);
    status.to_protobuf(response.mutable_status());
    for (auto& id : success_tablet_ids) {
        response.add_success_tablet_ids(id);
    }
    for (auto& [id, st] : failed_tablets) {
        auto pb = response.add_failed_tablets();
        pb->set_id(id);
        st.to_protobuf(pb->mutable_status());
    }

    if (_enable_profile && _close_load_cnt == _total_streams) {
        TRuntimeProfileTree tprofile;
        ThriftSerializer ser(false, 4096);
        uint8_t* profile_buf = nullptr;
        uint32_t len = 0;
        // Serialize the profile under _lock so concurrent updates to the
        // profile tree do not race with to_thrift().
        std::unique_lock<bthread::Mutex> l(_lock);

        _profile->to_thrift(&tprofile);
        auto st = ser.serialize(&tprofile, &len, &profile_buf);
        if (st.ok()) {
            response.set_load_stream_profile(profile_buf, len);
        } else {
            LOG(WARNING) << "TRuntimeProfileTree serialize failed, errmsg=" << st << ", " << *this;
        }
    }

    buf.append(response.SerializeAsString());
    auto wst = _write_stream(stream, buf);
    if (!wst.ok()) {
        LOG(WARNING) << " report result failed with " << wst << ", " << *this;
    }
}
568 | | |
// Responds to a GET_SCHEMA request: for each requested tablet, looks up the
// local tablet and returns its schema (plus the MoW flag) in the response.
// The first tablet lookup failure aborts the loop; its error is carried in
// the response status alongside whatever schemas were gathered so far.
void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) {
    butil::IOBuf buf;
    PLoadStreamResponse response;
    Status st = Status::OK();
    for (const auto& req : hdr.tablets()) {
        BaseTabletSPtr tablet;
        if (auto res = ExecEnv::get_tablet(req.tablet_id()); res.has_value()) {
            tablet = std::move(res).value();
        } else {
            st = std::move(res).error();
            break;
        }
        auto* resp = response.add_tablet_schemas();
        resp->set_index_id(req.index_id());
        resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write());
        tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
    }
    st.to_protobuf(response.mutable_status());

    buf.append(response.SerializeAsString());
    auto wst = _write_stream(stream, buf);
    if (!wst.ok()) {
        LOG(WARNING) << " report result failed with " << wst << ", " << *this;
    }
}
594 | | |
595 | 3.71k | void LoadStream::_report_tablet_load_info(StreamId stream, int64_t index_id) { |
596 | 3.71k | std::vector<int64_t> write_tablet_ids; |
597 | 3.71k | auto it = _index_streams_map.find(index_id); |
598 | 3.71k | if (it != _index_streams_map.end()) { |
599 | 3.71k | it->second->get_all_write_tablet_ids(&write_tablet_ids); |
600 | 3.71k | } |
601 | | |
602 | 3.71k | if (!write_tablet_ids.empty()) { |
603 | 3.71k | butil::IOBuf buf; |
604 | 3.71k | PLoadStreamResponse response; |
605 | 3.71k | auto* tablet_load_infos = response.mutable_tablet_load_rowset_num_infos(); |
606 | 3.71k | _collect_tablet_load_info_from_tablets(write_tablet_ids, tablet_load_infos); |
607 | 3.71k | if (tablet_load_infos->empty()) { |
608 | 3.71k | return; |
609 | 3.71k | } |
610 | 0 | buf.append(response.SerializeAsString()); |
611 | 0 | auto wst = _write_stream(stream, buf); |
612 | 0 | if (!wst.ok()) { |
613 | 0 | LOG(WARNING) << "report tablet load info failed with " << wst << ", " << *this; |
614 | 0 | } |
615 | 0 | } |
616 | 3.71k | } |
617 | | |
618 | | void LoadStream::_collect_tablet_load_info_from_tablets( |
619 | | const std::vector<int64_t>& tablet_ids, |
620 | 3.71k | google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_load_infos) { |
621 | 12.1k | for (auto tablet_id : tablet_ids) { |
622 | 12.1k | BaseTabletSPtr tablet; |
623 | 12.1k | if (auto res = ExecEnv::get_tablet(tablet_id); res.has_value()) { |
624 | 12.1k | tablet = std::move(res).value(); |
625 | 12.1k | } else { |
626 | 0 | continue; |
627 | 0 | } |
628 | 12.1k | BaseDeltaWriter::collect_tablet_load_rowset_num_info(tablet.get(), tablet_load_infos); |
629 | 12.1k | } |
630 | 3.71k | } |
631 | | |
632 | 4.05k | Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) { |
633 | 4.05k | for (;;) { |
634 | 4.05k | int ret = 0; |
635 | 4.05k | DBUG_EXECUTE_IF("LoadStream._write_stream.EAGAIN", { ret = EAGAIN; }); |
636 | 4.05k | if (ret == 0) { |
637 | 4.05k | ret = brpc::StreamWrite(stream, buf); |
638 | 4.05k | } |
639 | 4.05k | switch (ret) { |
640 | 4.05k | case 0: |
641 | 4.05k | return Status::OK(); |
642 | 0 | case EAGAIN: { |
643 | 0 | const timespec time = butil::seconds_from_now(config::load_stream_eagain_wait_seconds); |
644 | 0 | int wait_ret = brpc::StreamWait(stream, &time); |
645 | 0 | if (wait_ret != 0) { |
646 | 0 | return Status::InternalError("StreamWait failed, err={}", wait_ret); |
647 | 0 | } |
648 | 0 | break; |
649 | 0 | } |
650 | 0 | default: |
651 | 0 | return Status::InternalError("StreamWrite failed, err={}", ret); |
652 | 4.05k | } |
653 | 4.05k | } |
654 | 0 | return Status::OK(); |
655 | 4.05k | } |
656 | | |
// Deserializes a PStreamHeader from `message` without copying the buffer.
// NOTE(review): the ParseFromZeroCopyStream return value is ignored —
// a malformed header would proceed with a partially-filled hdr.
void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& hdr) {
    butil::IOBufAsZeroCopyInputStream wrapper(*message);
    hdr.ParseFromZeroCopyStream(&wrapper);
    VLOG_DEBUG << "header parse result: " << hdr.DebugString();
}
662 | | |
663 | 110k | Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) { |
664 | 110k | SCOPED_TIMER(_append_data_timer); |
665 | 110k | IndexStreamSharedPtr index_stream; |
666 | | |
667 | 110k | int64_t index_id = header.index_id(); |
668 | 110k | DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid", |
669 | 110k | { index_id = UNKNOWN_ID_FOR_TEST; }); |
670 | 110k | auto it = _index_streams_map.find(index_id); |
671 | 110k | if (it == _index_streams_map.end()) { |
672 | 1 | return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id {}", index_id); |
673 | 110k | } else { |
674 | 110k | index_stream = it->second; |
675 | 110k | } |
676 | | |
677 | 110k | return index_stream->append_data(header, data); |
678 | 110k | } |
679 | | |
680 | 4.05k | int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) { |
681 | 4.05k | VLOG_DEBUG << "on_received_messages " << id << " " << size; |
682 | 8.10k | for (size_t i = 0; i < size; ++i) { |
683 | 118k | while (messages[i]->size() > 0) { |
684 | | // step 1: parse header |
685 | 114k | size_t hdr_len = 0; |
686 | 114k | messages[i]->cutn((void*)&hdr_len, sizeof(size_t)); |
687 | 114k | butil::IOBuf hdr_buf; |
688 | 114k | PStreamHeader hdr; |
689 | 114k | messages[i]->cutn(&hdr_buf, hdr_len); |
690 | 114k | _parse_header(&hdr_buf, hdr); |
691 | | |
692 | | // step 2: cut data |
693 | 114k | size_t data_len = 0; |
694 | 114k | messages[i]->cutn((void*)&data_len, sizeof(size_t)); |
695 | 114k | butil::IOBuf data_buf; |
696 | 114k | PStreamHeader data; |
697 | 114k | messages[i]->cutn(&data_buf, data_len); |
698 | | |
699 | | // step 3: dispatch |
700 | 114k | _dispatch(id, hdr, &data_buf); |
701 | 114k | } |
702 | 4.05k | } |
703 | 4.05k | return 0; |
704 | 4.05k | } |
705 | | |
// Dispatch one decoded stream frame by opcode.
//
// Validates that the header's load_id matches this stream and that the source
// backend has an open stream registered, then routes:
//   ADD_SEGMENT / APPEND_DATA -> _append_data
//   CLOSE_LOAD                -> close + (possibly deferred) StreamClose
//   GET_SCHEMA                -> _report_schema
// Failures are reported back to the sender via _report_failure rather than
// returned; this function has no return value.
void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) {
    VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id()
               << " with tablet " << hdr.tablet_id();
    SCOPED_ATTACH_TASK(_resource_ctx);
    // CLOSE_LOAD message should not be fault injected,
    // otherwise the message will be ignored and causing close wait timeout
    if (hdr.opcode() != PStreamHeader::CLOSE_LOAD) {
        // Test hook: corrupt the load id to exercise the mismatch error path below.
        DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", {
            PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr);
            PUniqueId* load_id = t_hdr.mutable_load_id();
            load_id->set_hi(UNKNOWN_ID_FOR_TEST);
            load_id->set_lo(UNKNOWN_ID_FOR_TEST);
        });
        // Test hook: corrupt the source backend id to exercise the
        // "no open stream" error path below.
        DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", {
            PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr);
            t_hdr.set_src_id(UNKNOWN_ID_FOR_TEST);
        });
    }
    // Reject frames addressed to a different load.
    if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
        Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>(
                "invalid load id {}, expected {}", print_id(hdr.load_id()), print_id(_load_id));
        _report_failure(id, st, hdr);
        return;
    }

    {
        // Only the containment check needs _lock; release it before the
        // (potentially slow) opcode handling below.
        std::lock_guard lock_guard(_lock);
        if (!_open_streams.contains(hdr.src_id())) {
            Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("no open stream from source {}",
                                                                   hdr.src_id());
            _report_failure(id, st, hdr);
            return;
        }
    }

    switch (hdr.opcode()) {
    case PStreamHeader::ADD_SEGMENT: {
        auto st = _append_data(hdr, data);
        if (!st.ok()) {
            _report_failure(id, st, hdr);
        } else {
            // Report tablet load info only on ADD_SEGMENT to reduce frequency.
            // ADD_SEGMENT is sent once per segment, while APPEND_DATA is sent
            // for every data batch. This reduces unnecessary writes and avoids
            // potential stream write failures when the sender is closing.
            _report_tablet_load_info(id, hdr.index_id());
        }
    } break;
    case PStreamHeader::APPEND_DATA: {
        auto st = _append_data(hdr, data);
        if (!st.ok()) {
            _report_failure(id, st, hdr);
        }
    } break;
    case PStreamHeader::CLOSE_LOAD: {
        DBUG_EXECUTE_IF("LoadStream.close_load.block", DBUG_BLOCK);
        std::vector<int64_t> success_tablet_ids;
        FailedTablets failed_tablets;
        std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
        bool all_closed =
                close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
        // Report the per-tablet outcome before closing the stream so the
        // sender still has a channel to receive it on.
        _report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true);
        std::lock_guard<bthread::Mutex> lock_guard(_lock);
        // if incremental stream, we need to wait for all non-incremental streams to be closed
        // before closing incremental streams. We need a fencing mechanism to avoid use after closing
        // across different be.
        if (hdr.has_num_incremental_streams() && hdr.num_incremental_streams() > 0) {
            // Defer the close; it is performed below once `all_closed`.
            _closing_stream_ids.push_back(id);
        } else {
            brpc::StreamClose(id);
        }

        if (all_closed) {
            // Last source closed: flush all deferred incremental-stream closes.
            for (auto& closing_id : _closing_stream_ids) {
                brpc::StreamClose(closing_id);
            }
            _closing_stream_ids.clear();
        }
    } break;
    case PStreamHeader::GET_SCHEMA: {
        _report_schema(id, hdr);
    } break;
    default:
        LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this;
        DCHECK(false);
    }
}
793 | | |
794 | 0 | void LoadStream::on_idle_timeout(StreamId id) { |
795 | 0 | LOG(WARNING) << "closing load stream on idle timeout, " << *this; |
796 | 0 | brpc::StreamClose(id); |
797 | 0 | } |
798 | | |
// brpc StreamInputHandler callback: one of this load's streams has fully
// closed. The last closer (remaining_streams == 0) tears the whole
// LoadStream down via LoadStreamMgr::clear_load, which may destroy `this`
// concurrently with other closers — hence the careful ordering below.
void LoadStream::on_closed(StreamId id) {
    // `this` may be freed by other threads after increasing `_close_rpc_cnt`,
    // format string first to prevent use-after-free
    std::stringstream ss;
    ss << *this;
    // fetch_add returns the pre-increment count; exactly one caller observes
    // remaining_streams == 0 and performs the cleanup.
    auto remaining_streams = _total_streams - _close_rpc_cnt.fetch_add(1) - 1;
    LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams << ", "
              << ss.str();
    if (remaining_streams == 0) {
        _load_stream_mgr->clear_load(_load_id);
    }
}
811 | | |
812 | 18.3k | inline std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream) { |
813 | 18.3k | ostr << "load_id=" << print_id(load_stream._load_id) << ", txn_id=" << load_stream._txn_id; |
814 | 18.3k | return ostr; |
815 | 18.3k | } |
816 | | |
817 | | } // namespace doris |