Coverage Report

Created: 2026-05-16 21:00

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/runtime_query_statistics_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/runtime_query_statistics_mgr.h"
19
20
#include <gen_cpp/FrontendService_types.h>
21
#include <gen_cpp/RuntimeProfile_types.h>
22
#include <gen_cpp/Status_types.h>
23
#include <gen_cpp/Types_types.h>
24
#include <thrift/TApplicationException.h>
25
26
#include <condition_variable>
27
#include <cstdint>
28
#include <memory>
29
#include <mutex>
30
#include <shared_mutex>
31
#include <string>
32
#include <tuple>
33
#include <unordered_map>
34
#include <vector>
35
36
#include "common/logging.h"
37
#include "common/status.h"
38
#include "core/block/block.h"
39
#include "information_schema/schema_scanner_helper.h"
40
#include "runtime/exec_env.h"
41
#include "util/client_cache.h"
42
#include "util/debug_util.h"
43
#include "util/threadpool.h"
44
#include "util/thrift_client.h"
45
#include "util/time.h"
46
#include "util/uid_util.h"
47
48
namespace doris {
49
// TODO: Currently this function is only used to report profile.
50
// In the future, all exec status and query statistics should be reported
51
// through this function.
52
// Sends the profile report `req` to the frontend at `coor_addr` by calling
// reportExecStatus on a cached frontend client, writing the reply into `res`.
//
// Returns Status::OK() when the RPC finishes without throwing. Returns
// Status::RpcError when no client could be obtained or when the RPC throws.
// On a transport exception the connection is reopened and the RPC is retried
// once, except in ADDRESS_SANITIZER builds where the error is returned directly.
static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr,
                                        const TReportExecStatusParams& req,
                                        TReportExecStatusResult& res) {
    Status conn_status;
    FrontendServiceConnection fe_conn(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
                                      config::thrift_rpc_timeout_ms, &conn_status);
    if (!conn_status.ok()) {
        LOG_WARNING(
                "Could not get client rpc client of {} when reporting profiles, reason is {}, "
                "not reporting, profile will be lost",
                PrintThriftNetworkAddress(coor_addr), conn_status.to_string());
        return Status::RpcError("Client rpc client failed");
    }

    VLOG_DEBUG << "Sending profile";

    try {
        try {
            fe_conn->reportExecStatus(res, req);
        } catch (const apache::thrift::transport::TTransportException& transport_error) {
#ifndef ADDRESS_SANITIZER
            LOG_WARNING("Transport exception from {}, reason: {}, reopening",
                        PrintThriftNetworkAddress(coor_addr), transport_error.what());
            conn_status = fe_conn.reopen(config::thrift_rpc_timeout_ms);
            if (!conn_status.ok()) {
                LOG_WARNING("Reopen failed, reason: {}", conn_status.to_string());
                return Status::RpcError("Open rpc client failed");
            }

            // Second and last attempt; anything it throws is handled by the outer handlers.
            fe_conn->reportExecStatus(res, req);
#else
            return Status::RpcError("Transport exception when report query profile, {}",
                                    transport_error.what());
#endif
        }
    } catch (const apache::thrift::TApplicationException& app_error) {
        if (app_error.getType() == apache::thrift::TApplicationException::UNKNOWN_METHOD) {
            LOG_WARNING(
                    "Failed to report query profile to {} due to {}, usually because the frontend "
                    "is not upgraded, check the version",
                    PrintThriftNetworkAddress(coor_addr), app_error.what());
        } else {
            LOG_WARNING(
                    "Failed to report query profile to {}, reason: {}, you can see fe log for "
                    "details.",
                    PrintThriftNetworkAddress(coor_addr), app_error.what());
        }
        return Status::RpcError("Send stats failed");
    } catch (const apache::thrift::TException& thrift_error) {
        LOG_WARNING("Failed to report query profile to {}, reason: {} ",
                    PrintThriftNetworkAddress(coor_addr), thrift_error.what());
        std::this_thread::sleep_for(
                std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2));
        // just reopen to disable this connection; the reopen result is deliberately ignored
        static_cast<void>(fe_conn.reopen(config::thrift_rpc_timeout_ms));
        return Status::RpcError("Transport exception when report query profile");
    } catch (const std::exception& error) {
        LOG_WARNING(
                "Failed to report query profile to {}, reason: {}, you can see fe log for details.",
                PrintThriftNetworkAddress(coor_addr), error.what());
        return Status::RpcError("Send report query profile failed");
    }

    return Status::OK();
}
116
117
static void _report_query_profiles_function(
118
        std::unordered_map<TUniqueId,
119
                           std::tuple<TNetworkAddress,
120
                                      std::unordered_map<int, std::vector<TProfileNodeReport>>>>
121
                profile_copy,
122
        std::unordered_map<std::pair<TUniqueId, int32_t>, std::shared_ptr<TRuntimeProfileTree>>
123
0
                load_channel_profile_copy) {
124
    // query_id -> {coordinator_addr, {fragment_id -> list<profile_node_report>}}
125
0
    for (auto& entry : profile_copy) {
126
0
        const auto& query_id = entry.first;
127
0
        const auto& coor_addr = std::get<0>(entry.second);
128
0
        auto& fragment_profile_map = std::get<1>(entry.second);
129
130
0
        if (fragment_profile_map.empty()) {
131
0
            auto msg = fmt::format("Query {} does not have profile", print_id(query_id));
132
0
            DCHECK(false) << msg;
133
0
            LOG_ERROR(msg);
134
0
            continue;
135
0
        }
136
137
0
        std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;
138
0
        for (auto load_channel_profile : load_channel_profile_copy) {
139
0
            if (load_channel_profile.second == nullptr) {
140
0
                auto msg = fmt::format(
141
0
                        "Register fragment profile {} {} failed, load channel profile is null",
142
0
                        print_id(query_id), -1);
143
0
                DCHECK(false) << msg;
144
0
                LOG_ERROR(msg);
145
0
                continue;
146
0
            }
147
148
0
            load_channel_profiles.push_back(load_channel_profile.second);
149
0
        }
150
151
0
        TReportExecStatusParams req = RuntimeQueryStatisticsMgr::create_report_exec_status_params(
152
0
                query_id, std::move(fragment_profile_map), std::move(load_channel_profiles),
153
0
                /*is_done=*/true);
154
0
        TReportExecStatusResult res;
155
156
0
        auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res);
157
158
0
        if (res.status.status_code != TStatusCode::OK || !rpc_status.ok()) {
159
0
            LOG_WARNING("Query {} send profile to {} failed", print_id(query_id),
160
0
                        PrintThriftNetworkAddress(coor_addr));
161
0
        } else {
162
0
            VLOG_CRITICAL << fmt::format("Send {} profile succeed", print_id(query_id));
163
0
        }
164
0
    }
165
0
}
166
167
// Builds the profile-only TReportExecStatusParams for `query_id`.
//
// fragment_id_to_profile_node_reports: fragment_id -> profile node reports of that fragment.
// load_channel_profiles: load channel profile trees to attach; null entries are skipped.
// is_done: forwarded to the `done` field of the request.
//
// This function consumes its inputs: the reports are moved into the request and the
// content of every TRuntimeProfileTree pointed to by load_channel_profiles is moved out,
// so other owners of those shared pointers are left with moved-from trees.
TReportExecStatusParams RuntimeQueryStatisticsMgr::create_report_exec_status_params(
        const TUniqueId& query_id,
        std::unordered_map<int32_t, std::vector<TProfileNodeReport>>
                fragment_id_to_profile_node_reports,
        std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles, bool is_done) {
    TQueryProfile profile;
    profile.__set_query_id(query_id);

    std::map<int32_t, std::vector<TProfileNodeReport>> fragment_id_to_profile_node_reports_req;

    for (auto& entry : fragment_id_to_profile_node_reports) {
        const int32_t fragment_id = entry.first;
        std::vector<TProfileNodeReport>& profile_node_reports = entry.second;

        // Keep only the reports that carry both a profile and a node type. A report that is
        // logged as failed must not be sent; the fragment entry itself is always kept.
        std::vector<TProfileNodeReport> valid_reports;
        valid_reports.reserve(profile_node_reports.size());
        for (auto& profile_node_report : profile_node_reports) {
            if (!profile_node_report.__isset.profile ||
                !profile_node_report.__isset.profile_node_type) {
                auto msg = fmt::format("Register fragment profile {} {} failed, profile is null",
                                       print_id(query_id), fragment_id);
                DCHECK(false) << msg;
                LOG_ERROR(msg);
                continue;
            }
            valid_reports.push_back(std::move(profile_node_report));
        }
        fragment_id_to_profile_node_reports_req[fragment_id] = std::move(valid_reports);
    }

    if (fragment_id_to_profile_node_reports_req.empty()) {
        LOG_WARNING("No fragment profile found for query {}", print_id(query_id));
    }

    // Move instead of using the generated setter, which would deep-copy every report.
    THRIFT_MOVE_VALUES(profile, fragment_id_to_profile_node_reports,
                       fragment_id_to_profile_node_reports_req);

    std::vector<TRuntimeProfileTree> load_channel_profiles_req;
    load_channel_profiles_req.reserve(load_channel_profiles.size());
    for (const auto& load_channel_profile : load_channel_profiles) {
        if (load_channel_profile == nullptr) {
            auto msg = fmt::format(
                    "Register fragment profile {} {} failed, load channel profile is null",
                    print_id(query_id), -1);
            DCHECK(false) << msg;
            LOG_ERROR(msg);
            continue;
        }

        load_channel_profiles_req.push_back(std::move(*load_channel_profile));
    }

    if (!load_channel_profiles_req.empty()) {
        THRIFT_MOVE_VALUES(profile, load_channel_profiles, load_channel_profiles_req);
    }

    TReportExecStatusParams req;
    THRIFT_MOVE_VALUES(req, query_profile, profile);
    req.__set_backend_id(ExecEnv::GetInstance()->cluster_info()->backend_id);
    // This RPC is profile-only: the real query id is carried in query_profile.query_id.
    // Keep the top-level query id invalid so old FEs do not route this status-less
    // report into updateFragmentExecStatus(), which expects params.status.
    req.__set_query_id(TUniqueId());
    req.__set_done(is_done);

    return req;
}
230
231
0
// Creates the "ReportProfileThreadPool" thread pool used for profile reporting.
//
// Returns Status::InternalError if the report thread was already started, otherwise
// the status of building the thread pool.
Status RuntimeQueryStatisticsMgr::start_report_thread() {
    // exchange() tests and sets the flag in one atomic step, so two concurrent callers
    // cannot both pass the check and build the pool twice.
    if (started.exchange(true)) {
        DCHECK(false) << "report thread has been started";
        LOG_ERROR("report thread has been started");
        return Status::InternalError("Report thread has been started");
    }

    ThreadPoolBuilder profile_report_thread_pool_builder("ReportProfileThreadPool");

    Status build_status =
            profile_report_thread_pool_builder
                    .set_max_threads(config::report_exec_status_thread_num)
                    .build(&_thread_pool);
    if (!build_status.ok()) {
        // Do not claim to be started when the pool could not be built; this also
        // allows a later retry.
        started.store(false);
    }
    return build_status;
}
244
245
// 1. lock the profile_map.
246
// 2. copy the profile_map and load_channel_profile_map to local variables.
247
// 3. unlock the profile_map.
248
// 4. create a profile reporting task and add it to the thread pool.
249
0
void RuntimeQueryStatisticsMgr::trigger_profile_reporting() {
250
0
    decltype(_profile_map) profile_copy;
251
0
    decltype(_load_channel_profile_map) load_channel_profile_copy;
252
253
0
    {
254
0
        std::unique_lock<std::mutex> lg(_profile_map_lock);
255
0
        _profile_map.swap(profile_copy);
256
0
        _load_channel_profile_map.swap(load_channel_profile_copy);
257
0
    }
258
259
0
    auto st = _thread_pool->submit_func(
260
0
            [profile_copy = std::move(profile_copy),
261
0
             load_channel_profile_copy = std::move(load_channel_profile_copy)]() mutable {
262
0
                _report_query_profiles_function(std::move(profile_copy),
263
0
                                                std::move(load_channel_profile_copy));
264
0
            });
265
266
0
    if (!st.ok()) {
267
0
        LOG_WARNING("Failed to submit profile reporting task, reason: {}", st.to_string());
268
        // If the thread pool is full, we will not report the profile.
269
        // The profile will be lost.
270
0
        return;
271
0
    }
272
0
}
273
274
26
void RuntimeQueryStatisticsMgr::stop_report_thread() {
275
26
    if (!started) {
276
26
        return;
277
26
    }
278
279
0
    LOG_INFO("All report threads are going to stop");
280
0
    _thread_pool->shutdown();
281
0
    LOG_INFO("All report threads stopped");
282
0
}
283
284
void RuntimeQueryStatisticsMgr::register_fragment_profile(
285
        const TUniqueId& query_id, const TNetworkAddress& coor_addr, int32_t fragment_id,
286
        std::vector<TProfileNodeReport> profile_node_reports,
287
0
        std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
288
0
    for (const auto& profile_node_report : profile_node_reports) {
289
0
        if (!profile_node_report.__isset.profile ||
290
0
            !profile_node_report.__isset.profile_node_type) {
291
0
            auto msg = fmt::format("Register fragment profile {} {} failed, profile is null",
292
0
                                   print_id(query_id), fragment_id);
293
0
            DCHECK(false) << msg;
294
0
            LOG_ERROR(msg);
295
0
            return;
296
0
        }
297
0
    }
298
299
0
    std::unique_lock<std::mutex> lg(_profile_map_lock);
300
301
0
    if (!_profile_map.contains(query_id)) {
302
0
        _profile_map[query_id] = std::make_tuple(
303
0
                coor_addr, std::unordered_map<int, std::vector<TProfileNodeReport>>());
304
0
    }
305
306
0
    std::unordered_map<int, std::vector<TProfileNodeReport>>& fragment_profile_map =
307
0
            std::get<1>(_profile_map[query_id]);
308
0
    auto profile_node_report_size = profile_node_reports.size();
309
0
    fragment_profile_map.insert(std::make_pair(fragment_id, std::move(profile_node_reports)));
310
311
0
    if (load_channel_profile != nullptr) {
312
0
        _load_channel_profile_map[std::make_pair(query_id, fragment_id)] = load_channel_profile;
313
0
    }
314
315
0
    VLOG_CRITICAL << fmt::format("register x profile done {}, fragment {}, profiles {}",
316
0
                                 print_id(query_id), fragment_id, profile_node_report_size);
317
0
}
318
319
void RuntimeQueryStatisticsMgr::register_resource_context(
320
0
        std::string query_id, std::shared_ptr<ResourceContext> resource_ctx) {
321
0
    std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock);
322
    // Note: `group_commit_insert` will use the same `query_id` to submit multiple load tasks in sequence.
323
    // After the previous load task ends but QueryStatistics has not been reported to FE,
324
    // if the next load task with the same `query_id` starts to execute, `register_resource_context` will
325
    // find that `query_id` already exists in _resource_contexts_map.
326
    // At this time, directly overwriting the `resource_ctx` corresponding to the `query_id`
327
    // in `register_resource_context` will cause the previous load task not to be reported to FE.
328
    // DCHECK(_resource_contexts_map.find(query_id) == _resource_contexts_map.end());
329
0
    _resource_contexts_map[query_id] = resource_ctx;
330
0
}
331
332
0
void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
333
0
    int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
334
    // 1 get query statistics map
335
    // <fe_addr, <query_id, <query_statistics, is_query_finished>>>
336
0
    std::map<TNetworkAddress, std::map<std::string, std::pair<TQueryStatistics, bool>>> fe_qs_map;
337
0
    std::map<std::string, std::pair<bool, bool>> qs_status; // <finished, timeout>
338
0
    {
339
0
        std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock);
340
0
        int64_t current_time = MonotonicMillis();
341
0
        int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms;
342
343
0
        for (auto iter = _resource_contexts_map.begin(); iter != _resource_contexts_map.end();) {
344
0
            std::string query_id = iter->first;
345
0
            auto resource_ctx = iter->second;
346
0
            bool is_query_finished = resource_ctx->task_controller()->is_finished();
347
0
            bool is_timeout_after_finish = false;
348
0
            if (is_query_finished) {
349
0
                is_timeout_after_finish =
350
0
                        (current_time - resource_ctx->task_controller()->finish_time()) >
351
0
                        conf_qs_timeout;
352
0
            }
353
354
            // external query not need to report to FE, so we can remove it directly.
355
0
            if (resource_ctx->task_controller()->query_type() == TQueryType::EXTERNAL &&
356
0
                is_query_finished) {
357
0
                iter = _resource_contexts_map.erase(iter);
358
0
            } else {
359
0
                if (resource_ctx->task_controller()->query_type() != TQueryType::EXTERNAL) {
360
0
                    if (fe_qs_map.find(resource_ctx->task_controller()->fe_addr()) ==
361
0
                        fe_qs_map.end()) {
362
0
                        std::map<std::string, std::pair<TQueryStatistics, bool>> tmp_map;
363
0
                        fe_qs_map[resource_ctx->task_controller()->fe_addr()] = std::move(tmp_map);
364
0
                    }
365
366
0
                    TQueryStatistics ret_t_qs;
367
0
                    resource_ctx->to_thrift_query_statistics(&ret_t_qs);
368
0
                    fe_qs_map.at(resource_ctx->task_controller()->fe_addr())[query_id] =
369
0
                            std::make_pair(ret_t_qs, is_query_finished);
370
0
                    qs_status[query_id] =
371
0
                            std::make_pair(is_query_finished, is_timeout_after_finish);
372
0
                }
373
374
0
                iter++;
375
0
            }
376
0
        }
377
0
    }
378
379
    // 2 report query statistics to fe
380
0
    std::map<TNetworkAddress, bool> rpc_result;
381
0
    for (auto& [addr, qs_map] : fe_qs_map) {
382
0
        rpc_result[addr] = false;
383
        // 2.1 get client
384
0
        Status coord_status;
385
0
        FrontendServiceConnection coord(ExecEnv::GetInstance()->frontend_client_cache(), addr,
386
0
                                        config::thrift_rpc_timeout_ms, &coord_status);
387
0
        std::string add_str = PrintThriftNetworkAddress(addr);
388
0
        if (!coord_status.ok()) {
389
0
            std::stringstream ss;
390
0
            LOG(WARNING) << "[report_query_statistics]could not get client " << add_str
391
0
                         << " when report workload runtime stats, reason:"
392
0
                         << coord_status.to_string();
393
0
            continue;
394
0
        }
395
396
0
        auto reopen_coord = [&coord]() -> Status {
397
0
            std::this_thread::sleep_for(
398
0
                    std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2));
399
            // just reopen to disable this connection
400
0
            return coord.reopen(config::thrift_rpc_timeout_ms);
401
0
        };
402
403
        // 2.2 send report
404
0
        TReportWorkloadRuntimeStatusParams report_runtime_params;
405
0
        report_runtime_params.__set_backend_id(be_id);
406
407
        // Build the query statistics map with TQueryStatisticsResult
408
0
        std::map<std::string, TQueryStatisticsResult> query_stats_result_map;
409
0
        for (const auto& [query_id, query_stats_pair] : qs_map) {
410
0
            TQueryStatisticsResult stats_result;
411
0
            stats_result.__set_statistics(query_stats_pair.first);      // TQueryStatistics
412
0
            stats_result.__set_query_finished(query_stats_pair.second); // is_query_finished
413
0
            query_stats_result_map[query_id] = stats_result;
414
0
        }
415
416
0
        report_runtime_params.__set_query_statistics_result_map(query_stats_result_map);
417
418
0
        TReportExecStatusParams params;
419
0
        params.__set_report_workload_runtime_status(report_runtime_params);
420
421
0
        TReportExecStatusResult res;
422
0
        Status rpc_status;
423
424
0
        try {
425
0
            try {
426
0
                coord->reportExecStatus(res, params);
427
0
                rpc_result[addr] = true;
428
0
            } catch (apache::thrift::transport::TTransportException& e) {
429
0
                rpc_status = reopen_coord();
430
#ifndef ADDRESS_SANITIZER
431
                LOG_WARNING(
432
                        "[report_query_statistics] report to fe {} failed, reason:{}, try reopen.",
433
                        add_str, e.what());
434
#else
435
0
                std::cerr << "thrift error, reason=" << e.what();
436
0
#endif
437
0
                if (rpc_status.ok()) {
438
0
                    coord->reportExecStatus(res, params);
439
0
                    rpc_result[addr] = true;
440
0
                }
441
0
            }
442
0
        } catch (apache::thrift::TApplicationException& e) {
443
0
            LOG_WARNING(
444
0
                    "[report_query_statistics]fe {} throw exception when report statistics, "
445
0
                    "reason:{}, you can see fe log for details.",
446
0
                    add_str, e.what());
447
0
            rpc_status = reopen_coord();
448
0
        } catch (apache::thrift::TException& e) {
449
0
            LOG_WARNING(
450
0
                    "[report_query_statistics]report workload runtime statistics to {} failed,  "
451
0
                    "reason: {}",
452
0
                    add_str, e.what());
453
0
            rpc_status = reopen_coord();
454
0
        } catch (std::exception& e) {
455
0
            LOG_WARNING(
456
0
                    "[report_query_statistics]unknown exception when report workload runtime "
457
0
                    "statistics to {}, reason:{}. ",
458
0
                    add_str, e.what());
459
0
        }
460
461
0
        if (!rpc_status.ok()) {
462
0
            LOG_WARNING(
463
0
                    "[report_query_statistics]reopen thrift client failed when report "
464
0
                    "workload runtime statistics to {}, reason: {}",
465
0
                    add_str, rpc_status.to_string());
466
0
        }
467
0
    }
468
469
    //  3 when query is finished and (last rpc is send success), remove finished query statistics
470
0
    if (fe_qs_map.empty()) {
471
0
        return;
472
0
    }
473
474
0
    {
475
0
        std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock);
476
0
        for (auto& [addr, qs_map] : fe_qs_map) {
477
0
            bool is_rpc_success = rpc_result[addr];
478
0
            for (auto& [query_id, qs] : qs_map) {
479
0
                auto& qs_status_pair = qs_status[query_id];
480
0
                bool is_query_finished = qs_status_pair.first;
481
0
                bool is_timeout_after_finish = qs_status_pair.second;
482
0
                if ((is_rpc_success && is_query_finished) || is_timeout_after_finish) {
483
0
                    _resource_contexts_map.erase(query_id);
484
0
                }
485
0
            }
486
0
        }
487
0
    }
488
0
}
489
490
0
// Appends one row per registered resource context to `block`, holding the
// read lock on _resource_contexts_map_lock for the whole scan.
//
// block's schema come from SchemaBackendActiveTasksScanner::_s_tbls_columns;
// the first argument of every insert_* call below is the column position.
void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(Block* block) {
    std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
    const int64_t backend_id = ExecEnv::GetInstance()->cluster_info()->backend_id;

    for (auto& [query_id, resource_ctx] : _resource_contexts_map) {
        TQueryStatistics stats;
        resource_ctx->to_thrift_query_statistics(&stats);

        SchemaScannerHelper::insert_int64_value(0, backend_id, block);
        SchemaScannerHelper::insert_string_value(
                1, resource_ctx->task_controller()->fe_addr().hostname, block);

        // -1 is written when the context has no workload group.
        auto wg = resource_ctx->workload_group();
        SchemaScannerHelper::insert_int64_value(2, wg ? wg->id() : -1, block);
        SchemaScannerHelper::insert_string_value(3, query_id, block);

        // Finished tasks report finish - start; running tasks report now - start.
        int64_t task_time = 0;
        if (resource_ctx->task_controller()->is_finished()) {
            task_time = resource_ctx->task_controller()->finish_time() -
                        resource_ctx->task_controller()->start_time();
        } else {
            task_time = MonotonicMillis() - resource_ctx->task_controller()->start_time();
        }
        SchemaScannerHelper::insert_int64_value(4, task_time, block);

        SchemaScannerHelper::insert_int64_value(5, stats.cpu_ms, block);
        SchemaScannerHelper::insert_int64_value(6, stats.scan_rows, block);
        SchemaScannerHelper::insert_int64_value(7, stats.scan_bytes, block);
        SchemaScannerHelper::insert_int64_value(8, stats.max_peak_memory_bytes, block);
        SchemaScannerHelper::insert_int64_value(9, stats.current_used_memory_bytes, block);
        SchemaScannerHelper::insert_int64_value(10, stats.shuffle_send_bytes, block);
        SchemaScannerHelper::insert_int64_value(11, stats.shuffle_send_rows, block);

        // The query type is written as whatever its stream operator prints.
        std::stringstream query_type_text;
        query_type_text << resource_ctx->task_controller()->query_type();
        SchemaScannerHelper::insert_string_value(12, query_type_text.str(), block);

        SchemaScannerHelper::insert_int64_value(13, stats.spill_write_bytes_to_local_storage,
                                                block);
        SchemaScannerHelper::insert_int64_value(14, stats.spill_read_bytes_from_local_storage,
                                                block);
    }
}
526
527
// Fills `query_stats` with the statistics of the resource context registered
// for `query_id`. Returns Status::InternalError when no such query is registered.
Status RuntimeQueryStatisticsMgr::get_query_statistics(const std::string& query_id,
                                                       TQueryStatistics* query_stats) {
    std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);

    if (auto ctx_iter = _resource_contexts_map.find(query_id);
        ctx_iter != _resource_contexts_map.end()) {
        ctx_iter->second->to_thrift_query_statistics(query_stats);
        return Status::OK();
    }

    return Status::InternalError("failed to find query with id {}", query_id);
}
539
540
void RuntimeQueryStatisticsMgr::get_tasks_resource_context(
541
0
        std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs) {
542
0
    std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
543
0
    for (auto& iter : _resource_contexts_map) {
544
0
        resource_ctxs.push_back(iter.second);
545
0
    }
546
0
}
547
548
} // namespace doris