Coverage Report

Created: 2026-06-08 03:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache.h"
22
23
#include <gen_cpp/file_cache.pb.h>
24
25
#include <cstdio>
26
#include <exception>
27
#include <fstream>
28
#include <unordered_set>
29
30
#include "common/status.h"
31
#include "cpp/sync_point.h"
32
#include "runtime/exec_env.h"
33
34
#if defined(__APPLE__)
35
#include <sys/mount.h>
36
#else
37
#include <sys/statfs.h>
38
#endif
39
40
#include <chrono> // IWYU pragma: keep
41
#include <mutex>
42
#include <ranges>
43
44
#include "common/cast_set.h"
45
#include "common/config.h"
46
#include "common/logging.h"
47
#include "core/uint128.h"
48
#include "exec/common/sip_hash.h"
49
#include "io/cache/block_file_cache_ttl_mgr.h"
50
#include "io/cache/file_block.h"
51
#include "io/cache/file_cache_common.h"
52
#include "io/cache/fs_file_cache_storage.h"
53
#include "io/cache/mem_file_cache_storage.h"
54
#include "runtime/runtime_profile.h"
55
#include "util/concurrency_stats.h"
56
#include "util/defer_op.h"
57
#include "util/stack_util.h"
58
#include "util/stopwatch.hpp"
59
#include "util/thread.h"
60
#include "util/time.h"
61
namespace doris::io {
62
63
// Insert a block pointer into one shard while swallowing allocation failures.
64
193k
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
65
193k
    if (!block) {
66
1
        return false;
67
1
    }
68
193k
    try {
69
193k
        auto* raw_ptr = block.get();
70
193k
        auto idx = shard_index(raw_ptr);
71
193k
        auto& shard = _shards[idx];
72
193k
        std::lock_guard lock(shard.mutex);
73
193k
        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
74
193k
        if (inserted) {
75
1.56k
            _size.fetch_add(1, std::memory_order_relaxed);
76
1.56k
        }
77
193k
        return inserted;
78
193k
    } catch (const std::exception& e) {
79
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
80
0
    } catch (...) {
81
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
82
0
    }
83
0
    return false;
84
193k
}
85
86
// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
87
3.98k
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
88
3.98k
    if (limit == 0 || output == nullptr) {
89
2
        return 0;
90
2
    }
91
3.97k
    size_t drained = 0;
92
3.97k
    try {
93
3.97k
        output->reserve(output->size() + std::min(limit, size()));
94
254k
        for (auto& shard : _shards) {
95
254k
            if (drained >= limit) {
96
1
                break;
97
1
            }
98
254k
            std::lock_guard lock(shard.mutex);
99
254k
            auto it = shard.entries.begin();
100
254k
            size_t shard_drained = 0;
101
254k
            while (it != shard.entries.end() && drained + shard_drained < limit) {
102
10
                output->emplace_back(std::move(it->second));
103
10
                it = shard.entries.erase(it);
104
10
                ++shard_drained;
105
10
            }
106
254k
            if (shard_drained > 0) {
107
7
                _size.fetch_sub(shard_drained, std::memory_order_relaxed);
108
7
                drained += shard_drained;
109
7
            }
110
254k
        }
111
3.97k
    } catch (const std::exception& e) {
112
0
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
113
0
    } catch (...) {
114
0
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
115
0
    }
116
3.97k
    return drained;
117
3.97k
}
118
119
// Remove every pending block, guarding against unexpected exceptions.
120
36
void NeedUpdateLRUBlocks::clear() {
121
36
    try {
122
2.30k
        for (auto& shard : _shards) {
123
2.30k
            std::lock_guard lock(shard.mutex);
124
2.30k
            if (!shard.entries.empty()) {
125
2
                auto removed = shard.entries.size();
126
2
                shard.entries.clear();
127
2
                _size.fetch_sub(removed, std::memory_order_relaxed);
128
2
            }
129
2.30k
        }
130
36
    } catch (const std::exception& e) {
131
0
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
132
0
    } catch (...) {
133
0
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
134
0
    }
135
36
}
136
137
193k
// Map a block address to its shard.
//
// std::hash<T*> is the identity function on libstdc++, and heap addresses are typically
// at least 16-byte aligned. Masking the raw hash with kShardMask therefore keeps a few
// always-zero alignment bits plus only a couple of varying ones, which concentrates all
// blocks in a handful of shards. Running the value through the 64-bit MurmurHash3
// finalizer first makes the low bits depend on the whole address.
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
    DCHECK(ptr != nullptr);
    auto h = static_cast<unsigned long long>(std::hash<FileBlock*> {}(ptr));
    h ^= h >> 33;
    h *= 0xff51afd7ed558ccdULL;
    h ^= h >> 33;
    h *= 0xc4ceb9fe1a85ec53ULL;
    h ^= h >> 33;
    return static_cast<size_t>(h) & kShardMask;
}
141
142
// Construct a cache rooted at `cache_base_path`.
//
// Every bvar metric is registered with the caller-supplied path as its prefix. The
// "memory" rename at the end runs after registration. The four LRU queues take their
// size/element limits from `cache_settings`. `cache_settings.storage == "memory"`
// selects MemFileCacheStorage; any other value selects FSFileCacheStorage.
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
                               const FileCacheSettings& cache_settings)
        : _cache_base_path(cache_base_path),
          _capacity(cache_settings.capacity),
          _max_file_block_size(cache_settings.max_file_block_size) {
    // Metric factories. Each one prefixes the metric name with _cache_base_path as it is
    // at call time, which is the caller-supplied path for every call below.
    auto size_status = [this](const char* name, size_t initial) {
        return std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(), name, initial);
    };
    auto ratio_status = [this](const char* name) {
        return std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(), name, 0.0);
    };
    auto adder = [this](const char* name) {
        return std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), name);
    };
    auto latency = [this](const char* name) {
        return std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(), name);
    };

    // Gauges: overall/queue sizes and element counts.
    _cur_cache_size_metrics = size_status("file_cache_cache_size", 0);
    _cache_capacity_metrics = size_status("file_cache_capacity", _capacity);
    _cur_ttl_cache_size_metrics = size_status("file_cache_ttl_cache_size", 0);
    _cur_normal_queue_element_count_metrics =
            size_status("file_cache_normal_queue_element_count", 0);
    _cur_ttl_cache_lru_queue_cache_size_metrics =
            size_status("file_cache_ttl_cache_lru_queue_size", 0);
    _cur_ttl_cache_lru_queue_element_count_metrics =
            size_status("file_cache_ttl_cache_lru_queue_element_count", 0);
    _cur_normal_queue_cache_size_metrics = size_status("file_cache_normal_queue_cache_size", 0);
    _cur_index_queue_element_count_metrics =
            size_status("file_cache_index_queue_element_count", 0);
    _cur_index_queue_cache_size_metrics = size_status("file_cache_index_queue_cache_size", 0);
    _cur_disposable_queue_element_count_metrics =
            size_status("file_cache_disposable_queue_element_count", 0);
    _cur_disposable_queue_cache_size_metrics =
            size_status("file_cache_disposable_queue_cache_size", 0);

    // Per-queue and aggregate eviction / read / GC counters.
    _queue_evict_size_metrics[0] = adder("file_cache_index_queue_evict_size");
    _queue_evict_size_metrics[1] = adder("file_cache_normal_queue_evict_size");
    _queue_evict_size_metrics[2] = adder("file_cache_disposable_queue_evict_size");
    _queue_evict_size_metrics[3] = adder("file_cache_ttl_cache_evict_size");
    _total_evict_size_metrics = adder("file_cache_total_evict_size");
    _total_read_size_metrics = adder("file_cache_total_read_size");
    _total_hit_size_metrics = adder("file_cache_total_hit_size");
    _gc_evict_bytes_metrics = adder("file_cache_gc_evict_bytes");
    _gc_evict_count_metrics = adder("file_cache_gc_evict_count");

    // Cross-queue counters. For a metric named "..._<a>_to_<b>", <a> is the first
    // matrix index and <b> the second.
    auto& by_time = _evict_by_time_metrics_matrix;
    by_time[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_time_disposable_to_normal");
    by_time[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
            adder("file_cache_evict_by_time_disposable_to_index");
    by_time[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
            adder("file_cache_evict_by_time_disposable_to_ttl");
    by_time[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_time_normal_to_disposable");
    by_time[FileCacheType::NORMAL][FileCacheType::INDEX] =
            adder("file_cache_evict_by_time_normal_to_index");
    by_time[FileCacheType::NORMAL][FileCacheType::TTL] =
            adder("file_cache_evict_by_time_normal_to_ttl");
    by_time[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_time_index_to_disposable");
    by_time[FileCacheType::INDEX][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_time_index_to_normal");
    by_time[FileCacheType::INDEX][FileCacheType::TTL] =
            adder("file_cache_evict_by_time_index_to_ttl");
    by_time[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_time_ttl_to_disposable");
    by_time[FileCacheType::TTL][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_time_ttl_to_normal");
    by_time[FileCacheType::TTL][FileCacheType::INDEX] =
            adder("file_cache_evict_by_time_ttl_to_index");

    auto& by_self_lru = _evict_by_self_lru_metrics_matrix;
    by_self_lru[FileCacheType::DISPOSABLE] = adder("file_cache_evict_by_self_lru_disposable");
    by_self_lru[FileCacheType::NORMAL] = adder("file_cache_evict_by_self_lru_normal");
    by_self_lru[FileCacheType::INDEX] = adder("file_cache_evict_by_self_lru_index");
    by_self_lru[FileCacheType::TTL] = adder("file_cache_evict_by_self_lru_ttl");

    auto& by_size = _evict_by_size_metrics_matrix;
    by_size[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_size_disposable_to_normal");
    by_size[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
            adder("file_cache_evict_by_size_disposable_to_index");
    by_size[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
            adder("file_cache_evict_by_size_disposable_to_ttl");
    by_size[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_size_normal_to_disposable");
    by_size[FileCacheType::NORMAL][FileCacheType::INDEX] =
            adder("file_cache_evict_by_size_normal_to_index");
    by_size[FileCacheType::NORMAL][FileCacheType::TTL] =
            adder("file_cache_evict_by_size_normal_to_ttl");
    by_size[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_size_index_to_disposable");
    by_size[FileCacheType::INDEX][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_size_index_to_normal");
    by_size[FileCacheType::INDEX][FileCacheType::TTL] =
            adder("file_cache_evict_by_size_index_to_ttl");
    by_size[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_size_ttl_to_disposable");
    by_size[FileCacheType::TTL][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_size_ttl_to_normal");
    by_size[FileCacheType::TTL][FileCacheType::INDEX] =
            adder("file_cache_evict_by_size_ttl_to_index");

    _evict_by_try_release = adder("file_cache_evict_by_try_release");

    // Block read/hit counters. The windowed variants below sample these objects.
    _num_read_blocks = adder("file_cache_num_read_blocks");
    _num_hit_blocks = adder("file_cache_num_hit_blocks");
    _num_removed_blocks = adder("file_cache_num_removed_blocks");
    _no_warmup_num_read_blocks = adder("file_cache_no_warmup_num_read_blocks");
    _no_warmup_num_hit_blocks = adder("file_cache_no_warmup_num_hit_blocks");

#ifndef BE_TEST
    // Sliding windows over the counters above: 300 s and 3600 s.
    auto window = [this](const char* name, bvar::Adder<size_t>* source, int seconds) {
        return std::make_shared<bvar::Window<bvar::Adder<size_t>>>(_cache_base_path.c_str(),
                                                                    name, source, seconds);
    };
    _num_hit_blocks_5m = window("file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
    _num_read_blocks_5m = window("file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
    _num_hit_blocks_1h = window("file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
    _num_read_blocks_1h = window("file_cache_num_read_blocks_1h", _num_read_blocks.get(), 3600);
    _no_warmup_num_hit_blocks_5m = window("file_cache_no_warmup_num_hit_blocks_5m",
                                          _no_warmup_num_hit_blocks.get(), 300);
    _no_warmup_num_read_blocks_5m = window("file_cache_no_warmup_num_read_blocks_5m",
                                           _no_warmup_num_read_blocks.get(), 300);
    _no_warmup_num_hit_blocks_1h = window("file_cache_no_warmup_num_hit_blocks_1h",
                                          _no_warmup_num_hit_blocks.get(), 3600);
    _no_warmup_num_read_blocks_1h = window("file_cache_no_warmup_num_read_blocks_1h",
                                           _no_warmup_num_read_blocks.get(), 3600);
#endif

    // Hit-ratio gauges, all starting at 0.0.
    _hit_ratio = ratio_status("file_cache_hit_ratio");
    _hit_ratio_5m = ratio_status("file_cache_hit_ratio_5m");
    _hit_ratio_1h = ratio_status("file_cache_hit_ratio_1h");
    _no_warmup_hit_ratio = ratio_status("file_cache_no_warmup_hit_ratio");
    _no_warmup_hit_ratio_5m = ratio_status("file_cache_no_warmup_hit_ratio_5m");
    _no_warmup_hit_ratio_1h = ratio_status("file_cache_no_warmup_hit_ratio_1h");

    _disk_limit_mode_metrics = size_status("file_cache_disk_limit_mode", 0);
    _need_evict_cache_in_advance_metrics =
            size_status("file_cache_need_evict_cache_in_advance", 0);
    _meta_store_write_queue_size_metrics =
            size_status("file_cache_meta_store_write_queue_size", 0);

    // Latency / length recorders.
    _cache_lock_wait_time_us = latency("file_cache_cache_lock_wait_time_us");
    _get_or_set_latency_us = latency("file_cache_get_or_set_latency_us");
    _storage_sync_remove_latency_us = latency("file_cache_storage_sync_remove_latency_us");
    _storage_retry_sync_remove_latency_us =
            latency("file_cache_storage_retry_sync_remove_latency_us");
    _storage_async_remove_latency_us = latency("file_cache_storage_async_remove_latency_us");
    _evict_in_advance_latency_us = latency("file_cache_evict_in_advance_latency_us");
    _lru_dump_latency_us = latency("file_cache_lru_dump_latency_us");
    _recycle_keys_length_recorder = latency("file_cache_recycle_keys_length");
    _need_update_lru_blocks_length_recorder = latency("file_cache_need_update_lru_blocks_length");
    _update_lru_blocks_latency_us = latency("file_cache_update_lru_blocks_latency_us");
    _ttl_gc_latency_us = latency("file_cache_ttl_gc_latency_us");
    _shadow_queue_levenshtein_distance = latency("file_cache_shadow_queue_levenshtein_distance");

    // LRU queues: (max size, max elements, third argument in seconds: 1h / 7d / 1d / int max).
    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
                                 cache_settings.disposable_queue_elements, 60 * 60);
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
                            7 * 24 * 60 * 60);
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
                             24 * 60 * 60);
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
                          std::numeric_limits<int>::max());

    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());

    const bool in_memory = cache_settings.storage == "memory";
    if (in_memory) {
        _storage = std::make_unique<MemFileCacheStorage>();
        _cache_base_path = "memory";
    } else {
        _storage = std::make_unique<FSFileCacheStorage>();
    }

    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
}
379
380
14.5k
// Hash the bytes of `path` with 128-bit SipHash and wrap the digest in a UInt128Wrapper.
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
    uint128_t digest;
    // sip_hash128 writes the 16-byte digest straight into `digest`.
    auto* digest_bytes = reinterpret_cast<char*>(&digest);
    sip_hash128(path.data(), path.size(), digest_bytes);
    return UInt128Wrapper(digest);
}
385
386
// Return a holder wrapping the per-query context for `query_id`, finding or creating the
// context under the cache lock. Returns a null holder when
// config::enable_file_cache_query_limit is off.
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
    SCOPED_CACHE_LOCK(_mutex, this);
    if (config::enable_file_cache_query_limit) {
        // The per-query limit is enabled: look up (or create) this query's context.
        auto ctx = get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
        return std::make_unique<QueryFileCacheContextHolder>(query_id, this, ctx);
    }
    return {};
}
398
399
// Look up the context registered for `query_id` in _query_map; returns null if none.
// `cache_lock` is not used in the body; it is taken as a parameter so callers must pass
// the held lock.
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
    if (auto found = _query_map.find(query_id); found != _query_map.end()) {
        return found->second;
    }
    return nullptr;
}
404
405
6
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
406
6
    SCOPED_CACHE_LOCK(_mutex, this);
407
6
    const auto& query_iter = _query_map.find(query_id);
408
409
6
    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
410
5
        _query_map.erase(query_iter);
411
5
    }
412
6
}
413
414
// Return the context registered for `query_id`, creating it if absent. A new context's
// size limit is `file_cache_query_limit_percent` percent of the cache capacity; a warning
// is logged when that limit is below 256 MiB. An all-zero query id yields null and
// registers nothing. The caller must hold the cache lock (`cache_lock`).
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
        int file_cache_query_limit_percent) {
    const bool is_zero_id = query_id.lo == 0 && query_id.hi == 0;
    if (is_zero_id) {
        return nullptr;
    }

    if (auto existing = get_query_context(query_id, cache_lock)) {
        return existing;
    }

    // 256 MiB: recommended lower bound for a per-query limit.
    constexpr size_t kRecommendedMinQueryLimit = 268435456;
    size_t limit_bytes = _capacity * file_cache_query_limit_percent / 100;
    if (limit_bytes < kRecommendedMinQueryLimit) {
        LOG(WARNING) << "The user-set file cache query limit (" << limit_bytes
                     << " bytes) is less than the 256MB recommended minimum. "
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
                     << " from its current value " << file_cache_query_limit_percent << "%.";
    }
    auto created = std::make_shared<QueryFileCacheContext>(limit_bytes);
    return _query_map.emplace(query_id, std::move(created)).first->second;
}
437
438
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
439
20
                                                   std::lock_guard<std::mutex>& cache_lock) {
440
20
    auto pair = std::make_pair(hash, offset);
441
20
    auto record = records.find(pair);
442
20
    DCHECK(record != records.end());
443
20
    auto iter = record->second;
444
20
    records.erase(pair);
445
20
    lru_queue.remove(iter, cache_lock);
446
20
}
447
448
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
449
                                                    size_t size,
450
30
                                                    std::lock_guard<std::mutex>& cache_lock) {
451
30
    auto pair = std::make_pair(hash, offset);
452
30
    if (records.find(pair) == records.end()) {
453
29
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
454
29
        records.insert({pair, queue_iter});
455
29
    }
456
30
}
457
458
154
// Take the cache lock and run initialize_unlocked() while holding it.
Status BlockFileCache::initialize() {
    SCOPED_CACHE_LOCK(_mutex, this);
    Status init_status = initialize_unlocked(cache_lock);
    return init_status;
}
462
463
154
// One-time initialization, run with the cache lock held. The steps, in order:
// 1. Mark the cache initialized.
// 2. Optionally restore LRU queues from disk.
// 3. Initialize storage, returning its error if that fails.
// 4. Create the TTL manager when the storage is an FSFileCacheStorage with a meta store.
// 5. Start the background threads.
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(!_is_initialized);
    _is_initialized = true;

    // Restoring LRU queues has three requirements:
    // 1. restored data must not overwrite the last dump;
    // 2. the restore must happen before load and async load;
    // 3. queues are restored sequentially to avoid conflicts.
    // TODO(zhengyu): restoring in parallel is possible but adds complexity; measure the
    // time cost first to see whether it is worth it.
    const bool restore_lru = config::file_cache_background_lru_dump_tail_record_num > 0;
    if (restore_lru) {
        restore_lru_queues_from_disk(cache_lock);
    }

    RETURN_IF_ERROR(_storage->init(this));

    auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
    auto* meta_store = fs_storage != nullptr ? fs_storage->get_meta_store() : nullptr;
    if (meta_store != nullptr) {
        _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
    }

    // Background workers, started in this order.
    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
    _cache_background_evict_in_advance_thread =
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
    _cache_background_block_lru_update_thread =
            std::thread(&BlockFileCache::run_background_block_lru_update, this);
    // LRU dump and LRU log replay threads.
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
    _cache_background_lru_log_replay_thread =
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);

    return Status::OK();
}
497
498
void BlockFileCache::update_block_lru(FileBlockSPtr block,
499
7
                                      std::lock_guard<std::mutex>& cache_lock) {
500
7
    if (!block) {
501
1
        return;
502
1
    }
503
504
6
    FileBlockCell* cell = get_cell(block->get_hash_value(), block->offset(), cache_lock);
505
6
    if (!cell || cell->file_block.get() != block.get()) {
506
1
        return;
507
1
    }
508
509
5
    if (cell->queue_iterator) {
510
5
        auto& queue = get_queue(block->cache_type());
511
5
        queue.move_to_end(*cell->queue_iterator, cache_lock);
512
5
        _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
513
5
                                          block->_key.hash, block->_key.offset,
514
5
                                          block->_block_range.size());
515
5
    }
516
5
    cell->update_atime();
517
5
}
518
519
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
520
712k
                              std::lock_guard<std::mutex>& cache_lock) {
521
712k
    if (result) {
522
712k
        result->push_back(cell.file_block);
523
712k
    }
524
525
712k
    auto& queue = get_queue(cell.file_block->cache_type());
526
    /// Move to the end of the queue. The iterator remains valid.
527
712k
    if (!config::enable_file_cache_async_touch_on_get_or_set && cell.queue_iterator &&
528
712k
        move_iter_flag) {
529
520k
        queue.move_to_end(*cell.queue_iterator, cache_lock);
530
520k
        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
531
520k
                                          CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash,
532
520k
                                          cell.file_block->_key.offset, cell.size());
533
520k
    }
534
535
712k
    cell.update_atime();
536
712k
}
537
538
template <class T>
539
    requires IsXLock<T>
540
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
541
11.1k
                                        T& /* cache_lock */) {
542
11.1k
    auto it = _files.find(hash);
543
11.1k
    if (it == _files.end()) {
544
4
        return nullptr;
545
4
    }
546
547
11.1k
    auto& offsets = it->second;
548
11.1k
    auto cell_it = offsets.find(offset);
549
11.1k
    if (cell_it == offsets.end()) {
550
3
        return nullptr;
551
3
    }
552
553
11.1k
    return &cell_it->second;
554
11.1k
}
555
556
904k
// A hit should reorder the LRU queue only when neither the cached cell nor the querying
// context is of DISPOSABLE type.
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
    const bool any_disposable =
            cell_type == FileCacheType::DISPOSABLE || query_type == FileCacheType::DISPOSABLE;
    return !any_disposable;
}
559
560
// Returns the cached blocks of `hash` that intersect `range` (inclusive bounds), in offset order.
// Every returned block goes through use_cell, which may also touch the LRU queue
// (controlled by need_to_move for the block's and the query's cache types).
// If the hash is unknown and the async open has not finished yet, the storage is asked to
// load this hash's blocks directly before giving up. An empty vector means "nothing cached".
// Must be called with the cache lock held.
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
                                    const FileBlock::Range& range,
                                    std::lock_guard<std::mutex>& cache_lock) {
    /// Given range = [left, right] and non-overlapping ordered set of file blocks,
    /// find list [block1, ..., blockN] of blocks which intersect with given range.
    auto it = _files.find(hash);
    if (it == _files.end()) {
        if (_async_open_done) {
            return {};
        }
        // Async open still in progress: this hash may exist on storage but not be indexed yet,
        // so load its blocks synchronously and retry the lookup.
        FileCacheKey key;
        key.hash = hash;
        key.meta.type = context.cache_type;
        key.meta.expiration_time = context.expiration_time;
        key.meta.tablet_id = context.tablet_id;
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);

        it = _files.find(hash);
        if (it == _files.end()) [[unlikely]] {
            return {};
        }
    }

    auto& file_blocks = it->second;
    if (file_blocks.empty()) {
        // An indexed hash with no blocks is an invariant violation: log it, assert in debug
        // builds, and drop the empty entry in release builds.
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
                     << " cache type=" << context.cache_type
                     << " cache expiration time=" << context.expiration_time
                     << " cache range=" << range.left << " " << range.right
                     << " query id=" << context.query_id;
        DCHECK(false);
        _files.erase(hash);
        return {};
    }

    FileBlocks result;
    // First block whose offset is >= range.left.
    auto block_it = file_blocks.lower_bound(range.left);
    if (block_it == file_blocks.end()) {
        /// N - last cached block for given file hash, block{N}.offset < range.left:
        ///   block{N}                       block{N}
        /// [________                         [_______]
        ///     [__________]         OR                  [________]
        ///     ^                                        ^
        ///     range.left                               range.left

        const auto& cell = file_blocks.rbegin()->second;
        if (cell.file_block->range().right < range.left) {
            return {};
        }

        use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
                 cache_lock);
    } else { /// block_it <-- segment{k}
        // The block just before block{k} starts before range.left but may still overlap it.
        if (block_it != file_blocks.begin()) {
            const auto& prev_cell = std::prev(block_it)->second;
            const auto& prev_cell_range = prev_cell.file_block->range();

            if (range.left <= prev_cell_range.right) {
                ///   block{k-1}  block{k}
                ///   [________]   [_____
                ///       [___________
                ///       ^
                ///       range.left

                use_cell(prev_cell, &result,
                         need_to_move(prev_cell.file_block->cache_type(), context.cache_type),
                         cache_lock);
            }
        }

        ///  block{k} ...       block{k-1}  block{k}                      block{k}
        ///  [______              [______]     [____                        [________
        ///  [_________     OR              [________      OR    [______]   ^
        ///  ^                              ^                           ^   block{k}.offset
        ///  range.left                     range.left                  range.right

        // Collect every following block that starts at or before range.right.
        while (block_it != file_blocks.end()) {
            const auto& cell = block_it->second;
            if (range.right < cell.file_block->range().left) {
                break;
            }

            use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
                     cache_lock);
            ++block_it;
        }
    }

    return result;
}
650
651
193k
// Queues `block` for a deferred LRU refresh (used when touching is asynchronous).
// The pending-set length recorder is sampled only when insert() reports that the block
// was actually added.
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
    if (!_need_update_lru_blocks.insert(std::move(block))) {
        return;
    }
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
}
656
657
4
std::string BlockFileCache::clear_file_cache_async() {
658
4
    LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
659
4
    _lru_dumper->remove_lru_dump_files();
660
4
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
661
4
    auto summary = clear_file_cache_common(false);
662
663
4
    std::stringstream ss;
664
4
    ss << "finish clear_file_cache_async, path=" << _cache_base_path
665
4
       << " num_files_all=" << summary.num_files_all << " num_cells_all=" << summary.num_cells_all
666
4
       << " num_cells_to_delete=" << summary.num_cells_to_delete
667
4
       << " num_cells_wait_recycle=" << summary.num_cells_wait_recycle;
668
4
    auto msg = ss.str();
669
4
    LOG(INFO) << msg;
670
4
    _lru_dumper->remove_lru_dump_files();
671
4
    return msg;
672
4
}
673
674
29
std::string BlockFileCache::clear_file_cache_sync() {
675
29
    LOG(INFO) << "start clear_file_cache_sync, path=" << _cache_base_path;
676
29
    pause_ttl_manager();
677
29
    Defer resume_ttl_manager_guard {[this] { resume_ttl_manager(); }};
678
679
29
    _lru_dumper->remove_lru_dump_files();
680
29
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_sync");
681
29
    auto summary = clear_file_cache_common(true);
682
683
29
    std::stringstream ss;
684
29
    ss << "finish clear_file_cache_sync, path=" << _cache_base_path
685
29
       << " num_files_all=" << summary.num_files_all << " num_cells_all=" << summary.num_cells_all
686
29
       << " num_cells_to_delete=" << summary.num_cells_to_delete
687
29
       << " num_cells_wait_recycle=" << summary.num_cells_wait_recycle;
688
29
    auto msg = ss.str();
689
29
    LOG(INFO) << msg;
690
29
    _lru_dumper->remove_lru_dump_files();
691
29
    return msg;
692
29
}
693
694
// Removes every cell from the cache under the cache lock and reports what happened.
// - Releasable cells are removed; `sync_remove_releasable` is forwarded to remove().
// - Non-releasable cells are only marked deleting and counted in num_cells_wait_recycle.
// The pending LRU-update set is cleared afterwards.
BlockFileCache::ClearFileCacheSummary BlockFileCache::clear_file_cache_common(
        bool sync_remove_releasable) {
    ClearFileCacheSummary summary;
    {
        SCOPED_CACHE_LOCK(_mutex, this);

        // Removing cells invalidates _files iterators, so the scan first records cell pointers.
        std::vector<FileBlockCell*> deleting_cells;
        for (auto& [_, offset_to_cell] : _files) {
            ++summary.num_files_all;
            for (auto& [_1, cell] : offset_to_cell) {
                ++summary.num_cells_all;
                deleting_cells.push_back(&cell);
            }
        }

        for (auto* cell : deleting_cells) {
            if (!cell->releasable()) {
                // The hash was previously missing from this message, which made it impossible
                // to tell which file the pending block belongs to.
                LOG(INFO) << "cell is not releasable, hash="
                          << cell->file_block->get_hash_value().to_string()
                          << " offset=" << cell->file_block->offset();
                cell->file_block->set_deleting();
                ++summary.num_cells_wait_recycle;
                continue;
            }
            // Hold our own reference: remove() may erase the cell that owns this pointer.
            FileBlockSPtr file_block = cell->file_block;
            if (file_block) {
                std::lock_guard block_lock(file_block->_mutex);
                remove(file_block, cache_lock, block_lock, sync_remove_releasable);
                ++summary.num_cells_to_delete;
            }
        }
        clear_need_update_lru_blocks();
    }
    return summary;
}
729
730
// Splits [offset, offset + size) into consecutive chunks of at most _max_file_block_size bytes
// and returns one block per chunk, in order.
// - Each chunk reserves space on its own. On success, a cell in `state` is added to the cache.
//   Its atime is refreshed unless the context marks the data as cold, and the tablet is
//   registered with the TTL manager when one exists.
// - On failure, a standalone SKIP_CACHE block (not registered in _files) stands in for that chunk.
// Must be called with the cache lock held; `size` must be positive.
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
                                                  const CacheContext& context, size_t offset,
                                                  size_t size, FileBlock::State state,
                                                  std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(size > 0);

    auto current_pos = offset;
    auto end_pos_non_included = offset + size;

    size_t current_size = 0;
    size_t remaining_size = size;

    FileBlocks file_blocks;
    while (current_pos < end_pos_non_included) {
        current_size = std::min(remaining_size, _max_file_block_size);
        remaining_size -= current_size;
        // Decide the state per chunk. Overwriting `state` itself would make every chunk after
        // the first failed reservation SKIP_CACHE, even when its own try_reserve succeeded
        // and already made room for it.
        const FileBlock::State chunk_state =
                try_reserve(hash, context, current_pos, current_size, cache_lock)
                        ? state
                        : FileBlock::State::SKIP_CACHE;
        if (chunk_state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
            FileCacheKey key;
            key.hash = hash;
            key.offset = current_pos;
            key.meta.type = context.cache_type;
            key.meta.expiration_time = context.expiration_time;
            key.meta.tablet_id = context.tablet_id;
            auto file_block = std::make_shared<FileBlock>(key, current_size, this,
                                                          FileBlock::State::SKIP_CACHE);
            file_blocks.push_back(std::move(file_block));
        } else {
            auto* cell =
                    add_cell(hash, context, current_pos, current_size, chunk_state, cache_lock);
            if (cell) {
                file_blocks.push_back(cell->file_block);
                if (!context.is_cold_data) {
                    cell->update_atime();
                }
            }
            if (_ttl_mgr && context.tablet_id != 0) {
                _ttl_mgr->register_tablet_id(context.tablet_id);
            }
        }

        current_pos += current_size;
    }

    DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
    return file_blocks;
}
778
779
// Makes `file_blocks` cover all of `range` by inserting newly created EMPTY cells
// (via split_range_into_cells) into every gap, including a trailing gap after the last block.
// Precondition: `file_blocks` is non-empty, because begin() is dereferenced unconditionally.
// get_or_set only calls this on a non-empty result of get_impl.
// The new blocks are spliced into place, so `file_blocks` stays ordered by offset.
// Must be called with the cache lock held.
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
                                                       const UInt128Wrapper& hash,
                                                       const CacheContext& context,
                                                       const FileBlock::Range& range,
                                                       std::lock_guard<std::mutex>& cache_lock) {
    /// There are blocks [block1, ..., blockN]
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
    /// intersect with given range.

    /// It can have holes:
    /// [____________________]         -- requested range
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
    ///
    /// For each such hole create a cell with file block state EMPTY.

    auto it = file_blocks.begin();
    auto block_range = (*it)->range();

    // current_pos is the first offset not yet known to be covered.
    size_t current_pos = 0;
    if (block_range.left < range.left) {
        ///    [_______     -- requested range
        /// [_______
        /// ^
        /// block1

        current_pos = block_range.right + 1;
        ++it;
    } else {
        current_pos = range.left;
    }

    while (current_pos <= range.right && it != file_blocks.end()) {
        block_range = (*it)->range();

        // No gap before this block: skip past it.
        if (current_pos == block_range.left) {
            current_pos = block_range.right + 1;
            ++it;
            continue;
        }

        DCHECK(current_pos < block_range.left);

        auto hole_size = block_range.left - current_pos;

        // Insert the hole's cells before `it`; `it` keeps pointing at the existing block.
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
                                                      FileBlock::State::EMPTY, cache_lock));

        current_pos = block_range.right + 1;
        ++it;
    }

    if (current_pos <= range.right) {
        ///   ________]     -- requested range
        ///   _____]
        ///        ^
        /// blockN

        auto hole_size = range.right - current_pos + 1;

        file_blocks.splice(file_blocks.end(),
                           split_range_into_cells(hash, context, current_pos, hole_size,
                                                  FileBlock::State::EMPTY, cache_lock));
    }
}
843
844
// Returns a holder over blocks that together cover [offset, offset + size - 1] of `hash`.
// - Already-cached blocks are reused (get_impl).
// - Any gaps are filled with newly created EMPTY cells, or SKIP_CACHE blocks when space cannot
//   be reserved.
// Read/hit metrics and lock-wait/get/set timers in context.stats are updated.
// When enable_file_cache_async_touch_on_get_or_set is on, the LRU refresh of DOWNLOADED hits is
// deferred: blocks are queued with add_need_update_lru_block after the cache lock is released.
// `context.stats` must be non-null (DCHECKed, then dereferenced).
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
                                            CacheContext& context) {
    FileBlock::Range range(offset, offset + size - 1);

    ReadStatistics* stats = context.stats;
    DCHECK(stats != nullptr);
    MonotonicStopWatch sw;
    sw.start();
    FileBlocks file_blocks;
    std::vector<FileBlockSPtr> need_update_lru_blocks;
    // Snapshot the flag once so the reserve/collect/enqueue steps below agree on it.
    const bool async_touch_on_get_or_set = config::enable_file_cache_async_touch_on_get_or_set;
    int64_t duration = 0;
    {
        ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
        std::lock_guard cache_lock(_mutex);
        ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
        stats->lock_wait_timer += sw.elapsed_time();
        SCOPED_RAW_TIMER(&duration);
        /// Get all blocks which intersect with the given range.
        {
            SCOPED_RAW_TIMER(&stats->get_timer);
            file_blocks = get_impl(hash, context, range, cache_lock);
        }

        if (file_blocks.empty()) {
            SCOPED_RAW_TIMER(&stats->set_timer);
            file_blocks = split_range_into_cells(hash, context, offset, size,
                                                 FileBlock::State::EMPTY, cache_lock);
        } else {
            SCOPED_RAW_TIMER(&stats->set_timer);
            fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
        }
        DCHECK(!file_blocks.empty());
        *_num_read_blocks << file_blocks.size();
        if (!context.is_warmup) {
            *_no_warmup_num_read_blocks << file_blocks.size();
        }
        if (async_touch_on_get_or_set) {
            need_update_lru_blocks.reserve(file_blocks.size());
        }
        // Only DOWNLOADED blocks count as hits; with async touch they are also collected
        // for a deferred LRU refresh (subject to need_to_move).
        for (auto& block : file_blocks) {
            size_t block_size = block->range().size();
            *_total_read_size_metrics << block_size;
            if (block->state_unsafe() == FileBlock::State::DOWNLOADED) {
                *_num_hit_blocks << 1;
                *_total_hit_size_metrics << block_size;
                if (!context.is_warmup) {
                    *_no_warmup_num_hit_blocks << 1;
                }
                if (async_touch_on_get_or_set &&
                    need_to_move(block->cache_type(), context.cache_type)) {
                    need_update_lru_blocks.emplace_back(block);
                }
            }
        }
    }

    // Enqueue outside the cache lock.
    if (async_touch_on_get_or_set) {
        for (auto& block : need_update_lru_blocks) {
            add_need_update_lru_block(std::move(block));
        }
    }

    // NOTE(review): `duration` is presumably nanoseconds from SCOPED_RAW_TIMER; /1000 -> us.
    *_get_or_set_latency_us << (duration / 1000);
    return FileBlocksHolder(std::move(file_blocks));
}
910
911
// Creates a cell for [offset, offset + size) of `hash` in `state` and indexes it in
// _files[hash][offset].
// - Returns nullptr for size == 0 or size > 1 GiB (the latter is logged with a stack trace).
// - If a cell already exists at that offset, returns it unchanged. NOTE(review): its size may
//   differ from `size`.
// - The block's cache type follows the context, but is coerced to NORMAL for TTL without an
//   expiration time, and to TTL for any other type with a non-zero expiration time.
// The new cell is appended to its type's LRU queue (with an ADD event logged), and
// _cur_cache_size (plus _cur_ttl_size for TTL blocks) is increased.
// Must be called with the cache lock held.
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
                                        size_t offset, size_t size, FileBlock::State state,
                                        std::lock_guard<std::mutex>& cache_lock) {
    /// Create a file block cell and put it in `files` map by [hash][offset].
    if (size == 0) {
        return nullptr; /// Empty files are not cached.
    }

    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
               << " expiration_time=" << context.expiration_time
               << " tablet_id=" << context.tablet_id;

    // Reject blocks larger than 1 GiB.
    if (size > 1024 * 1024 * 1024) {
        LOG(WARNING) << "File block size is too large for a block, reject. size=" << size
                     << " hash=" << hash.to_string() << " offset=" << offset
                     << " stack:" << get_stack_trace();
        return nullptr;
    }

    // operator[] creates the per-hash map on first use; the early returns above come first,
    // so rejected requests never leave an empty entry behind.
    auto& offsets = _files[hash];
    auto itr = offsets.find(offset);
    if (itr != offsets.end()) {
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
                   << ", offset: " << offset << ", size: " << size
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
        return &(itr->second);
    }

    FileCacheKey key;
    key.hash = hash;
    key.offset = offset;
    key.meta.type = context.cache_type;
    key.meta.expiration_time = context.expiration_time;
    key.meta.tablet_id = context.tablet_id;
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);
    // Keep cache type and expiration time consistent: TTL needs an expiration time, and a
    // non-zero expiration time implies TTL. A failure is only logged.
    Status st;
    if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
    } else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
    }
    if (!st.ok()) {
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
                     << " cache_type=" << cache_type_to_string(context.cache_type)
                     << " error=" << st.msg();
    }

    // Use the block's (possibly coerced) type, not context.cache_type, to pick the queue.
    auto& queue = get_queue(cell.file_block->cache_type());
    cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
    _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD,
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
                                      cell.size());

    if (cell.file_block->cache_type() == FileCacheType::TTL) {
        _cur_ttl_size += cell.size();
    }
    auto [it, _] = offsets.insert(std::make_pair(offset, std::move(cell)));
    _cur_cache_size += size;
    return &(it->second);
}
972
973
0
size_t BlockFileCache::try_release() {
974
0
    SCOPED_CACHE_LOCK(_mutex, this);
975
0
    std::vector<FileBlockCell*> trash;
976
0
    for (auto& [hash, blocks] : _files) {
977
0
        for (auto& [offset, cell] : blocks) {
978
0
            if (cell.releasable()) {
979
0
                trash.emplace_back(&cell);
980
0
            } else {
981
0
                cell.file_block->set_deleting();
982
0
            }
983
0
        }
984
0
    }
985
0
    size_t remove_size = 0;
986
0
    for (auto& cell : trash) {
987
0
        FileBlockSPtr file_block = cell->file_block;
988
0
        std::lock_guard lc(cell->file_block->_mutex);
989
0
        remove_size += file_block->range().size();
990
0
        remove(file_block, cache_lock, lc);
991
0
        VLOG_DEBUG << "try_release " << _cache_base_path
992
0
                   << " hash=" << file_block->get_hash_value().to_string()
993
0
                   << " offset=" << file_block->offset();
994
0
    }
995
0
    *_evict_by_try_release << remove_size;
996
0
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
997
0
    return trash.size();
998
0
}
999
1000
769k
// Mutable access to the LRU queue for `type`.
// Delegates to the const overload so the type -> queue mapping (including the DCHECK and the
// NORMAL fallback for unknown types) is defined in exactly one place. The const_cast is safe:
// the queues are non-const members and `*this` is non-const in this overload.
LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
    return const_cast<LRUQueue&>(static_cast<const BlockFileCache&>(*this).get_queue(type));
}
1015
1016
140
// Read-only access to the LRU queue that holds blocks of cache type `type`.
// An unknown type trips a DCHECK and falls back to the NORMAL queue.
const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
    if (type == FileCacheType::INDEX) {
        return _index_queue;
    }
    if (type == FileCacheType::DISPOSABLE) {
        return _disposable_queue;
    }
    if (type == FileCacheType::NORMAL) {
        return _normal_queue;
    }
    if (type == FileCacheType::TTL) {
        return _ttl_queue;
    }
    DCHECK(false);
    return _normal_queue;
}
1031
1032
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
1033
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
1034
14.7k
                                        std::string& reason) {
1035
14.7k
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1036
1.36k
        FileBlockSPtr file_block = cell->file_block;
1037
1.36k
        if (file_block) {
1038
1.36k
            std::lock_guard block_lock(file_block->_mutex);
1039
1.36k
            remove(file_block, cache_lock, block_lock, sync);
1040
1.36k
            VLOG_DEBUG << "remove_file_blocks"
1041
0
                       << " hash=" << file_block->get_hash_value().to_string()
1042
0
                       << " offset=" << file_block->offset() << " reason=" << reason;
1043
1.36k
        }
1044
1.36k
    };
1045
14.7k
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1046
14.7k
}
1047
1048
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
1049
                                           size_t& removed_size,
1050
                                           std::vector<FileBlockCell*>& to_evict,
1051
                                           std::lock_guard<std::mutex>& cache_lock,
1052
1.34k
                                           size_t& cur_removed_size, bool evict_in_advance) {
1053
3.10k
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1054
3.10k
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1055
1.28k
            break;
1056
1.28k
        }
1057
1.81k
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1058
1059
1.81k
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
1060
0
                     << ", offset: " << entry_offset;
1061
1062
1.81k
        size_t cell_size = cell->size();
1063
1.81k
        DCHECK(entry_size == cell_size);
1064
1065
1.81k
        if (cell->releasable()) {
1066
1.34k
            auto& file_block = cell->file_block;
1067
1068
1.34k
            std::lock_guard block_lock(file_block->_mutex);
1069
1.34k
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1070
1.34k
            to_evict.push_back(cell);
1071
1.34k
            removed_size += cell_size;
1072
1.34k
            cur_removed_size += cell_size;
1073
1.34k
        }
1074
1.81k
    }
1075
1.34k
}
1076
1077
// Tries to make room for a block of `size` bytes at (hash, offset). Returns false when space
// cannot be reserved. Must be called with the cache lock held.
//
// Reservation strategy:
// 1. If the async load of the file cache has not finished:
//     a. evict from the LRU queue (try_reserve_during_async_load).
// 2. If this is a TTL cache:
//     a. evict from the disposable/normal/index queues, one by one.
// 3. If the query has not reached its cache limit, or there is no query limit:
//     a. evict from the other queues;
//     b. evict from the current queue;
//         b.1 if the data belongs to a write, only cold data is evicted.
// 4. If the query has reached its cache limit:
//     a. evict from the query's own queue;
//     b. evict from the other queues (try_reserve_from_other_queue).
// NOTE(review): steps 2 and 3 are presumably carried out inside try_reserve_for_lru, which is
// not shown here; this function only dispatches to it.
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
                                 size_t offset, size_t size,
                                 std::lock_guard<std::mutex>& cache_lock) {
    if (!_async_open_done) {
        return try_reserve_during_async_load(size, cache_lock);
    }
    // When disk capacity or the number of free inodes is insufficient
    // (_disk_resource_limit_mode), evict five times the requested size at once.
    if (_disk_resource_limit_mode) {
        size = 5 * size;
    }

    // A query context is used only when the query limit is enabled and the query id is set.
    auto query_context = config::enable_file_cache_query_limit &&
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
                                 ? get_query_context(context.query_id, cache_lock)
                                 : nullptr;
    if (!query_context) {
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
    } else if (query_context->get_cache_size(cache_lock) + size <=
               query_context->get_max_cache_size()) {
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
    }
    // From here on the query is over its quota (step 4).
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
                               std::chrono::steady_clock::now().time_since_epoch())
                               .count();
    auto& queue = get_queue(context.cache_type);
    size_t removed_size = 0;
    // Bytes of query-queue records whose cells no longer exist; they count toward the
    // query's quota check but not toward the global/queue checks.
    size_t ghost_remove_size = 0;
    size_t queue_size = queue.get_capacity(cache_lock);
    size_t cur_cache_size = _cur_cache_size;
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);

    std::vector<LRUQueue::Iterator> ghost;
    std::vector<FileBlockCell*> to_evict;

    size_t max_size = queue.get_max_size();
    // Still overflowing if, after subtracting what has been selected so far, we would exceed
    // the global capacity, this queue's max size, or the query's quota.
    // In disk-resource-limit mode we simply need to have selected at least `size` bytes.
    auto is_overflow = [&] {
        return _disk_resource_limit_mode ? removed_size < size
                                         : cur_cache_size + size - removed_size > _capacity ||
                                                   (queue_size + size - removed_size > max_size) ||
                                                   (query_context_cache_size + size -
                                                            (removed_size + ghost_remove_size) >
                                                    query_context->get_max_cache_size());
    };

    /// Select the cache from the LRU queue held by query for expulsion.
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
        if (!is_overflow()) {
            break;
        }

        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);

        if (!cell) {
            /// The cache corresponding to this record may be swapped out by
            /// other queries, so it has become invalid.
            ghost.push_back(iter);
            ghost_remove_size += iter->size;
        } else {
            size_t cell_size = cell->size();
            DCHECK(iter->size == cell_size);

            if (cell->releasable()) {
                auto& file_block = cell->file_block;

                std::lock_guard block_lock(file_block->_mutex);
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
                to_evict.push_back(cell);
                removed_size += cell_size;
            }
        }
    }

    // Removes a selected cell from both the query's queue and the cache.
    auto remove_file_block_if = [&](FileBlockCell* cell) {
        FileBlockSPtr file_block = cell->file_block;
        if (file_block) {
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
            std::lock_guard block_lock(file_block->_mutex);
            remove(file_block, cache_lock, block_lock);
        }
    };

    // Drop stale records first, then evict the selected cells.
    for (auto& iter : ghost) {
        query_context->remove(iter->hash, iter->offset, cache_lock);
    }

    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);

    // The query's own queue was not enough: fall back to evicting from other queues.
    if (is_overflow() &&
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
        return false;
    }
    query_context->reserve(hash, offset, size, cache_lock);
    return true;
}
1183
1184
2
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
1185
2
    UInt128Wrapper hash = UInt128Wrapper();
1186
2
    size_t offset = 0;
1187
2
    CacheContext context;
1188
    /* we pick NORMAL and TTL cache to evict in advance
1189
     * we reserve for them but won't acutually give space to them
1190
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
1191
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
1192
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
1193
     */
1194
2
    context.cache_type = FileCacheType::NORMAL;
1195
2
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1196
2
    context.cache_type = FileCacheType::TTL;
1197
2
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1198
2
}
1199
1200
// remove specific cache synchronously, for critical operations
1201
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1202
8
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1203
8
    std::string reason = "remove_if_cached";
1204
8
    SCOPED_CACHE_LOCK(_mutex, this);
1205
8
    auto iter = _files.find(file_key);
1206
8
    std::vector<FileBlockCell*> to_remove;
1207
8
    if (iter != _files.end()) {
1208
14
        for (auto& [_, cell] : iter->second) {
1209
14
            if (cell.releasable()) {
1210
13
                to_remove.push_back(&cell);
1211
13
            } else {
1212
1
                cell.file_block->set_deleting();
1213
1
            }
1214
14
        }
1215
6
    }
1216
8
    remove_file_blocks(to_remove, cache_lock, true, reason);
1217
8
}
1218
1219
// the async version of remove_if_cached, for background operations
1220
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
1221
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1222
1
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1223
1
    std::string reason = "remove_if_cached_async";
1224
1
    SCOPED_CACHE_LOCK(_mutex, this);
1225
1226
1
    auto iter = _files.find(file_key);
1227
1
    std::vector<FileBlockCell*> to_remove;
1228
1
    if (iter != _files.end()) {
1229
1
        for (auto& [_, cell] : iter->second) {
1230
1
            *_gc_evict_bytes_metrics << cell.size();
1231
1
            *_gc_evict_count_metrics << 1;
1232
1
            if (cell.releasable()) {
1233
0
                to_remove.push_back(&cell);
1234
1
            } else {
1235
1
                cell.file_block->set_deleting();
1236
1
            }
1237
1
        }
1238
1
    }
1239
1
    remove_file_blocks(to_remove, cache_lock, false, reason);
1240
1
}
1241
1242
// Returns the cache types that `cur_cache_type` may reclaim space from by access time, in the
// order the callers iterate them. TTL is never a candidate. An unknown type yields an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
        FileCacheType cur_cache_type) {
    std::vector<FileCacheType> candidates;
    switch (cur_cache_type) {
    case FileCacheType::TTL:
        candidates = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
        break;
    case FileCacheType::INDEX:
        candidates = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
        break;
    case FileCacheType::NORMAL:
        candidates = {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
        break;
    case FileCacheType::DISPOSABLE:
        candidates = {FileCacheType::NORMAL, FileCacheType::INDEX};
        break;
    default:
        break;
    }
    return candidates;
}
1258
1259
1.25k
// Returns every other cache type (TTL included) that `cur_cache_type` may reclaim space from by
// size, in the order the callers iterate them. An unknown type yields an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
    if (cur_cache_type == FileCacheType::TTL) {
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
    }
    if (cur_cache_type == FileCacheType::INDEX) {
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL};
    }
    if (cur_cache_type == FileCacheType::NORMAL) {
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL};
    }
    if (cur_cache_type == FileCacheType::DISPOSABLE) {
        return {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL};
    }
    return {};
}
1274
1275
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
1276
5
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
1277
5
    DCHECK(_files.find(hash) != _files.end() &&
1278
5
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
1279
5
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
1280
5
    DCHECK(cell != nullptr);
1281
5
    if (cell == nullptr) {
1282
0
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
1283
0
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
1284
0
                     << " new_size=" << new_size;
1285
0
        return;
1286
0
    }
1287
5
    DCHECK_EQ(cell->file_block->_block_range.size(), old_size);
1288
5
    if (cell->queue_iterator) {
1289
5
        auto& queue = get_queue(cell->file_block->cache_type());
1290
5
        DCHECK(queue.contains(hash, offset, cache_lock));
1291
5
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
1292
5
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
1293
5
                                          cell->file_block->get_hash_value(),
1294
5
                                          cell->file_block->offset(), new_size);
1295
5
    }
1296
5
    cell->file_block->_block_range.right = cell->file_block->_block_range.left + new_size - 1;
1297
5
    _cur_cache_size -= old_size;
1298
5
    _cur_cache_size += new_size;
1299
5
    if (cell->file_block->cache_type() == FileCacheType::TTL) {
1300
0
        _cur_ttl_size -= old_size;
1301
0
        _cur_ttl_size += new_size;
1302
0
    }
1303
5
}
1304
1305
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
1306
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1307
13.4k
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1308
13.4k
    size_t removed_size = 0;
1309
13.4k
    size_t cur_cache_size = _cur_cache_size;
1310
13.4k
    std::vector<FileBlockCell*> to_evict;
1311
30.8k
    for (FileCacheType cache_type : other_cache_types) {
1312
30.8k
        auto& queue = get_queue(cache_type);
1313
30.8k
        size_t remove_size_per_type = 0;
1314
30.8k
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1315
1.43k
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1316
697
                break;
1317
697
            }
1318
739
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1319
739
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
1320
0
                         << ", offset: " << entry_offset;
1321
1322
739
            size_t cell_size = cell->size();
1323
739
            DCHECK(entry_size == cell_size);
1324
1325
739
            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
1326
737
                break;
1327
737
            }
1328
1329
2
            if (cell->releasable()) {
1330
2
                auto& file_block = cell->file_block;
1331
2
                std::lock_guard block_lock(file_block->_mutex);
1332
2
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1333
2
                to_evict.push_back(cell);
1334
2
                removed_size += cell_size;
1335
2
                remove_size_per_type += cell_size;
1336
2
            }
1337
2
        }
1338
30.8k
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
1339
30.8k
    }
1340
13.4k
    bool is_sync_removal = !evict_in_advance;
1341
13.4k
    std::string reason = std::string("try_reserve_by_time ") +
1342
13.4k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1343
13.4k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1344
1345
13.4k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1346
13.4k
}
1347
1348
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
1349
19.2k
                                 bool evict_in_advance) const {
1350
19.2k
    bool ret = false;
1351
19.2k
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
1352
23
        ret = (removed_size < need_size);
1353
23
        return ret;
1354
23
    }
1355
19.2k
    if (_disk_resource_limit_mode) {
1356
4
        ret = (removed_size < need_size);
1357
19.2k
    } else {
1358
19.2k
        ret = (cur_cache_size + need_size - removed_size > _capacity);
1359
19.2k
    }
1360
19.2k
    return ret;
1361
19.2k
}
1362
1363
bool BlockFileCache::try_reserve_from_other_queue_by_size(
1364
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1365
415
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1366
415
    size_t removed_size = 0;
1367
415
    size_t cur_cache_size = _cur_cache_size;
1368
415
    std::vector<FileBlockCell*> to_evict;
1369
    // we follow the privilege defined in get_other_cache_types to evict
1370
1.24k
    for (FileCacheType cache_type : other_cache_types) {
1371
1.24k
        auto& queue = get_queue(cache_type);
1372
1373
        // we will not drain each of them to the bottom -- i.e., we only
1374
        // evict what they have stolen.
1375
1.24k
        size_t cur_queue_size = queue.get_capacity(cache_lock);
1376
1.24k
        size_t cur_queue_max_size = queue.get_max_size();
1377
1.24k
        if (cur_queue_size <= cur_queue_max_size) {
1378
740
            continue;
1379
740
        }
1380
505
        size_t cur_removed_size = 0;
1381
505
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1382
505
                              cur_removed_size, evict_in_advance);
1383
505
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
1384
505
    }
1385
415
    bool is_sync_removal = !evict_in_advance;
1386
415
    std::string reason = std::string("try_reserve_by_size") +
1387
415
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1388
415
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1389
415
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1390
415
}
1391
1392
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
1393
                                                  int64_t cur_time,
1394
                                                  std::lock_guard<std::mutex>& cache_lock,
1395
13.4k
                                                  bool evict_in_advance) {
1396
    // currently, TTL cache is not considered as a candidate
1397
13.4k
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
1398
13.4k
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
1399
13.4k
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
1400
13.4k
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
1401
12.2k
        return reserve_success;
1402
12.2k
    }
1403
1404
1.25k
    other_cache_types = get_other_cache_type(cur_cache_type);
1405
1.25k
    auto& cur_queue = get_queue(cur_cache_type);
1406
1.25k
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
1407
1.25k
    size_t cur_queue_max_size = cur_queue.get_max_size();
1408
    // Hit the soft limit by self, cannot remove from other queues
1409
1.25k
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
1410
841
        return false;
1411
841
    }
1412
415
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
1413
415
                                                evict_in_advance);
1414
1.25k
}
1415
1416
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
1417
                                         QueryFileCacheContextPtr query_context,
1418
                                         const CacheContext& context, size_t offset, size_t size,
1419
                                         std::lock_guard<std::mutex>& cache_lock,
1420
13.4k
                                         bool evict_in_advance) {
1421
13.4k
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1422
13.4k
                               std::chrono::steady_clock::now().time_since_epoch())
1423
13.4k
                               .count();
1424
13.4k
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
1425
13.4k
                                      evict_in_advance)) {
1426
843
        auto& queue = get_queue(context.cache_type);
1427
843
        size_t removed_size = 0;
1428
843
        size_t cur_cache_size = _cur_cache_size;
1429
1430
843
        std::vector<FileBlockCell*> to_evict;
1431
843
        size_t cur_removed_size = 0;
1432
843
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1433
843
                              cur_removed_size, evict_in_advance);
1434
843
        bool is_sync_removal = !evict_in_advance;
1435
843
        std::string reason = std::string("try_reserve for cache type ") +
1436
843
                             cache_type_to_string(context.cache_type) +
1437
843
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1438
843
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1439
843
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;
1440
1441
843
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1442
61
            return false;
1443
61
        }
1444
843
    }
1445
1446
13.4k
    if (query_context) {
1447
16
        query_context->reserve(hash, offset, size, cache_lock);
1448
16
    }
1449
13.4k
    return true;
1450
13.4k
}
1451
1452
// Removes `file_block` from the cache metadata and, if it is DOWNLOADED, from storage.
//
// Requirements: both the cache lock and the block lock must be held exclusively (IsXLock).
// `file_block` is taken by value, which presumably keeps the block alive after its cell (which
// holds another reference) is erased below.
//
// Behaviour, in order:
//  - The block is always detached from its cell (file_block->cell = nullptr).
//  - If there is no cell for (hash, offset), or the cell belongs to a different block object,
//    a warning is logged and nothing else happens.
//  - Otherwise, the queue entry is removed (logged as REMOVE), and the eviction metrics are bumped.
//  - DOWNLOADED blocks have their storage file removed: immediately when `sync` is true, otherwise
//    by enqueueing the key on _recycle_keys (falling back to synchronous removal if the enqueue
//    fails).
//  - DOWNLOADING blocks are only marked deleting, and the function returns early.
//  - In all other cases, the size counters are decremented and the cell is erased from _files.
template <class T, class U>
    requires IsXLock<T> && IsXLock<U>
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
    // Snapshot the identity and metadata up front; they are used after the cell is gone.
    auto hash = file_block->get_hash_value();
    auto offset = file_block->offset();
    auto type = file_block->cache_type();
    auto expiration_time = file_block->expiration_time();
    auto tablet_id = file_block->tablet_id();
    auto* cell = get_cell(hash, offset, cache_lock);
    file_block->cell = nullptr;
    // Holder cleanup can race with prior cache metadata cleanup. In that case,
    // skip the duplicate remove instead of touching a detached or replaced cell.
    if (cell == nullptr) {
        LOG(WARNING) << "remove skipped because cache cell is missing. hash=" << hash.to_string()
                     << " offset=" << offset << " size=" << file_block->range().size()
                     << " type=" << cache_type_to_string(type)
                     << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
                     << " expiration_time=" << expiration_time << " sync=" << sync;
        return;
    }
    // The cell at (hash, offset) now belongs to another block object: leave it untouched.
    if (cell->file_block.get() != file_block.get()) {
        auto* cell_file_block = cell->file_block.get();
        LOG(WARNING)
                << "remove skipped because cache cell points to a different file block. hash="
                << hash.to_string() << " offset=" << offset
                << " size=" << file_block->range().size() << " type=" << cache_type_to_string(type)
                << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
                << " expiration_time=" << expiration_time << " sync=" << sync << " cell_block_hash="
                << (cell_file_block ? cell_file_block->get_hash_value().to_string() : "<null>")
                << " cell_block_offset="
                << (cell_file_block ? std::to_string(cell_file_block->offset()) : "<null>")
                << " cell_block_size="
                << (cell_file_block ? std::to_string(cell_file_block->range().size()) : "<null>")
                << " cell_block_type="
                << (cell_file_block ? cache_type_to_string(cell_file_block->cache_type())
                                    : "<null>")
                << " cell_block_state="
                << (cell_file_block ? FileBlock::state_to_string(cell_file_block->state_unsafe())
                                    : "<null>");
        return;
    }
    // Drop the LRU queue entry (debug builds insist one exists; release tolerates its absence).
    DCHECK(cell->queue_iterator);
    if (cell->queue_iterator) {
        auto& queue = get_queue(file_block->cache_type());
        queue.remove(*cell->queue_iterator, cache_lock);
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
                                          cell->file_block->get_hash_value(),
                                          cell->file_block->offset(), cell->size());
    }
    // NOTE(review): these eviction metrics are recorded even when the DOWNLOADING branch below
    // returns early without freeing the cache space.
    *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
            << file_block->range().size();
    *_total_evict_size_metrics << file_block->range().size();

    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
               << ", offset: " << offset << ", size: " << file_block->range().size()
               << ", type: " << cache_type_to_string(type);

    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
        // Only downloaded blocks have a file in storage that needs deleting.
        FileCacheKey key;
        key.hash = hash;
        key.offset = offset;
        key.meta.type = type;
        key.meta.expiration_time = expiration_time;
        key.meta.tablet_id = tablet_id;
        if (sync) {
            // Delete the storage file inline (while holding the locks) and record the latency.
            int64_t duration_ns = 0;
            Status st;
            {
                SCOPED_RAW_TIMER(&duration_ns);
                st = _storage->remove(key);
            }
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
            if (!st.ok()) {
                LOG_WARNING("").error(st);
            }
        } else {
            // the file will be deleted in the bottom half
            // so there will be a window that the file is not in the cache but still in the storage
            // but it's ok, because the rowset is stale already
            bool ret = _recycle_keys.enqueue(key);
            if (ret) [[likely]] {
                *_recycle_keys_length_recorder << _recycle_keys.size_approx();
            } else {
                // Queue rejected the key: fall back to deleting the file synchronously.
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
                int64_t duration_ns = 0;
                Status st;
                {
                    SCOPED_RAW_TIMER(&duration_ns);
                    st = _storage->remove(key);
                }
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
                if (!st.ok()) {
                    LOG_WARNING("").error(st);
                }
            }
        }
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
        // A download is in flight: only mark the block as deleting and return without touching
        // _cur_cache_size or _files.
        // NOTE(review): at this point the queue entry has already been removed above, but the cell
        // stays in _files and its queue_iterator is not reset. Confirm that the later cleanup path
        // does not remove that queue entry a second time.
        file_block->set_deleting();
        return;
    }
    // Release the accounted space and erase the cell (and the per-file map once it is empty).
    _cur_cache_size -= file_block->range().size();
    if (FileCacheType::TTL == type) {
        _cur_ttl_size -= file_block->range().size();
    }
    auto it = _files.find(hash);
    if (it != _files.end()) {
        it->second.erase(file_block->offset());
        if (it->second.empty()) {
            _files.erase(hash);
        }
    }
    *_num_removed_blocks << 1;
}
1565
1566
46
// Bytes currently accounted to the queue of `cache_type`; acquires the cache lock.
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t used_bytes = get_used_cache_size_unlocked(cache_type, cache_lock);
    return used_bytes;
}
1570
1571
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
1572
46
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1573
46
    return get_queue(cache_type).get_capacity(cache_lock);
1574
46
}
1575
1576
0
// Locking wrapper around get_available_cache_size_unlocked().
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t available = get_available_cache_size_unlocked(cache_type, cache_lock);
    return available;
}
1580
1581
size_t BlockFileCache::get_available_cache_size_unlocked(
1582
0
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
1583
0
    return get_queue(cache_type).get_max_element_size() -
1584
0
           get_used_cache_size_unlocked(cache_type, cache_lock);
1585
0
}
1586
1587
91
// Number of entries in the queue of `cache_type`; acquires the cache lock.
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t block_count = get_file_blocks_num_unlocked(cache_type, cache_lock);
    return block_count;
}
1591
1592
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
1593
91
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1594
91
    return get_queue(cache_type).get_elements_num(cache_lock);
1595
91
}
1596
1597
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
        : file_block(file_block) {
    // Back-link the block to the cell that owns it.
    file_block->cell = this;

    // A cell may only be created for a block in the DOWNLOADED, EMPTY or SKIP_CACHE state.
    // The block acquires the DOWNLOADING state, and its LRU queue iterator, on the first successful
    // getOrSetDownloader call.
    const FileBlock::State state = file_block->_download_state;
    const bool creatable_state = state == FileBlock::State::DOWNLOADED ||
                                 state == FileBlock::State::EMPTY ||
                                 state == FileBlock::State::SKIP_CACHE;
    if (!creatable_state) {
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
                      << FileBlock::state_to_string(file_block->_download_state);
    }

    // TTL blocks get their access time stamped at creation.
    if (file_block->cache_type() == FileCacheType::TTL) {
        update_atime();
    }
}
1620
1621
// Appends (hash, offset, size) at the tail of the queue, indexes the new entry by (hash, offset),
// and adds `size` to the queue's byte total. Returns the iterator of the new entry.
LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
    cache_size += size;
    const auto tail = queue.end();
    Iterator inserted = queue.insert(tail, FileKeyAndOffset(hash, offset, size));
    map.insert(std::make_pair(std::make_pair(hash, offset), inserted));
    return inserted;
}
1628
1629
0
void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
1630
0
    queue.clear();
1631
0
    map.clear();
1632
0
    cache_size = 0;
1633
0
}
1634
1635
800k
// Relinks the entry at `queue_it` to the tail of the list in O(1). std::list::splice keeps all
// iterators valid, so the index entries pointing into the list stay correct.
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
    const auto tail = queue.end();
    queue.splice(tail, queue, queue_it);
}
1638
1639
void LRUQueue::resize(Iterator queue_it, size_t new_size,
1640
6
                      std::lock_guard<std::mutex>& /* cache_lock */) {
1641
6
    cache_size -= queue_it->size;
1642
6
    queue_it->size = new_size;
1643
6
    cache_size += new_size;
1644
6
}
1645
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
1646
5
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
1647
5
    return map.find(std::make_pair(hash, offset)) != map.end();
1648
5
}
1649
1650
// Looks up the entry for (hash, offset) through the index. If it is absent, a value-initialized
// (singular) list iterator is returned, which callers must not dereference.
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
    const auto found = map.find(std::make_pair(hash, offset));
    if (found == map.end()) {
        return std::list<FileKeyAndOffset>::iterator();
    }
    return found->second;
}
1658
1659
2
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
1660
2
    std::string result;
1661
18
    for (const auto& [hash, offset, size] : queue) {
1662
18
        if (!result.empty()) {
1663
16
            result += ", ";
1664
16
        }
1665
18
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
1666
18
    }
1667
2
    return result;
1668
2
}
1669
1670
size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
1671
5
                                           std::lock_guard<std::mutex>& cache_lock) {
1672
5
    std::list<FileKeyAndOffset> target_queue = this->queue;
1673
5
    std::list<FileKeyAndOffset> base_queue = base.queue;
1674
5
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
1675
5
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
1676
1677
5
    size_t m = vec1.size();
1678
5
    size_t n = vec2.size();
1679
1680
    // Create a 2D vector (matrix) to store the Levenshtein distances
1681
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
1682
5
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
1683
1684
    // Initialize the first row and column of the matrix
1685
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
1686
22
    for (size_t i = 0; i <= m; ++i) {
1687
17
        dp[i][0] = i;
1688
17
    }
1689
20
    for (size_t j = 0; j <= n; ++j) {
1690
15
        dp[0][j] = j;
1691
15
    }
1692
1693
    // Fill the matrix using dynamic programming
1694
17
    for (size_t i = 1; i <= m; ++i) {
1695
38
        for (size_t j = 1; j <= n; ++j) {
1696
            // Check if the current elements of both vectors are equal
1697
26
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
1698
26
                           vec1[i - 1].offset == vec2[j - 1].offset)
1699
26
                                  ? 0
1700
26
                                  : 1;
1701
            // Calculate the minimum cost of three possible operations:
1702
            // 1. Insertion: dp[i][j-1] + 1
1703
            // 2. Deletion: dp[i-1][j] + 1
1704
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
1705
26
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
1706
26
        }
1707
12
    }
1708
    // The bottom-right cell of the matrix contains the Levenshtein distance
1709
5
    return dp[m][n];
1710
5
}
1711
1712
1
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
1713
1
    SCOPED_CACHE_LOCK(_mutex, this);
1714
1
    return dump_structure_unlocked(hash, cache_lock);
1715
1
}
1716
1717
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
1718
1
                                                    std::lock_guard<std::mutex>&) {
1719
1
    std::stringstream result;
1720
1
    auto it = _files.find(hash);
1721
1
    if (it == _files.end()) {
1722
0
        return std::string("");
1723
0
    }
1724
1
    const auto& cells_by_offset = it->second;
1725
1726
1
    for (const auto& [_, cell] : cells_by_offset) {
1727
1
        result << cell.file_block->get_info_for_log() << " "
1728
1
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
1729
1
    }
1730
1731
1
    return result.str();
1732
1
}
1733
1734
0
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
1735
0
    SCOPED_CACHE_LOCK(_mutex, this);
1736
0
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
1737
0
}
1738
1739
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
1740
                                                            size_t offset,
1741
0
                                                            std::lock_guard<std::mutex>&) {
1742
0
    std::stringstream result;
1743
0
    auto it = _files.find(hash);
1744
0
    if (it == _files.end()) {
1745
0
        return std::string("");
1746
0
    }
1747
0
    const auto& cells_by_offset = it->second;
1748
0
    const auto& cell = cells_by_offset.find(offset);
1749
1750
0
    return cache_type_to_string(cell->second.file_block->cache_type());
1751
0
}
1752
1753
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
1754
                                       FileCacheType new_type,
1755
9
                                       std::lock_guard<std::mutex>& cache_lock) {
1756
9
    if (auto iter = _files.find(hash); iter != _files.end()) {
1757
9
        auto& file_blocks = iter->second;
1758
9
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
1759
8
            FileBlockCell& cell = cell_it->second;
1760
8
            auto& cur_queue = get_queue(cell.file_block->cache_type());
1761
8
            DCHECK(cell.queue_iterator.has_value());
1762
8
            cur_queue.remove(*cell.queue_iterator, cache_lock);
1763
8
            _lru_recorder->record_queue_event(
1764
8
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
1765
8
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
1766
8
            auto& new_queue = get_queue(new_type);
1767
8
            cell.queue_iterator =
1768
8
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
1769
8
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
1770
8
                                              cell.file_block->get_hash_value(),
1771
8
                                              cell.file_block->offset(), cell.size());
1772
8
        }
1773
9
    }
1774
9
}
1775
1776
// @brief: get a path's disk capacity used percent, inode used percent
1777
// @param: path
1778
// @param: percent.first disk used percent, percent.second inode used percent
1779
164
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
1780
164
    struct statfs stat;
1781
164
    int ret = statfs(path.c_str(), &stat);
1782
164
    if (ret != 0) {
1783
2
        return ret;
1784
2
    }
1785
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
1786
    // v->used = stat.f_blocks - stat.f_bfree
1787
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
1788
162
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
1789
162
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
1790
162
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));
1791
1792
162
    unsigned long long inode_free = stat.f_ffree;
1793
162
    unsigned long long inode_total = stat.f_files;
1794
162
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
1795
162
    percent->first = capacity_percentage;
1796
162
    percent->second = 100 - inode_percentage;
1797
1798
    // Add sync point for testing
1799
162
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
1800
1801
162
    return 0;
1802
164
}
1803
1804
10
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
1805
10
    using namespace std::chrono;
1806
10
    int64_t space_released = 0;
1807
10
    size_t old_capacity = 0;
1808
10
    std::stringstream ss;
1809
10
    ss << "finish reset_capacity, path=" << _cache_base_path;
1810
10
    auto adjust_start_time = steady_clock::time_point();
1811
10
    {
1812
10
        SCOPED_CACHE_LOCK(_mutex, this);
1813
10
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
1814
1
            int64_t need_remove_size = _cur_cache_size - new_capacity;
1815
4
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
1816
4
                int64_t queue_released = 0;
1817
4
                std::vector<FileBlockCell*> to_evict;
1818
13
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1819
13
                    if (need_remove_size <= 0) {
1820
1
                        break;
1821
1
                    }
1822
12
                    need_remove_size -= entry_size;
1823
12
                    space_released += entry_size;
1824
12
                    queue_released += entry_size;
1825
12
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1826
12
                    if (!cell->releasable()) {
1827
0
                        cell->file_block->set_deleting();
1828
0
                        continue;
1829
0
                    }
1830
12
                    to_evict.push_back(cell);
1831
12
                }
1832
12
                for (auto& cell : to_evict) {
1833
12
                    FileBlockSPtr file_block = cell->file_block;
1834
12
                    std::lock_guard block_lock(file_block->_mutex);
1835
12
                    remove(file_block, cache_lock, block_lock);
1836
12
                }
1837
4
                return queue_released;
1838
4
            };
1839
1
            int64_t queue_released = remove_blocks(_disposable_queue);
1840
1
            ss << " disposable_queue released " << queue_released;
1841
1
            queue_released = remove_blocks(_normal_queue);
1842
1
            ss << " normal_queue released " << queue_released;
1843
1
            queue_released = remove_blocks(_index_queue);
1844
1
            ss << " index_queue released " << queue_released;
1845
1
            queue_released = remove_blocks(_ttl_queue);
1846
1
            ss << " ttl_queue released " << queue_released;
1847
1848
1
            _disk_resource_limit_mode = true;
1849
1
            _disk_limit_mode_metrics->set_value(1);
1850
1
            ss << " total_space_released=" << space_released;
1851
1
        }
1852
10
        old_capacity = _capacity;
1853
10
        _capacity = new_capacity;
1854
10
        _cache_capacity_metrics->set_value(_capacity);
1855
10
    }
1856
10
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
1857
10
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
1858
10
              << " use_time=" << cast_set<int64_t>(use_time.count());
1859
10
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
1860
10
    LOG(INFO) << ss.str();
1861
10
    return ss.str();
1862
10
}
1863
1864
954
void BlockFileCache::check_disk_resource_limit() {
1865
954
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1866
803
        return;
1867
803
    }
1868
1869
151
    bool previous_mode = _disk_resource_limit_mode;
1870
151
    if (_capacity > _cur_cache_size) {
1871
148
        _disk_resource_limit_mode = false;
1872
148
        _disk_limit_mode_metrics->set_value(0);
1873
148
    }
1874
151
    std::pair<int, int> percent;
1875
151
    int ret = disk_used_percentage(_cache_base_path, &percent);
1876
151
    if (ret != 0) {
1877
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1878
1
        return;
1879
1
    }
1880
150
    auto [space_percentage, inode_percentage] = percent;
1881
298
    auto is_insufficient = [](const int& percentage) {
1882
298
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
1883
298
    };
1884
150
    DCHECK_GE(space_percentage, 0);
1885
150
    DCHECK_LE(space_percentage, 100);
1886
150
    DCHECK_GE(inode_percentage, 0);
1887
150
    DCHECK_LE(inode_percentage, 100);
1888
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1889
    // FIXME: reject with config validator
1890
150
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
1891
150
        config::file_cache_exit_disk_resource_limit_mode_percent) {
1892
1
        LOG_WARNING("config error, set to default value")
1893
1
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
1894
1
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
1895
1
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
1896
1
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
1897
1
    }
1898
150
    bool is_space_insufficient = is_insufficient(space_percentage);
1899
150
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1900
150
    if (is_space_insufficient || is_inode_insufficient) {
1901
1
        _disk_resource_limit_mode = true;
1902
1
        _disk_limit_mode_metrics->set_value(1);
1903
149
    } else if (_disk_resource_limit_mode &&
1904
149
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
1905
149
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
1906
0
        _disk_resource_limit_mode = false;
1907
0
        _disk_limit_mode_metrics->set_value(0);
1908
0
    }
1909
150
    if (previous_mode != _disk_resource_limit_mode) {
1910
        // add log for disk resource limit mode switching
1911
2
        if (_disk_resource_limit_mode) {
1912
1
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
1913
1
                         << " space_percent=" << space_percentage
1914
1
                         << " inode_percent=" << inode_percentage
1915
1
                         << " is_space_insufficient=" << is_space_insufficient
1916
1
                         << " is_inode_insufficient=" << is_inode_insufficient
1917
1
                         << " enter threshold="
1918
1
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
1919
1
        } else {
1920
1
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
1921
1
                      << " space_percent=" << space_percentage
1922
1
                      << " inode_percent=" << inode_percentage << " exit threshold="
1923
1
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
1924
1
        }
1925
148
    } else if (_disk_resource_limit_mode) {
1926
        // print log for disk resource limit mode running, but less frequently
1927
0
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1928
0
                                 << " space_percent=" << space_percentage
1929
0
                                 << " inode_percent=" << inode_percentage
1930
0
                                 << " is_space_insufficient=" << is_space_insufficient
1931
0
                                 << " is_inode_insufficient=" << is_inode_insufficient
1932
0
                                 << " mode run in resource limit";
1933
0
    }
1934
150
}
1935
1936
15
void BlockFileCache::check_need_evict_cache_in_advance() {
1937
15
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1938
1
        return;
1939
1
    }
1940
1941
14
    std::pair<int, int> percent;
1942
14
    int ret = disk_used_percentage(_cache_base_path, &percent);
1943
14
    if (ret != 0) {
1944
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1945
1
        return;
1946
1
    }
1947
13
    auto [space_percentage, inode_percentage] = percent;
1948
13
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
1949
39
    auto is_insufficient = [](const int& percentage) {
1950
39
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
1951
39
    };
1952
13
    DCHECK_GE(space_percentage, 0);
1953
13
    DCHECK_LE(space_percentage, 100);
1954
13
    DCHECK_GE(inode_percentage, 0);
1955
13
    DCHECK_LE(inode_percentage, 100);
1956
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1957
    // FIXME: reject with config validator
1958
13
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
1959
13
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
1960
1
        LOG_WARNING("config error, set to default value")
1961
1
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
1962
1
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
1963
1
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
1964
1
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
1965
1
    }
1966
13
    bool previous_mode = _need_evict_cache_in_advance;
1967
13
    bool is_space_insufficient = is_insufficient(space_percentage);
1968
13
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1969
13
    bool is_size_insufficient = is_insufficient(size_percentage);
1970
13
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
1971
9
        _need_evict_cache_in_advance = true;
1972
9
        _need_evict_cache_in_advance_metrics->set_value(1);
1973
9
    } else if (_need_evict_cache_in_advance &&
1974
4
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1975
4
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1976
4
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
1977
1
        _need_evict_cache_in_advance = false;
1978
1
        _need_evict_cache_in_advance_metrics->set_value(0);
1979
1
    }
1980
13
    if (previous_mode != _need_evict_cache_in_advance) {
1981
        // add log for evict cache in advance mode switching
1982
6
        if (_need_evict_cache_in_advance) {
1983
5
            LOG(WARNING) << "Entering evict cache in advance mode: "
1984
5
                         << "file_cache=" << get_base_path()
1985
5
                         << " space_percent=" << space_percentage
1986
5
                         << " inode_percent=" << inode_percentage
1987
5
                         << " size_percent=" << size_percentage
1988
5
                         << " is_space_insufficient=" << is_space_insufficient
1989
5
                         << " is_inode_insufficient=" << is_inode_insufficient
1990
5
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
1991
5
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
1992
5
        } else {
1993
1
            LOG(INFO) << "Exiting evict cache in advance mode: "
1994
1
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
1995
1
                      << " inode_percent=" << inode_percentage
1996
1
                      << " size_percent=" << size_percentage << " exit threshold="
1997
1
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
1998
1
        }
1999
7
    } else if (_need_evict_cache_in_advance) {
2000
        // print log for evict cache in advance mode running, but less frequently
2001
4
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
2002
1
                                 << " space_percent=" << space_percentage
2003
1
                                 << " inode_percent=" << inode_percentage
2004
1
                                 << " size_percent=" << size_percentage
2005
1
                                 << " is_space_insufficient=" << is_space_insufficient
2006
1
                                 << " is_inode_insufficient=" << is_inode_insufficient
2007
1
                                 << " is_size_insufficient=" << is_size_insufficient
2008
1
                                 << " need evict cache in advance";
2009
4
    }
2010
13
}
2011
2012
154
void BlockFileCache::run_background_monitor() {
2013
154
    Thread::set_self_name("run_background_monitor");
2014
954
    while (!_close) {
2015
954
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
2016
954
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
2017
954
        check_disk_resource_limit();
2018
954
        if (config::enable_evict_file_cache_in_advance) {
2019
7
            check_need_evict_cache_in_advance();
2020
947
        } else {
2021
947
            _need_evict_cache_in_advance = false;
2022
947
            _need_evict_cache_in_advance_metrics->set_value(0);
2023
947
        }
2024
2025
954
        {
2026
954
            std::unique_lock close_lock(_close_mtx);
2027
954
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2028
954
            if (_close) {
2029
154
                break;
2030
154
            }
2031
954
        }
2032
        // report
2033
800
        {
2034
800
            SCOPED_CACHE_LOCK(_mutex, this);
2035
800
            _cur_cache_size_metrics->set_value(_cur_cache_size);
2036
800
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
2037
800
                                                   _index_queue.get_capacity(cache_lock) -
2038
800
                                                   _normal_queue.get_capacity(cache_lock) -
2039
800
                                                   _disposable_queue.get_capacity(cache_lock));
2040
800
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
2041
800
                    _ttl_queue.get_capacity(cache_lock));
2042
800
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
2043
800
                    _ttl_queue.get_elements_num(cache_lock));
2044
800
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
2045
800
            _cur_normal_queue_element_count_metrics->set_value(
2046
800
                    _normal_queue.get_elements_num(cache_lock));
2047
800
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
2048
800
            _cur_index_queue_element_count_metrics->set_value(
2049
800
                    _index_queue.get_elements_num(cache_lock));
2050
800
            _cur_disposable_queue_cache_size_metrics->set_value(
2051
800
                    _disposable_queue.get_capacity(cache_lock));
2052
800
            _cur_disposable_queue_element_count_metrics->set_value(
2053
800
                    _disposable_queue.get_elements_num(cache_lock));
2054
2055
            // Update meta store write queue size if storage is FSFileCacheStorage
2056
800
            if (_storage->get_type() == FileCacheStorageType::DISK) {
2057
15
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
2058
15
                if (fs_storage != nullptr) {
2059
15
                    auto* meta_store = fs_storage->get_meta_store();
2060
15
                    if (meta_store != nullptr) {
2061
15
                        _meta_store_write_queue_size_metrics->set_value(
2062
15
                                meta_store->get_write_queue_size());
2063
15
                    }
2064
15
                }
2065
15
            }
2066
2067
800
            if (_num_read_blocks->get_value() > 0) {
2068
13
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
2069
13
                                      (double)_num_read_blocks->get_value());
2070
13
            }
2071
800
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
2072
0
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
2073
0
                                         (double)_num_read_blocks_5m->get_value());
2074
0
            }
2075
800
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
2076
0
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
2077
0
                                         (double)_num_read_blocks_1h->get_value());
2078
0
            }
2079
2080
800
            if (_no_warmup_num_read_blocks->get_value() > 0) {
2081
13
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
2082
13
                                                (double)_no_warmup_num_read_blocks->get_value());
2083
13
            }
2084
800
            if (_no_warmup_num_read_blocks_5m && _no_warmup_num_read_blocks_5m->get_value() > 0) {
2085
0
                _no_warmup_hit_ratio_5m->set_value(
2086
0
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
2087
0
                        (double)_no_warmup_num_read_blocks_5m->get_value());
2088
0
            }
2089
800
            if (_no_warmup_num_read_blocks_1h && _no_warmup_num_read_blocks_1h->get_value() > 0) {
2090
0
                _no_warmup_hit_ratio_1h->set_value(
2091
0
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
2092
0
                        (double)_no_warmup_num_read_blocks_1h->get_value());
2093
0
            }
2094
800
        }
2095
800
    }
2096
154
}
2097
2098
154
void BlockFileCache::run_background_gc() {
2099
154
    Thread::set_self_name("run_background_gc");
2100
154
    FileCacheKey key;
2101
154
    size_t batch_count = 0;
2102
385
    while (!_close) {
2103
385
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
2104
385
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
2105
385
        {
2106
385
            std::unique_lock close_lock(_close_mtx);
2107
385
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2108
385
            if (_close) {
2109
154
                break;
2110
154
            }
2111
385
        }
2112
2113
240
        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
2114
9
            int64_t duration_ns = 0;
2115
9
            Status st;
2116
9
            {
2117
9
                SCOPED_RAW_TIMER(&duration_ns);
2118
9
                st = _storage->remove(key);
2119
9
            }
2120
9
            *_storage_async_remove_latency_us << (duration_ns / 1000);
2121
2122
9
            if (!st.ok()) {
2123
0
                LOG_WARNING("").error(st);
2124
0
            }
2125
9
            batch_count++;
2126
9
        }
2127
231
        *_recycle_keys_length_recorder << _recycle_keys.size_approx();
2128
231
        batch_count = 0;
2129
231
    }
2130
154
}
2131
2132
154
void BlockFileCache::run_background_evict_in_advance() {
2133
154
    Thread::set_self_name("run_background_evict_in_advance");
2134
154
    LOG(INFO) << "Starting background evict in advance thread";
2135
154
    int64_t batch = 0;
2136
4.20k
    while (!_close) {
2137
4.20k
        {
2138
4.20k
            std::unique_lock close_lock(_close_mtx);
2139
4.20k
            _close_cv.wait_for(
2140
4.20k
                    close_lock,
2141
4.20k
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
2142
4.20k
            if (_close) {
2143
154
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
2144
154
                break;
2145
154
            }
2146
4.20k
        }
2147
4.05k
        batch = config::file_cache_evict_in_advance_batch_bytes;
2148
2149
        // Skip if eviction not needed or too many pending recycles
2150
4.05k
        if (!_need_evict_cache_in_advance ||
2151
4.05k
            _recycle_keys.size_approx() >=
2152
4.05k
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
2153
4.05k
            continue;
2154
4.05k
        }
2155
2156
2
        int64_t duration_ns = 0;
2157
2
        {
2158
2
            SCOPED_CACHE_LOCK(_mutex, this);
2159
2
            SCOPED_RAW_TIMER(&duration_ns);
2160
2
            try_evict_in_advance(batch, cache_lock);
2161
2
        }
2162
2
        *_evict_in_advance_latency_us << (duration_ns / 1000);
2163
2
    }
2164
154
}
2165
2166
154
void BlockFileCache::run_background_block_lru_update() {
2167
154
    Thread::set_self_name("run_background_block_lru_update");
2168
154
    std::vector<FileBlockSPtr> batch;
2169
4.12k
    while (!_close) {
2170
4.12k
        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
2171
4.12k
        size_t batch_limit =
2172
4.12k
                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
2173
4.12k
        {
2174
4.12k
            std::unique_lock close_lock(_close_mtx);
2175
4.12k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2176
4.12k
            if (_close) {
2177
154
                break;
2178
154
            }
2179
4.12k
        }
2180
2181
3.97k
        batch.clear();
2182
3.97k
        batch.reserve(batch_limit);
2183
3.97k
        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
2184
3.97k
        if (drained == 0) {
2185
3.96k
            *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2186
3.96k
            continue;
2187
3.96k
        }
2188
2189
5
        int64_t duration_ns = 0;
2190
5
        {
2191
5
            SCOPED_CACHE_LOCK(_mutex, this);
2192
5
            SCOPED_RAW_TIMER(&duration_ns);
2193
5
            for (auto& block : batch) {
2194
5
                update_block_lru(block, cache_lock);
2195
5
            }
2196
5
        }
2197
5
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
2198
5
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2199
5
    }
2200
154
}
2201
2202
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
2203
2
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
2204
2
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
2205
2
                               std::chrono::steady_clock::now().time_since_epoch())
2206
2
                               .count();
2207
2
    SCOPED_CACHE_LOCK(_mutex, this);
2208
2
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
2209
2
    if (auto iter = _files.find(hash); iter != _files.end()) {
2210
5
        for (auto& pair : _files.find(hash)->second) {
2211
5
            const FileBlockCell* cell = &pair.second;
2212
5
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
2213
4
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
2214
4
                    (cell->atime != 0 &&
2215
3
                     cur_time - cell->atime <
2216
3
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
2217
3
                    blocks_meta.emplace_back(pair.first, cell->size(),
2218
3
                                             cell->file_block->cache_type(),
2219
3
                                             cell->file_block->expiration_time());
2220
3
                }
2221
4
            }
2222
5
        }
2223
2
    }
2224
2
    return blocks_meta;
2225
2
}
2226
2227
bool BlockFileCache::try_reserve_during_async_load(size_t size,
2228
4
                                                   std::lock_guard<std::mutex>& cache_lock) {
2229
4
    size_t removed_size = 0;
2230
4
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
2231
4
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
2232
4
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);
2233
2234
4
    std::vector<FileBlockCell*> to_evict;
2235
4
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
2236
4
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
2237
4
            if (!_disk_resource_limit_mode || removed_size >= size) {
2238
3
                break;
2239
3
            }
2240
1
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
2241
2242
1
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
2243
0
                         << ", offset: " << entry_offset;
2244
2245
1
            size_t cell_size = cell->size();
2246
1
            DCHECK(entry_size == cell_size);
2247
2248
1
            if (cell->releasable()) {
2249
1
                auto& file_block = cell->file_block;
2250
2251
1
                std::lock_guard block_lock(file_block->_mutex);
2252
1
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
2253
1
                to_evict.push_back(cell);
2254
1
                removed_size += cell_size;
2255
1
            }
2256
1
        }
2257
3
    };
2258
4
    if (disposable_queue_size != 0) {
2259
0
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
2260
0
    }
2261
4
    if (normal_queue_size != 0) {
2262
3
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
2263
3
    }
2264
4
    if (index_queue_size != 0) {
2265
0
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
2266
0
    }
2267
4
    std::string reason = "async load";
2268
4
    remove_file_blocks(to_evict, cache_lock, true, reason);
2269
2270
4
    return !_disk_resource_limit_mode || removed_size >= size;
2271
4
}
2272
2273
34
void BlockFileCache::clear_need_update_lru_blocks() {
2274
34
    _need_update_lru_blocks.clear();
2275
34
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2276
34
}
2277
2278
30
void BlockFileCache::pause_ttl_manager() {
2279
30
    if (_ttl_mgr) {
2280
6
        _ttl_mgr->stop();
2281
6
    }
2282
30
}
2283
2284
30
void BlockFileCache::resume_ttl_manager() {
2285
30
    if (_ttl_mgr) {
2286
6
        _ttl_mgr->resume();
2287
6
    }
2288
30
}
2289
2290
1
std::string BlockFileCache::clear_file_cache_directly() {
2291
1
    pause_ttl_manager();
2292
1
    _lru_dumper->remove_lru_dump_files();
2293
1
    using namespace std::chrono;
2294
1
    std::stringstream ss;
2295
1
    auto start = steady_clock::now();
2296
1
    std::string result;
2297
1
    {
2298
1
        SCOPED_CACHE_LOCK(_mutex, this);
2299
1
        LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
2300
2301
1
        std::string clear_msg;
2302
1
        auto s = _storage->clear(clear_msg);
2303
1
        if (!s.ok()) {
2304
0
            result = clear_msg;
2305
1
        } else {
2306
1
            int64_t num_files = _files.size();
2307
1
            int64_t cache_size = _cur_cache_size;
2308
1
            int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
2309
1
            int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock);
2310
1
            int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock);
2311
1
            int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);
2312
2313
1
            int64_t clear_fd_duration = 0;
2314
1
            {
2315
                // clear FDCache to release fd
2316
1
                SCOPED_RAW_TIMER(&clear_fd_duration);
2317
1
                for (const auto& [file_key, file_blocks] : _files) {
2318
1
                    for (const auto& [offset, file_block_cell] : file_blocks) {
2319
1
                        AccessKeyAndOffset access_key_and_offset(file_key, offset);
2320
1
                        FDCache::instance()->remove_file_reader(access_key_and_offset);
2321
1
                    }
2322
1
                }
2323
1
            }
2324
2325
1
            _files.clear();
2326
1
            _cur_cache_size = 0;
2327
1
            _cur_ttl_size = 0;
2328
1
            _time_to_key.clear();
2329
1
            _key_to_time.clear();
2330
1
            _index_queue.clear(cache_lock);
2331
1
            _normal_queue.clear(cache_lock);
2332
1
            _disposable_queue.clear(cache_lock);
2333
1
            _ttl_queue.clear(cache_lock);
2334
2335
            // Update cache metrics immediately so consumers observe the cleared state
2336
            // without waiting for the next background monitor round.
2337
1
            _cur_cache_size_metrics->set_value(0);
2338
1
            _cur_ttl_cache_size_metrics->set_value(0);
2339
1
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0);
2340
1
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0);
2341
1
            _cur_normal_queue_cache_size_metrics->set_value(0);
2342
1
            _cur_normal_queue_element_count_metrics->set_value(0);
2343
1
            _cur_index_queue_cache_size_metrics->set_value(0);
2344
1
            _cur_index_queue_element_count_metrics->set_value(0);
2345
1
            _cur_disposable_queue_cache_size_metrics->set_value(0);
2346
1
            _cur_disposable_queue_element_count_metrics->set_value(0);
2347
2348
1
            clear_need_update_lru_blocks();
2349
2350
1
            ss << "finish clear_file_cache_directly"
2351
1
               << " path=" << _cache_base_path << " time_elapsed_ms="
2352
1
               << duration_cast<milliseconds>(steady_clock::now() - start).count()
2353
1
               << " fd_clear_time_ms=" << (clear_fd_duration / 1000000)
2354
1
               << " num_files=" << num_files << " cache_size=" << cache_size
2355
1
               << " index_queue_size=" << index_queue_size
2356
1
               << " normal_queue_size=" << normal_queue_size
2357
1
               << " disposible_queue_size=" << disposible_queue_size
2358
1
               << "ttl_queue_size=" << ttl_queue_size;
2359
1
            result = ss.str();
2360
1
            LOG(INFO) << result;
2361
1
        }
2362
1
    }
2363
1
    _lru_dumper->remove_lru_dump_files();
2364
1
    resume_ttl_manager();
2365
1
    return result;
2366
1
}
2367
2368
5.14k
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
2369
5.14k
    std::map<size_t, FileBlockSPtr> offset_to_block;
2370
5.14k
    SCOPED_CACHE_LOCK(_mutex, this);
2371
5.14k
    if (_files.contains(hash)) {
2372
5.14k
        for (auto& [offset, cell] : _files[hash]) {
2373
5.14k
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
2374
5.14k
                cell.file_block->_owned_by_cached_reader = true;
2375
5.14k
                offset_to_block.emplace(offset, cell.file_block);
2376
5.14k
            }
2377
5.14k
        }
2378
5.13k
    }
2379
5.14k
    return offset_to_block;
2380
5.14k
}
2381
2382
0
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
2383
0
    SCOPED_CACHE_LOCK(_mutex, this);
2384
0
    if (auto iter = _files.find(hash); iter != _files.end()) {
2385
0
        for (auto& [_, cell] : iter->second) {
2386
0
            cell.update_atime();
2387
0
        }
2388
0
    };
2389
0
}
2390
2391
154
void BlockFileCache::run_background_lru_log_replay() {
2392
154
    Thread::set_self_name("run_background_lru_log_replay");
2393
4.20k
    while (!_close) {
2394
4.20k
        int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
2395
4.20k
        {
2396
4.20k
            std::unique_lock close_lock(_close_mtx);
2397
4.20k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2398
4.20k
            if (_close) {
2399
154
                break;
2400
154
            }
2401
4.20k
        }
2402
2403
4.05k
        _lru_recorder->replay_queue_event(FileCacheType::TTL);
2404
4.05k
        _lru_recorder->replay_queue_event(FileCacheType::INDEX);
2405
4.05k
        _lru_recorder->replay_queue_event(FileCacheType::NORMAL);
2406
4.05k
        _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE);
2407
2408
4.05k
        if (config::enable_evaluate_shadow_queue_diff) {
2409
0
            SCOPED_CACHE_LOCK(_mutex, this);
2410
0
            _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
2411
0
            _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
2412
0
            _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
2413
0
            _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
2414
0
        }
2415
4.05k
    }
2416
154
}
2417
2418
1.32k
void BlockFileCache::dump_lru_queues(bool force) {
2419
1.32k
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
2420
1.32k
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
2421
1.32k
        !ExecEnv::GetInstance()->get_is_upgrading()) {
2422
1.32k
        _lru_dumper->dump_queue("disposable", force);
2423
1.32k
        _lru_dumper->dump_queue("normal", force);
2424
1.32k
        _lru_dumper->dump_queue("index", force);
2425
1.32k
        _lru_dumper->dump_queue("ttl", force);
2426
1.32k
        _lru_dumper->set_first_dump_done();
2427
1.32k
    }
2428
1.32k
}
2429
2430
154
void BlockFileCache::run_background_lru_dump() {
2431
154
    Thread::set_self_name("run_background_lru_dump");
2432
1.47k
    while (!_close) {
2433
1.47k
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
2434
1.47k
        {
2435
1.47k
            std::unique_lock close_lock(_close_mtx);
2436
1.47k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2437
1.47k
            if (_close) {
2438
154
                break;
2439
154
            }
2440
1.47k
        }
2441
1.32k
        dump_lru_queues(false);
2442
1.32k
    }
2443
154
}
2444
2445
154
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
2446
    // keep this order coz may be duplicated in different queue, we use the first appearence
2447
154
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
2448
154
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
2449
154
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
2450
154
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
2451
154
}
2452
2453
0
std::map<std::string, double> BlockFileCache::get_stats() {
2454
0
    std::map<std::string, double> stats;
2455
0
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2456
0
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2457
0
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2458
2459
0
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2460
0
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
2461
0
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2462
0
    stats["index_queue_curr_elements"] =
2463
0
            (double)_cur_index_queue_element_count_metrics->get_value();
2464
2465
0
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2466
0
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
2467
0
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2468
0
    stats["ttl_queue_curr_elements"] =
2469
0
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
2470
2471
0
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2472
0
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
2473
0
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2474
0
    stats["normal_queue_curr_elements"] =
2475
0
            (double)_cur_normal_queue_element_count_metrics->get_value();
2476
2477
0
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2478
0
    stats["disposable_queue_curr_size"] =
2479
0
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
2480
0
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2481
0
    stats["disposable_queue_curr_elements"] =
2482
0
            (double)_cur_disposable_queue_element_count_metrics->get_value();
2483
2484
0
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2485
0
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2486
2487
0
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2488
0
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2489
0
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2490
2491
0
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2492
0
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2493
0
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2494
2495
0
    return stats;
2496
0
}
2497
2498
// for be UTs
2499
172
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
2500
172
    std::map<std::string, double> stats;
2501
172
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2502
172
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2503
172
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2504
2505
172
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2506
172
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
2507
172
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2508
172
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
2509
2510
172
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2511
172
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
2512
172
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2513
172
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
2514
2515
172
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2516
172
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
2517
172
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2518
172
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
2519
2520
172
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2521
172
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
2522
172
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2523
172
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
2524
2525
172
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2526
172
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2527
2528
172
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2529
172
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2530
172
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2531
2532
172
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2533
172
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2534
172
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2535
2536
172
    return stats;
2537
172
}
2538
2539
// Explicit instantiation definition of the member template BlockFileCache::remove() for
// this signature, where both the cache lock and the block lock are
// std::lock_guard<std::mutex>. It forces this translation unit to emit that
// specialization, presumably so callers in other files can use it without seeing the
// template's definition.
template void BlockFileCache::remove(FileBlockSPtr file_block,
                                     std::lock_guard<std::mutex>& cache_lock,
                                     std::lock_guard<std::mutex>& block_lock, bool sync);
2542
2543
1
// Runs check_file_cache_consistency() and appends one human-readable report to
// `results` for each inconsistency found. Each report is built from:
//  - the manager-side info;
//  - the storage-side info;
//  - the inconsistency type;
//  followed by a trailing newline.
// Entries already in `results` are kept. If the consistency check fails, its error
// is returned and `results` is unchanged; otherwise returns OK.
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
    InconsistencyContext inconsistency_context;
    RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));
    const auto n = inconsistency_context.types.size();
    // Reserve room for the new reports on top of what the caller already stored.
    // reserve(n) alone ignores existing elements and can leave the appends reallocating.
    results.reserve(results.size() + n);
    for (size_t i = 0; i < n; i++) {
        std::string result;
        result += "File cache info in manager:\n";
        result += inconsistency_context.infos_in_manager[i].to_string();
        result += "File cache info in storage:\n";
        result += inconsistency_context.infos_in_storage[i].to_string();
        result += inconsistency_context.types[i].to_string();
        result += "\n";
        results.push_back(std::move(result));
    }
    return Status::OK();
}
2560
2561
1
// Cross-checks the blocks listed by storage against the in-memory index (`_files`)
// while holding the cache mutex. Every mismatch is appended as one entry to each of
// the parallel vectors in `inconsistency_context` (infos_in_manager, infos_in_storage,
// types):
//  - a storage entry with no manager cell, or whose cell has no block, is reported as
//    NOT_LOADED, with a default-constructed manager info;
//  - a storage entry whose tmp state, size, cache type or expiration time differs from
//    the manager's view is reported with the matching flags OR-ed together;
//  - a manager cell with a block but no storage entry is reported as
//    MISSING_IN_STORAGE, with a default-constructed storage info.
// Returns the error from _storage->get_file_cache_infos() if listing storage fails.
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
    std::lock_guard<std::mutex> cache_lock(_mutex);
    std::vector<FileCacheInfo> infos_in_storage;
    RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));
    // (hash, offset) pairs reported by storage; the reverse pass below uses it to find
    // manager cells that storage does not know about.
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks;
    for (const auto& info_in_storage : infos_in_storage) {
        confirmed_blocks.insert({info_in_storage.hash, info_in_storage.offset});
        auto* cell = get_cell(info_in_storage.hash, info_in_storage.offset, cache_lock);
        if (cell == nullptr || cell->file_block == nullptr) {
            inconsistency_context.infos_in_manager.emplace_back();
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
            inconsistency_context.types.emplace_back(InconsistencyType::NOT_LOADED);
            continue;
        }
        FileCacheInfo info_in_manager {
                .hash = info_in_storage.hash,
                .expiration_time = cell->file_block->expiration_time(),
                .size = cell->size(),
                .offset = info_in_storage.offset,
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
                .cache_type = cell->file_block->cache_type()};
        // Start explicitly from NONE rather than relying on InconsistencyType's default
        // constructor to clear the flags before they are OR-ed in below.
        InconsistencyType inconsistent_type(InconsistencyType::NONE);
        if (info_in_storage.is_tmp != info_in_manager.is_tmp) {
            inconsistent_type |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
        }
        // For a block the manager sees as DOWNLOADING, the expected size comes from
        // cell->dowloading_size() rather than the cell's full size.
        size_t expected_size =
                info_in_manager.is_tmp ? cell->dowloading_size() : info_in_manager.size;
        if (info_in_storage.size != expected_size) {
            inconsistent_type |= InconsistencyType::SIZE_INCONSISTENT;
        }
        // Only if it is not a tmp file need we check the cache type.
        // NOTE(review): the condition below skips the check only when the tmp states
        // disagree. When both sides are tmp, the cache type is still compared; confirm
        // this matches the intent of the comment above.
        if ((inconsistent_type & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
            info_in_storage.cache_type != info_in_manager.cache_type) {
            inconsistent_type |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
        }
        if (info_in_storage.expiration_time != info_in_manager.expiration_time) {
            inconsistent_type |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
        }
        if (inconsistent_type != InconsistencyType::NONE) {
            inconsistency_context.infos_in_manager.push_back(info_in_manager);
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
            inconsistency_context.types.push_back(inconsistent_type);
        }
    }

    for (const auto& [hash, offset_to_cell] : _files) {
        for (const auto& [offset, cell] : offset_to_cell) {
            if (confirmed_blocks.contains({hash, offset})) {
                continue;
            }
            const auto& block = cell.file_block;
            // The storage-driven pass above treats a cell without a block as absent from
            // the manager. Do the same here instead of dereferencing a null block.
            if (block == nullptr) {
                continue;
            }
            inconsistency_context.infos_in_manager.emplace_back(
                    hash, block->expiration_time(), cell.size(), offset,
                    block->state() == FileBlock::State::DOWNLOADING, block->cache_type());
            inconsistency_context.infos_in_storage.emplace_back();
            inconsistency_context.types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
        }
    }
    return Status::OK();
}
2621
2622
} // namespace doris::io