Coverage Report

Created: 2024-11-20 15:53

/root/doris/be/src/runtime/query_statistics.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <stdint.h>

#include <atomic>
#include <map>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

#include "util/spinlock.h"
31
32
namespace doris {
33
34
class QueryStatisticsRecvr;
35
class PNodeStatistics;
36
class PQueryStatistics;
37
38
// This is responsible for collecting query statistics, usually it consists of
39
// two parts, one is current fragment or plan's statistics, the other is sub fragment
40
// or plan's statistics and QueryStatisticsRecvr is responsible for collecting it.
41
class QueryStatistics {
42
public:
43
    QueryStatistics()
44
            : scan_rows(0),
45
              scan_bytes(0),
46
              cpu_nanos(0),
47
              returned_rows(0),
48
              max_peak_memory_bytes(0),
49
              current_used_memory_bytes(0),
50
              shuffle_send_bytes(0),
51
23
              shuffle_send_rows(0) {}
52
    virtual ~QueryStatistics();
53
54
    void merge(const QueryStatistics& other);
55
56
0
    void add_scan_rows(int64_t delta_scan_rows) {
57
0
        this->scan_rows.fetch_add(delta_scan_rows, std::memory_order_relaxed);
58
0
    }
59
60
0
    void add_scan_bytes(int64_t delta_scan_bytes) {
61
0
        this->scan_bytes.fetch_add(delta_scan_bytes, std::memory_order_relaxed);
62
0
    }
63
64
0
    void add_cpu_nanos(int64_t delta_cpu_time) {
65
0
        this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed);
66
0
    }
67
68
0
    void add_shuffle_send_bytes(int64_t delta_bytes) {
69
0
        this->shuffle_send_bytes.fetch_add(delta_bytes, std::memory_order_relaxed);
70
0
    }
71
72
0
    void add_shuffle_send_rows(int64_t delta_rows) {
73
0
        this->shuffle_send_rows.fetch_add(delta_rows, std::memory_order_relaxed);
74
0
    }
75
76
0
    void set_returned_rows(int64_t num_rows) { this->returned_rows = num_rows; }
77
78
0
    void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) {
79
0
        this->max_peak_memory_bytes.store(max_peak_memory_bytes, std::memory_order_relaxed);
80
0
    }
81
82
0
    void set_current_used_memory_bytes(int64_t current_used_memory) {
83
0
        this->current_used_memory_bytes.store(current_used_memory, std::memory_order_relaxed);
84
0
    }
85
86
    void merge(QueryStatisticsRecvr* recvr);
87
88
    void merge(QueryStatisticsRecvr* recvr, int sender_id);
89
90
    void clearNodeStatistics();
91
92
0
    void clear() {
93
0
        scan_rows.store(0, std::memory_order_relaxed);
94
0
        scan_bytes.store(0, std::memory_order_relaxed);
95
0
        cpu_nanos.store(0, std::memory_order_relaxed);
96
0
        shuffle_send_bytes.store(0, std::memory_order_relaxed);
97
0
        shuffle_send_rows.store(0, std::memory_order_relaxed);
98
0
99
0
        returned_rows = 0;
100
0
        max_peak_memory_bytes.store(0, std::memory_order_relaxed);
101
0
        clearNodeStatistics();
102
0
        //clear() is used before collection, so calling "clear" is equivalent to being collected.
103
0
        set_collected();
104
0
    }
105
106
    void to_pb(PQueryStatistics* statistics);
107
    void to_thrift(TQueryStatistics* statistics) const;
108
    void from_pb(const PQueryStatistics& statistics);
109
0
    bool collected() const { return _collected; }
110
0
    void set_collected() { _collected = true; }
111
112
0
    int64_t get_scan_rows() { return scan_rows.load(std::memory_order_relaxed); }
113
0
    int64_t get_scan_bytes() { return scan_bytes.load(std::memory_order_relaxed); }
114
0
    int64_t get_current_used_memory_bytes() {
115
0
        return current_used_memory_bytes.load(std::memory_order_relaxed);
116
0
    }
117
118
private:
119
    friend class QueryStatisticsRecvr;
120
    std::atomic<int64_t> scan_rows;
121
    std::atomic<int64_t> scan_bytes;
122
    std::atomic<int64_t> cpu_nanos;
123
    // number rows returned by query.
124
    // only set once by result sink when closing.
125
    int64_t returned_rows;
126
    // Maximum memory peak for all backends.
127
    // only set once by result sink when closing.
128
    std::atomic<int64_t> max_peak_memory_bytes;
129
    bool _collected = false;
130
    std::atomic<int64_t> current_used_memory_bytes;
131
132
    std::atomic<int64_t> shuffle_send_bytes;
133
    std::atomic<int64_t> shuffle_send_rows;
134
};
135
// Shared-ownership handle to a QueryStatistics; QueryStatisticsRecvr stores one per sender id.
using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
136
// It is used for collecting sub plan query statistics in DataStreamRecvr.
137
class QueryStatisticsRecvr {
138
public:
139
    ~QueryStatisticsRecvr() = default;
140
141
    // Transmitted via RPC, incurring serialization overhead.
142
    void insert(const PQueryStatistics& statistics, int sender_id);
143
144
    // using local_exchange for transmission, only need to hold a shared pointer.
145
    void insert(QueryStatisticsPtr statistics, int sender_id);
146
147
    QueryStatisticsPtr find(int sender_id);
148
149
private:
150
    friend class QueryStatistics;
151
152
0
    void merge(QueryStatistics* statistics) {
153
0
        std::lock_guard<std::mutex> l(_lock);
154
0
        for (auto& pair : _query_statistics) {
155
0
            statistics->merge(*(pair.second));
156
0
        }
157
0
    }
158
159
    std::map<int, QueryStatisticsPtr> _query_statistics;
160
    std::mutex _lock;
161
};
162
163
} // namespace doris