Coverage Report

Created: 2026-03-12 08:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_tablet_hotspot.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_tablet_hotspot.h"
19
20
#include <chrono>
21
#include <mutex>
22
23
#include "cloud/config.h"
24
#include "runtime/exec_env.h"
25
#include "storage/tablet/tablet_fwd.h"
26
27
namespace doris {
28
29
1.84M
void TabletHotspot::count(const BaseTablet& tablet) {
30
1.84M
    size_t slot_idx = tablet.tablet_id() % s_slot_size;
31
1.84M
    auto& slot = _tablets_hotspot[slot_idx];
32
1.84M
    std::lock_guard lock(slot.mtx);
33
1.84M
    HotspotCounterPtr counter;
34
1.84M
    if (auto iter = slot.map.find(tablet.tablet_id()); iter == slot.map.end()) {
35
97.3k
        counter = std::make_shared<HotspotCounter>(tablet.table_id(), tablet.index_id(),
36
97.3k
                                                   tablet.partition_id());
37
97.3k
        slot.map.insert(std::make_pair(tablet.tablet_id(), counter));
38
1.74M
    } else {
39
1.74M
        counter = iter->second;
40
1.74M
    }
41
1.84M
    counter->last_access_time = std::chrono::system_clock::now();
42
1.84M
    counter->cur_counter++;
43
1.84M
}
44
45
1
// Starts the background thread that runs TabletHotspot::make_dot_point().
// The thread is assigned in the constructor body (not the initializer list), so
// all data members are already constructed when it starts running.
TabletHotspot::TabletHotspot() {
    _counter_thread = std::thread([this]() { make_dot_point(); });
}
48
49
0
// Stops the background thread: sets `_closed` under `_mtx`, wakes every waiter on
// `_cond`, then joins the thread if it is joinable.
TabletHotspot::~TabletHotspot() {
    {
        // The flag is written under the same mutex the worker waits on.
        std::lock_guard guard(_mtx);
        _closed = true;
    }
    _cond.notify_all();

    if (!_counter_thread.joinable()) {
        return;
    }
    _counter_thread.join();
}
59
60
void get_return_partitions(
61
        const std::unordered_map<TabletHotspotMapKey,
62
                                 std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
63
                hot_partition,
64
        const std::unordered_map<TabletHotspotMapKey,
65
                                 std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
66
                last_hot_partition,
67
2
        std::vector<THotTableMessage>* hot_tables, int& return_partitions, int N) {
68
52
    for (const auto& [key, partition_to_value] : hot_partition) {
69
52
        THotTableMessage msg;
70
52
        msg.table_id = key.first;
71
52
        msg.index_id = key.second;
72
52
        for (const auto& [partition_id, value] : partition_to_value) {
73
52
            if (return_partitions > N) {
74
1
                return;
75
1
            }
76
51
            auto last_value_iter = last_hot_partition.find(key);
77
51
            if (last_value_iter != last_hot_partition.end()) {
78
0
                auto last_partition_iter = last_value_iter->second.find(partition_id);
79
0
                if (last_partition_iter != last_value_iter->second.end()) {
80
0
                    const auto& last_value = last_partition_iter->second;
81
0
                    if (std::abs(static_cast<int64_t>(value.qpd) -
82
0
                                 static_cast<int64_t>(last_value.qpd)) < 5 &&
83
0
                        std::abs(static_cast<int64_t>(value.qpw) -
84
0
                                 static_cast<int64_t>(last_value.qpw)) < 10 &&
85
0
                        std::abs(static_cast<int64_t>(value.last_access_time) -
86
0
                                 static_cast<int64_t>(last_value.last_access_time)) < 60) {
87
0
                        LOG(INFO) << "skip partition_id=" << partition_id << " qpd=" << value.qpd
88
0
                                  << " qpw=" << value.qpw
89
0
                                  << " last_access_time=" << value.last_access_time
90
0
                                  << " last_qpd=" << last_value.qpd
91
0
                                  << " last_qpw=" << last_value.qpw
92
0
                                  << " last_access_time=" << last_value.last_access_time;
93
0
                        continue;
94
0
                    }
95
0
                }
96
0
            }
97
51
            THotPartition new_hot_partition;
98
51
            new_hot_partition.__set_partition_id(partition_id);
99
51
            new_hot_partition.__set_query_per_day(value.qpd);
100
51
            new_hot_partition.__set_query_per_week(value.qpw);
101
51
            new_hot_partition.__set_last_access_time(value.last_access_time);
102
51
            msg.hot_partitions.push_back(new_hot_partition);
103
51
            return_partitions++;
104
51
        }
105
51
        msg.__isset.hot_partitions = !msg.hot_partitions.empty();
106
51
        hot_tables->push_back(std::move(msg));
107
51
    }
108
2
}
109
110
1
void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_tables) {
111
    // map<pair<table_id, index_id>, map<partition_id, value>> for day
112
1
    std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
113
1
                       MapKeyHash>
114
1
            day_hot_partitions;
115
    // map<pair<table_id, index_id>, map<partition_id, value>> for week
116
1
    std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
117
1
                       MapKeyHash>
118
1
            week_hot_partitions;
119
120
1.02k
    std::for_each(_tablets_hotspot.begin(), _tablets_hotspot.end(), [&](HotspotMap& map) {
121
1.02k
        std::lock_guard lock(map.mtx);
122
67.4k
        for (auto& [_, counter] : map.map) {
123
67.4k
            if (counter->qpd() != 0) {
124
67.4k
                auto& hot_partition = day_hot_partitions[std::make_pair(
125
67.4k
                        counter->table_id, counter->index_id)][counter->partition_id];
126
67.4k
                hot_partition.qpd = std::max(hot_partition.qpd, counter->qpd());
127
67.4k
                hot_partition.qpw = std::max(hot_partition.qpw, counter->qpw());
128
67.4k
                hot_partition.last_access_time =
129
67.4k
                        std::max<int64_t>(hot_partition.last_access_time,
130
67.4k
                                          std::chrono::duration_cast<std::chrono::seconds>(
131
67.4k
                                                  counter->last_access_time.time_since_epoch())
132
67.4k
                                                  .count());
133
67.4k
            } else if (counter->qpw() != 0) {
134
0
                auto& hot_partition = week_hot_partitions[std::make_pair(
135
0
                        counter->table_id, counter->index_id)][counter->partition_id];
136
0
                hot_partition.qpd = 0;
137
0
                hot_partition.qpw = std::max(hot_partition.qpw, counter->qpw());
138
0
                hot_partition.last_access_time =
139
0
                        std::max<int64_t>(hot_partition.last_access_time,
140
0
                                          std::chrono::duration_cast<std::chrono::seconds>(
141
0
                                                  counter->last_access_time.time_since_epoch())
142
0
                                                  .count());
143
0
            }
144
67.4k
        }
145
1.02k
    });
146
1
    constexpr int N = 50;
147
1
    int return_partitions = 0;
148
149
1
    std::unique_lock lock(_last_partitions_mtx);
150
1
    get_return_partitions(day_hot_partitions, _last_day_hot_partitions, hot_tables,
151
1
                          return_partitions, N);
152
1
    get_return_partitions(week_hot_partitions, _last_week_hot_partitions, hot_tables,
153
1
                          return_partitions, N);
154
155
1
    _last_day_hot_partitions = std::move(day_hot_partitions);
156
1
    _last_week_hot_partitions = std::move(week_hot_partitions);
157
1
}
158
159
67.8k
void HotspotCounter::make_dot_point() {
160
67.8k
    uint64_t value = cur_counter.load();
161
67.8k
    cur_counter = 0;
162
67.8k
    if (history_counters.size() == week_counters_size) {
163
0
        uint64_t week_counter_remove = history_counters.back();
164
0
        uint64_t day_counter_remove = history_counters[day_counters_size - 1];
165
0
        week_history_counter = week_history_counter - week_counter_remove + value;
166
0
        day_history_counter = day_history_counter - day_counter_remove + value;
167
0
        history_counters.pop_back();
168
67.8k
    } else if (history_counters.size() < day_counters_size) {
169
67.8k
        week_history_counter += value;
170
67.8k
        day_history_counter += value;
171
67.8k
    } else {
172
0
        week_history_counter += value;
173
0
        uint64_t day_counter_remove = history_counters[day_counters_size - 1];
174
0
        day_history_counter = day_history_counter - day_counter_remove + value;
175
0
    }
176
67.8k
    history_counters.push_front(value);
177
67.8k
}
178
179
134k
uint64_t HotspotCounter::qpd() {
180
134k
    return day_history_counter + cur_counter.load();
181
134k
}
182
183
67.4k
uint64_t HotspotCounter::qpw() {
184
67.4k
    return week_history_counter + cur_counter.load();
185
67.4k
}
186
187
1
void TabletHotspot::make_dot_point() {
188
3
    while (true) {
189
2
        {
190
2
            std::unique_lock lock(_mtx);
191
2
            _cond.wait_for(lock, std::chrono::seconds(HotspotCounter::time_interval),
192
3
                           [this]() { return _closed; });
193
2
            if (_closed) {
194
0
                break;
195
0
            }
196
2
        }
197
1.02k
        std::for_each(_tablets_hotspot.begin(), _tablets_hotspot.end(), [](HotspotMap& map) {
198
1.02k
            std::vector<HotspotCounterPtr> counters;
199
1.02k
            {
200
1.02k
                std::lock_guard lock(map.mtx);
201
67.8k
                for (auto& [_, counter] : map.map) {
202
67.8k
                    counters.push_back(counter);
203
67.8k
                }
204
1.02k
            }
205
1.02k
            std::for_each(counters.begin(), counters.end(),
206
67.8k
                          [](HotspotCounterPtr& counter) { counter->make_dot_point(); });
207
1.02k
        });
208
2
    }
209
1
}
210
211
} // namespace doris