be/src/runtime/runtime_query_statistics_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/runtime_query_statistics_mgr.h" |
19 | | |
20 | | #include <gen_cpp/FrontendService_types.h> |
21 | | #include <gen_cpp/RuntimeProfile_types.h> |
22 | | #include <gen_cpp/Status_types.h> |
23 | | #include <gen_cpp/Types_types.h> |
24 | | #include <thrift/TApplicationException.h> |
25 | | |
26 | | #include <condition_variable> |
27 | | #include <cstdint> |
28 | | #include <memory> |
29 | | #include <mutex> |
30 | | #include <shared_mutex> |
31 | | #include <string> |
32 | | #include <tuple> |
33 | | #include <unordered_map> |
34 | | #include <vector> |
35 | | |
36 | | #include "common/logging.h" |
37 | | #include "common/status.h" |
38 | | #include "core/block/block.h" |
39 | | #include "information_schema/schema_scanner_helper.h" |
40 | | #include "runtime/exec_env.h" |
41 | | #include "util/client_cache.h" |
42 | | #include "util/debug_util.h" |
43 | | #include "util/threadpool.h" |
44 | | #include "util/thrift_client.h" |
45 | | #include "util/time.h" |
46 | | #include "util/uid_util.h" |
47 | | |
48 | | namespace doris { |
// Performs one synchronous reportExecStatus RPC to the coordinator FE at
// `coor_addr`, writing the FE's reply into `res`.
//
// TODO: Currently this function is only used to report profile.
// In the future, all exec status and query statistics should be reported
// through this function.
static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr,
                                        const TReportExecStatusParams& req,
                                        TReportExecStatusResult& res) {
    Status client_status;
    // Borrow a pooled thrift client to the coordinator; `client_status`
    // reports whether a usable connection could be obtained.
    FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
                                         config::thrift_rpc_timeout_ms, &client_status);
    if (!client_status.ok()) {
        LOG_WARNING(
                "Could not get client rpc client of {} when reporting profiles, reason is {}, "
                "not reporting, profile will be lost",
                PrintThriftNetworkAddress(coor_addr), client_status.to_string());
        return Status::RpcError("Client rpc client failed");
    }

    VLOG_DEBUG << "Sending profile";

    try {
        try {
            rpc_client->reportExecStatus(res, req);
        } catch (const apache::thrift::transport::TTransportException& e) {
#ifndef ADDRESS_SANITIZER
            // A transport exception is often a stale pooled connection:
            // reopen once and retry the same request.
            LOG_WARNING("Transport exception from {}, reason: {}, reopening",
                        PrintThriftNetworkAddress(coor_addr), e.what());
            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
            if (!client_status.ok()) {
                LOG_WARNING("Reopen failed, reason: {}", client_status.to_string());
                return Status::RpcError("Open rpc client failed");
            }

            rpc_client->reportExecStatus(res, req);
#else
            // Under ASAN skip the reopen-and-retry path and fail fast.
            return Status::RpcError("Transport exception when report query profile, {}", e.what());
#endif
        }
    } catch (apache::thrift::TApplicationException& e) {
        if (e.getType() == e.UNKNOWN_METHOD) {
            // UNKNOWN_METHOD usually means the FE predates this RPC method,
            // i.e. a version mismatch during a rolling upgrade.
            LOG_WARNING(
                    "Failed to report query profile to {} due to {}, usually because the frontend "
                    "is not upgraded, check the version",
                    PrintThriftNetworkAddress(coor_addr), e.what());
        } else {
            LOG_WARNING(
                    "Failed to report query profile to {}, reason: {}, you can see fe log for "
                    "details.",
                    PrintThriftNetworkAddress(coor_addr), e.what());
        }
        return Status::RpcError("Send stats failed");
    } catch (apache::thrift::TException& e) {
        LOG_WARNING("Failed to report query profile to {}, reason: {} ",
                    PrintThriftNetworkAddress(coor_addr), e.what());
        // Back off briefly before invalidating the connection so a flapping FE
        // is not hammered with reconnects.
        std::this_thread::sleep_for(
                std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2));
        // just reopen to disable this connection
        static_cast<void>(rpc_client.reopen(config::thrift_rpc_timeout_ms));
        return Status::RpcError("Transport exception when report query profile");
    } catch (std::exception& e) {
        LOG_WARNING(
                "Failed to report query profile to {}, reason: {}, you can see fe log for details.",
                PrintThriftNetworkAddress(coor_addr), e.what());
        return Status::RpcError("Send report query profile failed");
    }

    return Status::OK();
}
116 | | |
117 | | static void _report_query_profiles_function( |
118 | | std::unordered_map< |
119 | | TUniqueId, |
120 | | std::tuple< |
121 | | TNetworkAddress, |
122 | | std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>>> |
123 | | profile_copy, |
124 | | std::unordered_map<std::pair<TUniqueId, int32_t>, std::shared_ptr<TRuntimeProfileTree>> |
125 | 1.37k | load_channel_profile_copy) { |
126 | | // query_id -> {coordinator_addr, {fragment_id -> std::vectpr<pipeline_profile>}} |
127 | 1.37k | for (auto& entry : profile_copy) { |
128 | 1.37k | const auto& query_id = entry.first; |
129 | 1.37k | const auto& coor_addr = std::get<0>(entry.second); |
130 | 1.37k | auto& fragment_profile_map = std::get<1>(entry.second); |
131 | | |
132 | 1.37k | if (fragment_profile_map.empty()) { |
133 | 0 | auto msg = fmt::format("Query {} does not have profile", print_id(query_id)); |
134 | 0 | DCHECK(false) << msg; |
135 | 0 | LOG_ERROR(msg); |
136 | 0 | continue; |
137 | 0 | } |
138 | | |
139 | 1.37k | std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles; |
140 | 2.44k | for (auto load_channel_profile : load_channel_profile_copy) { |
141 | 2.44k | if (load_channel_profile.second == nullptr) { |
142 | 0 | auto msg = fmt::format( |
143 | 0 | "Register fragment profile {} {} failed, load channel profile is null", |
144 | 0 | print_id(query_id), -1); |
145 | 0 | DCHECK(false) << msg; |
146 | 0 | LOG_ERROR(msg); |
147 | 0 | continue; |
148 | 0 | } |
149 | | |
150 | 2.44k | load_channel_profiles.push_back(load_channel_profile.second); |
151 | 2.44k | } |
152 | | |
153 | 1.37k | TReportExecStatusParams req = RuntimeQueryStatisticsMgr::create_report_exec_status_params( |
154 | 1.37k | query_id, std::move(fragment_profile_map), std::move(load_channel_profiles), |
155 | 1.37k | /*is_done=*/true); |
156 | 1.37k | TReportExecStatusResult res; |
157 | | |
158 | 1.37k | auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res); |
159 | | |
160 | 1.37k | if (res.status.status_code != TStatusCode::OK || !rpc_status.ok()) { |
161 | 0 | LOG_WARNING("Query {} send profile to {} failed", print_id(query_id), |
162 | 0 | PrintThriftNetworkAddress(coor_addr)); |
163 | 1.37k | } else { |
164 | 1.37k | VLOG_CRITICAL << fmt::format("Send {} profile succeed", print_id(query_id)); |
165 | 1.37k | } |
166 | 1.37k | } |
167 | 1.37k | } |
168 | | |
169 | | TReportExecStatusParams RuntimeQueryStatisticsMgr::create_report_exec_status_params( |
170 | | const TUniqueId& query_id, |
171 | | std::unordered_map<int32_t, std::vector<std::shared_ptr<TRuntimeProfileTree>>> |
172 | | fragment_id_to_profile, |
173 | 1.37k | std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles, bool is_done) { |
174 | | // This function will clear the data of fragment_id_to_profile and load_channel_profiles. |
175 | 1.37k | TQueryProfile profile; |
176 | 1.37k | profile.__set_query_id(query_id); |
177 | | |
178 | 1.37k | std::map<int32_t, std::vector<TDetailedReportParams>> fragment_id_to_profile_req; |
179 | | |
180 | 2.44k | for (const auto& entry : fragment_id_to_profile) { |
181 | 2.44k | int32_t fragment_id = entry.first; |
182 | 2.44k | const std::vector<std::shared_ptr<TRuntimeProfileTree>>& fragment_profile = entry.second; |
183 | 2.44k | std::vector<TDetailedReportParams> detailed_params; |
184 | 2.44k | bool is_first = true; |
185 | 6.75k | for (auto pipeline_profile : fragment_profile) { |
186 | 6.75k | if (pipeline_profile == nullptr) { |
187 | 0 | auto msg = fmt::format("Register fragment profile {} {} failed, profile is null", |
188 | 0 | print_id(query_id), fragment_id); |
189 | 0 | DCHECK(false) << msg; |
190 | 0 | LOG_ERROR(msg); |
191 | 0 | continue; |
192 | 0 | } |
193 | | |
194 | 6.75k | TDetailedReportParams tmp; |
195 | 6.75k | THRIFT_MOVE_VALUES(tmp, profile, *pipeline_profile); |
196 | | // First profile is fragment level |
197 | 6.75k | tmp.__set_is_fragment_level(is_first); |
198 | 6.75k | is_first = false; |
199 | | // tmp.fragment_instance_id is not needed for pipeline x |
200 | 6.75k | detailed_params.push_back(std::move(tmp)); |
201 | 6.75k | } |
202 | | |
203 | 2.44k | fragment_id_to_profile_req[fragment_id] = std::move(detailed_params); |
204 | 2.44k | } |
205 | | |
206 | 1.37k | if (fragment_id_to_profile_req.empty()) { |
207 | 0 | LOG_WARNING("No fragment profile found for query {}", print_id(query_id)); |
208 | 0 | } |
209 | | |
210 | 1.37k | profile.__set_fragment_id_to_profile(fragment_id_to_profile_req); |
211 | | |
212 | 1.37k | std::vector<TRuntimeProfileTree> load_channel_profiles_req; |
213 | 2.44k | for (auto load_channel_profile : load_channel_profiles) { |
214 | 2.44k | if (load_channel_profile == nullptr) { |
215 | 0 | auto msg = fmt::format( |
216 | 0 | "Register fragment profile {} {} failed, load channel profile is null", |
217 | 0 | print_id(query_id), -1); |
218 | 0 | DCHECK(false) << msg; |
219 | 0 | LOG_ERROR(msg); |
220 | 0 | continue; |
221 | 0 | } |
222 | | |
223 | 2.44k | load_channel_profiles_req.push_back(std::move(*load_channel_profile)); |
224 | 2.44k | } |
225 | | |
226 | 1.37k | if (!load_channel_profiles_req.empty()) { |
227 | 1.37k | THRIFT_MOVE_VALUES(profile, load_channel_profiles, load_channel_profiles_req); |
228 | 1.37k | } |
229 | | |
230 | 1.37k | TReportExecStatusParams req; |
231 | 1.37k | THRIFT_MOVE_VALUES(req, query_profile, profile); |
232 | 1.37k | req.__set_backend_id(ExecEnv::GetInstance()->cluster_info()->backend_id); |
233 | | // invalid query id to avoid API compatibility during upgrade |
234 | 1.37k | req.__set_query_id(TUniqueId()); |
235 | 1.37k | req.__set_done(is_done); |
236 | | |
237 | 1.37k | return req; |
238 | 1.37k | } |
239 | | |
240 | 7 | Status RuntimeQueryStatisticsMgr::start_report_thread() { |
241 | 7 | if (started.load()) { |
242 | 0 | DCHECK(false) << "report thread has been started"; |
243 | 0 | LOG_ERROR("report thread has been started"); |
244 | 0 | return Status::InternalError("Report thread has been started"); |
245 | 0 | } |
246 | | |
247 | 7 | started.store(true); |
248 | 7 | ThreadPoolBuilder profile_report_thread_pool_builder("ReportProfileThreadPool"); |
249 | | |
250 | 7 | return profile_report_thread_pool_builder.set_max_threads(config::report_exec_status_thread_num) |
251 | 7 | .build(&_thread_pool); |
252 | 7 | } |
253 | | |
254 | | // 1. lock the profile_map. |
255 | | // 2. copy the profile_map and load_channel_profile_map to local variables. |
256 | | // 3. unlock the profile_map. |
257 | | // 4. create a profile reporting task and add it to the thread pool. |
258 | 1.37k | void RuntimeQueryStatisticsMgr::trigger_profile_reporting() { |
259 | 1.37k | decltype(_profile_map) profile_copy; |
260 | 1.37k | decltype(_load_channel_profile_map) load_channel_profile_copy; |
261 | | |
262 | 1.37k | { |
263 | 1.37k | std::unique_lock<std::mutex> lg(_profile_map_lock); |
264 | 1.37k | _profile_map.swap(profile_copy); |
265 | 1.37k | _load_channel_profile_map.swap(load_channel_profile_copy); |
266 | 1.37k | } |
267 | | |
268 | | // ATTN: Local variables are copied to avoid memory reclamation issues. |
269 | 1.37k | auto st = _thread_pool->submit_func([profile_copy, load_channel_profile_copy]() { |
270 | 1.37k | _report_query_profiles_function(profile_copy, load_channel_profile_copy); |
271 | 1.37k | }); |
272 | | |
273 | 1.37k | if (!st.ok()) { |
274 | 0 | LOG_WARNING("Failed to submit profile reporting task, reason: {}", st.to_string()); |
275 | | // If the thread pool is full, we will not report the profile. |
276 | | // The profile will be lost. |
277 | 0 | return; |
278 | 0 | } |
279 | 1.37k | } |
280 | | |
281 | 14 | void RuntimeQueryStatisticsMgr::stop_report_thread() { |
282 | 14 | if (!started) { |
283 | 11 | return; |
284 | 11 | } |
285 | | |
286 | 3 | LOG_INFO("All report threads are going to stop"); |
287 | 3 | _thread_pool->shutdown(); |
288 | 3 | LOG_INFO("All report threads stopped"); |
289 | 3 | } |
290 | | |
291 | | void RuntimeQueryStatisticsMgr::register_fragment_profile( |
292 | | const TUniqueId& query_id, const TNetworkAddress& coor_addr, int32_t fragment_id, |
293 | | std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles, |
294 | 2.44k | std::shared_ptr<TRuntimeProfileTree> load_channel_profile) { |
295 | 6.75k | for (const auto& p : p_profiles) { |
296 | 6.75k | if (p == nullptr) { |
297 | 0 | auto msg = fmt::format("Register fragment profile {} {} failed, profile is null", |
298 | 0 | print_id(query_id), fragment_id); |
299 | 0 | DCHECK(false) << msg; |
300 | 0 | LOG_ERROR(msg); |
301 | 0 | return; |
302 | 0 | } |
303 | 6.75k | } |
304 | | |
305 | 2.44k | std::unique_lock<std::mutex> lg(_profile_map_lock); |
306 | | |
307 | 2.44k | if (!_profile_map.contains(query_id)) { |
308 | 1.37k | _profile_map[query_id] = std::make_tuple( |
309 | 1.37k | coor_addr, |
310 | 1.37k | std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>()); |
311 | 1.37k | } |
312 | | |
313 | 2.44k | std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>& |
314 | 2.44k | fragment_profile_map = std::get<1>(_profile_map[query_id]); |
315 | 2.44k | fragment_profile_map.insert(std::make_pair(fragment_id, p_profiles)); |
316 | | |
317 | 2.44k | if (load_channel_profile != nullptr) { |
318 | 2.44k | _load_channel_profile_map[std::make_pair(query_id, fragment_id)] = load_channel_profile; |
319 | 2.44k | } |
320 | | |
321 | 2.44k | VLOG_CRITICAL << fmt::format("register x profile done {}, fragment {}, profiles {}", |
322 | 0 | print_id(query_id), fragment_id, p_profiles.size()); |
323 | 2.44k | } |
324 | | |
325 | | void RuntimeQueryStatisticsMgr::register_resource_context( |
326 | 283k | std::string query_id, std::shared_ptr<ResourceContext> resource_ctx) { |
327 | 283k | std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock); |
328 | | // Note: `group_commit_insert` will use the same `query_id` to submit multiple load tasks in sequence. |
329 | | // After the previous load task ends but QueryStatistics has not been reported to FE, |
330 | | // if the next load task with the same `query_id` starts to execute, `register_resource_context` will |
331 | | // find that `query_id` already exists in _resource_contexts_map. |
332 | | // At this time, directly overwriting the `resource_ctx` corresponding to the `query_id` |
333 | | // in `register_resource_context` will cause the previous load task not to be reported to FE. |
334 | | // DCHECK(_resource_contexts_map.find(query_id) == _resource_contexts_map.end()); |
335 | 283k | _resource_contexts_map[query_id] = resource_ctx; |
336 | 283k | } |
337 | | |
// Reports per-query runtime statistics to each coordinator FE, then prunes
// contexts that no longer need reporting. Three phases:
//   1. under the write lock, snapshot stats per FE address (and drop finished
//      EXTERNAL queries, which are never reported);
//   2. without the lock, send one reportExecStatus RPC per FE;
//   3. under the write lock again, erase queries that are finished and were
//      successfully reported, or whose post-finish retention timed out.
void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
    int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
    // 1 get query statistics map
    // <fe_addr, <query_id, <query_statistics, is_query_finished>>>
    std::map<TNetworkAddress, std::map<std::string, std::pair<TQueryStatistics, bool>>> fe_qs_map;
    std::map<std::string, std::pair<bool, bool>> qs_status; // <finished, timeout>
    {
        std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock);
        int64_t current_time = MonotonicMillis();
        int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms;

        for (auto iter = _resource_contexts_map.begin(); iter != _resource_contexts_map.end();) {
            std::string query_id = iter->first;
            auto resource_ctx = iter->second;
            bool is_query_finished = resource_ctx->task_controller()->is_finished();
            bool is_timeout_after_finish = false;
            if (is_query_finished) {
                // A finished query is retained for a grace period so its final
                // stats can still reach FE; after the timeout it is dropped in
                // phase 3 even if the RPC keeps failing.
                is_timeout_after_finish =
                        (current_time - resource_ctx->task_controller()->finish_time()) >
                        conf_qs_timeout;
            }

            // external query not need to report to FE, so we can remove it directly.
            if (resource_ctx->task_controller()->query_type() == TQueryType::EXTERNAL &&
                is_query_finished) {
                // erase() returns the next valid iterator; do not also ++.
                iter = _resource_contexts_map.erase(iter);
            } else {
                if (resource_ctx->task_controller()->query_type() != TQueryType::EXTERNAL) {
                    if (fe_qs_map.find(resource_ctx->task_controller()->fe_addr()) ==
                        fe_qs_map.end()) {
                        std::map<std::string, std::pair<TQueryStatistics, bool>> tmp_map;
                        fe_qs_map[resource_ctx->task_controller()->fe_addr()] = std::move(tmp_map);
                    }

                    TQueryStatistics ret_t_qs;
                    resource_ctx->to_thrift_query_statistics(&ret_t_qs);
                    fe_qs_map.at(resource_ctx->task_controller()->fe_addr())[query_id] =
                            std::make_pair(ret_t_qs, is_query_finished);
                    qs_status[query_id] =
                            std::make_pair(is_query_finished, is_timeout_after_finish);
                }

                iter++;
            }
        }
    }

    // 2 report query statistics to fe
    std::map<TNetworkAddress, bool> rpc_result;
    for (auto& [addr, qs_map] : fe_qs_map) {
        // Assume failure until a reportExecStatus call returns; used in phase 3.
        rpc_result[addr] = false;
        // 2.1 get client
        Status coord_status;
        FrontendServiceConnection coord(ExecEnv::GetInstance()->frontend_client_cache(), addr,
                                        config::thrift_rpc_timeout_ms, &coord_status);
        std::string add_str = PrintThriftNetworkAddress(addr);
        if (!coord_status.ok()) {
            std::stringstream ss;
            LOG(WARNING) << "[report_query_statistics]could not get client " << add_str
                         << " when report workload runtime stats, reason:"
                         << coord_status.to_string();
            continue;
        }

        // Backs off briefly, then reopens to invalidate the (likely stale)
        // pooled connection.
        auto reopen_coord = [&coord]() -> Status {
            std::this_thread::sleep_for(
                    std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2));
            // just reopen to disable this connection
            return coord.reopen(config::thrift_rpc_timeout_ms);
        };

        // 2.2 send report
        TReportWorkloadRuntimeStatusParams report_runtime_params;
        report_runtime_params.__set_backend_id(be_id);

        // Build the query statistics map with TQueryStatisticsResult
        std::map<std::string, TQueryStatisticsResult> query_stats_result_map;
        for (const auto& [query_id, query_stats_pair] : qs_map) {
            TQueryStatisticsResult stats_result;
            stats_result.__set_statistics(query_stats_pair.first);     // TQueryStatistics
            stats_result.__set_query_finished(query_stats_pair.second); // is_query_finished
            query_stats_result_map[query_id] = stats_result;
        }

        report_runtime_params.__set_query_statistics_result_map(query_stats_result_map);

        TReportExecStatusParams params;
        params.__set_report_workload_runtime_status(report_runtime_params);

        TReportExecStatusResult res;
        Status rpc_status;

        try {
            try {
                coord->reportExecStatus(res, params);
                rpc_result[addr] = true;
            } catch (apache::thrift::transport::TTransportException& e) {
                // Stale connection: reopen once and retry the same request.
                rpc_status = reopen_coord();
#ifndef ADDRESS_SANITIZER
                LOG_WARNING(
                        "[report_query_statistics] report to fe {} failed, reason:{}, try reopen.",
                        add_str, e.what());
#else
                std::cerr << "thrift error, reason=" << e.what();
#endif
                if (rpc_status.ok()) {
                    coord->reportExecStatus(res, params);
                    rpc_result[addr] = true;
                }
            }
        } catch (apache::thrift::TApplicationException& e) {
            LOG_WARNING(
                    "[report_query_statistics]fe {} throw exception when report statistics, "
                    "reason:{}, you can see fe log for details.",
                    add_str, e.what());
            rpc_status = reopen_coord();
        } catch (apache::thrift::TException& e) {
            LOG_WARNING(
                    "[report_query_statistics]report workload runtime statistics to {} failed, "
                    "reason: {}",
                    add_str, e.what());
            rpc_status = reopen_coord();
        } catch (std::exception& e) {
            LOG_WARNING(
                    "[report_query_statistics]unknown exception when report workload runtime "
                    "statistics to {}, reason:{}. ",
                    add_str, e.what());
        }

        if (!rpc_status.ok()) {
            LOG_WARNING(
                    "[report_query_statistics]reopen thrift client failed when report "
                    "workload runtime statistics to {}, reason: {}",
                    add_str, rpc_status.to_string());
        }
    }

    // 3 when query is finished and (last rpc is send success), remove finished query statistics
    if (fe_qs_map.empty()) {
        return;
    }

    {
        std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock);
        for (auto& [addr, qs_map] : fe_qs_map) {
            bool is_rpc_success = rpc_result[addr];
            for (auto& [query_id, qs] : qs_map) {
                auto& qs_status_pair = qs_status[query_id];
                bool is_query_finished = qs_status_pair.first;
                bool is_timeout_after_finish = qs_status_pair.second;
                // Drop the context once its final stats were delivered, or once
                // the post-finish retention window elapsed (avoids unbounded
                // growth when an FE stays unreachable).
                if ((is_rpc_success && is_query_finished) || is_timeout_after_finish) {
                    _resource_contexts_map.erase(query_id);
                }
            }
        }
    }
}
495 | | |
// Appends one row per tracked task to `block`; the positional column indices
// below must match SchemaBackendActiveTasksScanner::_s_tbls_columns.
void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(Block* block) {
    std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
    int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;

    // block's schema come from SchemaBackendActiveTasksScanner::_s_tbls_columns
    for (auto& [query_id, resource_ctx] : _resource_contexts_map) {
        TQueryStatistics tqs;
        resource_ctx->to_thrift_query_statistics(&tqs);
        SchemaScannerHelper::insert_int64_value(0, be_id, block);
        SchemaScannerHelper::insert_string_value(
                1, resource_ctx->task_controller()->fe_addr().hostname, block);
        auto wg = resource_ctx->workload_group();
        // -1 marks tasks not bound to any workload group.
        SchemaScannerHelper::insert_int64_value(2, wg ? wg->id() : -1, block);
        SchemaScannerHelper::insert_string_value(3, query_id, block);

        // Elapsed time: finished tasks use their fixed [start, finish] span;
        // running tasks are measured up to now.
        int64_t task_time =
                resource_ctx->task_controller()->is_finished()
                        ? resource_ctx->task_controller()->finish_time() -
                                  resource_ctx->task_controller()->start_time()
                        : MonotonicMillis() - resource_ctx->task_controller()->start_time();
        SchemaScannerHelper::insert_int64_value(4, task_time, block);
        SchemaScannerHelper::insert_int64_value(5, tqs.cpu_ms, block);
        SchemaScannerHelper::insert_int64_value(6, tqs.scan_rows, block);
        SchemaScannerHelper::insert_int64_value(7, tqs.scan_bytes, block);
        SchemaScannerHelper::insert_int64_value(8, tqs.max_peak_memory_bytes, block);
        SchemaScannerHelper::insert_int64_value(9, tqs.current_used_memory_bytes, block);
        SchemaScannerHelper::insert_int64_value(10, tqs.shuffle_send_bytes, block);
        SchemaScannerHelper::insert_int64_value(11, tqs.shuffle_send_rows, block);

        // Render the query-type enum via its stream operator.
        std::stringstream ss;
        ss << resource_ctx->task_controller()->query_type();
        SchemaScannerHelper::insert_string_value(12, ss.str(), block);
        SchemaScannerHelper::insert_int64_value(13, tqs.spill_write_bytes_to_local_storage, block);
        SchemaScannerHelper::insert_int64_value(14, tqs.spill_read_bytes_from_local_storage, block);
    }
}
532 | | |
533 | | Status RuntimeQueryStatisticsMgr::get_query_statistics(const std::string& query_id, |
534 | 1 | TQueryStatistics* query_stats) { |
535 | 1 | std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock); |
536 | | |
537 | 1 | auto resource_ctx = _resource_contexts_map.find(query_id); |
538 | 1 | if (resource_ctx == _resource_contexts_map.end()) { |
539 | 0 | return Status::InternalError("failed to find query with id {}", query_id); |
540 | 0 | } |
541 | | |
542 | 1 | resource_ctx->second->to_thrift_query_statistics(query_stats); |
543 | 1 | return Status::OK(); |
544 | 1 | } |
545 | | |
546 | | void RuntimeQueryStatisticsMgr::get_tasks_resource_context( |
547 | 0 | std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs) { |
548 | 0 | std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock); |
549 | 0 | for (auto& iter : _resource_contexts_map) { |
550 | 0 | resource_ctxs.push_back(iter.second); |
551 | 0 | } |
552 | 0 | } |
553 | | |
554 | | } // namespace doris |