Coverage Report

Created: 2026-03-13 09:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache.h"
22
23
#include <gen_cpp/file_cache.pb.h>
24
25
#include <cstdio>
26
#include <exception>
27
#include <fstream>
28
#include <unordered_set>
29
30
#include "common/status.h"
31
#include "cpp/sync_point.h"
32
#include "runtime/exec_env.h"
33
34
#if defined(__APPLE__)
35
#include <sys/mount.h>
36
#else
37
#include <sys/statfs.h>
38
#endif
39
40
#include <chrono> // IWYU pragma: keep
41
#include <mutex>
42
#include <ranges>
43
44
#include "common/cast_set.h"
45
#include "common/config.h"
46
#include "common/logging.h"
47
#include "core/uint128.h"
48
#include "exec/common/sip_hash.h"
49
#include "io/cache/block_file_cache_ttl_mgr.h"
50
#include "io/cache/file_block.h"
51
#include "io/cache/file_cache_common.h"
52
#include "io/cache/fs_file_cache_storage.h"
53
#include "io/cache/mem_file_cache_storage.h"
54
#include "runtime/runtime_profile.h"
55
#include "util/concurrency_stats.h"
56
#include "util/stack_util.h"
57
#include "util/stopwatch.hpp"
58
#include "util/thread.h"
59
#include "util/time.h"
60
namespace doris::io {
61
#include "common/compile_check_begin.h"
62
63
// Insert a block pointer into one shard while swallowing allocation failures.
64
8.87k
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
65
8.87k
    if (!block) {
66
1
        return false;
67
1
    }
68
8.87k
    try {
69
8.87k
        auto* raw_ptr = block.get();
70
8.87k
        auto idx = shard_index(raw_ptr);
71
8.87k
        auto& shard = _shards[idx];
72
8.87k
        std::lock_guard lock(shard.mutex);
73
8.87k
        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
74
8.87k
        if (inserted) {
75
950
            _size.fetch_add(1, std::memory_order_relaxed);
76
950
        }
77
8.87k
        return inserted;
78
8.87k
    } catch (const std::exception& e) {
79
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
80
0
    } catch (...) {
81
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
82
0
    }
83
0
    return false;
84
8.87k
}
85
86
// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
87
8.72k
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
88
8.72k
    if (limit == 0 || output == nullptr) {
89
2
        return 0;
90
2
    }
91
8.72k
    size_t drained = 0;
92
8.72k
    try {
93
8.72k
        output->reserve(output->size() + std::min(limit, size()));
94
558k
        for (auto& shard : _shards) {
95
558k
            if (drained >= limit) {
96
1
                break;
97
1
            }
98
558k
            std::lock_guard lock(shard.mutex);
99
558k
            auto it = shard.entries.begin();
100
558k
            size_t shard_drained = 0;
101
559k
            while (it != shard.entries.end() && drained + shard_drained < limit) {
102
485
                output->emplace_back(std::move(it->second));
103
485
                it = shard.entries.erase(it);
104
485
                ++shard_drained;
105
485
            }
106
558k
            if (shard_drained > 0) {
107
6
                _size.fetch_sub(shard_drained, std::memory_order_relaxed);
108
6
                drained += shard_drained;
109
6
            }
110
558k
        }
111
8.72k
    } catch (const std::exception& e) {
112
0
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
113
0
    } catch (...) {
114
0
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
115
0
    }
116
8.73k
    return drained;
117
8.72k
}
118
119
// Remove every pending block, guarding against unexpected exceptions.
120
35
void NeedUpdateLRUBlocks::clear() {
121
35
    try {
122
2.24k
        for (auto& shard : _shards) {
123
2.24k
            std::lock_guard lock(shard.mutex);
124
2.24k
            if (!shard.entries.empty()) {
125
1
                auto removed = shard.entries.size();
126
1
                shard.entries.clear();
127
1
                _size.fetch_sub(removed, std::memory_order_relaxed);
128
1
            }
129
2.24k
        }
130
35
    } catch (const std::exception& e) {
131
0
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
132
0
    } catch (...) {
133
0
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
134
0
    }
135
35
}
136
137
8.87k
// Pick the shard for a block by hashing its pointer value and masking with kShardMask.
// NOTE(review): this presumes the shard count is a power of two and kShardMask is
// count - 1; confirm against the header.
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
    DCHECK(ptr != nullptr);
    const size_t pointer_hash = std::hash<FileBlock*> {}(ptr);
    return pointer_hash & kShardMask;
}
141
142
// Construct a file cache rooted at `cache_base_path` and configured by `cache_settings`.
//
// The constructor:
//   1. registers every per-cache bvar metric, each exposed under the
//      `_cache_base_path` prefix;
//   2. sizes the disposable / index / normal / ttl / cold-normal LRU queues from
//      `cache_settings` (the third LRUQueue argument is a per-queue span in seconds;
//      TODO confirm its exact meaning in LRUQueue);
//   3. creates the LRU recorder and dumper, and selects the storage backend:
//      "memory" selects MemFileCacheStorage, anything else selects FSFileCacheStorage.
//
// NOTE: for the memory backend, `_cache_base_path` is reset to "memory" only after the
// metrics above were created, so those metrics keep the caller-supplied prefix.
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
                               const FileCacheSettings& cache_settings)
        : _cache_base_path(cache_base_path),
          _capacity(cache_settings.capacity),
          _max_file_block_size(cache_settings.max_file_block_size) {
    // ---- Size / element-count gauges ----
    _cur_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
                                                                     "file_cache_cache_size", 0);
    _cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_capacity", _capacity);
    _cur_ttl_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_ttl_cache_size", 0);
    _cur_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_normal_queue_element_count", 0);
    _cur_ttl_cache_lru_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_size", 0);
    _cur_ttl_cache_lru_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_element_count", 0);
    _cur_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_normal_queue_cache_size", 0);
    _cur_index_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_index_queue_element_count", 0);
    _cur_index_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_index_queue_cache_size", 0);
    _cur_disposable_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_disposable_queue_element_count", 0);
    _cur_disposable_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_disposable_queue_cache_size", 0);
    _cur_cold_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_cold_normal_queue_element_count", 0);
    _cur_cold_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_cold_normal_queue_cache_size", 0);

    // ---- Per-queue and aggregate eviction / read counters ----
    _queue_evict_size_metrics[0] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_index_queue_evict_size");
    _queue_evict_size_metrics[1] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_normal_queue_evict_size");
    _queue_evict_size_metrics[2] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_disposable_queue_evict_size");
    _queue_evict_size_metrics[3] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_ttl_cache_evict_size");
    _queue_evict_size_metrics[4] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_cold_normal_queue_evict_size");
    _total_evict_size_metrics = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_total_evict_size");
    _total_read_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                     "file_cache_total_read_size");
    _total_hit_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                    "file_cache_total_hit_size");
    _gc_evict_bytes_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                    "file_cache_gc_evict_bytes");
    _gc_evict_count_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                    "file_cache_gc_evict_count");

    // ---- Evict-by-time matrix: names follow file_cache_evict_by_time_<row>_to_<column> ----
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_disposable_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_disposable_to_index");
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_disposable_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::COLD_NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(
                    _cache_base_path.c_str(), "file_cache_evict_by_time_disposable_to_cold_normal");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_normal_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_normal_to_index");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_normal_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::COLD_NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_normal_to_cold_normal");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_index_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_index_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_index_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::COLD_NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_index_to_cold_normal");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_ttl_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_ttl_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_ttl_to_index");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::COLD_NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_ttl_to_cold_normal");
    _evict_by_time_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(
                    _cache_base_path.c_str(), "file_cache_evict_by_time_cold_normal_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_cold_normal_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_cold_normal_to_index");
    _evict_by_time_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_cold_normal_to_ttl");

    // ---- Evict-by-own-LRU counters, one per queue type ----
    _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_self_lru_disposable");
    _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_self_lru_normal");
    _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_index");
    _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl");
    _evict_by_self_lru_metrics_matrix[FileCacheType::COLD_NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_self_lru_cold_normal");

    // ---- Evict-by-size matrix: names follow file_cache_evict_by_size_<row>_to_<column> ----
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_disposable_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_disposable_to_index");
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_disposable_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::COLD_NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(
                    _cache_base_path.c_str(), "file_cache_evict_by_size_disposable_to_cold_normal");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_normal_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_normal_to_index");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_normal_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::COLD_NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_normal_to_cold_normal");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_index_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_index_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_index_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::COLD_NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_index_to_cold_normal");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_ttl_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_ttl_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_ttl_to_index");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::COLD_NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_ttl_to_cold_normal");
    _evict_by_size_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(
                    _cache_base_path.c_str(), "file_cache_evict_by_size_cold_normal_to_disposable");
    // [COLD_NORMAL][NORMAL] must be named "..._cold_normal_to_normal", matching the
    // <row>_to_<column> scheme and its evict-by-time counterpart.
    _evict_by_size_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_cold_normal_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_cold_normal_to_index");
    _evict_by_size_metrics_matrix[FileCacheType::COLD_NORMAL][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_cold_normal_to_ttl");

    _evict_by_try_release = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_evict_by_try_release");

    // ---- Block read / hit / removal counters ----
    _num_read_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                             "file_cache_num_read_blocks");
    _num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                            "file_cache_num_hit_blocks");
    _num_removed_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                "file_cache_num_removed_blocks");

    _no_warmup_num_read_blocks = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks");
    _no_warmup_num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks");

#ifndef BE_TEST
    // Sliding windows over the counters above (300 s and 3600 s); not built in BE_TEST.
    _num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
    _num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
    _num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
    _num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_num_read_blocks_1h", _num_read_blocks.get(),
            3600);
    _no_warmup_num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_5m",
            _no_warmup_num_hit_blocks.get(), 300);
    _no_warmup_num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_5m",
            _no_warmup_num_read_blocks.get(), 300);
    _no_warmup_num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_1h",
            _no_warmup_num_hit_blocks.get(), 3600);
    _no_warmup_num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_1h",
            _no_warmup_num_read_blocks.get(), 3600);
#endif

    // ---- Hit-ratio gauges ----
    _hit_ratio = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
                                                        "file_cache_hit_ratio", 0.0);
    _hit_ratio_5m = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
                                                           "file_cache_hit_ratio_5m", 0.0);
    _hit_ratio_1h = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
                                                           "file_cache_hit_ratio_1h", 0.0);

    _no_warmup_hit_ratio = std::make_shared<bvar::Status<double>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio", 0.0);
    _no_warmup_hit_ratio_5m = std::make_shared<bvar::Status<double>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_5m", 0.0);
    _no_warmup_hit_ratio_1h = std::make_shared<bvar::Status<double>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_1h", 0.0);

    // ---- Mode / queue-state gauges ----
    _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
    _need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0);
    _meta_store_write_queue_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_meta_store_write_queue_size", 0);

    // ---- Latency / length recorders ----
    _cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
    _get_or_set_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_get_or_set_latency_us");
    _storage_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_us");
    _storage_retry_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us");
    _storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us");
    _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us");
    _lru_dump_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_lru_dump_latency_us");
    _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_recycle_keys_length");
    _need_update_lru_blocks_length_recorder = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length");
    _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us");
    _ttl_gc_latency_us = std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(),
                                                                 "file_cache_ttl_gc_latency_us");
    _shadow_queue_levenshtein_distance = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_shadow_queue_levenshtein_distance");

    // ---- LRU queues: (size limit, element limit, span in seconds) ----
    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
                                 cache_settings.disposable_queue_elements, 60 * 60);
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
                            7 * 24 * 60 * 60);
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
                             24 * 60 * 60);
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
                          std::numeric_limits<int>::max());
    _cold_normal_queue = LRUQueue(cache_settings.cold_query_queue_size,
                                  cache_settings.cold_query_queue_elements, 24 * 60 * 60);

    // ---- LRU persistence helpers and storage backend ----
    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());
    if (cache_settings.storage == "memory") {
        _storage = std::make_unique<MemFileCacheStorage>();
        _cache_base_path = "memory";
    } else {
        _storage = std::make_unique<FSFileCacheStorage>();
    }

    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
}
439
440
441k
// Hash `path` into a 128-bit cache key using sip_hash128.
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
    uint128_t digest;
    auto* const digest_bytes = reinterpret_cast<char*>(&digest);
    sip_hash128(path.data(), path.size(), digest_bytes);
    return UInt128Wrapper(digest);
}
445
446
// Return a holder wrapping the per-query cache context for `query_id`, creating the
// context if needed. Returns an empty holder when
// config::enable_file_cache_query_limit is off. The cache mutex is held for the whole call.
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
    SCOPED_CACHE_LOCK(_mutex, this);
    if (config::enable_file_cache_query_limit) {
        // Query limiting is on: look up or create this query's context.
        auto query_context =
                get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
        return std::make_unique<QueryFileCacheContextHolder>(query_id, this, query_context);
    }
    return {};
}
458
459
// Look up the context registered for `query_id`; returns nullptr if none exists.
// `cache_lock` is not used here; requiring it documents that the caller holds the cache mutex.
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
    auto found = _query_map.find(query_id);
    if (found == _query_map.end()) {
        return nullptr;
    }
    return found->second;
}
464
465
6
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
466
6
    SCOPED_CACHE_LOCK(_mutex, this);
467
6
    const auto& query_iter = _query_map.find(query_id);
468
469
6
    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
470
5
        _query_map.erase(query_iter);
471
5
    }
472
6
}
473
474
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
475
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
476
7
        int file_cache_query_limit_percent) {
477
7
    if (query_id.lo == 0 && query_id.hi == 0) {
478
1
        return nullptr;
479
1
    }
480
481
6
    auto context = get_query_context(query_id, cache_lock);
482
6
    if (context) {
483
1
        return context;
484
1
    }
485
486
5
    size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100;
487
5
    if (file_cache_query_limit_size < 268435456) {
488
5
        LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size
489
5
                     << " bytes) is less than the 256MB recommended minimum. "
490
5
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
491
5
                     << " from its current value " << file_cache_query_limit_percent << "%.";
492
5
    }
493
5
    auto query_context = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
494
5
    auto query_iter = _query_map.emplace(query_id, query_context).first;
495
5
    return query_iter->second;
496
6
}
497
498
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
499
20
                                                   std::lock_guard<std::mutex>& cache_lock) {
500
20
    auto pair = std::make_pair(hash, offset);
501
20
    auto record = records.find(pair);
502
20
    DCHECK(record != records.end());
503
20
    auto iter = record->second;
504
20
    records.erase(pair);
505
20
    lru_queue.remove(iter, cache_lock);
506
20
}
507
508
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
509
                                                    size_t size,
510
30
                                                    std::lock_guard<std::mutex>& cache_lock) {
511
30
    auto pair = std::make_pair(hash, offset);
512
30
    if (records.find(pair) == records.end()) {
513
29
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
514
29
        records.insert({pair, queue_iter});
515
29
    }
516
30
}
517
518
152
Status BlockFileCache::initialize() {
519
152
    SCOPED_CACHE_LOCK(_mutex, this);
520
152
    return initialize_unlocked(cache_lock);
521
152
}
522
523
152
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
524
152
    DCHECK(!_is_initialized);
525
152
    _is_initialized = true;
526
152
    if (config::file_cache_background_lru_dump_tail_record_num > 0) {
527
        // requirements:
528
        // 1. restored data should not overwrite the last dump
529
        // 2. restore should happen before load and async load
530
        // 3. all queues should be restored sequencially to avoid conflict
531
        // TODO(zhengyu): we can parralize them but will increase complexity, so lets check the time cost
532
        // to see if any improvement is a necessary
533
152
        restore_lru_queues_from_disk(cache_lock);
534
152
    }
535
152
    RETURN_IF_ERROR(_storage->init(this));
536
537
152
    if (auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get())) {
538
137
        if (auto* meta_store = fs_storage->get_meta_store()) {
539
137
            _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
540
137
        }
541
137
    }
542
543
152
    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
544
152
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
545
152
    _cache_background_evict_in_advance_thread =
546
152
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
547
152
    _cache_background_block_lru_update_thread =
548
152
            std::thread(&BlockFileCache::run_background_block_lru_update, this);
549
550
    // Initialize LRU dump thread and restore queues
551
152
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
552
152
    _cache_background_lru_log_replay_thread =
553
152
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);
554
555
152
    return Status::OK();
556
152
}
557
558
void BlockFileCache::update_block_lru(FileBlockSPtr block,
559
480
                                      std::lock_guard<std::mutex>& cache_lock) {
560
480
    FileBlockCell* cell = block->cell;
561
480
    if (cell) {
562
478
        if (cell->queue_iterator) {
563
478
            auto& queue = get_queue(block->cache_type());
564
478
            queue.move_to_end(*cell->queue_iterator, cache_lock);
565
478
            _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
566
478
                                              block->_key.hash, block->_key.offset,
567
478
                                              block->_block_range.size());
568
478
        }
569
478
        cell->update_atime();
570
478
    }
571
480
}
572
573
void BlockFileCache::use_cell(FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
574
4.26M
                              std::lock_guard<std::mutex>& cache_lock) {
575
4.26M
    if (result) {
576
4.26M
        result->push_back(cell.file_block);
577
4.26M
    }
578
579
4.26M
    auto& queue = get_queue(cell.file_block->cache_type());
580
    /// Move to the end of the queue. The iterator remains valid.
581
4.26M
    if (cell.queue_iterator && move_iter_flag) {
582
3.28M
        if (cell.file_block->cache_type() == FileCacheType::COLD_NORMAL) {
583
34
            auto now_time = std::chrono::duration_cast<std::chrono::milliseconds>(
584
34
                                    std::chrono::steady_clock::now().time_since_epoch())
585
34
                                    .count();
586
34
            if (now_time - cell.atime > config::file_cache_2qlru_cold_blocks_promotion_ms) {
587
28
                queue.remove(*cell.queue_iterator, cache_lock);
588
28
                _lru_recorder->record_queue_event(
589
28
                        FileCacheType::COLD_NORMAL, CacheLRULogType::REMOVE,
590
28
                        cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
591
28
                auto& normal_queue = get_queue(FileCacheType::NORMAL);
592
28
                cell.queue_iterator =
593
28
                        normal_queue.add(cell.file_block->_key.hash, cell.file_block->_key.offset,
594
28
                                         cell.size(), cache_lock);
595
28
                _lru_recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD,
596
28
                                                  cell.file_block->get_hash_value(),
597
28
                                                  cell.file_block->offset(), cell.size());
598
28
                cell.file_block->set_cache_type(FileCacheType::NORMAL);
599
28
            } else {
600
6
                queue.move_to_end(*cell.queue_iterator, cache_lock);
601
6
                _lru_recorder->record_queue_event(
602
6
                        cell.file_block->cache_type(), CacheLRULogType::MOVETOBACK,
603
6
                        cell.file_block->_key.hash, cell.file_block->_key.offset, cell.size());
604
6
            }
605
3.28M
        } else {
606
3.28M
            queue.move_to_end(*cell.queue_iterator, cache_lock);
607
3.28M
            _lru_recorder->record_queue_event(
608
3.28M
                    cell.file_block->cache_type(), CacheLRULogType::MOVETOBACK,
609
3.28M
                    cell.file_block->_key.hash, cell.file_block->_key.offset, cell.size());
610
3.28M
        }
611
3.28M
    }
612
613
4.26M
    cell.update_atime();
614
4.26M
}
615
616
template <class T>
617
    requires IsXLock<T>
618
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
619
898k
                                        T& /* cache_lock */) {
620
898k
    auto it = _files.find(hash);
621
898k
    if (it == _files.end()) {
622
1
        return nullptr;
623
1
    }
624
625
898k
    auto& offsets = it->second;
626
898k
    auto cell_it = offsets.find(offset);
627
898k
    if (cell_it == offsets.end()) {
628
3
        return nullptr;
629
3
    }
630
631
897k
    return &cell_it->second;
632
898k
}
633
634
4.26M
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
635
4.26M
    return query_type != FileCacheType::DISPOSABLE && cell_type != FileCacheType::DISPOSABLE;
636
4.26M
}
637
638
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
639
                                    const FileBlock::Range& range,
640
4.34M
                                    std::lock_guard<std::mutex>& cache_lock) {
641
    /// Given range = [left, right] and non-overlapping ordered set of file blocks,
642
    /// find list [block1, ..., blockN] of blocks which intersect with given range.
643
4.34M
    auto it = _files.find(hash);
644
4.34M
    if (it == _files.end()) {
645
108k
        if (_async_open_done) {
646
108k
            return {};
647
108k
        }
648
0
        FileCacheKey key;
649
0
        key.hash = hash;
650
0
        key.meta.type = context.cache_type;
651
0
        key.meta.expiration_time = context.expiration_time;
652
0
        key.meta.tablet_id = context.tablet_id;
653
0
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);
654
655
0
        it = _files.find(hash);
656
0
        if (it == _files.end()) [[unlikely]] {
657
0
            return {};
658
0
        }
659
0
    }
660
661
4.23M
    auto& file_blocks = it->second;
662
4.23M
    if (file_blocks.empty()) {
663
0
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
664
0
                     << " cache type=" << context.cache_type
665
0
                     << " cache expiration time=" << context.expiration_time
666
0
                     << " cache range=" << range.left << " " << range.right
667
0
                     << " query id=" << context.query_id;
668
0
        DCHECK(false);
669
0
        _files.erase(hash);
670
0
        return {};
671
0
    }
672
673
4.23M
    FileBlocks result;
674
4.23M
    auto block_it = file_blocks.lower_bound(range.left);
675
4.23M
    if (block_it == file_blocks.end()) {
676
        /// N - last cached block for given file hash, block{N}.offset < range.left:
677
        ///   block{N}                       block{N}
678
        /// [________                         [_______]
679
        ///     [__________]         OR                  [________]
680
        ///     ^                                        ^
681
        ///     range.left                               range.left
682
683
7.60k
        auto& cell = file_blocks.rbegin()->second;
684
7.60k
        if (cell.file_block->range().right < range.left) {
685
7.59k
            return {};
686
7.59k
        }
687
688
14
        use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
689
14
                 cache_lock);
690
4.23M
    } else { /// block_it <-- segmment{k}
691
4.23M
        if (block_it != file_blocks.begin()) {
692
103k
            auto& prev_cell = std::prev(block_it)->second;
693
103k
            auto& prev_cell_range = prev_cell.file_block->range();
694
695
103k
            if (range.left <= prev_cell_range.right) {
696
                ///   block{k-1}  block{k}
697
                ///   [________]   [_____
698
                ///       [___________
699
                ///       ^
700
                ///       range.left
701
702
137
                use_cell(prev_cell, &result,
703
137
                         need_to_move(prev_cell.file_block->cache_type(), context.cache_type),
704
137
                         cache_lock);
705
137
            }
706
103k
        }
707
708
        ///  block{k} ...       block{k-1}  block{k}                      block{k}
709
        ///  [______              [______]     [____                        [________
710
        ///  [_________     OR              [________      OR    [______]   ^
711
        ///  ^                              ^                           ^   block{k}.offset
712
        ///  range.left                     range.left                  range.right
713
714
8.49M
        while (block_it != file_blocks.end()) {
715
4.47M
            auto& cell = block_it->second;
716
4.47M
            if (range.right < cell.file_block->range().left) {
717
211k
                break;
718
211k
            }
719
720
4.26M
            use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
721
4.26M
                     cache_lock);
722
4.26M
            ++block_it;
723
4.26M
        }
724
4.23M
    }
725
726
4.23M
    return result;
727
4.23M
}
728
729
8.86k
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
730
8.86k
    if (_need_update_lru_blocks.insert(std::move(block))) {
731
937
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
732
937
    }
733
8.86k
}
734
735
4
std::string BlockFileCache::clear_file_cache_async() {
736
4
    LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
737
4
    _lru_dumper->remove_lru_dump_files();
738
4
    int64_t num_cells_all = 0;
739
4
    int64_t num_cells_to_delete = 0;
740
4
    int64_t num_cells_wait_recycle = 0;
741
4
    int64_t num_files_all = 0;
742
4
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
743
4
    {
744
4
        SCOPED_CACHE_LOCK(_mutex, this);
745
746
4
        std::vector<FileBlockCell*> deleting_cells;
747
4
        for (auto& [_, offset_to_cell] : _files) {
748
4
            ++num_files_all;
749
48
            for (auto& [_1, cell] : offset_to_cell) {
750
48
                ++num_cells_all;
751
48
                deleting_cells.push_back(&cell);
752
48
            }
753
4
        }
754
755
        // we cannot delete the element in the loop above, because it will break the iterator
756
48
        for (auto& cell : deleting_cells) {
757
48
            if (!cell->releasable()) {
758
2
                LOG(INFO) << "cell is not releasable, hash="
759
2
                          << " offset=" << cell->file_block->offset();
760
2
                cell->file_block->set_deleting();
761
2
                ++num_cells_wait_recycle;
762
2
                continue;
763
2
            }
764
46
            FileBlockSPtr file_block = cell->file_block;
765
46
            if (file_block) {
766
46
                std::lock_guard block_lock(file_block->_mutex);
767
46
                remove(file_block, cache_lock, block_lock, false);
768
46
                ++num_cells_to_delete;
769
46
            }
770
46
        }
771
4
        clear_need_update_lru_blocks();
772
4
    }
773
774
4
    std::stringstream ss;
775
4
    ss << "finish clear_file_cache_async, path=" << _cache_base_path
776
4
       << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
777
4
       << " num_cells_to_delete=" << num_cells_to_delete
778
4
       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
779
4
    auto msg = ss.str();
780
4
    LOG(INFO) << msg;
781
4
    _lru_dumper->remove_lru_dump_files();
782
4
    return msg;
783
4
}
784
785
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
786
                                                  const CacheContext& context, size_t offset,
787
                                                  size_t size, FileBlock::State state,
788
118k
                                                  std::lock_guard<std::mutex>& cache_lock) {
789
118k
    DCHECK(size > 0);
790
791
118k
    auto current_pos = offset;
792
118k
    auto end_pos_non_included = offset + size;
793
794
118k
    size_t current_size = 0;
795
118k
    size_t remaining_size = size;
796
797
118k
    FileBlocks file_blocks;
798
267k
    while (current_pos < end_pos_non_included) {
799
148k
        current_size = std::min(remaining_size, _max_file_block_size);
800
148k
        remaining_size -= current_size;
801
148k
        state = try_reserve(hash, context, current_pos, current_size, cache_lock)
802
148k
                        ? state
803
148k
                        : FileBlock::State::SKIP_CACHE;
804
148k
        if (state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
805
1.45k
            FileCacheKey key;
806
1.45k
            key.hash = hash;
807
1.45k
            key.offset = current_pos;
808
1.45k
            key.meta.type = context.cache_type;
809
1.45k
            key.meta.expiration_time = context.expiration_time;
810
1.45k
            key.meta.tablet_id = context.tablet_id;
811
1.45k
            auto file_block = std::make_shared<FileBlock>(key, current_size, this,
812
1.45k
                                                          FileBlock::State::SKIP_CACHE);
813
1.45k
            file_blocks.push_back(std::move(file_block));
814
147k
        } else {
815
147k
            auto* cell = add_cell(hash, context, current_pos, current_size, state, cache_lock);
816
147k
            if (cell) {
817
147k
                file_blocks.push_back(cell->file_block);
818
147k
                if (!context.is_cold_data) {
819
147k
                    cell->update_atime();
820
147k
                }
821
147k
            }
822
147k
            if (_ttl_mgr && context.tablet_id != 0) {
823
139k
                _ttl_mgr->register_tablet_id(context.tablet_id);
824
139k
            }
825
147k
        }
826
827
148k
        current_pos += current_size;
828
148k
    }
829
830
118k
    DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
831
118k
    return file_blocks;
832
118k
}
833
834
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
835
                                                       const UInt128Wrapper& hash,
836
                                                       const CacheContext& context,
837
                                                       const FileBlock::Range& range,
838
4.22M
                                                       std::lock_guard<std::mutex>& cache_lock) {
839
    /// There are blocks [block1, ..., blockN]
840
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
841
    /// intersect with given range.
842
843
    /// It can have holes:
844
    /// [____________________]         -- requested range
845
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
846
    ///
847
    /// For each such hole create a cell with file block state EMPTY.
848
849
4.22M
    auto it = file_blocks.begin();
850
4.22M
    auto block_range = (*it)->range();
851
852
4.22M
    size_t current_pos = 0;
853
4.22M
    if (block_range.left < range.left) {
854
        ///    [_______     -- requested range
855
        /// [_______
856
        /// ^
857
        /// block1
858
859
151
        current_pos = block_range.right + 1;
860
151
        ++it;
861
4.22M
    } else {
862
4.22M
        current_pos = range.left;
863
4.22M
    }
864
865
8.49M
    while (current_pos <= range.right && it != file_blocks.end()) {
866
4.26M
        block_range = (*it)->range();
867
868
4.26M
        if (current_pos == block_range.left) {
869
4.26M
            current_pos = block_range.right + 1;
870
4.26M
            ++it;
871
4.26M
            continue;
872
4.26M
        }
873
874
4.26M
        DCHECK(current_pos < block_range.left);
875
876
185
        auto hole_size = block_range.left - current_pos;
877
878
185
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
879
185
                                                      FileBlock::State::EMPTY, cache_lock));
880
881
185
        current_pos = block_range.right + 1;
882
185
        ++it;
883
185
    }
884
885
4.22M
    if (current_pos <= range.right) {
886
        ///   ________]     -- requested range
887
        ///   _____]
888
        ///        ^
889
        /// blockN
890
891
507
        auto hole_size = range.right - current_pos + 1;
892
893
507
        file_blocks.splice(file_blocks.end(),
894
507
                           split_range_into_cells(hash, context, current_pos, hole_size,
895
507
                                                  FileBlock::State::EMPTY, cache_lock));
896
507
    }
897
4.22M
}
898
899
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
900
4.34M
                                            CacheContext& context) {
901
4.34M
    FileBlock::Range range(offset, offset + size - 1);
902
903
4.34M
    ReadStatistics* stats = context.stats;
904
4.34M
    DCHECK(stats != nullptr);
905
4.34M
    MonotonicStopWatch sw;
906
4.34M
    sw.start();
907
4.34M
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
908
4.34M
    std::lock_guard cache_lock(_mutex);
909
4.34M
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
910
4.34M
    stats->lock_wait_timer += sw.elapsed_time();
911
4.34M
    FileBlocks file_blocks;
912
4.34M
    int64_t duration = 0;
913
4.34M
    {
914
4.34M
        SCOPED_RAW_TIMER(&duration);
915
        /// Get all blocks which intersect with the given range.
916
4.34M
        {
917
4.34M
            SCOPED_RAW_TIMER(&stats->get_timer);
918
4.34M
            file_blocks = get_impl(hash, context, range, cache_lock);
919
4.34M
        }
920
921
4.34M
        if (file_blocks.empty()) {
922
117k
            SCOPED_RAW_TIMER(&stats->set_timer);
923
117k
            file_blocks = split_range_into_cells(hash, context, offset, size,
924
117k
                                                 FileBlock::State::EMPTY, cache_lock);
925
4.22M
        } else {
926
4.22M
            SCOPED_RAW_TIMER(&stats->set_timer);
927
4.22M
            fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
928
4.22M
        }
929
4.34M
        DCHECK(!file_blocks.empty());
930
4.34M
        *_num_read_blocks << file_blocks.size();
931
4.34M
        if (!context.is_warmup) {
932
4.34M
            *_no_warmup_num_read_blocks << file_blocks.size();
933
4.34M
        }
934
4.41M
        for (auto& block : file_blocks) {
935
4.41M
            size_t block_size = block->range().size();
936
4.41M
            *_total_read_size_metrics << block_size;
937
4.41M
            if (block->state_unsafe() == FileBlock::State::DOWNLOADED) {
938
4.25M
                *_num_hit_blocks << 1;
939
4.25M
                *_total_hit_size_metrics << block_size;
940
4.25M
                if (!context.is_warmup) {
941
4.25M
                    *_no_warmup_num_hit_blocks << 1;
942
4.25M
                }
943
4.25M
            }
944
4.41M
        }
945
4.34M
    }
946
4.34M
    *_get_or_set_latency_us << (duration / 1000);
947
4.34M
    return FileBlocksHolder(std::move(file_blocks));
948
4.34M
}
949
950
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
951
                                        size_t offset, size_t size, FileBlock::State state,
952
147k
                                        std::lock_guard<std::mutex>& cache_lock) {
953
    /// Create a file block cell and put it in `files` map by [hash][offset].
954
147k
    if (size == 0) {
955
0
        return nullptr; /// Empty files are not cached.
956
0
    }
957
958
147k
    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
959
0
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
960
0
               << " expiration_time=" << context.expiration_time
961
0
               << " tablet_id=" << context.tablet_id;
962
963
147k
    if (size > 1024 * 1024 * 1024) {
964
0
        LOG(WARNING) << "File block size is too large for a block. size=" << size
965
0
                     << " hash=" << hash.to_string() << " offset=" << offset
966
0
                     << " stack:" << get_stack_trace();
967
0
    }
968
969
147k
    auto& offsets = _files[hash];
970
147k
    auto itr = offsets.find(offset);
971
147k
    if (itr != offsets.end()) {
972
5
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
973
0
                   << ", offset: " << offset << ", size: " << size
974
0
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
975
5
        return &(itr->second);
976
5
    }
977
978
147k
    FileCacheKey key;
979
147k
    key.hash = hash;
980
147k
    key.offset = offset;
981
147k
    key.meta.type = context.cache_type;
982
147k
    key.meta.expiration_time = context.expiration_time;
983
147k
    key.meta.tablet_id = context.tablet_id;
984
147k
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);
985
147k
    Status st;
986
147k
    if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) {
987
0
        st = cell.file_block->change_cache_type_lock(config::enable_file_cache_normal_queue_2qlru
988
0
                                                             ? FileCacheType::COLD_NORMAL
989
0
                                                             : FileCacheType::NORMAL,
990
0
                                                     cache_lock);
991
147k
    } else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) {
992
1
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
993
1
    }
994
147k
    if (!st.ok()) {
995
0
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
996
0
                     << " cache_type=" << cache_type_to_string(context.cache_type)
997
0
                     << " error=" << st.msg();
998
0
    }
999
1000
147k
    auto& queue = get_queue(cell.file_block->cache_type());
1001
147k
    cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
1002
147k
    _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD,
1003
147k
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
1004
147k
                                      cell.size());
1005
1006
147k
    if (cell.file_block->cache_type() == FileCacheType::TTL) {
1007
3.92k
        _cur_ttl_size += cell.size();
1008
3.92k
    }
1009
147k
    auto [it, _] = offsets.insert(std::make_pair(offset, std::move(cell)));
1010
147k
    _cur_cache_size += size;
1011
147k
    return &(it->second);
1012
147k
}
1013
1014
0
size_t BlockFileCache::try_release() {
1015
0
    SCOPED_CACHE_LOCK(_mutex, this);
1016
0
    std::vector<FileBlockCell*> trash;
1017
0
    for (auto& [hash, blocks] : _files) {
1018
0
        for (auto& [offset, cell] : blocks) {
1019
0
            if (cell.releasable()) {
1020
0
                trash.emplace_back(&cell);
1021
0
            } else {
1022
0
                cell.file_block->set_deleting();
1023
0
            }
1024
0
        }
1025
0
    }
1026
0
    size_t remove_size = 0;
1027
0
    for (auto& cell : trash) {
1028
0
        FileBlockSPtr file_block = cell->file_block;
1029
0
        std::lock_guard lc(cell->file_block->_mutex);
1030
0
        remove_size += file_block->range().size();
1031
0
        remove(file_block, cache_lock, lc);
1032
0
        VLOG_DEBUG << "try_release " << _cache_base_path
1033
0
                   << " hash=" << file_block->get_hash_value().to_string()
1034
0
                   << " offset=" << file_block->offset();
1035
0
    }
1036
0
    *_evict_by_try_release << remove_size;
1037
0
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
1038
0
    return trash.size();
1039
0
}
1040
1041
5.02M
LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
1042
5.02M
    switch (type) {
1043
558k
    case FileCacheType::INDEX:
1044
558k
        return _index_queue;
1045
236k
    case FileCacheType::DISPOSABLE:
1046
236k
        return _disposable_queue;
1047
4.07M
    case FileCacheType::NORMAL:
1048
4.07M
        return _normal_queue;
1049
6.48k
    case FileCacheType::TTL:
1050
6.48k
        return _ttl_queue;
1051
150k
    case FileCacheType::COLD_NORMAL:
1052
150k
        return _cold_normal_queue;
1053
0
    default:
1054
0
        DCHECK(false);
1055
5.02M
    }
1056
0
    return _normal_queue;
1057
5.02M
}
1058
1059
137
const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
1060
137
    switch (type) {
1061
33
    case FileCacheType::INDEX:
1062
33
        return _index_queue;
1063
1
    case FileCacheType::DISPOSABLE:
1064
1
        return _disposable_queue;
1065
103
    case FileCacheType::NORMAL:
1066
103
        return _normal_queue;
1067
0
    case FileCacheType::TTL:
1068
0
        return _ttl_queue;
1069
0
    case FileCacheType::COLD_NORMAL:
1070
0
        return _cold_normal_queue;
1071
0
    default:
1072
0
        DCHECK(false);
1073
137
    }
1074
0
    return _normal_queue;
1075
137
}
1076
1077
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
1078
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
1079
271k
                                        std::string& reason) {
1080
271k
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1081
109k
        FileBlockSPtr file_block = cell->file_block;
1082
109k
        if (file_block) {
1083
109k
            std::lock_guard block_lock(file_block->_mutex);
1084
109k
            remove(file_block, cache_lock, block_lock, sync);
1085
109k
            VLOG_DEBUG << "remove_file_blocks"
1086
0
                       << " hash=" << file_block->get_hash_value().to_string()
1087
0
                       << " offset=" << file_block->offset() << " reason=" << reason;
1088
109k
        }
1089
109k
    };
1090
271k
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1091
271k
}
1092
1093
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
1094
                                           size_t& removed_size,
1095
                                           std::vector<FileBlockCell*>& to_evict,
1096
                                           std::lock_guard<std::mutex>& cache_lock,
1097
5.66k
                                           size_t& cur_removed_size, bool evict_in_advance) {
1098
747k
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1099
747k
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1100
3.78k
            break;
1101
3.78k
        }
1102
743k
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1103
1104
743k
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
1105
0
                     << ", offset: " << entry_offset;
1106
1107
743k
        size_t cell_size = cell->size();
1108
743k
        DCHECK(entry_size == cell_size);
1109
1110
743k
        if (cell->releasable()) {
1111
106k
            auto& file_block = cell->file_block;
1112
1113
106k
            std::lock_guard block_lock(file_block->_mutex);
1114
106k
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1115
106k
            to_evict.push_back(cell);
1116
106k
            removed_size += cell_size;
1117
106k
            cur_removed_size += cell_size;
1118
106k
        }
1119
743k
    }
1120
5.66k
}
1121
1122
// 1. If the asynchronous load of the file cache has not finished:
//     a. evict from the LRU queue
// 2. If this is a TTL cache:
//     a. evict from the disposable/normal/index queues one by one
// 3. If the query limit has not been reached, or there is no query limit:
//     a. evict from the other queues
//     b. evict from the current queue
//         b.1 if the data belongs to a write, only evict cold data
// 4. If the query limit has been reached:
//     a. evict from the query's own queue
//     b. evict from the other queues
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
1134
                                 size_t offset, size_t size,
1135
148k
                                 std::lock_guard<std::mutex>& cache_lock) {
1136
148k
    if (!_async_open_done) {
1137
2
        return try_reserve_during_async_load(size, cache_lock);
1138
2
    }
1139
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
1140
    // directly eliminate 5 times the size of the space
1141
148k
    if (_disk_resource_limit_mode) {
1142
1
        size = 5 * size;
1143
1
    }
1144
1145
148k
    auto query_context = config::enable_file_cache_query_limit &&
1146
148k
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
1147
148k
                                 ? get_query_context(context.query_id, cache_lock)
1148
148k
                                 : nullptr;
1149
148k
    if (!query_context) {
1150
148k
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
1151
148k
    } else if (query_context->get_cache_size(cache_lock) + size <=
1152
31
               query_context->get_max_cache_size()) {
1153
16
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
1154
16
    }
1155
15
    int64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>(
1156
15
                               std::chrono::steady_clock::now().time_since_epoch())
1157
15
                               .count();
1158
15
    auto& queue = get_queue(context.cache_type);
1159
15
    size_t removed_size = 0;
1160
15
    size_t ghost_remove_size = 0;
1161
15
    size_t queue_size = queue.get_capacity(cache_lock);
1162
15
    size_t cur_cache_size = _cur_cache_size;
1163
15
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);
1164
1165
15
    std::vector<LRUQueue::Iterator> ghost;
1166
15
    std::vector<FileBlockCell*> to_evict;
1167
1168
15
    size_t max_size = queue.get_max_size();
1169
48
    auto is_overflow = [&] {
1170
48
        return _disk_resource_limit_mode ? removed_size < size
1171
48
                                         : cur_cache_size + size - removed_size > _capacity ||
1172
48
                                                   (queue_size + size - removed_size > max_size) ||
1173
48
                                                   (query_context_cache_size + size -
1174
38
                                                            (removed_size + ghost_remove_size) >
1175
38
                                                    query_context->get_max_cache_size());
1176
48
    };
1177
1178
    /// Select the cache from the LRU queue held by query for expulsion.
1179
35
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
1180
33
        if (!is_overflow()) {
1181
13
            break;
1182
13
        }
1183
1184
20
        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);
1185
1186
20
        if (!cell) {
1187
            /// The cache corresponding to this record may be swapped out by
1188
            /// other queries, so it has become invalid.
1189
4
            ghost.push_back(iter);
1190
4
            ghost_remove_size += iter->size;
1191
16
        } else {
1192
16
            size_t cell_size = cell->size();
1193
16
            DCHECK(iter->size == cell_size);
1194
1195
16
            if (cell->releasable()) {
1196
16
                auto& file_block = cell->file_block;
1197
1198
16
                std::lock_guard block_lock(file_block->_mutex);
1199
16
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1200
16
                to_evict.push_back(cell);
1201
16
                removed_size += cell_size;
1202
16
            }
1203
16
        }
1204
20
    }
1205
1206
16
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1207
16
        FileBlockSPtr file_block = cell->file_block;
1208
16
        if (file_block) {
1209
16
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
1210
16
            std::lock_guard block_lock(file_block->_mutex);
1211
16
            remove(file_block, cache_lock, block_lock);
1212
16
        }
1213
16
    };
1214
1215
15
    for (auto& iter : ghost) {
1216
4
        query_context->remove(iter->hash, iter->offset, cache_lock);
1217
4
    }
1218
1219
15
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1220
1221
15
    if (is_overflow() &&
1222
15
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
1223
1
        return false;
1224
1
    }
1225
14
    query_context->reserve(hash, offset, size, cache_lock);
1226
14
    return true;
1227
15
}
1228
1229
253
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
1230
253
    UInt128Wrapper hash = UInt128Wrapper();
1231
253
    size_t offset = 0;
1232
253
    CacheContext context;
1233
    /* we pick NORMAL and TTL cache to evict in advance
1234
     * we reserve for them but won't acutually give space to them
1235
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
1236
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
1237
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
1238
     */
1239
253
    context.cache_type = FileCacheType::NORMAL;
1240
253
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1241
253
    context.cache_type = FileCacheType::TTL;
1242
253
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1243
253
}
1244
1245
// remove specific cache synchronously, for critical operations
1246
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1247
7
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1248
7
    std::string reason = "remove_if_cached";
1249
7
    SCOPED_CACHE_LOCK(_mutex, this);
1250
7
    auto iter = _files.find(file_key);
1251
7
    std::vector<FileBlockCell*> to_remove;
1252
7
    if (iter != _files.end()) {
1253
14
        for (auto& [_, cell] : iter->second) {
1254
14
            if (cell.releasable()) {
1255
13
                to_remove.push_back(&cell);
1256
13
            } else {
1257
1
                cell.file_block->set_deleting();
1258
1
            }
1259
14
        }
1260
6
    }
1261
7
    remove_file_blocks(to_remove, cache_lock, true, reason);
1262
7
}
1263
1264
// the async version of remove_if_cached, for background operations
1265
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
1266
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1267
116k
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1268
116k
    std::string reason = "remove_if_cached_async";
1269
116k
    SCOPED_CACHE_LOCK(_mutex, this);
1270
1271
116k
    auto iter = _files.find(file_key);
1272
116k
    std::vector<FileBlockCell*> to_remove;
1273
116k
    if (iter != _files.end()) {
1274
6.27k
        for (auto& [_, cell] : iter->second) {
1275
6.27k
            *_gc_evict_bytes_metrics << cell.size();
1276
6.27k
            *_gc_evict_count_metrics << 1;
1277
6.27k
            if (cell.releasable()) {
1278
3.26k
                to_remove.push_back(&cell);
1279
3.26k
            } else {
1280
3.00k
                cell.file_block->set_deleting();
1281
3.00k
            }
1282
6.27k
        }
1283
6.12k
    }
1284
116k
    remove_file_blocks(to_remove, cache_lock, false, reason);
1285
116k
}
1286
1287
// Returns the cache types whose queues may be evicted by time interval to make room for
// `cur_cache_type`, in eviction priority order (DISPOSABLE, COLD_NORMAL, NORMAL, INDEX) with
// `cur_cache_type` itself left out. TTL is never a candidate. Unknown types yield an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
        FileCacheType cur_cache_type) {
    switch (cur_cache_type) {
    case FileCacheType::TTL:
    case FileCacheType::INDEX:
    case FileCacheType::NORMAL:
    case FileCacheType::COLD_NORMAL:
    case FileCacheType::DISPOSABLE:
        break;
    default:
        return {};
    }
    std::vector<FileCacheType> others;
    for (const FileCacheType candidate :
         {FileCacheType::DISPOSABLE, FileCacheType::COLD_NORMAL, FileCacheType::NORMAL,
          FileCacheType::INDEX}) {
        if (candidate != cur_cache_type) {
            others.push_back(candidate);
        }
    }
    return others;
}
1306
1307
5.34k
// Returns every other cache type whose queue may be evicted by size to make room for
// `cur_cache_type`, in eviction priority order (DISPOSABLE, COLD_NORMAL, NORMAL, INDEX, TTL)
// with `cur_cache_type` itself left out. Unknown types yield an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
    switch (cur_cache_type) {
    case FileCacheType::TTL:
    case FileCacheType::INDEX:
    case FileCacheType::NORMAL:
    case FileCacheType::COLD_NORMAL:
    case FileCacheType::DISPOSABLE:
        break;
    default:
        return {};
    }
    std::vector<FileCacheType> others;
    for (const FileCacheType candidate :
         {FileCacheType::DISPOSABLE, FileCacheType::COLD_NORMAL, FileCacheType::NORMAL,
          FileCacheType::INDEX, FileCacheType::TTL}) {
        if (candidate != cur_cache_type) {
            others.push_back(candidate);
        }
    }
    return others;
}
1329
1330
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
1331
7.07k
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
1332
7.07k
    DCHECK(_files.find(hash) != _files.end() &&
1333
7.07k
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
1334
7.07k
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
1335
7.07k
    DCHECK(cell != nullptr);
1336
7.07k
    if (cell == nullptr) {
1337
0
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
1338
0
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
1339
0
                     << " new_size=" << new_size;
1340
0
        return;
1341
0
    }
1342
7.07k
    if (cell->queue_iterator) {
1343
7.07k
        auto& queue = get_queue(cell->file_block->cache_type());
1344
7.07k
        DCHECK(queue.contains(hash, offset, cache_lock));
1345
7.07k
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
1346
7.07k
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
1347
7.07k
                                          cell->file_block->get_hash_value(),
1348
7.07k
                                          cell->file_block->offset(), new_size);
1349
7.07k
    }
1350
7.07k
    _cur_cache_size -= old_size;
1351
7.07k
    _cur_cache_size += new_size;
1352
7.07k
}
1353
1354
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
1355
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1356
149k
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1357
149k
    size_t removed_size = 0;
1358
149k
    size_t cur_cache_size = _cur_cache_size;
1359
149k
    std::vector<FileBlockCell*> to_evict;
1360
452k
    for (FileCacheType cache_type : other_cache_types) {
1361
452k
        auto& queue = get_queue(cache_type);
1362
452k
        size_t remove_size_per_type = 0;
1363
452k
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1364
270k
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1365
263k
                break;
1366
263k
            }
1367
6.31k
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1368
6.31k
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
1369
0
                         << ", offset: " << entry_offset;
1370
1371
6.31k
            size_t cell_size = cell->size();
1372
6.31k
            DCHECK(entry_size == cell_size);
1373
1374
6.31k
            if (cell->atime == 0 ? true
1375
6.31k
                                 : cell->atime + queue.get_hot_data_interval() * 1000 > cur_time) {
1376
6.31k
                break;
1377
6.31k
            }
1378
1379
2
            if (cell->releasable()) {
1380
2
                auto& file_block = cell->file_block;
1381
2
                std::lock_guard block_lock(file_block->_mutex);
1382
2
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1383
2
                to_evict.push_back(cell);
1384
2
                removed_size += cell_size;
1385
2
                remove_size_per_type += cell_size;
1386
2
            }
1387
2
        }
1388
452k
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
1389
452k
    }
1390
149k
    bool is_sync_removal = !evict_in_advance;
1391
149k
    std::string reason = std::string("try_reserve_by_time ") +
1392
149k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1393
149k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1394
1395
149k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1396
149k
}
1397
1398
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
1399
1.17M
                                 bool evict_in_advance) const {
1400
1.17M
    bool ret = false;
1401
1.17M
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
1402
96.8k
        ret = (removed_size < need_size);
1403
96.8k
        return ret;
1404
96.8k
    }
1405
1.07M
    if (_disk_resource_limit_mode) {
1406
4
        ret = (removed_size < need_size);
1407
1.07M
    } else {
1408
1.07M
        ret = (cur_cache_size + need_size - removed_size > _capacity);
1409
1.07M
    }
1410
1.07M
    return ret;
1411
1.17M
}
1412
1413
bool BlockFileCache::try_reserve_from_other_queue_by_size(
1414
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1415
915
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1416
915
    size_t removed_size = 0;
1417
915
    size_t cur_cache_size = _cur_cache_size;
1418
915
    std::vector<FileBlockCell*> to_evict;
1419
    // we follow the privilege defined in get_other_cache_types to evict
1420
3.66k
    for (FileCacheType cache_type : other_cache_types) {
1421
3.66k
        auto& queue = get_queue(cache_type);
1422
1423
        // we will not drain each of them to the bottom -- i.e., we only
1424
        // evict what they have stolen.
1425
3.66k
        size_t cur_queue_size = queue.get_capacity(cache_lock);
1426
3.66k
        size_t cur_queue_max_size = queue.get_max_size();
1427
3.66k
        if (cur_queue_size <= cur_queue_max_size) {
1428
2.76k
            continue;
1429
2.76k
        }
1430
896
        size_t cur_removed_size = 0;
1431
896
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1432
896
                              cur_removed_size, evict_in_advance);
1433
896
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
1434
896
    }
1435
915
    bool is_sync_removal = !evict_in_advance;
1436
915
    std::string reason = std::string("try_reserve_by_size") +
1437
915
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1438
915
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1439
915
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1440
915
}
1441
1442
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
1443
                                                  int64_t cur_time,
1444
                                                  std::lock_guard<std::mutex>& cache_lock,
1445
149k
                                                  bool evict_in_advance) {
1446
    // currently, TTL cache is not considered as a candidate
1447
149k
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
1448
149k
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
1449
149k
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
1450
149k
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
1451
143k
        return reserve_success;
1452
143k
    }
1453
1454
5.34k
    other_cache_types = get_other_cache_type(cur_cache_type);
1455
5.34k
    auto& cur_queue = get_queue(cur_cache_type);
1456
5.34k
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
1457
5.34k
    size_t cur_queue_max_size = cur_queue.get_max_size();
1458
    // Hit the soft limit by self, cannot remove from other queues
1459
5.34k
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
1460
4.43k
        return false;
1461
4.43k
    }
1462
913
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
1463
913
                                                evict_in_advance);
1464
5.34k
}
1465
1466
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
1467
                                         QueryFileCacheContextPtr query_context,
1468
                                         const CacheContext& context, size_t offset, size_t size,
1469
                                         std::lock_guard<std::mutex>& cache_lock,
1470
149k
                                         bool evict_in_advance) {
1471
149k
    int64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>(
1472
149k
                               std::chrono::steady_clock::now().time_since_epoch())
1473
149k
                               .count();
1474
149k
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
1475
149k
                                      evict_in_advance)) {
1476
4.76k
        auto& queue = get_queue(context.cache_type);
1477
4.76k
        size_t removed_size = 0;
1478
4.76k
        size_t cur_cache_size = _cur_cache_size;
1479
1480
4.76k
        std::vector<FileBlockCell*> to_evict;
1481
4.76k
        size_t cur_removed_size = 0;
1482
4.76k
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1483
4.76k
                              cur_removed_size, evict_in_advance);
1484
4.76k
        bool is_sync_removal = !evict_in_advance;
1485
4.76k
        std::string reason = std::string("try_reserve for cache type ") +
1486
4.76k
                             cache_type_to_string(context.cache_type) +
1487
4.76k
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1488
4.76k
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1489
4.76k
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;
1490
1491
4.76k
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1492
1.73k
            return false;
1493
1.73k
        }
1494
4.76k
    }
1495
1496
147k
    if (query_context) {
1497
16
        query_context->reserve(hash, offset, size, cache_lock);
1498
16
    }
1499
147k
    return true;
1500
149k
}
1501
1502
template <class T, class U>
1503
    requires IsXLock<T> && IsXLock<U>
1504
141k
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
1505
141k
    auto hash = file_block->get_hash_value();
1506
141k
    auto offset = file_block->offset();
1507
141k
    auto type = file_block->cache_type();
1508
141k
    auto expiration_time = file_block->expiration_time();
1509
141k
    auto tablet_id = file_block->tablet_id();
1510
141k
    auto* cell = get_cell(hash, offset, cache_lock);
1511
141k
    file_block->cell = nullptr;
1512
141k
    DCHECK(cell);
1513
141k
    DCHECK(cell->queue_iterator);
1514
141k
    if (cell->queue_iterator) {
1515
141k
        auto& queue = get_queue(file_block->cache_type());
1516
141k
        queue.remove(*cell->queue_iterator, cache_lock);
1517
141k
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
1518
141k
                                          cell->file_block->get_hash_value(),
1519
141k
                                          cell->file_block->offset(), cell->size());
1520
141k
    }
1521
141k
    *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
1522
141k
            << file_block->range().size();
1523
141k
    *_total_evict_size_metrics << file_block->range().size();
1524
1525
141k
    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
1526
0
               << ", offset: " << offset << ", size: " << file_block->range().size()
1527
0
               << ", type: " << cache_type_to_string(type);
1528
1529
141k
    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
1530
111k
        FileCacheKey key;
1531
111k
        key.hash = hash;
1532
111k
        key.offset = offset;
1533
111k
        key.meta.type = type;
1534
111k
        key.meta.expiration_time = expiration_time;
1535
111k
        key.meta.tablet_id = tablet_id;
1536
111k
        if (sync) {
1537
12.2k
            int64_t duration_ns = 0;
1538
12.2k
            Status st;
1539
12.2k
            {
1540
12.2k
                SCOPED_RAW_TIMER(&duration_ns);
1541
12.2k
                st = _storage->remove(key);
1542
12.2k
            }
1543
12.2k
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
1544
12.2k
            if (!st.ok()) {
1545
261
                LOG_WARNING("").error(st);
1546
261
            }
1547
99.0k
        } else {
1548
            // the file will be deleted in the bottom half
1549
            // so there will be a window that the file is not in the cache but still in the storage
1550
            // but it's ok, because the rowset is stale already
1551
99.0k
            bool ret = _recycle_keys.enqueue(key);
1552
99.0k
            if (ret) [[likely]] {
1553
99.0k
                *_recycle_keys_length_recorder << _recycle_keys.size_approx();
1554
99.0k
            } else {
1555
0
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
1556
0
                int64_t duration_ns = 0;
1557
0
                Status st;
1558
0
                {
1559
0
                    SCOPED_RAW_TIMER(&duration_ns);
1560
0
                    st = _storage->remove(key);
1561
0
                }
1562
0
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
1563
0
                if (!st.ok()) {
1564
0
                    LOG_WARNING("").error(st);
1565
0
                }
1566
0
            }
1567
99.0k
        }
1568
111k
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
1569
0
        file_block->set_deleting();
1570
0
        return;
1571
0
    }
1572
141k
    _cur_cache_size -= file_block->range().size();
1573
141k
    if (FileCacheType::TTL == type) {
1574
1.29k
        _cur_ttl_size -= file_block->range().size();
1575
1.29k
    }
1576
141k
    auto it = _files.find(hash);
1577
141k
    if (it != _files.end()) {
1578
141k
        it->second.erase(file_block->offset());
1579
141k
        if (it->second.empty()) {
1580
105k
            _files.erase(hash);
1581
105k
        }
1582
141k
    }
1583
141k
    *_num_removed_blocks << 1;
1584
141k
}
1585
1586
46
// Bytes currently accounted to the queue of `cache_type`, read under the cache lock.
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t used_bytes = get_used_cache_size_unlocked(cache_type, cache_lock);
    return used_bytes;
}
1590
1591
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
1592
46
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1593
46
    return get_queue(cache_type).get_capacity(cache_lock);
1594
46
}
1595
1596
0
// Remaining room in the queue of `cache_type`, read under the cache lock.
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t available = get_available_cache_size_unlocked(cache_type, cache_lock);
    return available;
}
1600
1601
size_t BlockFileCache::get_available_cache_size_unlocked(
1602
0
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
1603
0
    return get_queue(cache_type).get_max_element_size() -
1604
0
           get_used_cache_size_unlocked(cache_type, cache_lock);
1605
0
}
1606
1607
88
// Number of entries in the queue of `cache_type`, read under the cache lock.
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t block_count = get_file_blocks_num_unlocked(cache_type, cache_lock);
    return block_count;
}
1611
1612
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
1613
88
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1614
88
    return get_queue(cache_type).get_elements_num(cache_lock);
1615
88
}
1616
1617
// Creates the cache cell for `file_block` and points the block's back-reference at it.
// The block's state must be EMPTY, DOWNLOADED or SKIP_CACHE (checked with DCHECK).
// TTL blocks get their access time stamped immediately.
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
        : file_block(file_block) {
    this->file_block->cell = this;
    /**
     * Cell can be created with either DOWNLOADED or EMPTY file block's state.
     * File block acquires DOWNLOADING state and creates LRUQueue iterator on first
     * successful getOrSetDownloader call.
     */
    const FileBlock::State state = this->file_block->_download_state;
    const bool creatable_state = state == FileBlock::State::DOWNLOADED ||
                                 state == FileBlock::State::EMPTY ||
                                 state == FileBlock::State::SKIP_CACHE;
    DCHECK(creatable_state)
            << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
            << FileBlock::state_to_string(state);
    if (this->file_block->cache_type() == FileCacheType::TTL) {
        update_atime();
    }
}
1640
1641
// Appends a (hash, offset, size) entry at the tail of the queue (the end move_to_end uses for
// accessed entries), indexes it in `map`, adds `size` to the queue's byte total and returns
// an iterator to the new entry.
LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
    cache_size += size;
    Iterator entry = queue.emplace(queue.end(), hash, offset, size);
    map.emplace(std::make_pair(hash, offset), entry);
    return entry;
}
1648
1649
0
void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
1650
0
    queue.clear();
1651
0
    map.clear();
1652
0
    cache_size = 0;
1653
0
}
1654
1655
6.07M
// Relinks the entry at the tail of the list. std::list::splice moves the node without
// copying, so iterators to it (including the one stored in `map`) remain valid.
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
    const auto tail = queue.end();
    queue.splice(tail, queue, queue_it);
}
1658
1659
void LRUQueue::resize(Iterator queue_it, size_t new_size,
1660
14.1k
                      std::lock_guard<std::mutex>& /* cache_lock */) {
1661
14.1k
    cache_size -= queue_it->size;
1662
14.1k
    queue_it->size = new_size;
1663
14.1k
    cache_size += new_size;
1664
14.1k
}
1665
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
1666
7.07k
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
1667
7.07k
    return map.find(std::make_pair(hash, offset)) != map.end();
1668
7.07k
}
1669
1670
// Looks up the entry for (hash, offset). When it is absent, a value-initialized list iterator
// is returned, which must not be dereferenced.
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
    const auto found = map.find(std::make_pair(hash, offset));
    if (found == map.end()) {
        return std::list<FileKeyAndOffset>::iterator();
    }
    return found->second;
}
1678
1679
2
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
1680
2
    std::string result;
1681
18
    for (const auto& [hash, offset, size] : queue) {
1682
18
        if (!result.empty()) {
1683
16
            result += ", ";
1684
16
        }
1685
18
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
1686
18
    }
1687
2
    return result;
1688
2
}
1689
1690
size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
1691
5
                                           std::lock_guard<std::mutex>& cache_lock) {
1692
5
    std::list<FileKeyAndOffset> target_queue = this->queue;
1693
5
    std::list<FileKeyAndOffset> base_queue = base.queue;
1694
5
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
1695
5
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
1696
1697
5
    size_t m = vec1.size();
1698
5
    size_t n = vec2.size();
1699
1700
    // Create a 2D vector (matrix) to store the Levenshtein distances
1701
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
1702
5
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
1703
1704
    // Initialize the first row and column of the matrix
1705
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
1706
22
    for (size_t i = 0; i <= m; ++i) {
1707
17
        dp[i][0] = i;
1708
17
    }
1709
20
    for (size_t j = 0; j <= n; ++j) {
1710
15
        dp[0][j] = j;
1711
15
    }
1712
1713
    // Fill the matrix using dynamic programming
1714
17
    for (size_t i = 1; i <= m; ++i) {
1715
38
        for (size_t j = 1; j <= n; ++j) {
1716
            // Check if the current elements of both vectors are equal
1717
26
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
1718
26
                           vec1[i - 1].offset == vec2[j - 1].offset)
1719
26
                                  ? 0
1720
26
                                  : 1;
1721
            // Calculate the minimum cost of three possible operations:
1722
            // 1. Insertion: dp[i][j-1] + 1
1723
            // 2. Deletion: dp[i-1][j] + 1
1724
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
1725
26
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
1726
26
        }
1727
12
    }
1728
    // The bottom-right cell of the matrix contains the Levenshtein distance
1729
5
    return dp[m][n];
1730
5
}
1731
1732
1
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
1733
1
    SCOPED_CACHE_LOCK(_mutex, this);
1734
1
    return dump_structure_unlocked(hash, cache_lock);
1735
1
}
1736
1737
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
1738
1
                                                    std::lock_guard<std::mutex>&) {
1739
1
    std::stringstream result;
1740
1
    auto it = _files.find(hash);
1741
1
    if (it == _files.end()) {
1742
0
        return std::string("");
1743
0
    }
1744
1
    const auto& cells_by_offset = it->second;
1745
1746
1
    for (const auto& [_, cell] : cells_by_offset) {
1747
1
        result << cell.file_block->get_info_for_log() << " "
1748
1
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
1749
1
    }
1750
1751
1
    return result.str();
1752
1
}
1753
1754
0
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
1755
0
    SCOPED_CACHE_LOCK(_mutex, this);
1756
0
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
1757
0
}
1758
1759
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
1760
                                                            size_t offset,
1761
0
                                                            std::lock_guard<std::mutex>&) {
1762
0
    std::stringstream result;
1763
0
    auto it = _files.find(hash);
1764
0
    if (it == _files.end()) {
1765
0
        return std::string("");
1766
0
    }
1767
0
    const auto& cells_by_offset = it->second;
1768
0
    const auto& cell = cells_by_offset.find(offset);
1769
1770
0
    return cache_type_to_string(cell->second.file_block->cache_type());
1771
0
}
1772
1773
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
1774
                                       FileCacheType new_type,
1775
36
                                       std::lock_guard<std::mutex>& cache_lock) {
1776
36
    if (auto iter = _files.find(hash); iter != _files.end()) {
1777
36
        auto& file_blocks = iter->second;
1778
36
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
1779
35
            FileBlockCell& cell = cell_it->second;
1780
35
            auto& cur_queue = get_queue(cell.file_block->cache_type());
1781
35
            DCHECK(cell.queue_iterator.has_value());
1782
35
            cur_queue.remove(*cell.queue_iterator, cache_lock);
1783
35
            _lru_recorder->record_queue_event(
1784
35
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
1785
35
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
1786
35
            auto& new_queue = get_queue(new_type);
1787
35
            cell.queue_iterator =
1788
35
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
1789
35
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
1790
35
                                              cell.file_block->get_hash_value(),
1791
35
                                              cell.file_block->offset(), cell.size());
1792
35
        }
1793
36
    }
1794
36
}
1795
1796
// @brief: get a path's disk capacity used percent, inode used percent
1797
// @param: path
1798
// @param: percent.first disk used percent, percent.second inode used percent
1799
8.33k
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
1800
8.33k
    struct statfs stat;
1801
8.33k
    int ret = statfs(path.c_str(), &stat);
1802
8.33k
    if (ret != 0) {
1803
5
        return ret;
1804
5
    }
1805
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
1806
    // v->used = stat.f_blocks - stat.f_bfree
1807
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
1808
8.32k
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
1809
8.32k
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
1810
8.32k
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));
1811
1812
8.32k
    unsigned long long inode_free = stat.f_ffree;
1813
8.32k
    unsigned long long inode_total = stat.f_files;
1814
8.32k
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
1815
8.32k
    percent->first = capacity_percentage;
1816
8.32k
    percent->second = 100 - inode_percentage;
1817
1818
    // Add sync point for testing
1819
8.32k
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
1820
1821
8.32k
    return 0;
1822
8.33k
}
1823
1824
10
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
1825
10
    using namespace std::chrono;
1826
10
    int64_t space_released = 0;
1827
10
    size_t old_capacity = 0;
1828
10
    std::stringstream ss;
1829
10
    ss << "finish reset_capacity, path=" << _cache_base_path;
1830
10
    auto adjust_start_time = steady_clock::time_point();
1831
10
    {
1832
10
        SCOPED_CACHE_LOCK(_mutex, this);
1833
10
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
1834
1
            int64_t need_remove_size = _cur_cache_size - new_capacity;
1835
5
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
1836
5
                int64_t queue_released = 0;
1837
5
                std::vector<FileBlockCell*> to_evict;
1838
13
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1839
13
                    if (need_remove_size <= 0) {
1840
1
                        break;
1841
1
                    }
1842
12
                    need_remove_size -= entry_size;
1843
12
                    space_released += entry_size;
1844
12
                    queue_released += entry_size;
1845
12
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1846
12
                    if (!cell->releasable()) {
1847
0
                        cell->file_block->set_deleting();
1848
0
                        continue;
1849
0
                    }
1850
12
                    to_evict.push_back(cell);
1851
12
                }
1852
12
                for (auto& cell : to_evict) {
1853
12
                    FileBlockSPtr file_block = cell->file_block;
1854
12
                    std::lock_guard block_lock(file_block->_mutex);
1855
12
                    remove(file_block, cache_lock, block_lock);
1856
12
                }
1857
5
                return queue_released;
1858
5
            };
1859
1
            int64_t queue_released = remove_blocks(_disposable_queue);
1860
1
            ss << " disposable_queue released " << queue_released;
1861
1
            queue_released = remove_blocks(_normal_queue);
1862
1
            ss << " normal_queue released " << queue_released;
1863
1
            queue_released = remove_blocks(_index_queue);
1864
1
            ss << " index_queue released " << queue_released;
1865
1
            queue_released = remove_blocks(_ttl_queue);
1866
1
            ss << " ttl_queue released " << queue_released;
1867
1
            queue_released = remove_blocks(_cold_normal_queue);
1868
1
            ss << " cold_normal_queue released " << queue_released;
1869
1870
1
            _disk_resource_limit_mode = true;
1871
1
            _disk_limit_mode_metrics->set_value(1);
1872
1
            ss << " total_space_released=" << space_released;
1873
1
        }
1874
10
        old_capacity = _capacity;
1875
10
        _capacity = new_capacity;
1876
10
        _cache_capacity_metrics->set_value(_capacity);
1877
10
    }
1878
10
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
1879
10
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
1880
10
              << " use_time=" << cast_set<int64_t>(use_time.count());
1881
10
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
1882
10
    LOG(INFO) << ss.str();
1883
10
    return ss.str();
1884
10
}
1885
1886
5.16k
void BlockFileCache::check_disk_resource_limit() {
1887
5.16k
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1888
930
        return;
1889
930
    }
1890
1891
4.23k
    bool previous_mode = _disk_resource_limit_mode;
1892
4.23k
    if (_capacity > _cur_cache_size) {
1893
4.23k
        _disk_resource_limit_mode = false;
1894
4.23k
        _disk_limit_mode_metrics->set_value(0);
1895
4.23k
    }
1896
4.23k
    std::pair<int, int> percent;
1897
4.23k
    int ret = disk_used_percentage(_cache_base_path, &percent);
1898
4.23k
    if (ret != 0) {
1899
4
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1900
4
        return;
1901
4
    }
1902
4.23k
    auto [space_percentage, inode_percentage] = percent;
1903
8.46k
    auto is_insufficient = [](const int& percentage) {
1904
8.46k
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
1905
8.46k
    };
1906
4.23k
    DCHECK_GE(space_percentage, 0);
1907
4.23k
    DCHECK_LE(space_percentage, 100);
1908
4.23k
    DCHECK_GE(inode_percentage, 0);
1909
4.23k
    DCHECK_LE(inode_percentage, 100);
1910
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1911
    // FIXME: reject with config validator
1912
4.23k
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
1913
4.23k
        config::file_cache_exit_disk_resource_limit_mode_percent) {
1914
1
        LOG_WARNING("config error, set to default value")
1915
1
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
1916
1
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
1917
1
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
1918
1
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
1919
1
    }
1920
4.23k
    bool is_space_insufficient = is_insufficient(space_percentage);
1921
4.23k
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1922
4.23k
    if (is_space_insufficient || is_inode_insufficient) {
1923
5
        _disk_resource_limit_mode = true;
1924
5
        _disk_limit_mode_metrics->set_value(1);
1925
4.22k
    } else if (_disk_resource_limit_mode &&
1926
4.22k
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
1927
4.22k
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
1928
0
        _disk_resource_limit_mode = false;
1929
0
        _disk_limit_mode_metrics->set_value(0);
1930
0
    }
1931
4.23k
    if (previous_mode != _disk_resource_limit_mode) {
1932
        // add log for disk resource limit mode switching
1933
10
        if (_disk_resource_limit_mode) {
1934
5
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
1935
5
                         << " space_percent=" << space_percentage
1936
5
                         << " inode_percent=" << inode_percentage
1937
5
                         << " is_space_insufficient=" << is_space_insufficient
1938
5
                         << " is_inode_insufficient=" << is_inode_insufficient
1939
5
                         << " enter threshold="
1940
5
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
1941
5
        } else {
1942
5
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
1943
5
                      << " space_percent=" << space_percentage
1944
5
                      << " inode_percent=" << inode_percentage << " exit threshold="
1945
5
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
1946
5
        }
1947
4.22k
    } else if (_disk_resource_limit_mode) {
1948
        // print log for disk resource limit mode running, but less frequently
1949
0
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1950
0
                                 << " space_percent=" << space_percentage
1951
0
                                 << " inode_percent=" << inode_percentage
1952
0
                                 << " is_space_insufficient=" << is_space_insufficient
1953
0
                                 << " is_inode_insufficient=" << is_inode_insufficient
1954
0
                                 << " mode run in resource limit";
1955
0
    }
1956
4.23k
}
1957
1958
4.09k
void BlockFileCache::check_need_evict_cache_in_advance() {
1959
4.09k
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1960
1
        return;
1961
1
    }
1962
1963
4.09k
    std::pair<int, int> percent;
1964
4.09k
    int ret = disk_used_percentage(_cache_base_path, &percent);
1965
4.09k
    if (ret != 0) {
1966
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1967
1
        return;
1968
1
    }
1969
4.09k
    auto [space_percentage, inode_percentage] = percent;
1970
4.09k
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
1971
12.2k
    auto is_insufficient = [](const int& percentage) {
1972
12.2k
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
1973
12.2k
    };
1974
4.09k
    DCHECK_GE(space_percentage, 0);
1975
4.09k
    DCHECK_LE(space_percentage, 100);
1976
4.09k
    DCHECK_GE(inode_percentage, 0);
1977
4.09k
    DCHECK_LE(inode_percentage, 100);
1978
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1979
    // FIXME: reject with config validator
1980
4.09k
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
1981
4.09k
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
1982
1
        LOG_WARNING("config error, set to default value")
1983
1
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
1984
1
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
1985
1
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
1986
1
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
1987
1
    }
1988
4.09k
    bool previous_mode = _need_evict_cache_in_advance;
1989
4.09k
    bool is_space_insufficient = is_insufficient(space_percentage);
1990
4.09k
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1991
4.09k
    bool is_size_insufficient = is_insufficient(size_percentage);
1992
4.09k
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
1993
69
        _need_evict_cache_in_advance = true;
1994
69
        _need_evict_cache_in_advance_metrics->set_value(1);
1995
4.02k
    } else if (_need_evict_cache_in_advance &&
1996
4.02k
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1997
4.02k
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1998
4.02k
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
1999
58
        _need_evict_cache_in_advance = false;
2000
58
        _need_evict_cache_in_advance_metrics->set_value(0);
2001
58
    }
2002
4.09k
    if (previous_mode != _need_evict_cache_in_advance) {
2003
        // add log for evict cache in advance mode switching
2004
120
        if (_need_evict_cache_in_advance) {
2005
62
            LOG(WARNING) << "Entering evict cache in advance mode: "
2006
62
                         << "file_cache=" << get_base_path()
2007
62
                         << " space_percent=" << space_percentage
2008
62
                         << " inode_percent=" << inode_percentage
2009
62
                         << " size_percent=" << size_percentage
2010
62
                         << " is_space_insufficient=" << is_space_insufficient
2011
62
                         << " is_inode_insufficient=" << is_inode_insufficient
2012
62
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
2013
62
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
2014
62
        } else {
2015
58
            LOG(INFO) << "Exiting evict cache in advance mode: "
2016
58
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
2017
58
                      << " inode_percent=" << inode_percentage
2018
58
                      << " size_percent=" << size_percentage << " exit threshold="
2019
58
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
2020
58
        }
2021
3.97k
    } else if (_need_evict_cache_in_advance) {
2022
        // print log for evict cache in advance mode running, but less frequently
2023
7
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
2024
2
                                 << " space_percent=" << space_percentage
2025
2
                                 << " inode_percent=" << inode_percentage
2026
2
                                 << " size_percent=" << size_percentage
2027
2
                                 << " is_space_insufficient=" << is_space_insufficient
2028
2
                                 << " is_inode_insufficient=" << is_inode_insufficient
2029
2
                                 << " is_size_insufficient=" << is_size_insufficient
2030
2
                                 << " need evict cache in advance";
2031
7
    }
2032
4.09k
}
2033
2034
152
void BlockFileCache::run_background_monitor() {
2035
152
    Thread::set_self_name("run_background_monitor");
2036
5.17k
    while (!_close) {
2037
5.16k
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
2038
5.16k
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
2039
5.16k
        check_disk_resource_limit();
2040
5.16k
        if (config::enable_evict_file_cache_in_advance) {
2041
4.08k
            check_need_evict_cache_in_advance();
2042
4.08k
        } else {
2043
1.08k
            _need_evict_cache_in_advance = false;
2044
1.08k
            _need_evict_cache_in_advance_metrics->set_value(0);
2045
1.08k
        }
2046
2047
5.16k
        {
2048
5.16k
            std::unique_lock close_lock(_close_mtx);
2049
5.16k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2050
5.16k
            if (_close) {
2051
149
                break;
2052
149
            }
2053
5.16k
        }
2054
        // report
2055
5.01k
        {
2056
5.01k
            SCOPED_CACHE_LOCK(_mutex, this);
2057
5.01k
            _cur_cache_size_metrics->set_value(_cur_cache_size);
2058
5.01k
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
2059
5.01k
                                                   _index_queue.get_capacity(cache_lock) -
2060
5.01k
                                                   _normal_queue.get_capacity(cache_lock) -
2061
5.01k
                                                   _disposable_queue.get_capacity(cache_lock) -
2062
5.01k
                                                   _cold_normal_queue.get_capacity(cache_lock));
2063
5.01k
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
2064
5.01k
                    _ttl_queue.get_capacity(cache_lock));
2065
5.01k
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
2066
5.01k
                    _ttl_queue.get_elements_num(cache_lock));
2067
5.01k
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
2068
5.01k
            _cur_normal_queue_element_count_metrics->set_value(
2069
5.01k
                    _normal_queue.get_elements_num(cache_lock));
2070
5.01k
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
2071
5.01k
            _cur_index_queue_element_count_metrics->set_value(
2072
5.01k
                    _index_queue.get_elements_num(cache_lock));
2073
5.01k
            _cur_disposable_queue_cache_size_metrics->set_value(
2074
5.01k
                    _disposable_queue.get_capacity(cache_lock));
2075
5.01k
            _cur_disposable_queue_element_count_metrics->set_value(
2076
5.01k
                    _disposable_queue.get_elements_num(cache_lock));
2077
5.01k
            _cur_cold_normal_queue_cache_size_metrics->set_value(
2078
5.01k
                    _cold_normal_queue.get_capacity(cache_lock));
2079
5.01k
            _cur_cold_normal_queue_element_count_metrics->set_value(
2080
5.01k
                    _cold_normal_queue.get_elements_num(cache_lock));
2081
2082
            // Update meta store write queue size if storage is FSFileCacheStorage
2083
5.01k
            if (_storage->get_type() == FileCacheStorageType::DISK) {
2084
4.10k
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
2085
4.10k
                if (fs_storage != nullptr) {
2086
4.10k
                    auto* meta_store = fs_storage->get_meta_store();
2087
4.10k
                    if (meta_store != nullptr) {
2088
4.10k
                        _meta_store_write_queue_size_metrics->set_value(
2089
4.10k
                                meta_store->get_write_queue_size());
2090
4.10k
                    }
2091
4.10k
                }
2092
4.10k
            }
2093
2094
5.01k
            if (_num_read_blocks->get_value() > 0) {
2095
2.42k
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
2096
2.42k
                                      (double)_num_read_blocks->get_value());
2097
2.42k
            }
2098
5.01k
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
2099
1.59k
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
2100
1.59k
                                         (double)_num_read_blocks_5m->get_value());
2101
1.59k
            }
2102
5.01k
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
2103
2.39k
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
2104
2.39k
                                         (double)_num_read_blocks_1h->get_value());
2105
2.39k
            }
2106
2107
5.01k
            if (_no_warmup_num_hit_blocks->get_value() > 0) {
2108
2.37k
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
2109
2.37k
                                                (double)_no_warmup_num_read_blocks->get_value());
2110
2.37k
            }
2111
5.01k
            if (_no_warmup_num_hit_blocks_5m && _no_warmup_num_hit_blocks_5m->get_value() > 0) {
2112
1.55k
                _no_warmup_hit_ratio_5m->set_value(
2113
1.55k
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
2114
1.55k
                        (double)_no_warmup_num_read_blocks_5m->get_value());
2115
1.55k
            }
2116
5.01k
            if (_no_warmup_num_hit_blocks_1h && _no_warmup_num_hit_blocks_1h->get_value() > 0) {
2117
2.36k
                _no_warmup_hit_ratio_1h->set_value(
2118
2.36k
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
2119
2.36k
                        (double)_no_warmup_num_read_blocks_1h->get_value());
2120
2.36k
            }
2121
5.01k
        }
2122
5.01k
    }
2123
152
}
2124
2125
152
void BlockFileCache::run_background_gc() {
2126
152
    Thread::set_self_name("run_background_gc");
2127
152
    FileCacheKey key;
2128
152
    size_t batch_count = 0;
2129
203k
    while (!_close) {
2130
203k
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
2131
203k
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
2132
203k
        {
2133
203k
            std::unique_lock close_lock(_close_mtx);
2134
203k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2135
203k
            if (_close) {
2136
149
                break;
2137
149
            }
2138
203k
        }
2139
2140
301k
        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
2141
98.9k
            int64_t duration_ns = 0;
2142
98.9k
            Status st;
2143
98.9k
            {
2144
98.9k
                SCOPED_RAW_TIMER(&duration_ns);
2145
98.9k
                st = _storage->remove(key);
2146
98.9k
            }
2147
98.9k
            *_storage_async_remove_latency_us << (duration_ns / 1000);
2148
2149
98.9k
            if (!st.ok()) {
2150
1.78k
                LOG_WARNING("").error(st);
2151
1.78k
            }
2152
98.9k
            batch_count++;
2153
98.9k
        }
2154
202k
        *_recycle_keys_length_recorder << _recycle_keys.size_approx();
2155
202k
        batch_count = 0;
2156
202k
    }
2157
152
}
2158
2159
152
void BlockFileCache::run_background_evict_in_advance() {
2160
152
    Thread::set_self_name("run_background_evict_in_advance");
2161
152
    LOG(INFO) << "Starting background evict in advance thread";
2162
152
    int64_t batch = 0;
2163
25.2k
    while (!_close) {
2164
25.2k
        {
2165
25.2k
            std::unique_lock close_lock(_close_mtx);
2166
25.2k
            _close_cv.wait_for(
2167
25.2k
                    close_lock,
2168
25.2k
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
2169
25.2k
            if (_close) {
2170
149
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
2171
149
                break;
2172
149
            }
2173
25.2k
        }
2174
25.1k
        batch = config::file_cache_evict_in_advance_batch_bytes;
2175
2176
        // Skip if eviction not needed or too many pending recycles
2177
25.1k
        if (!_need_evict_cache_in_advance ||
2178
25.1k
            _recycle_keys.size_approx() >=
2179
24.8k
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
2180
24.8k
            continue;
2181
24.8k
        }
2182
2183
257
        int64_t duration_ns = 0;
2184
257
        {
2185
257
            SCOPED_CACHE_LOCK(_mutex, this);
2186
257
            SCOPED_RAW_TIMER(&duration_ns);
2187
257
            try_evict_in_advance(batch, cache_lock);
2188
257
        }
2189
257
        *_evict_in_advance_latency_us << (duration_ns / 1000);
2190
257
    }
2191
152
}
2192
2193
152
void BlockFileCache::run_background_block_lru_update() {
2194
152
    Thread::set_self_name("run_background_block_lru_update");
2195
152
    std::vector<FileBlockSPtr> batch;
2196
8.86k
    while (!_close) {
2197
8.85k
        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
2198
8.85k
        size_t batch_limit =
2199
8.85k
                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
2200
8.85k
        {
2201
8.85k
            std::unique_lock close_lock(_close_mtx);
2202
8.85k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2203
8.85k
            if (_close) {
2204
149
                break;
2205
149
            }
2206
8.85k
        }
2207
2208
8.70k
        batch.clear();
2209
8.70k
        batch.reserve(batch_limit);
2210
8.70k
        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
2211
8.72k
        if (drained == 0) {
2212
8.72k
            *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2213
8.72k
            continue;
2214
8.72k
        }
2215
2216
18.4E
        int64_t duration_ns = 0;
2217
18.4E
        {
2218
18.4E
            SCOPED_CACHE_LOCK(_mutex, this);
2219
18.4E
            SCOPED_RAW_TIMER(&duration_ns);
2220
18.4E
            for (auto& block : batch) {
2221
480
                update_block_lru(block, cache_lock);
2222
480
            }
2223
18.4E
        }
2224
18.4E
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
2225
18.4E
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2226
18.4E
    }
2227
152
}
2228
2229
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
2230
2
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
2231
2
    int64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>(
2232
2
                               std::chrono::steady_clock::now().time_since_epoch())
2233
2
                               .count();
2234
2
    SCOPED_CACHE_LOCK(_mutex, this);
2235
2
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
2236
2
    if (auto iter = _files.find(hash); iter != _files.end()) {
2237
5
        for (auto& pair : _files.find(hash)->second) {
2238
5
            const FileBlockCell* cell = &pair.second;
2239
5
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
2240
4
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
2241
4
                    (cell->atime != 0 &&
2242
3
                     cur_time - cell->atime <
2243
3
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval() *
2244
3
                                     1000)) {
2245
3
                    blocks_meta.emplace_back(pair.first, cell->size(),
2246
3
                                             cell->file_block->cache_type(),
2247
3
                                             cell->file_block->expiration_time());
2248
3
                }
2249
4
            }
2250
5
        }
2251
2
    }
2252
2
    return blocks_meta;
2253
2
}
2254
2255
bool BlockFileCache::try_reserve_during_async_load(size_t size,
2256
2
                                                   std::lock_guard<std::mutex>& cache_lock) {
2257
2
    size_t removed_size = 0;
2258
2
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
2259
2
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
2260
2
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);
2261
2
    size_t cold_normal_queue_size = _cold_normal_queue.get_capacity(cache_lock);
2262
2263
2
    std::vector<FileBlockCell*> to_evict;
2264
2
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
2265
3
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
2266
3
            if (!_disk_resource_limit_mode || removed_size >= size) {
2267
2
                break;
2268
2
            }
2269
1
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
2270
2271
1
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
2272
0
                         << ", offset: " << entry_offset;
2273
2274
1
            size_t cell_size = cell->size();
2275
1
            DCHECK(entry_size == cell_size);
2276
2277
1
            if (cell->releasable()) {
2278
1
                auto& file_block = cell->file_block;
2279
2280
1
                std::lock_guard block_lock(file_block->_mutex);
2281
1
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
2282
1
                to_evict.push_back(cell);
2283
1
                removed_size += cell_size;
2284
1
            }
2285
1
        }
2286
2
    };
2287
2
    if (disposable_queue_size != 0) {
2288
0
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
2289
0
    }
2290
2
    if (normal_queue_size != 0) {
2291
2
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
2292
2
    }
2293
2
    if (index_queue_size != 0) {
2294
0
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
2295
0
    }
2296
2
    if (cold_normal_queue_size != 0) {
2297
0
        collect_eliminate_fragments(get_queue(FileCacheType::COLD_NORMAL));
2298
0
    }
2299
2300
2
    std::string reason = "async load";
2301
2
    remove_file_blocks(to_evict, cache_lock, true, reason);
2302
2303
2
    return !_disk_resource_limit_mode || removed_size >= size;
2304
2
}
2305
2306
33
void BlockFileCache::clear_need_update_lru_blocks() {
2307
33
    _need_update_lru_blocks.clear();
2308
33
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2309
33
}
2310
2311
30
std::string BlockFileCache::clear_file_cache_directly() {
2312
30
    _lru_dumper->remove_lru_dump_files();
2313
30
    using namespace std::chrono;
2314
30
    std::stringstream ss;
2315
30
    auto start = steady_clock::now();
2316
30
    SCOPED_CACHE_LOCK(_mutex, this);
2317
30
    LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
2318
2319
30
    std::string clear_msg;
2320
30
    auto s = _storage->clear(clear_msg);
2321
30
    if (!s.ok()) {
2322
1
        return clear_msg;
2323
1
    }
2324
2325
29
    int64_t num_files = _files.size();
2326
29
    int64_t cache_size = _cur_cache_size;
2327
29
    int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
2328
29
    int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock);
2329
29
    int64_t disposable_queue_size = _disposable_queue.get_elements_num(cache_lock);
2330
29
    int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);
2331
29
    int64_t cold_normal_queue_size = _cold_normal_queue.get_elements_num(cache_lock);
2332
2333
29
    int64_t clear_fd_duration = 0;
2334
29
    {
2335
        // clear FDCache to release fd
2336
29
        SCOPED_RAW_TIMER(&clear_fd_duration);
2337
29
        for (const auto& [file_key, file_blocks] : _files) {
2338
13
            for (const auto& [offset, file_block_cell] : file_blocks) {
2339
13
                AccessKeyAndOffset access_key_and_offset(file_key, offset);
2340
13
                FDCache::instance()->remove_file_reader(access_key_and_offset);
2341
13
            }
2342
2
        }
2343
29
    }
2344
2345
29
    _files.clear();
2346
29
    _cur_cache_size = 0;
2347
29
    _cur_ttl_size = 0;
2348
29
    _time_to_key.clear();
2349
29
    _key_to_time.clear();
2350
29
    _index_queue.clear(cache_lock);
2351
29
    _normal_queue.clear(cache_lock);
2352
29
    _disposable_queue.clear(cache_lock);
2353
29
    _ttl_queue.clear(cache_lock);
2354
29
    _cold_normal_queue.clear(cache_lock);
2355
2356
    // Synchronously update cache metrics so that information_schema.file_cache_statistics
2357
    // reflects the cleared state immediately, without waiting for the next
2358
    // run_background_monitor cycle.
2359
29
    _cur_cache_size_metrics->set_value(0);
2360
29
    _cur_ttl_cache_size_metrics->set_value(0);
2361
29
    _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0);
2362
29
    _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0);
2363
29
    _cur_normal_queue_cache_size_metrics->set_value(0);
2364
29
    _cur_normal_queue_element_count_metrics->set_value(0);
2365
29
    _cur_index_queue_cache_size_metrics->set_value(0);
2366
29
    _cur_index_queue_element_count_metrics->set_value(0);
2367
29
    _cur_disposable_queue_cache_size_metrics->set_value(0);
2368
29
    _cur_disposable_queue_element_count_metrics->set_value(0);
2369
2370
29
    clear_need_update_lru_blocks();
2371
2372
29
    ss << "finish clear_file_cache_directly"
2373
29
       << " path=" << _cache_base_path
2374
29
       << " time_elapsed_ms=" << duration_cast<milliseconds>(steady_clock::now() - start).count()
2375
29
       << " fd_clear_time_ms=" << (clear_fd_duration / 1000000) << " num_files=" << num_files
2376
29
       << " cache_size=" << cache_size << " index_queue_size=" << index_queue_size
2377
29
       << " normal_queue_size=" << normal_queue_size
2378
29
       << " disposable_queue_size=" << disposable_queue_size << "ttl_queue_size=" << ttl_queue_size
2379
29
       << " cold_normal_queue_size=" << cold_normal_queue_size;
2380
29
    auto msg = ss.str();
2381
29
    LOG(INFO) << msg;
2382
29
    _lru_dumper->remove_lru_dump_files();
2383
29
    return msg;
2384
30
}
2385
2386
46.3k
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
2387
46.3k
    std::map<size_t, FileBlockSPtr> offset_to_block;
2388
46.3k
    SCOPED_CACHE_LOCK(_mutex, this);
2389
46.3k
    if (_files.contains(hash)) {
2390
43.3k
        for (auto& [offset, cell] : _files[hash]) {
2391
43.3k
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
2392
43.3k
                cell.file_block->_owned_by_cached_reader = true;
2393
43.3k
                offset_to_block.emplace(offset, cell.file_block);
2394
43.3k
            }
2395
43.3k
        }
2396
41.2k
    }
2397
46.3k
    return offset_to_block;
2398
46.3k
}
2399
2400
0
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
2401
0
    SCOPED_CACHE_LOCK(_mutex, this);
2402
0
    if (auto iter = _files.find(hash); iter != _files.end()) {
2403
0
        for (auto& [_, cell] : iter->second) {
2404
0
            cell.update_atime();
2405
0
        }
2406
0
    };
2407
0
}
2408
2409
152
void BlockFileCache::run_background_lru_log_replay() {
2410
152
    Thread::set_self_name("run_background_lru_log_replay");
2411
24.9k
    while (!_close) {
2412
24.9k
        int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
2413
24.9k
        {
2414
24.9k
            std::unique_lock close_lock(_close_mtx);
2415
24.9k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2416
24.9k
            if (_close) {
2417
148
                break;
2418
148
            }
2419
24.9k
        }
2420
2421
24.8k
        _lru_recorder->replay_queue_event(FileCacheType::TTL);
2422
24.8k
        _lru_recorder->replay_queue_event(FileCacheType::INDEX);
2423
24.8k
        _lru_recorder->replay_queue_event(FileCacheType::NORMAL);
2424
24.8k
        _lru_recorder->replay_queue_event(FileCacheType::COLD_NORMAL);
2425
24.8k
        _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE);
2426
2427
24.8k
        if (config::enable_evaluate_shadow_queue_diff) {
2428
0
            SCOPED_CACHE_LOCK(_mutex, this);
2429
0
            _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
2430
0
            _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
2431
0
            _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
2432
0
            _lru_recorder->evaluate_queue_diff(_cold_normal_queue, "cold_normal", cache_lock);
2433
0
            _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
2434
0
        }
2435
24.8k
    }
2436
152
}
2437
2438
1.88k
void BlockFileCache::dump_lru_queues(bool force) {
2439
1.88k
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
2440
1.88k
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
2441
1.88k
        !ExecEnv::GetInstance()->get_is_upgrading()) {
2442
1.88k
        _lru_dumper->dump_queue("disposable", force);
2443
1.88k
        _lru_dumper->dump_queue("cold_normal", force);
2444
1.88k
        _lru_dumper->dump_queue("normal", force);
2445
1.88k
        _lru_dumper->dump_queue("index", force);
2446
1.88k
        _lru_dumper->dump_queue("ttl", force);
2447
1.88k
        _lru_dumper->set_first_dump_done();
2448
1.88k
    }
2449
1.88k
}
2450
2451
152
void BlockFileCache::run_background_lru_dump() {
2452
152
    Thread::set_self_name("run_background_lru_dump");
2453
2.04k
    while (!_close) {
2454
2.03k
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
2455
2.03k
        {
2456
2.03k
            std::unique_lock close_lock(_close_mtx);
2457
2.03k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2458
2.03k
            if (_close) {
2459
149
                break;
2460
149
            }
2461
2.03k
        }
2462
1.88k
        dump_lru_queues(false);
2463
1.88k
    }
2464
152
}
2465
2466
152
// Rebuilds the in-memory LRU queues from the dumped queue files by calling
// _lru_dumper->restore_queue once per dump name. The caller's cache_lock is
// forwarded to every restore call.
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
    // Keep this order: an entry may be duplicated across different queue dumps,
    // and the first appearance is the one that is used.
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
    // LRU queue mode compatibility:
    // - Single queue dump → Dual queue load: All data enters NORMAL queue
    // - Dual queue dump → Single queue load:
    //     * COLD_NORMAL data merges into NORMAL queue in LRU order
    //     * Restore order: cold_normal first (older), normal last (newer)
    // Both the "cold_normal" and "normal" dumps are restored into _normal_queue;
    // _cold_normal_queue itself is not populated here.
    _lru_dumper->restore_queue(_normal_queue, "cold_normal", cache_lock);
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
}
2479
2480
65
std::map<std::string, double> BlockFileCache::get_stats() {
2481
65
    std::map<std::string, double> stats;
2482
65
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2483
65
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2484
65
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2485
2486
65
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2487
65
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
2488
65
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2489
65
    stats["index_queue_curr_elements"] =
2490
65
            (double)_cur_index_queue_element_count_metrics->get_value();
2491
2492
65
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2493
65
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
2494
65
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2495
65
    stats["ttl_queue_curr_elements"] =
2496
65
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
2497
2498
65
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2499
65
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
2500
65
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2501
65
    stats["normal_queue_curr_elements"] =
2502
65
            (double)_cur_normal_queue_element_count_metrics->get_value();
2503
2504
65
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2505
65
    stats["disposable_queue_curr_size"] =
2506
65
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
2507
65
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2508
65
    stats["disposable_queue_curr_elements"] =
2509
65
            (double)_cur_disposable_queue_element_count_metrics->get_value();
2510
2511
65
    stats["cold_normal_queue_max_size"] = (double)_cold_normal_queue.get_max_size();
2512
65
    stats["cold_normal_queue_curr_size"] =
2513
65
            (double)_cur_cold_normal_queue_cache_size_metrics->get_value();
2514
65
    stats["cold_normal_queue_max_elements"] = (double)_cold_normal_queue.get_max_element_size();
2515
65
    stats["cold_normal_queue_curr_elements"] =
2516
65
            (double)_cur_cold_normal_queue_element_count_metrics->get_value();
2517
2518
65
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2519
65
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2520
2521
65
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2522
65
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2523
65
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2524
2525
65
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2526
65
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2527
65
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2528
2529
65
    return stats;
2530
65
}
2531
2532
// for be UTs
2533
217
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
2534
217
    std::map<std::string, double> stats;
2535
217
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2536
217
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2537
217
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2538
2539
217
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2540
217
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
2541
217
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2542
217
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
2543
2544
217
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2545
217
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
2546
217
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2547
217
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
2548
2549
217
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2550
217
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
2551
217
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2552
217
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
2553
2554
217
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2555
217
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
2556
217
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2557
217
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
2558
2559
217
    stats["cold_normal_queue_max_size"] = (double)_cold_normal_queue.get_max_size();
2560
217
    stats["cold_normal_queue_curr_size"] = (double)_cold_normal_queue.get_capacity_unsafe();
2561
217
    stats["cold_normal_queue_max_elements"] = (double)_cold_normal_queue.get_max_element_size();
2562
217
    stats["cold_normal_queue_curr_elements"] = (double)_cold_normal_queue.get_elements_num_unsafe();
2563
2564
217
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2565
217
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2566
2567
217
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2568
217
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2569
217
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2570
2571
217
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2572
217
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2573
217
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2574
2575
217
    return stats;
2576
217
}
2577
2578
// Explicit instantiation of the templated BlockFileCache::remove() for
// std::lock_guard<std::mutex> cache and block locks, so this specialization is
// emitted in this translation unit.
template void BlockFileCache::remove(FileBlockSPtr file_block,
                                     std::lock_guard<std::mutex>& cache_lock,
                                     std::lock_guard<std::mutex>& block_lock, bool sync);
2581
2582
#include "common/compile_check_end.h"
2583
2584
1
// Runs check_file_cache_consistency() and appends one human-readable report
// per detected inconsistency to `results`. Each report holds the manager-side
// info, then the storage-side info, then the inconsistency type. Existing
// entries in `results` are kept. Returns the error from the consistency check
// if it fails.
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
    InconsistencyContext inconsistency_context;
    RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));
    auto n = inconsistency_context.types.size();
    // check_file_cache_consistency() appends to these three vectors in lockstep.
    DCHECK_EQ(inconsistency_context.infos_in_manager.size(), n);
    DCHECK_EQ(inconsistency_context.infos_in_storage.size(), n);
    // Reserve room for the new reports on top of whatever `results` already holds.
    results.reserve(results.size() + n);
    for (size_t i = 0; i < n; i++) {
        std::string result;
        result += "File cache info in manager:\n";
        result += inconsistency_context.infos_in_manager[i].to_string();
        result += "File cache info in storage:\n";
        result += inconsistency_context.infos_in_storage[i].to_string();
        result += inconsistency_context.types[i].to_string();
        result += "\n";
        results.push_back(std::move(result));
    }
    return Status::OK();
}
2601
2602
1
// Cross-checks the cache manager's in-memory view (_files) against the
// blocks reported by _storage->get_file_cache_infos(). Mismatches are appended
// to `inconsistency_context` as parallel entries: manager info, storage info,
// and type.
// Pass 1: for each storage entry, reports NOT_LOADED when the manager has no
//   cell/block; otherwise reports any combination of tmp-state, size, cache
//   type and expiration-time mismatches.
// Pass 2: reports MISSING_IN_STORAGE for manager blocks storage did not list.
// Holds the cache lock for the whole check. Returns the storage error if
// listing the storage fails.
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
    std::lock_guard<std::mutex> cache_lock(_mutex);
    std::vector<FileCacheInfo> infos_in_storage;
    RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));
    // Every (hash, offset) present in storage, used by pass 2 to find blocks
    // the manager knows about but storage does not.
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks;
    for (const auto& info_in_storage : infos_in_storage) {
        confirmed_blocks.insert({info_in_storage.hash, info_in_storage.offset});
        auto* cell = get_cell(info_in_storage.hash, info_in_storage.offset, cache_lock);
        if (cell == nullptr || cell->file_block == nullptr) {
            // Present on disk but unknown to the manager.
            inconsistency_context.infos_in_manager.emplace_back();
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
            inconsistency_context.types.emplace_back(InconsistencyType::NOT_LOADED);
            continue;
        }
        // The manager's view of the same block; a DOWNLOADING block is expected
        // to be a tmp file in storage.
        FileCacheInfo info_in_manager {
                .hash = info_in_storage.hash,
                .expiration_time = cell->file_block->expiration_time(),
                .size = cell->size(),
                .offset = info_in_storage.offset,
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
                .cache_type = cell->file_block->cache_type()};
        // NOTE(review): assumes a default-constructed InconsistencyType equals
        // InconsistencyType::NONE (the != NONE check below relies on it); confirm.
        InconsistencyType inconsistent_type;
        if (info_in_storage.is_tmp != info_in_manager.is_tmp) {
            inconsistent_type |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
        }
        // While downloading, the on-disk size is compared with the downloaded
        // byte count instead of the cell's full size.
        size_t expected_size =
                info_in_manager.is_tmp ? cell->dowloading_size() : info_in_manager.size;
        if (info_in_storage.size != expected_size) {
            inconsistent_type |= InconsistencyType::SIZE_INCONSISTENT;
        }
        // Only if it is not a tmp file need we check the cache type.
        // NOTE(review): the condition below actually compares cache types
        // whenever storage and manager agree on the tmp/DOWNLOADING state.
        // That includes the case where BOTH report a tmp/downloading block,
        // which the comment above says should be skipped. Confirm whether
        // storage yields a meaningful cache_type for tmp files.
        if ((inconsistent_type & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
            info_in_storage.cache_type != info_in_manager.cache_type) {
            inconsistent_type |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
        }
        if (info_in_storage.expiration_time != info_in_manager.expiration_time) {
            inconsistent_type |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
        }
        if (inconsistent_type != InconsistencyType::NONE) {
            inconsistency_context.infos_in_manager.push_back(info_in_manager);
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
            inconsistency_context.types.push_back(inconsistent_type);
        }
    }

    // Pass 2: blocks tracked by the manager that storage did not report.
    for (const auto& [hash, offset_to_cell] : _files) {
        for (const auto& [offset, cell] : offset_to_cell) {
            if (confirmed_blocks.contains({hash, offset})) {
                continue;
            }
            const auto& block = cell.file_block;
            inconsistency_context.infos_in_manager.emplace_back(
                    hash, block->expiration_time(), cell.size(), offset,
                    cell.file_block->state() == FileBlock::State::DOWNLOADING, block->cache_type());
            inconsistency_context.infos_in_storage.emplace_back();
            inconsistency_context.types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
        }
    }
    return Status::OK();
}
2662
2663
} // namespace doris::io