/root/doris/be/src/runtime/load_stream.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/load_stream.h" |
19 | | |
20 | | #include <brpc/stream.h> |
21 | | #include <bthread/bthread.h> |
22 | | #include <bthread/condition_variable.h> |
23 | | #include <bthread/mutex.h> |
24 | | #include <olap/rowset/rowset_factory.h> |
25 | | #include <olap/rowset/rowset_meta.h> |
26 | | #include <olap/storage_engine.h> |
27 | | #include <olap/tablet_manager.h> |
28 | | #include <runtime/exec_env.h> |
29 | | |
30 | | #include <memory> |
31 | | #include <sstream> |
32 | | |
33 | | #include "bvar/bvar.h" |
34 | | #include "cloud/config.h" |
35 | | #include "common/signal_handler.h" |
36 | | #include "exec/tablet_info.h" |
37 | | #include "gutil/ref_counted.h" |
38 | | #include "olap/tablet.h" |
39 | | #include "olap/tablet_fwd.h" |
40 | | #include "olap/tablet_schema.h" |
41 | | #include "runtime/exec_env.h" |
42 | | #include "runtime/fragment_mgr.h" |
43 | | #include "runtime/load_channel.h" |
44 | | #include "runtime/load_stream_mgr.h" |
45 | | #include "runtime/load_stream_writer.h" |
46 | | #include "runtime/workload_group/workload_group_manager.h" |
47 | | #include "util/debug_points.h" |
48 | | #include "util/runtime_profile.h" |
49 | | #include "util/thrift_util.h" |
50 | | #include "util/uid_util.h" |
51 | | |
52 | | #define UNKNOWN_ID_FOR_TEST 0x7c00 |
53 | | |
54 | | namespace doris { |
55 | | |
56 | | bvar::Adder<int64_t> g_load_stream_cnt("load_stream_count"); |
57 | | bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms"); |
58 | | bvar::Adder<int> g_load_stream_flush_running_threads("load_stream_flush_wait_threads"); |
59 | | |
60 | | TabletStream::TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, |
61 | | LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile) |
62 | | : _id(id), |
63 | | _next_segid(0), |
64 | | _load_id(load_id), |
65 | | _txn_id(txn_id), |
66 | 14 | _load_stream_mgr(load_stream_mgr) { |
67 | 14 | load_stream_mgr->create_token(_flush_token); |
68 | 14 | _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true); |
69 | 14 | _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
70 | 14 | _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
71 | 14 | _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
72 | 14 | } |
73 | | |
74 | 10 | inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) { |
75 | 10 | ostr << "load_id=" << print_id(tablet_stream._load_id) << ", txn_id=" << tablet_stream._txn_id |
76 | 10 | << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status.status(); |
77 | 10 | return ostr; |
78 | 10 | } |
79 | | |
80 | | Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id, |
81 | 14 | int64_t partition_id) { |
82 | 14 | WriteRequest req { |
83 | 14 | .tablet_id = _id, |
84 | 14 | .txn_id = _txn_id, |
85 | 14 | .index_id = index_id, |
86 | 14 | .partition_id = partition_id, |
87 | 14 | .load_id = _load_id, |
88 | 14 | .table_schema_param = schema, |
89 | | // TODO(plat1ko): write_file_cache |
90 | 14 | .storage_vault_id {}, |
91 | 14 | }; |
92 | | |
93 | 14 | _load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile); |
94 | 14 | DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
95 | 14 | _status.update(Status::Uninitialized("fault injection")); |
96 | 14 | return _status.status(); |
97 | 14 | }); |
98 | 14 | _status.update(_load_stream_writer->init()); |
99 | 14 | if (!_status.ok()) { Branch (99:9): [True: 1, False: 13]
|
100 | 1 | LOG(INFO) << "failed to init rowset builder due to " << *this; |
101 | 1 | } |
102 | 14 | return _status.status(); |
103 | 14 | } |
104 | | |
105 | 27 | Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { |
106 | 27 | if (!_status.ok()) { Branch (106:9): [True: 1, False: 26]
|
107 | 1 | return _status.status(); |
108 | 1 | } |
109 | | |
110 | | // dispatch add_segment request |
111 | 26 | if (header.opcode() == PStreamHeader::ADD_SEGMENT) { Branch (111:9): [True: 0, False: 26]
|
112 | 0 | return add_segment(header, data); |
113 | 0 | } |
114 | | |
115 | 26 | SCOPED_TIMER(_append_data_timer);
116 | | |
117 | 26 | int64_t src_id = header.src_id(); |
118 | 26 | uint32_t segid = header.segment_id(); |
119 | | // Ensure there is enough space and the mapping is built.
120 | 26 | SegIdMapping* mapping = nullptr; |
121 | 26 | { |
122 | 26 | std::lock_guard lock_guard(_lock); |
123 | 26 | if (!_segids_mapping.contains(src_id)) { Branch (123:13): [True: 14, False: 12]
|
124 | 14 | _segids_mapping[src_id] = std::make_unique<SegIdMapping>(); |
125 | 14 | } |
126 | 26 | mapping = _segids_mapping[src_id].get(); |
127 | 26 | } |
128 | 26 | if (segid + 1 > mapping->size()) { Branch (128:9): [True: 15, False: 11]
|
129 | | // TODO: A per-sender lock would be enough.
130 | 15 | std::lock_guard lock_guard(_lock); |
131 | 15 | ssize_t origin_size = mapping->size(); |
132 | 15 | if (segid + 1 > origin_size) { Branch (132:13): [True: 15, False: 0]
|
133 | 15 | mapping->resize(segid + 1, std::numeric_limits<uint32_t>::max()); |
134 | 39 | for (size_t index = origin_size; index <= segid; index++) { Branch (134:46): [True: 24, False: 15]
|
135 | 24 | mapping->at(index) = _next_segid; |
136 | 24 | _next_segid++; |
137 | 24 | VLOG_DEBUG << "src_id=" << src_id << ", segid=" << index << " to "
138 | 0 | << " segid=" << _next_segid - 1 << ", " << *this; |
139 | 24 | } |
140 | 15 | } |
141 | 15 | } |
142 | | |
143 | | // Each sender writes data to a segment sequentially, so we also do not
144 | | // need a lock here.
145 | 26 | bool eos = header.segment_eos(); |
146 | 26 | FileType file_type = header.file_type(); |
147 | 26 | uint32_t new_segid = mapping->at(segid); |
148 | 26 | DCHECK(new_segid != std::numeric_limits<uint32_t>::max()); |
149 | 26 | butil::IOBuf buf = data->movable(); |
150 | 26 | auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable { |
151 | 26 | signal::set_signal_task_id(_load_id); |
152 | 26 | g_load_stream_flush_running_threads << -1; |
153 | 26 | auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); |
154 | 26 | if (!st.ok() && !config::is_cloud_mode()) { Branch (154:13): [True: 1, False: 25]
Branch (154:25): [True: 1, False: 0]
|
155 | 1 | auto res = ExecEnv::get_tablet(_id); |
156 | 1 | TabletSharedPtr tablet = |
157 | 1 | res.has_value() ? std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr; Branch (157:21): [True: 1, False: 0]
|
158 | 1 | if (tablet) { Branch (158:17): [True: 1, False: 0]
|
159 | 1 | tablet->report_error(st); |
160 | 1 | } |
161 | 1 | } |
162 | 26 | if (eos && st.ok()) { Branch (162:13): [True: 21, False: 5]
Branch (162:20): [True: 21, False: 0]
|
163 | 21 | DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type",
164 | 21 | { file_type = static_cast<FileType>(-1); }); |
165 | 21 | if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) { Branch (165:17): [True: 21, False: 0]
Branch (165:56): [True: 0, False: 0]
|
166 | 21 | st = _load_stream_writer->close_writer(new_segid, file_type); |
167 | 21 | } else { |
168 | 0 | st = Status::InternalError( |
169 | 0 | "appent data failed, file type error, file type = {}, " |
170 | 0 | "segment_id={}", |
171 | 0 | file_type, new_segid); |
172 | 0 | } |
173 | 21 | } |
174 | 26 | DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
175 | 26 | { st = Status::InternalError("fault injection"); }); |
176 | 26 | if (!st.ok()) { Branch (176:13): [True: 2, False: 24]
|
177 | 2 | _status.update(st); |
178 | 2 | LOG(WARNING) << "write data failed " << st << ", " << *this; |
179 | 2 | } |
180 | 26 | }; |
181 | 26 | auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks; |
182 | 26 | auto load_stream_max_wait_flush_token_time_ms = |
183 | 26 | config::load_stream_max_wait_flush_token_time_ms; |
184 | 26 | DBUG_EXECUTE_IF("TabletStream.append_data.long_wait", {
185 | 26 | load_stream_flush_token_max_tasks = 0; |
186 | 26 | load_stream_max_wait_flush_token_time_ms = 1000; |
187 | 26 | }); |
188 | 26 | MonotonicStopWatch timer; |
189 | 26 | timer.start(); |
190 | 26 | while (_flush_token->num_tasks() >= load_stream_flush_token_max_tasks) { Branch (190:12): [True: 0, False: 26]
|
191 | 0 | if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) { Branch (191:13): [True: 0, False: 0]
|
192 | 0 | _status.update( |
193 | 0 | Status::Error<true>("wait flush token back pressure time is more than " |
194 | 0 | "load_stream_max_wait_flush_token_time {}", |
195 | 0 | load_stream_max_wait_flush_token_time_ms)); |
196 | 0 | return _status.status(); |
197 | 0 | } |
198 | 0 | bthread_usleep(2 * 1000); // 2ms |
199 | 0 | } |
200 | 26 | timer.stop(); |
201 | 26 | int64_t time_ms = timer.elapsed_time() / 1000 / 1000; |
202 | 26 | g_load_stream_flush_wait_ms << time_ms; |
203 | 26 | g_load_stream_flush_running_threads << 1; |
204 | 26 | Status st = Status::OK(); |
205 | 26 | DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed",
206 | 26 | { st = Status::InternalError("fault injection"); }); |
207 | 26 | if (st.ok()) { Branch (207:9): [True: 26, False: 0]
|
208 | 26 | st = _flush_token->submit_func(flush_func); |
209 | 26 | } |
210 | 26 | if (!st.ok()) { Branch (210:9): [True: 0, False: 26]
|
211 | 0 | _status.update(st); |
212 | 0 | } |
213 | 26 | return _status.status(); |
214 | 26 | } |
215 | | |
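The remapping logic in append_data() above folds each sender's locally numbered segments into one contiguous segment id space per tablet: the first time a (src_id, segid) pair is seen, it is assigned the next value of _next_segid. A minimal stand-alone sketch of that mapping follows; the class and method names are hypothetical, and the real code additionally guards the map with a mutex and stores the per-sender vectors behind unique_ptr.

#include <cstdint>
#include <limits>
#include <unordered_map>
#include <vector>

// Hypothetical model of the (src_id, local segid) -> global segid remapping.
class SegIdRemapper {
public:
    uint32_t map(int64_t src_id, uint32_t segid) {
        auto& mapping = _mappings[src_id];  // created on first use
        if (segid + 1 > mapping.size()) {
            size_t origin_size = mapping.size();
            mapping.resize(segid + 1, std::numeric_limits<uint32_t>::max());
            // hand out globally contiguous ids for every newly seen local id
            for (size_t i = origin_size; i <= segid; ++i) {
                mapping[i] = _next_segid++;
            }
        }
        return mapping[segid];
    }

private:
    std::unordered_map<int64_t, std::vector<uint32_t>> _mappings;
    uint32_t _next_segid = 0;
};
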
216 | 0 | Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) { |
217 | 0 | if (!_status.ok()) { Branch (217:9): [True: 0, False: 0]
|
218 | 0 | return _status.status(); |
219 | 0 | } |
220 | | |
221 | 0 | SCOPED_TIMER(_add_segment_timer);
222 | 0 | DCHECK(header.has_segment_statistics()); |
223 | 0 | SegmentStatistics stat(header.segment_statistics()); |
224 | 0 | TabletSchemaSPtr flush_schema; |
225 | 0 | if (header.has_flush_schema()) { Branch (225:9): [True: 0, False: 0]
|
226 | 0 | flush_schema = std::make_shared<TabletSchema>(); |
227 | 0 | flush_schema->init_from_pb(header.flush_schema()); |
228 | 0 | } |
229 | |
|
230 | 0 | int64_t src_id = header.src_id(); |
231 | 0 | uint32_t segid = header.segment_id(); |
232 | 0 | uint32_t new_segid; |
233 | 0 | DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_segid", { segid = UNKNOWN_ID_FOR_TEST; });
234 | 0 | { |
235 | 0 | std::lock_guard lock_guard(_lock); |
236 | 0 | if (!_segids_mapping.contains(src_id)) { Branch (236:13): [True: 0, False: 0]
|
237 | 0 | _status.update(Status::InternalError( |
238 | 0 | "add segment failed, no segment written by this src be yet, src_id={}, " |
239 | 0 | "segment_id={}", |
240 | 0 | src_id, segid)); |
241 | 0 | return _status.status(); |
242 | 0 | } |
243 | 0 | DBUG_EXECUTE_IF("TabletStream.add_segment.segid_never_written",
244 | 0 | { segid = _segids_mapping[src_id]->size(); }); |
245 | 0 | if (segid >= _segids_mapping[src_id]->size()) { Branch (245:13): [True: 0, False: 0]
|
246 | 0 | _status.update(Status::InternalError( |
247 | 0 | "add segment failed, segment is never written, src_id={}, segment_id={}", |
248 | 0 | src_id, segid)); |
249 | 0 | return _status.status(); |
250 | 0 | } |
251 | 0 | new_segid = _segids_mapping[src_id]->at(segid); |
252 | 0 | } |
253 | 0 | DCHECK(new_segid != std::numeric_limits<uint32_t>::max()); |
254 | |
|
255 | 0 | auto add_segment_func = [this, new_segid, stat, flush_schema]() { |
256 | 0 | signal::set_signal_task_id(_load_id); |
257 | 0 | auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema); |
258 | 0 | DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
259 | 0 | { st = Status::InternalError("fault injection"); }); |
260 | 0 | if (!st.ok()) { Branch (260:13): [True: 0, False: 0]
|
261 | 0 | _status.update(st); |
262 | 0 | LOG(INFO) << "add segment failed " << *this; |
263 | 0 | } |
264 | 0 | }; |
265 | 0 | Status st = Status::OK(); |
266 | 0 | DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed",
267 | 0 | { st = Status::InternalError("fault injection"); }); |
268 | 0 | if (st.ok()) { Branch (268:9): [True: 0, False: 0]
|
269 | 0 | st = _flush_token->submit_func(add_segment_func); |
270 | 0 | } |
271 | 0 | if (!st.ok()) { Branch (271:9): [True: 0, False: 0]
|
272 | 0 | _status.update(st); |
273 | 0 | } |
274 | 0 | return _status.status(); |
275 | 0 | } |
276 | | |
277 | 29 | Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) { |
278 | 29 | bthread::Mutex mu; |
279 | 29 | std::unique_lock<bthread::Mutex> lock(mu); |
280 | 29 | bthread::ConditionVariable cv; |
281 | 29 | auto st = Status::OK(); |
282 | 29 | auto func = [this, &mu, &cv, &st, &fn] { |
283 | 29 | signal::set_signal_task_id(_load_id); |
284 | 29 | st = fn(); |
285 | 29 | std::lock_guard<bthread::Mutex> lock(mu); |
286 | 29 | cv.notify_one(); |
287 | 29 | }; |
288 | 29 | bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func); |
289 | 29 | if (!ret) { Branch (289:9): [True: 0, False: 29]
|
290 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
291 | 0 | "there is not enough thread resource for close load"); |
292 | 0 | } |
293 | 29 | cv.wait(lock); |
294 | 29 | return st; |
295 | 29 | } |
296 | | |
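_run_in_heavy_work_pool() above offloads a blocking call onto the shared heavy work pool and parks the calling bthread on a condition variable until the task signals completion. Below is a rough stand-alone illustration of the same hand-off pattern using standard-library primitives; the function name is hypothetical, and the real code uses bthread::Mutex, bthread::ConditionVariable and a shared pool instead of spawning a thread, and propagates a Status rather than an int.

#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical illustration: run `fn` elsewhere and block the caller until it finishes.
int run_and_wait(const std::function<int()>& fn) {
    std::mutex mu;
    std::condition_variable cv;
    bool done = false;
    int result = 0;
    std::thread worker([&] {
        int r = fn();
        std::lock_guard<std::mutex> lock(mu);
        result = r;
        done = true;
        cv.notify_one();
    });
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [&] { return done; });  // predicate guards against spurious wakeups
    worker.join();
    return result;
}
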
297 | 14 | void TabletStream::pre_close() { |
298 | 14 | if (!_status.ok()) { Branch (298:9): [True: 2, False: 12]
|
299 | | // cancel all pending tasks, wait for all running tasks to finish
300 | 2 | _flush_token->shutdown(); |
301 | 2 | return; |
302 | 2 | } |
303 | | |
304 | 12 | SCOPED_TIMER(_close_wait_timer);
305 | 12 | _status.update(_run_in_heavy_work_pool([this]() { |
306 | 12 | _flush_token->wait(); |
307 | 12 | return Status::OK(); |
308 | 12 | })); |
309 | | // It is necessary to check the status after wait_func,
310 | | // because create_rowset could fail during add_segment when loading to a MOW table;
311 | | // in that case, close should be skipped to avoid submit_calc_delete_bitmap_task, which could cause a coredump.
312 | 12 | if (!_status.ok()) { Branch (312:9): [True: 1, False: 11]
|
313 | 1 | return; |
314 | 1 | } |
315 | | |
316 | 11 | DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
317 | 11 | if (_check_num_segments && (_next_segid.load() != _num_segments)) { Branch (317:9): [True: 11, False: 0]
Branch (317:32): [True: 2, False: 9]
|
318 | 2 | _status.update(Status::Corruption( |
319 | 2 | "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id, |
320 | 2 | _num_segments, _next_segid.load(), print_id(_load_id))); |
321 | 2 | return; |
322 | 2 | } |
323 | | |
324 | 9 | _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); })); |
325 | 9 | } |
326 | | |
327 | 14 | Status TabletStream::close() { |
328 | 14 | if (!_status.ok()) { Branch (328:9): [True: 6, False: 8]
|
329 | 6 | return _status.status(); |
330 | 6 | } |
331 | | |
332 | 8 | SCOPED_TIMER(_close_wait_timer);
333 | 8 | _status.update(_run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); })); |
334 | 8 | return _status.status(); |
335 | 14 | } |
336 | | |
337 | | IndexStream::IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, |
338 | | std::shared_ptr<OlapTableSchemaParam> schema, |
339 | | LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile) |
340 | | : _id(id), |
341 | | _load_id(load_id), |
342 | | _txn_id(txn_id), |
343 | | _schema(schema), |
344 | 28 | _load_stream_mgr(load_stream_mgr) { |
345 | 28 | _profile = profile->create_child(fmt::format("IndexStream {}", id), true, true); |
346 | 28 | _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
347 | 28 | _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
348 | 28 | } |
349 | | |
350 | 27 | Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { |
351 | 27 | SCOPED_TIMER(_append_data_timer);
352 | 27 | int64_t tablet_id = header.tablet_id(); |
353 | 27 | TabletStreamSharedPtr tablet_stream; |
354 | 27 | { |
355 | 27 | std::lock_guard lock_guard(_lock); |
356 | 27 | auto it = _tablet_streams_map.find(tablet_id); |
357 | 27 | if (it == _tablet_streams_map.end()) { Branch (357:13): [True: 13, False: 14]
|
358 | 13 | _init_tablet_stream(tablet_stream, tablet_id, header.partition_id()); |
359 | 14 | } else { |
360 | 14 | tablet_stream = it->second; |
361 | 14 | } |
362 | 27 | } |
363 | | |
364 | 27 | return tablet_stream->append_data(header, data); |
365 | 27 | } |
366 | | |
367 | | void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, |
368 | 14 | int64_t partition_id) { |
369 | 14 | tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr, |
370 | 14 | _profile); |
371 | 14 | _tablet_streams_map[tablet_id] = tablet_stream; |
372 | 14 | auto st = tablet_stream->init(_schema, _id, partition_id); |
373 | 14 | if (!st.ok()) { Branch (373:9): [True: 1, False: 13]
|
374 | 1 | LOG(WARNING) << "tablet stream init failed " << *tablet_stream; |
375 | 1 | } |
376 | 14 | } |
377 | | |
378 | | void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit, |
379 | 28 | std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) { |
380 | 28 | std::lock_guard lock_guard(_lock); |
381 | 28 | SCOPED_TIMER(_close_wait_timer);
382 | | // open all tablets that need to be committed
383 | 34 | for (const auto& tablet : tablets_to_commit) { Branch (383:29): [True: 34, False: 28]
|
384 | 34 | if (_id != tablet.index_id()) { Branch (384:13): [True: 18, False: 16]
|
385 | 18 | continue; |
386 | 18 | } |
387 | 16 | TabletStreamSharedPtr tablet_stream; |
388 | 16 | auto it = _tablet_streams_map.find(tablet.tablet_id()); |
389 | 16 | if (it == _tablet_streams_map.end()) { Branch (389:13): [True: 1, False: 15]
|
390 | 1 | _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id()); |
391 | 15 | } else { |
392 | 15 | tablet_stream = it->second; |
393 | 15 | } |
394 | 16 | if (tablet.has_num_segments()) { Branch (394:13): [True: 16, False: 0]
|
395 | 16 | tablet_stream->add_num_segments(tablet.num_segments()); |
396 | 16 | } else { |
397 | | // for compatibility reasons (sink from old version BE) |
398 | 0 | tablet_stream->disable_num_segments_check(); |
399 | 0 | } |
400 | 16 | } |
401 | | |
402 | 28 | for (auto& [_, tablet_stream] : _tablet_streams_map) { Branch (402:35): [True: 14, False: 28]
|
403 | 14 | tablet_stream->pre_close(); |
404 | 14 | } |
405 | | |
406 | 28 | for (auto& [_, tablet_stream] : _tablet_streams_map) { Branch (406:35): [True: 14, False: 28]
|
407 | 14 | auto st = tablet_stream->close(); |
408 | 14 | if (st.ok()) { Branch (408:13): [True: 8, False: 6]
|
409 | 8 | success_tablet_ids->push_back(tablet_stream->id()); |
410 | 8 | } else { |
411 | 6 | LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st; |
412 | 6 | failed_tablets->emplace_back(tablet_stream->id(), st); |
413 | 6 | } |
414 | 14 | } |
415 | 28 | } |
416 | | |
417 | | // TODO: Profile is temporarily disabled, because:
418 | | // 1. It's not being processed by the upstream for now |
419 | | // 2. There are some problems in _profile->to_thrift() |
420 | | LoadStream::LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr, |
421 | | bool enable_profile) |
422 | 14 | : _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) { |
423 | 14 | g_load_stream_cnt << 1; |
424 | 14 | _profile = std::make_unique<RuntimeProfile>("LoadStream"); |
425 | 14 | _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
426 | 14 | _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
427 | 14 | TUniqueId load_tid = ((UniqueId)load_id).to_thrift(); |
428 | | #ifndef BE_TEST |
429 | | std::shared_ptr<QueryContext> query_context = |
430 | | ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid); |
431 | | if (query_context != nullptr) { |
432 | | _query_thread_context = {load_tid, query_context->query_mem_tracker, |
433 | | query_context->workload_group()}; |
434 | | } else { |
435 | | _query_thread_context = {load_tid, MemTrackerLimiter::create_shared( |
436 | | MemTrackerLimiter::Type::LOAD, |
437 | | fmt::format("(FromLoadStream)Load#Id={}", |
438 | | ((UniqueId)load_id).to_string()))}; |
439 | | } |
440 | | #else |
441 | 14 | _query_thread_context = {load_tid, MemTrackerLimiter::create_shared( |
442 | 14 | MemTrackerLimiter::Type::LOAD, |
443 | 14 | fmt::format("(FromLoadStream)Load#Id={}", |
444 | 14 | ((UniqueId)load_id).to_string()))}; |
445 | 14 | #endif |
446 | 14 | } |
447 | | |
448 | 14 | LoadStream::~LoadStream() { |
449 | 14 | g_load_stream_cnt << -1; |
450 | 14 | LOG(INFO) << "load stream is destructed " << *this;
451 | 14 | } |
452 | | |
453 | 14 | Status LoadStream::init(const POpenLoadStreamRequest* request) { |
454 | 14 | _txn_id = request->txn_id(); |
455 | 14 | _total_streams = request->total_streams(); |
456 | 14 | _is_incremental = (_total_streams == 0); |
457 | | |
458 | 14 | _schema = std::make_shared<OlapTableSchemaParam>(); |
459 | 14 | RETURN_IF_ERROR(_schema->init(request->schema())); |
460 | 28 | for (auto& index : request->schema().indexes()) { Branch (460:22): [True: 28, False: 14]
|
461 | 28 | _index_streams_map[index.id()] = std::make_shared<IndexStream>( |
462 | 28 | _load_id, index.id(), _txn_id, _schema, _load_stream_mgr, _profile.get()); |
463 | 28 | } |
464 | 14 | LOG(INFO) << "succeed to init load stream " << *this; |
465 | 14 | return Status::OK(); |
466 | 14 | } |
467 | | |
468 | | void LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit, |
469 | 16 | std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) { |
470 | 16 | std::lock_guard<bthread::Mutex> lock_guard(_lock); |
471 | 16 | SCOPED_TIMER(_close_wait_timer);
472 | | |
473 | | // we do nothing until CLOSE_LOAD is received from all streams, to ensure all data is handled before the ack
474 | 16 | _open_streams[src_id]--; |
475 | 16 | if (_open_streams[src_id] == 0) { Branch (475:9): [True: 16, False: 0]
|
476 | 16 | _open_streams.erase(src_id); |
477 | 16 | } |
478 | 16 | _close_load_cnt++; |
479 | 16 | LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining " |
480 | 16 | << _total_streams - _close_load_cnt << " senders, " << *this; |
481 | | |
482 | 16 | _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(), |
483 | 16 | tablets_to_commit.end()); |
484 | | |
485 | 16 | if (_close_load_cnt < _total_streams) { Branch (485:9): [True: 2, False: 14]
|
486 | | // do not return commit info if there are remaining streams.
487 | 2 | return; |
488 | 2 | } |
489 | | |
490 | 28 | for (auto& [_, index_stream] : _index_streams_map) { Branch (490:34): [True: 28, False: 14]
|
491 | 28 | index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets); |
492 | 28 | } |
493 | 14 | LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size() |
494 | 14 | << ", failed_tablet_num=" << failed_tablets->size(); |
495 | 14 | } |
496 | | |
497 | | void LoadStream::_report_result(StreamId stream, const Status& status, |
498 | | const std::vector<int64_t>& success_tablet_ids, |
499 | 36 | const FailedTablets& failed_tablets, bool eos) { |
500 | 36 | LOG(INFO) << "report result " << *this << ", success tablet num " << success_tablet_ids.size() |
501 | 36 | << ", failed tablet num " << failed_tablets.size(); |
502 | 36 | butil::IOBuf buf; |
503 | 36 | PLoadStreamResponse response; |
504 | 36 | response.set_eos(eos); |
505 | 36 | status.to_protobuf(response.mutable_status()); |
506 | 36 | for (auto& id : success_tablet_ids) { Branch (506:19): [True: 8, False: 36]
|
507 | 8 | response.add_success_tablet_ids(id); |
508 | 8 | } |
509 | 36 | for (auto& [id, st] : failed_tablets) { Branch (509:25): [True: 10, False: 36]
|
510 | 10 | auto pb = response.add_failed_tablets(); |
511 | 10 | pb->set_id(id); |
512 | 10 | st.to_protobuf(pb->mutable_status()); |
513 | 10 | } |
514 | | |
515 | 36 | if (_enable_profile && _close_load_cnt == _total_streams) { Branch (515:9): [True: 0, False: 36]
Branch (515:28): [True: 0, False: 0]
|
516 | 0 | TRuntimeProfileTree tprofile; |
517 | 0 | ThriftSerializer ser(false, 4096); |
518 | 0 | uint8_t* buf = nullptr; |
519 | 0 | uint32_t len = 0; |
520 | 0 | std::unique_lock<bthread::Mutex> l(_lock); |
521 | |
|
522 | 0 | _profile->to_thrift(&tprofile); |
523 | 0 | auto st = ser.serialize(&tprofile, &len, &buf); |
524 | 0 | if (st.ok()) { Branch (524:13): [True: 0, False: 0]
|
525 | 0 | response.set_load_stream_profile(buf, len); |
526 | 0 | } else { |
527 | 0 | LOG(WARNING) << "TRuntimeProfileTree serialize failed, errmsg=" << st << ", " << *this; |
528 | 0 | } |
529 | 0 | } |
530 | | |
531 | 36 | buf.append(response.SerializeAsString()); |
532 | 36 | auto wst = _write_stream(stream, buf); |
533 | 36 | if (!wst.ok()) { Branch (533:9): [True: 0, False: 36]
|
534 | 0 | LOG(WARNING) << " report result failed with " << wst << ", " << *this; |
535 | 0 | } |
536 | 36 | } |
537 | | |
538 | 0 | void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) { |
539 | 0 | butil::IOBuf buf; |
540 | 0 | PLoadStreamResponse response; |
541 | 0 | Status st = Status::OK(); |
542 | 0 | for (const auto& req : hdr.tablets()) { Branch (542:26): [True: 0, False: 0]
|
543 | 0 | BaseTabletSPtr tablet; |
544 | 0 | if (auto res = ExecEnv::get_tablet(req.tablet_id()); res.has_value()) { Branch (544:62): [True: 0, False: 0]
|
545 | 0 | tablet = std::move(res).value(); |
546 | 0 | } else { |
547 | 0 | st = std::move(res).error(); |
548 | 0 | break; |
549 | 0 | } |
550 | 0 | auto* resp = response.add_tablet_schemas(); |
551 | 0 | resp->set_index_id(req.index_id()); |
552 | 0 | resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write()); |
553 | 0 | tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema()); |
554 | 0 | } |
555 | 0 | st.to_protobuf(response.mutable_status()); |
556 | |
|
557 | 0 | buf.append(response.SerializeAsString()); |
558 | 0 | auto wst = _write_stream(stream, buf); |
559 | 0 | if (!wst.ok()) { Branch (559:9): [True: 0, False: 0]
|
560 | 0 | LOG(WARNING) << " report result failed with " << wst << ", " << *this; |
561 | 0 | } |
562 | 0 | } |
563 | | |
564 | 36 | Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) { |
565 | 36 | for (;;) { |
566 | 36 | int ret = 0; |
567 | 36 | DBUG_EXECUTE_IF("LoadStream._write_stream.EAGAIN", { ret = EAGAIN; });
568 | 36 | if (ret == 0) { Branch (568:13): [True: 36, False: 0]
|
569 | 36 | ret = brpc::StreamWrite(stream, buf); |
570 | 36 | } |
571 | 36 | switch (ret) { |
572 | 36 | case 0: Branch (572:9): [True: 36, False: 0]
|
573 | 36 | return Status::OK(); |
574 | 0 | case EAGAIN: { Branch (574:9): [True: 0, False: 36]
|
575 | 0 | const timespec time = butil::seconds_from_now(config::load_stream_eagain_wait_seconds); |
576 | 0 | int wait_ret = brpc::StreamWait(stream, &time); |
577 | 0 | if (wait_ret != 0) { Branch (577:17): [True: 0, False: 0]
|
578 | 0 | return Status::InternalError("StreamWait failed, err={}", wait_ret); |
579 | 0 | } |
580 | 0 | break; |
581 | 0 | } |
582 | 0 | default: Branch (582:9): [True: 0, False: 36]
|
583 | 0 | return Status::InternalError("StreamWrite failed, err={}", ret); |
584 | 36 | } |
585 | 36 | } |
586 | 0 | return Status::OK(); |
587 | 36 | } |
588 | | |
589 | 62 | void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& hdr) { |
590 | 62 | butil::IOBufAsZeroCopyInputStream wrapper(*message); |
591 | 62 | hdr.ParseFromZeroCopyStream(&wrapper); |
592 | 62 | VLOG_DEBUG << "header parse result: " << hdr.DebugString();
593 | 62 | } |
594 | | |
595 | 28 | Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) { |
596 | 28 | SCOPED_TIMER(_append_data_timer);
597 | 28 | IndexStreamSharedPtr index_stream; |
598 | | |
599 | 28 | int64_t index_id = header.index_id(); |
600 | 28 | DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid",
601 | 28 | { index_id = UNKNOWN_ID_FOR_TEST; }); |
602 | 28 | auto it = _index_streams_map.find(index_id); |
603 | 28 | if (it == _index_streams_map.end()) { Branch (603:9): [True: 1, False: 27]
|
604 | 1 | return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id {}", index_id); |
605 | 27 | } else { |
606 | 27 | index_stream = it->second; |
607 | 27 | } |
608 | | |
609 | 27 | return index_stream->append_data(header, data); |
610 | 28 | } |
611 | | |
612 | 49 | int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) { |
613 | 49 | VLOG_DEBUG << "on_received_messages " << id << " " << size;
614 | 111 | for (size_t i = 0; i < size; ++i) { Branch (614:24): [True: 62, False: 49]
|
615 | 124 | while (messages[i]->size() > 0) { Branch (615:16): [True: 62, False: 62]
|
616 | | // step 1: parse header |
617 | 62 | size_t hdr_len = 0; |
618 | 62 | messages[i]->cutn((void*)&hdr_len, sizeof(size_t)); |
619 | 62 | butil::IOBuf hdr_buf; |
620 | 62 | PStreamHeader hdr; |
621 | 62 | messages[i]->cutn(&hdr_buf, hdr_len); |
622 | 62 | _parse_header(&hdr_buf, hdr); |
623 | | |
624 | | // step 2: cut data |
625 | 62 | size_t data_len = 0; |
626 | 62 | messages[i]->cutn((void*)&data_len, sizeof(size_t)); |
627 | 62 | butil::IOBuf data_buf; |
628 | 62 | PStreamHeader data; |
629 | 62 | messages[i]->cutn(&data_buf, data_len); |
630 | | |
631 | | // step 3: dispatch |
632 | 62 | _dispatch(id, hdr, &data_buf); |
633 | 62 | } |
634 | 62 | } |
635 | 49 | return 0; |
636 | 49 | } |
637 | | |
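on_received_messages() above implies a simple length-prefixed framing: each record in a message is [sizeof(size_t) header length][serialized PStreamHeader][sizeof(size_t) data length][payload], repeated until the IOBuf is consumed. The sketch below shows how a sender could frame one such record; it is a hypothetical helper, the real sender assembles a butil::IOBuf, and a std::string is used here only to keep the example self-contained. Note the lengths are written as raw size_t values in native byte order, matching the cutn() calls on the receiving side.

#include <cstddef>
#include <string>

// Hypothetical framing helper mirroring the parsing loop above.
std::string frame_message(const std::string& serialized_header,
                          const std::string& payload) {
    std::string buf;
    size_t hdr_len = serialized_header.size();
    buf.append(reinterpret_cast<const char*>(&hdr_len), sizeof(size_t));
    buf.append(serialized_header);
    size_t data_len = payload.size();
    buf.append(reinterpret_cast<const char*>(&data_len), sizeof(size_t));
    buf.append(payload);
    return buf;
}
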
638 | 62 | void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) { |
639 | 62 | VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id()
640 | 0 | << " with tablet " << hdr.tablet_id(); |
641 | 62 | SCOPED_ATTACH_TASK(_query_thread_context);
642 | | // CLOSE_LOAD messages should not be fault-injected,
643 | | // otherwise the message will be ignored, causing a close wait timeout
644 | 62 | if (hdr.opcode() != PStreamHeader::CLOSE_LOAD) { Branch (644:9): [True: 30, False: 32]
|
645 | 30 | DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", {
646 | 30 | PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr); |
647 | 30 | PUniqueId* load_id = t_hdr.mutable_load_id(); |
648 | 30 | load_id->set_hi(UNKNOWN_ID_FOR_TEST); |
649 | 30 | load_id->set_lo(UNKNOWN_ID_FOR_TEST); |
650 | 30 | }); |
651 | 30 | DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", {
652 | 30 | PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr); |
653 | 30 | t_hdr.set_src_id(UNKNOWN_ID_FOR_TEST); |
654 | 30 | }); |
655 | 30 | } |
656 | 62 | if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) { Branch (656:9): [True: 1, False: 61]
|
657 | 1 | Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>( |
658 | 1 | "invalid load id {}, expected {}", print_id(hdr.load_id()), print_id(_load_id)); |
659 | 1 | _report_failure(id, st, hdr); |
660 | 1 | return; |
661 | 1 | } |
662 | | |
663 | 61 | { |
664 | 61 | std::lock_guard lock_guard(_lock); |
665 | 61 | if (!_open_streams.contains(hdr.src_id())) { Branch (665:13): [True: 17, False: 44]
|
666 | 17 | Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("no open stream from source {}", |
667 | 17 | hdr.src_id()); |
668 | 17 | _report_failure(id, st, hdr); |
669 | 17 | return; |
670 | 17 | } |
671 | 61 | } |
672 | | |
673 | 44 | switch (hdr.opcode()) { |
674 | 0 | case PStreamHeader::ADD_SEGMENT: // ADD_SEGMENT will be dispatched inside TabletStream Branch (674:5): [True: 0, False: 44]
|
675 | 28 | case PStreamHeader::APPEND_DATA: { Branch (675:5): [True: 28, False: 16]
|
676 | 28 | auto st = _append_data(hdr, data); |
677 | 28 | if (!st.ok()) { Branch (677:13): [True: 2, False: 26]
|
678 | 2 | _report_failure(id, st, hdr); |
679 | 2 | } |
680 | 28 | } break; |
681 | 16 | case PStreamHeader::CLOSE_LOAD: { Branch (681:5): [True: 16, False: 28]
|
682 | 16 | std::vector<int64_t> success_tablet_ids; |
683 | 16 | FailedTablets failed_tablets; |
684 | 16 | std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end()); |
685 | 16 | close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets); |
686 | 16 | _report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true); |
687 | 16 | brpc::StreamClose(id); |
688 | 16 | } break; |
689 | 0 | case PStreamHeader::GET_SCHEMA: { Branch (689:5): [True: 0, False: 44]
|
690 | 0 | _report_schema(id, hdr); |
691 | 0 | } break; |
692 | 0 | default: Branch (692:5): [True: 0, False: 44]
|
693 | 0 | LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this; |
694 | 0 | DCHECK(false); |
695 | 44 | } |
696 | 44 | } |
697 | | |
698 | 0 | void LoadStream::on_idle_timeout(StreamId id) { |
699 | 0 | LOG(WARNING) << "closing load stream on idle timeout, " << *this; |
700 | 0 | brpc::StreamClose(id); |
701 | 0 | } |
702 | | |
703 | 16 | void LoadStream::on_closed(StreamId id) { |
704 | | // `this` may be freed by other threads after increasing `_close_rpc_cnt`, |
705 | | // format string first to prevent use-after-free |
706 | 16 | std::stringstream ss; |
707 | 16 | ss << *this; |
708 | 16 | auto remaining_streams = _total_streams - _close_rpc_cnt.fetch_add(1) - 1; |
709 | 16 | LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams << ", " |
710 | 16 | << ss.str(); |
711 | 16 | if (remaining_streams == 0) { Branch (711:9): [True: 14, False: 2]
|
712 | 14 | _load_stream_mgr->clear_load(_load_id); |
713 | 14 | } |
714 | 16 | } |
715 | | |
716 | 110 | inline std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream) { |
717 | 110 | ostr << "load_id=" << print_id(load_stream._load_id) << ", txn_id=" << load_stream._txn_id; |
718 | 110 | return ostr; |
719 | 110 | } |
720 | | |
721 | | } // namespace doris |