be/src/runtime/cache/result_cache.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | #include "runtime/cache/result_cache.h" |
18 | | |
19 | | #include <gen_cpp/internal_service.pb.h> |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <iostream> |
23 | | #include <list> |
24 | | #include <utility> |
25 | | |
26 | | #include "common/metrics/doris_metrics.h" |
27 | | #include "runtime/cache/cache_utils.h" |
28 | | #include "storage/olap_define.h" |
29 | | |
30 | | namespace doris { |
31 | | |
32 | | /** |
33 | | * Remove the tail node of link |
34 | | */ |
35 | 0 | ResultNode* ResultNodeList::pop() { |
36 | 0 | remove(_head); |
37 | 0 | return _head; |
38 | 0 | } |
39 | | |
40 | 0 | void ResultNodeList::remove(ResultNode* node) { |
41 | 0 | if (!node) return; |
42 | 0 | if (node == _head) _head = node->get_next(); |
43 | 0 | if (node == _tail) _tail = node->get_prev(); |
44 | 0 | node->unlink(); |
45 | 0 | _node_count--; |
46 | 0 | } |
47 | | |
48 | 71.7k | void ResultNodeList::push_back(ResultNode* node) { |
49 | 71.7k | if (!node) return; |
50 | 71.7k | if (!_head) _head = node; |
51 | 71.7k | node->append(_tail); |
52 | 71.7k | _tail = node; |
53 | 71.7k | _node_count++; |
54 | 71.7k | } |
55 | | |
56 | 75.8k | void ResultNodeList::move_tail(ResultNode* node) { |
57 | 75.8k | if (!node || node == _tail) return; |
58 | 2.32k | if (!_head) |
59 | 0 | _head = node; |
60 | 2.32k | else if (node == _head) |
61 | 0 | _head = node->get_next(); |
62 | 2.32k | node->unlink(); |
63 | 2.32k | node->append(_tail); |
64 | 2.32k | _tail = node; |
65 | 2.32k | } |
66 | | |
67 | 0 | void ResultNodeList::delete_node(ResultNode** node) { |
68 | 0 | (*node)->clear(); |
69 | 0 | SAFE_DELETE(*node); |
70 | 0 | } |
71 | | |
72 | 30 | void ResultNodeList::clear() { |
73 | 30 | LOG(INFO) << "clear result node list."; |
74 | 52 | while (_head) { |
75 | 22 | ResultNode* tmp_node = _head->get_next(); |
76 | 22 | _head->clear(); |
77 | 22 | SAFE_DELETE(_head); |
78 | 22 | _head = tmp_node; |
79 | 22 | } |
80 | 30 | _node_count = 0; |
81 | 30 | } |
82 | | /** |
83 | | * Find the node and update partition data |
84 | | * New node, the node updated in the first partition will move to the tail of the list |
85 | | */ |
86 | 72.1k | void ResultCache::update(const PUpdateCacheRequest* request, PCacheResponse* response) { |
87 | 72.1k | ResultNode* node; |
88 | 72.1k | PCacheStatus status; |
89 | 72.1k | bool update_first = false; |
90 | 72.1k | UniqueId sql_key = request->sql_key(); |
91 | 72.1k | LOG(INFO) << "update cache, sql key:" << sql_key; |
92 | | |
93 | 72.1k | CacheWriteLock write_lock(_cache_mtx); |
94 | 72.1k | auto it = _node_map.find(sql_key); |
95 | 72.1k | if (it != _node_map.end()) { |
96 | 426 | node = it->second; |
97 | 426 | _cache_size -= node->get_data_size(); |
98 | 426 | _partition_count -= node->get_partition_count(); |
99 | 426 | status = node->update_partition(request, update_first); |
100 | 71.7k | } else { |
101 | 71.7k | node = _node_list.new_node(sql_key); |
102 | 71.7k | status = node->update_partition(request, update_first); |
103 | 71.7k | _node_list.push_back(node); |
104 | 71.7k | _node_map[sql_key] = node; |
105 | 71.7k | _node_count += 1; |
106 | 71.7k | } |
107 | 72.1k | if (update_first) { |
108 | 72.1k | _node_list.move_tail(node); |
109 | 72.1k | } |
110 | 72.1k | _cache_size += node->get_data_size(); |
111 | 72.1k | _partition_count += node->get_partition_count(); |
112 | 72.1k | response->set_status(status); |
113 | | |
114 | 72.1k | prune(); |
115 | 72.1k | update_monitor(); |
116 | 72.1k | } |
117 | | |
118 | | /** |
119 | | * Fetch cache through sql key, partition key, version and time |
120 | | */ |
121 | 3.62k | void ResultCache::fetch(const PFetchCacheRequest* request, PFetchCacheResult* result) { |
122 | 3.62k | bool hit_first = false; |
123 | 3.62k | ResultNodeMap::iterator node_it; |
124 | 3.62k | const UniqueId sql_key = request->sql_key(); |
125 | 3.62k | LOG(INFO) << "fetch cache, sql key:" << sql_key; |
126 | 3.62k | { |
127 | 3.62k | CacheReadLock read_lock(_cache_mtx); |
128 | 3.62k | node_it = _node_map.find(sql_key); |
129 | 3.62k | if (node_it == _node_map.end()) { |
130 | 1 | result->set_status(PCacheStatus::NO_SQL_KEY); |
131 | 1 | LOG(INFO) << "no such sql key:" << sql_key; |
132 | 1 | return; |
133 | 1 | } |
134 | 3.62k | ResultNode* node = node_it->second; |
135 | 3.62k | PartitionRowBatchList part_rowbatch_list; |
136 | 3.62k | PCacheStatus status = node->fetch_partition(request, part_rowbatch_list, hit_first); |
137 | | |
138 | 7.23k | for (auto part_it = part_rowbatch_list.begin(); part_it != part_rowbatch_list.end(); |
139 | 3.62k | part_it++) { |
140 | 3.61k | PCacheValue* srcValue = (*part_it)->get_value(); |
141 | 3.61k | if (srcValue != nullptr) { |
142 | 3.61k | PCacheValue* value = result->add_values(); |
143 | 3.61k | value->CopyFrom(*srcValue); |
144 | 3.61k | LOG(INFO) << "fetch cache partition key:" << srcValue->param().partition_key(); |
145 | 3.61k | } else { |
146 | 0 | LOG(WARNING) << "prowbatch of cache is null"; |
147 | 0 | status = PCacheStatus::EMPTY_DATA; |
148 | 0 | break; |
149 | 0 | } |
150 | 3.61k | } |
151 | 3.62k | if (status == PCacheStatus::CACHE_OK && part_rowbatch_list.empty()) { |
152 | 0 | status = PCacheStatus::EMPTY_DATA; |
153 | 0 | } |
154 | 3.62k | result->set_status(status); |
155 | 3.62k | } |
156 | | |
157 | 3.62k | if (hit_first) { |
158 | 3.61k | CacheWriteLock write_lock(_cache_mtx); |
159 | 3.61k | _node_list.move_tail(node_it->second); |
160 | 3.61k | } |
161 | 3.62k | } |
162 | | |
163 | 0 | bool ResultCache::contains(const UniqueId& sql_key) { |
164 | 0 | CacheReadLock read_lock(_cache_mtx); |
165 | 0 | return _node_map.find(sql_key) != _node_map.end(); |
166 | 0 | } |
167 | | |
168 | | /** |
169 | | * enum PClearType { |
170 | | * CLEAR_ALL = 0, |
171 | | * PRUNE_CACHE = 1, |
172 | | * CLEAR_BEFORE_TIME = 2, |
173 | | * CLEAR_SQL_KEY = 3 |
174 | | * }; |
175 | | */ |
176 | 14 | void ResultCache::clear(const PClearCacheRequest* request, PCacheResponse* response) { |
177 | 14 | LOG(INFO) << "clear cache type" << request->clear_type() |
178 | 14 | << ", node size:" << _node_list.get_node_count() << ", map size:" << _node_map.size(); |
179 | 14 | CacheWriteLock write_lock(_cache_mtx); |
180 | | //0 clear, 1 prune, 2 before_time,3 sql_key |
181 | 14 | switch (request->clear_type()) { |
182 | 14 | case PClearType::CLEAR_ALL: |
183 | 14 | _node_list.clear(); |
184 | 14 | _node_map.clear(); |
185 | 14 | _cache_size = 0; |
186 | 14 | _node_count = 0; |
187 | 14 | _partition_count = 0; |
188 | 14 | break; |
189 | 0 | case PClearType::PRUNE_CACHE: |
190 | 0 | prune(); |
191 | 0 | break; |
192 | 0 | default: |
193 | 0 | break; |
194 | 14 | } |
195 | 14 | update_monitor(); |
196 | 14 | response->set_status(PCacheStatus::CACHE_OK); |
197 | 14 | } |
198 | | |
199 | | //private method |
200 | 0 | ResultNode* find_min_time_node(ResultNode* result_node) { |
201 | 0 | if (result_node->get_prev()) { |
202 | 0 | if (result_node->get_prev()->first_partition_last_time() <= |
203 | 0 | result_node->first_partition_last_time()) { |
204 | 0 | return result_node->get_prev(); |
205 | 0 | } |
206 | 0 | } |
207 | | |
208 | 0 | if (result_node->get_next()) { |
209 | 0 | if (result_node->get_next()->first_partition_last_time() < |
210 | 0 | result_node->first_partition_last_time()) { |
211 | 0 | return result_node->get_next(); |
212 | 0 | } |
213 | 0 | } |
214 | 0 | return result_node; |
215 | 0 | } |
216 | | |
217 | | /* |
218 | | * Two-dimensional array, prune the min last_read_time PartitionRowBatch. |
219 | | * The following example is the last read time array. |
220 | | * 1 and 2 is the read time, nodes with pruning read time < 3 |
221 | | * Before: |
222 | | * 1,2 //_head ResultNode* |
223 | | * 1,2,3,4,5 |
224 | | * 2,4,3,6,8 |
225 | | * 5,7,9,11,13 //_tail ResultNode* |
226 | | * After: |
227 | | * 4,5 //_head |
228 | | * 4,3,6,8 |
229 | | * 5,7,9,11,13 //_tail |
230 | | */ |
231 | 72.1k | void ResultCache::prune() { |
232 | 72.1k | if (_cache_size <= (_max_size + _elasticity_size)) { |
233 | 72.1k | return; |
234 | 72.1k | } |
235 | 72.1k | LOG(INFO) << "begin prune cache, cache_size : " << _cache_size << ", max_size : " << _max_size |
236 | 0 | << ", elasticity_size : " << _elasticity_size; |
237 | 0 | ResultNode* result_node = _node_list.get_head(); |
238 | 0 | while (_cache_size > _max_size) { |
239 | 0 | if (result_node == nullptr) { |
240 | 0 | break; |
241 | 0 | } |
242 | 0 | result_node = find_min_time_node(result_node); |
243 | 0 | _cache_size -= result_node->prune_first(); |
244 | 0 | if (result_node->get_data_size() == 0) { |
245 | 0 | ResultNode* next_node; |
246 | 0 | if (result_node->get_next()) { |
247 | 0 | next_node = result_node->get_next(); |
248 | 0 | } else if (result_node->get_prev()) { |
249 | 0 | next_node = result_node->get_prev(); |
250 | 0 | } else { |
251 | 0 | next_node = _node_list.get_head(); |
252 | 0 | } |
253 | 0 | remove(result_node); |
254 | 0 | result_node = next_node; |
255 | 0 | } |
256 | 0 | } |
257 | 0 | LOG(INFO) << "finish prune, cache_size : " << _cache_size; |
258 | 0 | _node_count = _node_map.size(); |
259 | 0 | _cache_size = 0; |
260 | 0 | _partition_count = 0; |
261 | 0 | for (auto node_it = _node_map.begin(); node_it != _node_map.end(); node_it++) { |
262 | 0 | _partition_count += node_it->second->get_partition_count(); |
263 | 0 | _cache_size += node_it->second->get_data_size(); |
264 | 0 | } |
265 | 0 | } |
266 | | |
267 | 0 | void ResultCache::remove(ResultNode* result_node) { |
268 | 0 | auto node_it = _node_map.find(result_node->get_sql_key()); |
269 | 0 | if (node_it != _node_map.end()) { |
270 | 0 | _node_map.erase(node_it); |
271 | 0 | _node_list.remove(result_node); |
272 | 0 | _node_list.delete_node(&result_node); |
273 | 0 | } |
274 | 0 | } |
275 | | |
276 | 72.2k | void ResultCache::update_monitor() { |
277 | 72.2k | DorisMetrics::instance()->query_cache_memory_total_byte->set_value(_cache_size); |
278 | 72.2k | DorisMetrics::instance()->query_cache_sql_total_count->set_value(_node_count); |
279 | 72.2k | DorisMetrics::instance()->query_cache_partition_total_count->set_value(_partition_count); |
280 | 72.2k | } |
281 | | |
282 | | } // namespace doris |