/root/doris/be/src/runtime/load_channel.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/load_channel.h" |
19 | | |
20 | | #include <gen_cpp/internal_service.pb.h> |
21 | | #include <glog/logging.h> |
22 | | |
23 | | #include "cloud/cloud_tablets_channel.h" |
24 | | #include "cloud/config.h" |
25 | | #include "common/logging.h" |
26 | | #include "olap/storage_engine.h" |
27 | | #include "runtime/exec_env.h" |
28 | | #include "runtime/fragment_mgr.h" |
29 | | #include "runtime/memory/mem_tracker.h" |
30 | | #include "runtime/tablets_channel.h" |
31 | | #include "runtime/thread_context.h" |
32 | | #include "runtime/workload_group/workload_group_manager.h" |
33 | | |
34 | | namespace doris { |
35 | | |
36 | | bvar::Adder<int64_t> g_loadchannel_cnt("loadchannel_cnt"); |
37 | | |
/// Constructor. Resolves the memory tracker and workload group for this load:
/// preferentially from the query context registered under the load id; when no
/// query context exists (memtable-on-sink disabled), creates a dedicated LOAD
/// tracker and, if a valid workload-group id was supplied, attaches the tracker
/// to that group. Also registers the channel in the global bvar counter and
/// stamps the last-updated time so the GC thread won't reap it immediately.
LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority,
                         std::string sender_ip, int64_t backend_id, bool enable_profile,
                         int64_t wg_id)
        : _load_id(load_id),
          _timeout_s(timeout_s),
          _is_high_priority(is_high_priority),
          _sender_ip(std::move(sender_ip)),
          _backend_id(backend_id),
          _enable_profile(enable_profile) {
    std::shared_ptr<QueryContext> query_context =
            ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock(
                    _load_id.to_thrift());
    std::shared_ptr<MemTrackerLimiter> mem_tracker = nullptr;
    WorkloadGroupPtr wg_ptr = nullptr;

    if (query_context != nullptr) {
        // Reuse the query's tracker and workload group so the load's memory is
        // accounted against the originating query.
        mem_tracker = query_context->query_mem_tracker;
        wg_ptr = query_context->workload_group();
    } else {
        // when memtable on sink is not enabled, load can not find queryctx
        mem_tracker = MemTrackerLimiter::create_shared(
                MemTrackerLimiter::Type::LOAD,
                fmt::format("(FromLoadChannel)Load#Id={}", _load_id.to_string()));
        if (wg_id > 0) {
            WorkloadGroupPtr workload_group_ptr =
                    ExecEnv::GetInstance()->workload_group_mgr()->get_task_group_by_id(wg_id);
            if (workload_group_ptr) {
                wg_ptr = workload_group_ptr;
                // Make the group aware of this tracker so group-level memory
                // limits include this load.
                wg_ptr->add_mem_tracker_limiter(mem_tracker);
            }
        }
    }
    // Thread context used by SCOPED_ATTACH_TASK in open()/add_batch().
    _query_thread_context = {_load_id.to_thrift(), mem_tracker, wg_ptr};

    g_loadchannel_cnt << 1;
    // _last_updated_time should be set before being inserted to
    // _load_channels in load_channel_mgr, or it may be erased
    // immediately by gc thread.
    _last_updated_time.store(time(nullptr));
    _init_profile();
}
79 | | |
80 | 0 | LoadChannel::~LoadChannel() { |
81 | 0 | g_loadchannel_cnt << -1; |
82 | 0 | std::stringstream rows_str; |
83 | 0 | for (const auto& entry : _tablets_channels_rows) { |
84 | 0 | rows_str << ", index id: " << entry.first << ", total_received_rows: " << entry.second.first |
85 | 0 | << ", num_rows_filtered: " << entry.second.second; |
86 | 0 | } |
87 | 0 | LOG(INFO) << "load channel removed" |
88 | 0 | << " load_id=" << _load_id << ", is high priority=" << _is_high_priority |
89 | 0 | << ", sender_ip=" << _sender_ip << rows_str.str(); |
90 | 0 | } |
91 | | |
/// Builds the runtime-profile tree for this channel: a root "LoadChannels"
/// profile with manager-level timers, plus a per-channel child profile
/// (keyed by load id / sender host / backend id) carrying the batch counters
/// and timers updated in add_batch()/_handle_eos().
void LoadChannel::_init_profile() {
    _profile = std::make_unique<RuntimeProfile>("LoadChannels");
    _mgr_add_batch_timer = ADD_TIMER(_profile, "LoadChannelMgrAddBatchTime");
    _handle_mem_limit_timer = ADD_TIMER(_profile, "HandleMemLimitTime");
    _self_profile =
            _profile->create_child(fmt::format("LoadChannel load_id={} (host={}, backend_id={})",
                                               _load_id.to_string(), _sender_ip, _backend_id),
                                   true, true);
    _add_batch_number_counter = ADD_COUNTER(_self_profile, "NumberBatchAdded", TUnit::UNIT);
    _peak_memory_usage_counter = ADD_COUNTER(_self_profile, "PeakMemoryUsage", TUnit::BYTES);
    _add_batch_timer = ADD_TIMER(_self_profile, "AddBatchTime");
    // Child timer: eos handling time is reported as a sub-span of AddBatchTime.
    _handle_eos_timer = ADD_CHILD_TIMER(_self_profile, "HandleEosTime", "AddBatchTime");
    _add_batch_times = ADD_COUNTER(_self_profile, "AddBatchTimes", TUnit::UNIT);
}
106 | | |
/// Opens (or incrementally re-opens) the tablets channel for params.index_id().
/// Creates a Cloud or local TabletsChannel on first sight of the index id,
/// then delegates tablet opening to the channel. Returns InternalError in
/// cloud mode when txn expiration is not set.
Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
    if (config::is_cloud_mode() && params.txn_expiration() <= 0) {
        return Status::InternalError(
                "The txn expiration of PTabletWriterOpenRequest is invalid, value={}",
                params.txn_expiration());
    }
    // Attach this bthread to the load's mem tracker / workload group.
    SCOPED_ATTACH_TASK(_query_thread_context);

    int64_t index_id = params.index_id();
    std::shared_ptr<BaseTabletsChannel> channel;
    {
        std::lock_guard<std::mutex> l(_lock);
        auto it = _tablets_channels.find(index_id);
        if (it != _tablets_channels.end()) {
            channel = it->second;
        } else {
            // just for VLOG
            if (_txn_id == 0) [[unlikely]] {
                _txn_id = params.txn_id();
            }
            // create a new tablets channel
            TabletsChannelKey key(params.id(), index_id);
            BaseStorageEngine& engine = ExecEnv::GetInstance()->storage_engine();
            if (config::is_cloud_mode()) {
                channel = std::make_shared<CloudTabletsChannel>(engine.to_cloud(), key, _load_id,
                                                                _is_high_priority, _self_profile);
            } else {
                channel = std::make_shared<TabletsChannel>(engine.to_local(), key, _load_id,
                                                           _is_high_priority, _self_profile);
            }
            {
                // _tablets_channels_lock guards readers (e.g. _report_profile)
                // that iterate the map without holding _lock.
                std::lock_guard<std::mutex> l(_tablets_channels_lock);
                _tablets_channels.insert({index_id, channel});
            }
        }
    }

    // Open outside _lock: channel->open() may be slow (opens tablet writers).
    if (params.is_incremental()) {
        // incremental open would ensure not to open tablet repeatedly
        RETURN_IF_ERROR(channel->incremental_open(params));
    } else {
        RETURN_IF_ERROR(channel->open(params));
    }

    _opened = true;
    _last_updated_time.store(time(nullptr));
    return Status::OK();
}
155 | | |
156 | | Status LoadChannel::_get_tablets_channel(std::shared_ptr<BaseTabletsChannel>& channel, |
157 | 0 | bool& is_finished, const int64_t index_id) { |
158 | 0 | std::lock_guard<std::mutex> l(_lock); |
159 | 0 | auto it = _tablets_channels.find(index_id); |
160 | 0 | if (it == _tablets_channels.end()) { |
161 | 0 | if (_finished_channel_ids.find(index_id) != _finished_channel_ids.end()) { |
162 | | // this channel is already finished, just return OK |
163 | 0 | is_finished = true; |
164 | 0 | return Status::OK(); |
165 | 0 | } |
166 | 0 | std::stringstream ss; |
167 | 0 | ss << "load channel " << _load_id << " add batch with unknown index id: " << index_id; |
168 | 0 | return Status::InternalError(ss.str()); |
169 | 0 | } |
170 | | |
171 | 0 | is_finished = false; |
172 | 0 | channel = it->second; |
173 | 0 | return Status::OK(); |
174 | 0 | } |
175 | | |
/// Routes one PTabletWriterAddBlockRequest to the right tablets channel:
/// (1) resolves the channel by index id (a request for an already-finished
/// index is answered OK without work), (2) appends the block if present,
/// (3) handles eos (close) and attaches the serialized profile to the
/// response — either on eos or on every 100th batch.
Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request,
                              PTabletWriterAddBlockResult* response) {
    SCOPED_TIMER(_add_batch_timer);
    COUNTER_UPDATE(_add_batch_times, 1);
    // Attach this bthread to the load's mem tracker / workload group.
    SCOPED_ATTACH_TASK(_query_thread_context);
    int64_t index_id = request.index_id();
    // 1. get tablets channel
    std::shared_ptr<BaseTabletsChannel> channel;
    bool is_finished = false;
    Status st = _get_tablets_channel(channel, is_finished, index_id);
    if (!st.ok() || is_finished) {
        return st;
    }

    // 2. add block to tablets channel
    if (request.has_block()) {
        RETURN_IF_ERROR(channel->add_batch(request, response));
        _add_batch_number_counter->update(1);
    }

    // 3. handle eos
    // if channel is incremental, maybe hang on close until all close request arrived.
    if (request.has_eos() && request.eos()) {
        st = _handle_eos(channel.get(), request, response);
        // Report the profile even when eos handling failed, so the sender
        // still receives the latest stats.
        _report_profile(response);
        if (!st.ok()) {
            return st;
        }
    } else if (_add_batch_number_counter->value() % 100 == 1) {
        _report_profile(response);
    }
    _last_updated_time.store(time(nullptr));
    return st;
}
210 | | |
211 | | Status LoadChannel::_handle_eos(BaseTabletsChannel* channel, |
212 | | const PTabletWriterAddBlockRequest& request, |
213 | 0 | PTabletWriterAddBlockResult* response) { |
214 | 0 | _self_profile->add_info_string("EosHost", fmt::format("{}", request.backend_id())); |
215 | 0 | bool finished = false; |
216 | 0 | auto index_id = request.index_id(); |
217 | |
|
218 | 0 | RETURN_IF_ERROR(channel->close(this, request, response, &finished)); |
219 | | |
220 | | // for init node, we close waiting(hang on) all close request and let them return together. |
221 | 0 | if (request.has_hang_wait() && request.hang_wait()) { |
222 | 0 | DCHECK(!channel->is_incremental_channel()); |
223 | 0 | VLOG_DEBUG << fmt::format("txn {}: reciever index {} close waiting by sender {}", _txn_id, |
224 | 0 | request.index_id(), request.sender_id()); |
225 | 0 | int count = 0; |
226 | 0 | while (!channel->is_finished()) { |
227 | 0 | bthread_usleep(1000); |
228 | 0 | count++; |
229 | 0 | } |
230 | | // now maybe finished or cancelled. |
231 | 0 | VLOG_TRACE << "reciever close wait finished!" << request.sender_id(); |
232 | 0 | if (count >= 1000 * _timeout_s) { // maybe config::streaming_load_rpc_max_alive_time_sec |
233 | 0 | return Status::InternalError("Tablets channel didn't wait all close"); |
234 | 0 | } |
235 | 0 | } |
236 | | |
237 | 0 | if (finished) { |
238 | 0 | std::lock_guard<std::mutex> l(_lock); |
239 | 0 | { |
240 | 0 | std::lock_guard<std::mutex> l(_tablets_channels_lock); |
241 | 0 | _tablets_channels_rows.insert(std::make_pair( |
242 | 0 | index_id, |
243 | 0 | std::make_pair(channel->total_received_rows(), channel->num_rows_filtered()))); |
244 | 0 | _tablets_channels.erase(index_id); |
245 | 0 | } |
246 | 0 | LOG(INFO) << "txn " << _txn_id << " closed tablets_channel " << index_id; |
247 | 0 | _finished_channel_ids.emplace(index_id); |
248 | 0 | } |
249 | 0 | return Status::OK(); |
250 | 0 | } |
251 | | |
/// Serializes this channel's runtime profile into the add-block response so
/// the TabletSink side can forward it to FE. No-op unless profiling enabled.
void LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) {
    if (!_enable_profile) {
        return;
    }

    // TabletSink and LoadChannel in BE are M: N relationship,
    // Every once in a while LoadChannel will randomly return its own runtime profile to a TabletSink,
    // so usually all LoadChannel runtime profiles are saved on each TabletSink,
    // and the timeliness of the same LoadChannel profile saved on different TabletSinks is different,
    // and each TabletSink will periodically send fe reports all the LoadChannel profiles saved by itself,
    // and ensures to update the latest LoadChannel profile according to the timestamp.
    _self_profile->set_timestamp(_last_updated_time);

    {
        // Refresh per-index stats under _tablets_channels_lock; open() and
        // _handle_eos() mutate the map under the same lock.
        std::lock_guard<std::mutex> l(_tablets_channels_lock);
        for (auto& it : _tablets_channels) {
            it.second->refresh_profile();
        }
    }

    TRuntimeProfileTree tprofile;
    ThriftSerializer ser(false, 4096);
    uint8_t* buf = nullptr;
    uint32_t len = 0;
    // Serialize-lock: only one thread at a time may walk _profile into thrift.
    std::lock_guard<SpinLock> l(_profile_serialize_lock);
    _profile->to_thrift(&tprofile);
    auto st = ser.serialize(&tprofile, &len, &buf);
    if (st.ok()) {
        response->set_load_channel_profile(std::string((const char*)buf, len));
    } else {
        // Serialization failure is non-fatal: the load continues, only the
        // profile report for this round is dropped.
        LOG(WARNING) << "load channel TRuntimeProfileTree serialize failed, errmsg=" << st;
    }
}
285 | | |
286 | 0 | bool LoadChannel::is_finished() { |
287 | 0 | if (!_opened) { |
288 | 0 | return false; |
289 | 0 | } |
290 | 0 | std::lock_guard<std::mutex> l(_lock); |
291 | 0 | return _tablets_channels.empty(); |
292 | 0 | } |
293 | | |
294 | 0 | Status LoadChannel::cancel() { |
295 | 0 | std::lock_guard<std::mutex> l(_lock); |
296 | 0 | for (auto& it : _tablets_channels) { |
297 | 0 | static_cast<void>(it.second->cancel()); |
298 | 0 | } |
299 | 0 | return Status::OK(); |
300 | 0 | } |
301 | | |
302 | | } // namespace doris |