Coverage Report

Created: 2026-06-02 06:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache.h"
22
23
#include <gen_cpp/file_cache.pb.h>
24
25
#include <cstdio>
26
#include <exception>
27
#include <fstream>
28
#include <unordered_set>
29
30
#include "common/status.h"
31
#include "cpp/sync_point.h"
32
#include "runtime/exec_env.h"
33
34
#if defined(__APPLE__)
35
#include <sys/mount.h>
36
#else
37
#include <sys/statfs.h>
38
#endif
39
40
#include <chrono> // IWYU pragma: keep
41
#include <mutex>
42
#include <ranges>
43
44
#include "common/cast_set.h"
45
#include "common/config.h"
46
#include "common/logging.h"
47
#include "core/uint128.h"
48
#include "exec/common/sip_hash.h"
49
#include "io/cache/block_file_cache_ttl_mgr.h"
50
#include "io/cache/file_block.h"
51
#include "io/cache/file_cache_common.h"
52
#include "io/cache/fs_file_cache_storage.h"
53
#include "io/cache/mem_file_cache_storage.h"
54
#include "runtime/runtime_profile.h"
55
#include "util/concurrency_stats.h"
56
#include "util/stack_util.h"
57
#include "util/stopwatch.hpp"
58
#include "util/thread.h"
59
#include "util/time.h"
60
namespace doris::io {
61
62
// Insert a block pointer into one shard while swallowing allocation failures.
63
193k
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
64
193k
    if (!block) {
65
1
        return false;
66
1
    }
67
193k
    try {
68
193k
        auto* raw_ptr = block.get();
69
193k
        auto idx = shard_index(raw_ptr);
70
193k
        auto& shard = _shards[idx];
71
193k
        std::lock_guard lock(shard.mutex);
72
193k
        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
73
193k
        if (inserted) {
74
1.56k
            _size.fetch_add(1, std::memory_order_relaxed);
75
1.56k
        }
76
193k
        return inserted;
77
193k
    } catch (const std::exception& e) {
78
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
79
0
    } catch (...) {
80
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
81
0
    }
82
0
    return false;
83
193k
}
84
85
// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
86
6.81k
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
87
6.81k
    if (limit == 0 || output == nullptr) {
88
2
        return 0;
89
2
    }
90
6.80k
    size_t drained = 0;
91
6.80k
    try {
92
6.80k
        output->reserve(output->size() + std::min(limit, size()));
93
435k
        for (auto& shard : _shards) {
94
435k
            if (drained >= limit) {
95
1
                break;
96
1
            }
97
435k
            std::lock_guard lock(shard.mutex);
98
435k
            auto it = shard.entries.begin();
99
435k
            size_t shard_drained = 0;
100
435k
            while (it != shard.entries.end() && drained + shard_drained < limit) {
101
10
                output->emplace_back(std::move(it->second));
102
10
                it = shard.entries.erase(it);
103
10
                ++shard_drained;
104
10
            }
105
435k
            if (shard_drained > 0) {
106
7
                _size.fetch_sub(shard_drained, std::memory_order_relaxed);
107
7
                drained += shard_drained;
108
7
            }
109
435k
        }
110
6.80k
    } catch (const std::exception& e) {
111
0
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
112
0
    } catch (...) {
113
0
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
114
0
    }
115
6.80k
    return drained;
116
6.80k
}
117
118
// Remove every pending block, guarding against unexpected exceptions.
119
40
void NeedUpdateLRUBlocks::clear() {
120
40
    try {
121
2.56k
        for (auto& shard : _shards) {
122
2.56k
            std::lock_guard lock(shard.mutex);
123
2.56k
            if (!shard.entries.empty()) {
124
2
                auto removed = shard.entries.size();
125
2
                shard.entries.clear();
126
2
                _size.fetch_sub(removed, std::memory_order_relaxed);
127
2
            }
128
2.56k
        }
129
40
    } catch (const std::exception& e) {
130
0
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
131
0
    } catch (...) {
132
0
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
133
0
    }
134
40
}
135
136
193k
// Map a block address to a shard slot in the range [0, kShardMask].
//
// std::hash for pointer types is allowed to be (and in libstdc++ is) the identity
// function, and heap objects are aligned, so the lowest bits of the raw hash are
// typically always zero. Masking those bits directly would route every block to a
// small subset of the shards. The hash is therefore multiplied by a large odd
// constant and the upper half is folded into the lower half before masking, so
// that all address bits contribute to the selected slot.
//
// `ptr` must not be null (checked in debug builds only).
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
    DCHECK(ptr != nullptr);
    size_t mixed = std::hash<FileBlock*> {}(ptr);
    // Fibonacci-style multiplicative mixing; the constant is truncated on
    // platforms where size_t is narrower than 64 bits, which keeps it odd.
    mixed *= static_cast<size_t>(0x9E3779B97F4A7C15ULL);
    // A multiplication never changes trailing zero bits, so bring the well-mixed
    // high half down into the bits that the mask keeps.
    mixed ^= mixed >> (sizeof(size_t) * 4);
    return mixed & kShardMask;
}
140
141
// Build a cache rooted at `cache_base_path` and configured by `cache_settings`.
//
// The constructor registers all bvar metrics (each one prefixed with the cache
// base path), creates the four LRU queues, the LRU recorder/dumper and the
// storage backend. No background thread is started here.
//
// Note: for the "memory" storage backend `_cache_base_path` is overwritten with
// "memory" only after the metrics have been registered, so the metrics keep the
// original path as prefix.
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
                               const FileCacheSettings& cache_settings)
        : _cache_base_path(cache_base_path),
          _capacity(cache_settings.capacity),
          _max_file_block_size(cache_settings.max_file_block_size) {
    // Small factories so each metric below is a single line: name (+ initial value).
    auto size_status = [this](const char* name, size_t initial) {
        return std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(), name, initial);
    };
    auto ratio_status = [this](const char* name) {
        return std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(), name, 0.0);
    };
    auto adder = [this](const char* name) {
        return std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), name);
    };
    auto latency = [this](const char* name) {
        return std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(), name);
    };

    // ---- current size / element-count gauges ----
    _cur_cache_size_metrics = size_status("file_cache_cache_size", 0);
    _cache_capacity_metrics = size_status("file_cache_capacity", _capacity);
    _cur_ttl_cache_size_metrics = size_status("file_cache_ttl_cache_size", 0);
    _cur_normal_queue_element_count_metrics =
            size_status("file_cache_normal_queue_element_count", 0);
    _cur_ttl_cache_lru_queue_cache_size_metrics =
            size_status("file_cache_ttl_cache_lru_queue_size", 0);
    _cur_ttl_cache_lru_queue_element_count_metrics =
            size_status("file_cache_ttl_cache_lru_queue_element_count", 0);
    _cur_normal_queue_cache_size_metrics = size_status("file_cache_normal_queue_cache_size", 0);
    _cur_index_queue_element_count_metrics =
            size_status("file_cache_index_queue_element_count", 0);
    _cur_index_queue_cache_size_metrics = size_status("file_cache_index_queue_cache_size", 0);
    _cur_disposable_queue_element_count_metrics =
            size_status("file_cache_disposable_queue_element_count", 0);
    _cur_disposable_queue_cache_size_metrics =
            size_status("file_cache_disposable_queue_cache_size", 0);

    // ---- eviction / read / hit byte counters ----
    _queue_evict_size_metrics[0] = adder("file_cache_index_queue_evict_size");
    _queue_evict_size_metrics[1] = adder("file_cache_normal_queue_evict_size");
    _queue_evict_size_metrics[2] = adder("file_cache_disposable_queue_evict_size");
    _queue_evict_size_metrics[3] = adder("file_cache_ttl_cache_evict_size");
    _total_evict_size_metrics = adder("file_cache_total_evict_size");
    _total_read_size_metrics = adder("file_cache_total_read_size");
    _total_hit_size_metrics = adder("file_cache_total_hit_size");
    _gc_evict_bytes_metrics = adder("file_cache_gc_evict_bytes");
    _gc_evict_count_metrics = adder("file_cache_gc_evict_count");

    // ---- evict-by-time counters, one per ordered pair of distinct cache types ----
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_time_disposable_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
            adder("file_cache_evict_by_time_disposable_to_index");
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
            adder("file_cache_evict_by_time_disposable_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_time_normal_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
            adder("file_cache_evict_by_time_normal_to_index");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
            adder("file_cache_evict_by_time_normal_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_time_index_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_time_index_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
            adder("file_cache_evict_by_time_index_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_time_ttl_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_time_ttl_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
            adder("file_cache_evict_by_time_ttl_to_index");

    // ---- evict-by-own-LRU counters, one per cache type ----
    _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_self_lru_disposable");
    _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
            adder("file_cache_evict_by_self_lru_normal");
    _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] =
            adder("file_cache_evict_by_self_lru_index");
    _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] =
            adder("file_cache_evict_by_self_lru_ttl");

    // ---- evict-by-size counters, one per ordered pair of distinct cache types ----
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_size_disposable_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
            adder("file_cache_evict_by_size_disposable_to_index");
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
            adder("file_cache_evict_by_size_disposable_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_size_normal_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
            adder("file_cache_evict_by_size_normal_to_index");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
            adder("file_cache_evict_by_size_normal_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_size_index_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_size_index_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
            adder("file_cache_evict_by_size_index_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_size_ttl_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_size_ttl_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
            adder("file_cache_evict_by_size_ttl_to_index");

    _evict_by_try_release = adder("file_cache_evict_by_try_release");

    // ---- block counters ----
    _num_read_blocks = adder("file_cache_num_read_blocks");
    _num_hit_blocks = adder("file_cache_num_hit_blocks");
    _num_removed_blocks = adder("file_cache_num_removed_blocks");

    _no_warmup_num_read_blocks = adder("file_cache_no_warmup_num_read_blocks");
    _no_warmup_num_hit_blocks = adder("file_cache_no_warmup_num_hit_blocks");

#ifndef BE_TEST
    // Sliding windows (in seconds) over the block counters above; skipped in BE_TEST builds.
    auto window = [this](const char* name, bvar::Adder<size_t>* source, int seconds) {
        return std::make_shared<bvar::Window<bvar::Adder<size_t>>>(_cache_base_path.c_str(), name,
                                                                   source, seconds);
    };
    _num_hit_blocks_5m = window("file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
    _num_read_blocks_5m = window("file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
    _num_hit_blocks_1h = window("file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
    _num_read_blocks_1h = window("file_cache_num_read_blocks_1h", _num_read_blocks.get(), 3600);
    _no_warmup_num_hit_blocks_5m = window("file_cache_no_warmup_num_hit_blocks_5m",
                                          _no_warmup_num_hit_blocks.get(), 300);
    _no_warmup_num_read_blocks_5m = window("file_cache_no_warmup_num_read_blocks_5m",
                                           _no_warmup_num_read_blocks.get(), 300);
    _no_warmup_num_hit_blocks_1h = window("file_cache_no_warmup_num_hit_blocks_1h",
                                          _no_warmup_num_hit_blocks.get(), 3600);
    _no_warmup_num_read_blocks_1h = window("file_cache_no_warmup_num_read_blocks_1h",
                                           _no_warmup_num_read_blocks.get(), 3600);
#endif

    // ---- hit ratios ----
    _hit_ratio = ratio_status("file_cache_hit_ratio");
    _hit_ratio_5m = ratio_status("file_cache_hit_ratio_5m");
    _hit_ratio_1h = ratio_status("file_cache_hit_ratio_1h");

    _no_warmup_hit_ratio = ratio_status("file_cache_no_warmup_hit_ratio");
    _no_warmup_hit_ratio_5m = ratio_status("file_cache_no_warmup_hit_ratio_5m");
    _no_warmup_hit_ratio_1h = ratio_status("file_cache_no_warmup_hit_ratio_1h");

    // ---- state gauges ----
    _disk_limit_mode_metrics = size_status("file_cache_disk_limit_mode", 0);
    _need_evict_cache_in_advance_metrics =
            size_status("file_cache_need_evict_cache_in_advance", 0);
    _meta_store_write_queue_size_metrics =
            size_status("file_cache_meta_store_write_queue_size", 0);

    // ---- latency / length recorders ----
    _cache_lock_wait_time_us = latency("file_cache_cache_lock_wait_time_us");
    _get_or_set_latency_us = latency("file_cache_get_or_set_latency_us");
    _storage_sync_remove_latency_us = latency("file_cache_storage_sync_remove_latency_us");
    _storage_retry_sync_remove_latency_us =
            latency("file_cache_storage_retry_sync_remove_latency_us");
    _storage_async_remove_latency_us = latency("file_cache_storage_async_remove_latency_us");
    _evict_in_advance_latency_us = latency("file_cache_evict_in_advance_latency_us");
    _lru_dump_latency_us = latency("file_cache_lru_dump_latency_us");
    _recycle_keys_length_recorder = latency("file_cache_recycle_keys_length");
    _need_update_lru_blocks_length_recorder = latency("file_cache_need_update_lru_blocks_length");
    _update_lru_blocks_latency_us = latency("file_cache_update_lru_blocks_latency_us");
    _ttl_gc_latency_us = latency("file_cache_ttl_gc_latency_us");
    _shadow_queue_levenshtein_distance = latency("file_cache_shadow_queue_levenshtein_distance");

    // ---- LRU queues: (max size, max elements, third LRUQueue argument) ----
    // NOTE(review): the third argument looks like a per-queue lifetime in seconds
    // (1 hour / 7 days / 1 day / unbounded) -- confirm against LRUQueue.
    constexpr int kOneHour = 60 * 60;
    constexpr int kSevenDays = 7 * 24 * 60 * 60;
    constexpr int kOneDay = 24 * 60 * 60;
    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
                                 cache_settings.disposable_queue_elements, kOneHour);
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
                            kSevenDays);
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
                             kOneDay);
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
                          std::numeric_limits<int>::max());

    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());

    // ---- storage backend ----
    if (cache_settings.storage != "memory") {
        _storage = std::make_unique<FSFileCacheStorage>();
    } else {
        _storage = std::make_unique<MemFileCacheStorage>();
        _cache_base_path = "memory";
    }

    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
}
378
379
84.1k
// Hash `path` with sip_hash128 and return the 128-bit result wrapped in a
// UInt128Wrapper.
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
    uint128_t digest;
    // sip_hash128 writes its result through a char* output buffer.
    auto* out = reinterpret_cast<char*>(&digest);
    sip_hash128(path.data(), path.size(), out);
    return UInt128Wrapper(digest);
}
384
385
// Return a holder for the per-query cache context of `query_id`, creating the
// context if needed. The cache lock is taken for the whole call.
//
// An empty holder pointer is returned when config::enable_file_cache_query_limit
// is off. `file_cache_query_limit_percent` is forwarded to
// get_or_set_query_context().
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
    SCOPED_CACHE_LOCK(_mutex, this);
    if (config::enable_file_cache_query_limit) {
        // Query limiting is enabled: look up or create the context for this query.
        auto query_ctx =
                get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
        return std::make_unique<QueryFileCacheContextHolder>(query_id, this, query_ctx);
    }
    return {};
}
397
398
// Look up the context registered for `query_id` in `_query_map`.
// Returns nullptr when the query has no context. The caller passes the held
// cache lock as `cache_lock`.
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
    const auto found = _query_map.find(query_id);
    if (found == _query_map.end()) {
        return nullptr;
    }
    return found->second;
}
403
404
26
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
405
26
    SCOPED_CACHE_LOCK(_mutex, this);
406
26
    const auto& query_iter = _query_map.find(query_id);
407
408
26
    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
409
25
        _query_map.erase(query_iter);
410
25
    }
411
26
}
412
413
// Return the context registered for `query_id`, creating and registering a new
// one when none exists yet.
//
// Returns nullptr for the all-zero query id. A new context is sized to
// `file_cache_query_limit_percent` percent of `_capacity`; a warning is logged
// when that budget is below 256MB. The caller passes the held cache lock.
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
        int file_cache_query_limit_percent) {
    const bool is_zero_id = (query_id.lo == 0 && query_id.hi == 0);
    if (is_zero_id) {
        return nullptr;
    }

    if (auto existing = get_query_context(query_id, cache_lock)) {
        return existing;
    }

    // 256MB == 268435456 bytes.
    constexpr size_t kRecommendedMinLimitBytes = 256UL * 1024 * 1024;
    const size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100;
    if (file_cache_query_limit_size < kRecommendedMinLimitBytes) {
        LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size
                     << " bytes) is less than the 256MB recommended minimum. "
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
                     << " from its current value " << file_cache_query_limit_percent << "%.";
    }

    auto created = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
    return _query_map.emplace(query_id, created).first->second;
}
436
437
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
438
140
                                                   std::lock_guard<std::mutex>& cache_lock) {
439
140
    auto pair = std::make_pair(hash, offset);
440
140
    auto record = records.find(pair);
441
140
    DCHECK(record != records.end());
442
140
    auto iter = record->second;
443
140
    records.erase(pair);
444
140
    lru_queue.remove(iter, cache_lock);
445
140
}
446
447
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
448
                                                    size_t size,
449
190
                                                    std::lock_guard<std::mutex>& cache_lock) {
450
190
    auto pair = std::make_pair(hash, offset);
451
190
    if (records.find(pair) == records.end()) {
452
189
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
453
189
        records.insert({pair, queue_iter});
454
189
    }
455
190
}
456
457
152
// Take the cache lock and run initialize_unlocked(); returns its status.
Status BlockFileCache::initialize() {
    SCOPED_CACHE_LOCK(_mutex, this);
    Status init_status = initialize_unlocked(cache_lock);
    return init_status;
}
461
462
152
// Initialize the cache while the caller holds the cache lock.
//
// Steps, in order: mark the cache initialized, optionally restore the LRU queues
// from disk, initialize the storage backend (an error is returned to the caller
// immediately), create the TTL manager when the backend is file-system based and
// exposes a meta store, then start the background threads.
// Must be called at most once (DCHECK in debug builds).
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(!_is_initialized);
    _is_initialized = true;

    const bool lru_dump_enabled = config::file_cache_background_lru_dump_tail_record_num > 0;
    if (lru_dump_enabled) {
        // Requirements:
        // 1. restored data should not overwrite the last dump
        // 2. restore should happen before load and async load
        // 3. all queues should be restored sequentially to avoid conflicts
        // TODO(zhengyu): the restores could be parallelized at the price of extra
        // complexity; measure the time cost first to see whether that is necessary.
        restore_lru_queues_from_disk(cache_lock);
    }
    RETURN_IF_ERROR(_storage->init(this));

    // The TTL manager needs the meta store, which only the file-system backend has.
    auto* fs_backend = dynamic_cast<FSFileCacheStorage*>(_storage.get());
    if (fs_backend != nullptr) {
        auto* ttl_meta_store = fs_backend->get_meta_store();
        if (ttl_meta_store != nullptr) {
            _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, ttl_meta_store);
        }
    }

    // Background workers.
    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
    _cache_background_evict_in_advance_thread =
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
    _cache_background_block_lru_update_thread =
            std::thread(&BlockFileCache::run_background_block_lru_update, this);

    // LRU dump and LRU log replay workers.
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
    _cache_background_lru_log_replay_thread =
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);

    return Status::OK();
}
496
497
void BlockFileCache::update_block_lru(FileBlockSPtr block,
498
7
                                      std::lock_guard<std::mutex>& cache_lock) {
499
7
    if (!block) {
500
1
        return;
501
1
    }
502
503
6
    FileBlockCell* cell = get_cell(block->get_hash_value(), block->offset(), cache_lock);
504
6
    if (!cell || cell->file_block.get() != block.get()) {
505
1
        return;
506
1
    }
507
508
5
    if (cell->queue_iterator) {
509
5
        auto& queue = get_queue(block->cache_type());
510
5
        queue.move_to_end(*cell->queue_iterator, cache_lock);
511
5
        _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
512
5
                                          block->_key.hash, block->_key.offset,
513
5
                                          block->_block_range.size());
514
5
    }
515
5
    cell->update_atime();
516
5
}
517
518
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
519
727k
                              std::lock_guard<std::mutex>& cache_lock) {
520
727k
    if (result) {
521
727k
        result->push_back(cell.file_block);
522
727k
    }
523
524
727k
    auto& queue = get_queue(cell.file_block->cache_type());
525
    /// Move to the end of the queue. The iterator remains valid.
526
727k
    if (!config::enable_file_cache_async_touch_on_get_or_set && cell.queue_iterator &&
527
727k
        move_iter_flag) {
528
535k
        queue.move_to_end(*cell.queue_iterator, cache_lock);
529
535k
        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
530
535k
                                          CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash,
531
535k
                                          cell.file_block->_key.offset, cell.size());
532
535k
    }
533
534
727k
    cell.update_atime();
535
727k
}
536
537
template <class T>
538
    requires IsXLock<T>
539
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
540
8.01k
                                        T& /* cache_lock */) {
541
8.01k
    auto it = _files.find(hash);
542
8.01k
    if (it == _files.end()) {
543
3
        return nullptr;
544
3
    }
545
546
8.01k
    auto& offsets = it->second;
547
8.01k
    auto cell_it = offsets.find(offset);
548
8.01k
    if (cell_it == offsets.end()) {
549
3
        return nullptr;
550
3
    }
551
552
8.01k
    return &cell_it->second;
553
8.01k
}
554
555
919k
/// Returns true unless the cell's type or the querying context's type is DISPOSABLE.
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
    const bool involves_disposable = query_type == FileCacheType::DISPOSABLE ||
                                     cell_type == FileCacheType::DISPOSABLE;
    return !involves_disposable;
}
558
559
/// Collect the cached blocks of `hash` that intersect `range` = [left, right].
///
/// Every returned block is passed through use_cell(), which appends it to the result and
/// refreshes its access bookkeeping. An empty list is returned when nothing intersects.
/// While `_async_open_done` is false and the hash is unknown, the storage is asked to
/// load the blocks of this key directly before the lookup is retried.
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
                                    const FileBlock::Range& range,
                                    std::lock_guard<std::mutex>& cache_lock) {
    auto file_it = _files.find(hash);
    if (file_it == _files.end()) {
        if (_async_open_done) {
            return {};
        }
        FileCacheKey key;
        key.hash = hash;
        key.meta.type = context.cache_type;
        key.meta.expiration_time = context.expiration_time;
        key.meta.tablet_id = context.tablet_id;
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);

        file_it = _files.find(hash);
        if (file_it == _files.end()) [[unlikely]] {
            return {};
        }
    }

    auto& cells = file_it->second;
    if (cells.empty()) {
        // An entry without cells is not expected: report it, drop the entry and bail out.
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
                     << " cache type=" << context.cache_type
                     << " cache expiration time=" << context.expiration_time
                     << " cache range=" << range.left << " " << range.right
                     << " query id=" << context.query_id;
        DCHECK(false);
        _files.erase(hash);
        return {};
    }

    FileBlocks hits;
    const auto take = [&](const FileBlockCell& picked) {
        use_cell(picked, &hits, need_to_move(picked.file_block->cache_type(), context.cache_type),
                 cache_lock);
    };

    auto cell_it = cells.lower_bound(range.left);
    if (cell_it == cells.end()) {
        /// N - last cached block for given file hash, block{N}.offset < range.left:
        ///   block{N}                       block{N}
        /// [________                         [_______]
        ///     [__________]         OR                  [________]
        ///     ^                                        ^
        ///     range.left                               range.left
        const auto& last_cell = cells.rbegin()->second;
        if (last_cell.file_block->range().right < range.left) {
            return {};
        }
        take(last_cell);
        return hits;
    }

    /// cell_it <-- block{k}, the first block whose offset is >= range.left
    if (cell_it != cells.begin()) {
        ///   block{k-1}  block{k}
        ///   [________]   [_____
        ///       [___________
        ///       ^
        ///       range.left
        const auto& prev_cell = std::prev(cell_it)->second;
        if (range.left <= prev_cell.file_block->range().right) {
            take(prev_cell);
        }
    }

    ///  block{k} ...       block{k-1}  block{k}                      block{k}
    ///  [______              [______]     [____                        [________
    ///  [_________     OR              [________      OR    [______]   ^
    ///  ^                              ^                           ^   block{k}.offset
    ///  range.left                     range.left                  range.right
    for (; cell_it != cells.end(); ++cell_it) {
        const auto& current = cell_it->second;
        if (range.right < current.file_block->range().left) {
            break;
        }
        take(current);
    }

    return hits;
}
649
650
193k
/// Hand `block` to `_need_update_lru_blocks`; when the insert reports success, publish the
/// container's current size to the length recorder.
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
    if (!_need_update_lru_blocks.insert(std::move(block))) {
        return;
    }
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
}
655
656
3
std::string BlockFileCache::clear_file_cache_async() {
657
3
    LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
658
3
    _lru_dumper->remove_lru_dump_files();
659
3
    int64_t num_cells_all = 0;
660
3
    int64_t num_cells_to_delete = 0;
661
3
    int64_t num_cells_wait_recycle = 0;
662
3
    int64_t num_files_all = 0;
663
3
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
664
3
    {
665
3
        SCOPED_CACHE_LOCK(_mutex, this);
666
667
3
        std::vector<FileBlockCell*> deleting_cells;
668
4
        for (auto& [_, offset_to_cell] : _files) {
669
4
            ++num_files_all;
670
37
            for (auto& [_1, cell] : offset_to_cell) {
671
37
                ++num_cells_all;
672
37
                deleting_cells.push_back(&cell);
673
37
            }
674
4
        }
675
676
        // we cannot delete the element in the loop above, because it will break the iterator
677
37
        for (auto& cell : deleting_cells) {
678
37
            if (!cell->releasable()) {
679
2
                LOG(INFO) << "cell is not releasable, hash="
680
2
                          << " offset=" << cell->file_block->offset();
681
2
                cell->file_block->set_deleting();
682
2
                ++num_cells_wait_recycle;
683
2
                continue;
684
2
            }
685
35
            FileBlockSPtr file_block = cell->file_block;
686
35
            if (file_block) {
687
35
                std::lock_guard block_lock(file_block->_mutex);
688
35
                remove(file_block, cache_lock, block_lock, false);
689
35
                ++num_cells_to_delete;
690
35
            }
691
35
        }
692
3
        clear_need_update_lru_blocks();
693
3
    }
694
695
3
    std::stringstream ss;
696
3
    ss << "finish clear_file_cache_async, path=" << _cache_base_path
697
3
       << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
698
3
       << " num_cells_to_delete=" << num_cells_to_delete
699
3
       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
700
3
    auto msg = ss.str();
701
3
    LOG(INFO) << msg;
702
3
    _lru_dumper->remove_lru_dump_files();
703
3
    return msg;
704
3
}
705
706
/// Cover [offset, offset + size) with new file blocks of at most `_max_file_block_size`
/// bytes each.
///
/// For every sub-range a reservation is attempted with try_reserve():
///  - on success a cell in the requested `state` is added to the cache via add_cell();
///  - on failure a stand-alone SKIP_CACHE block is created that is not registered in
///    `_files`.
///
/// The outcome is decided per sub-range. Previously the `state` parameter itself was
/// overwritten on the first failed reservation, so every later sub-range was forced to
/// SKIP_CACHE even though try_reserve() had succeeded for it, wasting that reservation.
///
/// @param state  state given to the cells that could be reserved
/// @return the created blocks in ascending offset order
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
                                                  const CacheContext& context, size_t offset,
                                                  size_t size, FileBlock::State state,
                                                  std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(size > 0);

    auto current_pos = offset;
    auto end_pos_non_included = offset + size;
    size_t remaining_size = size;

    FileBlocks file_blocks;
    while (current_pos < end_pos_non_included) {
        const size_t current_size = std::min(remaining_size, _max_file_block_size);
        remaining_size -= current_size;

        // Decide for this sub-range only; a failed reservation must not leak into the
        // following iterations.
        const FileBlock::State block_state =
                try_reserve(hash, context, current_pos, current_size, cache_lock)
                        ? state
                        : FileBlock::State::SKIP_CACHE;
        if (block_state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
            FileCacheKey key;
            key.hash = hash;
            key.offset = current_pos;
            key.meta.type = context.cache_type;
            key.meta.expiration_time = context.expiration_time;
            key.meta.tablet_id = context.tablet_id;
            auto file_block = std::make_shared<FileBlock>(key, current_size, this,
                                                          FileBlock::State::SKIP_CACHE);
            file_blocks.push_back(std::move(file_block));
        } else {
            auto* cell = add_cell(hash, context, current_pos, current_size, block_state,
                                  cache_lock);
            if (cell) {
                file_blocks.push_back(cell->file_block);
                if (!context.is_cold_data) {
                    cell->update_atime();
                }
            }
            if (_ttl_mgr && context.tablet_id != 0) {
                _ttl_mgr->register_tablet_id(context.tablet_id);
            }
        }

        current_pos += current_size;
    }

    DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
    return file_blocks;
}
754
755
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
756
                                                       const UInt128Wrapper& hash,
757
                                                       const CacheContext& context,
758
                                                       const FileBlock::Range& range,
759
726k
                                                       std::lock_guard<std::mutex>& cache_lock) {
760
    /// There are blocks [block1, ..., blockN]
761
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
762
    /// intersect with given range.
763
764
    /// It can have holes:
765
    /// [____________________]         -- requested range
766
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
767
    ///
768
    /// For each such hole create a cell with file block state EMPTY.
769
770
726k
    auto it = file_blocks.begin();
771
726k
    auto block_range = (*it)->range();
772
773
726k
    size_t current_pos = 0;
774
726k
    if (block_range.left < range.left) {
775
        ///    [_______     -- requested range
776
        /// [_______
777
        /// ^
778
        /// block1
779
780
151
        current_pos = block_range.right + 1;
781
151
        ++it;
782
726k
    } else {
783
726k
        current_pos = range.left;
784
726k
    }
785
786
1.45M
    while (current_pos <= range.right && it != file_blocks.end()) {
787
727k
        block_range = (*it)->range();
788
789
727k
        if (current_pos == block_range.left) {
790
727k
            current_pos = block_range.right + 1;
791
727k
            ++it;
792
727k
            continue;
793
727k
        }
794
795
727k
        DCHECK(current_pos < block_range.left);
796
797
161
        auto hole_size = block_range.left - current_pos;
798
799
161
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
800
161
                                                      FileBlock::State::EMPTY, cache_lock));
801
802
161
        current_pos = block_range.right + 1;
803
161
        ++it;
804
161
    }
805
806
726k
    if (current_pos <= range.right) {
807
        ///   ________]     -- requested range
808
        ///   _____]
809
        ///        ^
810
        /// blockN
811
812
113
        auto hole_size = range.right - current_pos + 1;
813
814
113
        file_blocks.splice(file_blocks.end(),
815
113
                           split_range_into_cells(hash, context, current_pos, hole_size,
816
113
                                                  FileBlock::State::EMPTY, cache_lock));
817
113
    }
818
726k
}
819
820
/// Return a holder with the file blocks that cover [offset, offset + size - 1] of `hash`.
///
/// Under the cache mutex, the blocks already cached for the range are collected with
/// get_impl(); when none exist the whole range is split into new EMPTY cells, otherwise
/// only the holes between the cached blocks are filled. Read/hit metrics are updated for
/// every returned block. When `config::enable_file_cache_async_touch_on_get_or_set` is on,
/// DOWNLOADED blocks that need_to_move() accepts are queued for an LRU update after the
/// mutex has been released.
///
/// `context.stats` is dereferenced and must not be null. The range end is computed as
/// `offset + size - 1`, so `size` is expected to be greater than zero.
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
                                            CacheContext& context) {
    FileBlock::Range range(offset, offset + size - 1);

    ReadStatistics* stats = context.stats;
    DCHECK(stats != nullptr);
    MonotonicStopWatch lock_wait_watch;
    lock_wait_watch.start();
    FileBlocks blocks;
    std::vector<FileBlockSPtr> lru_touch_candidates;
    const bool touch_async = config::enable_file_cache_async_touch_on_get_or_set;
    int64_t locked_duration = 0;
    {
        ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
        std::lock_guard cache_lock(_mutex);
        ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
        stats->lock_wait_timer += lock_wait_watch.elapsed_time();
        SCOPED_RAW_TIMER(&locked_duration);

        /// Get all blocks which intersect with the given range.
        {
            SCOPED_RAW_TIMER(&stats->get_timer);
            blocks = get_impl(hash, context, range, cache_lock);
        }

        if (blocks.empty()) {
            SCOPED_RAW_TIMER(&stats->set_timer);
            blocks = split_range_into_cells(hash, context, offset, size, FileBlock::State::EMPTY,
                                            cache_lock);
        } else {
            SCOPED_RAW_TIMER(&stats->set_timer);
            fill_holes_with_empty_file_blocks(blocks, hash, context, range, cache_lock);
        }
        DCHECK(!blocks.empty());

        const bool counts_as_regular_read = !context.is_warmup;
        *_num_read_blocks << blocks.size();
        if (counts_as_regular_read) {
            *_no_warmup_num_read_blocks << blocks.size();
        }
        if (touch_async) {
            lru_touch_candidates.reserve(blocks.size());
        }

        for (auto& block : blocks) {
            size_t bytes = block->range().size();
            *_total_read_size_metrics << bytes;
            if (block->state_unsafe() != FileBlock::State::DOWNLOADED) {
                continue;
            }
            *_num_hit_blocks << 1;
            *_total_hit_size_metrics << bytes;
            if (counts_as_regular_read) {
                *_no_warmup_num_hit_blocks << 1;
            }
            if (touch_async && need_to_move(block->cache_type(), context.cache_type)) {
                lru_touch_candidates.push_back(block);
            }
        }
    }

    // Only filled when asynchronous touching is enabled; the queueing happens after the
    // cache mutex scope above has ended.
    for (auto& candidate : lru_touch_candidates) {
        add_need_update_lru_block(std::move(candidate));
    }

    *_get_or_set_latency_us << (locked_duration / 1000);
    return FileBlocksHolder(std::move(blocks));
}
886
887
/// Create a file block cell and put it in `_files` under [hash][offset].
///
/// @return the new cell; the already existing cell when one is registered at that
///         offset; nullptr when `size` is 0 or larger than 1 GiB.
///
/// The new block is switched to NORMAL when the context asks for TTL without an
/// expiration time, and to TTL when a non-TTL context carries an expiration time. The
/// cell is appended to the LRU queue of its resulting type, the ADD event is recorded and
/// `_cur_cache_size` (plus `_cur_ttl_size` for TTL cells) is increased.
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
                                        size_t offset, size_t size, FileBlock::State state,
                                        std::lock_guard<std::mutex>& cache_lock) {
    if (size == 0) {
        return nullptr; /// Empty files are not cached.
    }

    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
               << " expiration_time=" << context.expiration_time
               << " tablet_id=" << context.tablet_id;

    constexpr size_t kMaxCellBytes = 1024UL * 1024UL * 1024UL;
    if (size > kMaxCellBytes) {
        LOG(WARNING) << "File block size is too large for a block, reject. size=" << size
                     << " hash=" << hash.to_string() << " offset=" << offset
                     << " stack:" << get_stack_trace();
        return nullptr;
    }

    auto& cells_of_file = _files[hash];
    if (auto existing = cells_of_file.find(offset); existing != cells_of_file.end()) {
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
                   << ", offset: " << offset << ", size: " << size
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
        return &(existing->second);
    }

    FileCacheKey key;
    key.hash = hash;
    key.offset = offset;
    key.meta.type = context.cache_type;
    key.meta.expiration_time = context.expiration_time;
    key.meta.tablet_id = context.tablet_id;
    FileBlockCell new_cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);

    // Reconcile a cache type that contradicts the expiration time.
    const bool wants_ttl = context.cache_type == FileCacheType::TTL;
    const bool has_expiration = context.expiration_time != 0;
    Status type_change;
    if (wants_ttl && !has_expiration) {
        type_change = new_cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
    } else if (!wants_ttl && has_expiration) {
        type_change = new_cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
    }
    if (!type_change.ok()) {
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
                     << " cache_type=" << cache_type_to_string(context.cache_type)
                     << " error=" << type_change.msg();
    }

    auto& lru_queue = get_queue(new_cell.file_block->cache_type());
    new_cell.queue_iterator = lru_queue.add(hash, offset, size, cache_lock);
    _lru_recorder->record_queue_event(new_cell.file_block->cache_type(), CacheLRULogType::ADD,
                                      new_cell.file_block->get_hash_value(),
                                      new_cell.file_block->offset(), new_cell.size());

    if (new_cell.file_block->cache_type() == FileCacheType::TTL) {
        _cur_ttl_size += new_cell.size();
    }
    auto inserted = cells_of_file.emplace(offset, std::move(new_cell));
    _cur_cache_size += size;
    return &(inserted.first->second);
}
948
949
0
size_t BlockFileCache::try_release() {
950
0
    SCOPED_CACHE_LOCK(_mutex, this);
951
0
    std::vector<FileBlockCell*> trash;
952
0
    for (auto& [hash, blocks] : _files) {
953
0
        for (auto& [offset, cell] : blocks) {
954
0
            if (cell.releasable()) {
955
0
                trash.emplace_back(&cell);
956
0
            } else {
957
0
                cell.file_block->set_deleting();
958
0
            }
959
0
        }
960
0
    }
961
0
    size_t remove_size = 0;
962
0
    for (auto& cell : trash) {
963
0
        FileBlockSPtr file_block = cell->file_block;
964
0
        std::lock_guard lc(cell->file_block->_mutex);
965
0
        remove_size += file_block->range().size();
966
0
        remove(file_block, cache_lock, lc);
967
0
        VLOG_DEBUG << "try_release " << _cache_base_path
968
0
                   << " hash=" << file_block->get_hash_value().to_string()
969
0
                   << " offset=" << file_block->offset();
970
0
    }
971
0
    *_evict_by_try_release << remove_size;
972
0
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
973
0
    return trash.size();
974
0
}
975
976
783k
/// Map a cache type to its LRU queue. An unknown type trips DCHECK(false) and falls back
/// to the NORMAL queue.
LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
    LRUQueue* selected = &_normal_queue;
    switch (type) {
    case FileCacheType::INDEX:
        selected = &_index_queue;
        break;
    case FileCacheType::DISPOSABLE:
        selected = &_disposable_queue;
        break;
    case FileCacheType::NORMAL:
        break;
    case FileCacheType::TTL:
        selected = &_ttl_queue;
        break;
    default:
        DCHECK(false);
        break;
    }
    return *selected;
}
991
992
140
/// Const overload: map a cache type to its LRU queue. An unknown type trips DCHECK(false)
/// and falls back to the NORMAL queue.
const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
    switch (type) {
    case FileCacheType::INDEX:
        return _index_queue;
    case FileCacheType::DISPOSABLE:
        return _disposable_queue;
    case FileCacheType::TTL:
        return _ttl_queue;
    case FileCacheType::NORMAL:
        return _normal_queue;
    default:
        DCHECK(false);
        return _normal_queue;
    }
}
1007
1008
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
1009
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
1010
81.6k
                                        std::string& reason) {
1011
81.6k
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1012
1.72k
        FileBlockSPtr file_block = cell->file_block;
1013
1.72k
        if (file_block) {
1014
1.72k
            std::lock_guard block_lock(file_block->_mutex);
1015
1.72k
            remove(file_block, cache_lock, block_lock, sync);
1016
1.72k
            VLOG_DEBUG << "remove_file_blocks"
1017
0
                       << " hash=" << file_block->get_hash_value().to_string()
1018
0
                       << " offset=" << file_block->offset() << " reason=" << reason;
1019
1.72k
        }
1020
1.72k
    };
1021
81.6k
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1022
81.6k
}
1023
1024
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
1025
                                           size_t& removed_size,
1026
                                           std::vector<FileBlockCell*>& to_evict,
1027
                                           std::lock_guard<std::mutex>& cache_lock,
1028
1.37k
                                           size_t& cur_removed_size, bool evict_in_advance) {
1029
3.46k
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1030
3.46k
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1031
1.28k
            break;
1032
1.28k
        }
1033
2.17k
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1034
1035
2.17k
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
1036
0
                     << ", offset: " << entry_offset;
1037
1038
2.17k
        size_t cell_size = cell->size();
1039
2.17k
        DCHECK(entry_size == cell_size);
1040
1041
2.17k
        if (cell->releasable()) {
1042
1.70k
            auto& file_block = cell->file_block;
1043
1044
1.70k
            std::lock_guard block_lock(file_block->_mutex);
1045
1.70k
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1046
1.70k
            to_evict.push_back(cell);
1047
1.70k
            removed_size += cell_size;
1048
1.70k
            cur_removed_size += cell_size;
1049
1.70k
        }
1050
2.17k
    }
1051
1.37k
}
1052
1053
// 1. if async load file cache not finish
1054
//     a. evict from lru queue
1055
// 2. if ttl cache
1056
//     a. evict from disposable/normal/index queue one by one
1057
// 3. if dont reach query limit or dont have query limit
1058
//     a. evict from other queue
1059
//     b. evict from current queue
1060
//         a.1 if the data belong write, then just evict cold data
1061
// 4. if reach query limit
1062
//     a. evict from query queue
1063
//     b. evict from other queue
1064
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
1065
                                 size_t offset, size_t size,
1066
14.7k
                                 std::lock_guard<std::mutex>& cache_lock) {
1067
14.7k
    if (!_async_open_done) {
1068
4
        return try_reserve_during_async_load(size, cache_lock);
1069
4
    }
1070
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
1071
    // directly eliminate 5 times the size of the space
1072
14.7k
    if (_disk_resource_limit_mode) {
1073
1
        size = 5 * size;
1074
1
    }
1075
1076
14.7k
    auto query_context = config::enable_file_cache_query_limit &&
1077
14.7k
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
1078
14.7k
                                 ? get_query_context(context.query_id, cache_lock)
1079
14.7k
                                 : nullptr;
1080
14.7k
    if (!query_context) {
1081
14.5k
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
1082
14.5k
    } else if (query_context->get_cache_size(cache_lock) + size <=
1083
193
               query_context->get_max_cache_size()) {
1084
60
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
1085
60
    }
1086
133
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1087
133
                               std::chrono::steady_clock::now().time_since_epoch())
1088
133
                               .count();
1089
133
    auto& queue = get_queue(context.cache_type);
1090
133
    size_t removed_size = 0;
1091
133
    size_t ghost_remove_size = 0;
1092
133
    size_t queue_size = queue.get_capacity(cache_lock);
1093
133
    size_t cur_cache_size = _cur_cache_size;
1094
133
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);
1095
1096
133
    std::vector<LRUQueue::Iterator> ghost;
1097
133
    std::vector<FileBlockCell*> to_evict;
1098
1099
133
    size_t max_size = queue.get_max_size();
1100
1.32k
    auto is_overflow = [&] {
1101
1.32k
        return _disk_resource_limit_mode ? removed_size < size
1102
1.32k
                                         : cur_cache_size + size - removed_size > _capacity ||
1103
1.32k
                                                   (queue_size + size - removed_size > max_size) ||
1104
1.32k
                                                   (query_context_cache_size + size -
1105
1.31k
                                                            (removed_size + ghost_remove_size) >
1106
1.31k
                                                    query_context->get_max_cache_size());
1107
1.32k
    };
1108
1109
    /// Select the cache from the LRU queue held by query for expulsion.
1110
1.25k
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
1111
1.19k
        if (!is_overflow()) {
1112
69
            break;
1113
69
        }
1114
1115
1.12k
        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);
1116
1117
1.12k
        if (!cell) {
1118
            /// The cache corresponding to this record may be swapped out by
1119
            /// other queries, so it has become invalid.
1120
4
            ghost.push_back(iter);
1121
4
            ghost_remove_size += iter->size;
1122
1.12k
        } else {
1123
1.12k
            size_t cell_size = cell->size();
1124
1.12k
            DCHECK(iter->size == cell_size);
1125
1126
1.12k
            if (cell->releasable()) {
1127
136
                auto& file_block = cell->file_block;
1128
1129
136
                std::lock_guard block_lock(file_block->_mutex);
1130
136
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1131
136
                to_evict.push_back(cell);
1132
136
                removed_size += cell_size;
1133
136
            }
1134
1.12k
        }
1135
1.12k
    }
1136
1137
136
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1138
136
        FileBlockSPtr file_block = cell->file_block;
1139
136
        if (file_block) {
1140
136
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
1141
136
            std::lock_guard block_lock(file_block->_mutex);
1142
136
            remove(file_block, cache_lock, block_lock);
1143
136
        }
1144
136
    };
1145
1146
133
    for (auto& iter : ghost) {
1147
4
        query_context->remove(iter->hash, iter->offset, cache_lock);
1148
4
    }
1149
1150
133
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1151
1152
133
    if (is_overflow() &&
1153
133
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
1154
1
        return false;
1155
1
    }
1156
132
    query_context->reserve(hash, offset, size, cache_lock);
1157
132
    return true;
1158
133
}
1159
1160
22
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
1161
22
    UInt128Wrapper hash = UInt128Wrapper();
1162
22
    size_t offset = 0;
1163
22
    CacheContext context;
1164
    /* we pick NORMAL and TTL cache to evict in advance
1165
     * we reserve for them but won't acutually give space to them
1166
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
1167
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
1168
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
1169
     */
1170
22
    context.cache_type = FileCacheType::NORMAL;
1171
22
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1172
22
    context.cache_type = FileCacheType::TTL;
1173
22
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1174
22
}
1175
1176
// remove specific cache synchronously, for critical operations
1177
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1178
8
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1179
8
    std::string reason = "remove_if_cached";
1180
8
    SCOPED_CACHE_LOCK(_mutex, this);
1181
8
    auto iter = _files.find(file_key);
1182
8
    std::vector<FileBlockCell*> to_remove;
1183
8
    if (iter != _files.end()) {
1184
14
        for (auto& [_, cell] : iter->second) {
1185
14
            if (cell.releasable()) {
1186
13
                to_remove.push_back(&cell);
1187
13
            } else {
1188
1
                cell.file_block->set_deleting();
1189
1
            }
1190
14
        }
1191
6
    }
1192
8
    remove_file_blocks(to_remove, cache_lock, true, reason);
1193
8
}
1194
1195
// the async version of remove_if_cached, for background operations
1196
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
1197
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1198
65.5k
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1199
65.5k
    std::string reason = "remove_if_cached_async";
1200
65.5k
    SCOPED_CACHE_LOCK(_mutex, this);
1201
1202
65.5k
    auto iter = _files.find(file_key);
1203
65.5k
    std::vector<FileBlockCell*> to_remove;
1204
65.5k
    if (iter != _files.end()) {
1205
1
        for (auto& [_, cell] : iter->second) {
1206
1
            *_gc_evict_bytes_metrics << cell.size();
1207
1
            *_gc_evict_count_metrics << 1;
1208
1
            if (cell.releasable()) {
1209
0
                to_remove.push_back(&cell);
1210
1
            } else {
1211
1
                cell.file_block->set_deleting();
1212
1
            }
1213
1
        }
1214
1
    }
1215
65.5k
    remove_file_blocks(to_remove, cache_lock, false, reason);
1216
65.5k
}
1217
1218
// Returns the cache types, other than `cur_cache_type` and never including TTL, in the order in
// which they are listed as eviction candidates. Unknown types yield an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
        FileCacheType cur_cache_type) {
    if (cur_cache_type == FileCacheType::TTL) {
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
    }
    if (cur_cache_type == FileCacheType::INDEX) {
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
    }
    if (cur_cache_type == FileCacheType::NORMAL) {
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
    }
    if (cur_cache_type == FileCacheType::DISPOSABLE) {
        return {FileCacheType::NORMAL, FileCacheType::INDEX};
    }
    return {};
}
1234
1235
1.28k
// Returns the cache types other than `cur_cache_type` in the order in which they are listed as
// eviction candidates. Unlike get_other_cache_type_without_ttl(), TTL is included (last) for
// every non-TTL caller. Unknown types yield an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
    if (cur_cache_type == FileCacheType::TTL) {
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
    }
    if (cur_cache_type == FileCacheType::INDEX) {
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL};
    }
    if (cur_cache_type == FileCacheType::NORMAL) {
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL};
    }
    if (cur_cache_type == FileCacheType::DISPOSABLE) {
        return {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL};
    }
    return {};
}
1250
1251
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
1252
5
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
1253
5
    DCHECK(_files.find(hash) != _files.end() &&
1254
5
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
1255
5
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
1256
5
    DCHECK(cell != nullptr);
1257
5
    if (cell == nullptr) {
1258
0
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
1259
0
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
1260
0
                     << " new_size=" << new_size;
1261
0
        return;
1262
0
    }
1263
5
    DCHECK_EQ(cell->file_block->_block_range.size(), old_size);
1264
5
    if (cell->queue_iterator) {
1265
5
        auto& queue = get_queue(cell->file_block->cache_type());
1266
5
        DCHECK(queue.contains(hash, offset, cache_lock));
1267
5
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
1268
5
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
1269
5
                                          cell->file_block->get_hash_value(),
1270
5
                                          cell->file_block->offset(), new_size);
1271
5
    }
1272
5
    cell->file_block->_block_range.right = cell->file_block->_block_range.left + new_size - 1;
1273
5
    _cur_cache_size -= old_size;
1274
5
    _cur_cache_size += new_size;
1275
5
    if (cell->file_block->cache_type() == FileCacheType::TTL) {
1276
0
        _cur_ttl_size -= old_size;
1277
0
        _cur_ttl_size += new_size;
1278
0
    }
1279
5
}
1280
1281
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
1282
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1283
14.7k
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1284
14.7k
    size_t removed_size = 0;
1285
14.7k
    size_t cur_cache_size = _cur_cache_size;
1286
14.7k
    std::vector<FileBlockCell*> to_evict;
1287
33.4k
    for (FileCacheType cache_type : other_cache_types) {
1288
33.4k
        auto& queue = get_queue(cache_type);
1289
33.4k
        size_t remove_size_per_type = 0;
1290
33.4k
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1291
2.02k
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1292
1.24k
                break;
1293
1.24k
            }
1294
787
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1295
787
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
1296
0
                         << ", offset: " << entry_offset;
1297
1298
787
            size_t cell_size = cell->size();
1299
787
            DCHECK(entry_size == cell_size);
1300
1301
787
            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
1302
785
                break;
1303
785
            }
1304
1305
2
            if (cell->releasable()) {
1306
2
                auto& file_block = cell->file_block;
1307
2
                std::lock_guard block_lock(file_block->_mutex);
1308
2
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1309
2
                to_evict.push_back(cell);
1310
2
                removed_size += cell_size;
1311
2
                remove_size_per_type += cell_size;
1312
2
            }
1313
2
        }
1314
33.4k
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
1315
33.4k
    }
1316
14.7k
    bool is_sync_removal = !evict_in_advance;
1317
14.7k
    std::string reason = std::string("try_reserve_by_time ") +
1318
14.7k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1319
14.7k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1320
1321
14.7k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1322
14.7k
}
1323
1324
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
1325
21.5k
                                 bool evict_in_advance) const {
1326
21.5k
    bool ret = false;
1327
21.5k
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
1328
555
        ret = (removed_size < need_size);
1329
555
        return ret;
1330
555
    }
1331
21.0k
    if (_disk_resource_limit_mode) {
1332
4
        ret = (removed_size < need_size);
1333
20.9k
    } else {
1334
20.9k
        ret = (cur_cache_size + need_size - removed_size > _capacity);
1335
20.9k
    }
1336
21.0k
    return ret;
1337
21.5k
}
1338
1339
bool BlockFileCache::try_reserve_from_other_queue_by_size(
1340
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1341
455
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1342
455
    size_t removed_size = 0;
1343
455
    size_t cur_cache_size = _cur_cache_size;
1344
455
    std::vector<FileBlockCell*> to_evict;
1345
    // we follow the privilege defined in get_other_cache_types to evict
1346
1.36k
    for (FileCacheType cache_type : other_cache_types) {
1347
1.36k
        auto& queue = get_queue(cache_type);
1348
1349
        // we will not drain each of them to the bottom -- i.e., we only
1350
        // evict what they have stolen.
1351
1.36k
        size_t cur_queue_size = queue.get_capacity(cache_lock);
1352
1.36k
        size_t cur_queue_max_size = queue.get_max_size();
1353
1.36k
        if (cur_queue_size <= cur_queue_max_size) {
1354
854
            continue;
1355
854
        }
1356
511
        size_t cur_removed_size = 0;
1357
511
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1358
511
                              cur_removed_size, evict_in_advance);
1359
511
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
1360
511
    }
1361
455
    bool is_sync_removal = !evict_in_advance;
1362
455
    std::string reason = std::string("try_reserve_by_size") +
1363
455
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1364
455
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1365
455
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1366
455
}
1367
1368
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
1369
                                                  int64_t cur_time,
1370
                                                  std::lock_guard<std::mutex>& cache_lock,
1371
14.7k
                                                  bool evict_in_advance) {
1372
    // currently, TTL cache is not considered as a candidate
1373
14.7k
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
1374
14.7k
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
1375
14.7k
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
1376
14.7k
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
1377
13.4k
        return reserve_success;
1378
13.4k
    }
1379
1380
1.28k
    other_cache_types = get_other_cache_type(cur_cache_type);
1381
1.28k
    auto& cur_queue = get_queue(cur_cache_type);
1382
1.28k
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
1383
1.28k
    size_t cur_queue_max_size = cur_queue.get_max_size();
1384
    // Hit the soft limit by self, cannot remove from other queues
1385
1.28k
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
1386
828
        return false;
1387
828
    }
1388
453
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
1389
453
                                                evict_in_advance);
1390
1.28k
}
1391
1392
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
1393
                                         QueryFileCacheContextPtr query_context,
1394
                                         const CacheContext& context, size_t offset, size_t size,
1395
                                         std::lock_guard<std::mutex>& cache_lock,
1396
14.6k
                                         bool evict_in_advance) {
1397
14.6k
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1398
14.6k
                               std::chrono::steady_clock::now().time_since_epoch())
1399
14.6k
                               .count();
1400
14.6k
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
1401
14.6k
                                      evict_in_advance)) {
1402
864
        auto& queue = get_queue(context.cache_type);
1403
864
        size_t removed_size = 0;
1404
864
        size_t cur_cache_size = _cur_cache_size;
1405
1406
864
        std::vector<FileBlockCell*> to_evict;
1407
864
        size_t cur_removed_size = 0;
1408
864
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1409
864
                              cur_removed_size, evict_in_advance);
1410
864
        bool is_sync_removal = !evict_in_advance;
1411
864
        std::string reason = std::string("try_reserve for cache type ") +
1412
864
                             cache_type_to_string(context.cache_type) +
1413
864
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1414
864
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1415
864
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;
1416
1417
864
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1418
91
            return false;
1419
91
        }
1420
864
    }
1421
1422
14.5k
    if (query_context) {
1423
60
        query_context->reserve(hash, offset, size, cache_lock);
1424
60
    }
1425
14.5k
    return true;
1426
14.6k
}
1427
1428
template <class T, class U>
1429
    requires IsXLock<T> && IsXLock<U>
1430
3.90k
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
1431
3.90k
    auto hash = file_block->get_hash_value();
1432
3.90k
    auto offset = file_block->offset();
1433
3.90k
    auto type = file_block->cache_type();
1434
3.90k
    auto expiration_time = file_block->expiration_time();
1435
3.90k
    auto tablet_id = file_block->tablet_id();
1436
3.90k
    auto* cell = get_cell(hash, offset, cache_lock);
1437
3.90k
    file_block->cell = nullptr;
1438
    // Holder cleanup can race with prior cache metadata cleanup. In that case,
1439
    // skip the duplicate remove instead of touching a detached or replaced cell.
1440
3.90k
    if (cell == nullptr) {
1441
1
        LOG(WARNING) << "remove skipped because cache cell is missing. hash=" << hash.to_string()
1442
1
                     << " offset=" << offset << " size=" << file_block->range().size()
1443
1
                     << " type=" << cache_type_to_string(type)
1444
1
                     << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
1445
1
                     << " expiration_time=" << expiration_time << " sync=" << sync;
1446
1
        return;
1447
1
    }
1448
3.89k
    if (cell->file_block.get() != file_block.get()) {
1449
1
        auto* cell_file_block = cell->file_block.get();
1450
1
        LOG(WARNING)
1451
1
                << "remove skipped because cache cell points to a different file block. hash="
1452
1
                << hash.to_string() << " offset=" << offset
1453
1
                << " size=" << file_block->range().size() << " type=" << cache_type_to_string(type)
1454
1
                << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
1455
1
                << " expiration_time=" << expiration_time << " sync=" << sync << " cell_block_hash="
1456
1
                << (cell_file_block ? cell_file_block->get_hash_value().to_string() : "<null>")
1457
1
                << " cell_block_offset="
1458
1
                << (cell_file_block ? std::to_string(cell_file_block->offset()) : "<null>")
1459
1
                << " cell_block_size="
1460
1
                << (cell_file_block ? std::to_string(cell_file_block->range().size()) : "<null>")
1461
1
                << " cell_block_type="
1462
1
                << (cell_file_block ? cache_type_to_string(cell_file_block->cache_type())
1463
1
                                    : "<null>")
1464
1
                << " cell_block_state="
1465
1
                << (cell_file_block ? FileBlock::state_to_string(cell_file_block->state_unsafe())
1466
1
                                    : "<null>");
1467
1
        return;
1468
1
    }
1469
3.89k
    DCHECK(cell->queue_iterator);
1470
3.89k
    if (cell->queue_iterator) {
1471
3.89k
        auto& queue = get_queue(file_block->cache_type());
1472
3.89k
        queue.remove(*cell->queue_iterator, cache_lock);
1473
3.89k
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
1474
3.89k
                                          cell->file_block->get_hash_value(),
1475
3.89k
                                          cell->file_block->offset(), cell->size());
1476
3.89k
    }
1477
3.89k
    *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
1478
3.89k
            << file_block->range().size();
1479
3.89k
    *_total_evict_size_metrics << file_block->range().size();
1480
1481
3.89k
    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
1482
0
               << ", offset: " << offset << ", size: " << file_block->range().size()
1483
0
               << ", type: " << cache_type_to_string(type);
1484
1485
3.89k
    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
1486
1.90k
        FileCacheKey key;
1487
1.90k
        key.hash = hash;
1488
1.90k
        key.offset = offset;
1489
1.90k
        key.meta.type = type;
1490
1.90k
        key.meta.expiration_time = expiration_time;
1491
1.90k
        key.meta.tablet_id = tablet_id;
1492
1.90k
        if (sync) {
1493
1.50k
            int64_t duration_ns = 0;
1494
1.50k
            Status st;
1495
1.50k
            {
1496
1.50k
                SCOPED_RAW_TIMER(&duration_ns);
1497
1.50k
                st = _storage->remove(key);
1498
1.50k
            }
1499
1.50k
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
1500
1.50k
            if (!st.ok()) {
1501
0
                LOG_WARNING("").error(st);
1502
0
            }
1503
1.50k
        } else {
1504
            // the file will be deleted in the bottom half
1505
            // so there will be a window that the file is not in the cache but still in the storage
1506
            // but it's ok, because the rowset is stale already
1507
407
            bool ret = _recycle_keys.enqueue(key);
1508
407
            if (ret) [[likely]] {
1509
407
                *_recycle_keys_length_recorder << _recycle_keys.size_approx();
1510
407
            } else {
1511
0
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
1512
0
                int64_t duration_ns = 0;
1513
0
                Status st;
1514
0
                {
1515
0
                    SCOPED_RAW_TIMER(&duration_ns);
1516
0
                    st = _storage->remove(key);
1517
0
                }
1518
0
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
1519
0
                if (!st.ok()) {
1520
0
                    LOG_WARNING("").error(st);
1521
0
                }
1522
0
            }
1523
407
        }
1524
1.98k
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
1525
0
        file_block->set_deleting();
1526
0
        return;
1527
0
    }
1528
3.89k
    _cur_cache_size -= file_block->range().size();
1529
3.89k
    if (FileCacheType::TTL == type) {
1530
1.29k
        _cur_ttl_size -= file_block->range().size();
1531
1.29k
    }
1532
3.89k
    auto it = _files.find(hash);
1533
3.89k
    if (it != _files.end()) {
1534
3.89k
        it->second.erase(file_block->offset());
1535
3.89k
        if (it->second.empty()) {
1536
1.39k
            _files.erase(hash);
1537
1.39k
        }
1538
3.89k
    }
1539
3.89k
    *_num_removed_blocks << 1;
1540
3.89k
}
1541
1542
46
// Locking wrapper around get_used_cache_size_unlocked(): takes the cache mutex and returns the
// current size of the queue that belongs to `cache_type`.
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t used = get_used_cache_size_unlocked(cache_type, cache_lock);
    return used;
}
1546
1547
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
1548
46
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1549
46
    return get_queue(cache_type).get_capacity(cache_lock);
1550
46
}
1551
1552
0
// Locking wrapper around get_available_cache_size_unlocked(): takes the cache mutex and returns
// the space still available to the queue of `cache_type`.
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t available = get_available_cache_size_unlocked(cache_type, cache_lock);
    return available;
}
1556
1557
size_t BlockFileCache::get_available_cache_size_unlocked(
1558
0
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
1559
0
    return get_queue(cache_type).get_max_element_size() -
1560
0
           get_used_cache_size_unlocked(cache_type, cache_lock);
1561
0
}
1562
1563
91
// Locking wrapper around get_file_blocks_num_unlocked(): takes the cache mutex and returns the
// number of entries in the queue of `cache_type`.
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t block_count = get_file_blocks_num_unlocked(cache_type, cache_lock);
    return block_count;
}
1567
1568
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
1569
91
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1570
91
    return get_queue(cache_type).get_elements_num(cache_lock);
1571
91
}
1572
1573
// Creates the cache cell for `file_block` and links the block back to this cell.
//
// A cell may only be created for a block whose download state is DOWNLOADED, EMPTY or
// SKIP_CACHE; any other state trips a debug check. Per the original design note, a block
// acquires the DOWNLOADING state and its LRU queue iterator later, on the first successful
// get-or-set-downloader call. For TTL blocks the access time is initialised immediately.
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
        : file_block(file_block) {
    // Note: inside the body `file_block` names the constructor parameter, not the member.
    file_block->cell = this;

    const FileBlock::State initial_state = file_block->_download_state;
    const bool state_allowed = initial_state == FileBlock::State::DOWNLOADED ||
                               initial_state == FileBlock::State::EMPTY ||
                               initial_state == FileBlock::State::SKIP_CACHE;
    if (!state_allowed) {
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
                      << FileBlock::state_to_string(file_block->_download_state);
    }

    if (file_block->cache_type() == FileCacheType::TTL) {
        update_atime();
    }
}
1596
1597
// Appends a new entry for (hash, offset) with the given size to the back of the queue, indexes
// it in the lookup map and adds `size` to the queue's accumulated size.
// @return iterator to the newly inserted queue entry
LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
    cache_size += size;
    auto inserted = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size));
    auto lookup_key = std::make_pair(hash, offset);
    map.insert(std::make_pair(lookup_key, inserted));
    return inserted;
}
1604
1605
0
void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
1606
0
    queue.clear();
1607
0
    map.clear();
1608
0
    cache_size = 0;
1609
0
}
1610
1611
816k
// Moves the entry referenced by `queue_it` to the back of the queue by splicing it within the
// same list.
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
    auto& entries = queue;
    entries.splice(entries.end(), entries, queue_it);
}
1614
1615
void LRUQueue::resize(Iterator queue_it, size_t new_size,
1616
6
                      std::lock_guard<std::mutex>& /* cache_lock */) {
1617
6
    cache_size -= queue_it->size;
1618
6
    queue_it->size = new_size;
1619
6
    cache_size += new_size;
1620
6
}
1621
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
1622
5
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
1623
5
    return map.find(std::make_pair(hash, offset)) != map.end();
1624
5
}
1625
1626
// Looks up the queue iterator stored for (hash, offset).
// @return the stored iterator, or a default-constructed list iterator when there is no entry
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
    const auto found = map.find(std::make_pair(hash, offset));
    if (found == map.end()) {
        return std::list<FileKeyAndOffset>::iterator();
    }
    return found->second;
}
1634
1635
2
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
1636
2
    std::string result;
1637
18
    for (const auto& [hash, offset, size] : queue) {
1638
18
        if (!result.empty()) {
1639
16
            result += ", ";
1640
16
        }
1641
18
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
1642
18
    }
1643
2
    return result;
1644
2
}
1645
1646
size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
1647
5
                                           std::lock_guard<std::mutex>& cache_lock) {
1648
5
    std::list<FileKeyAndOffset> target_queue = this->queue;
1649
5
    std::list<FileKeyAndOffset> base_queue = base.queue;
1650
5
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
1651
5
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
1652
1653
5
    size_t m = vec1.size();
1654
5
    size_t n = vec2.size();
1655
1656
    // Create a 2D vector (matrix) to store the Levenshtein distances
1657
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
1658
5
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
1659
1660
    // Initialize the first row and column of the matrix
1661
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
1662
22
    for (size_t i = 0; i <= m; ++i) {
1663
17
        dp[i][0] = i;
1664
17
    }
1665
20
    for (size_t j = 0; j <= n; ++j) {
1666
15
        dp[0][j] = j;
1667
15
    }
1668
1669
    // Fill the matrix using dynamic programming
1670
17
    for (size_t i = 1; i <= m; ++i) {
1671
38
        for (size_t j = 1; j <= n; ++j) {
1672
            // Check if the current elements of both vectors are equal
1673
26
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
1674
26
                           vec1[i - 1].offset == vec2[j - 1].offset)
1675
26
                                  ? 0
1676
26
                                  : 1;
1677
            // Calculate the minimum cost of three possible operations:
1678
            // 1. Insertion: dp[i][j-1] + 1
1679
            // 2. Deletion: dp[i-1][j] + 1
1680
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
1681
26
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
1682
26
        }
1683
12
    }
1684
    // The bottom-right cell of the matrix contains the Levenshtein distance
1685
5
    return dp[m][n];
1686
5
}
1687
1688
1
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
1689
1
    SCOPED_CACHE_LOCK(_mutex, this);
1690
1
    return dump_structure_unlocked(hash, cache_lock);
1691
1
}
1692
1693
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
1694
1
                                                    std::lock_guard<std::mutex>&) {
1695
1
    std::stringstream result;
1696
1
    auto it = _files.find(hash);
1697
1
    if (it == _files.end()) {
1698
0
        return std::string("");
1699
0
    }
1700
1
    const auto& cells_by_offset = it->second;
1701
1702
1
    for (const auto& [_, cell] : cells_by_offset) {
1703
1
        result << cell.file_block->get_info_for_log() << " "
1704
1
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
1705
1
    }
1706
1707
1
    return result.str();
1708
1
}
1709
1710
0
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
1711
0
    SCOPED_CACHE_LOCK(_mutex, this);
1712
0
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
1713
0
}
1714
1715
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
1716
                                                            size_t offset,
1717
0
                                                            std::lock_guard<std::mutex>&) {
1718
0
    std::stringstream result;
1719
0
    auto it = _files.find(hash);
1720
0
    if (it == _files.end()) {
1721
0
        return std::string("");
1722
0
    }
1723
0
    const auto& cells_by_offset = it->second;
1724
0
    const auto& cell = cells_by_offset.find(offset);
1725
1726
0
    return cache_type_to_string(cell->second.file_block->cache_type());
1727
0
}
1728
1729
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
1730
                                       FileCacheType new_type,
1731
9
                                       std::lock_guard<std::mutex>& cache_lock) {
1732
9
    if (auto iter = _files.find(hash); iter != _files.end()) {
1733
9
        auto& file_blocks = iter->second;
1734
9
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
1735
8
            FileBlockCell& cell = cell_it->second;
1736
8
            auto& cur_queue = get_queue(cell.file_block->cache_type());
1737
8
            DCHECK(cell.queue_iterator.has_value());
1738
8
            cur_queue.remove(*cell.queue_iterator, cache_lock);
1739
8
            _lru_recorder->record_queue_event(
1740
8
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
1741
8
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
1742
8
            auto& new_queue = get_queue(new_type);
1743
8
            cell.queue_iterator =
1744
8
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
1745
8
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
1746
8
                                              cell.file_block->get_hash_value(),
1747
8
                                              cell.file_block->offset(), cell.size());
1748
8
        }
1749
9
    }
1750
9
}
1751
1752
// @brief: get a path's disk capacity used percent, inode used percent
1753
// @param: path
1754
// @param: percent.first disk used percent, percent.second inode used percent
1755
6.00k
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
1756
6.00k
    struct statfs stat;
1757
6.00k
    int ret = statfs(path.c_str(), &stat);
1758
6.00k
    if (ret != 0) {
1759
2
        return ret;
1760
2
    }
1761
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
1762
    // v->used = stat.f_blocks - stat.f_bfree
1763
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
1764
6.00k
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
1765
6.00k
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
1766
6.00k
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));
1767
1768
6.00k
    unsigned long long inode_free = stat.f_ffree;
1769
6.00k
    unsigned long long inode_total = stat.f_files;
1770
6.00k
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
1771
6.00k
    percent->first = capacity_percentage;
1772
6.00k
    percent->second = 100 - inode_percentage;
1773
1774
    // Add sync point for testing
1775
6.00k
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
1776
1777
6.00k
    return 0;
1778
6.00k
}
1779
1780
10
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
1781
10
    using namespace std::chrono;
1782
10
    int64_t space_released = 0;
1783
10
    size_t old_capacity = 0;
1784
10
    std::stringstream ss;
1785
10
    ss << "finish reset_capacity, path=" << _cache_base_path;
1786
10
    auto adjust_start_time = steady_clock::time_point();
1787
10
    {
1788
10
        SCOPED_CACHE_LOCK(_mutex, this);
1789
10
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
1790
1
            int64_t need_remove_size = _cur_cache_size - new_capacity;
1791
4
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
1792
4
                int64_t queue_released = 0;
1793
4
                std::vector<FileBlockCell*> to_evict;
1794
13
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1795
13
                    if (need_remove_size <= 0) {
1796
1
                        break;
1797
1
                    }
1798
12
                    need_remove_size -= entry_size;
1799
12
                    space_released += entry_size;
1800
12
                    queue_released += entry_size;
1801
12
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1802
12
                    if (!cell->releasable()) {
1803
0
                        cell->file_block->set_deleting();
1804
0
                        continue;
1805
0
                    }
1806
12
                    to_evict.push_back(cell);
1807
12
                }
1808
12
                for (auto& cell : to_evict) {
1809
12
                    FileBlockSPtr file_block = cell->file_block;
1810
12
                    std::lock_guard block_lock(file_block->_mutex);
1811
12
                    remove(file_block, cache_lock, block_lock);
1812
12
                }
1813
4
                return queue_released;
1814
4
            };
1815
1
            int64_t queue_released = remove_blocks(_disposable_queue);
1816
1
            ss << " disposable_queue released " << queue_released;
1817
1
            queue_released = remove_blocks(_normal_queue);
1818
1
            ss << " normal_queue released " << queue_released;
1819
1
            queue_released = remove_blocks(_index_queue);
1820
1
            ss << " index_queue released " << queue_released;
1821
1
            queue_released = remove_blocks(_ttl_queue);
1822
1
            ss << " ttl_queue released " << queue_released;
1823
1824
1
            _disk_resource_limit_mode = true;
1825
1
            _disk_limit_mode_metrics->set_value(1);
1826
1
            ss << " total_space_released=" << space_released;
1827
1
        }
1828
10
        old_capacity = _capacity;
1829
10
        _capacity = new_capacity;
1830
10
        _cache_capacity_metrics->set_value(_capacity);
1831
10
    }
1832
10
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
1833
10
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
1834
10
              << " use_time=" << cast_set<int64_t>(use_time.count());
1835
10
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
1836
10
    LOG(INFO) << ss.str();
1837
10
    return ss.str();
1838
10
}
1839
1840
3.85k
void BlockFileCache::check_disk_resource_limit() {
1841
3.85k
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1842
788
        return;
1843
788
    }
1844
1845
3.06k
    bool previous_mode = _disk_resource_limit_mode;
1846
3.06k
    if (_capacity > _cur_cache_size) {
1847
3.06k
        _disk_resource_limit_mode = false;
1848
3.06k
        _disk_limit_mode_metrics->set_value(0);
1849
3.06k
    }
1850
3.06k
    std::pair<int, int> percent;
1851
3.06k
    int ret = disk_used_percentage(_cache_base_path, &percent);
1852
3.06k
    if (ret != 0) {
1853
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1854
1
        return;
1855
1
    }
1856
3.06k
    auto [space_percentage, inode_percentage] = percent;
1857
6.13k
    auto is_insufficient = [](const int& percentage) {
1858
6.13k
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
1859
6.13k
    };
1860
3.06k
    DCHECK_GE(space_percentage, 0);
1861
3.06k
    DCHECK_LE(space_percentage, 100);
1862
3.06k
    DCHECK_GE(inode_percentage, 0);
1863
3.06k
    DCHECK_LE(inode_percentage, 100);
1864
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1865
    // FIXME: reject with config validator
1866
3.06k
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
1867
3.06k
        config::file_cache_exit_disk_resource_limit_mode_percent) {
1868
1
        LOG_WARNING("config error, set to default value")
1869
1
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
1870
1
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
1871
1
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
1872
1
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
1873
1
    }
1874
3.06k
    bool is_space_insufficient = is_insufficient(space_percentage);
1875
3.06k
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1876
3.06k
    if (is_space_insufficient || is_inode_insufficient) {
1877
5
        _disk_resource_limit_mode = true;
1878
5
        _disk_limit_mode_metrics->set_value(1);
1879
3.06k
    } else if (_disk_resource_limit_mode &&
1880
3.06k
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
1881
3.06k
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
1882
0
        _disk_resource_limit_mode = false;
1883
0
        _disk_limit_mode_metrics->set_value(0);
1884
0
    }
1885
3.06k
    if (previous_mode != _disk_resource_limit_mode) {
1886
        // add log for disk resource limit mode switching
1887
10
        if (_disk_resource_limit_mode) {
1888
5
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
1889
5
                         << " space_percent=" << space_percentage
1890
5
                         << " inode_percent=" << inode_percentage
1891
5
                         << " is_space_insufficient=" << is_space_insufficient
1892
5
                         << " is_inode_insufficient=" << is_inode_insufficient
1893
5
                         << " enter threshold="
1894
5
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
1895
5
        } else {
1896
5
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
1897
5
                      << " space_percent=" << space_percentage
1898
5
                      << " inode_percent=" << inode_percentage << " exit threshold="
1899
5
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
1900
5
        }
1901
3.05k
    } else if (_disk_resource_limit_mode) {
1902
        // print log for disk resource limit mode running, but less frequently
1903
0
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1904
0
                                 << " space_percent=" << space_percentage
1905
0
                                 << " inode_percent=" << inode_percentage
1906
0
                                 << " is_space_insufficient=" << is_space_insufficient
1907
0
                                 << " is_inode_insufficient=" << is_inode_insufficient
1908
0
                                 << " mode run in resource limit";
1909
0
    }
1910
3.06k
}
1911
1912
2.93k
void BlockFileCache::check_need_evict_cache_in_advance() {
1913
2.93k
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1914
1
        return;
1915
1
    }
1916
1917
2.93k
    std::pair<int, int> percent;
1918
2.93k
    int ret = disk_used_percentage(_cache_base_path, &percent);
1919
2.93k
    if (ret != 0) {
1920
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1921
1
        return;
1922
1
    }
1923
2.93k
    auto [space_percentage, inode_percentage] = percent;
1924
2.93k
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
1925
8.80k
    auto is_insufficient = [](const int& percentage) {
1926
8.80k
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
1927
8.80k
    };
1928
2.93k
    DCHECK_GE(space_percentage, 0);
1929
2.93k
    DCHECK_LE(space_percentage, 100);
1930
2.93k
    DCHECK_GE(inode_percentage, 0);
1931
2.93k
    DCHECK_LE(inode_percentage, 100);
1932
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1933
    // FIXME: reject with config validator
1934
2.93k
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
1935
2.93k
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
1936
1
        LOG_WARNING("config error, set to default value")
1937
1
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
1938
1
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
1939
1
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
1940
1
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
1941
1
    }
1942
2.93k
    bool previous_mode = _need_evict_cache_in_advance;
1943
2.93k
    bool is_space_insufficient = is_insufficient(space_percentage);
1944
2.93k
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1945
2.93k
    bool is_size_insufficient = is_insufficient(size_percentage);
1946
2.93k
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
1947
13
        _need_evict_cache_in_advance = true;
1948
13
        _need_evict_cache_in_advance_metrics->set_value(1);
1949
2.92k
    } else if (_need_evict_cache_in_advance &&
1950
2.92k
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1951
2.92k
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1952
2.92k
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
1953
5
        _need_evict_cache_in_advance = false;
1954
5
        _need_evict_cache_in_advance_metrics->set_value(0);
1955
5
    }
1956
2.93k
    if (previous_mode != _need_evict_cache_in_advance) {
1957
        // add log for evict cache in advance mode switching
1958
14
        if (_need_evict_cache_in_advance) {
1959
9
            LOG(WARNING) << "Entering evict cache in advance mode: "
1960
9
                         << "file_cache=" << get_base_path()
1961
9
                         << " space_percent=" << space_percentage
1962
9
                         << " inode_percent=" << inode_percentage
1963
9
                         << " size_percent=" << size_percentage
1964
9
                         << " is_space_insufficient=" << is_space_insufficient
1965
9
                         << " is_inode_insufficient=" << is_inode_insufficient
1966
9
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
1967
9
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
1968
9
        } else {
1969
5
            LOG(INFO) << "Exiting evict cache in advance mode: "
1970
5
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
1971
5
                      << " inode_percent=" << inode_percentage
1972
5
                      << " size_percent=" << size_percentage << " exit threshold="
1973
5
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
1974
5
        }
1975
2.92k
    } else if (_need_evict_cache_in_advance) {
1976
        // print log for evict cache in advance mode running, but less frequently
1977
4
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1978
1
                                 << " space_percent=" << space_percentage
1979
1
                                 << " inode_percent=" << inode_percentage
1980
1
                                 << " size_percent=" << size_percentage
1981
1
                                 << " is_space_insufficient=" << is_space_insufficient
1982
1
                                 << " is_inode_insufficient=" << is_inode_insufficient
1983
1
                                 << " is_size_insufficient=" << is_size_insufficient
1984
1
                                 << " need evict cache in advance";
1985
4
    }
1986
2.93k
}
1987
1988
152
void BlockFileCache::run_background_monitor() {
1989
152
    Thread::set_self_name("run_background_monitor");
1990
3.85k
    while (!_close) {
1991
3.85k
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
1992
3.85k
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
1993
3.85k
        check_disk_resource_limit();
1994
3.85k
        if (config::enable_evict_file_cache_in_advance) {
1995
2.93k
            check_need_evict_cache_in_advance();
1996
2.93k
        } else {
1997
925
            _need_evict_cache_in_advance = false;
1998
925
            _need_evict_cache_in_advance_metrics->set_value(0);
1999
925
        }
2000
2001
3.85k
        {
2002
3.85k
            std::unique_lock close_lock(_close_mtx);
2003
3.85k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2004
3.85k
            if (_close) {
2005
150
                break;
2006
150
            }
2007
3.85k
        }
2008
        // report
2009
3.70k
        {
2010
3.70k
            SCOPED_CACHE_LOCK(_mutex, this);
2011
3.70k
            _cur_cache_size_metrics->set_value(_cur_cache_size);
2012
3.70k
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
2013
3.70k
                                                   _index_queue.get_capacity(cache_lock) -
2014
3.70k
                                                   _normal_queue.get_capacity(cache_lock) -
2015
3.70k
                                                   _disposable_queue.get_capacity(cache_lock));
2016
3.70k
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
2017
3.70k
                    _ttl_queue.get_capacity(cache_lock));
2018
3.70k
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
2019
3.70k
                    _ttl_queue.get_elements_num(cache_lock));
2020
3.70k
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
2021
3.70k
            _cur_normal_queue_element_count_metrics->set_value(
2022
3.70k
                    _normal_queue.get_elements_num(cache_lock));
2023
3.70k
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
2024
3.70k
            _cur_index_queue_element_count_metrics->set_value(
2025
3.70k
                    _index_queue.get_elements_num(cache_lock));
2026
3.70k
            _cur_disposable_queue_cache_size_metrics->set_value(
2027
3.70k
                    _disposable_queue.get_capacity(cache_lock));
2028
3.70k
            _cur_disposable_queue_element_count_metrics->set_value(
2029
3.70k
                    _disposable_queue.get_elements_num(cache_lock));
2030
2031
            // Update meta store write queue size if storage is FSFileCacheStorage
2032
3.70k
            if (_storage->get_type() == FileCacheStorageType::DISK) {
2033
2.93k
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
2034
2.93k
                if (fs_storage != nullptr) {
2035
2.93k
                    auto* meta_store = fs_storage->get_meta_store();
2036
2.93k
                    if (meta_store != nullptr) {
2037
2.93k
                        _meta_store_write_queue_size_metrics->set_value(
2038
2.93k
                                meta_store->get_write_queue_size());
2039
2.93k
                    }
2040
2.93k
                }
2041
2.93k
            }
2042
2043
3.70k
            if (_num_read_blocks->get_value() > 0) {
2044
1.48k
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
2045
1.48k
                                      (double)_num_read_blocks->get_value());
2046
1.48k
            }
2047
3.70k
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
2048
838
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
2049
838
                                         (double)_num_read_blocks_5m->get_value());
2050
838
            }
2051
3.70k
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
2052
1.46k
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
2053
1.46k
                                         (double)_num_read_blocks_1h->get_value());
2054
1.46k
            }
2055
2056
3.70k
            if (_no_warmup_num_hit_blocks->get_value() > 0) {
2057
1.46k
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
2058
1.46k
                                                (double)_no_warmup_num_read_blocks->get_value());
2059
1.46k
            }
2060
3.70k
            if (_no_warmup_num_hit_blocks_5m && _no_warmup_num_hit_blocks_5m->get_value() > 0) {
2061
834
                _no_warmup_hit_ratio_5m->set_value(
2062
834
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
2063
834
                        (double)_no_warmup_num_read_blocks_5m->get_value());
2064
834
            }
2065
3.70k
            if (_no_warmup_num_hit_blocks_1h && _no_warmup_num_hit_blocks_1h->get_value() > 0) {
2066
1.46k
                _no_warmup_hit_ratio_1h->set_value(
2067
1.46k
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
2068
1.46k
                        (double)_no_warmup_num_read_blocks_1h->get_value());
2069
1.46k
            }
2070
3.70k
        }
2071
3.70k
    }
2072
152
}
2073
2074
152
void BlockFileCache::run_background_gc() {
2075
152
    Thread::set_self_name("run_background_gc");
2076
152
    FileCacheKey key;
2077
152
    size_t batch_count = 0;
2078
146k
    while (!_close) {
2079
146k
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
2080
146k
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
2081
146k
        {
2082
146k
            std::unique_lock close_lock(_close_mtx);
2083
146k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2084
146k
            if (_close) {
2085
150
                break;
2086
150
            }
2087
146k
        }
2088
2089
146k
        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
2090
369
            int64_t duration_ns = 0;
2091
369
            Status st;
2092
369
            {
2093
369
                SCOPED_RAW_TIMER(&duration_ns);
2094
369
                st = _storage->remove(key);
2095
369
            }
2096
369
            *_storage_async_remove_latency_us << (duration_ns / 1000);
2097
2098
369
            if (!st.ok()) {
2099
0
                LOG_WARNING("").error(st);
2100
0
            }
2101
369
            batch_count++;
2102
369
        }
2103
146k
        *_recycle_keys_length_recorder << _recycle_keys.size_approx();
2104
146k
        batch_count = 0;
2105
146k
    }
2106
152
}
2107
2108
152
void BlockFileCache::run_background_evict_in_advance() {
2109
152
    Thread::set_self_name("run_background_evict_in_advance");
2110
152
    LOG(INFO) << "Starting background evict in advance thread";
2111
152
    int64_t batch = 0;
2112
18.7k
    while (!_close) {
2113
18.7k
        {
2114
18.7k
            std::unique_lock close_lock(_close_mtx);
2115
18.7k
            _close_cv.wait_for(
2116
18.7k
                    close_lock,
2117
18.7k
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
2118
18.7k
            if (_close) {
2119
150
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
2120
150
                break;
2121
150
            }
2122
18.7k
        }
2123
18.5k
        batch = config::file_cache_evict_in_advance_batch_bytes;
2124
2125
        // Skip if eviction not needed or too many pending recycles
2126
18.5k
        if (!_need_evict_cache_in_advance ||
2127
18.5k
            _recycle_keys.size_approx() >=
2128
18.5k
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
2129
18.5k
            continue;
2130
18.5k
        }
2131
2132
24
        int64_t duration_ns = 0;
2133
24
        {
2134
24
            SCOPED_CACHE_LOCK(_mutex, this);
2135
24
            SCOPED_RAW_TIMER(&duration_ns);
2136
24
            try_evict_in_advance(batch, cache_lock);
2137
24
        }
2138
24
        *_evict_in_advance_latency_us << (duration_ns / 1000);
2139
24
    }
2140
152
}
2141
2142
152
void BlockFileCache::run_background_block_lru_update() {
2143
152
    Thread::set_self_name("run_background_block_lru_update");
2144
152
    std::vector<FileBlockSPtr> batch;
2145
6.95k
    while (!_close) {
2146
6.95k
        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
2147
6.95k
        size_t batch_limit =
2148
6.95k
                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
2149
6.95k
        {
2150
6.95k
            std::unique_lock close_lock(_close_mtx);
2151
6.95k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2152
6.95k
            if (_close) {
2153
150
                break;
2154
150
            }
2155
6.95k
        }
2156
2157
6.80k
        batch.clear();
2158
6.80k
        batch.reserve(batch_limit);
2159
6.80k
        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
2160
6.80k
        if (drained == 0) {
2161
6.79k
            *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2162
6.79k
            continue;
2163
6.79k
        }
2164
2165
7
        int64_t duration_ns = 0;
2166
7
        {
2167
7
            SCOPED_CACHE_LOCK(_mutex, this);
2168
7
            SCOPED_RAW_TIMER(&duration_ns);
2169
7
            for (auto& block : batch) {
2170
5
                update_block_lru(block, cache_lock);
2171
5
            }
2172
7
        }
2173
7
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
2174
7
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2175
7
    }
2176
152
}
2177
2178
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
2179
2
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
2180
2
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
2181
2
                               std::chrono::steady_clock::now().time_since_epoch())
2182
2
                               .count();
2183
2
    SCOPED_CACHE_LOCK(_mutex, this);
2184
2
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
2185
2
    if (auto iter = _files.find(hash); iter != _files.end()) {
2186
5
        for (auto& pair : _files.find(hash)->second) {
2187
5
            const FileBlockCell* cell = &pair.second;
2188
5
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
2189
4
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
2190
4
                    (cell->atime != 0 &&
2191
3
                     cur_time - cell->atime <
2192
3
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
2193
3
                    blocks_meta.emplace_back(pair.first, cell->size(),
2194
3
                                             cell->file_block->cache_type(),
2195
3
                                             cell->file_block->expiration_time());
2196
3
                }
2197
4
            }
2198
5
        }
2199
2
    }
2200
2
    return blocks_meta;
2201
2
}
2202
2203
bool BlockFileCache::try_reserve_during_async_load(size_t size,
2204
4
                                                   std::lock_guard<std::mutex>& cache_lock) {
2205
4
    size_t removed_size = 0;
2206
4
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
2207
4
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
2208
4
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);
2209
2210
4
    std::vector<FileBlockCell*> to_evict;
2211
4
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
2212
4
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
2213
4
            if (!_disk_resource_limit_mode || removed_size >= size) {
2214
3
                break;
2215
3
            }
2216
1
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
2217
2218
1
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
2219
0
                         << ", offset: " << entry_offset;
2220
2221
1
            size_t cell_size = cell->size();
2222
1
            DCHECK(entry_size == cell_size);
2223
2224
1
            if (cell->releasable()) {
2225
1
                auto& file_block = cell->file_block;
2226
2227
1
                std::lock_guard block_lock(file_block->_mutex);
2228
1
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
2229
1
                to_evict.push_back(cell);
2230
1
                removed_size += cell_size;
2231
1
            }
2232
1
        }
2233
3
    };
2234
4
    if (disposable_queue_size != 0) {
2235
0
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
2236
0
    }
2237
4
    if (normal_queue_size != 0) {
2238
3
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
2239
3
    }
2240
4
    if (index_queue_size != 0) {
2241
0
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
2242
0
    }
2243
4
    std::string reason = "async load";
2244
4
    remove_file_blocks(to_evict, cache_lock, true, reason);
2245
2246
4
    return !_disk_resource_limit_mode || removed_size >= size;
2247
4
}
2248
2249
38
void BlockFileCache::clear_need_update_lru_blocks() {
2250
38
    _need_update_lru_blocks.clear();
2251
38
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2252
38
}
2253
2254
36
void BlockFileCache::pause_ttl_manager() {
2255
36
    if (_ttl_mgr) {
2256
12
        _ttl_mgr->stop();
2257
12
    }
2258
36
}
2259
2260
36
void BlockFileCache::resume_ttl_manager() {
2261
36
    if (_ttl_mgr) {
2262
12
        _ttl_mgr->resume();
2263
12
    }
2264
36
}
2265
2266
36
std::string BlockFileCache::clear_file_cache_directly() {
2267
36
    pause_ttl_manager();
2268
36
    _lru_dumper->remove_lru_dump_files();
2269
36
    using namespace std::chrono;
2270
36
    std::stringstream ss;
2271
36
    auto start = steady_clock::now();
2272
36
    std::string result;
2273
36
    {
2274
36
        SCOPED_CACHE_LOCK(_mutex, this);
2275
36
        LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
2276
2277
36
        std::string clear_msg;
2278
36
        auto s = _storage->clear(clear_msg);
2279
36
        if (!s.ok()) {
2280
1
            result = clear_msg;
2281
35
        } else {
2282
35
            int64_t num_files = _files.size();
2283
35
            int64_t cache_size = _cur_cache_size;
2284
35
            int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
2285
35
            int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock);
2286
35
            int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock);
2287
35
            int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);
2288
2289
35
            int64_t clear_fd_duration = 0;
2290
35
            {
2291
                // clear FDCache to release fd
2292
35
                SCOPED_RAW_TIMER(&clear_fd_duration);
2293
5.18k
                for (const auto& [file_key, file_blocks] : _files) {
2294
5.85k
                    for (const auto& [offset, file_block_cell] : file_blocks) {
2295
5.85k
                        AccessKeyAndOffset access_key_and_offset(file_key, offset);
2296
5.85k
                        FDCache::instance()->remove_file_reader(access_key_and_offset);
2297
5.85k
                    }
2298
5.18k
                }
2299
35
            }
2300
2301
35
            _files.clear();
2302
35
            _cur_cache_size = 0;
2303
35
            _cur_ttl_size = 0;
2304
35
            _time_to_key.clear();
2305
35
            _key_to_time.clear();
2306
35
            _index_queue.clear(cache_lock);
2307
35
            _normal_queue.clear(cache_lock);
2308
35
            _disposable_queue.clear(cache_lock);
2309
35
            _ttl_queue.clear(cache_lock);
2310
2311
            // Update cache metrics immediately so consumers observe the cleared state
2312
            // without waiting for the next background monitor round.
2313
35
            _cur_cache_size_metrics->set_value(0);
2314
35
            _cur_ttl_cache_size_metrics->set_value(0);
2315
35
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0);
2316
35
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0);
2317
35
            _cur_normal_queue_cache_size_metrics->set_value(0);
2318
35
            _cur_normal_queue_element_count_metrics->set_value(0);
2319
35
            _cur_index_queue_cache_size_metrics->set_value(0);
2320
35
            _cur_index_queue_element_count_metrics->set_value(0);
2321
35
            _cur_disposable_queue_cache_size_metrics->set_value(0);
2322
35
            _cur_disposable_queue_element_count_metrics->set_value(0);
2323
2324
35
            clear_need_update_lru_blocks();
2325
2326
35
            ss << "finish clear_file_cache_directly"
2327
35
               << " path=" << _cache_base_path << " time_elapsed_ms="
2328
35
               << duration_cast<milliseconds>(steady_clock::now() - start).count()
2329
35
               << " fd_clear_time_ms=" << (clear_fd_duration / 1000000)
2330
35
               << " num_files=" << num_files << " cache_size=" << cache_size
2331
35
               << " index_queue_size=" << index_queue_size
2332
35
               << " normal_queue_size=" << normal_queue_size
2333
35
               << " disposible_queue_size=" << disposible_queue_size
2334
35
               << "ttl_queue_size=" << ttl_queue_size;
2335
35
            result = ss.str();
2336
35
            LOG(INFO) << result;
2337
35
        }
2338
36
    }
2339
36
    _lru_dumper->remove_lru_dump_files();
2340
36
    resume_ttl_manager();
2341
36
    return result;
2342
36
}
2343
2344
5.14k
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
2345
5.14k
    std::map<size_t, FileBlockSPtr> offset_to_block;
2346
5.14k
    SCOPED_CACHE_LOCK(_mutex, this);
2347
5.14k
    if (_files.contains(hash)) {
2348
5.14k
        for (auto& [offset, cell] : _files[hash]) {
2349
5.14k
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
2350
5.14k
                cell.file_block->_owned_by_cached_reader = true;
2351
5.14k
                offset_to_block.emplace(offset, cell.file_block);
2352
5.14k
            }
2353
5.14k
        }
2354
5.13k
    }
2355
5.14k
    return offset_to_block;
2356
5.14k
}
2357
2358
0
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
2359
0
    SCOPED_CACHE_LOCK(_mutex, this);
2360
0
    if (auto iter = _files.find(hash); iter != _files.end()) {
2361
0
        for (auto& [_, cell] : iter->second) {
2362
0
            cell.update_atime();
2363
0
        }
2364
0
    };
2365
0
}
2366
2367
152
void BlockFileCache::run_background_lru_log_replay() {
2368
152
    Thread::set_self_name("run_background_lru_log_replay");
2369
18.7k
    while (!_close) {
2370
18.7k
        int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
2371
18.7k
        {
2372
18.7k
            std::unique_lock close_lock(_close_mtx);
2373
18.7k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2374
18.7k
            if (_close) {
2375
149
                break;
2376
149
            }
2377
18.7k
        }
2378
2379
18.5k
        _lru_recorder->replay_queue_event(FileCacheType::TTL);
2380
18.5k
        _lru_recorder->replay_queue_event(FileCacheType::INDEX);
2381
18.5k
        _lru_recorder->replay_queue_event(FileCacheType::NORMAL);
2382
18.5k
        _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE);
2383
2384
18.5k
        if (config::enable_evaluate_shadow_queue_diff) {
2385
0
            SCOPED_CACHE_LOCK(_mutex, this);
2386
0
            _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
2387
0
            _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
2388
0
            _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
2389
0
            _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
2390
0
        }
2391
18.5k
    }
2392
152
}
2393
2394
1.53k
void BlockFileCache::dump_lru_queues(bool force) {
2395
1.53k
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
2396
1.53k
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
2397
1.53k
        !ExecEnv::GetInstance()->get_is_upgrading()) {
2398
1.53k
        _lru_dumper->dump_queue("disposable", force);
2399
1.53k
        _lru_dumper->dump_queue("normal", force);
2400
1.53k
        _lru_dumper->dump_queue("index", force);
2401
1.53k
        _lru_dumper->dump_queue("ttl", force);
2402
1.53k
        _lru_dumper->set_first_dump_done();
2403
1.53k
    }
2404
1.53k
}
2405
2406
152
void BlockFileCache::run_background_lru_dump() {
2407
152
    Thread::set_self_name("run_background_lru_dump");
2408
1.68k
    while (!_close) {
2409
1.68k
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
2410
1.68k
        {
2411
1.68k
            std::unique_lock close_lock(_close_mtx);
2412
1.68k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2413
1.68k
            if (_close) {
2414
150
                break;
2415
150
            }
2416
1.68k
        }
2417
1.53k
        dump_lru_queues(false);
2418
1.53k
    }
2419
152
}
2420
2421
152
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
2422
    // keep this order coz may be duplicated in different queue, we use the first appearence
2423
152
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
2424
152
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
2425
152
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
2426
152
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
2427
152
}
2428
2429
148
std::map<std::string, double> BlockFileCache::get_stats() {
2430
148
    std::map<std::string, double> stats;
2431
148
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2432
148
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2433
148
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2434
2435
148
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2436
148
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
2437
148
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2438
148
    stats["index_queue_curr_elements"] =
2439
148
            (double)_cur_index_queue_element_count_metrics->get_value();
2440
2441
148
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2442
148
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
2443
148
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2444
148
    stats["ttl_queue_curr_elements"] =
2445
148
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
2446
2447
148
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2448
148
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
2449
148
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2450
148
    stats["normal_queue_curr_elements"] =
2451
148
            (double)_cur_normal_queue_element_count_metrics->get_value();
2452
2453
148
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2454
148
    stats["disposable_queue_curr_size"] =
2455
148
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
2456
148
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2457
148
    stats["disposable_queue_curr_elements"] =
2458
148
            (double)_cur_disposable_queue_element_count_metrics->get_value();
2459
2460
148
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2461
148
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2462
2463
148
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2464
148
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2465
148
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2466
2467
148
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2468
148
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2469
148
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2470
2471
148
    return stats;
2472
148
}
2473
2474
// For BE unit tests (UTs).
2475
172
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
2476
172
    std::map<std::string, double> stats;
2477
172
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2478
172
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2479
172
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2480
2481
172
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2482
172
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
2483
172
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2484
172
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
2485
2486
172
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2487
172
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
2488
172
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2489
172
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
2490
2491
172
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2492
172
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
2493
172
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2494
172
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
2495
2496
172
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2497
172
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
2498
172
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2499
172
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
2500
2501
172
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2502
172
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2503
2504
172
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2505
172
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2506
172
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2507
2508
172
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2509
172
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2510
172
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2511
2512
172
    return stats;
2513
172
}
2514
2515
// Explicit instantiation of the BlockFileCache::remove member template for the
// case where both the cache lock and the block lock are
// std::lock_guard<std::mutex>.
template void BlockFileCache::remove(FileBlockSPtr file_block,
2516
                                     std::lock_guard<std::mutex>& cache_lock,
2517
                                     std::lock_guard<std::mutex>& block_lock, bool sync);
2518
2519
1
// Runs check_file_cache_consistency() and renders each finding as text.
//
// One string is appended to `results` per finding, made of the manager-side
// info, the storage-side info and the inconsistency type, each produced by
// the corresponding to_string().
//
// @param results  output vector; existing elements are kept, new reports are
//                 appended.
// @return the error from check_file_cache_consistency() if it fails,
//         otherwise Status::OK().
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
    InconsistencyContext inconsistency_context;
    RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));

    auto& manager_infos = inconsistency_context.infos_in_manager;
    auto& storage_infos = inconsistency_context.infos_in_storage;
    auto& kinds = inconsistency_context.types;

    const auto entry_count = kinds.size();
    results.reserve(entry_count);
    for (size_t idx = 0; idx < entry_count; ++idx) {
        std::string report;
        report += "File cache info in manager:\n";
        report += manager_infos[idx].to_string();
        report += "File cache info in storage:\n";
        report += storage_infos[idx].to_string();
        report += kinds[idx].to_string();
        report += "\n";
        results.push_back(std::move(report));
    }
    return Status::OK();
}
2536
2537
1
// Compares what the storage layer reports with what the in-memory cache
// tracks and records every mismatch in `inconsistency_context`.
//
// The three vectors of the context (infos_in_manager, infos_in_storage,
// types) are appended to in lockstep, one entry per finding. `_mutex` is held
// for the whole call.
//
// Pass 1 walks the storage infos:
//   - no cell (or a cell without a block) for the entry -> NOT_LOADED, with a
//     default-constructed manager-side info;
//   - otherwise the tmp flag, size, cache type and expiration time are
//     compared and the mismatching aspects are OR-ed into one type.
// Pass 2 walks `_files` and reports every cell that pass 1 did not see as
// MISSING_IN_STORAGE, with a default-constructed storage-side info.
//
// @param inconsistency_context  output; findings are appended to it.
// @return the error from get_file_cache_infos() if it fails, otherwise
//         Status::OK().
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
    std::lock_guard<std::mutex> cache_lock(_mutex);

    std::vector<FileCacheInfo> infos_in_storage;
    RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));

    auto& manager_side = inconsistency_context.infos_in_manager;
    auto& storage_side = inconsistency_context.infos_in_storage;
    auto& findings = inconsistency_context.types;

    // Pass 1: every entry known to storage is checked against its cell.
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks;
    for (const auto& stored : infos_in_storage) {
        confirmed_blocks.insert({stored.hash, stored.offset});
        auto* cell = get_cell(stored.hash, stored.offset, cache_lock);
        if (cell == nullptr || cell->file_block == nullptr) {
            manager_side.emplace_back();
            storage_side.push_back(stored);
            findings.emplace_back(InconsistencyType::NOT_LOADED);
            continue;
        }

        FileCacheInfo tracked {
                .hash = stored.hash,
                .expiration_time = cell->file_block->expiration_time(),
                .size = cell->size(),
                .offset = stored.offset,
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
                .cache_type = cell->file_block->cache_type()};

        InconsistencyType mismatch;
        if (stored.is_tmp != tracked.is_tmp) {
            mismatch |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
        }

        // A block that is still downloading is compared by its downloading
        // size rather than by the cell size.
        size_t expected_size = tracked.is_tmp ? cell->dowloading_size() : tracked.size;
        if (stored.size != expected_size) {
            mismatch |= InconsistencyType::SIZE_INCONSISTENT;
        }

        // The cache type is compared only when the tmp/DOWNLOADING mismatch
        // was not flagged above.
        if ((mismatch & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
            stored.cache_type != tracked.cache_type) {
            mismatch |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
        }

        if (stored.expiration_time != tracked.expiration_time) {
            mismatch |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
        }

        if (mismatch != InconsistencyType::NONE) {
            manager_side.push_back(tracked);
            storage_side.push_back(stored);
            findings.push_back(mismatch);
        }
    }

    // Pass 2: cells the manager tracks that storage did not report.
    for (const auto& [hash, offset_to_cell] : _files) {
        for (const auto& [offset, cell] : offset_to_cell) {
            if (!confirmed_blocks.contains({hash, offset})) {
                const auto& block = cell.file_block;
                manager_side.emplace_back(
                        hash, block->expiration_time(), cell.size(), offset,
                        cell.file_block->state() == FileBlock::State::DOWNLOADING,
                        block->cache_type());
                storage_side.emplace_back();
                findings.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
            }
        }
    }
    return Status::OK();
}
2597
2598
} // namespace doris::io