Coverage Report

Created: 2025-05-01 03:10

/root/doris/be/src/runtime/query_statistics.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/FrontendService_types.h>
21
#include <gen_cpp/PaloInternalService_types.h>
22
#include <stdint.h>
23
24
#include <map>
25
#include <memory>
26
#include <mutex>
27
#include <unordered_map>
28
#include <utility>
29
30
#include "util/spinlock.h"
31
32
namespace doris {
33
34
class QueryStatisticsRecvr;
35
class PNodeStatistics;
36
class PQueryStatistics;
37
38
// This is responsible for collecting query statistics, usually it consists of
39
// two parts, one is current fragment or plan's statistics, the other is sub fragment
40
// or plan's statistics and QueryStatisticsRecvr is responsible for collecting it.
41
class QueryStatistics {
42
public:
43
    QueryStatistics()
44
            : scan_rows(0),
45
              scan_bytes(0),
46
              cpu_nanos(0),
47
              _scan_bytes_from_local_storage(0),
48
              _scan_bytes_from_remote_storage(0),
49
              returned_rows(0),
50
              max_peak_memory_bytes(0),
51
              current_used_memory_bytes(0),
52
              shuffle_send_bytes(0),
53
24
              shuffle_send_rows(0) {}
54
    virtual ~QueryStatistics();
55
56
    void merge(const QueryStatistics& other);
57
58
0
    void add_scan_rows(int64_t delta_scan_rows) {
59
0
        this->scan_rows.fetch_add(delta_scan_rows, std::memory_order_relaxed);
60
0
    }
61
62
0
    void add_scan_bytes(int64_t delta_scan_bytes) {
63
0
        this->scan_bytes.fetch_add(delta_scan_bytes, std::memory_order_relaxed);
64
0
    }
65
66
0
    void add_cpu_nanos(int64_t delta_cpu_time) {
67
0
        this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed);
68
0
    }
69
70
0
    void add_scan_bytes_from_local_storage(int64_t scan_bytes_from_local_storage) {
71
0
        _scan_bytes_from_local_storage += scan_bytes_from_local_storage;
72
0
    }
73
0
    void add_scan_bytes_from_remote_storage(int64_t scan_bytes_from_remote_storage) {
74
0
        _scan_bytes_from_remote_storage += scan_bytes_from_remote_storage;
75
0
    }
76
77
0
    void add_shuffle_send_bytes(int64_t delta_bytes) {
78
0
        this->shuffle_send_bytes.fetch_add(delta_bytes, std::memory_order_relaxed);
79
0
    }
80
81
0
    void add_shuffle_send_rows(int64_t delta_rows) {
82
0
        this->shuffle_send_rows.fetch_add(delta_rows, std::memory_order_relaxed);
83
0
    }
84
85
0
    void set_returned_rows(int64_t num_rows) { this->returned_rows = num_rows; }
86
87
5
    void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) {
88
5
        this->max_peak_memory_bytes.store(max_peak_memory_bytes, std::memory_order_relaxed);
89
5
    }
90
91
5
    void set_current_used_memory_bytes(int64_t current_used_memory) {
92
5
        this->current_used_memory_bytes.store(current_used_memory, std::memory_order_relaxed);
93
5
    }
94
95
    void merge(QueryStatisticsRecvr* recvr);
96
97
    void merge(QueryStatisticsRecvr* recvr, int sender_id);
98
99
    void clearNodeStatistics();
100
101
0
    void clear() {
102
0
        scan_rows.store(0, std::memory_order_relaxed);
103
0
        scan_bytes.store(0, std::memory_order_relaxed);
104
0
        cpu_nanos.store(0, std::memory_order_relaxed);
105
0
        shuffle_send_bytes.store(0, std::memory_order_relaxed);
106
0
        shuffle_send_rows.store(0, std::memory_order_relaxed);
107
0
        _scan_bytes_from_local_storage.store(0, std::memory_order_relaxed);
108
0
        _scan_bytes_from_remote_storage.store(0, std::memory_order_relaxed);
109
0
110
0
        returned_rows = 0;
111
0
        max_peak_memory_bytes.store(0, std::memory_order_relaxed);
112
0
        clearNodeStatistics();
113
0
        //clear() is used before collection, so calling "clear" is equivalent to being collected.
114
0
        set_collected();
115
0
    }
116
117
    void to_pb(PQueryStatistics* statistics);
118
    void to_thrift(TQueryStatistics* statistics) const;
119
    void from_pb(const PQueryStatistics& statistics);
120
0
    bool collected() const { return _collected; }
121
0
    void set_collected() { _collected = true; }
122
123
0
    int64_t get_scan_rows() { return scan_rows.load(std::memory_order_relaxed); }
124
0
    int64_t get_scan_bytes() { return scan_bytes.load(std::memory_order_relaxed); }
125
0
    int64_t get_current_used_memory_bytes() {
126
0
        return current_used_memory_bytes.load(std::memory_order_relaxed);
127
0
    }
128
129
private:
130
    friend class QueryStatisticsRecvr;
131
    std::atomic<int64_t> scan_rows;
132
    std::atomic<int64_t> scan_bytes;
133
    std::atomic<int64_t> cpu_nanos;
134
    std::atomic<int64_t> _scan_bytes_from_local_storage;
135
    std::atomic<int64_t> _scan_bytes_from_remote_storage;
136
    // number rows returned by query.
137
    // only set once by result sink when closing.
138
    int64_t returned_rows;
139
    // Maximum memory peak for all backends.
140
    // only set once by result sink when closing.
141
    std::atomic<int64_t> max_peak_memory_bytes;
142
    bool _collected = false;
143
    std::atomic<int64_t> current_used_memory_bytes;
144
145
    std::atomic<int64_t> shuffle_send_bytes;
146
    std::atomic<int64_t> shuffle_send_rows;
147
};
148
using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
149
// It is used for collecting sub plan query statistics in DataStreamRecvr.
150
class QueryStatisticsRecvr {
151
public:
152
    ~QueryStatisticsRecvr() = default;
153
154
    // Transmitted via RPC, incurring serialization overhead.
155
    void insert(const PQueryStatistics& statistics, int sender_id);
156
157
    // using local_exchange for transmission, only need to hold a shared pointer.
158
    void insert(QueryStatisticsPtr statistics, int sender_id);
159
160
    QueryStatisticsPtr find(int sender_id);
161
162
private:
163
    friend class QueryStatistics;
164
165
0
    void merge(QueryStatistics* statistics) {
166
0
        std::lock_guard<std::mutex> l(_lock);
167
0
        for (auto& pair : _query_statistics) {
168
0
            statistics->merge(*(pair.second));
169
0
        }
170
0
    }
171
172
    std::map<int, QueryStatisticsPtr> _query_statistics;
173
    std::mutex _lock;
174
};
175
176
} // namespace doris