Coverage Report

Created: 2026-05-21 07:40

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache.h"
22
23
#include <gen_cpp/file_cache.pb.h>
24
25
#include <cstdio>
26
#include <exception>
27
#include <fstream>
28
#include <unordered_set>
29
30
#include "common/status.h"
31
#include "cpp/sync_point.h"
32
#include "runtime/exec_env.h"
33
34
#if defined(__APPLE__)
35
#include <sys/mount.h>
36
#else
37
#include <sys/statfs.h>
38
#endif
39
40
#include <chrono> // IWYU pragma: keep
41
#include <mutex>
42
#include <ranges>
43
44
#include "common/cast_set.h"
45
#include "common/config.h"
46
#include "common/logging.h"
47
#include "core/uint128.h"
48
#include "exec/common/sip_hash.h"
49
#include "io/cache/block_file_cache_ttl_mgr.h"
50
#include "io/cache/file_block.h"
51
#include "io/cache/file_cache_common.h"
52
#include "io/cache/fs_file_cache_storage.h"
53
#include "io/cache/mem_file_cache_storage.h"
54
#include "runtime/runtime_profile.h"
55
#include "util/concurrency_stats.h"
56
#include "util/stack_util.h"
57
#include "util/stopwatch.hpp"
58
#include "util/thread.h"
59
#include "util/time.h"
60
namespace doris::io {
61
62
namespace {
63
constexpr std::array<FileCacheType, FILE_CACHE_TYPE_NUM> FILE_CACHE_TYPES = {
64
        FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL,
65
        FileCacheType::META};
66
}
67
68
// Insert a block pointer into one shard while swallowing allocation failures.
69
8.57k
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
70
8.57k
    if (!block) {
71
1
        return false;
72
1
    }
73
8.57k
    try {
74
8.57k
        auto* raw_ptr = block.get();
75
8.57k
        auto idx = shard_index(raw_ptr);
76
8.57k
        auto& shard = _shards[idx];
77
8.57k
        std::lock_guard lock(shard.mutex);
78
8.57k
        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
79
8.57k
        if (inserted) {
80
908
            _size.fetch_add(1, std::memory_order_relaxed);
81
908
        }
82
8.57k
        return inserted;
83
8.57k
    } catch (const std::exception& e) {
84
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
85
0
    } catch (...) {
86
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
87
0
    }
88
0
    return false;
89
8.57k
}
90
91
// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
92
7.42k
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
93
7.42k
    if (limit == 0 || output == nullptr) {
94
2
        return 0;
95
2
    }
96
7.42k
    size_t drained = 0;
97
7.42k
    try {
98
7.42k
        output->reserve(output->size() + std::min(limit, size()));
99
475k
        for (auto& shard : _shards) {
100
475k
            if (drained >= limit) {
101
1
                break;
102
1
            }
103
475k
            std::lock_guard lock(shard.mutex);
104
475k
            auto it = shard.entries.begin();
105
475k
            size_t shard_drained = 0;
106
475k
            while (it != shard.entries.end() && drained + shard_drained < limit) {
107
483
                output->emplace_back(std::move(it->second));
108
483
                it = shard.entries.erase(it);
109
483
                ++shard_drained;
110
483
            }
111
475k
            if (shard_drained > 0) {
112
6
                _size.fetch_sub(shard_drained, std::memory_order_relaxed);
113
6
                drained += shard_drained;
114
6
            }
115
475k
        }
116
7.42k
    } catch (const std::exception& e) {
117
0
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
118
0
    } catch (...) {
119
0
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
120
0
    }
121
7.42k
    return drained;
122
7.42k
}
123
124
// Remove every pending block, guarding against unexpected exceptions.
125
32
void NeedUpdateLRUBlocks::clear() {
126
32
    try {
127
2.04k
        for (auto& shard : _shards) {
128
2.04k
            std::lock_guard lock(shard.mutex);
129
2.04k
            if (!shard.entries.empty()) {
130
1
                auto removed = shard.entries.size();
131
1
                shard.entries.clear();
132
1
                _size.fetch_sub(removed, std::memory_order_relaxed);
133
1
            }
134
2.04k
        }
135
32
    } catch (const std::exception& e) {
136
0
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
137
0
    } catch (...) {
138
0
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
139
0
    }
140
32
}
141
142
8.57k
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
143
8.57k
    DCHECK(ptr != nullptr);
144
8.57k
    return std::hash<FileBlock*> {}(ptr)&kShardMask;
145
8.57k
}
146
147
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
148
                               const FileCacheSettings& cache_settings)
149
171
        : _cache_base_path(cache_base_path),
150
171
          _capacity(cache_settings.capacity),
151
171
          _max_file_block_size(cache_settings.max_file_block_size) {
152
171
    _cur_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
153
171
                                                                     "file_cache_cache_size", 0);
154
171
    _cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>(
155
171
            _cache_base_path.c_str(), "file_cache_capacity", _capacity);
156
171
    _cur_ttl_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
157
171
            _cache_base_path.c_str(), "file_cache_ttl_cache_size", 0);
158
171
    _cur_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
159
171
            _cache_base_path.c_str(), "file_cache_normal_queue_element_count", 0);
160
171
    _cur_ttl_cache_lru_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
161
171
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_size", 0);
162
171
    _cur_ttl_cache_lru_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
163
171
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_element_count", 0);
164
171
    _cur_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
165
171
            _cache_base_path.c_str(), "file_cache_normal_queue_cache_size", 0);
166
171
    _cur_index_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
167
171
            _cache_base_path.c_str(), "file_cache_index_queue_element_count", 0);
168
171
    _cur_index_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
169
171
            _cache_base_path.c_str(), "file_cache_index_queue_cache_size", 0);
170
171
    _cur_disposable_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
171
171
            _cache_base_path.c_str(), "file_cache_disposable_queue_element_count", 0);
172
171
    _cur_disposable_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
173
171
            _cache_base_path.c_str(), "file_cache_disposable_queue_cache_size", 0);
174
171
    _cur_meta_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
175
171
            _cache_base_path.c_str(), "file_cache_meta_queue_element_count", 0);
176
171
    _cur_meta_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
177
171
            _cache_base_path.c_str(), "file_cache_meta_queue_cache_size", 0);
178
179
171
    _queue_evict_size_metrics[FileCacheType::DISPOSABLE] = std::make_shared<bvar::Adder<size_t>>(
180
171
            _cache_base_path.c_str(), "file_cache_disposable_queue_evict_size");
181
171
    _queue_evict_size_metrics[FileCacheType::NORMAL] = std::make_shared<bvar::Adder<size_t>>(
182
171
            _cache_base_path.c_str(), "file_cache_normal_queue_evict_size");
183
171
    _queue_evict_size_metrics[FileCacheType::INDEX] = std::make_shared<bvar::Adder<size_t>>(
184
171
            _cache_base_path.c_str(), "file_cache_index_queue_evict_size");
185
171
    _queue_evict_size_metrics[FileCacheType::TTL] = std::make_shared<bvar::Adder<size_t>>(
186
171
            _cache_base_path.c_str(), "file_cache_ttl_cache_evict_size");
187
171
    _queue_evict_size_metrics[FileCacheType::META] = std::make_shared<bvar::Adder<size_t>>(
188
171
            _cache_base_path.c_str(), "file_cache_meta_queue_evict_size");
189
171
    _total_evict_size_metrics = std::make_shared<bvar::Adder<size_t>>(
190
171
            _cache_base_path.c_str(), "file_cache_total_evict_size");
191
171
    _total_read_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
192
171
                                                                     "file_cache_total_read_size");
193
171
    _total_hit_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
194
171
                                                                    "file_cache_total_hit_size");
195
171
    _gc_evict_bytes_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
196
171
                                                                    "file_cache_gc_evict_bytes");
197
171
    _gc_evict_count_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
198
171
                                                                    "file_cache_gc_evict_count");
199
200
171
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
201
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
202
171
                                                  "file_cache_evict_by_time_disposable_to_normal");
203
171
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
204
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
205
171
                                                  "file_cache_evict_by_time_disposable_to_index");
206
171
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
207
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
208
171
                                                  "file_cache_evict_by_time_disposable_to_ttl");
209
171
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
210
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
211
171
                                                  "file_cache_evict_by_time_normal_to_disposable");
212
171
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
213
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
214
171
                                                  "file_cache_evict_by_time_normal_to_index");
215
171
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
216
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
217
171
                                                  "file_cache_evict_by_time_normal_to_ttl");
218
171
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
219
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
220
171
                                                  "file_cache_evict_by_time_index_to_disposable");
221
171
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
222
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
223
171
                                                  "file_cache_evict_by_time_index_to_normal");
224
171
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
225
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
226
171
                                                  "file_cache_evict_by_time_index_to_ttl");
227
171
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
228
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
229
171
                                                  "file_cache_evict_by_time_ttl_to_disposable");
230
171
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
231
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
232
171
                                                  "file_cache_evict_by_time_ttl_to_normal");
233
171
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
234
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
235
171
                                                  "file_cache_evict_by_time_ttl_to_index");
236
237
171
    _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
238
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
239
171
                                                  "file_cache_evict_by_self_lru_disposable");
240
171
    _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
241
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
242
171
                                                  "file_cache_evict_by_self_lru_normal");
243
171
    _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] = std::make_shared<bvar::Adder<size_t>>(
244
171
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_index");
245
171
    _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] = std::make_shared<bvar::Adder<size_t>>(
246
171
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl");
247
171
    _evict_by_self_lru_metrics_matrix[FileCacheType::META] = std::make_shared<bvar::Adder<size_t>>(
248
171
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_meta");
249
250
171
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
251
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
252
171
                                                  "file_cache_evict_by_size_disposable_to_normal");
253
171
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
254
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
255
171
                                                  "file_cache_evict_by_size_disposable_to_index");
256
171
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
257
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
258
171
                                                  "file_cache_evict_by_size_disposable_to_ttl");
259
171
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
260
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
261
171
                                                  "file_cache_evict_by_size_normal_to_disposable");
262
171
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
263
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
264
171
                                                  "file_cache_evict_by_size_normal_to_index");
265
171
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
266
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
267
171
                                                  "file_cache_evict_by_size_normal_to_ttl");
268
171
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
269
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
270
171
                                                  "file_cache_evict_by_size_index_to_disposable");
271
171
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
272
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
273
171
                                                  "file_cache_evict_by_size_index_to_normal");
274
171
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
275
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
276
171
                                                  "file_cache_evict_by_size_index_to_ttl");
277
171
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
278
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
279
171
                                                  "file_cache_evict_by_size_ttl_to_disposable");
280
171
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
281
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
282
171
                                                  "file_cache_evict_by_size_ttl_to_normal");
283
171
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
284
171
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
285
171
                                                  "file_cache_evict_by_size_ttl_to_index");
286
855
    for (auto src_type : FILE_CACHE_TYPES) {
287
4.27k
        for (auto dst_type : FILE_CACHE_TYPES) {
288
4.27k
            if (src_type == dst_type) {
289
855
                continue;
290
855
            }
291
3.42k
            if (!_evict_by_time_metrics_matrix[src_type][dst_type]) {
292
1.36k
                _evict_by_time_metrics_matrix[src_type][dst_type] =
293
1.36k
                        std::make_shared<bvar::Adder<size_t>>(
294
1.36k
                                _cache_base_path.c_str(),
295
1.36k
                                fmt::format("file_cache_evict_by_time_{}_to_{}",
296
1.36k
                                            cache_type_to_string(src_type),
297
1.36k
                                            cache_type_to_string(dst_type))
298
1.36k
                                        .c_str());
299
1.36k
            }
300
3.42k
            if (!_evict_by_size_metrics_matrix[src_type][dst_type]) {
301
1.36k
                _evict_by_size_metrics_matrix[src_type][dst_type] =
302
1.36k
                        std::make_shared<bvar::Adder<size_t>>(
303
1.36k
                                _cache_base_path.c_str(),
304
1.36k
                                fmt::format("file_cache_evict_by_size_{}_to_{}",
305
1.36k
                                            cache_type_to_string(src_type),
306
1.36k
                                            cache_type_to_string(dst_type))
307
1.36k
                                        .c_str());
308
1.36k
            }
309
3.42k
        }
310
855
    }
311
312
171
    _evict_by_try_release = std::make_shared<bvar::Adder<size_t>>(
313
171
            _cache_base_path.c_str(), "file_cache_evict_by_try_release");
314
315
171
    _num_read_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
316
171
                                                             "file_cache_num_read_blocks");
317
171
    _num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
318
171
                                                            "file_cache_num_hit_blocks");
319
171
    _num_removed_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
320
171
                                                                "file_cache_num_removed_blocks");
321
322
171
    _no_warmup_num_read_blocks = std::make_shared<bvar::Adder<size_t>>(
323
171
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks");
324
171
    _no_warmup_num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(
325
171
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks");
326
327
171
#ifndef BE_TEST
328
171
    _num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
329
171
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
330
171
    _num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
331
171
            _cache_base_path.c_str(), "file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
332
171
    _num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
333
171
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
334
171
    _num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
335
171
            _cache_base_path.c_str(), "file_cache_num_read_blocks_1h", _num_read_blocks.get(),
336
171
            3600);
337
171
    _no_warmup_num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
338
171
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_5m",
339
171
            _no_warmup_num_hit_blocks.get(), 300);
340
171
    _no_warmup_num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
341
171
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_5m",
342
171
            _no_warmup_num_read_blocks.get(), 300);
343
171
    _no_warmup_num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
344
171
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_1h",
345
171
            _no_warmup_num_hit_blocks.get(), 3600);
346
171
    _no_warmup_num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
347
171
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_1h",
348
171
            _no_warmup_num_read_blocks.get(), 3600);
349
171
#endif
350
351
171
    _hit_ratio = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
352
171
                                                        "file_cache_hit_ratio", 0.0);
353
171
    _hit_ratio_5m = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
354
171
                                                           "file_cache_hit_ratio_5m", 0.0);
355
171
    _hit_ratio_1h = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
356
171
                                                           "file_cache_hit_ratio_1h", 0.0);
357
358
171
    _no_warmup_hit_ratio = std::make_shared<bvar::Status<double>>(
359
171
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio", 0.0);
360
171
    _no_warmup_hit_ratio_5m = std::make_shared<bvar::Status<double>>(
361
171
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_5m", 0.0);
362
171
    _no_warmup_hit_ratio_1h = std::make_shared<bvar::Status<double>>(
363
171
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_1h", 0.0);
364
365
171
    _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
366
171
            _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
367
171
    _need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>(
368
171
            _cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0);
369
171
    _meta_store_write_queue_size_metrics = std::make_shared<bvar::Status<size_t>>(
370
171
            _cache_base_path.c_str(), "file_cache_meta_store_write_queue_size", 0);
371
372
171
    _cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
373
171
            _cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
374
171
    _get_or_set_latency_us = std::make_shared<bvar::LatencyRecorder>(
375
171
            _cache_base_path.c_str(), "file_cache_get_or_set_latency_us");
376
171
    _storage_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
377
171
            _cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_us");
378
171
    _storage_retry_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
379
171
            _cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us");
380
171
    _storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
381
171
            _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us");
382
171
    _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
383
171
            _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us");
384
171
    _lru_dump_latency_us = std::make_shared<bvar::LatencyRecorder>(
385
171
            _cache_base_path.c_str(), "file_cache_lru_dump_latency_us");
386
171
    _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
387
171
            _cache_base_path.c_str(), "file_cache_recycle_keys_length");
388
171
    _need_update_lru_blocks_length_recorder = std::make_shared<bvar::LatencyRecorder>(
389
171
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length");
390
171
    _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>(
391
171
            _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us");
392
171
    _ttl_gc_latency_us = std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(),
393
171
                                                                 "file_cache_ttl_gc_latency_us");
394
171
    _shadow_queue_levenshtein_distance = std::make_shared<bvar::LatencyRecorder>(
395
171
            _cache_base_path.c_str(), "file_cache_shadow_queue_levenshtein_distance");
396
397
171
    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
398
171
                                 cache_settings.disposable_queue_elements, 60 * 60);
399
171
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
400
171
                            7 * 24 * 60 * 60);
401
171
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
402
171
                             24 * 60 * 60);
403
171
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
404
171
                          std::numeric_limits<int>::max());
405
171
    _meta_queue = LRUQueue(cache_settings.meta_queue_size, cache_settings.meta_queue_elements,
406
171
                           7 * 24 * 60 * 60);
407
408
171
    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
409
171
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());
410
171
    if (cache_settings.storage == "memory") {
411
18
        _storage = std::make_unique<MemFileCacheStorage>();
412
18
        _cache_base_path = "memory";
413
153
    } else {
414
153
        _storage = std::make_unique<FSFileCacheStorage>();
415
153
    }
416
417
171
    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
418
171
}
419
420
452k
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
421
452k
    uint128_t value;
422
452k
    sip_hash128(path.data(), path.size(), reinterpret_cast<char*>(&value));
423
452k
    return UInt128Wrapper(value);
424
452k
}
425
426
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
427
8
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
428
8
    SCOPED_CACHE_LOCK(_mutex, this);
429
8
    if (!config::enable_file_cache_query_limit) {
430
1
        return {};
431
1
    }
432
433
    /// if enable_filesystem_query_cache_limit is true,
434
    /// we create context query for current query.
435
7
    auto context = get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
436
7
    return std::make_unique<QueryFileCacheContextHolder>(query_id, this, context);
437
8
}
438
439
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
440
17.4k
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
441
17.4k
    auto query_iter = _query_map.find(query_id);
442
17.4k
    return (query_iter == _query_map.end()) ? nullptr : query_iter->second;
443
17.4k
}
444
445
6
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
446
6
    SCOPED_CACHE_LOCK(_mutex, this);
447
6
    const auto& query_iter = _query_map.find(query_id);
448
449
6
    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
450
5
        _query_map.erase(query_iter);
451
5
    }
452
6
}
453
454
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
455
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
456
7
        int file_cache_query_limit_percent) {
457
7
    if (query_id.lo == 0 && query_id.hi == 0) {
458
1
        return nullptr;
459
1
    }
460
461
6
    auto context = get_query_context(query_id, cache_lock);
462
6
    if (context) {
463
1
        return context;
464
1
    }
465
466
5
    size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100;
467
5
    if (file_cache_query_limit_size < 268435456) {
468
5
        LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size
469
5
                     << " bytes) is less than the 256MB recommended minimum. "
470
5
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
471
5
                     << " from its current value " << file_cache_query_limit_percent << "%.";
472
5
    }
473
5
    auto query_context = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
474
5
    auto query_iter = _query_map.emplace(query_id, query_context).first;
475
5
    return query_iter->second;
476
6
}
477
478
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
479
20
                                                   std::lock_guard<std::mutex>& cache_lock) {
480
20
    auto pair = std::make_pair(hash, offset);
481
20
    auto record = records.find(pair);
482
20
    DCHECK(record != records.end());
483
20
    auto iter = record->second;
484
20
    records.erase(pair);
485
20
    lru_queue.remove(iter, cache_lock);
486
20
}
487
488
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
489
                                                    size_t size,
490
30
                                                    std::lock_guard<std::mutex>& cache_lock) {
491
30
    auto pair = std::make_pair(hash, offset);
492
30
    if (records.find(pair) == records.end()) {
493
29
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
494
29
        records.insert({pair, queue_iter});
495
29
    }
496
30
}
497
498
150
/// Locking entry point for initialization: takes the cache lock through
/// SCOPED_CACHE_LOCK and delegates to initialize_unlocked().
Status BlockFileCache::initialize() {
    SCOPED_CACHE_LOCK(_mutex, this);
    Status init_status = initialize_unlocked(cache_lock);
    return init_status;
}
502
503
150
/// Initializes the cache: optionally restores the LRU queues from disk, initializes the
/// storage backend, creates the TTL manager when the storage is an FSFileCacheStorage
/// that exposes a meta store, and starts the background threads.
///
/// @param cache_lock  lock guard forwarded to restore_lru_queues_from_disk().
/// @return the error from `_storage->init()` if it fails, Status::OK() otherwise.
///
/// Must be called at most once (checked with DCHECK). `_is_initialized` is set before
/// the storage is initialized, so it stays true even when this function returns an error.
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(!_is_initialized);
    _is_initialized = true;

    // Requirements for restoring the LRU queues:
    // 1. restored data should not overwrite the last dump
    // 2. restore should happen before load and async load
    // 3. all queues should be restored sequentially to avoid conflicts
    // TODO(zhengyu): the restores could be parallelized, but that adds complexity; check
    // the time cost first to see whether the improvement is necessary.
    const bool restore_enabled = config::file_cache_background_lru_dump_tail_record_num > 0;
    if (restore_enabled) {
        restore_lru_queues_from_disk(cache_lock);
    }

    RETURN_IF_ERROR(_storage->init(this));

    // The TTL manager is only created for file-system storage that has a meta store.
    auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
    auto* meta_store = fs_storage != nullptr ? fs_storage->get_meta_store() : nullptr;
    if (meta_store != nullptr) {
        _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
    }

    // Background workers; each runs the named member function on this cache.
    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
    _cache_background_evict_in_advance_thread =
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
    _cache_background_block_lru_update_thread =
            std::thread(&BlockFileCache::run_background_block_lru_update, this);

    // LRU dump and LRU log replay workers.
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
    _cache_background_lru_log_replay_thread =
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);

    return Status::OK();
}
537
538
void BlockFileCache::update_block_lru(FileBlockSPtr block,
539
480
                                      std::lock_guard<std::mutex>& cache_lock) {
540
480
    if (!block) {
541
1
        return;
542
1
    }
543
544
479
    FileBlockCell* cell = get_cell(block->get_hash_value(), block->offset(), cache_lock);
545
479
    if (!cell || cell->file_block.get() != block.get()) {
546
2
        return;
547
2
    }
548
549
477
    if (cell->queue_iterator) {
550
477
        auto& queue = get_queue(block->cache_type());
551
477
        queue.move_to_end(*cell->queue_iterator, cache_lock);
552
477
        _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
553
477
                                          block->_key.hash, block->_key.offset,
554
477
                                          block->_block_range.size());
555
477
    }
556
477
    cell->update_atime();
557
477
}
558
559
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
560
2.89M
                              std::lock_guard<std::mutex>& cache_lock) {
561
2.89M
    if (result) {
562
2.89M
        result->push_back(cell.file_block);
563
2.89M
    }
564
565
2.89M
    auto& queue = get_queue(cell.file_block->cache_type());
566
    /// Move to the end of the queue. The iterator remains valid.
567
2.89M
    if (cell.queue_iterator && move_iter_flag) {
568
1.85M
        queue.move_to_end(*cell.queue_iterator, cache_lock);
569
1.85M
        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
570
1.85M
                                          CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash,
571
1.85M
                                          cell.file_block->_key.offset, cell.size());
572
1.85M
    }
573
574
2.89M
    cell.update_atime();
575
2.89M
}
576
577
template <class T>
578
    requires IsXLock<T>
579
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
580
2.31M
                                        T& /* cache_lock */) {
581
2.31M
    auto it = _files.find(hash);
582
2.31M
    if (it == _files.end()) {
583
3
        return nullptr;
584
3
    }
585
586
2.31M
    auto& offsets = it->second;
587
2.31M
    auto cell_it = offsets.find(offset);
588
2.31M
    if (cell_it == offsets.end()) {
589
3
        return nullptr;
590
3
    }
591
592
2.31M
    return &cell_it->second;
593
2.31M
}
594
595
2.89M
/// Tells whether an access should move the cell within its LRU queue: true unless the
/// cell type or the querying type is DISPOSABLE.
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
    const bool any_disposable =
            query_type == FileCacheType::DISPOSABLE || cell_type == FileCacheType::DISPOSABLE;
    return !any_disposable;
}
598
599
/// Collects, in ascending offset order, the cached blocks of file `hash` that intersect
/// `range` = [left, right], and marks each of them as used via use_cell().
///
/// @param hash  hash of the file.
/// @param context  supplies the cache type / expiration time / tablet id used when blocks
///        must be loaded directly, and the cache type passed to need_to_move().
/// @param range  requested byte range, inclusive on both ends.
/// @param cache_lock  lock guard forwarded to the helpers.
/// @return the intersecting blocks; empty when the file has no cached block in the range.
///
/// If the file is unknown and `_async_open_done` is not set, the blocks are first loaded
/// through `_storage->load_blocks_directly_unlocked()`.
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
                                    const FileBlock::Range& range,
                                    std::lock_guard<std::mutex>& cache_lock) {
    auto file_it = _files.find(hash);
    if (file_it == _files.end()) {
        if (_async_open_done) {
            return {};
        }
        FileCacheKey key;
        key.hash = hash;
        key.meta.type = context.cache_type;
        key.meta.expiration_time = context.expiration_time;
        key.meta.tablet_id = context.tablet_id;
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);

        file_it = _files.find(hash);
        if (file_it == _files.end()) [[unlikely]] {
            return {};
        }
    }

    auto& cells = file_it->second;
    if (cells.empty()) {
        // An entry without cells is unexpected: log it, assert in debug, then drop it.
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
                     << " cache type=" << context.cache_type
                     << " cache expiration time=" << context.expiration_time
                     << " cache range=" << range.left << " " << range.right
                     << " query id=" << context.query_id;
        DCHECK(false);
        _files.erase(hash);
        return {};
    }

    FileBlocks hits;
    // Appends `cell` to the result and applies the LRU / atime bookkeeping.
    auto take = [&](const FileBlockCell& cell) {
        use_cell(cell, &hits, need_to_move(cell.file_block->cache_type(), context.cache_type),
                 cache_lock);
    };

    auto cell_it = cells.lower_bound(range.left);
    if (cell_it == cells.end()) {
        /// Every cached block starts before range.left, so only the last one can intersect:
        ///   block{N}                       block{N}
        /// [________                         [_______]
        ///     [__________]         OR                  [________]
        ///     ^                                        ^
        ///     range.left                               range.left
        const auto& last_cell = cells.rbegin()->second;
        if (last_cell.file_block->range().right < range.left) {
            return {};
        }
        take(last_cell);
        return hits;
    }

    /// cell_it points at block{k}, the first block with offset >= range.left.
    if (cell_it != cells.begin()) {
        ///   block{k-1}  block{k}
        ///   [________]   [_____
        ///       [___________
        ///       ^
        ///       range.left
        const auto& prev_cell = std::prev(cell_it)->second;
        if (range.left <= prev_cell.file_block->range().right) {
            take(prev_cell);
        }
    }

    ///  block{k} ...       block{k-1}  block{k}                      block{k}
    ///  [______              [______]     [____                        [________
    ///  [_________     OR              [________      OR    [______]   ^
    ///  ^                              ^                           ^   block{k}.offset
    ///  range.left                     range.left                  range.right
    for (; cell_it != cells.end(); ++cell_it) {
        const auto& cell = cell_it->second;
        if (range.right < cell.file_block->range().left) {
            break; // first block entirely to the right of the range
        }
        take(cell);
    }

    return hits;
}
689
690
8.57k
/// Hands `block` to `_need_update_lru_blocks`; when insert() reports success, the current
/// size of that container is pushed to `_need_update_lru_blocks_length_recorder`.
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
    if (!_need_update_lru_blocks.insert(std::move(block))) {
        return;
    }
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
}
695
696
2
std::string BlockFileCache::clear_file_cache_async() {
697
2
    LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
698
2
    _lru_dumper->remove_lru_dump_files();
699
2
    int64_t num_cells_all = 0;
700
2
    int64_t num_cells_to_delete = 0;
701
2
    int64_t num_cells_wait_recycle = 0;
702
2
    int64_t num_files_all = 0;
703
2
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
704
2
    {
705
2
        SCOPED_CACHE_LOCK(_mutex, this);
706
707
2
        std::vector<FileBlockCell*> deleting_cells;
708
3
        for (auto& [_, offset_to_cell] : _files) {
709
3
            ++num_files_all;
710
36
            for (auto& [_1, cell] : offset_to_cell) {
711
36
                ++num_cells_all;
712
36
                deleting_cells.push_back(&cell);
713
36
            }
714
3
        }
715
716
        // we cannot delete the element in the loop above, because it will break the iterator
717
36
        for (auto& cell : deleting_cells) {
718
36
            if (!cell->releasable()) {
719
2
                LOG(INFO) << "cell is not releasable, hash="
720
2
                          << " offset=" << cell->file_block->offset();
721
2
                cell->file_block->set_deleting();
722
2
                ++num_cells_wait_recycle;
723
2
                continue;
724
2
            }
725
34
            FileBlockSPtr file_block = cell->file_block;
726
34
            if (file_block) {
727
34
                std::lock_guard block_lock(file_block->_mutex);
728
34
                remove(file_block, cache_lock, block_lock, false);
729
34
                ++num_cells_to_delete;
730
34
            }
731
34
        }
732
2
        clear_need_update_lru_blocks();
733
2
    }
734
735
2
    std::stringstream ss;
736
2
    ss << "finish clear_file_cache_async, path=" << _cache_base_path
737
2
       << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
738
2
       << " num_cells_to_delete=" << num_cells_to_delete
739
2
       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
740
2
    auto msg = ss.str();
741
2
    LOG(INFO) << msg;
742
2
    _lru_dumper->remove_lru_dump_files();
743
2
    return msg;
744
2
}
745
746
/// Covers [offset, offset + size) with file blocks of at most `_max_file_block_size`
/// bytes each and returns them in ascending order.
///
/// For every chunk try_reserve() is called. When it succeeds (and no earlier chunk of
/// this call failed) a cell is added with `state`; otherwise a stand-alone SKIP_CACHE
/// block is produced that is not registered through add_cell().
/// Once a reservation fails, `state` stays SKIP_CACHE for all remaining chunks of this
/// call, although try_reserve() is still invoked for each of them.
///
/// @param hash  hash of the file.
/// @param context  source of cache type, expiration time, tablet id and `is_cold_data`.
/// @param offset  first byte of the range.
/// @param size  length of the range; must be > 0 (DCHECK).
/// @param state  initial state for the created cells.
/// @param cache_lock  lock guard forwarded to the helpers.
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
                                                  const CacheContext& context, size_t offset,
                                                  size_t size, FileBlock::State state,
                                                  std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(size > 0);

    const size_t end_pos = offset + size; // one past the last byte
    FileBlocks pieces;

    size_t chunk_size = 0;
    for (size_t pos = offset; pos < end_pos; pos += chunk_size) {
        // end_pos - pos is the number of bytes not yet covered.
        chunk_size = std::min(end_pos - pos, _max_file_block_size);

        if (!try_reserve(hash, context, pos, chunk_size, cache_lock)) {
            state = FileBlock::State::SKIP_CACHE;
        }

        if (state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
            FileCacheKey key;
            key.hash = hash;
            key.offset = pos;
            key.meta.type = context.cache_type;
            key.meta.expiration_time = context.expiration_time;
            key.meta.tablet_id = context.tablet_id;
            pieces.push_back(std::make_shared<FileBlock>(key, chunk_size, this,
                                                         FileBlock::State::SKIP_CACHE));
            continue;
        }

        if (auto* cell = add_cell(hash, context, pos, chunk_size, state, cache_lock)) {
            pieces.push_back(cell->file_block);
            if (!context.is_cold_data) {
                cell->update_atime();
            }
        }
        if (_ttl_mgr && context.tablet_id != 0) {
            _ttl_mgr->register_tablet_id(context.tablet_id);
        }
    }

    DCHECK(pieces.empty() || offset + size - 1 == pieces.back()->range().right);
    return pieces;
}
794
795
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
796
                                                       const UInt128Wrapper& hash,
797
                                                       const CacheContext& context,
798
                                                       const FileBlock::Range& range,
799
2.82M
                                                       std::lock_guard<std::mutex>& cache_lock) {
800
    /// There are blocks [block1, ..., blockN]
801
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
802
    /// intersect with given range.
803
804
    /// It can have holes:
805
    /// [____________________]         -- requested range
806
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
807
    ///
808
    /// For each such hole create a cell with file block state EMPTY.
809
810
2.82M
    auto it = file_blocks.begin();
811
2.82M
    auto block_range = (*it)->range();
812
813
2.82M
    size_t current_pos = 0;
814
2.82M
    if (block_range.left < range.left) {
815
        ///    [_______     -- requested range
816
        /// [_______
817
        /// ^
818
        /// block1
819
820
151
        current_pos = block_range.right + 1;
821
151
        ++it;
822
2.82M
    } else {
823
2.82M
        current_pos = range.left;
824
2.82M
    }
825
826
5.71M
    while (current_pos <= range.right && it != file_blocks.end()) {
827
2.89M
        block_range = (*it)->range();
828
829
2.89M
        if (current_pos == block_range.left) {
830
2.89M
            current_pos = block_range.right + 1;
831
2.89M
            ++it;
832
2.89M
            continue;
833
2.89M
        }
834
835
2.89M
        DCHECK(current_pos < block_range.left);
836
837
234
        auto hole_size = block_range.left - current_pos;
838
839
234
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
840
234
                                                      FileBlock::State::EMPTY, cache_lock));
841
842
234
        current_pos = block_range.right + 1;
843
234
        ++it;
844
234
    }
845
846
2.82M
    if (current_pos <= range.right) {
847
        ///   ________]     -- requested range
848
        ///   _____]
849
        ///        ^
850
        /// blockN
851
852
712
        auto hole_size = range.right - current_pos + 1;
853
854
712
        file_blocks.splice(file_blocks.end(),
855
712
                           split_range_into_cells(hash, context, current_pos, hole_size,
856
712
                                                  FileBlock::State::EMPTY, cache_lock));
857
712
    }
858
2.82M
}
859
860
/// Returns a holder with file blocks covering [offset, offset + size - 1] of file `hash`.
///
/// Already cached blocks intersecting the range are reused; the rest of the range is
/// covered with newly created cells in state EMPTY. The whole operation runs under
/// `_mutex`. Lock wait time, get time and set time are accumulated into
/// `context.stats`, which must not be null (DCHECK), and the read/hit metrics and the
/// latency recorder are updated.
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
                                            CacheContext& context) {
    FileBlock::Range range(offset, offset + size - 1);

    ReadStatistics* stats = context.stats;
    DCHECK(stats != nullptr);

    // Measure how long this caller waits for the cache mutex.
    MonotonicStopWatch lock_wait_watch;
    lock_wait_watch.start();
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
    std::lock_guard cache_lock(_mutex);
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
    stats->lock_wait_timer += lock_wait_watch.elapsed_time();

    FileBlocks file_blocks;
    int64_t duration = 0;
    {
        SCOPED_RAW_TIMER(&duration);
        /// Get all blocks which intersect with the given range.
        {
            SCOPED_RAW_TIMER(&stats->get_timer);
            file_blocks = get_impl(hash, context, range, cache_lock);
        }

        {
            SCOPED_RAW_TIMER(&stats->set_timer);
            if (file_blocks.empty()) {
                // Nothing cached for this range: create cells for all of it.
                file_blocks = split_range_into_cells(hash, context, offset, size,
                                                     FileBlock::State::EMPTY, cache_lock);
            } else {
                // Partially cached: only the holes need new cells.
                fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
            }
        }
        DCHECK(!file_blocks.empty());

        const bool is_regular_read = !context.is_warmup;
        *_num_read_blocks << file_blocks.size();
        if (is_regular_read) {
            *_no_warmup_num_read_blocks << file_blocks.size();
        }
        for (auto& block : file_blocks) {
            size_t block_size = block->range().size();
            *_total_read_size_metrics << block_size;
            if (block->state_unsafe() != FileBlock::State::DOWNLOADED) {
                continue;
            }
            // Only DOWNLOADED blocks count as hits.
            *_num_hit_blocks << 1;
            *_total_hit_size_metrics << block_size;
            if (is_regular_read) {
                *_no_warmup_num_hit_blocks << 1;
            }
        }
    }
    *_get_or_set_latency_us << (duration / 1000);
    return FileBlocksHolder(std::move(file_blocks));
}
910
911
/// Reads [offset, offset + buffer.size - 1] of file `hash` into `buffer`, but only if
/// the whole range is covered by DOWNLOADED blocks.
///
/// @return Status::OK() when the buffer was filled (or is empty); Status::NotFound when
///         no block intersects the range, a block is not downloaded, or the range has a
///         hole; otherwise the error returned by FileBlock::read().
///
/// Coverage is verified under the cache lock; the data is copied after the lock has
/// been released.
Status BlockFileCache::read_if_cached(const UInt128Wrapper& hash, size_t offset, Slice buffer,
                                      CacheContext& context) {
    if (buffer.size == 0) {
        return Status::OK();
    }

    FileBlock::Range range(offset, offset + buffer.size - 1);
    FileBlocks file_blocks;
    {
        SCOPED_CACHE_LOCK(_mutex, this);
        file_blocks = get_impl(hash, context, range, cache_lock);
        if (file_blocks.empty()) {
            return Status::NotFound("file cache block not found, hash={}, range={}",
                                    hash.to_string(), range.to_string());
        }

        // First byte of the range not yet proven to be covered by a downloaded block.
        size_t covered_until = range.left;
        for (const auto& block : file_blocks) {
            const auto& block_range = block->range();
            const bool leaves_gap = block_range.left > covered_until;
            if (leaves_gap || block->state() != FileBlock::State::DOWNLOADED) {
                return Status::NotFound("file cache block not downloaded, hash={}, range={}",
                                        hash.to_string(), range.to_string());
            }
            if (block_range.right >= covered_until) {
                covered_until = std::min(block_range.right, range.right) + 1;
            }
            if (covered_until > range.right) {
                break;
            }
        }
        if (covered_until <= range.right) {
            return Status::NotFound("file cache block range has holes, hash={}, range={}",
                                    hash.to_string(), range.to_string());
        }
    }

    // Holding FileBlockSPtr references keeps the downloaded blocks alive after releasing
    // the cache mutex; eviction can detach them from queues, but cannot destroy them here.
    size_t read_pos = range.left;
    size_t bytes_written = 0;
    for (const auto& block : file_blocks) {
        const auto& block_range = block->range();
        if (block_range.right < read_pos) {
            continue; // block ends before the next byte we need
        }
        const size_t last_byte = std::min(block_range.right, range.right);
        const size_t chunk_size = last_byte - read_pos + 1;
        const size_t offset_in_block = read_pos - block_range.left;
        RETURN_IF_ERROR(
                block->read(Slice(buffer.data + bytes_written, chunk_size), offset_in_block));
        bytes_written += chunk_size;
        read_pos += chunk_size;
        if (read_pos > range.right) {
            break;
        }
    }
    return Status::OK();
}
967
968
/// Creates a file block cell and stores it in `_files` under [hash][offset].
///
/// @param hash  hash of the file.
/// @param context  source of cache type, expiration time and tablet id for the key.
/// @param offset  offset of the block inside the file.
/// @param size  block size in bytes.
/// @param state  initial state of the new FileBlock.
/// @param cache_lock  lock guard forwarded to the helpers.
/// @return the new cell; the already existing cell when one is stored at that offset;
///         nullptr when `size` is 0 or larger than 1GB.
///
/// A new cell is also added to the LRU queue of its cache type, an ADD event is
/// recorded, and `_cur_cache_size` (plus `_cur_ttl_size` for TTL blocks) is increased.
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
                                        size_t offset, size_t size, FileBlock::State state,
                                        std::lock_guard<std::mutex>& cache_lock) {
    // Blocks larger than 1GB are rejected.
    constexpr size_t kMaxBlockBytes = 1024UL * 1024 * 1024;

    if (size == 0) {
        return nullptr; /// Empty files are not cached.
    }

    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
               << " expiration_time=" << context.expiration_time
               << " tablet_id=" << context.tablet_id;

    if (size > kMaxBlockBytes) {
        LOG(WARNING) << "File block size is too large for a block, reject. size=" << size
                     << " hash=" << hash.to_string() << " offset=" << offset
                     << " stack:" << get_stack_trace();
        return nullptr;
    }

    auto& cells_by_offset = _files[hash];
    if (auto existing = cells_by_offset.find(offset); existing != cells_by_offset.end()) {
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
                   << ", offset: " << offset << ", size: " << size
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
        return &(existing->second);
    }

    FileCacheKey key;
    key.hash = hash;
    key.offset = offset;
    key.meta.type = context.cache_type;
    key.meta.expiration_time = context.expiration_time;
    key.meta.tablet_id = context.tablet_id;
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);

    // Reconcile cache type and expiration time: TTL without an expiration time becomes
    // NORMAL, and a non-TTL type with an expiration time becomes TTL.
    const bool is_ttl_type = context.cache_type == FileCacheType::TTL;
    Status st;
    if (context.expiration_time == 0 && is_ttl_type) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
    } else if (!is_ttl_type && context.expiration_time != 0) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
    }
    if (!st.ok()) {
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
                     << " cache_type=" << cache_type_to_string(context.cache_type)
                     << " error=" << st.msg();
    }

    cell.queue_iterator =
            get_queue(cell.file_block->cache_type()).add(hash, offset, size, cache_lock);
    _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD,
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
                                      cell.size());

    if (cell.file_block->cache_type() == FileCacheType::TTL) {
        _cur_ttl_size += cell.size();
    }
    // `cell` is moved from here on; use `size` for the accounting below.
    auto inserted = cells_by_offset.insert(std::make_pair(offset, std::move(cell)));
    _cur_cache_size += size;
    return &(inserted.first->second);
}
1029
1030
0
size_t BlockFileCache::try_release() {
1031
0
    SCOPED_CACHE_LOCK(_mutex, this);
1032
0
    std::vector<FileBlockCell*> trash;
1033
0
    for (auto& [hash, blocks] : _files) {
1034
0
        for (auto& [offset, cell] : blocks) {
1035
0
            if (cell.releasable()) {
1036
0
                trash.emplace_back(&cell);
1037
0
            } else {
1038
0
                cell.file_block->set_deleting();
1039
0
            }
1040
0
        }
1041
0
    }
1042
0
    size_t remove_size = 0;
1043
0
    for (auto& cell : trash) {
1044
0
        FileBlockSPtr file_block = cell->file_block;
1045
0
        std::lock_guard lc(cell->file_block->_mutex);
1046
0
        remove_size += file_block->range().size();
1047
0
        remove(file_block, cache_lock, lc);
1048
0
        VLOG_DEBUG << "try_release " << _cache_base_path
1049
0
                   << " hash=" << file_block->get_hash_value().to_string()
1050
0
                   << " offset=" << file_block->offset();
1051
0
    }
1052
0
    *_evict_by_try_release << remove_size;
1053
0
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
1054
0
    return trash.size();
1055
0
}
1056
1057
4.46M
/// Maps a cache type to the LRU queue that holds blocks of that type.
///
/// An unrecognized type trips DCHECK(false); where that check is inactive the normal
/// queue is returned.
LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
    switch (type) {
    case FileCacheType::INDEX:
        return _index_queue;
    case FileCacheType::DISPOSABLE:
        return _disposable_queue;
    case FileCacheType::NORMAL:
        return _normal_queue;
    case FileCacheType::TTL:
        return _ttl_queue;
    case FileCacheType::META:
        return _meta_queue;
    default:
        break;
    }
    // Only reached for a value outside the handled cache types.
    DCHECK(false);
    return _normal_queue;
}
1074
1075
137
/// Const overload: maps a cache type to the LRU queue that holds blocks of that type.
///
/// An unrecognized type trips DCHECK(false); where that check is inactive the normal
/// queue is returned.
const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
    switch (type) {
    case FileCacheType::INDEX:
        return _index_queue;
    case FileCacheType::DISPOSABLE:
        return _disposable_queue;
    case FileCacheType::NORMAL:
        return _normal_queue;
    case FileCacheType::TTL:
        return _ttl_queue;
    case FileCacheType::META:
        return _meta_queue;
    default:
        break;
    }
    // Only reached for a value outside the handled cache types.
    DCHECK(false);
    return _normal_queue;
}
1092
1093
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
1094
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
1095
533k
                                        std::string& reason) {
1096
533k
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1097
121k
        FileBlockSPtr file_block = cell->file_block;
1098
121k
        if (file_block) {
1099
121k
            std::lock_guard block_lock(file_block->_mutex);
1100
121k
            remove(file_block, cache_lock, block_lock, sync);
1101
121k
            VLOG_DEBUG << "remove_file_blocks"
1102
0
                       << " hash=" << file_block->get_hash_value().to_string()
1103
0
                       << " offset=" << file_block->offset() << " reason=" << reason;
1104
121k
        }
1105
121k
    };
1106
533k
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1107
533k
}
1108
1109
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
1110
                                           size_t& removed_size,
1111
                                           std::vector<FileBlockCell*>& to_evict,
1112
                                           std::lock_guard<std::mutex>& cache_lock,
1113
19.2k
                                           size_t& cur_removed_size, bool evict_in_advance) {
1114
1.87M
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1115
1.87M
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1116
4.72k
            break;
1117
4.72k
        }
1118
1.86M
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1119
1120
1.86M
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
1121
0
                     << ", offset: " << entry_offset;
1122
1123
1.86M
        size_t cell_size = cell->size();
1124
1.86M
        DCHECK(entry_size == cell_size);
1125
1126
1.86M
        if (cell->releasable()) {
1127
120k
            auto& file_block = cell->file_block;
1128
1129
120k
            std::lock_guard block_lock(file_block->_mutex);
1130
120k
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1131
120k
            to_evict.push_back(cell);
1132
120k
            removed_size += cell_size;
1133
120k
            cur_removed_size += cell_size;
1134
120k
        }
1135
1.86M
    }
1136
19.2k
}
1137
1138
// 1. if async load file cache not finish
1139
//     a. evict from lru queue
1140
// 2. if ttl cache
1141
//     a. evict from disposable/normal/index queue one by one
1142
// 3. if the query limit is not reached, or there is no query limit
1143
//     a. evict from other queue
1144
//     b. evict from current queue
1145
//         a.1 if the data belongs to a write, then only evict cold data
1146
// 4. if reach query limit
1147
//     a. evict from query queue
1148
//     b. evict from other queue
1149
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
1150
                                 size_t offset, size_t size,
1151
373k
                                 std::lock_guard<std::mutex>& cache_lock) {
1152
373k
    if (!_async_open_done) {
1153
2
        return try_reserve_during_async_load(size, cache_lock);
1154
2
    }
1155
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
1156
    // directly eliminate 5 times the size of the space
1157
373k
    if (_disk_resource_limit_mode) {
1158
1
        size = 5 * size;
1159
1
    }
1160
1161
373k
    auto query_context = config::enable_file_cache_query_limit &&
1162
373k
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
1163
373k
                                 ? get_query_context(context.query_id, cache_lock)
1164
373k
                                 : nullptr;
1165
373k
    if (!query_context) {
1166
373k
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
1167
373k
    } else if (query_context->get_cache_size(cache_lock) + size <=
1168
31
               query_context->get_max_cache_size()) {
1169
16
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
1170
16
    }
1171
15
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1172
15
                               std::chrono::steady_clock::now().time_since_epoch())
1173
15
                               .count();
1174
15
    auto& queue = get_queue(context.cache_type);
1175
15
    size_t removed_size = 0;
1176
15
    size_t ghost_remove_size = 0;
1177
15
    size_t queue_size = queue.get_capacity(cache_lock);
1178
15
    size_t cur_cache_size = _cur_cache_size;
1179
15
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);
1180
1181
15
    std::vector<LRUQueue::Iterator> ghost;
1182
15
    std::vector<FileBlockCell*> to_evict;
1183
1184
15
    size_t max_size = queue.get_max_size();
1185
48
    auto is_overflow = [&] {
1186
48
        return _disk_resource_limit_mode ? removed_size < size
1187
48
                                         : cur_cache_size + size - removed_size > _capacity ||
1188
48
                                                   (queue_size + size - removed_size > max_size) ||
1189
48
                                                   (query_context_cache_size + size -
1190
38
                                                            (removed_size + ghost_remove_size) >
1191
38
                                                    query_context->get_max_cache_size());
1192
48
    };
1193
1194
    /// Select the cache from the LRU queue held by query for expulsion.
1195
35
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
1196
33
        if (!is_overflow()) {
1197
13
            break;
1198
13
        }
1199
1200
20
        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);
1201
1202
20
        if (!cell) {
1203
            /// The cache corresponding to this record may be swapped out by
1204
            /// other queries, so it has become invalid.
1205
4
            ghost.push_back(iter);
1206
4
            ghost_remove_size += iter->size;
1207
16
        } else {
1208
16
            size_t cell_size = cell->size();
1209
16
            DCHECK(iter->size == cell_size);
1210
1211
16
            if (cell->releasable()) {
1212
16
                auto& file_block = cell->file_block;
1213
1214
16
                std::lock_guard block_lock(file_block->_mutex);
1215
16
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1216
16
                to_evict.push_back(cell);
1217
16
                removed_size += cell_size;
1218
16
            }
1219
16
        }
1220
20
    }
1221
1222
16
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1223
16
        FileBlockSPtr file_block = cell->file_block;
1224
16
        if (file_block) {
1225
16
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
1226
16
            std::lock_guard block_lock(file_block->_mutex);
1227
16
            remove(file_block, cache_lock, block_lock);
1228
16
        }
1229
16
    };
1230
1231
15
    for (auto& iter : ghost) {
1232
4
        query_context->remove(iter->hash, iter->offset, cache_lock);
1233
4
    }
1234
1235
15
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1236
1237
15
    if (is_overflow() &&
1238
15
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
1239
1
        return false;
1240
1
    }
1241
14
    query_context->reserve(hash, offset, size, cache_lock);
1242
14
    return true;
1243
15
}
1244
1245
265
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
1246
265
    UInt128Wrapper hash = UInt128Wrapper();
1247
265
    size_t offset = 0;
1248
265
    CacheContext context;
1249
    /* we pick NORMAL and TTL cache to evict in advance
1250
     * we reserve for them but won't acutually give space to them
1251
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
1252
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
1253
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
1254
     */
1255
265
    context.cache_type = FileCacheType::NORMAL;
1256
265
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1257
265
    context.cache_type = FileCacheType::TTL;
1258
265
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1259
265
}
1260
1261
// remove specific cache synchronously, for critical operations
1262
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1263
10
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1264
10
    std::string reason = "remove_if_cached";
1265
10
    SCOPED_CACHE_LOCK(_mutex, this);
1266
10
    auto iter = _files.find(file_key);
1267
10
    std::vector<FileBlockCell*> to_remove;
1268
10
    if (iter != _files.end()) {
1269
19
        for (auto& [_, cell] : iter->second) {
1270
19
            if (cell.releasable()) {
1271
18
                to_remove.push_back(&cell);
1272
18
            } else {
1273
1
                cell.file_block->set_deleting();
1274
1
            }
1275
19
        }
1276
8
    }
1277
10
    remove_file_blocks(to_remove, cache_lock, true, reason);
1278
10
}
1279
1280
// the async version of remove_if_cached, for background operations
1281
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
1282
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1283
140k
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1284
140k
    std::string reason = "remove_if_cached_async";
1285
140k
    SCOPED_CACHE_LOCK(_mutex, this);
1286
1287
140k
    auto iter = _files.find(file_key);
1288
140k
    std::vector<FileBlockCell*> to_remove;
1289
140k
    if (iter != _files.end()) {
1290
3.50k
        for (auto& [_, cell] : iter->second) {
1291
3.50k
            *_gc_evict_bytes_metrics << cell.size();
1292
3.50k
            *_gc_evict_count_metrics << 1;
1293
3.50k
            if (cell.releasable()) {
1294
1.54k
                to_remove.push_back(&cell);
1295
1.96k
            } else {
1296
1.96k
                cell.file_block->set_deleting();
1297
1.96k
            }
1298
3.50k
        }
1299
3.21k
    }
1300
140k
    remove_file_blocks(to_remove, cache_lock, false, reason);
1301
140k
}
1302
1303
// Returns the cache types, other than `cur_cache_type` and never including
// TTL, in the order the lists below spell out. Unknown types yield an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
        FileCacheType cur_cache_type) {
    switch (cur_cache_type) {
    case FileCacheType::TTL:
    case FileCacheType::META:
        // TTL and META share the same candidate list.
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
    case FileCacheType::INDEX:
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
    case FileCacheType::NORMAL:
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
    case FileCacheType::DISPOSABLE:
        return {FileCacheType::NORMAL, FileCacheType::INDEX};
    default:
        break;
    }
    return {};
}
1321
1322
18.8k
// Returns the cache types other than `cur_cache_type`, in the order the lists
// below spell out. TTL is a candidate for every type except TTL itself.
// Unknown types yield an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
    std::vector<FileCacheType> others;
    switch (cur_cache_type) {
    case FileCacheType::TTL:
        others = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
        break;
    case FileCacheType::META:
        others = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX,
                  FileCacheType::TTL};
        break;
    case FileCacheType::INDEX:
        others = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL};
        break;
    case FileCacheType::NORMAL:
        others = {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL};
        break;
    case FileCacheType::DISPOSABLE:
        others = {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL};
        break;
    default:
        break;
    }
    return others;
}
1340
1341
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
1342
56.6k
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
1343
56.6k
    DCHECK(_files.find(hash) != _files.end() &&
1344
56.6k
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
1345
56.6k
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
1346
56.6k
    DCHECK(cell != nullptr);
1347
56.6k
    if (cell == nullptr) {
1348
0
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
1349
0
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
1350
0
                     << " new_size=" << new_size;
1351
0
        return;
1352
0
    }
1353
56.6k
    if (cell->queue_iterator) {
1354
56.6k
        auto& queue = get_queue(cell->file_block->cache_type());
1355
56.6k
        DCHECK(queue.contains(hash, offset, cache_lock));
1356
56.6k
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
1357
56.6k
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
1358
56.6k
                                          cell->file_block->get_hash_value(),
1359
56.6k
                                          cell->file_block->offset(), new_size);
1360
56.6k
    }
1361
56.6k
    _cur_cache_size -= old_size;
1362
56.6k
    _cur_cache_size += new_size;
1363
56.6k
}
1364
1365
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
1366
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1367
373k
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1368
373k
    size_t removed_size = 0;
1369
373k
    size_t cur_cache_size = _cur_cache_size;
1370
373k
    std::vector<FileBlockCell*> to_evict;
1371
752k
    for (FileCacheType cache_type : other_cache_types) {
1372
752k
        auto& queue = get_queue(cache_type);
1373
752k
        size_t remove_size_per_type = 0;
1374
752k
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1375
716k
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1376
683k
                break;
1377
683k
            }
1378
32.1k
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1379
32.1k
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
1380
0
                         << ", offset: " << entry_offset;
1381
1382
32.1k
            size_t cell_size = cell->size();
1383
32.1k
            DCHECK(entry_size == cell_size);
1384
1385
32.1k
            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
1386
32.1k
                break;
1387
32.1k
            }
1388
1389
2
            if (cell->releasable()) {
1390
2
                auto& file_block = cell->file_block;
1391
2
                std::lock_guard block_lock(file_block->_mutex);
1392
2
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1393
2
                to_evict.push_back(cell);
1394
2
                removed_size += cell_size;
1395
2
                remove_size_per_type += cell_size;
1396
2
            }
1397
2
        }
1398
752k
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
1399
752k
    }
1400
373k
    bool is_sync_removal = !evict_in_advance;
1401
373k
    std::string reason = std::string("try_reserve_by_time ") +
1402
373k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1403
373k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1404
1405
373k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1406
373k
}
1407
1408
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
1409
2.98M
                                 bool evict_in_advance) const {
1410
2.98M
    bool ret = false;
1411
2.98M
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
1412
66.5k
        ret = (removed_size < need_size);
1413
66.5k
        return ret;
1414
66.5k
    }
1415
2.91M
    if (_disk_resource_limit_mode) {
1416
4
        ret = (removed_size < need_size);
1417
2.91M
    } else {
1418
2.91M
        ret = (cur_cache_size + need_size - removed_size > _capacity);
1419
2.91M
    }
1420
2.91M
    return ret;
1421
2.98M
}
1422
1423
bool BlockFileCache::try_reserve_from_other_queue_by_size(
1424
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1425
1.44k
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1426
1.44k
    size_t removed_size = 0;
1427
1.44k
    size_t cur_cache_size = _cur_cache_size;
1428
1.44k
    std::vector<FileBlockCell*> to_evict;
1429
    // we follow the privilege defined in get_other_cache_types to evict
1430
4.33k
    for (FileCacheType cache_type : other_cache_types) {
1431
4.33k
        auto& queue = get_queue(cache_type);
1432
1433
        // we will not drain each of them to the bottom -- i.e., we only
1434
        // evict what they have stolen.
1435
4.33k
        size_t cur_queue_size = queue.get_capacity(cache_lock);
1436
4.33k
        size_t cur_queue_max_size = queue.get_max_size();
1437
4.33k
        if (cur_queue_size <= cur_queue_max_size) {
1438
2.89k
            continue;
1439
2.89k
        }
1440
1.43k
        size_t cur_removed_size = 0;
1441
1.43k
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1442
1.43k
                              cur_removed_size, evict_in_advance);
1443
1.43k
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
1444
1.43k
    }
1445
1.44k
    bool is_sync_removal = !evict_in_advance;
1446
1.44k
    std::string reason = std::string("try_reserve_by_size") +
1447
1.44k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1448
1.44k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1449
1.44k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1450
1.44k
}
1451
1452
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
1453
                                                  int64_t cur_time,
1454
                                                  std::lock_guard<std::mutex>& cache_lock,
1455
373k
                                                  bool evict_in_advance) {
1456
    // currently, TTL cache is not considered as a candidate
1457
373k
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
1458
373k
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
1459
373k
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
1460
373k
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
1461
355k
        return reserve_success;
1462
355k
    }
1463
1464
18.8k
    other_cache_types = get_other_cache_type(cur_cache_type);
1465
18.8k
    auto& cur_queue = get_queue(cur_cache_type);
1466
18.8k
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
1467
18.8k
    size_t cur_queue_max_size = cur_queue.get_max_size();
1468
    // Hit the soft limit by self, cannot remove from other queues
1469
18.8k
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
1470
17.4k
        return false;
1471
17.4k
    }
1472
1.44k
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
1473
1.44k
                                                evict_in_advance);
1474
18.8k
}
1475
1476
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
1477
                                         QueryFileCacheContextPtr query_context,
1478
                                         const CacheContext& context, size_t offset, size_t size,
1479
                                         std::lock_guard<std::mutex>& cache_lock,
1480
373k
                                         bool evict_in_advance) {
1481
373k
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1482
373k
                               std::chrono::steady_clock::now().time_since_epoch())
1483
373k
                               .count();
1484
373k
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
1485
373k
                                      evict_in_advance)) {
1486
17.7k
        auto& queue = get_queue(context.cache_type);
1487
17.7k
        size_t removed_size = 0;
1488
17.7k
        size_t cur_cache_size = _cur_cache_size;
1489
1490
17.7k
        std::vector<FileBlockCell*> to_evict;
1491
17.7k
        size_t cur_removed_size = 0;
1492
17.7k
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1493
17.7k
                              cur_removed_size, evict_in_advance);
1494
17.7k
        bool is_sync_removal = !evict_in_advance;
1495
17.7k
        std::string reason = std::string("try_reserve for cache type ") +
1496
17.7k
                             cache_type_to_string(context.cache_type) +
1497
17.7k
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1498
17.7k
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1499
17.7k
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;
1500
1501
17.7k
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1502
14.3k
            return false;
1503
14.3k
        }
1504
17.7k
    }
1505
1506
359k
    if (query_context) {
1507
16
        query_context->reserve(hash, offset, size, cache_lock);
1508
16
    }
1509
359k
    return true;
1510
373k
}
1511
1512
template <class T, class U>
1513
    requires IsXLock<T> && IsXLock<U>
1514
353k
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
1515
353k
    auto hash = file_block->get_hash_value();
1516
353k
    auto offset = file_block->offset();
1517
353k
    auto type = file_block->cache_type();
1518
353k
    auto expiration_time = file_block->expiration_time();
1519
353k
    auto tablet_id = file_block->tablet_id();
1520
353k
    auto* cell = get_cell(hash, offset, cache_lock);
1521
353k
    file_block->cell = nullptr;
1522
353k
    DCHECK(cell);
1523
353k
    DCHECK(cell->queue_iterator);
1524
353k
    if (cell->queue_iterator) {
1525
353k
        auto& queue = get_queue(file_block->cache_type());
1526
353k
        queue.remove(*cell->queue_iterator, cache_lock);
1527
353k
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
1528
353k
                                          cell->file_block->get_hash_value(),
1529
353k
                                          cell->file_block->offset(), cell->size());
1530
353k
    }
1531
353k
    *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
1532
353k
            << file_block->range().size();
1533
353k
    *_total_evict_size_metrics << file_block->range().size();
1534
1535
353k
    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
1536
0
               << ", offset: " << offset << ", size: " << file_block->range().size()
1537
0
               << ", type: " << cache_type_to_string(type);
1538
1539
353k
    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
1540
123k
        FileCacheKey key;
1541
123k
        key.hash = hash;
1542
123k
        key.offset = offset;
1543
123k
        key.meta.type = type;
1544
123k
        key.meta.expiration_time = expiration_time;
1545
123k
        key.meta.tablet_id = tablet_id;
1546
123k
        if (sync) {
1547
57.2k
            int64_t duration_ns = 0;
1548
57.2k
            Status st;
1549
57.2k
            {
1550
57.2k
                SCOPED_RAW_TIMER(&duration_ns);
1551
57.2k
                st = _storage->remove(key);
1552
57.2k
            }
1553
57.2k
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
1554
57.2k
            if (!st.ok()) {
1555
0
                LOG_WARNING("").error(st);
1556
0
            }
1557
66.5k
        } else {
1558
            // the file will be deleted in the bottom half
1559
            // so there will be a window that the file is not in the cache but still in the storage
1560
            // but it's ok, because the rowset is stale already
1561
66.5k
            bool ret = _recycle_keys.enqueue(key);
1562
66.5k
            if (ret) [[likely]] {
1563
66.5k
                *_recycle_keys_length_recorder << _recycle_keys.size_approx();
1564
66.5k
            } else {
1565
0
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
1566
0
                int64_t duration_ns = 0;
1567
0
                Status st;
1568
0
                {
1569
0
                    SCOPED_RAW_TIMER(&duration_ns);
1570
0
                    st = _storage->remove(key);
1571
0
                }
1572
0
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
1573
0
                if (!st.ok()) {
1574
0
                    LOG_WARNING("").error(st);
1575
0
                }
1576
0
            }
1577
66.5k
        }
1578
229k
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
1579
0
        file_block->set_deleting();
1580
0
        return;
1581
0
    }
1582
353k
    _cur_cache_size -= file_block->range().size();
1583
353k
    if (FileCacheType::TTL == type) {
1584
1.29k
        _cur_ttl_size -= file_block->range().size();
1585
1.29k
    }
1586
353k
    auto it = _files.find(hash);
1587
353k
    if (it != _files.end()) {
1588
353k
        it->second.erase(file_block->offset());
1589
353k
        if (it->second.empty()) {
1590
120k
            _files.erase(hash);
1591
120k
        }
1592
353k
    }
1593
353k
    *_num_removed_blocks << 1;
1594
353k
}
1595
1596
46
// Locking wrapper around get_used_cache_size_unlocked(): takes the cache lock
// and returns the value reported for the queue of `cache_type`.
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t used = get_used_cache_size_unlocked(cache_type, cache_lock);
    return used;
}
1600
1601
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
1602
46
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1603
46
    return get_queue(cache_type).get_capacity(cache_lock);
1604
46
}
1605
1606
0
// Locking wrapper around get_available_cache_size_unlocked(): takes the cache
// lock and returns the value computed for the queue of `cache_type`.
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t available = get_available_cache_size_unlocked(cache_type, cache_lock);
    return available;
}
1610
1611
size_t BlockFileCache::get_available_cache_size_unlocked(
1612
0
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
1613
0
    return get_queue(cache_type).get_max_element_size() -
1614
0
           get_used_cache_size_unlocked(cache_type, cache_lock);
1615
0
}
1616
1617
88
// Locking wrapper around get_file_blocks_num_unlocked(): takes the cache lock
// and returns the element count of the queue of `cache_type`.
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t block_count = get_file_blocks_num_unlocked(cache_type, cache_lock);
    return block_count;
}
1621
1622
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
1623
88
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1624
88
    return get_queue(cache_type).get_elements_num(cache_lock);
1625
88
}
1626
1627
// Builds the cell for `file_block` and links the block back to this cell.
// A block of TTL type gets its access time updated right away.
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
        : file_block(file_block) {
    file_block->cell = this;
    /**
     * Cell can be created with either DOWNLOADED or EMPTY file block's state.
     * File block acquires DOWNLOADING state and creates LRUQueue iterator on first
     * successful getOrSetDownaloder call.
     */
    const FileBlock::State initial_state = file_block->_download_state;
    const bool state_allowed = initial_state == FileBlock::State::DOWNLOADED ||
                               initial_state == FileBlock::State::EMPTY ||
                               initial_state == FileBlock::State::SKIP_CACHE;
    if (!state_allowed) {
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
                      << FileBlock::state_to_string(file_block->_download_state);
    }

    if (file_block->cache_type() == FileCacheType::TTL) {
        update_atime();
    }
}
1650
1651
// Appends a (hash, offset, size) record to the back of the queue, indexes it
// in `map` under (hash, offset) and adds `size` to the queue's byte total.
// Returns the iterator of the new record.
LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
    cache_size += size;
    auto position = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size));
    map.emplace(std::make_pair(hash, offset), position);
    return position;
}
1658
1659
0
void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
1660
0
    queue.clear();
1661
0
    map.clear();
1662
0
    cache_size = 0;
1663
0
}
1664
1665
3.61M
// Relinks the record at `queue_it` to the back of the queue via splice.
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
    auto& records = queue;
    records.splice(records.end(), records, queue_it);
}
1668
1669
void LRUQueue::resize(Iterator queue_it, size_t new_size,
1670
113k
                      std::lock_guard<std::mutex>& /* cache_lock */) {
1671
113k
    cache_size -= queue_it->size;
1672
113k
    queue_it->size = new_size;
1673
113k
    cache_size += new_size;
1674
113k
}
1675
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
1676
56.6k
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
1677
56.6k
    return map.find(std::make_pair(hash, offset)) != map.end();
1678
56.6k
}
1679
1680
// Looks up the record for (hash, offset) in the index.
// Returns its queue iterator, or a default-constructed list iterator when
// there is no such record.
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
    const auto found = map.find(std::make_pair(hash, offset));
    if (found == map.end()) {
        return std::list<FileKeyAndOffset>::iterator();
    }
    return found->second;
}
1688
1689
2
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
1690
2
    std::string result;
1691
18
    for (const auto& [hash, offset, size] : queue) {
1692
18
        if (!result.empty()) {
1693
16
            result += ", ";
1694
16
        }
1695
18
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
1696
18
    }
1697
2
    return result;
1698
2
}
1699
1700
size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
1701
5
                                           std::lock_guard<std::mutex>& cache_lock) {
1702
5
    std::list<FileKeyAndOffset> target_queue = this->queue;
1703
5
    std::list<FileKeyAndOffset> base_queue = base.queue;
1704
5
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
1705
5
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
1706
1707
5
    size_t m = vec1.size();
1708
5
    size_t n = vec2.size();
1709
1710
    // Create a 2D vector (matrix) to store the Levenshtein distances
1711
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
1712
5
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
1713
1714
    // Initialize the first row and column of the matrix
1715
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
1716
22
    for (size_t i = 0; i <= m; ++i) {
1717
17
        dp[i][0] = i;
1718
17
    }
1719
20
    for (size_t j = 0; j <= n; ++j) {
1720
15
        dp[0][j] = j;
1721
15
    }
1722
1723
    // Fill the matrix using dynamic programming
1724
17
    for (size_t i = 1; i <= m; ++i) {
1725
38
        for (size_t j = 1; j <= n; ++j) {
1726
            // Check if the current elements of both vectors are equal
1727
26
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
1728
26
                           vec1[i - 1].offset == vec2[j - 1].offset)
1729
26
                                  ? 0
1730
26
                                  : 1;
1731
            // Calculate the minimum cost of three possible operations:
1732
            // 1. Insertion: dp[i][j-1] + 1
1733
            // 2. Deletion: dp[i-1][j] + 1
1734
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
1735
26
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
1736
26
        }
1737
12
    }
1738
    // The bottom-right cell of the matrix contains the Levenshtein distance
1739
5
    return dp[m][n];
1740
5
}
1741
1742
1
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
1743
1
    SCOPED_CACHE_LOCK(_mutex, this);
1744
1
    return dump_structure_unlocked(hash, cache_lock);
1745
1
}
1746
1747
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
1748
1
                                                    std::lock_guard<std::mutex>&) {
1749
1
    std::stringstream result;
1750
1
    auto it = _files.find(hash);
1751
1
    if (it == _files.end()) {
1752
0
        return std::string("");
1753
0
    }
1754
1
    const auto& cells_by_offset = it->second;
1755
1756
1
    for (const auto& [_, cell] : cells_by_offset) {
1757
1
        result << cell.file_block->get_info_for_log() << " "
1758
1
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
1759
1
    }
1760
1761
1
    return result.str();
1762
1
}
1763
1764
0
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
1765
0
    SCOPED_CACHE_LOCK(_mutex, this);
1766
0
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
1767
0
}
1768
1769
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
1770
                                                            size_t offset,
1771
0
                                                            std::lock_guard<std::mutex>&) {
1772
0
    std::stringstream result;
1773
0
    auto it = _files.find(hash);
1774
0
    if (it == _files.end()) {
1775
0
        return std::string("");
1776
0
    }
1777
0
    const auto& cells_by_offset = it->second;
1778
0
    const auto& cell = cells_by_offset.find(offset);
1779
1780
0
    return cache_type_to_string(cell->second.file_block->cache_type());
1781
0
}
1782
1783
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
1784
                                       FileCacheType new_type,
1785
2.54k
                                       std::lock_guard<std::mutex>& cache_lock) {
1786
2.54k
    if (auto iter = _files.find(hash); iter != _files.end()) {
1787
2.54k
        auto& file_blocks = iter->second;
1788
2.54k
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
1789
2.53k
            FileBlockCell& cell = cell_it->second;
1790
2.53k
            auto& cur_queue = get_queue(cell.file_block->cache_type());
1791
2.53k
            DCHECK(cell.queue_iterator.has_value());
1792
2.53k
            cur_queue.remove(*cell.queue_iterator, cache_lock);
1793
2.53k
            _lru_recorder->record_queue_event(
1794
2.53k
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
1795
2.53k
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
1796
2.53k
            auto& new_queue = get_queue(new_type);
1797
2.53k
            cell.queue_iterator =
1798
2.53k
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
1799
2.53k
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
1800
2.53k
                                              cell.file_block->get_hash_value(),
1801
2.53k
                                              cell.file_block->offset(), cell.size());
1802
2.53k
        }
1803
2.54k
    }
1804
2.54k
}
1805
1806
// @brief: get a path's disk capacity used percent, inode used percent
1807
// @param: path
1808
// @param: percent.first disk used percent, percent.second inode used percent
1809
7.24k
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
1810
7.24k
    struct statfs stat;
1811
7.24k
    int ret = statfs(path.c_str(), &stat);
1812
7.24k
    if (ret != 0) {
1813
2
        return ret;
1814
2
    }
1815
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
1816
    // v->used = stat.f_blocks - stat.f_bfree
1817
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
1818
7.24k
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
1819
7.24k
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
1820
7.24k
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));
1821
1822
7.24k
    unsigned long long inode_free = stat.f_ffree;
1823
7.24k
    unsigned long long inode_total = stat.f_files;
1824
7.24k
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
1825
7.24k
    percent->first = capacity_percentage;
1826
7.24k
    percent->second = 100 - inode_percentage;
1827
1828
    // Add sync point for testing
1829
7.24k
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
1830
1831
7.24k
    return 0;
1832
7.24k
}
1833
1834
10
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
1835
10
    using namespace std::chrono;
1836
10
    int64_t space_released = 0;
1837
10
    size_t old_capacity = 0;
1838
10
    std::stringstream ss;
1839
10
    ss << "finish reset_capacity, path=" << _cache_base_path;
1840
10
    auto adjust_start_time = steady_clock::time_point();
1841
10
    {
1842
10
        SCOPED_CACHE_LOCK(_mutex, this);
1843
10
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
1844
1
            int64_t need_remove_size = _cur_cache_size - new_capacity;
1845
5
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
1846
5
                int64_t queue_released = 0;
1847
5
                std::vector<FileBlockCell*> to_evict;
1848
13
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1849
13
                    if (need_remove_size <= 0) {
1850
1
                        break;
1851
1
                    }
1852
12
                    need_remove_size -= entry_size;
1853
12
                    space_released += entry_size;
1854
12
                    queue_released += entry_size;
1855
12
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1856
12
                    if (!cell->releasable()) {
1857
0
                        cell->file_block->set_deleting();
1858
0
                        continue;
1859
0
                    }
1860
12
                    to_evict.push_back(cell);
1861
12
                }
1862
12
                for (auto& cell : to_evict) {
1863
12
                    FileBlockSPtr file_block = cell->file_block;
1864
12
                    std::lock_guard block_lock(file_block->_mutex);
1865
12
                    remove(file_block, cache_lock, block_lock);
1866
12
                }
1867
5
                return queue_released;
1868
5
            };
1869
1
            int64_t queue_released = remove_blocks(_disposable_queue);
1870
1
            ss << " disposable_queue released " << queue_released;
1871
1
            queue_released = remove_blocks(_normal_queue);
1872
1
            ss << " normal_queue released " << queue_released;
1873
1
            queue_released = remove_blocks(_index_queue);
1874
1
            ss << " index_queue released " << queue_released;
1875
1
            queue_released = remove_blocks(_ttl_queue);
1876
1
            ss << " ttl_queue released " << queue_released;
1877
1
            queue_released = remove_blocks(_meta_queue);
1878
1
            ss << " meta_queue released " << queue_released;
1879
1880
1
            _disk_resource_limit_mode = true;
1881
1
            _disk_limit_mode_metrics->set_value(1);
1882
1
            ss << " total_space_released=" << space_released;
1883
1
        }
1884
10
        old_capacity = _capacity;
1885
10
        _capacity = new_capacity;
1886
10
        _cache_capacity_metrics->set_value(_capacity);
1887
10
    }
1888
10
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
1889
10
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
1890
10
              << " use_time=" << cast_set<int64_t>(use_time.count());
1891
10
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
1892
10
    LOG(INFO) << ss.str();
1893
10
    return ss.str();
1894
10
}
1895
1896
4.47k
void BlockFileCache::check_disk_resource_limit() {
1897
4.47k
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1898
784
        return;
1899
784
    }
1900
1901
3.68k
    bool previous_mode = _disk_resource_limit_mode;
1902
3.68k
    if (_capacity > _cur_cache_size) {
1903
3.68k
        _disk_resource_limit_mode = false;
1904
3.68k
        _disk_limit_mode_metrics->set_value(0);
1905
3.68k
    }
1906
3.68k
    std::pair<int, int> percent;
1907
3.68k
    int ret = disk_used_percentage(_cache_base_path, &percent);
1908
3.68k
    if (ret != 0) {
1909
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1910
1
        return;
1911
1
    }
1912
3.68k
    auto [space_percentage, inode_percentage] = percent;
1913
7.37k
    auto is_insufficient = [](const int& percentage) {
1914
7.37k
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
1915
7.37k
    };
1916
3.68k
    DCHECK_GE(space_percentage, 0);
1917
3.68k
    DCHECK_LE(space_percentage, 100);
1918
3.68k
    DCHECK_GE(inode_percentage, 0);
1919
3.68k
    DCHECK_LE(inode_percentage, 100);
1920
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1921
    // FIXME: reject with config validator
1922
3.68k
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
1923
3.68k
        config::file_cache_exit_disk_resource_limit_mode_percent) {
1924
1
        LOG_WARNING("config error, set to default value")
1925
1
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
1926
1
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
1927
1
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
1928
1
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
1929
1
    }
1930
3.68k
    bool is_space_insufficient = is_insufficient(space_percentage);
1931
3.68k
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1932
3.68k
    if (is_space_insufficient || is_inode_insufficient) {
1933
5
        _disk_resource_limit_mode = true;
1934
5
        _disk_limit_mode_metrics->set_value(1);
1935
3.68k
    } else if (_disk_resource_limit_mode &&
1936
3.68k
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
1937
3.68k
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
1938
0
        _disk_resource_limit_mode = false;
1939
0
        _disk_limit_mode_metrics->set_value(0);
1940
0
    }
1941
3.68k
    if (previous_mode != _disk_resource_limit_mode) {
1942
        // add log for disk resource limit mode switching
1943
10
        if (_disk_resource_limit_mode) {
1944
5
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
1945
5
                         << " space_percent=" << space_percentage
1946
5
                         << " inode_percent=" << inode_percentage
1947
5
                         << " is_space_insufficient=" << is_space_insufficient
1948
5
                         << " is_inode_insufficient=" << is_inode_insufficient
1949
5
                         << " enter threshold="
1950
5
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
1951
5
        } else {
1952
5
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
1953
5
                      << " space_percent=" << space_percentage
1954
5
                      << " inode_percent=" << inode_percentage << " exit threshold="
1955
5
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
1956
5
        }
1957
3.67k
    } else if (_disk_resource_limit_mode) {
1958
        // print log for disk resource limit mode running, but less frequently
1959
0
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1960
0
                                 << " space_percent=" << space_percentage
1961
0
                                 << " inode_percent=" << inode_percentage
1962
0
                                 << " is_space_insufficient=" << is_space_insufficient
1963
0
                                 << " is_inode_insufficient=" << is_inode_insufficient
1964
0
                                 << " mode run in resource limit";
1965
0
    }
1966
3.68k
}
1967
1968
3.56k
void BlockFileCache::check_need_evict_cache_in_advance() {
1969
3.56k
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1970
1
        return;
1971
1
    }
1972
1973
3.56k
    std::pair<int, int> percent;
1974
3.56k
    int ret = disk_used_percentage(_cache_base_path, &percent);
1975
3.56k
    if (ret != 0) {
1976
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1977
1
        return;
1978
1
    }
1979
3.55k
    auto [space_percentage, inode_percentage] = percent;
1980
3.55k
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
1981
10.6k
    auto is_insufficient = [](const int& percentage) {
1982
10.6k
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
1983
10.6k
    };
1984
3.55k
    DCHECK_GE(space_percentage, 0);
1985
3.55k
    DCHECK_LE(space_percentage, 100);
1986
3.55k
    DCHECK_GE(inode_percentage, 0);
1987
3.55k
    DCHECK_LE(inode_percentage, 100);
1988
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1989
    // FIXME: reject with config validator
1990
3.55k
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
1991
3.55k
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
1992
1
        LOG_WARNING("config error, set to default value")
1993
1
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
1994
1
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
1995
1
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
1996
1
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
1997
1
    }
1998
3.55k
    bool previous_mode = _need_evict_cache_in_advance;
1999
3.55k
    bool is_space_insufficient = is_insufficient(space_percentage);
2000
3.55k
    bool is_inode_insufficient = is_insufficient(inode_percentage);
2001
3.55k
    bool is_size_insufficient = is_insufficient(size_percentage);
2002
3.55k
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
2003
68
        _need_evict_cache_in_advance = true;
2004
68
        _need_evict_cache_in_advance_metrics->set_value(1);
2005
3.49k
    } else if (_need_evict_cache_in_advance &&
2006
3.49k
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
2007
3.49k
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
2008
3.49k
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
2009
58
        _need_evict_cache_in_advance = false;
2010
58
        _need_evict_cache_in_advance_metrics->set_value(0);
2011
58
    }
2012
3.55k
    if (previous_mode != _need_evict_cache_in_advance) {
2013
        // add log for evict cache in advance mode switching
2014
120
        if (_need_evict_cache_in_advance) {
2015
62
            LOG(WARNING) << "Entering evict cache in advance mode: "
2016
62
                         << "file_cache=" << get_base_path()
2017
62
                         << " space_percent=" << space_percentage
2018
62
                         << " inode_percent=" << inode_percentage
2019
62
                         << " size_percent=" << size_percentage
2020
62
                         << " is_space_insufficient=" << is_space_insufficient
2021
62
                         << " is_inode_insufficient=" << is_inode_insufficient
2022
62
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
2023
62
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
2024
62
        } else {
2025
58
            LOG(INFO) << "Exiting evict cache in advance mode: "
2026
58
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
2027
58
                      << " inode_percent=" << inode_percentage
2028
58
                      << " size_percent=" << size_percentage << " exit threshold="
2029
58
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
2030
58
        }
2031
3.43k
    } else if (_need_evict_cache_in_advance) {
2032
        // print log for evict cache in advance mode running, but less frequently
2033
6
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
2034
2
                                 << " space_percent=" << space_percentage
2035
2
                                 << " inode_percent=" << inode_percentage
2036
2
                                 << " size_percent=" << size_percentage
2037
2
                                 << " is_space_insufficient=" << is_space_insufficient
2038
2
                                 << " is_inode_insufficient=" << is_inode_insufficient
2039
2
                                 << " is_size_insufficient=" << is_size_insufficient
2040
2
                                 << " need evict cache in advance";
2041
6
    }
2042
3.55k
}
2043
2044
150
void BlockFileCache::run_background_monitor() {
2045
150
    Thread::set_self_name("run_background_monitor");
2046
4.47k
    while (!_close) {
2047
4.47k
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
2048
4.47k
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
2049
4.47k
        check_disk_resource_limit();
2050
4.47k
        if (config::enable_evict_file_cache_in_advance) {
2051
3.55k
            check_need_evict_cache_in_advance();
2052
3.55k
        } else {
2053
919
            _need_evict_cache_in_advance = false;
2054
919
            _need_evict_cache_in_advance_metrics->set_value(0);
2055
919
        }
2056
2057
4.47k
        {
2058
4.47k
            std::unique_lock close_lock(_close_mtx);
2059
4.47k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2060
4.47k
            if (_close) {
2061
147
                break;
2062
147
            }
2063
4.47k
        }
2064
        // report
2065
4.32k
        {
2066
4.32k
            SCOPED_CACHE_LOCK(_mutex, this);
2067
4.32k
            _cur_cache_size_metrics->set_value(_cur_cache_size);
2068
4.32k
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
2069
4.32k
                                                   _index_queue.get_capacity(cache_lock) -
2070
4.32k
                                                   _normal_queue.get_capacity(cache_lock) -
2071
4.32k
                                                   _disposable_queue.get_capacity(cache_lock) -
2072
4.32k
                                                   _meta_queue.get_capacity(cache_lock));
2073
4.32k
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
2074
4.32k
                    _ttl_queue.get_capacity(cache_lock));
2075
4.32k
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
2076
4.32k
                    _ttl_queue.get_elements_num(cache_lock));
2077
4.32k
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
2078
4.32k
            _cur_normal_queue_element_count_metrics->set_value(
2079
4.32k
                    _normal_queue.get_elements_num(cache_lock));
2080
4.32k
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
2081
4.32k
            _cur_index_queue_element_count_metrics->set_value(
2082
4.32k
                    _index_queue.get_elements_num(cache_lock));
2083
4.32k
            _cur_disposable_queue_cache_size_metrics->set_value(
2084
4.32k
                    _disposable_queue.get_capacity(cache_lock));
2085
4.32k
            _cur_disposable_queue_element_count_metrics->set_value(
2086
4.32k
                    _disposable_queue.get_elements_num(cache_lock));
2087
4.32k
            _cur_meta_queue_cache_size_metrics->set_value(_meta_queue.get_capacity(cache_lock));
2088
4.32k
            _cur_meta_queue_element_count_metrics->set_value(
2089
4.32k
                    _meta_queue.get_elements_num(cache_lock));
2090
2091
            // Update meta store write queue size if storage is FSFileCacheStorage
2092
4.32k
            if (_storage->get_type() == FileCacheStorageType::DISK) {
2093
3.55k
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
2094
3.55k
                if (fs_storage != nullptr) {
2095
3.55k
                    auto* meta_store = fs_storage->get_meta_store();
2096
3.55k
                    if (meta_store != nullptr) {
2097
3.55k
                        _meta_store_write_queue_size_metrics->set_value(
2098
3.55k
                                meta_store->get_write_queue_size());
2099
3.55k
                    }
2100
3.55k
                }
2101
3.55k
            }
2102
2103
4.32k
            if (_num_read_blocks->get_value() > 0) {
2104
2.27k
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
2105
2.27k
                                      (double)_num_read_blocks->get_value());
2106
2.27k
            }
2107
4.32k
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
2108
1.42k
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
2109
1.42k
                                         (double)_num_read_blocks_5m->get_value());
2110
1.42k
            }
2111
4.32k
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
2112
2.25k
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
2113
2.25k
                                         (double)_num_read_blocks_1h->get_value());
2114
2.25k
            }
2115
2116
4.32k
            if (_no_warmup_num_hit_blocks->get_value() > 0) {
2117
2.23k
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
2118
2.23k
                                                (double)_no_warmup_num_read_blocks->get_value());
2119
2.23k
            }
2120
4.32k
            if (_no_warmup_num_hit_blocks_5m && _no_warmup_num_hit_blocks_5m->get_value() > 0) {
2121
1.39k
                _no_warmup_hit_ratio_5m->set_value(
2122
1.39k
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
2123
1.39k
                        (double)_no_warmup_num_read_blocks_5m->get_value());
2124
1.39k
            }
2125
4.32k
            if (_no_warmup_num_hit_blocks_1h && _no_warmup_num_hit_blocks_1h->get_value() > 0) {
2126
2.23k
                _no_warmup_hit_ratio_1h->set_value(
2127
2.23k
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
2128
2.23k
                        (double)_no_warmup_num_read_blocks_1h->get_value());
2129
2.23k
            }
2130
4.32k
        }
2131
4.32k
    }
2132
150
}
2133
2134
150
void BlockFileCache::run_background_gc() {
2135
150
    Thread::set_self_name("run_background_gc");
2136
150
    FileCacheKey key;
2137
150
    size_t batch_count = 0;
2138
176k
    while (!_close) {
2139
176k
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
2140
176k
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
2141
176k
        {
2142
176k
            std::unique_lock close_lock(_close_mtx);
2143
176k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2144
176k
            if (_close) {
2145
147
                break;
2146
147
            }
2147
176k
        }
2148
2149
243k
        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
2150
66.4k
            int64_t duration_ns = 0;
2151
66.4k
            Status st;
2152
66.4k
            {
2153
66.4k
                SCOPED_RAW_TIMER(&duration_ns);
2154
66.4k
                st = _storage->remove(key);
2155
66.4k
            }
2156
66.4k
            *_storage_async_remove_latency_us << (duration_ns / 1000);
2157
2158
66.4k
            if (!st.ok()) {
2159
2
                LOG_WARNING("").error(st);
2160
2
            }
2161
66.4k
            batch_count++;
2162
66.4k
        }
2163
176k
        *_recycle_keys_length_recorder << _recycle_keys.size_approx();
2164
176k
        batch_count = 0;
2165
176k
    }
2166
150
}
2167
2168
150
void BlockFileCache::run_background_evict_in_advance() {
2169
150
    Thread::set_self_name("run_background_evict_in_advance");
2170
150
    LOG(INFO) << "Starting background evict in advance thread";
2171
150
    int64_t batch = 0;
2172
21.8k
    while (!_close) {
2173
21.8k
        {
2174
21.8k
            std::unique_lock close_lock(_close_mtx);
2175
21.8k
            _close_cv.wait_for(
2176
21.8k
                    close_lock,
2177
21.8k
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
2178
21.8k
            if (_close) {
2179
147
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
2180
147
                break;
2181
147
            }
2182
21.8k
        }
2183
21.6k
        batch = config::file_cache_evict_in_advance_batch_bytes;
2184
2185
        // Skip if eviction not needed or too many pending recycles
2186
21.6k
        if (!_need_evict_cache_in_advance ||
2187
21.6k
            _recycle_keys.size_approx() >=
2188
21.3k
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
2189
21.3k
            continue;
2190
21.3k
        }
2191
2192
268
        int64_t duration_ns = 0;
2193
268
        {
2194
268
            SCOPED_CACHE_LOCK(_mutex, this);
2195
268
            SCOPED_RAW_TIMER(&duration_ns);
2196
268
            try_evict_in_advance(batch, cache_lock);
2197
268
        }
2198
268
        *_evict_in_advance_latency_us << (duration_ns / 1000);
2199
268
    }
2200
150
}
2201
2202
150
void BlockFileCache::run_background_block_lru_update() {
2203
150
    Thread::set_self_name("run_background_block_lru_update");
2204
150
    std::vector<FileBlockSPtr> batch;
2205
7.57k
    while (!_close) {
2206
7.57k
        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
2207
7.57k
        size_t batch_limit =
2208
7.57k
                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
2209
7.57k
        {
2210
7.57k
            std::unique_lock close_lock(_close_mtx);
2211
7.57k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2212
7.57k
            if (_close) {
2213
147
                break;
2214
147
            }
2215
7.57k
        }
2216
2217
7.42k
        batch.clear();
2218
7.42k
        batch.reserve(batch_limit);
2219
7.42k
        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
2220
7.42k
        if (drained == 0) {
2221
7.41k
            *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2222
7.41k
            continue;
2223
7.41k
        }
2224
2225
7
        int64_t duration_ns = 0;
2226
7
        {
2227
7
            SCOPED_CACHE_LOCK(_mutex, this);
2228
7
            SCOPED_RAW_TIMER(&duration_ns);
2229
478
            for (auto& block : batch) {
2230
478
                update_block_lru(block, cache_lock);
2231
478
            }
2232
7
        }
2233
7
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
2234
7
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2235
7
    }
2236
150
}
2237
2238
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
2239
2
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
2240
2
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
2241
2
                               std::chrono::steady_clock::now().time_since_epoch())
2242
2
                               .count();
2243
2
    SCOPED_CACHE_LOCK(_mutex, this);
2244
2
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
2245
2
    if (auto iter = _files.find(hash); iter != _files.end()) {
2246
5
        for (auto& pair : _files.find(hash)->second) {
2247
5
            const FileBlockCell* cell = &pair.second;
2248
5
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
2249
4
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
2250
4
                    (cell->atime != 0 &&
2251
3
                     cur_time - cell->atime <
2252
3
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
2253
3
                    blocks_meta.emplace_back(pair.first, cell->size(),
2254
3
                                             cell->file_block->cache_type(),
2255
3
                                             cell->file_block->expiration_time());
2256
3
                }
2257
4
            }
2258
5
        }
2259
2
    }
2260
2
    return blocks_meta;
2261
2
}
2262
2263
bool BlockFileCache::try_reserve_during_async_load(size_t size,
2264
2
                                                   std::lock_guard<std::mutex>& cache_lock) {
2265
2
    size_t removed_size = 0;
2266
2
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
2267
2
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
2268
2
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);
2269
2
    size_t meta_queue_size = _meta_queue.get_capacity(cache_lock);
2270
2271
2
    std::vector<FileBlockCell*> to_evict;
2272
2
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
2273
3
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
2274
3
            if (!_disk_resource_limit_mode || removed_size >= size) {
2275
2
                break;
2276
2
            }
2277
1
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
2278
2279
1
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
2280
0
                         << ", offset: " << entry_offset;
2281
2282
1
            size_t cell_size = cell->size();
2283
1
            DCHECK(entry_size == cell_size);
2284
2285
1
            if (cell->releasable()) {
2286
1
                auto& file_block = cell->file_block;
2287
2288
1
                std::lock_guard block_lock(file_block->_mutex);
2289
1
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
2290
1
                to_evict.push_back(cell);
2291
1
                removed_size += cell_size;
2292
1
            }
2293
1
        }
2294
2
    };
2295
2
    if (disposable_queue_size != 0) {
2296
0
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
2297
0
    }
2298
2
    if (normal_queue_size != 0) {
2299
2
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
2300
2
    }
2301
2
    if (index_queue_size != 0) {
2302
0
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
2303
0
    }
2304
2
    if (meta_queue_size != 0) {
2305
0
        collect_eliminate_fragments(get_queue(FileCacheType::META));
2306
0
    }
2307
2
    std::string reason = "async load";
2308
2
    remove_file_blocks(to_evict, cache_lock, true, reason);
2309
2310
2
    return !_disk_resource_limit_mode || removed_size >= size;
2311
2
}
2312
2313
30
void BlockFileCache::clear_need_update_lru_blocks() {
2314
30
    _need_update_lru_blocks.clear();
2315
30
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2316
30
}
2317
2318
29
void BlockFileCache::pause_ttl_manager() {
2319
29
    if (_ttl_mgr) {
2320
8
        _ttl_mgr->stop();
2321
8
    }
2322
29
}
2323
2324
29
void BlockFileCache::resume_ttl_manager() {
2325
29
    if (_ttl_mgr) {
2326
8
        _ttl_mgr->resume();
2327
8
    }
2328
29
}
2329
2330
29
std::string BlockFileCache::clear_file_cache_directly() {
2331
29
    pause_ttl_manager();
2332
29
    _lru_dumper->remove_lru_dump_files();
2333
29
    using namespace std::chrono;
2334
29
    std::stringstream ss;
2335
29
    auto start = steady_clock::now();
2336
29
    std::string result;
2337
29
    {
2338
29
        SCOPED_CACHE_LOCK(_mutex, this);
2339
29
        LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
2340
2341
29
        std::string clear_msg;
2342
29
        auto s = _storage->clear(clear_msg);
2343
29
        if (!s.ok()) {
2344
1
            result = clear_msg;
2345
28
        } else {
2346
28
            int64_t num_files = _files.size();
2347
28
            int64_t cache_size = _cur_cache_size;
2348
28
            int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
2349
28
            int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock);
2350
28
            int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock);
2351
28
            int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);
2352
28
            int64_t meta_queue_size = _meta_queue.get_elements_num(cache_lock);
2353
2354
28
            int64_t clear_fd_duration = 0;
2355
28
            {
2356
                // clear FDCache to release fd
2357
28
                SCOPED_RAW_TIMER(&clear_fd_duration);
2358
41
                for (const auto& [file_key, file_blocks] : _files) {
2359
485
                    for (const auto& [offset, file_block_cell] : file_blocks) {
2360
485
                        AccessKeyAndOffset access_key_and_offset(file_key, offset);
2361
485
                        FDCache::instance()->remove_file_reader(access_key_and_offset);
2362
485
                    }
2363
41
                }
2364
28
            }
2365
2366
28
            _files.clear();
2367
28
            _cur_cache_size = 0;
2368
28
            _cur_ttl_size = 0;
2369
28
            _time_to_key.clear();
2370
28
            _key_to_time.clear();
2371
28
            _index_queue.clear(cache_lock);
2372
28
            _normal_queue.clear(cache_lock);
2373
28
            _disposable_queue.clear(cache_lock);
2374
28
            _ttl_queue.clear(cache_lock);
2375
28
            _meta_queue.clear(cache_lock);
2376
2377
            // Update cache metrics immediately so consumers observe the cleared state
2378
            // without waiting for the next background monitor round.
2379
28
            _cur_cache_size_metrics->set_value(0);
2380
28
            _cur_ttl_cache_size_metrics->set_value(0);
2381
28
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0);
2382
28
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0);
2383
28
            _cur_normal_queue_cache_size_metrics->set_value(0);
2384
28
            _cur_normal_queue_element_count_metrics->set_value(0);
2385
28
            _cur_index_queue_cache_size_metrics->set_value(0);
2386
28
            _cur_index_queue_element_count_metrics->set_value(0);
2387
28
            _cur_disposable_queue_cache_size_metrics->set_value(0);
2388
28
            _cur_disposable_queue_element_count_metrics->set_value(0);
2389
28
            _cur_meta_queue_cache_size_metrics->set_value(0);
2390
28
            _cur_meta_queue_element_count_metrics->set_value(0);
2391
2392
28
            clear_need_update_lru_blocks();
2393
2394
28
            ss << "finish clear_file_cache_directly"
2395
28
               << " path=" << _cache_base_path << " time_elapsed_ms="
2396
28
               << duration_cast<milliseconds>(steady_clock::now() - start).count()
2397
28
               << " fd_clear_time_ms=" << (clear_fd_duration / 1000000)
2398
28
               << " num_files=" << num_files << " cache_size=" << cache_size
2399
28
               << " index_queue_size=" << index_queue_size
2400
28
               << " normal_queue_size=" << normal_queue_size
2401
28
               << " disposible_queue_size=" << disposible_queue_size
2402
28
               << " ttl_queue_size=" << ttl_queue_size << " meta_queue_size=" << meta_queue_size;
2403
28
            result = ss.str();
2404
28
            LOG(INFO) << result;
2405
28
        }
2406
29
    }
2407
29
    _lru_dumper->remove_lru_dump_files();
2408
29
    resume_ttl_manager();
2409
29
    return result;
2410
29
}
2411
2412
31.2k
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
2413
31.2k
    std::map<size_t, FileBlockSPtr> offset_to_block;
2414
31.2k
    SCOPED_CACHE_LOCK(_mutex, this);
2415
31.2k
    if (_files.contains(hash)) {
2416
26.1k
        for (auto& [offset, cell] : _files[hash]) {
2417
26.1k
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
2418
26.1k
                cell.file_block->_owned_by_cached_reader = true;
2419
26.1k
                offset_to_block.emplace(offset, cell.file_block);
2420
26.1k
            }
2421
26.1k
        }
2422
25.3k
    }
2423
31.2k
    return offset_to_block;
2424
31.2k
}
2425
2426
0
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
2427
0
    SCOPED_CACHE_LOCK(_mutex, this);
2428
0
    if (auto iter = _files.find(hash); iter != _files.end()) {
2429
0
        for (auto& [_, cell] : iter->second) {
2430
0
            cell.update_atime();
2431
0
        }
2432
0
    };
2433
0
}
2434
2435
150
void BlockFileCache::run_background_lru_log_replay() {
2436
150
    Thread::set_self_name("run_background_lru_log_replay");
2437
21.6k
    while (!_close) {
2438
21.6k
        int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
2439
21.6k
        {
2440
21.6k
            std::unique_lock close_lock(_close_mtx);
2441
21.6k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2442
21.6k
            if (_close) {
2443
147
                break;
2444
147
            }
2445
21.6k
        }
2446
2447
21.4k
        _lru_recorder->replay_queue_event(FileCacheType::TTL);
2448
21.4k
        _lru_recorder->replay_queue_event(FileCacheType::META);
2449
21.4k
        _lru_recorder->replay_queue_event(FileCacheType::INDEX);
2450
21.4k
        _lru_recorder->replay_queue_event(FileCacheType::NORMAL);
2451
21.4k
        _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE);
2452
2453
21.4k
        if (config::enable_evaluate_shadow_queue_diff) {
2454
0
            SCOPED_CACHE_LOCK(_mutex, this);
2455
0
            _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
2456
0
            _lru_recorder->evaluate_queue_diff(_meta_queue, "meta", cache_lock);
2457
0
            _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
2458
0
            _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
2459
0
            _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
2460
0
        }
2461
21.4k
    }
2462
150
}
2463
2464
1.58k
void BlockFileCache::dump_lru_queues(bool force) {
2465
1.58k
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
2466
1.58k
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
2467
1.58k
        !ExecEnv::GetInstance()->get_is_upgrading()) {
2468
1.58k
        _lru_dumper->dump_queue("disposable", force);
2469
1.58k
        _lru_dumper->dump_queue("normal", force);
2470
1.58k
        _lru_dumper->dump_queue("index", force);
2471
1.58k
        _lru_dumper->dump_queue("meta", force);
2472
1.58k
        _lru_dumper->dump_queue("ttl", force);
2473
1.58k
        _lru_dumper->set_first_dump_done();
2474
1.58k
    }
2475
1.58k
}
2476
2477
150
void BlockFileCache::run_background_lru_dump() {
2478
150
    Thread::set_self_name("run_background_lru_dump");
2479
1.73k
    while (!_close) {
2480
1.73k
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
2481
1.73k
        {
2482
1.73k
            std::unique_lock close_lock(_close_mtx);
2483
1.73k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2484
1.73k
            if (_close) {
2485
147
                break;
2486
147
            }
2487
1.73k
        }
2488
1.58k
        dump_lru_queues(false);
2489
1.58k
    }
2490
150
}
2491
2492
150
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
2493
    // keep this order coz may be duplicated in different queue, we use the first appearence
2494
150
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
2495
150
    _lru_dumper->restore_queue(_meta_queue, "meta", cache_lock);
2496
150
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
2497
150
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
2498
150
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
2499
150
}
2500
2501
65
std::map<std::string, double> BlockFileCache::get_stats() {
2502
65
    std::map<std::string, double> stats;
2503
65
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2504
65
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2505
65
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2506
2507
65
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2508
65
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
2509
65
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2510
65
    stats["index_queue_curr_elements"] =
2511
65
            (double)_cur_index_queue_element_count_metrics->get_value();
2512
2513
65
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2514
65
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
2515
65
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2516
65
    stats["ttl_queue_curr_elements"] =
2517
65
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
2518
2519
65
    stats["meta_queue_max_size"] = (double)_meta_queue.get_max_size();
2520
65
    stats["meta_queue_curr_size"] = (double)_cur_meta_queue_cache_size_metrics->get_value();
2521
65
    stats["meta_queue_max_elements"] = (double)_meta_queue.get_max_element_size();
2522
65
    stats["meta_queue_curr_elements"] = (double)_cur_meta_queue_element_count_metrics->get_value();
2523
2524
65
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2525
65
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
2526
65
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2527
65
    stats["normal_queue_curr_elements"] =
2528
65
            (double)_cur_normal_queue_element_count_metrics->get_value();
2529
2530
65
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2531
65
    stats["disposable_queue_curr_size"] =
2532
65
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
2533
65
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2534
65
    stats["disposable_queue_curr_elements"] =
2535
65
            (double)_cur_disposable_queue_element_count_metrics->get_value();
2536
2537
65
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2538
65
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2539
2540
65
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2541
65
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2542
65
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2543
2544
65
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2545
65
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2546
65
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2547
2548
65
    return stats;
2549
65
}
2550
2551
// for be UTs
2552
187
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
2553
187
    std::map<std::string, double> stats;
2554
187
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2555
187
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2556
187
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2557
2558
187
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2559
187
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
2560
187
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2561
187
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
2562
2563
187
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2564
187
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
2565
187
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2566
187
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
2567
2568
187
    stats["meta_queue_max_size"] = (double)_meta_queue.get_max_size();
2569
187
    stats["meta_queue_curr_size"] = (double)_meta_queue.get_capacity_unsafe();
2570
187
    stats["meta_queue_max_elements"] = (double)_meta_queue.get_max_element_size();
2571
187
    stats["meta_queue_curr_elements"] = (double)_meta_queue.get_elements_num_unsafe();
2572
2573
187
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2574
187
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
2575
187
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2576
187
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
2577
2578
187
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2579
187
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
2580
187
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2581
187
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
2582
2583
187
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2584
187
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2585
2586
187
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2587
187
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2588
187
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2589
2590
187
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2591
187
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2592
187
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2593
2594
187
    return stats;
2595
187
}
2596
2597
template void BlockFileCache::remove(FileBlockSPtr file_block,
2598
                                     std::lock_guard<std::mutex>& cache_lock,
2599
                                     std::lock_guard<std::mutex>& block_lock, bool sync);
2600
2601
1
// Runs the consistency check and appends one human-readable report per
// detected inconsistency to `results`. Each report lists the manager-side
// info, the storage-side info and the inconsistency type. Returns the error
// of the consistency check if it fails, Status::OK() otherwise.
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
    InconsistencyContext inconsistency_context;
    RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));
    const auto finding_count = inconsistency_context.types.size();
    results.reserve(finding_count);
    for (size_t idx = 0; idx < finding_count; ++idx) {
        std::string report;
        report.append("File cache info in manager:\n");
        report.append(inconsistency_context.infos_in_manager[idx].to_string());
        report.append("File cache info in storage:\n");
        report.append(inconsistency_context.infos_in_storage[idx].to_string());
        report.append(inconsistency_context.types[idx].to_string());
        report.append("\n");
        results.push_back(std::move(report));
    }
    return Status::OK();
}
2618
2619
1
// Cross-checks the blocks reported by the storage against the in-memory
// cells while holding `_mutex`, and records every mismatch in
// `inconsistency_context` as three parallel entries (manager info, storage
// info, inconsistency type).
//
// Pass 1 walks the storage infos:
//   - no cell / no file block in memory      -> NOT_LOADED
//   - tmp flag differs from DOWNLOADING state -> TMP_FILE_EXPECT_DOWNLOADING_STATE
//   - size differs from the expected size     -> SIZE_INCONSISTENT
//   - cache type differs                      -> CACHE_TYPE_INCONSISTENT
//   - expiration time differs                 -> EXPIRATION_TIME_INCONSISTENT
// Pass 2 walks the in-memory cells and reports every cell that the storage
// did not mention as MISSING_IN_STORAGE.
//
// Returns the storage error if listing the storage infos fails.
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
    std::lock_guard<std::mutex> cache_lock(_mutex);
    std::vector<FileCacheInfo> infos_in_storage;
    RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));

    auto& manager_side = inconsistency_context.infos_in_manager;
    auto& storage_side = inconsistency_context.infos_in_storage;
    auto& finding_types = inconsistency_context.types;

    // Blocks seen in storage; used by the second pass to find cells that
    // exist only in memory.
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks;
    for (const auto& stored : infos_in_storage) {
        confirmed_blocks.insert({stored.hash, stored.offset});
        auto* cell = get_cell(stored.hash, stored.offset, cache_lock);
        if (cell == nullptr || cell->file_block == nullptr) {
            // Present in storage but unknown to the manager: pair the storage
            // info with a default-constructed manager info.
            manager_side.emplace_back();
            storage_side.push_back(stored);
            finding_types.emplace_back(InconsistencyType::NOT_LOADED);
            continue;
        }
        FileCacheInfo in_memory {
                .hash = stored.hash,
                .expiration_time = cell->file_block->expiration_time(),
                .size = cell->size(),
                .offset = stored.offset,
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
                .cache_type = cell->file_block->cache_type()};
        InconsistencyType finding;
        if (stored.is_tmp != in_memory.is_tmp) {
            finding |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
        }
        // A block that is still downloading is compared against its
        // downloading size rather than the cell size.
        size_t expected_size = in_memory.is_tmp ? cell->dowloading_size() : in_memory.size;
        if (stored.size != expected_size) {
            finding |= InconsistencyType::SIZE_INCONSISTENT;
        }
        // The cache type is compared only when the tmp/downloading check
        // above did not flag a mismatch.
        if ((finding & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
            stored.cache_type != in_memory.cache_type) {
            finding |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
        }
        if (stored.expiration_time != in_memory.expiration_time) {
            finding |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
        }
        if (finding != InconsistencyType::NONE) {
            manager_side.push_back(in_memory);
            storage_side.push_back(stored);
            finding_types.push_back(finding);
        }
    }

    for (const auto& [hash, offset_to_cell] : _files) {
        for (const auto& [offset, cell] : offset_to_cell) {
            if (!confirmed_blocks.contains({hash, offset})) {
                // Known to the manager but never reported by storage: pair the
                // manager info with a default-constructed storage info.
                const auto& block = cell.file_block;
                manager_side.emplace_back(
                        hash, block->expiration_time(), cell.size(), offset,
                        cell.file_block->state() == FileBlock::State::DOWNLOADING,
                        block->cache_type());
                storage_side.emplace_back();
                finding_types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
            }
        }
    }
    return Status::OK();
}
2679
2680
} // namespace doris::io