Coverage Report

Created: 2024-11-21 14:46

/root/doris/be/src/runtime/query_statistics.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/query_statistics.h"
19
20
#include <gen_cpp/data.pb.h>
21
#include <glog/logging.h>
22
23
#include <memory>
24
25
#include "util/time.h"
26
27
namespace doris {
28
29
0
void QueryStatistics::merge(const QueryStatistics& other) {
30
0
    scan_rows += other.scan_rows.load(std::memory_order_relaxed);
31
0
    scan_bytes += other.scan_bytes.load(std::memory_order_relaxed);
32
0
    cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed);
33
0
    shuffle_send_bytes += other.shuffle_send_bytes.load(std::memory_order_relaxed);
34
0
    shuffle_send_rows += other.shuffle_send_rows.load(std::memory_order_relaxed);
35
36
0
    int64_t other_peak_mem = other.max_peak_memory_bytes.load(std::memory_order_relaxed);
37
0
    if (other_peak_mem > this->max_peak_memory_bytes) {
38
0
        this->max_peak_memory_bytes = other_peak_mem;
39
0
    }
40
41
0
    int64_t other_memory_used = other.current_used_memory_bytes.load(std::memory_order_relaxed);
42
0
    if (other_memory_used > 0) {
43
0
        this->current_used_memory_bytes = other_memory_used;
44
0
    }
45
0
}
46
47
0
// Writes this object's counters into the protobuf message `statistics`.
//
// Fields written: scan_rows, scan_bytes, cpu_ms (cpu_nanos divided by
// NANOS_PER_MILLIS), returned_rows and max_peak_memory_bytes. The shuffle
// counters and current_used_memory_bytes are not written by this function.
//
// `statistics` must not be null (checked with DCHECK only).
void QueryStatistics::to_pb(PQueryStatistics* statistics) {
    DCHECK(statistics != nullptr);

    // CPU time is tracked in nanoseconds here but reported as milliseconds.
    const auto cpu_millis = cpu_nanos.load() / NANOS_PER_MILLIS;

    statistics->set_scan_rows(scan_rows.load());
    statistics->set_scan_bytes(scan_bytes.load());
    statistics->set_cpu_ms(cpu_millis);
    statistics->set_returned_rows(returned_rows);
    statistics->set_max_peak_memory_bytes(max_peak_memory_bytes.load());
}
55
56
0
// Writes this object's counters into the thrift struct `statistics`.
//
// Every atomic counter is read with relaxed ordering, so the values are not
// guaranteed to form one consistent snapshot if another thread is updating
// them at the same time. cpu_nanos is reported as milliseconds
// (divided by NANOS_PER_MILLIS).
//
// `statistics` must not be null (checked with DCHECK only).
void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
    DCHECK(statistics != nullptr);
    constexpr auto relaxed = std::memory_order_relaxed;

    // Scan and CPU counters.
    statistics->__set_scan_bytes(scan_bytes.load(relaxed));
    statistics->__set_scan_rows(scan_rows.load(relaxed));
    statistics->__set_cpu_ms(cpu_nanos.load(relaxed) / NANOS_PER_MILLIS);
    statistics->__set_returned_rows(returned_rows);

    // Memory counters.
    statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes.load(relaxed));
    statistics->__set_current_used_memory_bytes(current_used_memory_bytes.load(relaxed));

    // Shuffle counters.
    statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(relaxed));
    statistics->__set_shuffle_send_rows(shuffle_send_rows.load(relaxed));
}
68
69
0
void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
70
0
    scan_rows = statistics.scan_rows();
71
0
    scan_bytes = statistics.scan_bytes();
72
0
    cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS;
73
0
}
74
75
0
// Merges the statistics held by `recvr` into this object by delegating to
// recvr->merge(this).
//
// `recvr` must not be null; this is checked with DCHECK, matching the
// (recvr, sender_id) overload.
void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
    DCHECK(recvr != nullptr);
    recvr->merge(this);
}
78
79
0
// Merges into this object the statistics that `recvr` holds for `sender_id`.
// Does nothing when recvr->find(sender_id) yields a null pointer.
//
// `recvr` must not be null (checked with DCHECK only).
void QueryStatistics::merge(QueryStatisticsRecvr* recvr, int sender_id) {
    DCHECK(recvr != nullptr);
    if (auto sender_stats = recvr->find(sender_id); sender_stats) {
        merge(*sender_stats);
    }
}
86
87
23
// Nothing to release explicitly; the compiler-generated destructor suffices.
// It is defined out of line so the definition stays in this translation unit.
QueryStatistics::~QueryStatistics() = default;
88
89
0
void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender_id) {
90
0
    std::lock_guard<std::mutex> l(_lock);
91
0
    if (!_query_statistics.contains(sender_id)) {
92
0
        _query_statistics[sender_id] = std::make_shared<QueryStatistics>();
93
0
    }
94
0
    _query_statistics[sender_id]->from_pb(statistics);
95
0
}
96
97
0
// Registers `statistics` as the entry for `sender_id`, unless an entry for
// that sender already exists (the first registration wins).
//
// Statistics for which collected() returns false are ignored.
//
// `statistics` must not be null: it is dereferenced unconditionally.
void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int sender_id) {
    if (!statistics->collected()) return;

    // The lock must cover the existence check as well as the write:
    // _query_statistics is mutated under _lock elsewhere, so reading it
    // without the lock is a data race, and checking before locking would
    // let two callers both pass the check for the same sender.
    std::lock_guard<std::mutex> l(_lock);
    if (_query_statistics.contains(sender_id)) return;
    _query_statistics[sender_id] = statistics;
}
103
104
0
// Returns the statistics stored for `sender_id`, or nullptr when there is no
// entry for that sender. The lookup runs with _lock held.
QueryStatisticsPtr QueryStatisticsRecvr::find(int sender_id) {
    std::lock_guard<std::mutex> l(_lock);

    const auto entry = _query_statistics.find(sender_id);
    if (entry == _query_statistics.end()) {
        return nullptr;
    }
    return entry->second;
}
112
113
} // namespace doris