be/src/io/cache/block_file_cache.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp |
19 | | // and modified by Doris |
20 | | |
21 | | #include "io/cache/block_file_cache.h" |
22 | | |
23 | | #include <gen_cpp/file_cache.pb.h> |
24 | | |
25 | | #include <cstdio> |
26 | | #include <exception> |
27 | | #include <fstream> |
28 | | #include <unordered_set> |
29 | | |
30 | | #include "common/status.h" |
31 | | #include "cpp/sync_point.h" |
32 | | #include "runtime/exec_env.h" |
33 | | |
34 | | #if defined(__APPLE__) |
35 | | #include <sys/mount.h> |
36 | | #else |
37 | | #include <sys/statfs.h> |
38 | | #endif |
39 | | |
40 | | #include <chrono> // IWYU pragma: keep |
41 | | #include <mutex> |
42 | | #include <ranges> |
43 | | |
44 | | #include "common/cast_set.h" |
45 | | #include "common/config.h" |
46 | | #include "common/logging.h" |
47 | | #include "core/uint128.h" |
48 | | #include "exec/common/sip_hash.h" |
49 | | #include "io/cache/block_file_cache_ttl_mgr.h" |
50 | | #include "io/cache/file_block.h" |
51 | | #include "io/cache/file_cache_common.h" |
52 | | #include "io/cache/fs_file_cache_storage.h" |
53 | | #include "io/cache/mem_file_cache_storage.h" |
54 | | #include "runtime/runtime_profile.h" |
55 | | #include "util/concurrency_stats.h" |
56 | | #include "util/stack_util.h" |
57 | | #include "util/stopwatch.hpp" |
58 | | #include "util/thread.h" |
59 | | #include "util/time.h" |
60 | | namespace doris::io { |
61 | | #include "common/compile_check_begin.h" |
62 | | |
63 | | // Insert a block pointer into one shard while swallowing allocation failures. |
64 | 8.87k | bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) { |
65 | 8.87k | if (!block) { |
66 | 1 | return false; |
67 | 1 | } |
68 | 8.87k | try { |
69 | 8.87k | auto* raw_ptr = block.get(); |
70 | 8.87k | auto idx = shard_index(raw_ptr); |
71 | 8.87k | auto& shard = _shards[idx]; |
72 | 8.87k | std::lock_guard lock(shard.mutex); |
73 | 8.87k | auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block)); |
74 | 8.87k | if (inserted) { |
75 | 950 | _size.fetch_add(1, std::memory_order_relaxed); |
76 | 950 | } |
77 | 8.87k | return inserted; |
78 | 8.87k | } catch (const std::exception& e) { |
79 | 0 | LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what(); |
80 | 0 | } catch (...) { |
81 | 0 | LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error"; |
82 | 0 | } |
83 | 0 | return false; |
84 | 8.87k | } |
85 | | |
86 | | // Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures. |
87 | 8.72k | size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) { |
88 | 8.72k | if (limit == 0 || output == nullptr) { |
89 | 2 | return 0; |
90 | 2 | } |
91 | 8.72k | size_t drained = 0; |
92 | 8.72k | try { |
93 | 8.72k | output->reserve(output->size() + std::min(limit, size())); |
94 | 558k | for (auto& shard : _shards) { |
95 | 558k | if (drained >= limit) { |
96 | 1 | break; |
97 | 1 | } |
98 | 558k | std::lock_guard lock(shard.mutex); |
99 | 558k | auto it = shard.entries.begin(); |
100 | 558k | size_t shard_drained = 0; |
101 | 559k | while (it != shard.entries.end() && drained + shard_drained < limit) { |
102 | 485 | output->emplace_back(std::move(it->second)); |
103 | 485 | it = shard.entries.erase(it); |
104 | 485 | ++shard_drained; |
105 | 485 | } |
106 | 558k | if (shard_drained > 0) { |
107 | 6 | _size.fetch_sub(shard_drained, std::memory_order_relaxed); |
108 | 6 | drained += shard_drained; |
109 | 6 | } |
110 | 558k | } |
111 | 8.72k | } catch (const std::exception& e) { |
112 | 0 | LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what(); |
113 | 0 | } catch (...) { |
114 | 0 | LOG(WARNING) << "Failed to drain LRU update blocks: unknown error"; |
115 | 0 | } |
116 | 8.73k | return drained; |
117 | 8.72k | } |
118 | | |
119 | | // Remove every pending block, guarding against unexpected exceptions. |
120 | 35 | void NeedUpdateLRUBlocks::clear() { |
121 | 35 | try { |
122 | 2.24k | for (auto& shard : _shards) { |
123 | 2.24k | std::lock_guard lock(shard.mutex); |
124 | 2.24k | if (!shard.entries.empty()) { |
125 | 1 | auto removed = shard.entries.size(); |
126 | 1 | shard.entries.clear(); |
127 | 1 | _size.fetch_sub(removed, std::memory_order_relaxed); |
128 | 1 | } |
129 | 2.24k | } |
130 | 35 | } catch (const std::exception& e) { |
131 | 0 | LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what(); |
132 | 0 | } catch (...) { |
133 | 0 | LOG(WARNING) << "Failed to clear LRU update blocks: unknown error"; |
134 | 0 | } |
135 | 35 | } |
136 | | |
137 | 8.87k | size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const { |
138 | 8.87k | DCHECK(ptr != nullptr); |
139 | 8.87k | return std::hash<FileBlock*> {}(ptr)&kShardMask; |
140 | 8.87k | } |
141 | | |
142 | | BlockFileCache::BlockFileCache(const std::string& cache_base_path, |
143 | | const FileCacheSettings& cache_settings) |
144 | 172 | : _cache_base_path(cache_base_path), |
145 | 172 | _capacity(cache_settings.capacity), |
146 | 172 | _max_file_block_size(cache_settings.max_file_block_size) { |
147 | 172 | _cur_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(), |
148 | 172 | "file_cache_cache_size", 0); |
149 | 172 | _cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>( |
150 | 172 | _cache_base_path.c_str(), "file_cache_capacity", _capacity); |
151 | 172 | _cur_ttl_cache_size_metrics = std::make_shared<bvar::Status<size_t>>( |
152 | 172 | _cache_base_path.c_str(), "file_cache_ttl_cache_size", 0); |
153 | 172 | _cur_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>( |
154 | 172 | _cache_base_path.c_str(), "file_cache_normal_queue_element_count", 0); |
155 | 172 | _cur_ttl_cache_lru_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>( |
156 | 172 | _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_size", 0); |
157 | 172 | _cur_ttl_cache_lru_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>( |
158 | 172 | _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_element_count", 0); |
159 | 172 | _cur_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>( |
160 | 172 | _cache_base_path.c_str(), "file_cache_normal_queue_cache_size", 0); |
161 | 172 | _cur_index_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>( |
162 | 172 | _cache_base_path.c_str(), "file_cache_index_queue_element_count", 0); |
163 | 172 | _cur_index_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>( |
164 | 172 | _cache_base_path.c_str(), "file_cache_index_queue_cache_size", 0); |
165 | 172 | _cur_disposable_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>( |
166 | 172 | _cache_base_path.c_str(), "file_cache_disposable_queue_element_count", 0); |
167 | 172 | _cur_disposable_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>( |
168 | 172 | _cache_base_path.c_str(), "file_cache_disposable_queue_cache_size", 0); |
169 | 172 | _cur_cold_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>( |
170 | 172 | _cache_base_path.c_str(), "file_cache_cold_normal_queue_element_count", 0); |
171 | 172 | _cur_cold_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>( |
172 | 172 | _cache_base_path.c_str(), "file_cache_cold_normal_queue_cache_size", 0); |
173 | | |
174 | 172 | _queue_evict_size_metrics[0] = std::make_shared<bvar::Adder<size_t>>( |
175 | 172 | _cache_base_path.c_str(), "file_cache_index_queue_evict_size"); |
176 | 172 | _queue_evict_size_metrics[1] = std::make_shared<bvar::Adder<size_t>>( |
177 | 172 | _cache_base_path.c_str(), "file_cache_normal_queue_evict_size"); |
178 | 172 | _queue_evict_size_metrics[2] = std::make_shared<bvar::Adder<size_t>>( |
179 | 172 | _cache_base_path.c_str(), "file_cache_disposable_queue_evict_size"); |
180 | 172 | _queue_evict_size_metrics[3] = std::make_shared<bvar::Adder<size_t>>( |
181 | 172 | _cache_base_path.c_str(), "file_cache_ttl_cache_evict_size"); |
182 | 172 | _queue_evict_size_metrics[4] = std::make_shared<bvar::Adder<size_t>>( |
183 | 172 | _cache_base_path.c_str(), "file_cache_cold_normal_queue_evict_size"); |
184 | 172 | _total_evict_size_metrics = std::make_shared<bvar::Adder<size_t>>( |
185 | 172 | _cache_base_path.c_str(), "file_cache_total_evict_size"); |
186 | 172 | _total_read_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
187 | 172 | "file_cache_total_read_size"); |
188 | 172 | _total_hit_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
189 | 172 | "file_cache_total_hit_size"); |
190 | 172 | _gc_evict_bytes_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
191 | 172 | "file_cache_gc_evict_bytes"); |
192 | 172 | _gc_evict_count_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
193 | 172 | "file_cache_gc_evict_count"); |
194 | | |
195 | 172 | _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] = |
196 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
197 | 172 | "file_cache_evict_by_time_disposable_to_normal"); |
198 | 172 | _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] = |
199 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
200 | 172 | "file_cache_evict_by_time_disposable_to_index"); |
201 | 172 | _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] = |
202 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
203 | 172 | "file_cache_evict_by_time_disposable_to_ttl"); |
204 | 172 | _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::COLD_NORMAL] = |
205 | 172 | std::make_shared<bvar::Adder<size_t>>( |
206 | 172 | _cache_base_path.c_str(), "file_cache_evict_by_time_disposable_to_cold_normal"); |
207 | 172 | _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] = |
208 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
209 | 172 | "file_cache_evict_by_time_normal_to_disposable"); |
210 | 172 | _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] = |
211 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
212 | 172 | "file_cache_evict_by_time_normal_to_index"); |
213 | 172 | _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] = |
214 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
215 | 172 | "file_cache_evict_by_time_normal_to_ttl"); |
216 | 172 | _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::COLD_NORMAL] = |
217 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
218 | 172 | "file_cache_evict_by_time_normal_to_cold_normal"); |
219 | 172 | _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] = |
220 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
221 | 172 | "file_cache_evict_by_time_index_to_disposable"); |
222 | 172 | _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] = |
223 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
224 | 172 | "file_cache_evict_by_time_index_to_normal"); |
225 | 172 | _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] = |
226 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
227 | 172 | "file_cache_evict_by_time_index_to_ttl"); |
228 | 172 | _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::COLD_NORMAL] = |
229 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
230 | 172 | "file_cache_evict_by_time_index_to_cold_normal"); |
231 | 172 | _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] = |
232 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
233 | 172 | "file_cache_evict_by_time_ttl_to_disposable"); |
234 | 172 | _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] = |
235 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
236 | 172 | "file_cache_evict_by_time_ttl_to_normal"); |
237 | 172 | _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] = |
238 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
239 | 172 | "file_cache_evict_by_time_ttl_to_index"); |
240 | 172 | _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::COLD_NORMAL] = |
241 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
242 | 172 | "file_cache_evict_by_time_ttl_to_cold_normal"); |
243 | 172 | _evict_by_time_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::DISPOSABLE] = |
244 | 172 | std::make_shared<bvar::Adder<size_t>>( |
245 | 172 | _cache_base_path.c_str(), "file_cache_evict_by_time_cold_normal_to_disposable"); |
246 | 172 | _evict_by_time_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::NORMAL] = |
247 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
248 | 172 | "file_cache_evict_by_time_cold_normal_to_normal"); |
249 | 172 | _evict_by_time_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::INDEX] = |
250 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
251 | 172 | "file_cache_evict_by_time_cold_normal_to_index"); |
252 | 172 | _evict_by_time_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::TTL] = |
253 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
254 | 172 | "file_cache_evict_by_time_cold_normal_to_ttl"); |
255 | | |
256 | 172 | _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] = |
257 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
258 | 172 | "file_cache_evict_by_self_lru_disposable"); |
259 | 172 | _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] = |
260 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
261 | 172 | "file_cache_evict_by_self_lru_normal"); |
262 | 172 | _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] = std::make_shared<bvar::Adder<size_t>>( |
263 | 172 | _cache_base_path.c_str(), "file_cache_evict_by_self_lru_index"); |
264 | 172 | _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] = std::make_shared<bvar::Adder<size_t>>( |
265 | 172 | _cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl"); |
266 | 172 | _evict_by_self_lru_metrics_matrix[FileCacheType::COLD_NORMAL] = |
267 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
268 | 172 | "file_cache_evict_by_self_lru_cold_normal"); |
269 | | |
270 | 172 | _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] = |
271 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
272 | 172 | "file_cache_evict_by_size_disposable_to_normal"); |
273 | 172 | _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] = |
274 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
275 | 172 | "file_cache_evict_by_size_disposable_to_index"); |
276 | 172 | _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] = |
277 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
278 | 172 | "file_cache_evict_by_size_disposable_to_ttl"); |
279 | 172 | _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::COLD_NORMAL] = |
280 | 172 | std::make_shared<bvar::Adder<size_t>>( |
281 | 172 | _cache_base_path.c_str(), "file_cache_evict_by_size_disposable_to_cold_normal"); |
282 | 172 | _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] = |
283 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
284 | 172 | "file_cache_evict_by_size_normal_to_disposable"); |
285 | 172 | _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] = |
286 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
287 | 172 | "file_cache_evict_by_size_normal_to_index"); |
288 | 172 | _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] = |
289 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
290 | 172 | "file_cache_evict_by_size_normal_to_ttl"); |
291 | 172 | _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::COLD_NORMAL] = |
292 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
293 | 172 | "file_cache_evict_by_size_normal_to_cold_normal"); |
294 | 172 | _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] = |
295 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
296 | 172 | "file_cache_evict_by_size_index_to_disposable"); |
297 | 172 | _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] = |
298 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
299 | 172 | "file_cache_evict_by_size_index_to_normal"); |
300 | 172 | _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] = |
301 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
302 | 172 | "file_cache_evict_by_size_index_to_ttl"); |
303 | 172 | _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::COLD_NORMAL] = |
304 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
305 | 172 | "file_cache_evict_by_size_index_to_cold_normal"); |
306 | 172 | _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] = |
307 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
308 | 172 | "file_cache_evict_by_size_ttl_to_disposable"); |
309 | 172 | _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] = |
310 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
311 | 172 | "file_cache_evict_by_size_ttl_to_normal"); |
312 | 172 | _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] = |
313 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
314 | 172 | "file_cache_evict_by_size_ttl_to_index"); |
315 | 172 | _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::COLD_NORMAL] = |
316 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
317 | 172 | "file_cache_evict_by_size_ttl_to_cold_normal"); |
318 | 172 | _evict_by_size_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::DISPOSABLE] = |
319 | 172 | std::make_shared<bvar::Adder<size_t>>( |
320 | 172 | _cache_base_path.c_str(), "file_cache_evict_by_size_cold_normal_to_disposable"); |
321 | 172 | _evict_by_size_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::NORMAL] = |
322 | 172 | std::make_shared<bvar::Adder<size_t>>( |
323 | 172 | _cache_base_path.c_str(), |
324 | 172 | "file_cache_evict_by_size_cold_normal_to_cold_normal"); |
325 | 172 | _evict_by_size_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::INDEX] = |
326 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
327 | 172 | "file_cache_evict_by_size_cold_normal_to_index"); |
328 | 172 | _evict_by_size_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::TTL] = |
329 | 172 | std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
330 | 172 | "file_cache_evict_by_size_cold_normal_to_ttl"); |
331 | | |
332 | 172 | _evict_by_try_release = std::make_shared<bvar::Adder<size_t>>( |
333 | 172 | _cache_base_path.c_str(), "file_cache_evict_by_try_release"); |
334 | | |
335 | 172 | _num_read_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
336 | 172 | "file_cache_num_read_blocks"); |
337 | 172 | _num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
338 | 172 | "file_cache_num_hit_blocks"); |
339 | 172 | _num_removed_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), |
340 | 172 | "file_cache_num_removed_blocks"); |
341 | | |
342 | 172 | _no_warmup_num_read_blocks = std::make_shared<bvar::Adder<size_t>>( |
343 | 172 | _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks"); |
344 | 172 | _no_warmup_num_hit_blocks = std::make_shared<bvar::Adder<size_t>>( |
345 | 172 | _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks"); |
346 | | |
347 | 172 | #ifndef BE_TEST |
348 | 172 | _num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
349 | 172 | _cache_base_path.c_str(), "file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300); |
350 | 172 | _num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
351 | 172 | _cache_base_path.c_str(), "file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300); |
352 | 172 | _num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
353 | 172 | _cache_base_path.c_str(), "file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600); |
354 | 172 | _num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
355 | 172 | _cache_base_path.c_str(), "file_cache_num_read_blocks_1h", _num_read_blocks.get(), |
356 | 172 | 3600); |
357 | 172 | _no_warmup_num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
358 | 172 | _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_5m", |
359 | 172 | _no_warmup_num_hit_blocks.get(), 300); |
360 | 172 | _no_warmup_num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
361 | 172 | _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_5m", |
362 | 172 | _no_warmup_num_read_blocks.get(), 300); |
363 | 172 | _no_warmup_num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
364 | 172 | _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_1h", |
365 | 172 | _no_warmup_num_hit_blocks.get(), 3600); |
366 | 172 | _no_warmup_num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>( |
367 | 172 | _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_1h", |
368 | 172 | _no_warmup_num_read_blocks.get(), 3600); |
369 | 172 | #endif |
370 | | |
371 | 172 | _hit_ratio = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(), |
372 | 172 | "file_cache_hit_ratio", 0.0); |
373 | 172 | _hit_ratio_5m = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(), |
374 | 172 | "file_cache_hit_ratio_5m", 0.0); |
375 | 172 | _hit_ratio_1h = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(), |
376 | 172 | "file_cache_hit_ratio_1h", 0.0); |
377 | | |
378 | 172 | _no_warmup_hit_ratio = std::make_shared<bvar::Status<double>>( |
379 | 172 | _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio", 0.0); |
380 | 172 | _no_warmup_hit_ratio_5m = std::make_shared<bvar::Status<double>>( |
381 | 172 | _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_5m", 0.0); |
382 | 172 | _no_warmup_hit_ratio_1h = std::make_shared<bvar::Status<double>>( |
383 | 172 | _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_1h", 0.0); |
384 | | |
385 | 172 | _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>( |
386 | 172 | _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0); |
387 | 172 | _need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>( |
388 | 172 | _cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0); |
389 | 172 | _meta_store_write_queue_size_metrics = std::make_shared<bvar::Status<size_t>>( |
390 | 172 | _cache_base_path.c_str(), "file_cache_meta_store_write_queue_size", 0); |
391 | | |
392 | 172 | _cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>( |
393 | 172 | _cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us"); |
394 | 172 | _get_or_set_latency_us = std::make_shared<bvar::LatencyRecorder>( |
395 | 172 | _cache_base_path.c_str(), "file_cache_get_or_set_latency_us"); |
396 | 172 | _storage_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>( |
397 | 172 | _cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_us"); |
398 | 172 | _storage_retry_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>( |
399 | 172 | _cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us"); |
400 | 172 | _storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>( |
401 | 172 | _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us"); |
402 | 172 | _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>( |
403 | 172 | _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us"); |
404 | 172 | _lru_dump_latency_us = std::make_shared<bvar::LatencyRecorder>( |
405 | 172 | _cache_base_path.c_str(), "file_cache_lru_dump_latency_us"); |
406 | 172 | _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>( |
407 | 172 | _cache_base_path.c_str(), "file_cache_recycle_keys_length"); |
408 | 172 | _need_update_lru_blocks_length_recorder = std::make_shared<bvar::LatencyRecorder>( |
409 | 172 | _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length"); |
410 | 172 | _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>( |
411 | 172 | _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us"); |
412 | 172 | _ttl_gc_latency_us = std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(), |
413 | 172 | "file_cache_ttl_gc_latency_us"); |
414 | 172 | _shadow_queue_levenshtein_distance = std::make_shared<bvar::LatencyRecorder>( |
415 | 172 | _cache_base_path.c_str(), "file_cache_shadow_queue_levenshtein_distance"); |
416 | | |
417 | 172 | _disposable_queue = LRUQueue(cache_settings.disposable_queue_size, |
418 | 172 | cache_settings.disposable_queue_elements, 60 * 60); |
419 | 172 | _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements, |
420 | 172 | 7 * 24 * 60 * 60); |
421 | 172 | _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements, |
422 | 172 | 24 * 60 * 60); |
423 | 172 | _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements, |
424 | 172 | std::numeric_limits<int>::max()); |
425 | 172 | _cold_normal_queue = LRUQueue(cache_settings.cold_query_queue_size, |
426 | 172 | cache_settings.cold_query_queue_elements, 24 * 60 * 60); |
427 | | |
428 | 172 | _lru_recorder = std::make_unique<LRUQueueRecorder>(this); |
429 | 172 | _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get()); |
430 | 172 | if (cache_settings.storage == "memory") { |
431 | 16 | _storage = std::make_unique<MemFileCacheStorage>(); |
432 | 16 | _cache_base_path = "memory"; |
433 | 156 | } else { |
434 | 156 | _storage = std::make_unique<FSFileCacheStorage>(); |
435 | 156 | } |
436 | | |
437 | 172 | LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string(); |
438 | 172 | } |
439 | | |
440 | 441k | UInt128Wrapper BlockFileCache::hash(const std::string& path) { |
441 | 441k | uint128_t value; |
442 | 441k | sip_hash128(path.data(), path.size(), reinterpret_cast<char*>(&value)); |
443 | 441k | return UInt128Wrapper(value); |
444 | 441k | } |
445 | | |
446 | | BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder( |
447 | 8 | const TUniqueId& query_id, int file_cache_query_limit_percent) { |
448 | 8 | SCOPED_CACHE_LOCK(_mutex, this); |
449 | 8 | if (!config::enable_file_cache_query_limit) { |
450 | 1 | return {}; |
451 | 1 | } |
452 | | |
453 | | /// if enable_filesystem_query_cache_limit is true, |
454 | | /// we create context query for current query. |
455 | 7 | auto context = get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent); |
456 | 7 | return std::make_unique<QueryFileCacheContextHolder>(query_id, this, context); |
457 | 8 | } |
458 | | |
459 | | BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context( |
460 | 12.9k | const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) { |
461 | 12.9k | auto query_iter = _query_map.find(query_id); |
462 | 12.9k | return (query_iter == _query_map.end()) ? nullptr : query_iter->second; |
463 | 12.9k | } |
464 | | |
465 | 6 | void BlockFileCache::remove_query_context(const TUniqueId& query_id) { |
466 | 6 | SCOPED_CACHE_LOCK(_mutex, this); |
467 | 6 | const auto& query_iter = _query_map.find(query_id); |
468 | | |
469 | 6 | if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) { |
470 | 5 | _query_map.erase(query_iter); |
471 | 5 | } |
472 | 6 | } |
473 | | |
474 | | BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context( |
475 | | const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock, |
476 | 7 | int file_cache_query_limit_percent) { |
477 | 7 | if (query_id.lo == 0 && query_id.hi == 0) { |
478 | 1 | return nullptr; |
479 | 1 | } |
480 | | |
481 | 6 | auto context = get_query_context(query_id, cache_lock); |
482 | 6 | if (context) { |
483 | 1 | return context; |
484 | 1 | } |
485 | | |
486 | 5 | size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100; |
487 | 5 | if (file_cache_query_limit_size < 268435456) { |
488 | 5 | LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size |
489 | 5 | << " bytes) is less than the 256MB recommended minimum. " |
490 | 5 | << "Consider increasing the session variable 'file_cache_query_limit_percent'" |
491 | 5 | << " from its current value " << file_cache_query_limit_percent << "%."; |
492 | 5 | } |
493 | 5 | auto query_context = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size); |
494 | 5 | auto query_iter = _query_map.emplace(query_id, query_context).first; |
495 | 5 | return query_iter->second; |
496 | 6 | } |
497 | | |
498 | | void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset, |
499 | 20 | std::lock_guard<std::mutex>& cache_lock) { |
500 | 20 | auto pair = std::make_pair(hash, offset); |
501 | 20 | auto record = records.find(pair); |
502 | 20 | DCHECK(record != records.end()); |
503 | 20 | auto iter = record->second; |
504 | 20 | records.erase(pair); |
505 | 20 | lru_queue.remove(iter, cache_lock); |
506 | 20 | } |
507 | | |
508 | | void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset, |
509 | | size_t size, |
510 | 30 | std::lock_guard<std::mutex>& cache_lock) { |
511 | 30 | auto pair = std::make_pair(hash, offset); |
512 | 30 | if (records.find(pair) == records.end()) { |
513 | 29 | auto queue_iter = lru_queue.add(hash, offset, size, cache_lock); |
514 | 29 | records.insert({pair, queue_iter}); |
515 | 29 | } |
516 | 30 | } |
517 | | |
518 | 152 | Status BlockFileCache::initialize() { |
519 | 152 | SCOPED_CACHE_LOCK(_mutex, this); |
520 | 152 | return initialize_unlocked(cache_lock); |
521 | 152 | } |
522 | | |
523 | 152 | Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) { |
524 | 152 | DCHECK(!_is_initialized); |
525 | 152 | _is_initialized = true; |
526 | 152 | if (config::file_cache_background_lru_dump_tail_record_num > 0) { |
527 | | // requirements: |
528 | | // 1. restored data should not overwrite the last dump |
529 | | // 2. restore should happen before load and async load |
530 | | // 3. all queues should be restored sequencially to avoid conflict |
531 | | // TODO(zhengyu): we can parralize them but will increase complexity, so lets check the time cost |
532 | | // to see if any improvement is a necessary |
533 | 152 | restore_lru_queues_from_disk(cache_lock); |
534 | 152 | } |
535 | 152 | RETURN_IF_ERROR(_storage->init(this)); |
536 | | |
537 | 152 | if (auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get())) { |
538 | 137 | if (auto* meta_store = fs_storage->get_meta_store()) { |
539 | 137 | _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store); |
540 | 137 | } |
541 | 137 | } |
542 | | |
543 | 152 | _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this); |
544 | 152 | _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this); |
545 | 152 | _cache_background_evict_in_advance_thread = |
546 | 152 | std::thread(&BlockFileCache::run_background_evict_in_advance, this); |
547 | 152 | _cache_background_block_lru_update_thread = |
548 | 152 | std::thread(&BlockFileCache::run_background_block_lru_update, this); |
549 | | |
550 | | // Initialize LRU dump thread and restore queues |
551 | 152 | _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this); |
552 | 152 | _cache_background_lru_log_replay_thread = |
553 | 152 | std::thread(&BlockFileCache::run_background_lru_log_replay, this); |
554 | | |
555 | 152 | return Status::OK(); |
556 | 152 | } |
557 | | |
558 | | void BlockFileCache::update_block_lru(FileBlockSPtr block, |
559 | 480 | std::lock_guard<std::mutex>& cache_lock) { |
560 | 480 | FileBlockCell* cell = block->cell; |
561 | 480 | if (cell) { |
562 | 478 | if (cell->queue_iterator) { |
563 | 478 | auto& queue = get_queue(block->cache_type()); |
564 | 478 | queue.move_to_end(*cell->queue_iterator, cache_lock); |
565 | 478 | _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK, |
566 | 478 | block->_key.hash, block->_key.offset, |
567 | 478 | block->_block_range.size()); |
568 | 478 | } |
569 | 478 | cell->update_atime(); |
570 | 478 | } |
571 | 480 | } |
572 | | |
573 | | void BlockFileCache::use_cell(FileBlockCell& cell, FileBlocks* result, bool move_iter_flag, |
574 | 4.26M | std::lock_guard<std::mutex>& cache_lock) { |
575 | 4.26M | if (result) { |
576 | 4.26M | result->push_back(cell.file_block); |
577 | 4.26M | } |
578 | | |
579 | 4.26M | auto& queue = get_queue(cell.file_block->cache_type()); |
580 | | /// Move to the end of the queue. The iterator remains valid. |
581 | 4.26M | if (cell.queue_iterator && move_iter_flag) { |
582 | 3.28M | if (cell.file_block->cache_type() == FileCacheType::COLD_NORMAL) { |
583 | 34 | auto now_time = std::chrono::duration_cast<std::chrono::milliseconds>( |
584 | 34 | std::chrono::steady_clock::now().time_since_epoch()) |
585 | 34 | .count(); |
586 | 34 | if (now_time - cell.atime > config::file_cache_2qlru_cold_blocks_promotion_ms) { |
587 | 28 | queue.remove(*cell.queue_iterator, cache_lock); |
588 | 28 | _lru_recorder->record_queue_event( |
589 | 28 | FileCacheType::COLD_NORMAL, CacheLRULogType::REMOVE, |
590 | 28 | cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size()); |
591 | 28 | auto& normal_queue = get_queue(FileCacheType::NORMAL); |
592 | 28 | cell.queue_iterator = |
593 | 28 | normal_queue.add(cell.file_block->_key.hash, cell.file_block->_key.offset, |
594 | 28 | cell.size(), cache_lock); |
595 | 28 | _lru_recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, |
596 | 28 | cell.file_block->get_hash_value(), |
597 | 28 | cell.file_block->offset(), cell.size()); |
598 | 28 | cell.file_block->set_cache_type(FileCacheType::NORMAL); |
599 | 28 | } else { |
600 | 6 | queue.move_to_end(*cell.queue_iterator, cache_lock); |
601 | 6 | _lru_recorder->record_queue_event( |
602 | 6 | cell.file_block->cache_type(), CacheLRULogType::MOVETOBACK, |
603 | 6 | cell.file_block->_key.hash, cell.file_block->_key.offset, cell.size()); |
604 | 6 | } |
605 | 3.28M | } else { |
606 | 3.28M | queue.move_to_end(*cell.queue_iterator, cache_lock); |
607 | 3.28M | _lru_recorder->record_queue_event( |
608 | 3.28M | cell.file_block->cache_type(), CacheLRULogType::MOVETOBACK, |
609 | 3.28M | cell.file_block->_key.hash, cell.file_block->_key.offset, cell.size()); |
610 | 3.28M | } |
611 | 3.28M | } |
612 | | |
613 | 4.26M | cell.update_atime(); |
614 | 4.26M | } |
615 | | |
616 | | template <class T> |
617 | | requires IsXLock<T> |
618 | | FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset, |
619 | 898k | T& /* cache_lock */) { |
620 | 898k | auto it = _files.find(hash); |
621 | 898k | if (it == _files.end()) { |
622 | 1 | return nullptr; |
623 | 1 | } |
624 | | |
625 | 898k | auto& offsets = it->second; |
626 | 898k | auto cell_it = offsets.find(offset); |
627 | 898k | if (cell_it == offsets.end()) { |
628 | 3 | return nullptr; |
629 | 3 | } |
630 | | |
631 | 897k | return &cell_it->second; |
632 | 898k | } |
633 | | |
634 | 4.26M | bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const { |
635 | 4.26M | return query_type != FileCacheType::DISPOSABLE && cell_type != FileCacheType::DISPOSABLE; |
636 | 4.26M | } |
637 | | |
638 | | FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context, |
639 | | const FileBlock::Range& range, |
640 | 4.34M | std::lock_guard<std::mutex>& cache_lock) { |
641 | | /// Given range = [left, right] and non-overlapping ordered set of file blocks, |
642 | | /// find list [block1, ..., blockN] of blocks which intersect with given range. |
643 | 4.34M | auto it = _files.find(hash); |
644 | 4.34M | if (it == _files.end()) { |
645 | 108k | if (_async_open_done) { |
646 | 108k | return {}; |
647 | 108k | } |
648 | 0 | FileCacheKey key; |
649 | 0 | key.hash = hash; |
650 | 0 | key.meta.type = context.cache_type; |
651 | 0 | key.meta.expiration_time = context.expiration_time; |
652 | 0 | key.meta.tablet_id = context.tablet_id; |
653 | 0 | _storage->load_blocks_directly_unlocked(this, key, cache_lock); |
654 | |
|
655 | 0 | it = _files.find(hash); |
656 | 0 | if (it == _files.end()) [[unlikely]] { |
657 | 0 | return {}; |
658 | 0 | } |
659 | 0 | } |
660 | | |
661 | 4.23M | auto& file_blocks = it->second; |
662 | 4.23M | if (file_blocks.empty()) { |
663 | 0 | LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string() |
664 | 0 | << " cache type=" << context.cache_type |
665 | 0 | << " cache expiration time=" << context.expiration_time |
666 | 0 | << " cache range=" << range.left << " " << range.right |
667 | 0 | << " query id=" << context.query_id; |
668 | 0 | DCHECK(false); |
669 | 0 | _files.erase(hash); |
670 | 0 | return {}; |
671 | 0 | } |
672 | | |
673 | 4.23M | FileBlocks result; |
674 | 4.23M | auto block_it = file_blocks.lower_bound(range.left); |
675 | 4.23M | if (block_it == file_blocks.end()) { |
676 | | /// N - last cached block for given file hash, block{N}.offset < range.left: |
677 | | /// block{N} block{N} |
678 | | /// [________ [_______] |
679 | | /// [__________] OR [________] |
680 | | /// ^ ^ |
681 | | /// range.left range.left |
682 | | |
683 | 7.60k | auto& cell = file_blocks.rbegin()->second; |
684 | 7.60k | if (cell.file_block->range().right < range.left) { |
685 | 7.59k | return {}; |
686 | 7.59k | } |
687 | | |
688 | 14 | use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type), |
689 | 14 | cache_lock); |
690 | 4.23M | } else { /// block_it <-- segmment{k} |
691 | 4.23M | if (block_it != file_blocks.begin()) { |
692 | 103k | auto& prev_cell = std::prev(block_it)->second; |
693 | 103k | auto& prev_cell_range = prev_cell.file_block->range(); |
694 | | |
695 | 103k | if (range.left <= prev_cell_range.right) { |
696 | | /// block{k-1} block{k} |
697 | | /// [________] [_____ |
698 | | /// [___________ |
699 | | /// ^ |
700 | | /// range.left |
701 | | |
702 | 137 | use_cell(prev_cell, &result, |
703 | 137 | need_to_move(prev_cell.file_block->cache_type(), context.cache_type), |
704 | 137 | cache_lock); |
705 | 137 | } |
706 | 103k | } |
707 | | |
708 | | /// block{k} ... block{k-1} block{k} block{k} |
709 | | /// [______ [______] [____ [________ |
710 | | /// [_________ OR [________ OR [______] ^ |
711 | | /// ^ ^ ^ block{k}.offset |
712 | | /// range.left range.left range.right |
713 | | |
714 | 8.49M | while (block_it != file_blocks.end()) { |
715 | 4.47M | auto& cell = block_it->second; |
716 | 4.47M | if (range.right < cell.file_block->range().left) { |
717 | 211k | break; |
718 | 211k | } |
719 | | |
720 | 4.26M | use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type), |
721 | 4.26M | cache_lock); |
722 | 4.26M | ++block_it; |
723 | 4.26M | } |
724 | 4.23M | } |
725 | | |
726 | 4.23M | return result; |
727 | 4.23M | } |
728 | | |
729 | 8.86k | void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) { |
730 | 8.86k | if (_need_update_lru_blocks.insert(std::move(block))) { |
731 | 937 | *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); |
732 | 937 | } |
733 | 8.86k | } |
734 | | |
735 | 4 | std::string BlockFileCache::clear_file_cache_async() { |
736 | 4 | LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path; |
737 | 4 | _lru_dumper->remove_lru_dump_files(); |
738 | 4 | int64_t num_cells_all = 0; |
739 | 4 | int64_t num_cells_to_delete = 0; |
740 | 4 | int64_t num_cells_wait_recycle = 0; |
741 | 4 | int64_t num_files_all = 0; |
742 | 4 | TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async"); |
743 | 4 | { |
744 | 4 | SCOPED_CACHE_LOCK(_mutex, this); |
745 | | |
746 | 4 | std::vector<FileBlockCell*> deleting_cells; |
747 | 4 | for (auto& [_, offset_to_cell] : _files) { |
748 | 4 | ++num_files_all; |
749 | 48 | for (auto& [_1, cell] : offset_to_cell) { |
750 | 48 | ++num_cells_all; |
751 | 48 | deleting_cells.push_back(&cell); |
752 | 48 | } |
753 | 4 | } |
754 | | |
755 | | // we cannot delete the element in the loop above, because it will break the iterator |
756 | 48 | for (auto& cell : deleting_cells) { |
757 | 48 | if (!cell->releasable()) { |
758 | 2 | LOG(INFO) << "cell is not releasable, hash=" |
759 | 2 | << " offset=" << cell->file_block->offset(); |
760 | 2 | cell->file_block->set_deleting(); |
761 | 2 | ++num_cells_wait_recycle; |
762 | 2 | continue; |
763 | 2 | } |
764 | 46 | FileBlockSPtr file_block = cell->file_block; |
765 | 46 | if (file_block) { |
766 | 46 | std::lock_guard block_lock(file_block->_mutex); |
767 | 46 | remove(file_block, cache_lock, block_lock, false); |
768 | 46 | ++num_cells_to_delete; |
769 | 46 | } |
770 | 46 | } |
771 | 4 | clear_need_update_lru_blocks(); |
772 | 4 | } |
773 | | |
774 | 4 | std::stringstream ss; |
775 | 4 | ss << "finish clear_file_cache_async, path=" << _cache_base_path |
776 | 4 | << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all |
777 | 4 | << " num_cells_to_delete=" << num_cells_to_delete |
778 | 4 | << " num_cells_wait_recycle=" << num_cells_wait_recycle; |
779 | 4 | auto msg = ss.str(); |
780 | 4 | LOG(INFO) << msg; |
781 | 4 | _lru_dumper->remove_lru_dump_files(); |
782 | 4 | return msg; |
783 | 4 | } |
784 | | |
785 | | FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash, |
786 | | const CacheContext& context, size_t offset, |
787 | | size_t size, FileBlock::State state, |
788 | 118k | std::lock_guard<std::mutex>& cache_lock) { |
789 | 118k | DCHECK(size > 0); |
790 | | |
791 | 118k | auto current_pos = offset; |
792 | 118k | auto end_pos_non_included = offset + size; |
793 | | |
794 | 118k | size_t current_size = 0; |
795 | 118k | size_t remaining_size = size; |
796 | | |
797 | 118k | FileBlocks file_blocks; |
798 | 267k | while (current_pos < end_pos_non_included) { |
799 | 148k | current_size = std::min(remaining_size, _max_file_block_size); |
800 | 148k | remaining_size -= current_size; |
801 | 148k | state = try_reserve(hash, context, current_pos, current_size, cache_lock) |
802 | 148k | ? state |
803 | 148k | : FileBlock::State::SKIP_CACHE; |
804 | 148k | if (state == FileBlock::State::SKIP_CACHE) [[unlikely]] { |
805 | 1.45k | FileCacheKey key; |
806 | 1.45k | key.hash = hash; |
807 | 1.45k | key.offset = current_pos; |
808 | 1.45k | key.meta.type = context.cache_type; |
809 | 1.45k | key.meta.expiration_time = context.expiration_time; |
810 | 1.45k | key.meta.tablet_id = context.tablet_id; |
811 | 1.45k | auto file_block = std::make_shared<FileBlock>(key, current_size, this, |
812 | 1.45k | FileBlock::State::SKIP_CACHE); |
813 | 1.45k | file_blocks.push_back(std::move(file_block)); |
814 | 147k | } else { |
815 | 147k | auto* cell = add_cell(hash, context, current_pos, current_size, state, cache_lock); |
816 | 147k | if (cell) { |
817 | 147k | file_blocks.push_back(cell->file_block); |
818 | 147k | if (!context.is_cold_data) { |
819 | 147k | cell->update_atime(); |
820 | 147k | } |
821 | 147k | } |
822 | 147k | if (_ttl_mgr && context.tablet_id != 0) { |
823 | 139k | _ttl_mgr->register_tablet_id(context.tablet_id); |
824 | 139k | } |
825 | 147k | } |
826 | | |
827 | 148k | current_pos += current_size; |
828 | 148k | } |
829 | | |
830 | 118k | DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right); |
831 | 118k | return file_blocks; |
832 | 118k | } |
833 | | |
834 | | void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks, |
835 | | const UInt128Wrapper& hash, |
836 | | const CacheContext& context, |
837 | | const FileBlock::Range& range, |
838 | 4.22M | std::lock_guard<std::mutex>& cache_lock) { |
839 | | /// There are blocks [block1, ..., blockN] |
840 | | /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially) |
841 | | /// intersect with given range. |
842 | | |
843 | | /// It can have holes: |
844 | | /// [____________________] -- requested range |
845 | | /// [____] [_] [_________] -- intersecting cache [block1, ..., blockN] |
846 | | /// |
847 | | /// For each such hole create a cell with file block state EMPTY. |
848 | | |
849 | 4.22M | auto it = file_blocks.begin(); |
850 | 4.22M | auto block_range = (*it)->range(); |
851 | | |
852 | 4.22M | size_t current_pos = 0; |
853 | 4.22M | if (block_range.left < range.left) { |
854 | | /// [_______ -- requested range |
855 | | /// [_______ |
856 | | /// ^ |
857 | | /// block1 |
858 | | |
859 | 151 | current_pos = block_range.right + 1; |
860 | 151 | ++it; |
861 | 4.22M | } else { |
862 | 4.22M | current_pos = range.left; |
863 | 4.22M | } |
864 | | |
865 | 8.49M | while (current_pos <= range.right && it != file_blocks.end()) { |
866 | 4.26M | block_range = (*it)->range(); |
867 | | |
868 | 4.26M | if (current_pos == block_range.left) { |
869 | 4.26M | current_pos = block_range.right + 1; |
870 | 4.26M | ++it; |
871 | 4.26M | continue; |
872 | 4.26M | } |
873 | | |
874 | 4.26M | DCHECK(current_pos < block_range.left); |
875 | | |
876 | 185 | auto hole_size = block_range.left - current_pos; |
877 | | |
878 | 185 | file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size, |
879 | 185 | FileBlock::State::EMPTY, cache_lock)); |
880 | | |
881 | 185 | current_pos = block_range.right + 1; |
882 | 185 | ++it; |
883 | 185 | } |
884 | | |
885 | 4.22M | if (current_pos <= range.right) { |
886 | | /// ________] -- requested range |
887 | | /// _____] |
888 | | /// ^ |
889 | | /// blockN |
890 | | |
891 | 507 | auto hole_size = range.right - current_pos + 1; |
892 | | |
893 | 507 | file_blocks.splice(file_blocks.end(), |
894 | 507 | split_range_into_cells(hash, context, current_pos, hole_size, |
895 | 507 | FileBlock::State::EMPTY, cache_lock)); |
896 | 507 | } |
897 | 4.22M | } |
898 | | |
899 | | FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size, |
900 | 4.34M | CacheContext& context) { |
901 | 4.34M | FileBlock::Range range(offset, offset + size - 1); |
902 | | |
903 | 4.34M | ReadStatistics* stats = context.stats; |
904 | 4.34M | DCHECK(stats != nullptr); |
905 | 4.34M | MonotonicStopWatch sw; |
906 | 4.34M | sw.start(); |
907 | 4.34M | ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment(); |
908 | 4.34M | std::lock_guard cache_lock(_mutex); |
909 | 4.34M | ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement(); |
910 | 4.34M | stats->lock_wait_timer += sw.elapsed_time(); |
911 | 4.34M | FileBlocks file_blocks; |
912 | 4.34M | int64_t duration = 0; |
913 | 4.34M | { |
914 | 4.34M | SCOPED_RAW_TIMER(&duration); |
915 | | /// Get all blocks which intersect with the given range. |
916 | 4.34M | { |
917 | 4.34M | SCOPED_RAW_TIMER(&stats->get_timer); |
918 | 4.34M | file_blocks = get_impl(hash, context, range, cache_lock); |
919 | 4.34M | } |
920 | | |
921 | 4.34M | if (file_blocks.empty()) { |
922 | 117k | SCOPED_RAW_TIMER(&stats->set_timer); |
923 | 117k | file_blocks = split_range_into_cells(hash, context, offset, size, |
924 | 117k | FileBlock::State::EMPTY, cache_lock); |
925 | 4.22M | } else { |
926 | 4.22M | SCOPED_RAW_TIMER(&stats->set_timer); |
927 | 4.22M | fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock); |
928 | 4.22M | } |
929 | 4.34M | DCHECK(!file_blocks.empty()); |
930 | 4.34M | *_num_read_blocks << file_blocks.size(); |
931 | 4.34M | if (!context.is_warmup) { |
932 | 4.34M | *_no_warmup_num_read_blocks << file_blocks.size(); |
933 | 4.34M | } |
934 | 4.41M | for (auto& block : file_blocks) { |
935 | 4.41M | size_t block_size = block->range().size(); |
936 | 4.41M | *_total_read_size_metrics << block_size; |
937 | 4.41M | if (block->state_unsafe() == FileBlock::State::DOWNLOADED) { |
938 | 4.25M | *_num_hit_blocks << 1; |
939 | 4.25M | *_total_hit_size_metrics << block_size; |
940 | 4.25M | if (!context.is_warmup) { |
941 | 4.25M | *_no_warmup_num_hit_blocks << 1; |
942 | 4.25M | } |
943 | 4.25M | } |
944 | 4.41M | } |
945 | 4.34M | } |
946 | 4.34M | *_get_or_set_latency_us << (duration / 1000); |
947 | 4.34M | return FileBlocksHolder(std::move(file_blocks)); |
948 | 4.34M | } |
949 | | |
950 | | FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context, |
951 | | size_t offset, size_t size, FileBlock::State state, |
952 | 147k | std::lock_guard<std::mutex>& cache_lock) { |
953 | | /// Create a file block cell and put it in `files` map by [hash][offset]. |
954 | 147k | if (size == 0) { |
955 | 0 | return nullptr; /// Empty files are not cached. |
956 | 0 | } |
957 | | |
958 | 147k | VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string() |
959 | 0 | << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type) |
960 | 0 | << " expiration_time=" << context.expiration_time |
961 | 0 | << " tablet_id=" << context.tablet_id; |
962 | | |
963 | 147k | if (size > 1024 * 1024 * 1024) { |
964 | 0 | LOG(WARNING) << "File block size is too large for a block. size=" << size |
965 | 0 | << " hash=" << hash.to_string() << " offset=" << offset |
966 | 0 | << " stack:" << get_stack_trace(); |
967 | 0 | } |
968 | | |
969 | 147k | auto& offsets = _files[hash]; |
970 | 147k | auto itr = offsets.find(offset); |
971 | 147k | if (itr != offsets.end()) { |
972 | 5 | VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string() |
973 | 0 | << ", offset: " << offset << ", size: " << size |
974 | 0 | << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock); |
975 | 5 | return &(itr->second); |
976 | 5 | } |
977 | | |
978 | 147k | FileCacheKey key; |
979 | 147k | key.hash = hash; |
980 | 147k | key.offset = offset; |
981 | 147k | key.meta.type = context.cache_type; |
982 | 147k | key.meta.expiration_time = context.expiration_time; |
983 | 147k | key.meta.tablet_id = context.tablet_id; |
984 | 147k | FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock); |
985 | 147k | Status st; |
986 | 147k | if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) { |
987 | 0 | st = cell.file_block->change_cache_type_lock(config::enable_file_cache_normal_queue_2qlru |
988 | 0 | ? FileCacheType::COLD_NORMAL |
989 | 0 | : FileCacheType::NORMAL, |
990 | 0 | cache_lock); |
991 | 147k | } else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) { |
992 | 1 | st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock); |
993 | 1 | } |
994 | 147k | if (!st.ok()) { |
995 | 0 | LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time |
996 | 0 | << " cache_type=" << cache_type_to_string(context.cache_type) |
997 | 0 | << " error=" << st.msg(); |
998 | 0 | } |
999 | | |
1000 | 147k | auto& queue = get_queue(cell.file_block->cache_type()); |
1001 | 147k | cell.queue_iterator = queue.add(hash, offset, size, cache_lock); |
1002 | 147k | _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD, |
1003 | 147k | cell.file_block->get_hash_value(), cell.file_block->offset(), |
1004 | 147k | cell.size()); |
1005 | | |
1006 | 147k | if (cell.file_block->cache_type() == FileCacheType::TTL) { |
1007 | 3.92k | _cur_ttl_size += cell.size(); |
1008 | 3.92k | } |
1009 | 147k | auto [it, _] = offsets.insert(std::make_pair(offset, std::move(cell))); |
1010 | 147k | _cur_cache_size += size; |
1011 | 147k | return &(it->second); |
1012 | 147k | } |
1013 | | |
1014 | 0 | size_t BlockFileCache::try_release() { |
1015 | 0 | SCOPED_CACHE_LOCK(_mutex, this); |
1016 | 0 | std::vector<FileBlockCell*> trash; |
1017 | 0 | for (auto& [hash, blocks] : _files) { |
1018 | 0 | for (auto& [offset, cell] : blocks) { |
1019 | 0 | if (cell.releasable()) { |
1020 | 0 | trash.emplace_back(&cell); |
1021 | 0 | } else { |
1022 | 0 | cell.file_block->set_deleting(); |
1023 | 0 | } |
1024 | 0 | } |
1025 | 0 | } |
1026 | 0 | size_t remove_size = 0; |
1027 | 0 | for (auto& cell : trash) { |
1028 | 0 | FileBlockSPtr file_block = cell->file_block; |
1029 | 0 | std::lock_guard lc(cell->file_block->_mutex); |
1030 | 0 | remove_size += file_block->range().size(); |
1031 | 0 | remove(file_block, cache_lock, lc); |
1032 | 0 | VLOG_DEBUG << "try_release " << _cache_base_path |
1033 | 0 | << " hash=" << file_block->get_hash_value().to_string() |
1034 | 0 | << " offset=" << file_block->offset(); |
1035 | 0 | } |
1036 | 0 | *_evict_by_try_release << remove_size; |
1037 | 0 | LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path; |
1038 | 0 | return trash.size(); |
1039 | 0 | } |
1040 | | |
1041 | 5.02M | LRUQueue& BlockFileCache::get_queue(FileCacheType type) { |
1042 | 5.02M | switch (type) { |
1043 | 558k | case FileCacheType::INDEX: |
1044 | 558k | return _index_queue; |
1045 | 236k | case FileCacheType::DISPOSABLE: |
1046 | 236k | return _disposable_queue; |
1047 | 4.07M | case FileCacheType::NORMAL: |
1048 | 4.07M | return _normal_queue; |
1049 | 6.48k | case FileCacheType::TTL: |
1050 | 6.48k | return _ttl_queue; |
1051 | 150k | case FileCacheType::COLD_NORMAL: |
1052 | 150k | return _cold_normal_queue; |
1053 | 0 | default: |
1054 | 0 | DCHECK(false); |
1055 | 5.02M | } |
1056 | 0 | return _normal_queue; |
1057 | 5.02M | } |
1058 | | |
1059 | 137 | const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const { |
1060 | 137 | switch (type) { |
1061 | 33 | case FileCacheType::INDEX: |
1062 | 33 | return _index_queue; |
1063 | 1 | case FileCacheType::DISPOSABLE: |
1064 | 1 | return _disposable_queue; |
1065 | 103 | case FileCacheType::NORMAL: |
1066 | 103 | return _normal_queue; |
1067 | 0 | case FileCacheType::TTL: |
1068 | 0 | return _ttl_queue; |
1069 | 0 | case FileCacheType::COLD_NORMAL: |
1070 | 0 | return _cold_normal_queue; |
1071 | 0 | default: |
1072 | 0 | DCHECK(false); |
1073 | 137 | } |
1074 | 0 | return _normal_queue; |
1075 | 137 | } |
1076 | | |
1077 | | void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict, |
1078 | | std::lock_guard<std::mutex>& cache_lock, bool sync, |
1079 | 271k | std::string& reason) { |
1080 | 271k | auto remove_file_block_if = [&](FileBlockCell* cell) { |
1081 | 109k | FileBlockSPtr file_block = cell->file_block; |
1082 | 109k | if (file_block) { |
1083 | 109k | std::lock_guard block_lock(file_block->_mutex); |
1084 | 109k | remove(file_block, cache_lock, block_lock, sync); |
1085 | 109k | VLOG_DEBUG << "remove_file_blocks" |
1086 | 0 | << " hash=" << file_block->get_hash_value().to_string() |
1087 | 0 | << " offset=" << file_block->offset() << " reason=" << reason; |
1088 | 109k | } |
1089 | 109k | }; |
1090 | 271k | std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); |
1091 | 271k | } |
1092 | | |
1093 | | void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size, |
1094 | | size_t& removed_size, |
1095 | | std::vector<FileBlockCell*>& to_evict, |
1096 | | std::lock_guard<std::mutex>& cache_lock, |
1097 | 5.66k | size_t& cur_removed_size, bool evict_in_advance) { |
1098 | 747k | for (const auto& [entry_key, entry_offset, entry_size] : queue) { |
1099 | 747k | if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) { |
1100 | 3.78k | break; |
1101 | 3.78k | } |
1102 | 743k | auto* cell = get_cell(entry_key, entry_offset, cache_lock); |
1103 | | |
1104 | 743k | DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string() |
1105 | 0 | << ", offset: " << entry_offset; |
1106 | | |
1107 | 743k | size_t cell_size = cell->size(); |
1108 | 743k | DCHECK(entry_size == cell_size); |
1109 | | |
1110 | 743k | if (cell->releasable()) { |
1111 | 106k | auto& file_block = cell->file_block; |
1112 | | |
1113 | 106k | std::lock_guard block_lock(file_block->_mutex); |
1114 | 106k | DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); |
1115 | 106k | to_evict.push_back(cell); |
1116 | 106k | removed_size += cell_size; |
1117 | 106k | cur_removed_size += cell_size; |
1118 | 106k | } |
1119 | 743k | } |
1120 | 5.66k | } |
1121 | | |
1122 | | // 1. if async load file cache not finish |
1123 | | // a. evict from lru queue |
1124 | | // 2. if ttl cache |
1125 | | // a. evict from disposable/normal/index queue one by one |
1126 | | // 3. if dont reach query limit or dont have query limit |
1127 | | // a. evict from other queue |
1128 | | // b. evict from current queue |
1129 | | // a.1 if the data belong write, then just evict cold data |
1130 | | // 4. if reach query limit |
1131 | | // a. evict from query queue |
1132 | | // b. evict from other queue |
1133 | | bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context, |
1134 | | size_t offset, size_t size, |
1135 | 148k | std::lock_guard<std::mutex>& cache_lock) { |
1136 | 148k | if (!_async_open_done) { |
1137 | 2 | return try_reserve_during_async_load(size, cache_lock); |
1138 | 2 | } |
1139 | | // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining |
1140 | | // directly eliminate 5 times the size of the space |
1141 | 148k | if (_disk_resource_limit_mode) { |
1142 | 1 | size = 5 * size; |
1143 | 1 | } |
1144 | | |
1145 | 148k | auto query_context = config::enable_file_cache_query_limit && |
1146 | 148k | (context.query_id.hi != 0 || context.query_id.lo != 0) |
1147 | 148k | ? get_query_context(context.query_id, cache_lock) |
1148 | 148k | : nullptr; |
1149 | 148k | if (!query_context) { |
1150 | 148k | return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock); |
1151 | 148k | } else if (query_context->get_cache_size(cache_lock) + size <= |
1152 | 31 | query_context->get_max_cache_size()) { |
1153 | 16 | return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock); |
1154 | 16 | } |
1155 | 15 | int64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>( |
1156 | 15 | std::chrono::steady_clock::now().time_since_epoch()) |
1157 | 15 | .count(); |
1158 | 15 | auto& queue = get_queue(context.cache_type); |
1159 | 15 | size_t removed_size = 0; |
1160 | 15 | size_t ghost_remove_size = 0; |
1161 | 15 | size_t queue_size = queue.get_capacity(cache_lock); |
1162 | 15 | size_t cur_cache_size = _cur_cache_size; |
1163 | 15 | size_t query_context_cache_size = query_context->get_cache_size(cache_lock); |
1164 | | |
1165 | 15 | std::vector<LRUQueue::Iterator> ghost; |
1166 | 15 | std::vector<FileBlockCell*> to_evict; |
1167 | | |
1168 | 15 | size_t max_size = queue.get_max_size(); |
1169 | 48 | auto is_overflow = [&] { |
1170 | 48 | return _disk_resource_limit_mode ? removed_size < size |
1171 | 48 | : cur_cache_size + size - removed_size > _capacity || |
1172 | 48 | (queue_size + size - removed_size > max_size) || |
1173 | 48 | (query_context_cache_size + size - |
1174 | 38 | (removed_size + ghost_remove_size) > |
1175 | 38 | query_context->get_max_cache_size()); |
1176 | 48 | }; |
1177 | | |
1178 | | /// Select the cache from the LRU queue held by query for expulsion. |
1179 | 35 | for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) { |
1180 | 33 | if (!is_overflow()) { |
1181 | 13 | break; |
1182 | 13 | } |
1183 | | |
1184 | 20 | auto* cell = get_cell(iter->hash, iter->offset, cache_lock); |
1185 | | |
1186 | 20 | if (!cell) { |
1187 | | /// The cache corresponding to this record may be swapped out by |
1188 | | /// other queries, so it has become invalid. |
1189 | 4 | ghost.push_back(iter); |
1190 | 4 | ghost_remove_size += iter->size; |
1191 | 16 | } else { |
1192 | 16 | size_t cell_size = cell->size(); |
1193 | 16 | DCHECK(iter->size == cell_size); |
1194 | | |
1195 | 16 | if (cell->releasable()) { |
1196 | 16 | auto& file_block = cell->file_block; |
1197 | | |
1198 | 16 | std::lock_guard block_lock(file_block->_mutex); |
1199 | 16 | DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); |
1200 | 16 | to_evict.push_back(cell); |
1201 | 16 | removed_size += cell_size; |
1202 | 16 | } |
1203 | 16 | } |
1204 | 20 | } |
1205 | | |
1206 | 16 | auto remove_file_block_if = [&](FileBlockCell* cell) { |
1207 | 16 | FileBlockSPtr file_block = cell->file_block; |
1208 | 16 | if (file_block) { |
1209 | 16 | query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock); |
1210 | 16 | std::lock_guard block_lock(file_block->_mutex); |
1211 | 16 | remove(file_block, cache_lock, block_lock); |
1212 | 16 | } |
1213 | 16 | }; |
1214 | | |
1215 | 15 | for (auto& iter : ghost) { |
1216 | 4 | query_context->remove(iter->hash, iter->offset, cache_lock); |
1217 | 4 | } |
1218 | | |
1219 | 15 | std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); |
1220 | | |
1221 | 15 | if (is_overflow() && |
1222 | 15 | !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) { |
1223 | 1 | return false; |
1224 | 1 | } |
1225 | 14 | query_context->reserve(hash, offset, size, cache_lock); |
1226 | 14 | return true; |
1227 | 15 | } |
1228 | | |
1229 | 253 | void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) { |
1230 | 253 | UInt128Wrapper hash = UInt128Wrapper(); |
1231 | 253 | size_t offset = 0; |
1232 | 253 | CacheContext context; |
1233 | | /* we pick NORMAL and TTL cache to evict in advance |
1234 | | * we reserve for them but won't acutually give space to them |
1235 | | * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves |
1236 | | * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting |
1237 | | * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full |
1238 | | */ |
1239 | 253 | context.cache_type = FileCacheType::NORMAL; |
1240 | 253 | try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true); |
1241 | 253 | context.cache_type = FileCacheType::TTL; |
1242 | 253 | try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true); |
1243 | 253 | } |
1244 | | |
1245 | | // remove specific cache synchronously, for critical operations |
1246 | | // if in use, cache meta will be deleted after use and the block file is then deleted asynchronously |
1247 | 7 | void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) { |
1248 | 7 | std::string reason = "remove_if_cached"; |
1249 | 7 | SCOPED_CACHE_LOCK(_mutex, this); |
1250 | 7 | auto iter = _files.find(file_key); |
1251 | 7 | std::vector<FileBlockCell*> to_remove; |
1252 | 7 | if (iter != _files.end()) { |
1253 | 14 | for (auto& [_, cell] : iter->second) { |
1254 | 14 | if (cell.releasable()) { |
1255 | 13 | to_remove.push_back(&cell); |
1256 | 13 | } else { |
1257 | 1 | cell.file_block->set_deleting(); |
1258 | 1 | } |
1259 | 14 | } |
1260 | 6 | } |
1261 | 7 | remove_file_blocks(to_remove, cache_lock, true, reason); |
1262 | 7 | } |
1263 | | |
1264 | | // the async version of remove_if_cached, for background operations |
1265 | | // cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously |
1266 | | // if in use, cache meta will be deleted after use and the block file is then deleted asynchronously |
1267 | 116k | void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) { |
1268 | 116k | std::string reason = "remove_if_cached_async"; |
1269 | 116k | SCOPED_CACHE_LOCK(_mutex, this); |
1270 | | |
1271 | 116k | auto iter = _files.find(file_key); |
1272 | 116k | std::vector<FileBlockCell*> to_remove; |
1273 | 116k | if (iter != _files.end()) { |
1274 | 6.27k | for (auto& [_, cell] : iter->second) { |
1275 | 6.27k | *_gc_evict_bytes_metrics << cell.size(); |
1276 | 6.27k | *_gc_evict_count_metrics << 1; |
1277 | 6.27k | if (cell.releasable()) { |
1278 | 3.26k | to_remove.push_back(&cell); |
1279 | 3.26k | } else { |
1280 | 3.00k | cell.file_block->set_deleting(); |
1281 | 3.00k | } |
1282 | 6.27k | } |
1283 | 6.12k | } |
1284 | 116k | remove_file_blocks(to_remove, cache_lock, false, reason); |
1285 | 116k | } |
1286 | | |
1287 | | std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl( |
1288 | 149k | FileCacheType cur_cache_type) { |
1289 | 149k | switch (cur_cache_type) { |
1290 | 4.16k | case FileCacheType::TTL: |
1291 | 4.16k | return {FileCacheType::DISPOSABLE, FileCacheType::COLD_NORMAL, FileCacheType::NORMAL, |
1292 | 4.16k | FileCacheType::INDEX}; |
1293 | 24.2k | case FileCacheType::INDEX: |
1294 | 24.2k | return {FileCacheType::DISPOSABLE, FileCacheType::COLD_NORMAL, FileCacheType::NORMAL}; |
1295 | 117k | case FileCacheType::NORMAL: |
1296 | 117k | return {FileCacheType::DISPOSABLE, FileCacheType::COLD_NORMAL, FileCacheType::INDEX}; |
1297 | 44 | case FileCacheType::COLD_NORMAL: |
1298 | 44 | return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX}; |
1299 | 3.81k | case FileCacheType::DISPOSABLE: |
1300 | 3.81k | return {FileCacheType::COLD_NORMAL, FileCacheType::NORMAL, FileCacheType::INDEX}; |
1301 | 0 | default: |
1302 | 0 | return {}; |
1303 | 149k | } |
1304 | 0 | return {}; |
1305 | 149k | } |
1306 | | |
1307 | 5.34k | std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) { |
1308 | 5.34k | switch (cur_cache_type) { |
1309 | 535 | case FileCacheType::TTL: |
1310 | 535 | return {FileCacheType::DISPOSABLE, FileCacheType::COLD_NORMAL, FileCacheType::NORMAL, |
1311 | 535 | FileCacheType::INDEX}; |
1312 | 862 | case FileCacheType::INDEX: |
1313 | 862 | return {FileCacheType::DISPOSABLE, FileCacheType::COLD_NORMAL, FileCacheType::NORMAL, |
1314 | 862 | FileCacheType::TTL}; |
1315 | 3.74k | case FileCacheType::NORMAL: |
1316 | 3.74k | return {FileCacheType::DISPOSABLE, FileCacheType::COLD_NORMAL, FileCacheType::INDEX, |
1317 | 3.74k | FileCacheType::TTL}; |
1318 | 0 | case FileCacheType::COLD_NORMAL: |
1319 | 0 | return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX, |
1320 | 0 | FileCacheType::TTL}; |
1321 | 202 | case FileCacheType::DISPOSABLE: |
1322 | 202 | return {FileCacheType::COLD_NORMAL, FileCacheType::NORMAL, FileCacheType::INDEX, |
1323 | 202 | FileCacheType::TTL}; |
1324 | 0 | default: |
1325 | 0 | return {}; |
1326 | 5.34k | } |
1327 | 0 | return {}; |
1328 | 5.34k | } |
1329 | | |
1330 | | void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size, |
1331 | 7.07k | size_t new_size, std::lock_guard<std::mutex>& cache_lock) { |
1332 | 7.07k | DCHECK(_files.find(hash) != _files.end() && |
1333 | 7.07k | _files.find(hash)->second.find(offset) != _files.find(hash)->second.end()); |
1334 | 7.07k | FileBlockCell* cell = get_cell(hash, offset, cache_lock); |
1335 | 7.07k | DCHECK(cell != nullptr); |
1336 | 7.07k | if (cell == nullptr) { |
1337 | 0 | LOG(WARNING) << "reset_range skipped because cache cell is missing. hash=" |
1338 | 0 | << hash.to_string() << " offset=" << offset << " old_size=" << old_size |
1339 | 0 | << " new_size=" << new_size; |
1340 | 0 | return; |
1341 | 0 | } |
1342 | 7.07k | if (cell->queue_iterator) { |
1343 | 7.07k | auto& queue = get_queue(cell->file_block->cache_type()); |
1344 | 7.07k | DCHECK(queue.contains(hash, offset, cache_lock)); |
1345 | 7.07k | queue.resize(*cell->queue_iterator, new_size, cache_lock); |
1346 | 7.07k | _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE, |
1347 | 7.07k | cell->file_block->get_hash_value(), |
1348 | 7.07k | cell->file_block->offset(), new_size); |
1349 | 7.07k | } |
1350 | 7.07k | _cur_cache_size -= old_size; |
1351 | 7.07k | _cur_cache_size += new_size; |
1352 | 7.07k | } |
1353 | | |
1354 | | bool BlockFileCache::try_reserve_from_other_queue_by_time_interval( |
1355 | | FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size, |
1356 | 149k | int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) { |
1357 | 149k | size_t removed_size = 0; |
1358 | 149k | size_t cur_cache_size = _cur_cache_size; |
1359 | 149k | std::vector<FileBlockCell*> to_evict; |
1360 | 452k | for (FileCacheType cache_type : other_cache_types) { |
1361 | 452k | auto& queue = get_queue(cache_type); |
1362 | 452k | size_t remove_size_per_type = 0; |
1363 | 452k | for (const auto& [entry_key, entry_offset, entry_size] : queue) { |
1364 | 270k | if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) { |
1365 | 263k | break; |
1366 | 263k | } |
1367 | 6.31k | auto* cell = get_cell(entry_key, entry_offset, cache_lock); |
1368 | 6.31k | DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string() |
1369 | 0 | << ", offset: " << entry_offset; |
1370 | | |
1371 | 6.31k | size_t cell_size = cell->size(); |
1372 | 6.31k | DCHECK(entry_size == cell_size); |
1373 | | |
1374 | 6.31k | if (cell->atime == 0 ? true |
1375 | 6.31k | : cell->atime + queue.get_hot_data_interval() * 1000 > cur_time) { |
1376 | 6.31k | break; |
1377 | 6.31k | } |
1378 | | |
1379 | 2 | if (cell->releasable()) { |
1380 | 2 | auto& file_block = cell->file_block; |
1381 | 2 | std::lock_guard block_lock(file_block->_mutex); |
1382 | 2 | DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); |
1383 | 2 | to_evict.push_back(cell); |
1384 | 2 | removed_size += cell_size; |
1385 | 2 | remove_size_per_type += cell_size; |
1386 | 2 | } |
1387 | 2 | } |
1388 | 452k | *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type; |
1389 | 452k | } |
1390 | 149k | bool is_sync_removal = !evict_in_advance; |
1391 | 149k | std::string reason = std::string("try_reserve_by_time ") + |
1392 | 149k | " evict_in_advance=" + (evict_in_advance ? "true" : "false"); |
1393 | 149k | remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason); |
1394 | | |
1395 | 149k | return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance); |
1396 | 149k | } |
1397 | | |
1398 | | bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size, |
1399 | 1.17M | bool evict_in_advance) const { |
1400 | 1.17M | bool ret = false; |
1401 | 1.17M | if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance |
1402 | 96.8k | ret = (removed_size < need_size); |
1403 | 96.8k | return ret; |
1404 | 96.8k | } |
1405 | 1.07M | if (_disk_resource_limit_mode) { |
1406 | 4 | ret = (removed_size < need_size); |
1407 | 1.07M | } else { |
1408 | 1.07M | ret = (cur_cache_size + need_size - removed_size > _capacity); |
1409 | 1.07M | } |
1410 | 1.07M | return ret; |
1411 | 1.17M | } |
1412 | | |
1413 | | bool BlockFileCache::try_reserve_from_other_queue_by_size( |
1414 | | FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size, |
1415 | 915 | std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) { |
1416 | 915 | size_t removed_size = 0; |
1417 | 915 | size_t cur_cache_size = _cur_cache_size; |
1418 | 915 | std::vector<FileBlockCell*> to_evict; |
1419 | | // we follow the privilege defined in get_other_cache_types to evict |
1420 | 3.66k | for (FileCacheType cache_type : other_cache_types) { |
1421 | 3.66k | auto& queue = get_queue(cache_type); |
1422 | | |
1423 | | // we will not drain each of them to the bottom -- i.e., we only |
1424 | | // evict what they have stolen. |
1425 | 3.66k | size_t cur_queue_size = queue.get_capacity(cache_lock); |
1426 | 3.66k | size_t cur_queue_max_size = queue.get_max_size(); |
1427 | 3.66k | if (cur_queue_size <= cur_queue_max_size) { |
1428 | 2.76k | continue; |
1429 | 2.76k | } |
1430 | 896 | size_t cur_removed_size = 0; |
1431 | 896 | find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock, |
1432 | 896 | cur_removed_size, evict_in_advance); |
1433 | 896 | *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size; |
1434 | 896 | } |
1435 | 915 | bool is_sync_removal = !evict_in_advance; |
1436 | 915 | std::string reason = std::string("try_reserve_by_size") + |
1437 | 915 | " evict_in_advance=" + (evict_in_advance ? "true" : "false"); |
1438 | 915 | remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason); |
1439 | 915 | return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance); |
1440 | 915 | } |
1441 | | |
1442 | | bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size, |
1443 | | int64_t cur_time, |
1444 | | std::lock_guard<std::mutex>& cache_lock, |
1445 | 149k | bool evict_in_advance) { |
1446 | | // currently, TTL cache is not considered as a candidate |
1447 | 149k | auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type); |
1448 | 149k | bool reserve_success = try_reserve_from_other_queue_by_time_interval( |
1449 | 149k | cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance); |
1450 | 149k | if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) { |
1451 | 143k | return reserve_success; |
1452 | 143k | } |
1453 | | |
1454 | 5.34k | other_cache_types = get_other_cache_type(cur_cache_type); |
1455 | 5.34k | auto& cur_queue = get_queue(cur_cache_type); |
1456 | 5.34k | size_t cur_queue_size = cur_queue.get_capacity(cache_lock); |
1457 | 5.34k | size_t cur_queue_max_size = cur_queue.get_max_size(); |
1458 | | // Hit the soft limit by self, cannot remove from other queues |
1459 | 5.34k | if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) { |
1460 | 4.43k | return false; |
1461 | 4.43k | } |
1462 | 913 | return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock, |
1463 | 913 | evict_in_advance); |
1464 | 5.34k | } |
1465 | | |
1466 | | bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash, |
1467 | | QueryFileCacheContextPtr query_context, |
1468 | | const CacheContext& context, size_t offset, size_t size, |
1469 | | std::lock_guard<std::mutex>& cache_lock, |
1470 | 149k | bool evict_in_advance) { |
1471 | 149k | int64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>( |
1472 | 149k | std::chrono::steady_clock::now().time_since_epoch()) |
1473 | 149k | .count(); |
1474 | 149k | if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock, |
1475 | 149k | evict_in_advance)) { |
1476 | 4.76k | auto& queue = get_queue(context.cache_type); |
1477 | 4.76k | size_t removed_size = 0; |
1478 | 4.76k | size_t cur_cache_size = _cur_cache_size; |
1479 | | |
1480 | 4.76k | std::vector<FileBlockCell*> to_evict; |
1481 | 4.76k | size_t cur_removed_size = 0; |
1482 | 4.76k | find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock, |
1483 | 4.76k | cur_removed_size, evict_in_advance); |
1484 | 4.76k | bool is_sync_removal = !evict_in_advance; |
1485 | 4.76k | std::string reason = std::string("try_reserve for cache type ") + |
1486 | 4.76k | cache_type_to_string(context.cache_type) + |
1487 | 4.76k | " evict_in_advance=" + (evict_in_advance ? "true" : "false"); |
1488 | 4.76k | remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason); |
1489 | 4.76k | *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size; |
1490 | | |
1491 | 4.76k | if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) { |
1492 | 1.73k | return false; |
1493 | 1.73k | } |
1494 | 4.76k | } |
1495 | | |
1496 | 147k | if (query_context) { |
1497 | 16 | query_context->reserve(hash, offset, size, cache_lock); |
1498 | 16 | } |
1499 | 147k | return true; |
1500 | 149k | } |
1501 | | |
1502 | | template <class T, class U> |
1503 | | requires IsXLock<T> && IsXLock<U> |
1504 | 141k | void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) { |
1505 | 141k | auto hash = file_block->get_hash_value(); |
1506 | 141k | auto offset = file_block->offset(); |
1507 | 141k | auto type = file_block->cache_type(); |
1508 | 141k | auto expiration_time = file_block->expiration_time(); |
1509 | 141k | auto tablet_id = file_block->tablet_id(); |
1510 | 141k | auto* cell = get_cell(hash, offset, cache_lock); |
1511 | 141k | file_block->cell = nullptr; |
1512 | 141k | DCHECK(cell); |
1513 | 141k | DCHECK(cell->queue_iterator); |
1514 | 141k | if (cell->queue_iterator) { |
1515 | 141k | auto& queue = get_queue(file_block->cache_type()); |
1516 | 141k | queue.remove(*cell->queue_iterator, cache_lock); |
1517 | 141k | _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE, |
1518 | 141k | cell->file_block->get_hash_value(), |
1519 | 141k | cell->file_block->offset(), cell->size()); |
1520 | 141k | } |
1521 | 141k | *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())] |
1522 | 141k | << file_block->range().size(); |
1523 | 141k | *_total_evict_size_metrics << file_block->range().size(); |
1524 | | |
1525 | 141k | VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string() |
1526 | 0 | << ", offset: " << offset << ", size: " << file_block->range().size() |
1527 | 0 | << ", type: " << cache_type_to_string(type); |
1528 | | |
1529 | 141k | if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) { |
1530 | 111k | FileCacheKey key; |
1531 | 111k | key.hash = hash; |
1532 | 111k | key.offset = offset; |
1533 | 111k | key.meta.type = type; |
1534 | 111k | key.meta.expiration_time = expiration_time; |
1535 | 111k | key.meta.tablet_id = tablet_id; |
1536 | 111k | if (sync) { |
1537 | 12.2k | int64_t duration_ns = 0; |
1538 | 12.2k | Status st; |
1539 | 12.2k | { |
1540 | 12.2k | SCOPED_RAW_TIMER(&duration_ns); |
1541 | 12.2k | st = _storage->remove(key); |
1542 | 12.2k | } |
1543 | 12.2k | *_storage_sync_remove_latency_us << (duration_ns / 1000); |
1544 | 12.2k | if (!st.ok()) { |
1545 | 261 | LOG_WARNING("").error(st); |
1546 | 261 | } |
1547 | 99.0k | } else { |
1548 | | // the file will be deleted in the bottom half |
1549 | | // so there will be a window that the file is not in the cache but still in the storage |
1550 | | // but it's ok, because the rowset is stale already |
1551 | 99.0k | bool ret = _recycle_keys.enqueue(key); |
1552 | 99.0k | if (ret) [[likely]] { |
1553 | 99.0k | *_recycle_keys_length_recorder << _recycle_keys.size_approx(); |
1554 | 99.0k | } else { |
1555 | 0 | LOG_WARNING("Failed to push recycle key to queue, do it synchronously"); |
1556 | 0 | int64_t duration_ns = 0; |
1557 | 0 | Status st; |
1558 | 0 | { |
1559 | 0 | SCOPED_RAW_TIMER(&duration_ns); |
1560 | 0 | st = _storage->remove(key); |
1561 | 0 | } |
1562 | 0 | *_storage_retry_sync_remove_latency_us << (duration_ns / 1000); |
1563 | 0 | if (!st.ok()) { |
1564 | 0 | LOG_WARNING("").error(st); |
1565 | 0 | } |
1566 | 0 | } |
1567 | 99.0k | } |
1568 | 111k | } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) { |
1569 | 0 | file_block->set_deleting(); |
1570 | 0 | return; |
1571 | 0 | } |
1572 | 141k | _cur_cache_size -= file_block->range().size(); |
1573 | 141k | if (FileCacheType::TTL == type) { |
1574 | 1.29k | _cur_ttl_size -= file_block->range().size(); |
1575 | 1.29k | } |
1576 | 141k | auto it = _files.find(hash); |
1577 | 141k | if (it != _files.end()) { |
1578 | 141k | it->second.erase(file_block->offset()); |
1579 | 141k | if (it->second.empty()) { |
1580 | 105k | _files.erase(hash); |
1581 | 105k | } |
1582 | 141k | } |
1583 | 141k | *_num_removed_blocks << 1; |
1584 | 141k | } |
1585 | | |
1586 | 46 | size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const { |
1587 | 46 | SCOPED_CACHE_LOCK(_mutex, this); |
1588 | 46 | return get_used_cache_size_unlocked(cache_type, cache_lock); |
1589 | 46 | } |
1590 | | |
1591 | | size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type, |
1592 | 46 | std::lock_guard<std::mutex>& cache_lock) const { |
1593 | 46 | return get_queue(cache_type).get_capacity(cache_lock); |
1594 | 46 | } |
1595 | | |
1596 | 0 | size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const { |
1597 | 0 | SCOPED_CACHE_LOCK(_mutex, this); |
1598 | 0 | return get_available_cache_size_unlocked(cache_type, cache_lock); |
1599 | 0 | } |
1600 | | |
1601 | | size_t BlockFileCache::get_available_cache_size_unlocked( |
1602 | 0 | FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const { |
1603 | 0 | return get_queue(cache_type).get_max_element_size() - |
1604 | 0 | get_used_cache_size_unlocked(cache_type, cache_lock); |
1605 | 0 | } |
1606 | | |
1607 | 88 | size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const { |
1608 | 88 | SCOPED_CACHE_LOCK(_mutex, this); |
1609 | 88 | return get_file_blocks_num_unlocked(cache_type, cache_lock); |
1610 | 88 | } |
1611 | | |
1612 | | size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type, |
1613 | 88 | std::lock_guard<std::mutex>& cache_lock) const { |
1614 | 88 | return get_queue(cache_type).get_elements_num(cache_lock); |
1615 | 88 | } |
1616 | | |
1617 | | FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock) |
1618 | 147k | : file_block(file_block) { |
1619 | 147k | file_block->cell = this; |
1620 | | /** |
1621 | | * Cell can be created with either DOWNLOADED or EMPTY file block's state. |
1622 | | * File block acquires DOWNLOADING state and creates LRUQueue iterator on first |
1623 | | * successful getOrSetDownaloder call. |
1624 | | */ |
1625 | | |
1626 | 147k | switch (file_block->_download_state) { |
1627 | 222 | case FileBlock::State::DOWNLOADED: |
1628 | 147k | case FileBlock::State::EMPTY: |
1629 | 147k | case FileBlock::State::SKIP_CACHE: { |
1630 | 147k | break; |
1631 | 147k | } |
1632 | 0 | default: |
1633 | 0 | DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: " |
1634 | 0 | << FileBlock::state_to_string(file_block->_download_state); |
1635 | 147k | } |
1636 | 147k | if (file_block->cache_type() == FileCacheType::TTL) { |
1637 | 3.92k | update_atime(); |
1638 | 3.92k | } |
1639 | 147k | } |
1640 | | |
1641 | | LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size, |
1642 | 287k | std::lock_guard<std::mutex>& /* cache_lock */) { |
1643 | 287k | cache_size += size; |
1644 | 287k | auto iter = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size)); |
1645 | 287k | map.insert(std::make_pair(std::make_pair(hash, offset), iter)); |
1646 | 287k | return iter; |
1647 | 287k | } |
1648 | | |
1649 | 0 | void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) { |
1650 | 0 | queue.clear(); |
1651 | 0 | map.clear(); |
1652 | 0 | cache_size = 0; |
1653 | 0 | } |
1654 | | |
1655 | 6.07M | void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) { |
1656 | 6.07M | queue.splice(queue.end(), queue, queue_it); |
1657 | 6.07M | } |
1658 | | |
1659 | | void LRUQueue::resize(Iterator queue_it, size_t new_size, |
1660 | 14.1k | std::lock_guard<std::mutex>& /* cache_lock */) { |
1661 | 14.1k | cache_size -= queue_it->size; |
1662 | 14.1k | queue_it->size = new_size; |
1663 | 14.1k | cache_size += new_size; |
1664 | 14.1k | } |
1665 | | bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset, |
1666 | 7.07k | std::lock_guard<std::mutex>& /* cache_lock */) const { |
1667 | 7.07k | return map.find(std::make_pair(hash, offset)) != map.end(); |
1668 | 7.07k | } |
1669 | | |
1670 | | LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset, |
1671 | 3.43M | std::lock_guard<std::mutex>& /* cache_lock */) const { |
1672 | 3.43M | auto itr = map.find(std::make_pair(hash, offset)); |
1673 | 3.43M | if (itr != map.end()) { |
1674 | 2.93M | return itr->second; |
1675 | 2.93M | } |
1676 | 500k | return std::list<FileKeyAndOffset>::iterator(); |
1677 | 3.43M | } |
1678 | | |
1679 | 2 | std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const { |
1680 | 2 | std::string result; |
1681 | 18 | for (const auto& [hash, offset, size] : queue) { |
1682 | 18 | if (!result.empty()) { |
1683 | 16 | result += ", "; |
1684 | 16 | } |
1685 | 18 | result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1); |
1686 | 18 | } |
1687 | 2 | return result; |
1688 | 2 | } |
1689 | | |
1690 | | size_t LRUQueue::levenshtein_distance_from(LRUQueue& base, |
1691 | 5 | std::lock_guard<std::mutex>& cache_lock) { |
1692 | 5 | std::list<FileKeyAndOffset> target_queue = this->queue; |
1693 | 5 | std::list<FileKeyAndOffset> base_queue = base.queue; |
1694 | 5 | std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end()); |
1695 | 5 | std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end()); |
1696 | | |
1697 | 5 | size_t m = vec1.size(); |
1698 | 5 | size_t n = vec2.size(); |
1699 | | |
1700 | | // Create a 2D vector (matrix) to store the Levenshtein distances |
1701 | | // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2 |
1702 | 5 | std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0)); |
1703 | | |
1704 | | // Initialize the first row and column of the matrix |
1705 | | // The distance between an empty list and a list of length k is k (all insertions or deletions) |
1706 | 22 | for (size_t i = 0; i <= m; ++i) { |
1707 | 17 | dp[i][0] = i; |
1708 | 17 | } |
1709 | 20 | for (size_t j = 0; j <= n; ++j) { |
1710 | 15 | dp[0][j] = j; |
1711 | 15 | } |
1712 | | |
1713 | | // Fill the matrix using dynamic programming |
1714 | 17 | for (size_t i = 1; i <= m; ++i) { |
1715 | 38 | for (size_t j = 1; j <= n; ++j) { |
1716 | | // Check if the current elements of both vectors are equal |
1717 | 26 | size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash && |
1718 | 26 | vec1[i - 1].offset == vec2[j - 1].offset) |
1719 | 26 | ? 0 |
1720 | 26 | : 1; |
1721 | | // Calculate the minimum cost of three possible operations: |
1722 | | // 1. Insertion: dp[i][j-1] + 1 |
1723 | | // 2. Deletion: dp[i-1][j] + 1 |
1724 | | // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not) |
1725 | 26 | dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost}); |
1726 | 26 | } |
1727 | 12 | } |
1728 | | // The bottom-right cell of the matrix contains the Levenshtein distance |
1729 | 5 | return dp[m][n]; |
1730 | 5 | } |
1731 | | |
1732 | 1 | std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) { |
1733 | 1 | SCOPED_CACHE_LOCK(_mutex, this); |
1734 | 1 | return dump_structure_unlocked(hash, cache_lock); |
1735 | 1 | } |
1736 | | |
1737 | | std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash, |
1738 | 1 | std::lock_guard<std::mutex>&) { |
1739 | 1 | std::stringstream result; |
1740 | 1 | auto it = _files.find(hash); |
1741 | 1 | if (it == _files.end()) { |
1742 | 0 | return std::string(""); |
1743 | 0 | } |
1744 | 1 | const auto& cells_by_offset = it->second; |
1745 | | |
1746 | 1 | for (const auto& [_, cell] : cells_by_offset) { |
1747 | 1 | result << cell.file_block->get_info_for_log() << " " |
1748 | 1 | << cache_type_to_string(cell.file_block->cache_type()) << "\n"; |
1749 | 1 | } |
1750 | | |
1751 | 1 | return result.str(); |
1752 | 1 | } |
1753 | | |
1754 | 0 | std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) { |
1755 | 0 | SCOPED_CACHE_LOCK(_mutex, this); |
1756 | 0 | return dump_single_cache_type_unlocked(hash, offset, cache_lock); |
1757 | 0 | } |
1758 | | |
1759 | | std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash, |
1760 | | size_t offset, |
1761 | 0 | std::lock_guard<std::mutex>&) { |
1762 | 0 | std::stringstream result; |
1763 | 0 | auto it = _files.find(hash); |
1764 | 0 | if (it == _files.end()) { |
1765 | 0 | return std::string(""); |
1766 | 0 | } |
1767 | 0 | const auto& cells_by_offset = it->second; |
1768 | 0 | const auto& cell = cells_by_offset.find(offset); |
1769 | |
|
1770 | 0 | return cache_type_to_string(cell->second.file_block->cache_type()); |
1771 | 0 | } |
1772 | | |
1773 | | void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset, |
1774 | | FileCacheType new_type, |
1775 | 36 | std::lock_guard<std::mutex>& cache_lock) { |
1776 | 36 | if (auto iter = _files.find(hash); iter != _files.end()) { |
1777 | 36 | auto& file_blocks = iter->second; |
1778 | 36 | if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) { |
1779 | 35 | FileBlockCell& cell = cell_it->second; |
1780 | 35 | auto& cur_queue = get_queue(cell.file_block->cache_type()); |
1781 | 35 | DCHECK(cell.queue_iterator.has_value()); |
1782 | 35 | cur_queue.remove(*cell.queue_iterator, cache_lock); |
1783 | 35 | _lru_recorder->record_queue_event( |
1784 | 35 | cell.file_block->cache_type(), CacheLRULogType::REMOVE, |
1785 | 35 | cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size()); |
1786 | 35 | auto& new_queue = get_queue(new_type); |
1787 | 35 | cell.queue_iterator = |
1788 | 35 | new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock); |
1789 | 35 | _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD, |
1790 | 35 | cell.file_block->get_hash_value(), |
1791 | 35 | cell.file_block->offset(), cell.size()); |
1792 | 35 | } |
1793 | 36 | } |
1794 | 36 | } |
1795 | | |
1796 | | // @brief: get a path's disk capacity used percent, inode used percent |
1797 | | // @param: path |
1798 | | // @param: percent.first disk used percent, percent.second inode used percent |
1799 | 8.33k | int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) { |
1800 | 8.33k | struct statfs stat; |
1801 | 8.33k | int ret = statfs(path.c_str(), &stat); |
1802 | 8.33k | if (ret != 0) { |
1803 | 5 | return ret; |
1804 | 5 | } |
1805 | | // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195 |
1806 | | // v->used = stat.f_blocks - stat.f_bfree |
1807 | | // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail |
1808 | 8.32k | uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100; |
1809 | 8.32k | uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail; |
1810 | 8.32k | int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0)); |
1811 | | |
1812 | 8.32k | unsigned long long inode_free = stat.f_ffree; |
1813 | 8.32k | unsigned long long inode_total = stat.f_files; |
1814 | 8.32k | int inode_percentage = cast_set<int>(inode_free * 100 / inode_total); |
1815 | 8.32k | percent->first = capacity_percentage; |
1816 | 8.32k | percent->second = 100 - inode_percentage; |
1817 | | |
1818 | | // Add sync point for testing |
1819 | 8.32k | TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent); |
1820 | | |
1821 | 8.32k | return 0; |
1822 | 8.33k | } |
1823 | | |
1824 | 10 | std::string BlockFileCache::reset_capacity(size_t new_capacity) { |
1825 | 10 | using namespace std::chrono; |
1826 | 10 | int64_t space_released = 0; |
1827 | 10 | size_t old_capacity = 0; |
1828 | 10 | std::stringstream ss; |
1829 | 10 | ss << "finish reset_capacity, path=" << _cache_base_path; |
1830 | 10 | auto adjust_start_time = steady_clock::time_point(); |
1831 | 10 | { |
1832 | 10 | SCOPED_CACHE_LOCK(_mutex, this); |
1833 | 10 | if (new_capacity < _capacity && new_capacity < _cur_cache_size) { |
1834 | 1 | int64_t need_remove_size = _cur_cache_size - new_capacity; |
1835 | 5 | auto remove_blocks = [&](LRUQueue& queue) -> int64_t { |
1836 | 5 | int64_t queue_released = 0; |
1837 | 5 | std::vector<FileBlockCell*> to_evict; |
1838 | 13 | for (const auto& [entry_key, entry_offset, entry_size] : queue) { |
1839 | 13 | if (need_remove_size <= 0) { |
1840 | 1 | break; |
1841 | 1 | } |
1842 | 12 | need_remove_size -= entry_size; |
1843 | 12 | space_released += entry_size; |
1844 | 12 | queue_released += entry_size; |
1845 | 12 | auto* cell = get_cell(entry_key, entry_offset, cache_lock); |
1846 | 12 | if (!cell->releasable()) { |
1847 | 0 | cell->file_block->set_deleting(); |
1848 | 0 | continue; |
1849 | 0 | } |
1850 | 12 | to_evict.push_back(cell); |
1851 | 12 | } |
1852 | 12 | for (auto& cell : to_evict) { |
1853 | 12 | FileBlockSPtr file_block = cell->file_block; |
1854 | 12 | std::lock_guard block_lock(file_block->_mutex); |
1855 | 12 | remove(file_block, cache_lock, block_lock); |
1856 | 12 | } |
1857 | 5 | return queue_released; |
1858 | 5 | }; |
1859 | 1 | int64_t queue_released = remove_blocks(_disposable_queue); |
1860 | 1 | ss << " disposable_queue released " << queue_released; |
1861 | 1 | queue_released = remove_blocks(_normal_queue); |
1862 | 1 | ss << " normal_queue released " << queue_released; |
1863 | 1 | queue_released = remove_blocks(_index_queue); |
1864 | 1 | ss << " index_queue released " << queue_released; |
1865 | 1 | queue_released = remove_blocks(_ttl_queue); |
1866 | 1 | ss << " ttl_queue released " << queue_released; |
1867 | 1 | queue_released = remove_blocks(_cold_normal_queue); |
1868 | 1 | ss << " cold_normal_queue released " << queue_released; |
1869 | | |
1870 | 1 | _disk_resource_limit_mode = true; |
1871 | 1 | _disk_limit_mode_metrics->set_value(1); |
1872 | 1 | ss << " total_space_released=" << space_released; |
1873 | 1 | } |
1874 | 10 | old_capacity = _capacity; |
1875 | 10 | _capacity = new_capacity; |
1876 | 10 | _cache_capacity_metrics->set_value(_capacity); |
1877 | 10 | } |
1878 | 10 | auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time); |
1879 | 10 | LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path |
1880 | 10 | << " use_time=" << cast_set<int64_t>(use_time.count()); |
1881 | 10 | ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity; |
1882 | 10 | LOG(INFO) << ss.str(); |
1883 | 10 | return ss.str(); |
1884 | 10 | } |
1885 | | |
// Re-evaluates whether this cache is in "disk resource limit mode" and mirrors the
// result into _disk_limit_mode_metrics (1 = on, 0 = off). Only DISK storage is checked.
//
// The mode is entered when the space or inode usage of the filesystem holding
// _cache_base_path reaches config::file_cache_enter_disk_resource_limit_mode_percent.
// It is left (while on) only when both usages drop below
// config::file_cache_exit_disk_resource_limit_mode_percent. Mode switches are logged;
// while the mode stays on, a status line is logged every 10th call.
void BlockFileCache::check_disk_resource_limit() {
    if (_storage->get_type() != FileCacheStorageType::DISK) {
        return;
    }

    // Snapshot used below to detect (and log) a mode switch.
    bool previous_mode = _disk_resource_limit_mode;
    // Leave the mode whenever the cache is below its configured capacity; the disk
    // usage check below may switch it back on.
    // NOTE(review): because this reset happens before the disk check, the
    // exit-threshold branch further down can only fire when _capacity <= _cur_cache_size,
    // so the enter/exit hysteresis is bypassed otherwise. Also, if
    // disk_used_percentage() fails below, this reset has already taken effect but the
    // switch is not logged. Confirm this is intended.
    if (_capacity > _cur_cache_size) {
        _disk_resource_limit_mode = false;
        _disk_limit_mode_metrics->set_value(0);
    }
    std::pair<int, int> percent;
    // On success `percent` holds (space used %, inode used %) for the cache path.
    int ret = disk_used_percentage(_cache_base_path, &percent);
    if (ret != 0) {
        // NOTE(review): assumes disk_used_percentage() reports failures via errno.
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
        return;
    }
    auto [space_percentage, inode_percentage] = percent;
    // Reads the config on every call, so it observes the value after the fix-up below.
    auto is_insufficient = [](const int& percentage) {
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
    };
    DCHECK_GE(space_percentage, 0);
    DCHECK_LE(space_percentage, 100);
    DCHECK_GE(inode_percentage, 0);
    DCHECK_LE(inode_percentage, 100);
    // ATTN: the thresholds can be changed at runtime, so reset both to their defaults
    // (88 / 80) when enter < exit. Note that enter == exit is accepted here, whereas
    // check_need_evict_cache_in_advance() rejects enter <= exit.
    // FIXME: reject with config validator
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
        config::file_cache_exit_disk_resource_limit_mode_percent) {
        LOG_WARNING("config error, set to default value")
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
    }
    bool is_space_insufficient = is_insufficient(space_percentage);
    bool is_inode_insufficient = is_insufficient(inode_percentage);
    if (is_space_insufficient || is_inode_insufficient) {
        // Either resource is at/above the enter threshold: (re)enter the mode.
        _disk_resource_limit_mode = true;
        _disk_limit_mode_metrics->set_value(1);
    } else if (_disk_resource_limit_mode &&
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
        // Hysteresis: only leave once both resources are below the (lower) exit threshold.
        _disk_resource_limit_mode = false;
        _disk_limit_mode_metrics->set_value(0);
    }
    if (previous_mode != _disk_resource_limit_mode) {
        // add log for disk resource limit mode switching
        if (_disk_resource_limit_mode) {
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
                         << " space_percent=" << space_percentage
                         << " inode_percent=" << inode_percentage
                         << " is_space_insufficient=" << is_space_insufficient
                         << " is_inode_insufficient=" << is_inode_insufficient
                         << " enter threshold="
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
        } else {
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
                      << " space_percent=" << space_percentage
                      << " inode_percent=" << inode_percentage << " exit threshold="
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
        }
    } else if (_disk_resource_limit_mode) {
        // print log for disk resource limit mode running, but less frequently
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
                                 << " space_percent=" << space_percentage
                                 << " inode_percent=" << inode_percentage
                                 << " is_space_insufficient=" << is_space_insufficient
                                 << " is_inode_insufficient=" << is_inode_insufficient
                                 << " mode run in resource limit";
    }
}
1957 | | |
// Re-evaluates whether the cache should evict ahead of demand and mirrors the result
// into _need_evict_cache_in_advance_metrics (1 = on, 0 = off). Only DISK storage is
// checked. Three usage percentages are considered:
// - disk space used;
// - inodes used;
// - cache size relative to _capacity.
// The flag is set when any of them reaches
// config::file_cache_enter_need_evict_cache_in_advance_percent. It is cleared (while
// set) only when all three are below
// config::file_cache_exit_need_evict_cache_in_advance_percent. Switches are logged;
// while the flag stays set, a status line is logged every 10th call.
void BlockFileCache::check_need_evict_cache_in_advance() {
    if (_storage->get_type() != FileCacheStorageType::DISK) {
        return;
    }

    std::pair<int, int> percent;
    // On success `percent` holds (space used %, inode used %) for the cache path.
    int ret = disk_used_percentage(_cache_base_path, &percent);
    if (ret != 0) {
        // NOTE(review): assumes disk_used_percentage() reports failures via errno.
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
        return;
    }
    auto [space_percentage, inode_percentage] = percent;
    // NOTE(review): integer division by _capacity; assumes _capacity > 0 here
    // (a zero capacity would be a division by zero) — confirm.
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
    // Reads the config on every call, so it observes the value after the fix-up below.
    auto is_insufficient = [](const int& percentage) {
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
    };
    DCHECK_GE(space_percentage, 0);
    DCHECK_LE(space_percentage, 100);
    DCHECK_GE(inode_percentage, 0);
    DCHECK_LE(inode_percentage, 100);
    // ATTN: the thresholds can be changed at runtime, so reset both to their defaults
    // (78 / 75) unless enter is strictly greater than exit.
    // FIXME: reject with config validator
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
        LOG_WARNING("config error, set to default value")
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
    }
    // Snapshot used below to detect (and log) a switch.
    bool previous_mode = _need_evict_cache_in_advance;
    bool is_space_insufficient = is_insufficient(space_percentage);
    bool is_inode_insufficient = is_insufficient(inode_percentage);
    bool is_size_insufficient = is_insufficient(size_percentage);
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
        _need_evict_cache_in_advance = true;
        _need_evict_cache_in_advance_metrics->set_value(1);
    } else if (_need_evict_cache_in_advance &&
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
        // Hysteresis: only clear once all three usages are below the exit threshold.
        _need_evict_cache_in_advance = false;
        _need_evict_cache_in_advance_metrics->set_value(0);
    }
    if (previous_mode != _need_evict_cache_in_advance) {
        // add log for evict cache in advance mode switching
        if (_need_evict_cache_in_advance) {
            LOG(WARNING) << "Entering evict cache in advance mode: "
                         << "file_cache=" << get_base_path()
                         << " space_percent=" << space_percentage
                         << " inode_percent=" << inode_percentage
                         << " size_percent=" << size_percentage
                         << " is_space_insufficient=" << is_space_insufficient
                         << " is_inode_insufficient=" << is_inode_insufficient
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
        } else {
            LOG(INFO) << "Exiting evict cache in advance mode: "
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
                      << " inode_percent=" << inode_percentage
                      << " size_percent=" << size_percentage << " exit threshold="
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
        }
    } else if (_need_evict_cache_in_advance) {
        // print log for evict cache in advance mode running, but less frequently
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
                                 << " space_percent=" << space_percentage
                                 << " inode_percent=" << inode_percentage
                                 << " size_percent=" << size_percentage
                                 << " is_space_insufficient=" << is_space_insufficient
                                 << " is_inode_insufficient=" << is_inode_insufficient
                                 << " is_size_insufficient=" << is_size_insufficient
                                 << " need evict cache in advance";
    }
}
2033 | | |
2034 | 152 | void BlockFileCache::run_background_monitor() { |
2035 | 152 | Thread::set_self_name("run_background_monitor"); |
2036 | 5.17k | while (!_close) { |
2037 | 5.16k | int64_t interval_ms = config::file_cache_background_monitor_interval_ms; |
2038 | 5.16k | TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms); |
2039 | 5.16k | check_disk_resource_limit(); |
2040 | 5.16k | if (config::enable_evict_file_cache_in_advance) { |
2041 | 4.08k | check_need_evict_cache_in_advance(); |
2042 | 4.08k | } else { |
2043 | 1.08k | _need_evict_cache_in_advance = false; |
2044 | 1.08k | _need_evict_cache_in_advance_metrics->set_value(0); |
2045 | 1.08k | } |
2046 | | |
2047 | 5.16k | { |
2048 | 5.16k | std::unique_lock close_lock(_close_mtx); |
2049 | 5.16k | _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); |
2050 | 5.16k | if (_close) { |
2051 | 149 | break; |
2052 | 149 | } |
2053 | 5.16k | } |
2054 | | // report |
2055 | 5.01k | { |
2056 | 5.01k | SCOPED_CACHE_LOCK(_mutex, this); |
2057 | 5.01k | _cur_cache_size_metrics->set_value(_cur_cache_size); |
2058 | 5.01k | _cur_ttl_cache_size_metrics->set_value(_cur_cache_size - |
2059 | 5.01k | _index_queue.get_capacity(cache_lock) - |
2060 | 5.01k | _normal_queue.get_capacity(cache_lock) - |
2061 | 5.01k | _disposable_queue.get_capacity(cache_lock) - |
2062 | 5.01k | _cold_normal_queue.get_capacity(cache_lock)); |
2063 | 5.01k | _cur_ttl_cache_lru_queue_cache_size_metrics->set_value( |
2064 | 5.01k | _ttl_queue.get_capacity(cache_lock)); |
2065 | 5.01k | _cur_ttl_cache_lru_queue_element_count_metrics->set_value( |
2066 | 5.01k | _ttl_queue.get_elements_num(cache_lock)); |
2067 | 5.01k | _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock)); |
2068 | 5.01k | _cur_normal_queue_element_count_metrics->set_value( |
2069 | 5.01k | _normal_queue.get_elements_num(cache_lock)); |
2070 | 5.01k | _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock)); |
2071 | 5.01k | _cur_index_queue_element_count_metrics->set_value( |
2072 | 5.01k | _index_queue.get_elements_num(cache_lock)); |
2073 | 5.01k | _cur_disposable_queue_cache_size_metrics->set_value( |
2074 | 5.01k | _disposable_queue.get_capacity(cache_lock)); |
2075 | 5.01k | _cur_disposable_queue_element_count_metrics->set_value( |
2076 | 5.01k | _disposable_queue.get_elements_num(cache_lock)); |
2077 | 5.01k | _cur_cold_normal_queue_cache_size_metrics->set_value( |
2078 | 5.01k | _cold_normal_queue.get_capacity(cache_lock)); |
2079 | 5.01k | _cur_cold_normal_queue_element_count_metrics->set_value( |
2080 | 5.01k | _cold_normal_queue.get_elements_num(cache_lock)); |
2081 | | |
2082 | | // Update meta store write queue size if storage is FSFileCacheStorage |
2083 | 5.01k | if (_storage->get_type() == FileCacheStorageType::DISK) { |
2084 | 4.10k | auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get()); |
2085 | 4.10k | if (fs_storage != nullptr) { |
2086 | 4.10k | auto* meta_store = fs_storage->get_meta_store(); |
2087 | 4.10k | if (meta_store != nullptr) { |
2088 | 4.10k | _meta_store_write_queue_size_metrics->set_value( |
2089 | 4.10k | meta_store->get_write_queue_size()); |
2090 | 4.10k | } |
2091 | 4.10k | } |
2092 | 4.10k | } |
2093 | | |
2094 | 5.01k | if (_num_read_blocks->get_value() > 0) { |
2095 | 2.42k | _hit_ratio->set_value((double)_num_hit_blocks->get_value() / |
2096 | 2.42k | (double)_num_read_blocks->get_value()); |
2097 | 2.42k | } |
2098 | 5.01k | if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) { |
2099 | 1.59k | _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() / |
2100 | 1.59k | (double)_num_read_blocks_5m->get_value()); |
2101 | 1.59k | } |
2102 | 5.01k | if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) { |
2103 | 2.39k | _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() / |
2104 | 2.39k | (double)_num_read_blocks_1h->get_value()); |
2105 | 2.39k | } |
2106 | | |
2107 | 5.01k | if (_no_warmup_num_hit_blocks->get_value() > 0) { |
2108 | 2.37k | _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() / |
2109 | 2.37k | (double)_no_warmup_num_read_blocks->get_value()); |
2110 | 2.37k | } |
2111 | 5.01k | if (_no_warmup_num_hit_blocks_5m && _no_warmup_num_hit_blocks_5m->get_value() > 0) { |
2112 | 1.55k | _no_warmup_hit_ratio_5m->set_value( |
2113 | 1.55k | (double)_no_warmup_num_hit_blocks_5m->get_value() / |
2114 | 1.55k | (double)_no_warmup_num_read_blocks_5m->get_value()); |
2115 | 1.55k | } |
2116 | 5.01k | if (_no_warmup_num_hit_blocks_1h && _no_warmup_num_hit_blocks_1h->get_value() > 0) { |
2117 | 2.36k | _no_warmup_hit_ratio_1h->set_value( |
2118 | 2.36k | (double)_no_warmup_num_hit_blocks_1h->get_value() / |
2119 | 2.36k | (double)_no_warmup_num_read_blocks_1h->get_value()); |
2120 | 2.36k | } |
2121 | 5.01k | } |
2122 | 5.01k | } |
2123 | 152 | } |
2124 | | |
2125 | 152 | void BlockFileCache::run_background_gc() { |
2126 | 152 | Thread::set_self_name("run_background_gc"); |
2127 | 152 | FileCacheKey key; |
2128 | 152 | size_t batch_count = 0; |
2129 | 203k | while (!_close) { |
2130 | 203k | int64_t interval_ms = config::file_cache_background_gc_interval_ms; |
2131 | 203k | size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000; |
2132 | 203k | { |
2133 | 203k | std::unique_lock close_lock(_close_mtx); |
2134 | 203k | _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); |
2135 | 203k | if (_close) { |
2136 | 149 | break; |
2137 | 149 | } |
2138 | 203k | } |
2139 | | |
2140 | 301k | while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) { |
2141 | 98.9k | int64_t duration_ns = 0; |
2142 | 98.9k | Status st; |
2143 | 98.9k | { |
2144 | 98.9k | SCOPED_RAW_TIMER(&duration_ns); |
2145 | 98.9k | st = _storage->remove(key); |
2146 | 98.9k | } |
2147 | 98.9k | *_storage_async_remove_latency_us << (duration_ns / 1000); |
2148 | | |
2149 | 98.9k | if (!st.ok()) { |
2150 | 1.78k | LOG_WARNING("").error(st); |
2151 | 1.78k | } |
2152 | 98.9k | batch_count++; |
2153 | 98.9k | } |
2154 | 202k | *_recycle_keys_length_recorder << _recycle_keys.size_approx(); |
2155 | 202k | batch_count = 0; |
2156 | 202k | } |
2157 | 152 | } |
2158 | | |
2159 | 152 | void BlockFileCache::run_background_evict_in_advance() { |
2160 | 152 | Thread::set_self_name("run_background_evict_in_advance"); |
2161 | 152 | LOG(INFO) << "Starting background evict in advance thread"; |
2162 | 152 | int64_t batch = 0; |
2163 | 25.2k | while (!_close) { |
2164 | 25.2k | { |
2165 | 25.2k | std::unique_lock close_lock(_close_mtx); |
2166 | 25.2k | _close_cv.wait_for( |
2167 | 25.2k | close_lock, |
2168 | 25.2k | std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms)); |
2169 | 25.2k | if (_close) { |
2170 | 149 | LOG(INFO) << "Background evict in advance thread exiting due to cache closing"; |
2171 | 149 | break; |
2172 | 149 | } |
2173 | 25.2k | } |
2174 | 25.1k | batch = config::file_cache_evict_in_advance_batch_bytes; |
2175 | | |
2176 | | // Skip if eviction not needed or too many pending recycles |
2177 | 25.1k | if (!_need_evict_cache_in_advance || |
2178 | 25.1k | _recycle_keys.size_approx() >= |
2179 | 24.8k | config::file_cache_evict_in_advance_recycle_keys_num_threshold) { |
2180 | 24.8k | continue; |
2181 | 24.8k | } |
2182 | | |
2183 | 257 | int64_t duration_ns = 0; |
2184 | 257 | { |
2185 | 257 | SCOPED_CACHE_LOCK(_mutex, this); |
2186 | 257 | SCOPED_RAW_TIMER(&duration_ns); |
2187 | 257 | try_evict_in_advance(batch, cache_lock); |
2188 | 257 | } |
2189 | 257 | *_evict_in_advance_latency_us << (duration_ns / 1000); |
2190 | 257 | } |
2191 | 152 | } |
2192 | | |
2193 | 152 | void BlockFileCache::run_background_block_lru_update() { |
2194 | 152 | Thread::set_self_name("run_background_block_lru_update"); |
2195 | 152 | std::vector<FileBlockSPtr> batch; |
2196 | 8.86k | while (!_close) { |
2197 | 8.85k | int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms; |
2198 | 8.85k | size_t batch_limit = |
2199 | 8.85k | config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000; |
2200 | 8.85k | { |
2201 | 8.85k | std::unique_lock close_lock(_close_mtx); |
2202 | 8.85k | _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); |
2203 | 8.85k | if (_close) { |
2204 | 149 | break; |
2205 | 149 | } |
2206 | 8.85k | } |
2207 | | |
2208 | 8.70k | batch.clear(); |
2209 | 8.70k | batch.reserve(batch_limit); |
2210 | 8.70k | size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch); |
2211 | 8.72k | if (drained == 0) { |
2212 | 8.72k | *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); |
2213 | 8.72k | continue; |
2214 | 8.72k | } |
2215 | | |
2216 | 18.4E | int64_t duration_ns = 0; |
2217 | 18.4E | { |
2218 | 18.4E | SCOPED_CACHE_LOCK(_mutex, this); |
2219 | 18.4E | SCOPED_RAW_TIMER(&duration_ns); |
2220 | 18.4E | for (auto& block : batch) { |
2221 | 480 | update_block_lru(block, cache_lock); |
2222 | 480 | } |
2223 | 18.4E | } |
2224 | 18.4E | *_update_lru_blocks_latency_us << (duration_ns / 1000); |
2225 | 18.4E | *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); |
2226 | 18.4E | } |
2227 | 152 | } |
2228 | | |
2229 | | std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> |
2230 | 2 | BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const { |
2231 | 2 | int64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>( |
2232 | 2 | std::chrono::steady_clock::now().time_since_epoch()) |
2233 | 2 | .count(); |
2234 | 2 | SCOPED_CACHE_LOCK(_mutex, this); |
2235 | 2 | std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta; |
2236 | 2 | if (auto iter = _files.find(hash); iter != _files.end()) { |
2237 | 5 | for (auto& pair : _files.find(hash)->second) { |
2238 | 5 | const FileBlockCell* cell = &pair.second; |
2239 | 5 | if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) { |
2240 | 4 | if (cell->file_block->cache_type() == FileCacheType::TTL || |
2241 | 4 | (cell->atime != 0 && |
2242 | 3 | cur_time - cell->atime < |
2243 | 3 | get_queue(cell->file_block->cache_type()).get_hot_data_interval() * |
2244 | 3 | 1000)) { |
2245 | 3 | blocks_meta.emplace_back(pair.first, cell->size(), |
2246 | 3 | cell->file_block->cache_type(), |
2247 | 3 | cell->file_block->expiration_time()); |
2248 | 3 | } |
2249 | 4 | } |
2250 | 5 | } |
2251 | 2 | } |
2252 | 2 | return blocks_meta; |
2253 | 2 | } |
2254 | | |
2255 | | bool BlockFileCache::try_reserve_during_async_load(size_t size, |
2256 | 2 | std::lock_guard<std::mutex>& cache_lock) { |
2257 | 2 | size_t removed_size = 0; |
2258 | 2 | size_t normal_queue_size = _normal_queue.get_capacity(cache_lock); |
2259 | 2 | size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock); |
2260 | 2 | size_t index_queue_size = _index_queue.get_capacity(cache_lock); |
2261 | 2 | size_t cold_normal_queue_size = _cold_normal_queue.get_capacity(cache_lock); |
2262 | | |
2263 | 2 | std::vector<FileBlockCell*> to_evict; |
2264 | 2 | auto collect_eliminate_fragments = [&](LRUQueue& queue) { |
2265 | 3 | for (const auto& [entry_key, entry_offset, entry_size] : queue) { |
2266 | 3 | if (!_disk_resource_limit_mode || removed_size >= size) { |
2267 | 2 | break; |
2268 | 2 | } |
2269 | 1 | auto* cell = get_cell(entry_key, entry_offset, cache_lock); |
2270 | | |
2271 | 1 | DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string() |
2272 | 0 | << ", offset: " << entry_offset; |
2273 | | |
2274 | 1 | size_t cell_size = cell->size(); |
2275 | 1 | DCHECK(entry_size == cell_size); |
2276 | | |
2277 | 1 | if (cell->releasable()) { |
2278 | 1 | auto& file_block = cell->file_block; |
2279 | | |
2280 | 1 | std::lock_guard block_lock(file_block->_mutex); |
2281 | 1 | DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); |
2282 | 1 | to_evict.push_back(cell); |
2283 | 1 | removed_size += cell_size; |
2284 | 1 | } |
2285 | 1 | } |
2286 | 2 | }; |
2287 | 2 | if (disposable_queue_size != 0) { |
2288 | 0 | collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE)); |
2289 | 0 | } |
2290 | 2 | if (normal_queue_size != 0) { |
2291 | 2 | collect_eliminate_fragments(get_queue(FileCacheType::NORMAL)); |
2292 | 2 | } |
2293 | 2 | if (index_queue_size != 0) { |
2294 | 0 | collect_eliminate_fragments(get_queue(FileCacheType::INDEX)); |
2295 | 0 | } |
2296 | 2 | if (cold_normal_queue_size != 0) { |
2297 | 0 | collect_eliminate_fragments(get_queue(FileCacheType::COLD_NORMAL)); |
2298 | 0 | } |
2299 | | |
2300 | 2 | std::string reason = "async load"; |
2301 | 2 | remove_file_blocks(to_evict, cache_lock, true, reason); |
2302 | | |
2303 | 2 | return !_disk_resource_limit_mode || removed_size >= size; |
2304 | 2 | } |
2305 | | |
2306 | 33 | void BlockFileCache::clear_need_update_lru_blocks() { |
2307 | 33 | _need_update_lru_blocks.clear(); |
2308 | 33 | *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); |
2309 | 33 | } |
2310 | | |
2311 | 30 | std::string BlockFileCache::clear_file_cache_directly() { |
2312 | 30 | _lru_dumper->remove_lru_dump_files(); |
2313 | 30 | using namespace std::chrono; |
2314 | 30 | std::stringstream ss; |
2315 | 30 | auto start = steady_clock::now(); |
2316 | 30 | SCOPED_CACHE_LOCK(_mutex, this); |
2317 | 30 | LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path); |
2318 | | |
2319 | 30 | std::string clear_msg; |
2320 | 30 | auto s = _storage->clear(clear_msg); |
2321 | 30 | if (!s.ok()) { |
2322 | 1 | return clear_msg; |
2323 | 1 | } |
2324 | | |
2325 | 29 | int64_t num_files = _files.size(); |
2326 | 29 | int64_t cache_size = _cur_cache_size; |
2327 | 29 | int64_t index_queue_size = _index_queue.get_elements_num(cache_lock); |
2328 | 29 | int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock); |
2329 | 29 | int64_t disposable_queue_size = _disposable_queue.get_elements_num(cache_lock); |
2330 | 29 | int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock); |
2331 | 29 | int64_t cold_normal_queue_size = _cold_normal_queue.get_elements_num(cache_lock); |
2332 | | |
2333 | 29 | int64_t clear_fd_duration = 0; |
2334 | 29 | { |
2335 | | // clear FDCache to release fd |
2336 | 29 | SCOPED_RAW_TIMER(&clear_fd_duration); |
2337 | 29 | for (const auto& [file_key, file_blocks] : _files) { |
2338 | 13 | for (const auto& [offset, file_block_cell] : file_blocks) { |
2339 | 13 | AccessKeyAndOffset access_key_and_offset(file_key, offset); |
2340 | 13 | FDCache::instance()->remove_file_reader(access_key_and_offset); |
2341 | 13 | } |
2342 | 2 | } |
2343 | 29 | } |
2344 | | |
2345 | 29 | _files.clear(); |
2346 | 29 | _cur_cache_size = 0; |
2347 | 29 | _cur_ttl_size = 0; |
2348 | 29 | _time_to_key.clear(); |
2349 | 29 | _key_to_time.clear(); |
2350 | 29 | _index_queue.clear(cache_lock); |
2351 | 29 | _normal_queue.clear(cache_lock); |
2352 | 29 | _disposable_queue.clear(cache_lock); |
2353 | 29 | _ttl_queue.clear(cache_lock); |
2354 | 29 | _cold_normal_queue.clear(cache_lock); |
2355 | | |
2356 | | // Synchronously update cache metrics so that information_schema.file_cache_statistics |
2357 | | // reflects the cleared state immediately, without waiting for the next |
2358 | | // run_background_monitor cycle. |
2359 | 29 | _cur_cache_size_metrics->set_value(0); |
2360 | 29 | _cur_ttl_cache_size_metrics->set_value(0); |
2361 | 29 | _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0); |
2362 | 29 | _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0); |
2363 | 29 | _cur_normal_queue_cache_size_metrics->set_value(0); |
2364 | 29 | _cur_normal_queue_element_count_metrics->set_value(0); |
2365 | 29 | _cur_index_queue_cache_size_metrics->set_value(0); |
2366 | 29 | _cur_index_queue_element_count_metrics->set_value(0); |
2367 | 29 | _cur_disposable_queue_cache_size_metrics->set_value(0); |
2368 | 29 | _cur_disposable_queue_element_count_metrics->set_value(0); |
2369 | | |
2370 | 29 | clear_need_update_lru_blocks(); |
2371 | | |
2372 | 29 | ss << "finish clear_file_cache_directly" |
2373 | 29 | << " path=" << _cache_base_path |
2374 | 29 | << " time_elapsed_ms=" << duration_cast<milliseconds>(steady_clock::now() - start).count() |
2375 | 29 | << " fd_clear_time_ms=" << (clear_fd_duration / 1000000) << " num_files=" << num_files |
2376 | 29 | << " cache_size=" << cache_size << " index_queue_size=" << index_queue_size |
2377 | 29 | << " normal_queue_size=" << normal_queue_size |
2378 | 29 | << " disposable_queue_size=" << disposable_queue_size << "ttl_queue_size=" << ttl_queue_size |
2379 | 29 | << " cold_normal_queue_size=" << cold_normal_queue_size; |
2380 | 29 | auto msg = ss.str(); |
2381 | 29 | LOG(INFO) << msg; |
2382 | 29 | _lru_dumper->remove_lru_dump_files(); |
2383 | 29 | return msg; |
2384 | 30 | } |
2385 | | |
2386 | 46.3k | std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) { |
2387 | 46.3k | std::map<size_t, FileBlockSPtr> offset_to_block; |
2388 | 46.3k | SCOPED_CACHE_LOCK(_mutex, this); |
2389 | 46.3k | if (_files.contains(hash)) { |
2390 | 43.3k | for (auto& [offset, cell] : _files[hash]) { |
2391 | 43.3k | if (cell.file_block->state() == FileBlock::State::DOWNLOADED) { |
2392 | 43.3k | cell.file_block->_owned_by_cached_reader = true; |
2393 | 43.3k | offset_to_block.emplace(offset, cell.file_block); |
2394 | 43.3k | } |
2395 | 43.3k | } |
2396 | 41.2k | } |
2397 | 46.3k | return offset_to_block; |
2398 | 46.3k | } |
2399 | | |
2400 | 0 | void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) { |
2401 | 0 | SCOPED_CACHE_LOCK(_mutex, this); |
2402 | 0 | if (auto iter = _files.find(hash); iter != _files.end()) { |
2403 | 0 | for (auto& [_, cell] : iter->second) { |
2404 | 0 | cell.update_atime(); |
2405 | 0 | } |
2406 | 0 | }; |
2407 | 0 | } |
2408 | | |
2409 | 152 | void BlockFileCache::run_background_lru_log_replay() { |
2410 | 152 | Thread::set_self_name("run_background_lru_log_replay"); |
2411 | 24.9k | while (!_close) { |
2412 | 24.9k | int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms; |
2413 | 24.9k | { |
2414 | 24.9k | std::unique_lock close_lock(_close_mtx); |
2415 | 24.9k | _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); |
2416 | 24.9k | if (_close) { |
2417 | 148 | break; |
2418 | 148 | } |
2419 | 24.9k | } |
2420 | | |
2421 | 24.8k | _lru_recorder->replay_queue_event(FileCacheType::TTL); |
2422 | 24.8k | _lru_recorder->replay_queue_event(FileCacheType::INDEX); |
2423 | 24.8k | _lru_recorder->replay_queue_event(FileCacheType::NORMAL); |
2424 | 24.8k | _lru_recorder->replay_queue_event(FileCacheType::COLD_NORMAL); |
2425 | 24.8k | _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE); |
2426 | | |
2427 | 24.8k | if (config::enable_evaluate_shadow_queue_diff) { |
2428 | 0 | SCOPED_CACHE_LOCK(_mutex, this); |
2429 | 0 | _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock); |
2430 | 0 | _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock); |
2431 | 0 | _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock); |
2432 | 0 | _lru_recorder->evaluate_queue_diff(_cold_normal_queue, "cold_normal", cache_lock); |
2433 | 0 | _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock); |
2434 | 0 | } |
2435 | 24.8k | } |
2436 | 152 | } |
2437 | | |
2438 | 1.88k | void BlockFileCache::dump_lru_queues(bool force) { |
2439 | 1.88k | std::unique_lock dump_lock(_dump_lru_queues_mtx); |
2440 | 1.88k | if (config::file_cache_background_lru_dump_tail_record_num > 0 && |
2441 | 1.88k | !ExecEnv::GetInstance()->get_is_upgrading()) { |
2442 | 1.88k | _lru_dumper->dump_queue("disposable", force); |
2443 | 1.88k | _lru_dumper->dump_queue("cold_normal", force); |
2444 | 1.88k | _lru_dumper->dump_queue("normal", force); |
2445 | 1.88k | _lru_dumper->dump_queue("index", force); |
2446 | 1.88k | _lru_dumper->dump_queue("ttl", force); |
2447 | 1.88k | _lru_dumper->set_first_dump_done(); |
2448 | 1.88k | } |
2449 | 1.88k | } |
2450 | | |
2451 | 152 | void BlockFileCache::run_background_lru_dump() { |
2452 | 152 | Thread::set_self_name("run_background_lru_dump"); |
2453 | 2.04k | while (!_close) { |
2454 | 2.03k | int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms; |
2455 | 2.03k | { |
2456 | 2.03k | std::unique_lock close_lock(_close_mtx); |
2457 | 2.03k | _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); |
2458 | 2.03k | if (_close) { |
2459 | 149 | break; |
2460 | 149 | } |
2461 | 2.03k | } |
2462 | 1.88k | dump_lru_queues(false); |
2463 | 1.88k | } |
2464 | 152 | } |
2465 | | |
// Rebuilds the in-memory LRU queues from the dump files written by _lru_dumper.
// `cache_lock` is the caller's lock guard, presumably on the cache mutex _mutex —
// confirm at call sites. Note that _cold_normal_queue is not restored directly:
// the "cold_normal" dump is merged into _normal_queue.
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
    // Keep this order: an entry may appear in more than one dump, and the first
    // appearance wins.
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
    // LRU queue mode compatibility:
    // - Single queue dump → Dual queue load: All data enters NORMAL queue
    // - Dual queue dump → Single queue load:
    //   * COLD_NORMAL data merges into NORMAL queue in LRU order
    //   * Restore order: cold_normal first (older), normal last (newer)
    _lru_dumper->restore_queue(_normal_queue, "cold_normal", cache_lock);
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
}
2479 | | |
2480 | 65 | std::map<std::string, double> BlockFileCache::get_stats() { |
2481 | 65 | std::map<std::string, double> stats; |
2482 | 65 | stats["hits_ratio"] = (double)_hit_ratio->get_value(); |
2483 | 65 | stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value(); |
2484 | 65 | stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value(); |
2485 | | |
2486 | 65 | stats["index_queue_max_size"] = (double)_index_queue.get_max_size(); |
2487 | 65 | stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value(); |
2488 | 65 | stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size(); |
2489 | 65 | stats["index_queue_curr_elements"] = |
2490 | 65 | (double)_cur_index_queue_element_count_metrics->get_value(); |
2491 | | |
2492 | 65 | stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size(); |
2493 | 65 | stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value(); |
2494 | 65 | stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size(); |
2495 | 65 | stats["ttl_queue_curr_elements"] = |
2496 | 65 | (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value(); |
2497 | | |
2498 | 65 | stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size(); |
2499 | 65 | stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value(); |
2500 | 65 | stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size(); |
2501 | 65 | stats["normal_queue_curr_elements"] = |
2502 | 65 | (double)_cur_normal_queue_element_count_metrics->get_value(); |
2503 | | |
2504 | 65 | stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size(); |
2505 | 65 | stats["disposable_queue_curr_size"] = |
2506 | 65 | (double)_cur_disposable_queue_cache_size_metrics->get_value(); |
2507 | 65 | stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size(); |
2508 | 65 | stats["disposable_queue_curr_elements"] = |
2509 | 65 | (double)_cur_disposable_queue_element_count_metrics->get_value(); |
2510 | | |
2511 | 65 | stats["cold_normal_queue_max_size"] = (double)_cold_normal_queue.get_max_size(); |
2512 | 65 | stats["cold_normal_queue_curr_size"] = |
2513 | 65 | (double)_cur_cold_normal_queue_cache_size_metrics->get_value(); |
2514 | 65 | stats["cold_normal_queue_max_elements"] = (double)_cold_normal_queue.get_max_element_size(); |
2515 | 65 | stats["cold_normal_queue_curr_elements"] = |
2516 | 65 | (double)_cur_cold_normal_queue_element_count_metrics->get_value(); |
2517 | | |
2518 | 65 | stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance; |
2519 | 65 | stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode; |
2520 | | |
2521 | 65 | stats["total_removed_counts"] = (double)_num_removed_blocks->get_value(); |
2522 | 65 | stats["total_hit_counts"] = (double)_num_hit_blocks->get_value(); |
2523 | 65 | stats["total_read_counts"] = (double)_num_read_blocks->get_value(); |
2524 | | |
2525 | 65 | stats["total_read_size"] = (double)_total_read_size_metrics->get_value(); |
2526 | 65 | stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value(); |
2527 | 65 | stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value(); |
2528 | | |
2529 | 65 | return stats; |
2530 | 65 | } |
2531 | | |
2532 | | // for be UTs |
2533 | 217 | std::map<std::string, double> BlockFileCache::get_stats_unsafe() { |
2534 | 217 | std::map<std::string, double> stats; |
2535 | 217 | stats["hits_ratio"] = (double)_hit_ratio->get_value(); |
2536 | 217 | stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value(); |
2537 | 217 | stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value(); |
2538 | | |
2539 | 217 | stats["index_queue_max_size"] = (double)_index_queue.get_max_size(); |
2540 | 217 | stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe(); |
2541 | 217 | stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size(); |
2542 | 217 | stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe(); |
2543 | | |
2544 | 217 | stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size(); |
2545 | 217 | stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe(); |
2546 | 217 | stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size(); |
2547 | 217 | stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe(); |
2548 | | |
2549 | 217 | stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size(); |
2550 | 217 | stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe(); |
2551 | 217 | stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size(); |
2552 | 217 | stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe(); |
2553 | | |
2554 | 217 | stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size(); |
2555 | 217 | stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe(); |
2556 | 217 | stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size(); |
2557 | 217 | stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe(); |
2558 | | |
2559 | 217 | stats["cold_normal_queue_max_size"] = (double)_cold_normal_queue.get_max_size(); |
2560 | 217 | stats["cold_normal_queue_curr_size"] = (double)_cold_normal_queue.get_capacity_unsafe(); |
2561 | 217 | stats["cold_normal_queue_max_elements"] = (double)_cold_normal_queue.get_max_element_size(); |
2562 | 217 | stats["cold_normal_queue_curr_elements"] = (double)_cold_normal_queue.get_elements_num_unsafe(); |
2563 | | |
2564 | 217 | stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance; |
2565 | 217 | stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode; |
2566 | | |
2567 | 217 | stats["total_removed_counts"] = (double)_num_removed_blocks->get_value(); |
2568 | 217 | stats["total_hit_counts"] = (double)_num_hit_blocks->get_value(); |
2569 | 217 | stats["total_read_counts"] = (double)_num_read_blocks->get_value(); |
2570 | | |
2571 | 217 | stats["total_read_size"] = (double)_total_read_size_metrics->get_value(); |
2572 | 217 | stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value(); |
2573 | 217 | stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value(); |
2574 | | |
2575 | 217 | return stats; |
2576 | 217 | } |
2577 | | |
2578 | | template void BlockFileCache::remove(FileBlockSPtr file_block, |
2579 | | std::lock_guard<std::mutex>& cache_lock, |
2580 | | std::lock_guard<std::mutex>& block_lock, bool sync); |
2581 | | |
2582 | | #include "common/compile_check_end.h" |
2583 | | |
2584 | 1 | Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) { |
2585 | 1 | InconsistencyContext inconsistency_context; |
2586 | 1 | RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context)); |
2587 | 1 | auto n = inconsistency_context.types.size(); |
2588 | 1 | results.reserve(n); |
2589 | 1 | for (size_t i = 0; i < n; i++) { |
2590 | 0 | std::string result; |
2591 | 0 | result += "File cache info in manager:\n"; |
2592 | 0 | result += inconsistency_context.infos_in_manager[i].to_string(); |
2593 | 0 | result += "File cache info in storage:\n"; |
2594 | 0 | result += inconsistency_context.infos_in_storage[i].to_string(); |
2595 | 0 | result += inconsistency_context.types[i].to_string(); |
2596 | 0 | result += "\n"; |
2597 | 0 | results.push_back(std::move(result)); |
2598 | 0 | } |
2599 | 1 | return Status::OK(); |
2600 | 1 | } |
2601 | | |
2602 | 1 | Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) { |
2603 | 1 | std::lock_guard<std::mutex> cache_lock(_mutex); |
2604 | 1 | std::vector<FileCacheInfo> infos_in_storage; |
2605 | 1 | RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock)); |
2606 | 1 | std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks; |
2607 | 1 | for (const auto& info_in_storage : infos_in_storage) { |
2608 | 0 | confirmed_blocks.insert({info_in_storage.hash, info_in_storage.offset}); |
2609 | 0 | auto* cell = get_cell(info_in_storage.hash, info_in_storage.offset, cache_lock); |
2610 | 0 | if (cell == nullptr || cell->file_block == nullptr) { |
2611 | 0 | inconsistency_context.infos_in_manager.emplace_back(); |
2612 | 0 | inconsistency_context.infos_in_storage.push_back(info_in_storage); |
2613 | 0 | inconsistency_context.types.emplace_back(InconsistencyType::NOT_LOADED); |
2614 | 0 | continue; |
2615 | 0 | } |
2616 | 0 | FileCacheInfo info_in_manager { |
2617 | 0 | .hash = info_in_storage.hash, |
2618 | 0 | .expiration_time = cell->file_block->expiration_time(), |
2619 | 0 | .size = cell->size(), |
2620 | 0 | .offset = info_in_storage.offset, |
2621 | 0 | .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING, |
2622 | 0 | .cache_type = cell->file_block->cache_type()}; |
2623 | 0 | InconsistencyType inconsistent_type; |
2624 | 0 | if (info_in_storage.is_tmp != info_in_manager.is_tmp) { |
2625 | 0 | inconsistent_type |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE; |
2626 | 0 | } |
2627 | 0 | size_t expected_size = |
2628 | 0 | info_in_manager.is_tmp ? cell->dowloading_size() : info_in_manager.size; |
2629 | 0 | if (info_in_storage.size != expected_size) { |
2630 | 0 | inconsistent_type |= InconsistencyType::SIZE_INCONSISTENT; |
2631 | 0 | } |
2632 | | // Only if it is not a tmp file need we check the cache type. |
2633 | 0 | if ((inconsistent_type & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 && |
2634 | 0 | info_in_storage.cache_type != info_in_manager.cache_type) { |
2635 | 0 | inconsistent_type |= InconsistencyType::CACHE_TYPE_INCONSISTENT; |
2636 | 0 | } |
2637 | 0 | if (info_in_storage.expiration_time != info_in_manager.expiration_time) { |
2638 | 0 | inconsistent_type |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT; |
2639 | 0 | } |
2640 | 0 | if (inconsistent_type != InconsistencyType::NONE) { |
2641 | 0 | inconsistency_context.infos_in_manager.push_back(info_in_manager); |
2642 | 0 | inconsistency_context.infos_in_storage.push_back(info_in_storage); |
2643 | 0 | inconsistency_context.types.push_back(inconsistent_type); |
2644 | 0 | } |
2645 | 0 | } |
2646 | | |
2647 | 1 | for (const auto& [hash, offset_to_cell] : _files) { |
2648 | 0 | for (const auto& [offset, cell] : offset_to_cell) { |
2649 | 0 | if (confirmed_blocks.contains({hash, offset})) { |
2650 | 0 | continue; |
2651 | 0 | } |
2652 | 0 | const auto& block = cell.file_block; |
2653 | 0 | inconsistency_context.infos_in_manager.emplace_back( |
2654 | 0 | hash, block->expiration_time(), cell.size(), offset, |
2655 | 0 | cell.file_block->state() == FileBlock::State::DOWNLOADING, block->cache_type()); |
2656 | 0 | inconsistency_context.infos_in_storage.emplace_back(); |
2657 | 0 | inconsistency_context.types.emplace_back(InconsistencyType::MISSING_IN_STORAGE); |
2658 | 0 | } |
2659 | 0 | } |
2660 | 1 | return Status::OK(); |
2661 | 1 | } |
2662 | | |
2663 | | } // namespace doris::io |