be/src/io/cache/block_file_cache.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp |
19 | | // and modified by Doris |
20 | | |
21 | | #include "io/cache/block_file_cache.h" |
22 | | |
23 | | #include <gen_cpp/file_cache.pb.h> |
24 | | |
25 | | #include <cstdio> |
26 | | #include <exception> |
27 | | #include <fstream> |
28 | | #include <unordered_set> |
29 | | |
30 | | #include "common/status.h" |
31 | | #include "cpp/sync_point.h" |
32 | | #include "runtime/exec_env.h" |
33 | | |
34 | | #if defined(__APPLE__) |
35 | | #include <sys/mount.h> |
36 | | #else |
37 | | #include <sys/statfs.h> |
38 | | #endif |
39 | | |
40 | | #include <chrono> // IWYU pragma: keep |
41 | | #include <mutex> |
42 | | #include <ranges> |
43 | | |
44 | | #include "common/cast_set.h" |
45 | | #include "common/config.h" |
46 | | #include "common/logging.h" |
47 | | #include "core/uint128.h" |
48 | | #include "exec/common/sip_hash.h" |
49 | | #include "io/cache/block_file_cache_ttl_mgr.h" |
50 | | #include "io/cache/file_block.h" |
51 | | #include "io/cache/file_cache_common.h" |
52 | | #include "io/cache/fs_file_cache_storage.h" |
53 | | #include "io/cache/mem_file_cache_storage.h" |
54 | | #include "runtime/runtime_profile.h" |
55 | | #include "util/concurrency_stats.h" |
56 | | #include "util/stack_util.h" |
57 | | #include "util/stopwatch.hpp" |
58 | | #include "util/thread.h" |
59 | | #include "util/time.h" |
60 | | namespace doris::io { |
61 | | #include "common/compile_check_begin.h" |
62 | | |
63 | | // Insert a block pointer into one shard while swallowing allocation failures. |
64 | 9.56k | bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) { |
65 | 9.56k | if (!block) { |
66 | 1 | return false; |
67 | 1 | } |
68 | 9.56k | try { |
69 | 9.56k | auto* raw_ptr = block.get(); |
70 | 9.56k | auto idx = shard_index(raw_ptr); |
71 | 9.56k | auto& shard = _shards[idx]; |
72 | 9.56k | std::lock_guard lock(shard.mutex); |
73 | 9.56k | auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block)); |
74 | 9.56k | if (inserted) { |
75 | 1.02k | _size.fetch_add(1, std::memory_order_relaxed); |
76 | 1.02k | } |
77 | 9.56k | return inserted; |
78 | 9.56k | } catch (const std::exception& e) { |
79 | 0 | LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what(); |
80 | 0 | } catch (...) { |
81 | 0 | LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error"; |
82 | 0 | } |
83 | 0 | return false; |
84 | 9.56k | } |
85 | | |
86 | | // Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures. |
87 | 4.53k | size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) { |
88 | 4.53k | if (limit == 0 || output == nullptr) { |
89 | 2 | return 0; |
90 | 2 | } |
91 | 4.53k | size_t drained = 0; |
92 | 4.53k | try { |
93 | 4.53k | output->reserve(output->size() + std::min(limit, size())); |
94 | 280k | for (auto& shard : _shards) { |
95 | 280k | if (drained >= limit) { |
96 | 1 | break; |
97 | 1 | } |
98 | 280k | std::lock_guard lock(shard.mutex); |
99 | 280k | auto it = shard.entries.begin(); |
100 | 280k | size_t shard_drained = 0; |
101 | 280k | while (it != shard.entries.end() && drained + shard_drained < limit) { |
102 | 500 | output->emplace_back(std::move(it->second)); |
103 | 500 | it = shard.entries.erase(it); |
104 | 500 | ++shard_drained; |
105 | 500 | } |
106 | 280k | if (shard_drained > 0) { |
107 | 6 | _size.fetch_sub(shard_drained, std::memory_order_relaxed); |
108 | 6 | drained += shard_drained; |
109 | 6 | } |
110 | 280k | } |
111 | 4.53k | } catch (const std::exception& e) { |
112 | 0 | LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what(); |
113 | 0 | } catch (...) { |
114 | 0 | LOG(WARNING) << "Failed to drain LRU update blocks: unknown error"; |
115 | 0 | } |
116 | 4.66k | return drained; |
117 | 4.53k | } |
118 | | |
119 | | // Remove every pending block, guarding against unexpected exceptions. |
120 | 31 | void NeedUpdateLRUBlocks::clear() { |
121 | 31 | try { |
122 | 1.98k | for (auto& shard : _shards) { |
123 | 1.98k | std::lock_guard lock(shard.mutex); |
124 | 1.98k | if (!shard.entries.empty()) { |
125 | 1 | auto removed = shard.entries.size(); |
126 | 1 | shard.entries.clear(); |
127 | 1 | _size.fetch_sub(removed, std::memory_order_relaxed); |
128 | 1 | } |
129 | 1.98k | } |
130 | 31 | } catch (const std::exception& e) { |
131 | 0 | LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what(); |
132 | 0 | } catch (...) { |
133 | 0 | LOG(WARNING) << "Failed to clear LRU update blocks: unknown error"; |
134 | 0 | } |
135 | 31 | } |
136 | | |
137 | 9.56k | size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const { |
138 | 9.56k | DCHECK(ptr != nullptr); |
139 | 9.56k | return std::hash<FileBlock*> {}(ptr)&kShardMask; |
140 | 9.56k | } |
141 | | |
142 | | BlockFileCache::BlockFileCache(const std::string& cache_base_path, |
143 | | const FileCacheSettings& cache_settings) |
144 | 159 | : _cache_base_path(cache_base_path), |
145 | 159 | _capacity(cache_settings.capacity), |
146 | 159 | _max_file_block_size(cache_settings.max_file_block_size) { |
147 | 159 | _cur_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(), |
148 | 159 | "file_cache_cache_size", 0); |
149 | 159 | _cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>( |
150 | 159 | _cache_base_path.c_str(), "file_cache_capacity", _capacity); |
151 | 159 | _cur_ttl_cache_size_metrics = std::make_shared<bvar::Status<size_t>>( |
152 | 159 | _cache_base_path.c_str(), "file_cache_ttl_cache_size", 0); |
153 | 159 | _cur_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>( |
154 | 159 | _cache_base_path.c_str(), "file_cache_normal_queue_element_count", 0); |
155 | 159 | _cur_ttl_cache_lru_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>( |
156 | 159 | _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_size", 0); |
157 | 159 | _cur_ttl_cache_lru_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>( |
158 | 159 | _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_element_count", 0); |
159 | 159 | _cur_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>( |
160 | 159 | _cache_base_path.c_str(), "file_cache_normal_queue_cache_size", 0); |
161 | 159 | _cur_index_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>( |
162 | 159 | _cache_base_path.c_str(), "file_cache_index_queue_element_count", 0); |
163 | 159 | _cur_index_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>( |
164 | 159 | _cache_base_path.c_str(), "file_cache_index_queue_cache_size", 0); |
165 | 159 | _cur_disposable_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>( |
166 | 159 | _cache_base_path.c_str(), "file_cache_disposable_queue_element_count", 0); |
167 | 159 | _cur_disposable_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>( |
168 | 159 | _cache_base_path.c_str(), "file_cache_disposable_queue_cache_size", 0); |
169 | | |
170 | 159 | _queue_evict_size_metrics[0] = std::make_shared<bvar::Adder<size_t>>( |
171 | 159 | _cache_base_path.c_str(), "file_cache_index_queue_evict_size"); |
172 | 159 | _queue_evict_size_metrics[1] = std::make_shared<bvar::Adder<size_t>>( |
173 | 159 | _cache_base_path.c_str(), "file_cache_normal_queue_evict_size"); |
174 | 159 | _queue_evict_size_metrics[2] = std::make_shared<bvar::Adder<size_t>>( |
175 | 159 | _cache_base_path.c_str(), "file_cache_disposable_queue_evict_size"); |
176 | 159 | _queue_evict_size_metrics[3] = std::make_shared<bvar::Adder<size_t>>( |
177 | 159 | _cache_base_path.c_str(), "file_cache_ttl_cache_evict_size"); |
178 | 159 | _total_evict_size_metrics = std::make_shared<bvar::Adder<size_t>>( |
179 | 159 | _cache_base_path.c_str(), "file_cache_total_evict_size"); |
180 | 159 | _total_read_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
181 | 159 | "file_cache_total_read_size"); |
182 | 159 | _total_hit_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
183 | 159 | "file_cache_total_hit_size"); |
184 | 159 | _gc_evict_bytes_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
185 | 159 | "file_cache_gc_evict_bytes"); |
186 | 159 | _gc_evict_count_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
187 | 159 | "file_cache_gc_evict_count"); |
188 | | |
189 | 159 | _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] = |
190 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
191 | 159 | "file_cache_evict_by_time_disposable_to_normal"); |
192 | 159 | _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] = |
193 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
194 | 159 | "file_cache_evict_by_time_disposable_to_index"); |
195 | 159 | _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] = |
196 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
197 | 159 | "file_cache_evict_by_time_disposable_to_ttl"); |
198 | 159 | _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] = |
199 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
200 | 159 | "file_cache_evict_by_time_normal_to_disposable"); |
201 | 159 | _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] = |
202 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
203 | 159 | "file_cache_evict_by_time_normal_to_index"); |
204 | 159 | _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] = |
205 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
206 | 159 | "file_cache_evict_by_time_normal_to_ttl"); |
207 | 159 | _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] = |
208 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
209 | 159 | "file_cache_evict_by_time_index_to_disposable"); |
210 | 159 | _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] = |
211 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
212 | 159 | "file_cache_evict_by_time_index_to_normal"); |
213 | 159 | _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] = |
214 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
215 | 159 | "file_cache_evict_by_time_index_to_ttl"); |
216 | 159 | _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] = |
217 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
218 | 159 | "file_cache_evict_by_time_ttl_to_disposable"); |
219 | 159 | _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] = |
220 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
221 | 159 | "file_cache_evict_by_time_ttl_to_normal"); |
222 | 159 | _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] = |
223 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
224 | 159 | "file_cache_evict_by_time_ttl_to_index"); |
225 | | |
226 | 159 | _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] = |
227 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
228 | 159 | "file_cache_evict_by_self_lru_disposable"); |
229 | 159 | _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] = |
230 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
231 | 159 | "file_cache_evict_by_self_lru_normal"); |
232 | 159 | _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] = std::make_shared<bvar::Adder<size_t>>( |
233 | 159 | _cache_base_path.c_str(), "file_cache_evict_by_self_lru_index"); |
234 | 159 | _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] = std::make_shared<bvar::Adder<size_t>>( |
235 | 159 | _cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl"); |
236 | | |
237 | 159 | _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] = |
238 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
239 | 159 | "file_cache_evict_by_size_disposable_to_normal"); |
240 | 159 | _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] = |
241 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
242 | 159 | "file_cache_evict_by_size_disposable_to_index"); |
243 | 159 | _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] = |
244 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
245 | 159 | "file_cache_evict_by_size_disposable_to_ttl"); |
246 | 159 | _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] = |
247 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
248 | 159 | "file_cache_evict_by_size_normal_to_disposable"); |
249 | 159 | _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] = |
250 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
251 | 159 | "file_cache_evict_by_size_normal_to_index"); |
252 | 159 | _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] = |
253 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
254 | 159 | "file_cache_evict_by_size_normal_to_ttl"); |
255 | 159 | _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] = |
256 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
257 | 159 | "file_cache_evict_by_size_index_to_disposable"); |
258 | 159 | _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] = |
259 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
260 | 159 | "file_cache_evict_by_size_index_to_normal"); |
261 | 159 | _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] = |
262 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
263 | 159 | "file_cache_evict_by_size_index_to_ttl"); |
264 | 159 | _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] = |
265 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
266 | 159 | "file_cache_evict_by_size_ttl_to_disposable"); |
267 | 159 | _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] = |
268 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
269 | 159 | "file_cache_evict_by_size_ttl_to_normal"); |
270 | 159 | _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] = |
271 | 159 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
272 | 159 | "file_cache_evict_by_size_ttl_to_index"); |
273 | | |
274 | 159 | _evict_by_try_release = std::make_shared<bvar::Adder<size_t>>( |
275 | 159 | _cache_base_path.c_str(), "file_cache_evict_by_try_release"); |
276 | | |
277 | 159 | _num_read_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
278 | 159 | "file_cache_num_read_blocks"); |
279 | 159 | _num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
280 | 159 | "file_cache_num_hit_blocks"); |
281 | 159 | _num_removed_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
282 | 159 | "file_cache_num_removed_blocks"); |
283 | | |
284 | 159 | _no_warmup_num_read_blocks = std::make_shared<bvar::Adder<size_t>>( |
285 | 159 | _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks"); |
286 | 159 | _no_warmup_num_hit_blocks = std::make_shared<bvar::Adder<size_t>>( |
287 | 159 | _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks"); |
288 | | |
289 | | #ifndef BE_TEST |
290 | | _num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
291 | | _cache_base_path.c_str(), "file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300); |
292 | | _num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
293 | | _cache_base_path.c_str(), "file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300); |
294 | | _num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
295 | | _cache_base_path.c_str(), "file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600); |
296 | | _num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
297 | | _cache_base_path.c_str(), "file_cache_num_read_blocks_1h", _num_read_blocks.get(), |
298 | | 3600); |
299 | | _no_warmup_num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
300 | | _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_5m", |
301 | | _no_warmup_num_hit_blocks.get(), 300); |
302 | | _no_warmup_num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
303 | | _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_5m", |
304 | | _no_warmup_num_read_blocks.get(), 300); |
305 | | _no_warmup_num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
306 | | _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_1h", |
307 | | _no_warmup_num_hit_blocks.get(), 3600); |
308 | | _no_warmup_num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
309 | | _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_1h", |
310 | | _no_warmup_num_read_blocks.get(), 3600); |
311 | | #endif |
312 | | |
313 | 159 | _hit_ratio = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(), |
314 | 159 | "file_cache_hit_ratio", 0.0); |
315 | 159 | _hit_ratio_5m = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(), |
316 | 159 | "file_cache_hit_ratio_5m", 0.0); |
317 | 159 | _hit_ratio_1h = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(), |
318 | 159 | "file_cache_hit_ratio_1h", 0.0); |
319 | | |
320 | 159 | _no_warmup_hit_ratio = std::make_shared<bvar::Status<double>>( |
321 | 159 | _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio", 0.0); |
322 | 159 | _no_warmup_hit_ratio_5m = std::make_shared<bvar::Status<double>>( |
323 | 159 | _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_5m", 0.0); |
324 | 159 | _no_warmup_hit_ratio_1h = std::make_shared<bvar::Status<double>>( |
325 | 159 | _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_1h", 0.0); |
326 | | |
327 | 159 | _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>( |
328 | 159 | _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0); |
329 | 159 | _need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>( |
330 | 159 | _cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0); |
331 | 159 | _meta_store_write_queue_size_metrics = std::make_shared<bvar::Status<size_t>>( |
332 | 159 | _cache_base_path.c_str(), "file_cache_meta_store_write_queue_size", 0); |
333 | | |
334 | 159 | _cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>( |
335 | 159 | _cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us"); |
336 | 159 | _get_or_set_latency_us = std::make_shared<bvar::LatencyRecorder>( |
337 | 159 | _cache_base_path.c_str(), "file_cache_get_or_set_latency_us"); |
338 | 159 | _storage_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>( |
339 | 159 | _cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_us"); |
340 | 159 | _storage_retry_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>( |
341 | 159 | _cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us"); |
342 | 159 | _storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>( |
343 | 159 | _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us"); |
344 | 159 | _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>( |
345 | 159 | _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us"); |
346 | 159 | _lru_dump_latency_us = std::make_shared<bvar::LatencyRecorder>( |
347 | 159 | _cache_base_path.c_str(), "file_cache_lru_dump_latency_us"); |
348 | 159 | _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>( |
349 | 159 | _cache_base_path.c_str(), "file_cache_recycle_keys_length"); |
350 | 159 | _need_update_lru_blocks_length_recorder = std::make_shared<bvar::LatencyRecorder>( |
351 | 159 | _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length"); |
352 | 159 | _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>( |
353 | 159 | _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us"); |
354 | 159 | _ttl_gc_latency_us = std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(), |
355 | 159 | "file_cache_ttl_gc_latency_us"); |
356 | 159 | _shadow_queue_levenshtein_distance = std::make_shared<bvar::LatencyRecorder>( |
357 | 159 | _cache_base_path.c_str(), "file_cache_shadow_queue_levenshtein_distance"); |
358 | | |
359 | 159 | _disposable_queue = LRUQueue(cache_settings.disposable_queue_size, |
360 | 159 | cache_settings.disposable_queue_elements, 60 * 60); |
361 | 159 | _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements, |
362 | 159 | 7 * 24 * 60 * 60); |
363 | 159 | _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements, |
364 | 159 | 24 * 60 * 60); |
365 | 159 | _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements, |
366 | 159 | std::numeric_limits<int>::max()); |
367 | | |
368 | 159 | _lru_recorder = std::make_unique<LRUQueueRecorder>(this); |
369 | 159 | _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get()); |
370 | 159 | if (cache_settings.storage == "memory") { |
371 | 16 | _storage = std::make_unique<MemFileCacheStorage>(); |
372 | 16 | _cache_base_path = "memory"; |
373 | 143 | } else { |
374 | 143 | _storage = std::make_unique<FSFileCacheStorage>(); |
375 | 143 | } |
376 | | |
377 | 159 | LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string(); |
378 | 159 | } |
379 | | |
380 | 4.24k | UInt128Wrapper BlockFileCache::hash(const std::string& path) { |
381 | 4.24k | uint128_t value; |
382 | 4.24k | sip_hash128(path.data(), path.size(), reinterpret_cast<char*>(&value)); |
383 | 4.24k | return UInt128Wrapper(value); |
384 | 4.24k | } |
385 | | |
386 | | BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder( |
387 | 8 | const TUniqueId& query_id, int file_cache_query_limit_percent) { |
388 | 8 | SCOPED_CACHE_LOCK(_mutex, this); |
389 | 8 | if (!config::enable_file_cache_query_limit) { |
390 | 1 | return {}; |
391 | 1 | } |
392 | | |
393 | | /// if enable_filesystem_query_cache_limit is true, |
394 | | /// we create context query for current query. |
395 | 7 | auto context = get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent); |
396 | 7 | return std::make_unique<QueryFileCacheContextHolder>(query_id, this, context); |
397 | 8 | } |
398 | | |
399 | | BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context( |
400 | 3.37k | const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) { |
401 | 3.37k | auto query_iter = _query_map.find(query_id); |
402 | 3.37k | return (query_iter == _query_map.end()) ? nullptr : query_iter->second; |
403 | 3.37k | } |
404 | | |
405 | 6 | void BlockFileCache::remove_query_context(const TUniqueId& query_id) { |
406 | 6 | SCOPED_CACHE_LOCK(_mutex, this); |
407 | 6 | const auto& query_iter = _query_map.find(query_id); |
408 | | |
409 | 6 | if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) { |
410 | 5 | _query_map.erase(query_iter); |
411 | 5 | } |
412 | 6 | } |
413 | | |
414 | | BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context( |
415 | | const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock, |
416 | 7 | int file_cache_query_limit_percent) { |
417 | 7 | if (query_id.lo == 0 && query_id.hi == 0) { |
418 | 1 | return nullptr; |
419 | 1 | } |
420 | | |
421 | 6 | auto context = get_query_context(query_id, cache_lock); |
422 | 6 | if (context) { |
423 | 1 | return context; |
424 | 1 | } |
425 | | |
426 | 5 | size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100; |
427 | 5 | if (file_cache_query_limit_size < 268435456) { |
428 | 5 | LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size |
429 | 5 | << " bytes) is less than the 256MB recommended minimum. " |
430 | 5 | << "Consider increasing the session variable 'file_cache_query_limit_percent'" |
431 | 5 | << " from its current value " << file_cache_query_limit_percent << "%."; |
432 | 5 | } |
433 | 5 | auto query_context = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size); |
434 | 5 | auto query_iter = _query_map.emplace(query_id, query_context).first; |
435 | 5 | return query_iter->second; |
436 | 6 | } |
437 | | |
438 | | void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset, |
439 | 20 | std::lock_guard<std::mutex>& cache_lock) { |
440 | 20 | auto pair = std::make_pair(hash, offset); |
441 | 20 | auto record = records.find(pair); |
442 | 20 | DCHECK(record != records.end()); |
443 | 20 | auto iter = record->second; |
444 | 20 | records.erase(pair); |
445 | 20 | lru_queue.remove(iter, cache_lock); |
446 | 20 | } |
447 | | |
448 | | void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset, |
449 | | size_t size, |
450 | 30 | std::lock_guard<std::mutex>& cache_lock) { |
451 | 30 | auto pair = std::make_pair(hash, offset); |
452 | 30 | if (records.find(pair) == records.end()) { |
453 | 29 | auto queue_iter = lru_queue.add(hash, offset, size, cache_lock); |
454 | 29 | records.insert({pair, queue_iter}); |
455 | 29 | } |
456 | 30 | } |
457 | | |
// Public entry point for one-time initialization: takes the cache-wide lock
// (the macro defines `cache_lock`) and delegates to initialize_unlocked().
Status BlockFileCache::initialize() {
    SCOPED_CACHE_LOCK(_mutex, this);
    return initialize_unlocked(cache_lock);
}
462 | | |
// One-time setup while holding the cache lock: restore persisted LRU state,
// initialize the backing storage, wire up TTL management (FS storage only),
// and start all background worker threads. Must be called exactly once.
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(!_is_initialized);
    _is_initialized = true;
    if (config::file_cache_background_lru_dump_tail_record_num > 0) {
        // requirements:
        // 1. restored data should not overwrite the last dump
        // 2. restore should happen before load and async load
        // 3. all queues should be restored sequentially to avoid conflict
        // TODO(zhengyu): we could parallelize them but it would increase complexity,
        // so let's check the time cost to see if any improvement is necessary
        restore_lru_queues_from_disk(cache_lock);
    }
    RETURN_IF_ERROR(_storage->init(this));

    // TTL management needs a meta store, which only the FS-backed storage provides.
    if (auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get())) {
        if (auto* meta_store = fs_storage->get_meta_store()) {
            _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
        }
    }

    // Background workers: monitoring, GC, eviction-in-advance, and batched
    // LRU position updates.
    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
    _cache_background_evict_in_advance_thread =
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
    _cache_background_block_lru_update_thread =
            std::thread(&BlockFileCache::run_background_block_lru_update, this);

    // Initialize LRU dump thread and restore queues
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
    _cache_background_lru_log_replay_thread =
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);

    return Status::OK();
}
497 | | |
498 | | void BlockFileCache::update_block_lru(FileBlockSPtr block, |
499 | 495 | std::lock_guard<std::mutex>& cache_lock) { |
500 | 495 | FileBlockCell* cell = block->cell; |
501 | 495 | if (cell) { |
502 | 495 | if (cell->queue_iterator) { |
503 | 495 | auto& queue = get_queue(block->cache_type()); |
504 | 495 | queue.move_to_end(*cell->queue_iterator, cache_lock); |
505 | 495 | _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK, |
506 | 495 | block->_key.hash, block->_key.offset, |
507 | 495 | block->_block_range.size()); |
508 | 495 | } |
509 | 495 | cell->update_atime(); |
510 | 495 | } |
511 | 495 | } |
512 | | |
513 | | void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag, |
514 | 773 | std::lock_guard<std::mutex>& cache_lock) { |
515 | 773 | if (result) { |
516 | 773 | result->push_back(cell.file_block); |
517 | 773 | } |
518 | | |
519 | 773 | auto& queue = get_queue(cell.file_block->cache_type()); |
520 | | /// Move to the end of the queue. The iterator remains valid. |
521 | 773 | if (cell.queue_iterator && move_iter_flag) { |
522 | 771 | queue.move_to_end(*cell.queue_iterator, cache_lock); |
523 | 771 | _lru_recorder->record_queue_event(cell.file_block->cache_type(), |
524 | 771 | CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash, |
525 | 771 | cell.file_block->_key.offset, cell.size()); |
526 | 771 | } |
527 | | |
528 | 773 | cell.update_atime(); |
529 | 773 | } |
530 | | |
531 | | template <class T> |
532 | | requires IsXLock<T> |
533 | | FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset, |
534 | 4.99k | T& /* cache_lock */) { |
535 | 4.99k | auto it = _files.find(hash); |
536 | 4.99k | if (it == _files.end()) { |
537 | 1 | return nullptr; |
538 | 1 | } |
539 | | |
540 | 4.99k | auto& offsets = it->second; |
541 | 4.99k | auto cell_it = offsets.find(offset); |
542 | 4.99k | if (cell_it == offsets.end()) { |
543 | 3 | return nullptr; |
544 | 3 | } |
545 | | |
546 | 4.99k | return &cell_it->second; |
547 | 4.99k | } |
548 | | |
549 | 773 | bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const { |
550 | 773 | return query_type != FileCacheType::DISPOSABLE && cell_type != FileCacheType::DISPOSABLE; |
551 | 773 | } |
552 | | |
// Return all cached blocks of file `hash` that intersect [range.left, range.right],
// touching each returned cell's LRU position/atime via use_cell(). Requires the
// cache lock to be held. Returns an empty list when nothing intersects.
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
                                    const FileBlock::Range& range,
                                    std::lock_guard<std::mutex>& cache_lock) {
    /// Given range = [left, right] and non-overlapping ordered set of file blocks,
    /// find list [block1, ..., blockN] of blocks which intersect with given range.
    auto it = _files.find(hash);
    if (it == _files.end()) {
        if (_async_open_done) {
            return {};
        }
        // Async metadata load has not finished yet: blocks may exist in storage
        // without being registered in _files. Try a direct load before giving up.
        FileCacheKey key;
        key.hash = hash;
        key.meta.type = context.cache_type;
        key.meta.expiration_time = context.expiration_time;
        key.meta.tablet_id = context.tablet_id;
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);

        it = _files.find(hash);
        if (it == _files.end()) [[unlikely]] {
            return {};
        }
    }

    auto& file_blocks = it->second;
    if (file_blocks.empty()) {
        // Invariant violation: an empty offset map should never be left in _files.
        // Report, repair (erase the dangling entry) and return nothing.
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
                     << " cache type=" << context.cache_type
                     << " cache expiration time=" << context.expiration_time
                     << " cache range=" << range.left << " " << range.right
                     << " query id=" << context.query_id;
        DCHECK(false);
        _files.erase(hash);
        return {};
    }

    FileBlocks result;
    // lower_bound keys on block offset: first block whose offset >= range.left.
    auto block_it = file_blocks.lower_bound(range.left);
    if (block_it == file_blocks.end()) {
        /// N - last cached block for given file hash, block{N}.offset < range.left:
        ///   block{N}                     block{N}
        ///   [________                       [_______]
        ///   [__________]         OR                  [________]
        ///       ^                                        ^
        ///       range.left                               range.left

        const auto& cell = file_blocks.rbegin()->second;
        if (cell.file_block->range().right < range.left) {
            return {};
        }

        use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
                 cache_lock);
    } else { /// block_it <-- segmment{k}
        if (block_it != file_blocks.begin()) {
            // The block just before block_it may still overlap range.left from the left.
            const auto& prev_cell = std::prev(block_it)->second;
            const auto& prev_cell_range = prev_cell.file_block->range();

            if (range.left <= prev_cell_range.right) {
                ///   block{k-1}  block{k}
                ///   [________]   [_____
                ///       [___________
                ///       ^
                ///       range.left

                use_cell(prev_cell, &result,
                         need_to_move(prev_cell.file_block->cache_type(), context.cache_type),
                         cache_lock);
            }
        }

        ///  block{k} ...       block{k-1}  block{k}                      block{k}
        ///  [______              [______]     [____                        [________
        ///  [_________     OR              [________      OR    [______]   ^
        ///  ^                              ^                           ^   block{k}.offset
        ///  range.left                     range.left                  range.right

        // Collect block{k}..block{m} while they still start inside the range.
        while (block_it != file_blocks.end()) {
            const auto& cell = block_it->second;
            if (range.right < cell.file_block->range().left) {
                break;
            }

            use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
                     cache_lock);
            ++block_it;
        }
    }

    return result;
}
643 | | |
644 | 9.54k | void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) { |
645 | 9.54k | if (_need_update_lru_blocks.insert(std::move(block))) { |
646 | 1.01k | *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); |
647 | 1.01k | } |
648 | 9.54k | } |
649 | | |
650 | 4 | std::string BlockFileCache::clear_file_cache_async() { |
651 | 4 | LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path; |
652 | 4 | _lru_dumper->remove_lru_dump_files(); |
653 | 4 | int64_t num_cells_all = 0; |
654 | 4 | int64_t num_cells_to_delete = 0; |
655 | 4 | int64_t num_cells_wait_recycle = 0; |
656 | 4 | int64_t num_files_all = 0; |
657 | 4 | TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async"); |
658 | 4 | { |
659 | 4 | SCOPED_CACHE_LOCK(_mutex, this); |
660 | | |
661 | 4 | std::vector<FileBlockCell*> deleting_cells; |
662 | 4 | for (auto& [_, offset_to_cell] : _files) { |
663 | 4 | ++num_files_all; |
664 | 48 | for (auto& [_1, cell] : offset_to_cell) { |
665 | 48 | ++num_cells_all; |
666 | 48 | deleting_cells.push_back(&cell); |
667 | 48 | } |
668 | 4 | } |
669 | | |
670 | | // we cannot delete the element in the loop above, because it will break the iterator |
671 | 48 | for (auto& cell : deleting_cells) { |
672 | 48 | if (!cell->releasable()) { |
673 | 2 | LOG(INFO) << "cell is not releasable, hash=" |
674 | 2 | << " offset=" << cell->file_block->offset(); |
675 | 2 | cell->file_block->set_deleting(); |
676 | 2 | ++num_cells_wait_recycle; |
677 | 2 | continue; |
678 | 2 | } |
679 | 46 | FileBlockSPtr file_block = cell->file_block; |
680 | 46 | if (file_block) { |
681 | 46 | std::lock_guard block_lock(file_block->_mutex); |
682 | 46 | remove(file_block, cache_lock, block_lock, false); |
683 | 46 | ++num_cells_to_delete; |
684 | 46 | } |
685 | 46 | } |
686 | 4 | clear_need_update_lru_blocks(); |
687 | 4 | } |
688 | | |
689 | 4 | std::stringstream ss; |
690 | 4 | ss << "finish clear_file_cache_async, path=" << _cache_base_path |
691 | 4 | << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all |
692 | 4 | << " num_cells_to_delete=" << num_cells_to_delete |
693 | 4 | << " num_cells_wait_recycle=" << num_cells_wait_recycle; |
694 | 4 | auto msg = ss.str(); |
695 | 4 | LOG(INFO) << msg; |
696 | 4 | _lru_dumper->remove_lru_dump_files(); |
697 | 4 | return msg; |
698 | 4 | } |
699 | | |
// Split [offset, offset + size) into blocks of at most _max_file_block_size and
// create a cell for each piece in state `state`. Pieces for which space cannot
// be reserved become detached SKIP_CACHE blocks (not registered in the cache).
// Returns the blocks in ascending offset order; requires the cache lock.
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
                                                  const CacheContext& context, size_t offset,
                                                  size_t size, FileBlock::State state,
                                                  std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(size > 0);

    auto current_pos = offset;
    auto end_pos_non_included = offset + size;

    size_t current_size = 0;
    size_t remaining_size = size;

    FileBlocks file_blocks;
    while (current_pos < end_pos_non_included) {
        current_size = std::min(remaining_size, _max_file_block_size);
        remaining_size -= current_size;
        // Reservation failure downgrades this piece to SKIP_CACHE.
        state = try_reserve(hash, context, current_pos, current_size, cache_lock)
                        ? state
                        : FileBlock::State::SKIP_CACHE;
        if (state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
            // No cache space: hand back a block the caller can still read
            // through, but which is never added to _files or any LRU queue.
            FileCacheKey key;
            key.hash = hash;
            key.offset = current_pos;
            key.meta.type = context.cache_type;
            key.meta.expiration_time = context.expiration_time;
            key.meta.tablet_id = context.tablet_id;
            auto file_block = std::make_shared<FileBlock>(key, current_size, this,
                                                          FileBlock::State::SKIP_CACHE);
            file_blocks.push_back(std::move(file_block));
        } else {
            auto* cell = add_cell(hash, context, current_pos, current_size, state, cache_lock);
            if (cell) {
                file_blocks.push_back(cell->file_block);
                // Cold data keeps atime unset so it stays eviction-friendly.
                if (!context.is_cold_data) {
                    cell->update_atime();
                }
            }
            if (_ttl_mgr && context.tablet_id != 0) {
                _ttl_mgr->register_tablet_id(context.tablet_id);
            }
        }

        current_pos += current_size;
    }

    // The produced blocks must exactly cover the requested range's right edge.
    DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
    return file_blocks;
}
748 | | |
// Given the (non-empty) list of cached blocks intersecting `range`, insert EMPTY
// cells for every uncovered gap so that the returned list fully covers the
// requested range. Requires the cache lock; mutates `file_blocks` in place.
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
                                                       const UInt128Wrapper& hash,
                                                       const CacheContext& context,
                                                       const FileBlock::Range& range,
                                                       std::lock_guard<std::mutex>& cache_lock) {
    /// There are blocks [block1, ..., blockN]
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
    /// intersect with given range.

    /// It can have holes:
    /// [____________________]         -- requested range
    ///     [____]  [_]        [_]     -- intersecting cache [block1, ..., blockN]
    ///
    /// For each such hole create a cell with file block state EMPTY.

    auto it = file_blocks.begin();
    auto block_range = (*it)->range();

    // current_pos tracks the first byte not yet covered by a block.
    size_t current_pos = 0;
    if (block_range.left < range.left) {
        ///    [_______     -- requested range
        /// [_______
        /// ^
        /// block1

        current_pos = block_range.right + 1;
        ++it;
    } else {
        current_pos = range.left;
    }

    while (current_pos <= range.right && it != file_blocks.end()) {
        block_range = (*it)->range();

        if (current_pos == block_range.left) {
            // No hole before this block; advance past it.
            current_pos = block_range.right + 1;
            ++it;
            continue;
        }

        DCHECK(current_pos < block_range.left);

        auto hole_size = block_range.left - current_pos;

        // Splice the freshly created EMPTY cells before `it` to keep the
        // ascending-offset ordering of the list.
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
                                                      FileBlock::State::EMPTY, cache_lock));

        current_pos = block_range.right + 1;
        ++it;
    }

    if (current_pos <= range.right) {
        ///   ________]     -- requested range
        ///   _____]
        ///        ^
        /// blockN

        // Trailing hole after the last cached block.
        auto hole_size = range.right - current_pos + 1;

        file_blocks.splice(file_blocks.end(),
                           split_range_into_cells(hash, context, current_pos, hole_size,
                                                  FileBlock::State::EMPTY, cache_lock));
    }
}
813 | | |
// Entry point for readers: return a holder over all blocks covering
// [offset, offset + size), reusing cached blocks and creating EMPTY (or
// SKIP_CACHE) ones for the uncovered parts. Also accounts lock-wait time,
// hit/read metrics and overall latency into `context.stats` and the
// cache-level bvar counters.
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
                                            CacheContext& context) {
    FileBlock::Range range(offset, offset + size - 1);

    ReadStatistics* stats = context.stats;
    DCHECK(stats != nullptr);
    MonotonicStopWatch sw;
    sw.start();
    // Track how many readers are currently blocked on (or holding) _mutex.
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
    std::lock_guard cache_lock(_mutex);
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
    stats->lock_wait_timer += sw.elapsed_time();
    FileBlocks file_blocks;
    int64_t duration = 0;
    {
        SCOPED_RAW_TIMER(&duration);
        /// Get all blocks which intersect with the given range.
        {
            SCOPED_RAW_TIMER(&stats->get_timer);
            file_blocks = get_impl(hash, context, range, cache_lock);
        }

        if (file_blocks.empty()) {
            // Nothing cached: create EMPTY cells for the whole range.
            SCOPED_RAW_TIMER(&stats->set_timer);
            file_blocks = split_range_into_cells(hash, context, offset, size,
                                                 FileBlock::State::EMPTY, cache_lock);
        } else {
            // Partial coverage: plug the gaps with EMPTY cells.
            SCOPED_RAW_TIMER(&stats->set_timer);
            fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
        }
        DCHECK(!file_blocks.empty());
        *_num_read_blocks << file_blocks.size();
        if (!context.is_warmup) {
            *_no_warmup_num_read_blocks << file_blocks.size();
        }
        // Per-block hit accounting: a block counts as a hit only when already
        // fully DOWNLOADED at this point.
        for (auto& block : file_blocks) {
            size_t block_size = block->range().size();
            *_total_read_size_metrics << block_size;
            if (block->state_unsafe() == FileBlock::State::DOWNLOADED) {
                *_num_hit_blocks << 1;
                *_total_hit_size_metrics << block_size;
                if (!context.is_warmup) {
                    *_no_warmup_num_hit_blocks << 1;
                }
            }
        }
    }
    *_get_or_set_latency_us << (duration / 1000);
    return FileBlocksHolder(std::move(file_blocks));
}
864 | | |
// Register a new cell at (hash, offset) of `size` bytes in state `state`:
// insert it into _files, the matching LRU queue, and the size accounting.
// Returns the existing cell if one is already present at that offset, or
// nullptr for size == 0. Requires the cache lock.
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
                                        size_t offset, size_t size, FileBlock::State state,
                                        std::lock_guard<std::mutex>& cache_lock) {
    /// Create a file block cell and put it in `files` map by [hash][offset].
    if (size == 0) {
        return nullptr; /// Empty files are not cached.
    }

    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
               << " expiration_time=" << context.expiration_time
               << " tablet_id=" << context.tablet_id;

    // Blocks larger than 1 GiB are suspicious; log with a stack trace but proceed.
    if (size > 1024 * 1024 * 1024) {
        LOG(WARNING) << "File block size is too large for a block. size=" << size
                     << " hash=" << hash.to_string() << " offset=" << offset
                     << " stack:" << get_stack_trace();
    }

    auto& offsets = _files[hash];
    auto itr = offsets.find(offset);
    if (itr != offsets.end()) {
        // Someone already cached this offset; reuse the existing cell.
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
                   << ", offset: " << offset << ", size: " << size
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
        return &(itr->second);
    }

    FileCacheKey key;
    key.hash = hash;
    key.offset = offset;
    key.meta.type = context.cache_type;
    key.meta.expiration_time = context.expiration_time;
    key.meta.tablet_id = context.tablet_id;
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);
    Status st;
    // Reconcile cache type with expiration time: a TTL block without an
    // expiration becomes NORMAL, and a non-TTL block with an expiration
    // becomes TTL.
    if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
    } else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
    }
    if (!st.ok()) {
        // Type change failure is logged but not fatal; the cell keeps its type.
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
                     << " cache_type=" << cache_type_to_string(context.cache_type)
                     << " error=" << st.msg();
    }

    auto& queue = get_queue(cell.file_block->cache_type());
    cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
    _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD,
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
                                      cell.size());

    if (cell.file_block->cache_type() == FileCacheType::TTL) {
        _cur_ttl_size += cell.size();
    }
    auto [it, _] = offsets.insert(std::make_pair(offset, std::move(cell)));
    _cur_cache_size += size;
    return &(it->second);
}
925 | | |
// Release every currently-releasable block in the cache; blocks still in use
// are marked deleting instead (recycled after their last user lets go).
// Returns the number of blocks removed.
size_t BlockFileCache::try_release() {
    SCOPED_CACHE_LOCK(_mutex, this);
    std::vector<FileBlockCell*> trash;
    // Collect first, remove afterwards: removing while iterating _files would
    // invalidate the iterators.
    for (auto& [hash, blocks] : _files) {
        for (auto& [offset, cell] : blocks) {
            if (cell.releasable()) {
                trash.emplace_back(&cell);
            } else {
                cell.file_block->set_deleting();
            }
        }
    }
    size_t remove_size = 0;
    for (auto& cell : trash) {
        FileBlockSPtr file_block = cell->file_block;
        std::lock_guard lc(cell->file_block->_mutex);
        remove_size += file_block->range().size();
        remove(file_block, cache_lock, lc);
        VLOG_DEBUG << "try_release " << _cache_base_path
                   << " hash=" << file_block->get_hash_value().to_string()
                   << " offset=" << file_block->offset();
    }
    *_evict_by_try_release << remove_size;
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
    return trash.size();
}
952 | | |
953 | 34.9k | LRUQueue& BlockFileCache::get_queue(FileCacheType type) { |
954 | 34.9k | switch (type) { |
955 | 8.39k | case FileCacheType::INDEX: |
956 | 8.39k | return _index_queue; |
957 | 8.20k | case FileCacheType::DISPOSABLE: |
958 | 8.20k | return _disposable_queue; |
959 | 12.5k | case FileCacheType::NORMAL: |
960 | 12.5k | return _normal_queue; |
961 | 5.79k | case FileCacheType::TTL: |
962 | 5.79k | return _ttl_queue; |
963 | 0 | default: |
964 | 0 | DCHECK(false); |
965 | 34.9k | } |
966 | 0 | return _normal_queue; |
967 | 34.9k | } |
968 | | |
969 | 137 | const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const { |
970 | 137 | switch (type) { |
971 | 33 | case FileCacheType::INDEX: |
972 | 33 | return _index_queue; |
973 | 1 | case FileCacheType::DISPOSABLE: |
974 | 1 | return _disposable_queue; |
975 | 103 | case FileCacheType::NORMAL: |
976 | 103 | return _normal_queue; |
977 | 0 | case FileCacheType::TTL: |
978 | 0 | return _ttl_queue; |
979 | 0 | default: |
980 | 0 | DCHECK(false); |
981 | 137 | } |
982 | 0 | return _normal_queue; |
983 | 137 | } |
984 | | |
985 | | void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict, |
986 | | std::lock_guard<std::mutex>& cache_lock, bool sync, |
987 | 9.26k | std::string& reason) { |
988 | 9.26k | auto remove_file_block_if = [&](FileBlockCell* cell) { |
989 | 849 | FileBlockSPtr file_block = cell->file_block; |
990 | 849 | if (file_block) { |
991 | 849 | std::lock_guard block_lock(file_block->_mutex); |
992 | 849 | remove(file_block, cache_lock, block_lock, sync); |
993 | 849 | VLOG_DEBUG << "remove_file_blocks" |
994 | 0 | << " hash=" << file_block->get_hash_value().to_string() |
995 | 0 | << " offset=" << file_block->offset() << " reason=" << reason; |
996 | 849 | } |
997 | 849 | }; |
998 | 9.26k | std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); |
999 | 9.26k | } |
1000 | | |
// Walk `queue` from its LRU end and collect releasable cells into `to_evict`
// until is_overflow() reports that enough bytes (`size`, relative to
// `cur_cache_size`) have been reclaimed. Accumulates evicted bytes into both
// `removed_size` (caller-wide total) and `cur_removed_size` (this call only).
// Does not remove anything itself; the caller passes `to_evict` to
// remove_file_blocks(). Requires the cache lock.
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
                                           size_t& removed_size,
                                           std::vector<FileBlockCell*>& to_evict,
                                           std::lock_guard<std::mutex>& cache_lock,
                                           size_t& cur_removed_size, bool evict_in_advance) {
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
            break; // enough space reclaimed already
        }
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);

        // Every queue entry must have a backing cell; a miss means the queue
        // and _files diverged.
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
                     << ", offset: " << entry_offset;

        size_t cell_size = cell->size();
        DCHECK(entry_size == cell_size);

        if (cell->releasable()) {
            auto& file_block = cell->file_block;

            // The DCHECK documents the expectation that a releasable cell is
            // fully downloaded.
            std::lock_guard block_lock(file_block->_mutex);
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
            to_evict.push_back(cell);
            removed_size += cell_size;
            cur_removed_size += cell_size;
        }
    }
}
1029 | | |
// 1. if async load file cache not finish
//     a. evict from lru queue
// 2. if ttl cache
//     a. evict from disposable/normal/index queue one by one
// 3. if dont reach query limit or dont have query limit
//     a. evict from other queue
//     b. evict from current queue
//         a.1 if the data belong write, then just evict cold data
// 4. if reach query limit
//     a. evict from query queue
//     b. evict from other queue
//
// Try to reserve `size` bytes for a new block at (hash, offset). Returns true
// when the reservation succeeded (possibly after evicting other blocks).
// Requires the cache lock.
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
                                 size_t offset, size_t size,
                                 std::lock_guard<std::mutex>& cache_lock) {
    if (!_async_open_done) {
        return try_reserve_during_async_load(size, cache_lock);
    }
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
    // directly eliminate 5 times the size of the space
    if (_disk_resource_limit_mode) {
        size = 5 * size;
    }

    // Per-query quota only applies when the feature is enabled and a real
    // query id is present.
    auto query_context = config::enable_file_cache_query_limit &&
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
                                 ? get_query_context(context.query_id, cache_lock)
                                 : nullptr;
    if (!query_context) {
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
    } else if (query_context->get_cache_size(cache_lock) + size <=
               query_context->get_max_cache_size()) {
        // Query still under its quota: plain LRU reservation.
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
    }
    // Query quota exceeded: evict from the query's own LRU record first.
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
                               std::chrono::steady_clock::now().time_since_epoch())
                               .count();
    auto& queue = get_queue(context.cache_type);
    size_t removed_size = 0;
    size_t ghost_remove_size = 0;
    size_t queue_size = queue.get_capacity(cache_lock);
    size_t cur_cache_size = _cur_cache_size;
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);

    std::vector<LRUQueue::Iterator> ghost;
    std::vector<FileBlockCell*> to_evict;

    size_t max_size = queue.get_max_size();
    // Local overflow predicate (shadows the member function on purpose): also
    // accounts the per-type queue limit and the query quota.
    auto is_overflow = [&] {
        return _disk_resource_limit_mode ? removed_size < size
                                         : cur_cache_size + size - removed_size > _capacity ||
                                                   (queue_size + size - removed_size > max_size) ||
                                                   (query_context_cache_size + size -
                                                                    (removed_size + ghost_remove_size) >
                                                            query_context->get_max_cache_size());
    };

    /// Select the cache from the LRU queue held by query for expulsion.
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
        if (!is_overflow()) {
            break;
        }

        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);

        if (!cell) {
            /// The cache corresponding to this record may be swapped out by
            /// other queries, so it has become invalid.
            ghost.push_back(iter);
            ghost_remove_size += iter->size;
        } else {
            size_t cell_size = cell->size();
            DCHECK(iter->size == cell_size);

            if (cell->releasable()) {
                auto& file_block = cell->file_block;

                std::lock_guard block_lock(file_block->_mutex);
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
                to_evict.push_back(cell);
                removed_size += cell_size;
            }
        }
    }

    // Removes the block from both the query's record and the cache proper.
    auto remove_file_block_if = [&](FileBlockCell* cell) {
        FileBlockSPtr file_block = cell->file_block;
        if (file_block) {
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
            std::lock_guard block_lock(file_block->_mutex);
            remove(file_block, cache_lock, block_lock);
        }
    };

    // Stale (ghost) records are dropped from the query context only.
    for (auto& iter : ghost) {
        query_context->remove(iter->hash, iter->offset, cache_lock);
    }

    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);

    // Still not enough room: fall back to stealing from the other type queues.
    if (is_overflow() &&
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
        return false;
    }
    query_context->reserve(hash, offset, size, cache_lock);
    return true;
}
1136 | | |
1137 | 1 | void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) { |
1138 | 1 | UInt128Wrapper hash = UInt128Wrapper(); |
1139 | 1 | size_t offset = 0; |
1140 | 1 | CacheContext context; |
1141 | | /* we pick NORMAL and TTL cache to evict in advance |
1142 | | * we reserve for them but won't acutually give space to them |
1143 | | * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves |
1144 | | * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting |
1145 | | * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full |
1146 | | */ |
1147 | 1 | context.cache_type = FileCacheType::NORMAL; |
1148 | 1 | try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true); |
1149 | 1 | context.cache_type = FileCacheType::TTL; |
1150 | 1 | try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true); |
1151 | 1 | } |
1152 | | |
// remove specific cache synchronously, for critical operations
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
    std::string reason = "remove_if_cached";
    SCOPED_CACHE_LOCK(_mutex, this);
    auto iter = _files.find(file_key);
    std::vector<FileBlockCell*> to_remove;
    if (iter != _files.end()) {
        // Collect releasable cells for synchronous removal; in-use cells are
        // only flagged and get recycled once their users release them.
        for (auto& [_, cell] : iter->second) {
            if (cell.releasable()) {
                to_remove.push_back(&cell);
            } else {
                cell.file_block->set_deleting();
            }
        }
    }
    // sync=true: block files are deleted before this call returns.
    remove_file_blocks(to_remove, cache_lock, true, reason);
}
1171 | | |
// the async version of remove_if_cached, for background operations
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
    std::string reason = "remove_if_cached_async";
    SCOPED_CACHE_LOCK(_mutex, this);

    auto iter = _files.find(file_key);
    std::vector<FileBlockCell*> to_remove;
    if (iter != _files.end()) {
        for (auto& [_, cell] : iter->second) {
            // GC eviction metrics count every cell of the file, whether it is
            // removed now or merely marked deleting.
            *_gc_evict_bytes_metrics << cell.size();
            *_gc_evict_count_metrics << 1;
            if (cell.releasable()) {
                to_remove.push_back(&cell);
            } else {
                cell.file_block->set_deleting();
            }
        }
    }
    // sync=false: block files are deleted asynchronously.
    remove_file_blocks(to_remove, cache_lock, false, reason);
}
1194 | | |
1195 | | std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl( |
1196 | 7.36k | FileCacheType cur_cache_type) { |
1197 | 7.36k | switch (cur_cache_type) { |
1198 | 3.91k | case FileCacheType::TTL: |
1199 | 3.91k | return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX}; |
1200 | 671 | case FileCacheType::INDEX: |
1201 | 671 | return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL}; |
1202 | 2.16k | case FileCacheType::NORMAL: |
1203 | 2.16k | return {FileCacheType::DISPOSABLE, FileCacheType::INDEX}; |
1204 | 617 | case FileCacheType::DISPOSABLE: |
1205 | 617 | return {FileCacheType::NORMAL, FileCacheType::INDEX}; |
1206 | 0 | default: |
1207 | 0 | return {}; |
1208 | 7.36k | } |
1209 | 0 | return {}; |
1210 | 7.36k | } |
1211 | | |
1212 | 889 | std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) { |
1213 | 889 | switch (cur_cache_type) { |
1214 | 283 | case FileCacheType::TTL: |
1215 | 283 | return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX}; |
1216 | 143 | case FileCacheType::INDEX: |
1217 | 143 | return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL}; |
1218 | 311 | case FileCacheType::NORMAL: |
1219 | 311 | return {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL}; |
1220 | 152 | case FileCacheType::DISPOSABLE: |
1221 | 152 | return {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL}; |
1222 | 0 | default: |
1223 | 0 | return {}; |
1224 | 889 | } |
1225 | 0 | return {}; |
1226 | 889 | } |
1227 | | |
// Change the accounted size of the existing cell at (hash, offset) from
// `old_size` to `new_size`, updating its LRU queue entry and the global
// _cur_cache_size counter. The cell is expected to exist; a missing cell is
// logged and skipped in release builds. Requires the cache lock.
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(_files.find(hash) != _files.end() &&
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
    DCHECK(cell != nullptr);
    if (cell == nullptr) {
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
                     << " new_size=" << new_size;
        return;
    }
    if (cell->queue_iterator) {
        // Keep the queue's per-entry size in sync and record the resize for
        // the LRU log.
        auto& queue = get_queue(cell->file_block->cache_type());
        DCHECK(queue.contains(hash, offset, cache_lock));
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
                                          cell->file_block->get_hash_value(),
                                          cell->file_block->offset(), new_size);
    }
    _cur_cache_size -= old_size;
    _cur_cache_size += new_size;
}
1251 | | |
// Try to reclaim `size` bytes for `cur_type` by evicting *cold* entries from
// the other type queues: only cells whose atime is older than the queue's
// hot-data interval (relative to `cur_time`) are candidates. Returns true when
// enough space was reclaimed. Requires the cache lock.
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
    size_t removed_size = 0;
    size_t cur_cache_size = _cur_cache_size;
    std::vector<FileBlockCell*> to_evict;
    for (FileCacheType cache_type : other_cache_types) {
        auto& queue = get_queue(cache_type);
        size_t remove_size_per_type = 0;
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
                break; // enough space reclaimed
            }
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
                         << ", offset: " << entry_offset;

            size_t cell_size = cell->size();
            DCHECK(entry_size == cell_size);

            // Stop at the first entry still within the hot-data window; the
            // queue is LRU-ordered so everything after it is at least as hot.
            // NOTE(review): atime == 0 is treated the same as "hot" here
            // (breaks immediately) — presumably to protect never-touched
            // entries; confirm intent.
            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
                break;
            }

            if (cell->releasable()) {
                auto& file_block = cell->file_block;
                std::lock_guard block_lock(file_block->_mutex);
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
                to_evict.push_back(cell);
                removed_size += cell_size;
                remove_size_per_type += cell_size;
            }
        }
        // Recorded per victim-type, attributed to the requesting type.
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
    }
    bool is_sync_removal = !evict_in_advance;
    std::string reason = std::string("try_reserve_by_time ") +
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);

    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
}
1294 | | |
1295 | | bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size, |
1296 | 11.9k | bool evict_in_advance) const { |
1297 | 11.9k | bool ret = false; |
1298 | 11.9k | if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance |
1299 | 11 | ret = (removed_size < need_size); |
1300 | 11 | return ret; |
1301 | 11 | } |
1302 | 11.9k | if (_disk_resource_limit_mode) { |
1303 | 4 | ret = (removed_size < need_size); |
1304 | 11.8k | } else { |
1305 | 11.8k | ret = (cur_cache_size + need_size - removed_size > _capacity); |
1306 | 11.8k | } |
1307 | 11.9k | return ret; |
1308 | 11.9k | } |
1309 | | |
bool BlockFileCache::try_reserve_from_other_queue_by_size(
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
    // Try to free `size` bytes for `cur_type` by shrinking sibling queues that
    // have grown past their configured maximum. Queues within their own budget
    // are left untouched. Returns true when enough space was freed.
    size_t removed_size = 0;
    size_t cur_cache_size = _cur_cache_size;
    std::vector<FileBlockCell*> to_evict;
    // we follow the privilege defined in get_other_cache_types to evict
    for (FileCacheType cache_type : other_cache_types) {
        auto& queue = get_queue(cache_type);

        // we will not drain each of them to the bottom -- i.e., we only
        // evict what they have stolen.
        size_t cur_queue_size = queue.get_capacity(cache_lock);
        size_t cur_queue_max_size = queue.get_max_size();
        if (cur_queue_size <= cur_queue_max_size) {
            continue;
        }
        size_t cur_removed_size = 0;
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
                              cur_removed_size, evict_in_advance);
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
    }
    // Advance eviction defers the storage deletion; the hot path removes inline.
    bool is_sync_removal = !evict_in_advance;
    std::string reason = std::string("try_reserve_by_size") +
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
}
1338 | | |
// Two-stage reservation from sibling queues:
//   1) evict blocks colder than each queue's hot-data interval (TTL excluded);
//   2) if still short and enabled by config, evict from queues that exceed
//      their size budget -- unless this queue itself is over its soft limit.
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
                                                  int64_t cur_time,
                                                  std::lock_guard<std::mutex>& cache_lock,
                                                  bool evict_in_advance) {
    // currently, TTL cache is not considered as a candidate
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
        return reserve_success;
    }

    other_cache_types = get_other_cache_type(cur_cache_type);
    auto& cur_queue = get_queue(cur_cache_type);
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
    size_t cur_queue_max_size = cur_queue.get_max_size();
    // Hit the soft limit by self, cannot remove from other queues
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
        return false;
    }
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
                                                evict_in_advance);
}
1362 | | |
// Reserve `size` bytes for a block: first try to reclaim space from the other
// queues; if that fails, evict from the block's own LRU queue. On success the
// reservation is also recorded in the per-query context (if one is attached).
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
                                         QueryFileCacheContextPtr query_context,
                                         const CacheContext& context, size_t offset, size_t size,
                                         std::lock_guard<std::mutex>& cache_lock,
                                         bool evict_in_advance) {
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
                               std::chrono::steady_clock::now().time_since_epoch())
                               .count();
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
                                      evict_in_advance)) {
        // Fall back to evicting from our own queue.
        auto& queue = get_queue(context.cache_type);
        size_t removed_size = 0;
        size_t cur_cache_size = _cur_cache_size;

        std::vector<FileBlockCell*> to_evict;
        size_t cur_removed_size = 0;
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
                              cur_removed_size, evict_in_advance);
        bool is_sync_removal = !evict_in_advance;
        std::string reason = std::string("try_reserve for cache type ") +
                             cache_type_to_string(context.cache_type) +
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;

        // Even self-eviction could not free enough space.
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
            return false;
        }
    }

    if (query_context) {
        query_context->reserve(hash, offset, size, cache_lock);
    }
    return true;
}
1398 | | |
1399 | | template <class T, class U> |
1400 | | requires IsXLock<T> && IsXLock<U> |
1401 | 2.91k | void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) { |
1402 | 2.91k | auto hash = file_block->get_hash_value(); |
1403 | 2.91k | auto offset = file_block->offset(); |
1404 | 2.91k | auto type = file_block->cache_type(); |
1405 | 2.91k | auto expiration_time = file_block->expiration_time(); |
1406 | 2.91k | auto tablet_id = file_block->tablet_id(); |
1407 | 2.91k | auto* cell = get_cell(hash, offset, cache_lock); |
1408 | 2.91k | file_block->cell = nullptr; |
1409 | 2.91k | DCHECK(cell); |
1410 | 2.91k | DCHECK(cell->queue_iterator); |
1411 | 2.91k | if (cell->queue_iterator) { |
1412 | 2.91k | auto& queue = get_queue(file_block->cache_type()); |
1413 | 2.91k | queue.remove(*cell->queue_iterator, cache_lock); |
1414 | 2.91k | _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE, |
1415 | 2.91k | cell->file_block->get_hash_value(), |
1416 | 2.91k | cell->file_block->offset(), cell->size()); |
1417 | 2.91k | } |
1418 | 2.91k | *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())] |
1419 | 2.91k | << file_block->range().size(); |
1420 | 2.91k | *_total_evict_size_metrics << file_block->range().size(); |
1421 | | |
1422 | 2.91k | VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string() |
1423 | 0 | << ", offset: " << offset << ", size: " << file_block->range().size() |
1424 | 0 | << ", type: " << cache_type_to_string(type); |
1425 | | |
1426 | 2.91k | if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) { |
1427 | 927 | FileCacheKey key; |
1428 | 927 | key.hash = hash; |
1429 | 927 | key.offset = offset; |
1430 | 927 | key.meta.type = type; |
1431 | 927 | key.meta.expiration_time = expiration_time; |
1432 | 927 | key.meta.tablet_id = tablet_id; |
1433 | 927 | if (sync) { |
1434 | 873 | int64_t duration_ns = 0; |
1435 | 873 | Status st; |
1436 | 873 | { |
1437 | 873 | SCOPED_RAW_TIMER(&duration_ns); |
1438 | 873 | st = _storage->remove(key); |
1439 | 873 | } |
1440 | 873 | *_storage_sync_remove_latency_us << (duration_ns / 1000); |
1441 | 873 | if (!st.ok()) { |
1442 | 167 | LOG_WARNING("").error(st); |
1443 | 167 | } |
1444 | 873 | } else { |
1445 | | // the file will be deleted in the bottom half |
1446 | | // so there will be a window that the file is not in the cache but still in the storage |
1447 | | // but it's ok, because the rowset is stale already |
1448 | 54 | bool ret = _recycle_keys.enqueue(key); |
1449 | 54 | if (ret) [[likely]] { |
1450 | 54 | *_recycle_keys_length_recorder << _recycle_keys.size_approx(); |
1451 | 54 | } else { |
1452 | 0 | LOG_WARNING("Failed to push recycle key to queue, do it synchronously"); |
1453 | 0 | int64_t duration_ns = 0; |
1454 | 0 | Status st; |
1455 | 0 | { |
1456 | 0 | SCOPED_RAW_TIMER(&duration_ns); |
1457 | 0 | st = _storage->remove(key); |
1458 | 0 | } |
1459 | 0 | *_storage_retry_sync_remove_latency_us << (duration_ns / 1000); |
1460 | 0 | if (!st.ok()) { |
1461 | 0 | LOG_WARNING("").error(st); |
1462 | 0 | } |
1463 | 0 | } |
1464 | 54 | } |
1465 | 1.98k | } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) { |
1466 | 0 | file_block->set_deleting(); |
1467 | 0 | return; |
1468 | 0 | } |
1469 | 2.91k | _cur_cache_size -= file_block->range().size(); |
1470 | 2.91k | if (FileCacheType::TTL == type) { |
1471 | 1.29k | _cur_ttl_size -= file_block->range().size(); |
1472 | 1.29k | } |
1473 | 2.91k | auto it = _files.find(hash); |
1474 | 2.91k | if (it != _files.end()) { |
1475 | 2.91k | it->second.erase(file_block->offset()); |
1476 | 2.91k | if (it->second.empty()) { |
1477 | 832 | _files.erase(hash); |
1478 | 832 | } |
1479 | 2.91k | } |
1480 | 2.91k | *_num_removed_blocks << 1; |
1481 | 2.91k | } |
1482 | | |
// Thread-safe wrapper: takes the cache lock and reports bytes used by `cache_type`.
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this); // declares `cache_lock`
    return get_used_cache_size_unlocked(cache_type, cache_lock);
}
1487 | | |
// Bytes currently held in the queue of `cache_type`; caller holds the cache lock.
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
                                                    std::lock_guard<std::mutex>& cache_lock) const {
    return get_queue(cache_type).get_capacity(cache_lock);
}
1492 | | |
// Thread-safe wrapper: takes the cache lock and reports remaining room for `cache_type`.
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this); // declares `cache_lock`
    return get_available_cache_size_unlocked(cache_type, cache_lock);
}
1497 | | |
// NOTE(review): this subtracts used *bytes* (get_capacity) from
// get_max_element_size(), whose name suggests an element count; the unsigned
// subtraction also wraps if usage exceeds the maximum. Confirm the intended
// units of get_max_element_size() before relying on this value.
size_t BlockFileCache::get_available_cache_size_unlocked(
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
    return get_queue(cache_type).get_max_element_size() -
           get_used_cache_size_unlocked(cache_type, cache_lock);
}
1503 | | |
// Thread-safe wrapper: takes the cache lock and counts blocks of `cache_type`.
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this); // declares `cache_lock`
    return get_file_blocks_num_unlocked(cache_type, cache_lock);
}
1508 | | |
// Number of blocks in the queue of `cache_type`; caller holds the cache lock.
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
                                                    std::lock_guard<std::mutex>& cache_lock) const {
    return get_queue(cache_type).get_elements_num(cache_lock);
}
1513 | | |
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
        : file_block(file_block) {
    // Back-pointer so the block can reach its cell for queue bookkeeping.
    file_block->cell = this;
    /**
     * Cell can be created with either DOWNLOADED or EMPTY file block's state.
     * File block acquires DOWNLOADING state and creates LRUQueue iterator on first
     * successful getOrSetDownloader call.
     */

    switch (file_block->_download_state) {
    case FileBlock::State::DOWNLOADED:
    case FileBlock::State::EMPTY:
    case FileBlock::State::SKIP_CACHE: {
        break;
    }
    default:
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
                      << FileBlock::state_to_string(file_block->_download_state);
    }
    // TTL blocks start tracking access time at creation.
    if (file_block->cache_type() == FileCacheType::TTL) {
        update_atime();
    }
}
1537 | | |
1538 | | LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size, |
1539 | 9.33k | std::lock_guard<std::mutex>& /* cache_lock */) { |
1540 | 9.33k | cache_size += size; |
1541 | 9.33k | auto iter = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size)); |
1542 | 9.33k | map.insert(std::make_pair(std::make_pair(hash, offset), iter)); |
1543 | 9.33k | return iter; |
1544 | 9.33k | } |
1545 | | |
1546 | 0 | void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) { |
1547 | 0 | queue.clear(); |
1548 | 0 | map.clear(); |
1549 | 0 | cache_size = 0; |
1550 | 0 | } |
1551 | | |
// Promote the entry to the MRU (tail) position. splice is O(1) and does not
// invalidate any iterators held by cells.
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
    queue.splice(queue.end(), queue, queue_it);
}
1555 | | |
1556 | | void LRUQueue::resize(Iterator queue_it, size_t new_size, |
1557 | 5 | std::lock_guard<std::mutex>& /* cache_lock */) { |
1558 | 5 | cache_size -= queue_it->size; |
1559 | 5 | queue_it->size = new_size; |
1560 | 5 | cache_size += new_size; |
1561 | 5 | } |
1562 | | bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset, |
1563 | 4 | std::lock_guard<std::mutex>& /* cache_lock */) const { |
1564 | 4 | return map.find(std::make_pair(hash, offset)) != map.end(); |
1565 | 4 | } |
1566 | | |
1567 | | LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset, |
1568 | 560 | std::lock_guard<std::mutex>& /* cache_lock */) const { |
1569 | 560 | auto itr = map.find(std::make_pair(hash, offset)); |
1570 | 560 | if (itr != map.end()) { |
1571 | 186 | return itr->second; |
1572 | 186 | } |
1573 | 374 | return std::list<FileKeyAndOffset>::iterator(); |
1574 | 560 | } |
1575 | | |
1576 | 2 | std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const { |
1577 | 2 | std::string result; |
1578 | 18 | for (const auto& [hash, offset, size] : queue) { |
1579 | 18 | if (!result.empty()) { |
1580 | 16 | result += ", "; |
1581 | 16 | } |
1582 | 18 | result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1); |
1583 | 18 | } |
1584 | 2 | return result; |
1585 | 2 | } |
1586 | | |
1587 | | size_t LRUQueue::levenshtein_distance_from(LRUQueue& base, |
1588 | 5 | std::lock_guard<std::mutex>& cache_lock) { |
1589 | 5 | std::list<FileKeyAndOffset> target_queue = this->queue; |
1590 | 5 | std::list<FileKeyAndOffset> base_queue = base.queue; |
1591 | 5 | std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end()); |
1592 | 5 | std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end()); |
1593 | | |
1594 | 5 | size_t m = vec1.size(); |
1595 | 5 | size_t n = vec2.size(); |
1596 | | |
1597 | | // Create a 2D vector (matrix) to store the Levenshtein distances |
1598 | | // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2 |
1599 | 5 | std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0)); |
1600 | | |
1601 | | // Initialize the first row and column of the matrix |
1602 | | // The distance between an empty list and a list of length k is k (all insertions or deletions) |
1603 | 22 | for (size_t i = 0; i <= m; ++i) { |
1604 | 17 | dp[i][0] = i; |
1605 | 17 | } |
1606 | 20 | for (size_t j = 0; j <= n; ++j) { |
1607 | 15 | dp[0][j] = j; |
1608 | 15 | } |
1609 | | |
1610 | | // Fill the matrix using dynamic programming |
1611 | 17 | for (size_t i = 1; i <= m; ++i) { |
1612 | 38 | for (size_t j = 1; j <= n; ++j) { |
1613 | | // Check if the current elements of both vectors are equal |
1614 | 26 | size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash && |
1615 | 26 | vec1[i - 1].offset == vec2[j - 1].offset) |
1616 | 26 | ? 0 |
1617 | 26 | : 1; |
1618 | | // Calculate the minimum cost of three possible operations: |
1619 | | // 1. Insertion: dp[i][j-1] + 1 |
1620 | | // 2. Deletion: dp[i-1][j] + 1 |
1621 | | // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not) |
1622 | 26 | dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost}); |
1623 | 26 | } |
1624 | 12 | } |
1625 | | // The bottom-right cell of the matrix contains the Levenshtein distance |
1626 | 5 | return dp[m][n]; |
1627 | 5 | } |
1628 | | |
// Thread-safe wrapper: dump every cached block for `hash`, one line per block.
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
    SCOPED_CACHE_LOCK(_mutex, this); // declares `cache_lock`
    return dump_structure_unlocked(hash, cache_lock);
}
1633 | | |
1634 | | std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash, |
1635 | 1 | std::lock_guard<std::mutex>&) { |
1636 | 1 | std::stringstream result; |
1637 | 1 | auto it = _files.find(hash); |
1638 | 1 | if (it == _files.end()) { |
1639 | 0 | return std::string(""); |
1640 | 0 | } |
1641 | 1 | const auto& cells_by_offset = it->second; |
1642 | | |
1643 | 1 | for (const auto& [_, cell] : cells_by_offset) { |
1644 | 1 | result << cell.file_block->get_info_for_log() << " " |
1645 | 1 | << cache_type_to_string(cell.file_block->cache_type()) << "\n"; |
1646 | 1 | } |
1647 | | |
1648 | 1 | return result.str(); |
1649 | 1 | } |
1650 | | |
// Thread-safe wrapper: name of the cache type of the block at (hash, offset).
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
    SCOPED_CACHE_LOCK(_mutex, this); // declares `cache_lock`
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
}
1655 | | |
1656 | | std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash, |
1657 | | size_t offset, |
1658 | 0 | std::lock_guard<std::mutex>&) { |
1659 | 0 | std::stringstream result; |
1660 | 0 | auto it = _files.find(hash); |
1661 | 0 | if (it == _files.end()) { |
1662 | 0 | return std::string(""); |
1663 | 0 | } |
1664 | 0 | const auto& cells_by_offset = it->second; |
1665 | 0 | const auto& cell = cells_by_offset.find(offset); |
1666 | |
|
1667 | 0 | return cache_type_to_string(cell->second.file_block->cache_type()); |
1668 | 0 | } |
1669 | | |
// Move a cached block from its current LRU queue to the queue of `new_type`,
// mirroring both the removal and the re-insertion to the LRU recorder. A
// missing hash or offset is silently ignored.
// NOTE(review): this only moves the queue entry; it appears to rely on the
// caller to update the file block's own cache_type -- confirm.
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
                                       FileCacheType new_type,
                                       std::lock_guard<std::mutex>& cache_lock) {
    if (auto iter = _files.find(hash); iter != _files.end()) {
        auto& file_blocks = iter->second;
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
            FileBlockCell& cell = cell_it->second;
            auto& cur_queue = get_queue(cell.file_block->cache_type());
            DCHECK(cell.queue_iterator.has_value());
            cur_queue.remove(*cell.queue_iterator, cache_lock);
            _lru_recorder->record_queue_event(
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
            auto& new_queue = get_queue(new_type);
            cell.queue_iterator =
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
                                              cell.file_block->get_hash_value(),
                                              cell.file_block->offset(), cell.size());
        }
    }
}
1692 | | |
1693 | | // @brief: get a path's disk capacity used percent, inode used percent |
1694 | | // @param: path |
1695 | | // @param: percent.first disk used percent, percent.second inode used percent |
1696 | 156 | int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) { |
1697 | 156 | struct statfs stat; |
1698 | 156 | int ret = statfs(path.c_str(), &stat); |
1699 | 156 | if (ret != 0) { |
1700 | 2 | return ret; |
1701 | 2 | } |
1702 | | // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195 |
1703 | | // v->used = stat.f_blocks - stat.f_bfree |
1704 | | // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail |
1705 | 154 | uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100; |
1706 | 154 | uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail; |
1707 | 154 | int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0)); |
1708 | | |
1709 | 154 | unsigned long long inode_free = stat.f_ffree; |
1710 | 154 | unsigned long long inode_total = stat.f_files; |
1711 | 154 | int inode_percentage = cast_set<int>(inode_free * 100 / inode_total); |
1712 | 154 | percent->first = capacity_percentage; |
1713 | 154 | percent->second = 100 - inode_percentage; |
1714 | | |
1715 | | // Add sync point for testing |
1716 | 154 | TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent); |
1717 | | |
1718 | 154 | return 0; |
1719 | 156 | } |
1720 | | |
1721 | 10 | std::string BlockFileCache::reset_capacity(size_t new_capacity) { |
1722 | 10 | using namespace std::chrono; |
1723 | 10 | int64_t space_released = 0; |
1724 | 10 | size_t old_capacity = 0; |
1725 | 10 | std::stringstream ss; |
1726 | 10 | ss << "finish reset_capacity, path=" << _cache_base_path; |
1727 | 10 | auto adjust_start_time = steady_clock::time_point(); |
1728 | 10 | { |
1729 | 10 | SCOPED_CACHE_LOCK(_mutex, this); |
1730 | 10 | if (new_capacity < _capacity && new_capacity < _cur_cache_size) { |
1731 | 1 | int64_t need_remove_size = _cur_cache_size - new_capacity; |
1732 | 4 | auto remove_blocks = [&](LRUQueue& queue) -> int64_t { |
1733 | 4 | int64_t queue_released = 0; |
1734 | 4 | std::vector<FileBlockCell*> to_evict; |
1735 | 13 | for (const auto& [entry_key, entry_offset, entry_size] : queue) { |
1736 | 13 | if (need_remove_size <= 0) { |
1737 | 1 | break; |
1738 | 1 | } |
1739 | 12 | need_remove_size -= entry_size; |
1740 | 12 | space_released += entry_size; |
1741 | 12 | queue_released += entry_size; |
1742 | 12 | auto* cell = get_cell(entry_key, entry_offset, cache_lock); |
1743 | 12 | if (!cell->releasable()) { |
1744 | 0 | cell->file_block->set_deleting(); |
1745 | 0 | continue; |
1746 | 0 | } |
1747 | 12 | to_evict.push_back(cell); |
1748 | 12 | } |
1749 | 12 | for (auto& cell : to_evict) { |
1750 | 12 | FileBlockSPtr file_block = cell->file_block; |
1751 | 12 | std::lock_guard block_lock(file_block->_mutex); |
1752 | 12 | remove(file_block, cache_lock, block_lock); |
1753 | 12 | } |
1754 | 4 | return queue_released; |
1755 | 4 | }; |
1756 | 1 | int64_t queue_released = remove_blocks(_disposable_queue); |
1757 | 1 | ss << " disposable_queue released " << queue_released; |
1758 | 1 | queue_released = remove_blocks(_normal_queue); |
1759 | 1 | ss << " normal_queue released " << queue_released; |
1760 | 1 | queue_released = remove_blocks(_index_queue); |
1761 | 1 | ss << " index_queue released " << queue_released; |
1762 | 1 | queue_released = remove_blocks(_ttl_queue); |
1763 | 1 | ss << " ttl_queue released " << queue_released; |
1764 | | |
1765 | 1 | _disk_resource_limit_mode = true; |
1766 | 1 | _disk_limit_mode_metrics->set_value(1); |
1767 | 1 | ss << " total_space_released=" << space_released; |
1768 | 1 | } |
1769 | 10 | old_capacity = _capacity; |
1770 | 10 | _capacity = new_capacity; |
1771 | 10 | _cache_capacity_metrics->set_value(_capacity); |
1772 | 10 | } |
1773 | 10 | auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time); |
1774 | 10 | LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path |
1775 | 10 | << " use_time=" << cast_set<int64_t>(use_time.count()); |
1776 | 10 | ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity; |
1777 | 10 | LOG(INFO) << ss.str(); |
1778 | 10 | return ss.str(); |
1779 | 10 | } |
1780 | | |
// Periodically called by the background monitor: enter disk-resource-limit
// mode when disk space or inode usage crosses the "enter" threshold, and leave
// it (with hysteresis) when both drop below the "exit" threshold.
void BlockFileCache::check_disk_resource_limit() {
    // Only relevant when the cache is backed by a local disk.
    if (_storage->get_type() != FileCacheStorageType::DISK) {
        return;
    }

    bool previous_mode = _disk_resource_limit_mode;
    // If the cache is below its configured capacity, clear the mode eagerly;
    // the statfs-based checks below may re-enter it.
    if (_capacity > _cur_cache_size) {
        _disk_resource_limit_mode = false;
        _disk_limit_mode_metrics->set_value(0);
    }
    std::pair<int, int> percent;
    int ret = disk_used_percentage(_cache_base_path, &percent);
    if (ret != 0) {
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
        return;
    }
    auto [space_percentage, inode_percentage] = percent;
    auto is_insufficient = [](const int& percentage) {
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
    };
    DCHECK_GE(space_percentage, 0);
    DCHECK_LE(space_percentage, 100);
    DCHECK_GE(inode_percentage, 0);
    DCHECK_LE(inode_percentage, 100);
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
    // FIXME: reject with config validator
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
        config::file_cache_exit_disk_resource_limit_mode_percent) {
        LOG_WARNING("config error, set to default value")
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
    }
    bool is_space_insufficient = is_insufficient(space_percentage);
    bool is_inode_insufficient = is_insufficient(inode_percentage);
    // Hysteresis: enter when either metric crosses the enter threshold; exit
    // only when BOTH are below the (lower) exit threshold.
    if (is_space_insufficient || is_inode_insufficient) {
        _disk_resource_limit_mode = true;
        _disk_limit_mode_metrics->set_value(1);
    } else if (_disk_resource_limit_mode &&
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
        _disk_resource_limit_mode = false;
        _disk_limit_mode_metrics->set_value(0);
    }
    if (previous_mode != _disk_resource_limit_mode) {
        // add log for disk resource limit mode switching
        if (_disk_resource_limit_mode) {
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
                         << " space_percent=" << space_percentage
                         << " inode_percent=" << inode_percentage
                         << " is_space_insufficient=" << is_space_insufficient
                         << " is_inode_insufficient=" << is_inode_insufficient
                         << " enter threshold="
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
        } else {
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
                      << " space_percent=" << space_percentage
                      << " inode_percent=" << inode_percentage << " exit threshold="
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
        }
    } else if (_disk_resource_limit_mode) {
        // print log for disk resource limit mode running, but less frequently
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
                                 << " space_percent=" << space_percentage
                                 << " inode_percent=" << inode_percentage
                                 << " is_space_insufficient=" << is_space_insufficient
                                 << " is_inode_insufficient=" << is_inode_insufficient
                                 << " mode run in resource limit";
    }
}
1852 | | |
// Periodically called by the background monitor (when advance eviction is
// enabled): decide whether background eviction should start before the cache
// is actually full, based on disk space, inode usage, and cache-size ratio,
// with enter/exit hysteresis like check_disk_resource_limit().
void BlockFileCache::check_need_evict_cache_in_advance() {
    // Only relevant when the cache is backed by a local disk.
    if (_storage->get_type() != FileCacheStorageType::DISK) {
        return;
    }

    std::pair<int, int> percent;
    int ret = disk_used_percentage(_cache_base_path, &percent);
    if (ret != 0) {
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
        return;
    }
    auto [space_percentage, inode_percentage] = percent;
    // NOTE(review): divides by _capacity -- assumes capacity is never zero
    // here; confirm that a zero-capacity cache cannot reach this path.
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
    auto is_insufficient = [](const int& percentage) {
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
    };
    DCHECK_GE(space_percentage, 0);
    DCHECK_LE(space_percentage, 100);
    DCHECK_GE(inode_percentage, 0);
    DCHECK_LE(inode_percentage, 100);
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
    // FIXME: reject with config validator
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
        LOG_WARNING("config error, set to default value")
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
    }
    bool previous_mode = _need_evict_cache_in_advance;
    bool is_space_insufficient = is_insufficient(space_percentage);
    bool is_inode_insufficient = is_insufficient(inode_percentage);
    bool is_size_insufficient = is_insufficient(size_percentage);
    // Hysteresis: enter when ANY metric crosses the enter threshold; exit only
    // when ALL three drop below the (lower) exit threshold.
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
        _need_evict_cache_in_advance = true;
        _need_evict_cache_in_advance_metrics->set_value(1);
    } else if (_need_evict_cache_in_advance &&
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
        _need_evict_cache_in_advance = false;
        _need_evict_cache_in_advance_metrics->set_value(0);
    }
    if (previous_mode != _need_evict_cache_in_advance) {
        // add log for evict cache in advance mode switching
        if (_need_evict_cache_in_advance) {
            LOG(WARNING) << "Entering evict cache in advance mode: "
                         << "file_cache=" << get_base_path()
                         << " space_percent=" << space_percentage
                         << " inode_percent=" << inode_percentage
                         << " size_percent=" << size_percentage
                         << " is_space_insufficient=" << is_space_insufficient
                         << " is_inode_insufficient=" << is_inode_insufficient
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
        } else {
            LOG(INFO) << "Exiting evict cache in advance mode: "
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
                      << " inode_percent=" << inode_percentage
                      << " size_percent=" << size_percentage << " exit threshold="
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
        }
    } else if (_need_evict_cache_in_advance) {
        // print log for evict cache in advance mode running, but less frequently
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
                                 << " space_percent=" << space_percentage
                                 << " inode_percent=" << inode_percentage
                                 << " size_percent=" << size_percentage
                                 << " is_space_insufficient=" << is_space_insufficient
                                 << " is_inode_insufficient=" << is_inode_insufficient
                                 << " is_size_insufficient=" << is_size_insufficient
                                 << " need evict cache in advance";
    }
}
1928 | | |
// Background thread: periodically checks disk-resource limits, drives the
// evict-in-advance state machine, and refreshes the cache's metric gauges.
// Runs until `_close` is set; the sleep between iterations is interruptible
// via `_close_cv` so shutdown is prompt.
void BlockFileCache::run_background_monitor() {
    Thread::set_self_name("run_background_monitor");
    while (!_close) {
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
        // Test hook: lets unit tests shrink the sleep interval.
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
        check_disk_resource_limit();
        if (config::enable_evict_file_cache_in_advance) {
            check_need_evict_cache_in_advance();
        } else {
            // Feature disabled: force the flag and its gauge off.
            _need_evict_cache_in_advance = false;
            _need_evict_cache_in_advance_metrics->set_value(0);
        }

        {
            std::unique_lock close_lock(_close_mtx);
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
            if (_close) {
                break;
            }
        }
        // report
        {
            // SCOPED_CACHE_LOCK introduces `cache_lock`, used by the queue
            // accessors below.
            SCOPED_CACHE_LOCK(_mutex, this);
            _cur_cache_size_metrics->set_value(_cur_cache_size);
            // TTL size is derived as the remainder after subtracting the three
            // non-TTL queues from the total.
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
                                                   _index_queue.get_capacity(cache_lock) -
                                                   _normal_queue.get_capacity(cache_lock) -
                                                   _disposable_queue.get_capacity(cache_lock));
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
                    _ttl_queue.get_capacity(cache_lock));
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
                    _ttl_queue.get_elements_num(cache_lock));
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
            _cur_normal_queue_element_count_metrics->set_value(
                    _normal_queue.get_elements_num(cache_lock));
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
            _cur_index_queue_element_count_metrics->set_value(
                    _index_queue.get_elements_num(cache_lock));
            _cur_disposable_queue_cache_size_metrics->set_value(
                    _disposable_queue.get_capacity(cache_lock));
            _cur_disposable_queue_element_count_metrics->set_value(
                    _disposable_queue.get_elements_num(cache_lock));

            // Update meta store write queue size if storage is FSFileCacheStorage
            if (_storage->get_type() == FileCacheStorageType::DISK) {
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
                if (fs_storage != nullptr) {
                    auto* meta_store = fs_storage->get_meta_store();
                    if (meta_store != nullptr) {
                        _meta_store_write_queue_size_metrics->set_value(
                                meta_store->get_write_queue_size());
                    }
                }
            }

            // Hit-ratio gauges: the >0 guards avoid division by zero before
            // any reads have happened (or within a fresh 5m/1h window).
            if (_num_read_blocks->get_value() > 0) {
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
                                      (double)_num_read_blocks->get_value());
            }
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
                                         (double)_num_read_blocks_5m->get_value());
            }
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
                                         (double)_num_read_blocks_1h->get_value());
            }

            // "no_warmup" variants track the same ratios excluding warmed-up
            // data — NOTE(review): exact warmup semantics defined elsewhere.
            if (_no_warmup_num_hit_blocks->get_value() > 0) {
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
                                                (double)_no_warmup_num_read_blocks->get_value());
            }
            if (_no_warmup_num_hit_blocks_5m && _no_warmup_num_hit_blocks_5m->get_value() > 0) {
                _no_warmup_hit_ratio_5m->set_value(
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
                        (double)_no_warmup_num_read_blocks_5m->get_value());
            }
            if (_no_warmup_num_hit_blocks_1h && _no_warmup_num_hit_blocks_1h->get_value() > 0) {
                _no_warmup_hit_ratio_1h->set_value(
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
                        (double)_no_warmup_num_read_blocks_1h->get_value());
            }
        }
    }
}
2014 | | |
2015 | 139 | void BlockFileCache::run_background_gc() { |
2016 | 139 | Thread::set_self_name("run_background_gc"); |
2017 | 139 | FileCacheKey key; |
2018 | 139 | size_t batch_count = 0; |
2019 | 406 | while (!_close) { |
2020 | 406 | int64_t interval_ms = config::file_cache_background_gc_interval_ms; |
2021 | 406 | size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000; |
2022 | 406 | { |
2023 | 406 | std::unique_lock close_lock(_close_mtx); |
2024 | 406 | _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); |
2025 | 406 | if (_close) { |
2026 | 139 | break; |
2027 | 139 | } |
2028 | 406 | } |
2029 | | |
2030 | 284 | while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) { |
2031 | 17 | int64_t duration_ns = 0; |
2032 | 17 | Status st; |
2033 | 17 | { |
2034 | 17 | SCOPED_RAW_TIMER(&duration_ns); |
2035 | 17 | st = _storage->remove(key); |
2036 | 17 | } |
2037 | 17 | *_storage_async_remove_latency_us << (duration_ns / 1000); |
2038 | | |
2039 | 17 | if (!st.ok()) { |
2040 | 12 | LOG_WARNING("").error(st); |
2041 | 12 | } |
2042 | 17 | batch_count++; |
2043 | 17 | } |
2044 | 267 | *_recycle_keys_length_recorder << _recycle_keys.size_approx(); |
2045 | 267 | batch_count = 0; |
2046 | 267 | } |
2047 | 139 | } |
2048 | | |
2049 | 139 | void BlockFileCache::run_background_evict_in_advance() { |
2050 | 139 | Thread::set_self_name("run_background_evict_in_advance"); |
2051 | 139 | LOG(INFO) << "Starting background evict in advance thread"; |
2052 | 139 | int64_t batch = 0; |
2053 | 4.88k | while (!_close) { |
2054 | 4.88k | { |
2055 | 4.88k | std::unique_lock close_lock(_close_mtx); |
2056 | 4.88k | _close_cv.wait_for( |
2057 | 4.88k | close_lock, |
2058 | 4.88k | std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms)); |
2059 | 4.88k | if (_close) { |
2060 | 139 | LOG(INFO) << "Background evict in advance thread exiting due to cache closing"; |
2061 | 139 | break; |
2062 | 139 | } |
2063 | 4.88k | } |
2064 | 4.74k | batch = config::file_cache_evict_in_advance_batch_bytes; |
2065 | | |
2066 | | // Skip if eviction not needed or too many pending recycles |
2067 | 4.74k | if (!_need_evict_cache_in_advance || |
2068 | 4.74k | _recycle_keys.size_approx() >= |
2069 | 4.74k | config::file_cache_evict_in_advance_recycle_keys_num_threshold) { |
2070 | 4.74k | continue; |
2071 | 4.74k | } |
2072 | | |
2073 | 1 | int64_t duration_ns = 0; |
2074 | 1 | { |
2075 | 1 | SCOPED_CACHE_LOCK(_mutex, this); |
2076 | 1 | SCOPED_RAW_TIMER(&duration_ns); |
2077 | 1 | try_evict_in_advance(batch, cache_lock); |
2078 | 1 | } |
2079 | 1 | *_evict_in_advance_latency_us << (duration_ns / 1000); |
2080 | 1 | } |
2081 | 139 | } |
2082 | | |
// Background thread that applies deferred LRU-position updates in batches.
// Blocks queued in `_need_update_lru_blocks` (presumably by read paths —
// confirm at the enqueue sites) are drained here and promoted under a single
// cache-lock acquisition, rate-limited by the configured QPS.
void BlockFileCache::run_background_block_lru_update() {
    Thread::set_self_name("run_background_block_lru_update");
    std::vector<FileBlockSPtr> batch;
    while (!_close) {
        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
        // Max number of blocks promoted this wake-up (QPS limit * interval).
        size_t batch_limit =
                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
        {
            std::unique_lock close_lock(_close_mtx);
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
            if (_close) {
                break;
            }
        }

        // Reuse the batch vector across iterations; clear() keeps capacity.
        batch.clear();
        batch.reserve(batch_limit);
        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
        if (drained == 0) {
            // Nothing to do — still record the backlog length, then sleep again.
            *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
            continue;
        }

        int64_t duration_ns = 0;
        {
            // One lock acquisition for the whole batch.
            SCOPED_CACHE_LOCK(_mutex, this);
            SCOPED_RAW_TIMER(&duration_ns);
            for (auto& block : batch) {
                update_block_lru(block, cache_lock);
            }
        }
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
    }
}
2118 | | |
2119 | | std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> |
2120 | 2 | BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const { |
2121 | 2 | int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>( |
2122 | 2 | std::chrono::steady_clock::now().time_since_epoch()) |
2123 | 2 | .count(); |
2124 | 2 | SCOPED_CACHE_LOCK(_mutex, this); |
2125 | 2 | std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta; |
2126 | 2 | if (auto iter = _files.find(hash); iter != _files.end()) { |
2127 | 5 | for (auto& pair : _files.find(hash)->second) { |
2128 | 5 | const FileBlockCell* cell = &pair.second; |
2129 | 5 | if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) { |
2130 | 4 | if (cell->file_block->cache_type() == FileCacheType::TTL || |
2131 | 4 | (cell->atime != 0 && |
2132 | 3 | cur_time - cell->atime < |
2133 | 3 | get_queue(cell->file_block->cache_type()).get_hot_data_interval())) { |
2134 | 3 | blocks_meta.emplace_back(pair.first, cell->size(), |
2135 | 3 | cell->file_block->cache_type(), |
2136 | 3 | cell->file_block->expiration_time()); |
2137 | 3 | } |
2138 | 4 | } |
2139 | 5 | } |
2140 | 2 | } |
2141 | 2 | return blocks_meta; |
2142 | 2 | } |
2143 | | |
// Attempts to free at least `size` bytes while the cache is still being
// loaded asynchronously. Eviction only happens when the cache is in
// disk-resource-limit mode; otherwise the collection loops break immediately
// and the reservation trivially succeeds.
// Returns true when not in limit mode, or when enough bytes were evicted.
bool BlockFileCache::try_reserve_during_async_load(size_t size,
                                                   std::lock_guard<std::mutex>& cache_lock) {
    size_t removed_size = 0;
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);

    std::vector<FileBlockCell*> to_evict;
    // Walks one queue and collects releasable cells until `size` bytes are
    // gathered (or limit mode is off). Cells are only collected, not removed,
    // so iteration over the queue stays safe.
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
            if (!_disk_resource_limit_mode || removed_size >= size) {
                break;
            }
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);

            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
                         << ", offset: " << entry_offset;

            size_t cell_size = cell->size();
            DCHECK(entry_size == cell_size);

            if (cell->releasable()) {
                auto& file_block = cell->file_block;

                // Releasable cells in an LRU queue are expected to be fully
                // downloaded; assert that under the block's own lock.
                std::lock_guard block_lock(file_block->_mutex);
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
                to_evict.push_back(cell);
                removed_size += cell_size;
            }
        }
    };
    // Eviction preference order: disposable first, then normal, then index.
    if (disposable_queue_size != 0) {
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
    }
    if (normal_queue_size != 0) {
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
    }
    if (index_queue_size != 0) {
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
    }
    std::string reason = "async load";
    remove_file_blocks(to_evict, cache_lock, true, reason);

    return !_disk_resource_limit_mode || removed_size >= size;
}
2189 | | |
2190 | 29 | void BlockFileCache::clear_need_update_lru_blocks() { |
2191 | 29 | _need_update_lru_blocks.clear(); |
2192 | 29 | *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); |
2193 | 29 | } |
2194 | | |
2195 | 26 | std::string BlockFileCache::clear_file_cache_directly() { |
2196 | 26 | _lru_dumper->remove_lru_dump_files(); |
2197 | 26 | using namespace std::chrono; |
2198 | 26 | std::stringstream ss; |
2199 | 26 | auto start = steady_clock::now(); |
2200 | 26 | SCOPED_CACHE_LOCK(_mutex, this); |
2201 | 26 | LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path); |
2202 | | |
2203 | 26 | std::string clear_msg; |
2204 | 26 | auto s = _storage->clear(clear_msg); |
2205 | 26 | if (!s.ok()) { |
2206 | 1 | return clear_msg; |
2207 | 1 | } |
2208 | | |
2209 | 25 | int64_t num_files = _files.size(); |
2210 | 25 | int64_t cache_size = _cur_cache_size; |
2211 | 25 | int64_t index_queue_size = _index_queue.get_elements_num(cache_lock); |
2212 | 25 | int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock); |
2213 | 25 | int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock); |
2214 | 25 | int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock); |
2215 | | |
2216 | 25 | int64_t clear_fd_duration = 0; |
2217 | 25 | { |
2218 | | // clear FDCache to release fd |
2219 | 25 | SCOPED_RAW_TIMER(&clear_fd_duration); |
2220 | 25 | for (const auto& [file_key, file_blocks] : _files) { |
2221 | 13 | for (const auto& [offset, file_block_cell] : file_blocks) { |
2222 | 13 | AccessKeyAndOffset access_key_and_offset(file_key, offset); |
2223 | 13 | FDCache::instance()->remove_file_reader(access_key_and_offset); |
2224 | 13 | } |
2225 | 2 | } |
2226 | 25 | } |
2227 | | |
2228 | 25 | _files.clear(); |
2229 | 25 | _cur_cache_size = 0; |
2230 | 25 | _cur_ttl_size = 0; |
2231 | 25 | _time_to_key.clear(); |
2232 | 25 | _key_to_time.clear(); |
2233 | 25 | _index_queue.clear(cache_lock); |
2234 | 25 | _normal_queue.clear(cache_lock); |
2235 | 25 | _disposable_queue.clear(cache_lock); |
2236 | 25 | _ttl_queue.clear(cache_lock); |
2237 | | |
2238 | | // Synchronously update cache metrics so that information_schema.file_cache_statistics |
2239 | | // reflects the cleared state immediately, without waiting for the next |
2240 | | // run_background_monitor cycle. |
2241 | 25 | _cur_cache_size_metrics->set_value(0); |
2242 | 25 | _cur_ttl_cache_size_metrics->set_value(0); |
2243 | 25 | _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0); |
2244 | 25 | _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0); |
2245 | 25 | _cur_normal_queue_cache_size_metrics->set_value(0); |
2246 | 25 | _cur_normal_queue_element_count_metrics->set_value(0); |
2247 | 25 | _cur_index_queue_cache_size_metrics->set_value(0); |
2248 | 25 | _cur_index_queue_element_count_metrics->set_value(0); |
2249 | 25 | _cur_disposable_queue_cache_size_metrics->set_value(0); |
2250 | 25 | _cur_disposable_queue_element_count_metrics->set_value(0); |
2251 | | |
2252 | 25 | clear_need_update_lru_blocks(); |
2253 | | |
2254 | 25 | ss << "finish clear_file_cache_directly" |
2255 | 25 | << " path=" << _cache_base_path |
2256 | 25 | << " time_elapsed_ms=" << duration_cast<milliseconds>(steady_clock::now() - start).count() |
2257 | 25 | << " fd_clear_time_ms=" << (clear_fd_duration / 1000000) << " num_files=" << num_files |
2258 | 25 | << " cache_size=" << cache_size << " index_queue_size=" << index_queue_size |
2259 | 25 | << " normal_queue_size=" << normal_queue_size |
2260 | 25 | << " disposible_queue_size=" << disposible_queue_size << "ttl_queue_size=" << ttl_queue_size; |
2261 | 25 | auto msg = ss.str(); |
2262 | 25 | LOG(INFO) << msg; |
2263 | 25 | _lru_dumper->remove_lru_dump_files(); |
2264 | 25 | return msg; |
2265 | 26 | } |
2266 | | |
2267 | 2.01k | std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) { |
2268 | 2.01k | std::map<size_t, FileBlockSPtr> offset_to_block; |
2269 | 2.01k | SCOPED_CACHE_LOCK(_mutex, this); |
2270 | 2.01k | if (_files.contains(hash)) { |
2271 | 1.02k | for (auto& [offset, cell] : _files[hash]) { |
2272 | 1.02k | if (cell.file_block->state() == FileBlock::State::DOWNLOADED) { |
2273 | 1.02k | cell.file_block->_owned_by_cached_reader = true; |
2274 | 1.02k | offset_to_block.emplace(offset, cell.file_block); |
2275 | 1.02k | } |
2276 | 1.02k | } |
2277 | 1.00k | } |
2278 | 2.01k | return offset_to_block; |
2279 | 2.01k | } |
2280 | | |
2281 | 0 | void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) { |
2282 | 0 | SCOPED_CACHE_LOCK(_mutex, this); |
2283 | 0 | if (auto iter = _files.find(hash); iter != _files.end()) { |
2284 | 0 | for (auto& [_, cell] : iter->second) { |
2285 | 0 | cell.update_atime(); |
2286 | 0 | } |
2287 | 0 | }; |
2288 | 0 | } |
2289 | | |
2290 | 139 | void BlockFileCache::run_background_lru_log_replay() { |
2291 | 139 | Thread::set_self_name("run_background_lru_log_replay"); |
2292 | 4.88k | while (!_close) { |
2293 | 4.88k | int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms; |
2294 | 4.88k | { |
2295 | 4.88k | std::unique_lock close_lock(_close_mtx); |
2296 | 4.88k | _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); |
2297 | 4.88k | if (_close) { |
2298 | 139 | break; |
2299 | 139 | } |
2300 | 4.88k | } |
2301 | | |
2302 | 4.74k | _lru_recorder->replay_queue_event(FileCacheType::TTL); |
2303 | 4.74k | _lru_recorder->replay_queue_event(FileCacheType::INDEX); |
2304 | 4.74k | _lru_recorder->replay_queue_event(FileCacheType::NORMAL); |
2305 | 4.74k | _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE); |
2306 | | |
2307 | 4.74k | if (config::enable_evaluate_shadow_queue_diff) { |
2308 | 0 | SCOPED_CACHE_LOCK(_mutex, this); |
2309 | 0 | _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock); |
2310 | 0 | _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock); |
2311 | 0 | _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock); |
2312 | 0 | _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock); |
2313 | 0 | } |
2314 | 4.74k | } |
2315 | 139 | } |
2316 | | |
2317 | 1.55k | void BlockFileCache::dump_lru_queues(bool force) { |
2318 | 1.55k | std::unique_lock dump_lock(_dump_lru_queues_mtx); |
2319 | 1.55k | if (config::file_cache_background_lru_dump_tail_record_num > 0 && |
2320 | 1.55k | !ExecEnv::GetInstance()->get_is_upgrading()) { |
2321 | 1.55k | _lru_dumper->dump_queue("disposable", force); |
2322 | 1.55k | _lru_dumper->dump_queue("normal", force); |
2323 | 1.55k | _lru_dumper->dump_queue("index", force); |
2324 | 1.55k | _lru_dumper->dump_queue("ttl", force); |
2325 | 1.55k | _lru_dumper->set_first_dump_done(); |
2326 | 1.55k | } |
2327 | 1.55k | } |
2328 | | |
2329 | 139 | void BlockFileCache::run_background_lru_dump() { |
2330 | 139 | Thread::set_self_name("run_background_lru_dump"); |
2331 | 1.68k | while (!_close) { |
2332 | 1.68k | int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms; |
2333 | 1.68k | { |
2334 | 1.68k | std::unique_lock close_lock(_close_mtx); |
2335 | 1.68k | _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); |
2336 | 1.68k | if (_close) { |
2337 | 139 | break; |
2338 | 139 | } |
2339 | 1.68k | } |
2340 | 1.55k | dump_lru_queues(false); |
2341 | 1.55k | } |
2342 | 139 | } |
2343 | | |
// Restores every LRU queue from its dump file.
// Keep this restore order: a block may appear in more than one dumped queue,
// and only its first appearance is used.
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
}
2351 | | |
2352 | 0 | std::map<std::string, double> BlockFileCache::get_stats() { |
2353 | 0 | std::map<std::string, double> stats; |
2354 | 0 | stats["hits_ratio"] = (double)_hit_ratio->get_value(); |
2355 | 0 | stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value(); |
2356 | 0 | stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value(); |
2357 | |
|
2358 | 0 | stats["index_queue_max_size"] = (double)_index_queue.get_max_size(); |
2359 | 0 | stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value(); |
2360 | 0 | stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size(); |
2361 | 0 | stats["index_queue_curr_elements"] = |
2362 | 0 | (double)_cur_index_queue_element_count_metrics->get_value(); |
2363 | |
|
2364 | 0 | stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size(); |
2365 | 0 | stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value(); |
2366 | 0 | stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size(); |
2367 | 0 | stats["ttl_queue_curr_elements"] = |
2368 | 0 | (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value(); |
2369 | |
|
2370 | 0 | stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size(); |
2371 | 0 | stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value(); |
2372 | 0 | stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size(); |
2373 | 0 | stats["normal_queue_curr_elements"] = |
2374 | 0 | (double)_cur_normal_queue_element_count_metrics->get_value(); |
2375 | |
|
2376 | 0 | stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size(); |
2377 | 0 | stats["disposable_queue_curr_size"] = |
2378 | 0 | (double)_cur_disposable_queue_cache_size_metrics->get_value(); |
2379 | 0 | stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size(); |
2380 | 0 | stats["disposable_queue_curr_elements"] = |
2381 | 0 | (double)_cur_disposable_queue_element_count_metrics->get_value(); |
2382 | |
|
2383 | 0 | stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance; |
2384 | 0 | stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode; |
2385 | |
|
2386 | 0 | stats["total_removed_counts"] = (double)_num_removed_blocks->get_value(); |
2387 | 0 | stats["total_hit_counts"] = (double)_num_hit_blocks->get_value(); |
2388 | 0 | stats["total_read_counts"] = (double)_num_read_blocks->get_value(); |
2389 | |
|
2390 | 0 | stats["total_read_size"] = (double)_total_read_size_metrics->get_value(); |
2391 | 0 | stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value(); |
2392 | 0 | stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value(); |
2393 | |
|
2394 | 0 | return stats; |
2395 | 0 | } |
2396 | | |
2397 | | // for be UTs |
2398 | 172 | std::map<std::string, double> BlockFileCache::get_stats_unsafe() { |
2399 | 172 | std::map<std::string, double> stats; |
2400 | 172 | stats["hits_ratio"] = (double)_hit_ratio->get_value(); |
2401 | 172 | stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value(); |
2402 | 172 | stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value(); |
2403 | | |
2404 | 172 | stats["index_queue_max_size"] = (double)_index_queue.get_max_size(); |
2405 | 172 | stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe(); |
2406 | 172 | stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size(); |
2407 | 172 | stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe(); |
2408 | | |
2409 | 172 | stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size(); |
2410 | 172 | stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe(); |
2411 | 172 | stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size(); |
2412 | 172 | stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe(); |
2413 | | |
2414 | 172 | stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size(); |
2415 | 172 | stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe(); |
2416 | 172 | stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size(); |
2417 | 172 | stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe(); |
2418 | | |
2419 | 172 | stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size(); |
2420 | 172 | stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe(); |
2421 | 172 | stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size(); |
2422 | 172 | stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe(); |
2423 | | |
2424 | 172 | stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance; |
2425 | 172 | stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode; |
2426 | | |
2427 | 172 | stats["total_removed_counts"] = (double)_num_removed_blocks->get_value(); |
2428 | 172 | stats["total_hit_counts"] = (double)_num_hit_blocks->get_value(); |
2429 | 172 | stats["total_read_counts"] = (double)_num_read_blocks->get_value(); |
2430 | | |
2431 | 172 | stats["total_read_size"] = (double)_total_read_size_metrics->get_value(); |
2432 | 172 | stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value(); |
2433 | 172 | stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value(); |
2434 | | |
2435 | 172 | return stats; |
2436 | 172 | } |
2437 | | |
2438 | | template void BlockFileCache::remove(FileBlockSPtr file_block, |
2439 | | std::lock_guard<std::mutex>& cache_lock, |
2440 | | std::lock_guard<std::mutex>& block_lock, bool sync); |
2441 | | |
2442 | | #include "common/compile_check_end.h" |
2443 | | |
2444 | 1 | Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) { |
2445 | 1 | InconsistencyContext inconsistency_context; |
2446 | 1 | RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context)); |
2447 | 1 | auto n = inconsistency_context.types.size(); |
2448 | 1 | results.reserve(n); |
2449 | 1 | for (size_t i = 0; i < n; i++) { |
2450 | 0 | std::string result; |
2451 | 0 | result += "File cache info in manager:\n"; |
2452 | 0 | result += inconsistency_context.infos_in_manager[i].to_string(); |
2453 | 0 | result += "File cache info in storage:\n"; |
2454 | 0 | result += inconsistency_context.infos_in_storage[i].to_string(); |
2455 | 0 | result += inconsistency_context.types[i].to_string(); |
2456 | 0 | result += "\n"; |
2457 | 0 | results.push_back(std::move(result)); |
2458 | 0 | } |
2459 | 1 | return Status::OK(); |
2460 | 1 | } |
2461 | | |
2462 | 1 | Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) { |
2463 | 1 | std::lock_guard<std::mutex> cache_lock(_mutex); |
2464 | 1 | std::vector<FileCacheInfo> infos_in_storage; |
2465 | 1 | RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock)); |
2466 | 1 | std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks; |
2467 | 1 | for (const auto& info_in_storage : infos_in_storage) { |
2468 | 0 | confirmed_blocks.insert({info_in_storage.hash, info_in_storage.offset}); |
2469 | 0 | auto* cell = get_cell(info_in_storage.hash, info_in_storage.offset, cache_lock); |
2470 | 0 | if (cell == nullptr || cell->file_block == nullptr) { |
2471 | 0 | inconsistency_context.infos_in_manager.emplace_back(); |
2472 | 0 | inconsistency_context.infos_in_storage.push_back(info_in_storage); |
2473 | 0 | inconsistency_context.types.emplace_back(InconsistencyType::NOT_LOADED); |
2474 | 0 | continue; |
2475 | 0 | } |
2476 | 0 | FileCacheInfo info_in_manager { |
2477 | 0 | .hash = info_in_storage.hash, |
2478 | 0 | .expiration_time = cell->file_block->expiration_time(), |
2479 | 0 | .size = cell->size(), |
2480 | 0 | .offset = info_in_storage.offset, |
2481 | 0 | .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING, |
2482 | 0 | .cache_type = cell->file_block->cache_type()}; |
2483 | 0 | InconsistencyType inconsistent_type; |
2484 | 0 | if (info_in_storage.is_tmp != info_in_manager.is_tmp) { |
2485 | 0 | inconsistent_type |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE; |
2486 | 0 | } |
2487 | 0 | size_t expected_size = |
2488 | 0 | info_in_manager.is_tmp ? cell->dowloading_size() : info_in_manager.size; |
2489 | 0 | if (info_in_storage.size != expected_size) { |
2490 | 0 | inconsistent_type |= InconsistencyType::SIZE_INCONSISTENT; |
2491 | 0 | } |
2492 | | // Only if it is not a tmp file need we check the cache type. |
2493 | 0 | if ((inconsistent_type & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 && |
2494 | 0 | info_in_storage.cache_type != info_in_manager.cache_type) { |
2495 | 0 | inconsistent_type |= InconsistencyType::CACHE_TYPE_INCONSISTENT; |
2496 | 0 | } |
2497 | 0 | if (info_in_storage.expiration_time != info_in_manager.expiration_time) { |
2498 | 0 | inconsistent_type |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT; |
2499 | 0 | } |
2500 | 0 | if (inconsistent_type != InconsistencyType::NONE) { |
2501 | 0 | inconsistency_context.infos_in_manager.push_back(info_in_manager); |
2502 | 0 | inconsistency_context.infos_in_storage.push_back(info_in_storage); |
2503 | 0 | inconsistency_context.types.push_back(inconsistent_type); |
2504 | 0 | } |
2505 | 0 | } |
2506 | | |
2507 | 1 | for (const auto& [hash, offset_to_cell] : _files) { |
2508 | 0 | for (const auto& [offset, cell] : offset_to_cell) { |
2509 | 0 | if (confirmed_blocks.contains({hash, offset})) { |
2510 | 0 | continue; |
2511 | 0 | } |
2512 | 0 | const auto& block = cell.file_block; |
2513 | 0 | inconsistency_context.infos_in_manager.emplace_back( |
2514 | 0 | hash, block->expiration_time(), cell.size(), offset, |
2515 | 0 | cell.file_block->state() == FileBlock::State::DOWNLOADING, block->cache_type()); |
2516 | 0 | inconsistency_context.infos_in_storage.emplace_back(); |
2517 | 0 | inconsistency_context.types.emplace_back(InconsistencyType::MISSING_IN_STORAGE); |
2518 | 0 | } |
2519 | 0 | } |
2520 | 1 | return Status::OK(); |
2521 | 1 | } |
2522 | | |
2523 | | } // namespace doris::io |