Coverage Report

Created: 2026-05-14 18:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache.h"
22
23
#include <gen_cpp/file_cache.pb.h>
24
25
#include <cstdio>
26
#include <exception>
27
#include <fstream>
28
#include <unordered_set>
29
30
#include "common/status.h"
31
#include "cpp/sync_point.h"
32
#include "runtime/exec_env.h"
33
34
#if defined(__APPLE__)
35
#include <sys/mount.h>
36
#else
37
#include <sys/statfs.h>
38
#endif
39
40
#include <chrono> // IWYU pragma: keep
41
#include <mutex>
42
#include <ranges>
43
44
#include "common/cast_set.h"
45
#include "common/config.h"
46
#include "common/logging.h"
47
#include "core/uint128.h"
48
#include "exec/common/sip_hash.h"
49
#include "io/cache/block_file_cache_ttl_mgr.h"
50
#include "io/cache/file_block.h"
51
#include "io/cache/file_cache_common.h"
52
#include "io/cache/fs_file_cache_storage.h"
53
#include "io/cache/mem_file_cache_storage.h"
54
#include "runtime/runtime_profile.h"
55
#include "util/concurrency_stats.h"
56
#include "util/stack_util.h"
57
#include "util/stopwatch.hpp"
58
#include "util/thread.h"
59
#include "util/time.h"
60
namespace doris::io {
61
62
// Insert a block pointer into one shard while swallowing allocation failures.
63
8.60k
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
64
8.60k
    if (!block) {
65
1
        return false;
66
1
    }
67
8.59k
    try {
68
8.59k
        auto* raw_ptr = block.get();
69
8.59k
        auto idx = shard_index(raw_ptr);
70
8.59k
        auto& shard = _shards[idx];
71
8.59k
        std::lock_guard lock(shard.mutex);
72
8.59k
        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
73
8.59k
        if (inserted) {
74
909
            _size.fetch_add(1, std::memory_order_relaxed);
75
909
        }
76
8.59k
        return inserted;
77
8.59k
    } catch (const std::exception& e) {
78
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
79
0
    } catch (...) {
80
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
81
0
    }
82
0
    return false;
83
8.59k
}
84
85
// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
86
3.93k
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
87
3.93k
    if (limit == 0 || output == nullptr) {
88
2
        return 0;
89
2
    }
90
3.93k
    size_t drained = 0;
91
3.93k
    try {
92
3.93k
        output->reserve(output->size() + std::min(limit, size()));
93
251k
        for (auto& shard : _shards) {
94
251k
            if (drained >= limit) {
95
1
                break;
96
1
            }
97
251k
            std::lock_guard lock(shard.mutex);
98
251k
            auto it = shard.entries.begin();
99
251k
            size_t shard_drained = 0;
100
251k
            while (it != shard.entries.end() && drained + shard_drained < limit) {
101
486
                output->emplace_back(std::move(it->second));
102
486
                it = shard.entries.erase(it);
103
486
                ++shard_drained;
104
486
            }
105
251k
            if (shard_drained > 0) {
106
7
                _size.fetch_sub(shard_drained, std::memory_order_relaxed);
107
7
                drained += shard_drained;
108
7
            }
109
251k
        }
110
3.93k
    } catch (const std::exception& e) {
111
0
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
112
0
    } catch (...) {
113
0
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
114
0
    }
115
3.94k
    return drained;
116
3.93k
}
117
118
// Remove every pending block, guarding against unexpected exceptions.
119
28
void NeedUpdateLRUBlocks::clear() {
120
28
    try {
121
1.79k
        for (auto& shard : _shards) {
122
1.79k
            std::lock_guard lock(shard.mutex);
123
1.79k
            if (!shard.entries.empty()) {
124
1
                auto removed = shard.entries.size();
125
1
                shard.entries.clear();
126
1
                _size.fetch_sub(removed, std::memory_order_relaxed);
127
1
            }
128
1.79k
        }
129
28
    } catch (const std::exception& e) {
130
0
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
131
0
    } catch (...) {
132
0
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
133
0
    }
134
28
}
135
136
8.60k
// Map a block pointer to a shard slot: hash the pointer value and keep the bits
// selected by kShardMask. `ptr` must not be null (checked in debug builds).
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
    DCHECK(ptr != nullptr);
    const auto hashed = std::hash<FileBlock*> {}(ptr);
    return hashed & kShardMask;
}
140
141
// Build a cache instance rooted at `cache_base_path`.
//
// The constructor only sets up in-memory state:
//   * registers every bvar metric, using the cache base path as the metric prefix;
//   * creates the four LRU queues from the limits in `cache_settings`;
//   * creates the LRU recorder/dumper and picks the storage backend.
// When `cache_settings.storage` is "memory" the in-memory backend is used and the
// base path is replaced by the literal "memory" (after the metrics were registered
// under the original path).
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
                               const FileCacheSettings& cache_settings)
        : _cache_base_path(cache_base_path),
          _capacity(cache_settings.capacity),
          _max_file_block_size(cache_settings.max_file_block_size) {
    // Small factories so that each metric below is a single line. They read the
    // prefix from _cache_base_path at call time, exactly like a direct make_shared.
    auto size_status = [this](const char* name, size_t initial) {
        return std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(), name, initial);
    };
    auto ratio_status = [this](const char* name) {
        return std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(), name, 0.0);
    };
    auto size_adder = [this](const char* name) {
        return std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), name);
    };
    auto recorder = [this](const char* name) {
        return std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(), name);
    };

    // ---- current size / element count gauges ----
    _cur_cache_size_metrics = size_status("file_cache_cache_size", 0);
    _cache_capacity_metrics = size_status("file_cache_capacity", _capacity);
    _cur_ttl_cache_size_metrics = size_status("file_cache_ttl_cache_size", 0);
    _cur_normal_queue_element_count_metrics =
            size_status("file_cache_normal_queue_element_count", 0);
    _cur_ttl_cache_lru_queue_cache_size_metrics =
            size_status("file_cache_ttl_cache_lru_queue_size", 0);
    _cur_ttl_cache_lru_queue_element_count_metrics =
            size_status("file_cache_ttl_cache_lru_queue_element_count", 0);
    _cur_normal_queue_cache_size_metrics = size_status("file_cache_normal_queue_cache_size", 0);
    _cur_index_queue_element_count_metrics =
            size_status("file_cache_index_queue_element_count", 0);
    _cur_index_queue_cache_size_metrics = size_status("file_cache_index_queue_cache_size", 0);
    _cur_disposable_queue_element_count_metrics =
            size_status("file_cache_disposable_queue_element_count", 0);
    _cur_disposable_queue_cache_size_metrics =
            size_status("file_cache_disposable_queue_cache_size", 0);

    // ---- eviction / read / hit byte counters ----
    _queue_evict_size_metrics[0] = size_adder("file_cache_index_queue_evict_size");
    _queue_evict_size_metrics[1] = size_adder("file_cache_normal_queue_evict_size");
    _queue_evict_size_metrics[2] = size_adder("file_cache_disposable_queue_evict_size");
    _queue_evict_size_metrics[3] = size_adder("file_cache_ttl_cache_evict_size");
    _total_evict_size_metrics = size_adder("file_cache_total_evict_size");
    _total_read_size_metrics = size_adder("file_cache_total_read_size");
    _total_hit_size_metrics = size_adder("file_cache_total_hit_size");
    _gc_evict_bytes_metrics = size_adder("file_cache_gc_evict_bytes");
    _gc_evict_count_metrics = size_adder("file_cache_gc_evict_count");

    // ---- evict-by-time counters, indexed [first type][second type] ----
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
            size_adder("file_cache_evict_by_time_disposable_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
            size_adder("file_cache_evict_by_time_disposable_to_index");
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
            size_adder("file_cache_evict_by_time_disposable_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
            size_adder("file_cache_evict_by_time_normal_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
            size_adder("file_cache_evict_by_time_normal_to_index");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
            size_adder("file_cache_evict_by_time_normal_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
            size_adder("file_cache_evict_by_time_index_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
            size_adder("file_cache_evict_by_time_index_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
            size_adder("file_cache_evict_by_time_index_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
            size_adder("file_cache_evict_by_time_ttl_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
            size_adder("file_cache_evict_by_time_ttl_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
            size_adder("file_cache_evict_by_time_ttl_to_index");

    // ---- evict-by-self-LRU counters, one per cache type ----
    _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
            size_adder("file_cache_evict_by_self_lru_disposable");
    _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
            size_adder("file_cache_evict_by_self_lru_normal");
    _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] =
            size_adder("file_cache_evict_by_self_lru_index");
    _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] =
            size_adder("file_cache_evict_by_self_lru_ttl");

    // ---- evict-by-size counters, indexed [first type][second type] ----
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
            size_adder("file_cache_evict_by_size_disposable_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
            size_adder("file_cache_evict_by_size_disposable_to_index");
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
            size_adder("file_cache_evict_by_size_disposable_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
            size_adder("file_cache_evict_by_size_normal_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
            size_adder("file_cache_evict_by_size_normal_to_index");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
            size_adder("file_cache_evict_by_size_normal_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
            size_adder("file_cache_evict_by_size_index_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
            size_adder("file_cache_evict_by_size_index_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
            size_adder("file_cache_evict_by_size_index_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
            size_adder("file_cache_evict_by_size_ttl_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
            size_adder("file_cache_evict_by_size_ttl_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
            size_adder("file_cache_evict_by_size_ttl_to_index");

    _evict_by_try_release = size_adder("file_cache_evict_by_try_release");

    // ---- block read / hit counters ----
    _num_read_blocks = size_adder("file_cache_num_read_blocks");
    _num_hit_blocks = size_adder("file_cache_num_hit_blocks");
    _num_removed_blocks = size_adder("file_cache_num_removed_blocks");

    _no_warmup_num_read_blocks = size_adder("file_cache_no_warmup_num_read_blocks");
    _no_warmup_num_hit_blocks = size_adder("file_cache_no_warmup_num_hit_blocks");

#ifndef BE_TEST
    // Windowed views (300 s and 3600 s) over the counters above; skipped in BE_TEST builds.
    auto windowed = [this](const char* name, bvar::Adder<size_t>* source, int window_seconds) {
        return std::make_shared<bvar::Window<bvar::Adder<size_t>>>(_cache_base_path.c_str(), name,
                                                                   source, window_seconds);
    };
    _num_hit_blocks_5m = windowed("file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
    _num_read_blocks_5m = windowed("file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
    _num_hit_blocks_1h = windowed("file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
    _num_read_blocks_1h = windowed("file_cache_num_read_blocks_1h", _num_read_blocks.get(), 3600);
    _no_warmup_num_hit_blocks_5m = windowed("file_cache_no_warmup_num_hit_blocks_5m",
                                            _no_warmup_num_hit_blocks.get(), 300);
    _no_warmup_num_read_blocks_5m = windowed("file_cache_no_warmup_num_read_blocks_5m",
                                             _no_warmup_num_read_blocks.get(), 300);
    _no_warmup_num_hit_blocks_1h = windowed("file_cache_no_warmup_num_hit_blocks_1h",
                                            _no_warmup_num_hit_blocks.get(), 3600);
    _no_warmup_num_read_blocks_1h = windowed("file_cache_no_warmup_num_read_blocks_1h",
                                             _no_warmup_num_read_blocks.get(), 3600);
#endif

    // ---- hit ratio gauges ----
    _hit_ratio = ratio_status("file_cache_hit_ratio");
    _hit_ratio_5m = ratio_status("file_cache_hit_ratio_5m");
    _hit_ratio_1h = ratio_status("file_cache_hit_ratio_1h");

    _no_warmup_hit_ratio = ratio_status("file_cache_no_warmup_hit_ratio");
    _no_warmup_hit_ratio_5m = ratio_status("file_cache_no_warmup_hit_ratio_5m");
    _no_warmup_hit_ratio_1h = ratio_status("file_cache_no_warmup_hit_ratio_1h");

    // ---- mode / queue state gauges ----
    _disk_limit_mode_metrics = size_status("file_cache_disk_limit_mode", 0);
    _need_evict_cache_in_advance_metrics =
            size_status("file_cache_need_evict_cache_in_advance", 0);
    _meta_store_write_queue_size_metrics =
            size_status("file_cache_meta_store_write_queue_size", 0);

    // ---- latency / length recorders ----
    _cache_lock_wait_time_us = recorder("file_cache_cache_lock_wait_time_us");
    _get_or_set_latency_us = recorder("file_cache_get_or_set_latency_us");
    _storage_sync_remove_latency_us = recorder("file_cache_storage_sync_remove_latency_us");
    _storage_retry_sync_remove_latency_us =
            recorder("file_cache_storage_retry_sync_remove_latency_us");
    _storage_async_remove_latency_us = recorder("file_cache_storage_async_remove_latency_us");
    _evict_in_advance_latency_us = recorder("file_cache_evict_in_advance_latency_us");
    _lru_dump_latency_us = recorder("file_cache_lru_dump_latency_us");
    _recycle_keys_length_recorder = recorder("file_cache_recycle_keys_length");
    _need_update_lru_blocks_length_recorder =
            recorder("file_cache_need_update_lru_blocks_length");
    _update_lru_blocks_latency_us = recorder("file_cache_update_lru_blocks_latency_us");
    _ttl_gc_latency_us = recorder("file_cache_ttl_gc_latency_us");
    _shadow_queue_levenshtein_distance =
            recorder("file_cache_shadow_queue_levenshtein_distance");

    // ---- LRU queues: (size limit, element limit, third LRUQueue argument) ----
    // The third argument is written as a duration in seconds: 1 hour, 7 days,
    // 1 day, and INT_MAX for the TTL queue.
    constexpr int kOneHourSeconds = 60 * 60;
    constexpr int kOneDaySeconds = 24 * 60 * 60;
    constexpr int kOneWeekSeconds = 7 * 24 * 60 * 60;
    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
                                 cache_settings.disposable_queue_elements, kOneHourSeconds);
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
                            kOneWeekSeconds);
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
                             kOneDaySeconds);
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
                          std::numeric_limits<int>::max());

    // ---- LRU bookkeeping helpers and storage backend ----
    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());
    if (cache_settings.storage != "memory") {
        _storage = std::make_unique<FSFileCacheStorage>();
    } else {
        _storage = std::make_unique<MemFileCacheStorage>();
        _cache_base_path = "memory";
    }

    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
}
378
379
4.25k
// Hash `path` with sip_hash128 and wrap the 128-bit result.
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
    uint128_t digest;
    // sip_hash128 writes its result through a char* view of the 128-bit value.
    char* digest_bytes = reinterpret_cast<char*>(&digest);
    sip_hash128(path.data(), path.size(), digest_bytes);
    return UInt128Wrapper(digest);
}
384
385
// Return a holder for the per-query cache context of `query_id`.
//
// The cache lock is taken first. If config::enable_file_cache_query_limit is
// false an empty holder pointer is returned; otherwise the query's context is
// looked up or created (see get_or_set_query_context) and wrapped in a new holder.
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
    SCOPED_CACHE_LOCK(_mutex, this);
    if (config::enable_file_cache_query_limit) {
        /// The query limit is enabled: attach a context to the current query.
        auto query_ctx =
                get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
        return std::make_unique<QueryFileCacheContextHolder>(query_id, this, query_ctx);
    }
    return {};
}
397
398
// Look up the context registered for `query_id` in _query_map.
// Returns nullptr when the query has no context. The caller passes the held cache lock.
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
    auto found = _query_map.find(query_id);
    if (found == _query_map.end()) {
        return nullptr;
    }
    return found->second;
}
403
404
6
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
405
6
    SCOPED_CACHE_LOCK(_mutex, this);
406
6
    const auto& query_iter = _query_map.find(query_id);
407
408
6
    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
409
5
        _query_map.erase(query_iter);
410
5
    }
411
6
}
412
413
// Return the context of `query_id`, creating and registering one if needed.
//
// * An all-zero query id (hi == 0 and lo == 0) yields nullptr.
// * An existing context is returned unchanged.
// * Otherwise a new context is created whose size limit is
//   `_capacity * file_cache_query_limit_percent / 100` bytes; a warning is
//   logged when that limit is below 256MB.
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
        int file_cache_query_limit_percent) {
    const bool is_zero_id = (query_id.lo == 0) && (query_id.hi == 0);
    if (is_zero_id) {
        return nullptr;
    }

    if (auto existing = get_query_context(query_id, cache_lock)) {
        return existing;
    }

    // 256MB, the recommended minimum mentioned in the warning below.
    constexpr size_t kRecommendedMinLimitBytes = 268435456;
    size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100;
    if (file_cache_query_limit_size < kRecommendedMinLimitBytes) {
        LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size
                     << " bytes) is less than the 256MB recommended minimum. "
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
                     << " from its current value " << file_cache_query_limit_percent << "%.";
    }

    auto created = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
    // emplace() returns {iterator, inserted}; hand back whatever the map now holds.
    return _query_map.emplace(query_id, created).first->second;
}
436
437
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
438
20
                                                   std::lock_guard<std::mutex>& cache_lock) {
439
20
    auto pair = std::make_pair(hash, offset);
440
20
    auto record = records.find(pair);
441
20
    DCHECK(record != records.end());
442
20
    auto iter = record->second;
443
20
    records.erase(pair);
444
20
    lru_queue.remove(iter, cache_lock);
445
20
}
446
447
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
448
                                                    size_t size,
449
30
                                                    std::lock_guard<std::mutex>& cache_lock) {
450
30
    auto pair = std::make_pair(hash, offset);
451
30
    if (records.find(pair) == records.end()) {
452
29
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
453
29
        records.insert({pair, queue_iter});
454
29
    }
455
30
}
456
457
140
// Take the cache lock and run initialize_unlocked(); returns its status.
Status BlockFileCache::initialize() {
    SCOPED_CACHE_LOCK(_mutex, this);
    Status init_status = initialize_unlocked(cache_lock);
    return init_status;
}
461
462
140
// Initialize the cache; the caller passes the held cache lock.
//
// Steps, in order:
//   1. mark the cache as initialized (must not already be initialized);
//   2. restore the LRU queues from disk when LRU dumping is configured;
//   3. initialize the storage backend, returning its error on failure;
//   4. create the TTL manager when the storage is file-system based and has a meta store;
//   5. start the background threads.
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(!_is_initialized);
    _is_initialized = true;

    const bool lru_dump_configured = config::file_cache_background_lru_dump_tail_record_num > 0;
    if (lru_dump_configured) {
        // requirements:
        // 1. restored data should not overwrite the last dump
        // 2. restore should happen before load and async load
        // 3. all queues should be restored sequentially to avoid conflicts
        // TODO(zhengyu): we could parallelize the restore, but that adds complexity, so
        // measure the time cost first to see whether the improvement is necessary
        restore_lru_queues_from_disk(cache_lock);
    }
    RETURN_IF_ERROR(_storage->init(this));

    // The TTL manager is only created for the file-system storage backend.
    auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
    if (fs_storage != nullptr) {
        auto* meta_store = fs_storage->get_meta_store();
        if (meta_store != nullptr) {
            _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
        }
    }

    // Background workers: monitor, gc, evict-in-advance and block LRU update.
    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
    _cache_background_evict_in_advance_thread =
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
    _cache_background_block_lru_update_thread =
            std::thread(&BlockFileCache::run_background_block_lru_update, this);

    // Background workers for LRU dump and LRU log replay.
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
    _cache_background_lru_log_replay_thread =
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);

    return Status::OK();
}
496
497
void BlockFileCache::update_block_lru(FileBlockSPtr block,
498
483
                                      std::lock_guard<std::mutex>& cache_lock) {
499
483
    if (!block) {
500
1
        return;
501
1
    }
502
503
482
    FileBlockCell* cell = get_cell(block->get_hash_value(), block->offset(), cache_lock);
504
482
    if (!cell || cell->file_block.get() != block.get()) {
505
1
        return;
506
1
    }
507
508
481
    if (cell->queue_iterator) {
509
481
        auto& queue = get_queue(block->cache_type());
510
481
        queue.move_to_end(*cell->queue_iterator, cache_lock);
511
481
        _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
512
481
                                          block->_key.hash, block->_key.offset,
513
481
                                          block->_block_range.size());
514
481
    }
515
481
    cell->update_atime();
516
481
}
517
518
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
519
773
                              std::lock_guard<std::mutex>& cache_lock) {
520
773
    if (result) {
521
773
        result->push_back(cell.file_block);
522
773
    }
523
524
773
    auto& queue = get_queue(cell.file_block->cache_type());
525
    /// Move to the end of the queue. The iterator remains valid.
526
773
    if (cell.queue_iterator && move_iter_flag) {
527
771
        queue.move_to_end(*cell.queue_iterator, cache_lock);
528
771
        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
529
771
                                          CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash,
530
771
                                          cell.file_block->_key.offset, cell.size());
531
771
    }
532
533
773
    cell.update_atime();
534
773
}
535
536
template <class T>
537
    requires IsXLock<T>
538
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
539
936k
                                        T& /* cache_lock */) {
540
936k
    auto it = _files.find(hash);
541
936k
    if (it == _files.end()) {
542
2
        return nullptr;
543
2
    }
544
545
936k
    auto& offsets = it->second;
546
936k
    auto cell_it = offsets.find(offset);
547
936k
    if (cell_it == offsets.end()) {
548
3
        return nullptr;
549
3
    }
550
551
936k
    return &cell_it->second;
552
936k
}
553
554
773
/// Returns true only when neither the cell's cache type nor the querying
/// context's cache type is DISPOSABLE.
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
    const bool involves_disposable = query_type == FileCacheType::DISPOSABLE ||
                                     cell_type == FileCacheType::DISPOSABLE;
    return !involves_disposable;
}
557
558
/// Collects the cached blocks of file `hash` that intersect `range`.
///
/// Given range = [left, right] and the ordered, non-overlapping set of blocks
/// stored for the file, returns the list [block1, ..., blockN] of blocks that
/// intersect the range. Every returned cell is passed through use_cell(), which
/// refreshes its access time and possibly its LRU position.
///
/// When the file is unknown and the asynchronous open has not finished yet, the
/// blocks are first loaded directly from storage and the lookup is retried.
///
/// @return the intersecting blocks in ascending offset order; empty when none.
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
                                    const FileBlock::Range& range,
                                    std::lock_guard<std::mutex>& cache_lock) {
    auto file_it = _files.find(hash);
    if (file_it == _files.end()) {
        if (_async_open_done) {
            return {};
        }
        FileCacheKey key;
        key.hash = hash;
        key.meta.type = context.cache_type;
        key.meta.expiration_time = context.expiration_time;
        key.meta.tablet_id = context.tablet_id;
        key.meta.table_name = context.table_name;
        key.meta.partition_name = context.partition_name;
        key.meta.context_id = context.context_id;
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);

        file_it = _files.find(hash);
        if (file_it == _files.end()) [[unlikely]] {
            return {};
        }
    }

    auto& cells = file_it->second;
    if (cells.empty()) {
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
                     << " cache type=" << context.cache_type
                     << " cache expiration time=" << context.expiration_time
                     << " cache range=" << range.left << " " << range.right
                     << " query id=" << context.query_id;
        DCHECK(false);
        _files.erase(hash);
        return {};
    }

    FileBlocks result;
    // Appends `cell` to the result and records the access.
    auto collect = [&](const FileBlockCell& cell) {
        use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
                 cache_lock);
    };

    auto cursor = cells.lower_bound(range.left);
    if (cursor == cells.end()) {
        /// N - last cached block for given file hash, block{N}.offset < range.left:
        ///   block{N}                       block{N}
        /// [________                         [_______]
        ///     [__________]         OR                  [________]
        ///     ^                                        ^
        ///     range.left                               range.left
        const auto& last_cell = cells.rbegin()->second;
        if (last_cell.file_block->range().right < range.left) {
            return {};
        }
        collect(last_cell);
        return result;
    }

    /// cursor <-- block{k}
    if (cursor != cells.begin()) {
        const auto& prev_cell = std::prev(cursor)->second;
        if (range.left <= prev_cell.file_block->range().right) {
            ///   block{k-1}  block{k}
            ///   [________]   [_____
            ///       [___________
            ///       ^
            ///       range.left
            collect(prev_cell);
        }
    }

    ///  block{k} ...       block{k-1}  block{k}                      block{k}
    ///  [______              [______]     [____                        [________
    ///  [_________     OR              [________      OR    [______]   ^
    ///  ^                              ^                           ^   block{k}.offset
    ///  range.left                     range.left                  range.right
    for (; cursor != cells.end(); ++cursor) {
        const auto& cell = cursor->second;
        if (range.right < cell.file_block->range().left) {
            break;
        }
        collect(cell);
    }

    return result;
}
651
652
8.60k
/// Queues `block` in `_need_update_lru_blocks`; when the insert reports success
/// the current size of that container is pushed to the length recorder.
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
    if (!_need_update_lru_blocks.insert(std::move(block))) {
        return;
    }
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
}
657
658
2
std::string BlockFileCache::clear_file_cache_async() {
659
2
    LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
660
2
    _lru_dumper->remove_lru_dump_files();
661
2
    int64_t num_cells_all = 0;
662
2
    int64_t num_cells_to_delete = 0;
663
2
    int64_t num_cells_wait_recycle = 0;
664
2
    int64_t num_files_all = 0;
665
2
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
666
2
    {
667
2
        SCOPED_CACHE_LOCK(_mutex, this);
668
669
2
        std::vector<FileBlockCell*> deleting_cells;
670
3
        for (auto& [_, offset_to_cell] : _files) {
671
3
            ++num_files_all;
672
36
            for (auto& [_1, cell] : offset_to_cell) {
673
36
                ++num_cells_all;
674
36
                deleting_cells.push_back(&cell);
675
36
            }
676
3
        }
677
678
        // we cannot delete the element in the loop above, because it will break the iterator
679
36
        for (auto& cell : deleting_cells) {
680
36
            if (!cell->releasable()) {
681
2
                LOG(INFO) << "cell is not releasable, hash="
682
2
                          << " offset=" << cell->file_block->offset();
683
2
                cell->file_block->set_deleting();
684
2
                ++num_cells_wait_recycle;
685
2
                continue;
686
2
            }
687
34
            FileBlockSPtr file_block = cell->file_block;
688
34
            if (file_block) {
689
34
                std::lock_guard block_lock(file_block->_mutex);
690
34
                remove(file_block, cache_lock, block_lock, false);
691
34
                ++num_cells_to_delete;
692
34
            }
693
34
        }
694
2
        clear_need_update_lru_blocks();
695
2
    }
696
697
2
    std::stringstream ss;
698
2
    ss << "finish clear_file_cache_async, path=" << _cache_base_path
699
2
       << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
700
2
       << " num_cells_to_delete=" << num_cells_to_delete
701
2
       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
702
2
    auto msg = ss.str();
703
2
    LOG(INFO) << msg;
704
2
    _lru_dumper->remove_lru_dump_files();
705
2
    return msg;
706
2
}
707
708
/// Covers [offset, offset + size) with new file blocks of at most
/// `_max_file_block_size` bytes each.
///
/// For every chunk a reservation is attempted with try_reserve():
///  - on success a cell in state `state` is added to the cache (and, unless the
///    context is flagged as cold data, its access time is refreshed);
///  - on failure a standalone block in state SKIP_CACHE is created that is not
///    registered in the cache.
///
/// The decision is made per chunk: a failed reservation for one chunk does not
/// force later chunks, whose reservation succeeds, to skip the cache.
///
/// @param hash        file hash the blocks belong to
/// @param context     cache context copied into each block's key metadata
/// @param offset      start offset of the range
/// @param size        length of the range; must be > 0 (DCHECK)
/// @param state       state given to blocks whose reservation succeeded
/// @param cache_lock  cache lock guard forwarded to the helpers
/// @return the created blocks in ascending offset order
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
                                                  const CacheContext& context, size_t offset,
                                                  size_t size, FileBlock::State state,
                                                  std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(size > 0);

    auto current_pos = offset;
    auto end_pos_non_included = offset + size;

    size_t current_size = 0;
    size_t remaining_size = size;

    FileBlocks file_blocks;
    while (current_pos < end_pos_non_included) {
        current_size = std::min(remaining_size, _max_file_block_size);
        remaining_size -= current_size;
        // Use a per-chunk state so that one failed reservation does not turn
        // every following chunk into SKIP_CACHE.
        const FileBlock::State chunk_state =
                try_reserve(hash, context, current_pos, current_size, cache_lock)
                        ? state
                        : FileBlock::State::SKIP_CACHE;
        if (chunk_state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
            FileCacheKey key;
            key.hash = hash;
            key.offset = current_pos;
            key.meta.type = context.cache_type;
            key.meta.expiration_time = context.expiration_time;
            key.meta.tablet_id = context.tablet_id;
            key.meta.table_name = context.table_name;
            key.meta.partition_name = context.partition_name;
            key.meta.context_id = context.context_id;
            auto file_block = std::make_shared<FileBlock>(key, current_size, this,
                                                          FileBlock::State::SKIP_CACHE);
            file_blocks.push_back(std::move(file_block));
        } else {
            auto* cell =
                    add_cell(hash, context, current_pos, current_size, chunk_state, cache_lock);
            if (cell) {
                file_blocks.push_back(cell->file_block);
                if (!context.is_cold_data) {
                    cell->update_atime();
                }
            }
            if (_ttl_mgr && context.tablet_id != 0) {
                _ttl_mgr->register_tablet_id(context.tablet_id);
            }
        }

        current_pos += current_size;
    }

    DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
    return file_blocks;
}
759
760
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
761
                                                       const UInt128Wrapper& hash,
762
                                                       const CacheContext& context,
763
                                                       const FileBlock::Range& range,
764
278
                                                       std::lock_guard<std::mutex>& cache_lock) {
765
    /// There are blocks [block1, ..., blockN]
766
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
767
    /// intersect with given range.
768
769
    /// It can have holes:
770
    /// [____________________]         -- requested range
771
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
772
    ///
773
    /// For each such hole create a cell with file block state EMPTY.
774
775
278
    auto it = file_blocks.begin();
776
278
    auto block_range = (*it)->range();
777
778
278
    size_t current_pos = 0;
779
278
    if (block_range.left < range.left) {
780
        ///    [_______     -- requested range
781
        /// [_______
782
        /// ^
783
        /// block1
784
785
151
        current_pos = block_range.right + 1;
786
151
        ++it;
787
151
    } else {
788
127
        current_pos = range.left;
789
127
    }
790
791
900
    while (current_pos <= range.right && it != file_blocks.end()) {
792
622
        block_range = (*it)->range();
793
794
622
        if (current_pos == block_range.left) {
795
513
            current_pos = block_range.right + 1;
796
513
            ++it;
797
513
            continue;
798
513
        }
799
800
622
        DCHECK(current_pos < block_range.left);
801
802
109
        auto hole_size = block_range.left - current_pos;
803
804
109
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
805
109
                                                      FileBlock::State::EMPTY, cache_lock));
806
807
109
        current_pos = block_range.right + 1;
808
109
        ++it;
809
109
    }
810
811
278
    if (current_pos <= range.right) {
812
        ///   ________]     -- requested range
813
        ///   _____]
814
        ///        ^
815
        /// blockN
816
817
86
        auto hole_size = range.right - current_pos + 1;
818
819
86
        file_blocks.splice(file_blocks.end(),
820
86
                           split_range_into_cells(hash, context, current_pos, hole_size,
821
86
                                                  FileBlock::State::EMPTY, cache_lock));
822
86
    }
823
278
}
824
825
/// Returns a holder with blocks covering [offset, offset + size - 1] of file
/// `hash`, creating blocks for every part that is not cached yet.
///
/// The cache mutex is held for the whole lookup/creation. The time spent
/// waiting for the mutex is added to `context.stats->lock_wait_timer`; lookup
/// and creation are timed into `get_timer` and `set_timer` respectively, and the
/// total is reported to `_get_or_set_latency_us` (divided by 1000).
/// Read/hit metrics are updated for every returned block.
///
/// `context.stats` is dereferenced and therefore must be non-null.
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
                                            CacheContext& context) {
    FileBlock::Range range(offset, offset + size - 1);

    ReadStatistics* stats = context.stats;
    DCHECK(stats != nullptr);

    MonotonicStopWatch lock_wait_watch;
    lock_wait_watch.start();
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
    std::lock_guard cache_lock(_mutex);
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
    stats->lock_wait_timer += lock_wait_watch.elapsed_time();

    FileBlocks file_blocks;
    int64_t duration = 0;
    {
        SCOPED_RAW_TIMER(&duration);
        /// Get all blocks which intersect with the given range.
        {
            SCOPED_RAW_TIMER(&stats->get_timer);
            file_blocks = get_impl(hash, context, range, cache_lock);
        }

        {
            SCOPED_RAW_TIMER(&stats->set_timer);
            if (file_blocks.empty()) {
                // Nothing cached for the range: create blocks for all of it.
                file_blocks = split_range_into_cells(hash, context, offset, size,
                                                     FileBlock::State::EMPTY, cache_lock);
            } else {
                // Partially cached: only the gaps need new blocks.
                fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
            }
        }
        DCHECK(!file_blocks.empty());

        *_num_read_blocks << file_blocks.size();
        if (!context.is_warmup) {
            *_no_warmup_num_read_blocks << file_blocks.size();
        }
        for (auto& block : file_blocks) {
            size_t block_size = block->range().size();
            *_total_read_size_metrics << block_size;
            if (block->state_unsafe() != FileBlock::State::DOWNLOADED) {
                continue;
            }
            *_num_hit_blocks << 1;
            *_total_hit_size_metrics << block_size;
            if (!context.is_warmup) {
                *_no_warmup_num_hit_blocks << 1;
            }
        }
    }
    *_get_or_set_latency_us << (duration / 1000);
    return FileBlocksHolder(std::move(file_blocks));
}
875
876
/// Creates a file block cell and stores it in `_files` under [hash][offset].
///
/// @return pointer to the stored cell; the already existing cell when one is
///         registered at that offset; nullptr when `size` is 0 or larger than
///         1 GiB.
///
/// The new block's cache type is adjusted so that it agrees with the
/// expiration time: TTL without expiration becomes NORMAL, non-TTL with an
/// expiration becomes TTL. The cell is appended to the matching LRU queue, the
/// ADD event is recorded, and `_cur_cache_size` (plus `_cur_ttl_size` for TTL
/// blocks) is increased.
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
                                        size_t offset, size_t size, FileBlock::State state,
                                        std::lock_guard<std::mutex>& cache_lock) {
    if (size == 0) {
        return nullptr; /// Empty files are not cached.
    }

    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
               << " expiration_time=" << context.expiration_time
               << " tablet_id=" << context.tablet_id;

    if (size > 1024 * 1024 * 1024) {
        LOG(WARNING) << "File block size is too large for a block, reject. size=" << size
                     << " hash=" << hash.to_string() << " offset=" << offset
                     << " stack:" << get_stack_trace();
        return nullptr;
    }

    auto& cells_by_offset = _files[hash];
    if (auto existing = cells_by_offset.find(offset); existing != cells_by_offset.end()) {
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
                   << ", offset: " << offset << ", size: " << size
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
        return &(existing->second);
    }

    FileCacheKey key;
    key.hash = hash;
    key.offset = offset;
    key.meta.type = context.cache_type;
    key.meta.expiration_time = context.expiration_time;
    key.meta.tablet_id = context.tablet_id;
    key.meta.table_name = context.table_name;
    key.meta.partition_name = context.partition_name;
    key.meta.context_id = context.context_id;
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);

    // Reconcile the requested cache type with the presence of an expiration time.
    const bool requested_ttl = context.cache_type == FileCacheType::TTL;
    const bool has_expiration = context.expiration_time != 0;
    Status st;
    if (requested_ttl && !has_expiration) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
    } else if (!requested_ttl && has_expiration) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
    }
    if (!st.ok()) {
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
                     << " cache_type=" << cache_type_to_string(context.cache_type)
                     << " error=" << st.msg();
    }

    auto& lru_queue = get_queue(cell.file_block->cache_type());
    cell.queue_iterator = lru_queue.add(hash, offset, size, cache_lock);
    _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD,
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
                                      cell.size());

    if (cell.file_block->cache_type() == FileCacheType::TTL) {
        _cur_ttl_size += cell.size();
    }
    // `cell` is moved from here on; only the stored copy may be used afterwards.
    auto [stored, _] = cells_by_offset.insert(std::make_pair(offset, std::move(cell)));
    _cur_cache_size += size;
    return &(stored->second);
}
940
941
0
size_t BlockFileCache::try_release() {
942
0
    SCOPED_CACHE_LOCK(_mutex, this);
943
0
    std::vector<FileBlockCell*> trash;
944
0
    for (auto& [hash, blocks] : _files) {
945
0
        for (auto& [offset, cell] : blocks) {
946
0
            if (cell.releasable()) {
947
0
                trash.emplace_back(&cell);
948
0
            } else {
949
0
                cell.file_block->set_deleting();
950
0
            }
951
0
        }
952
0
    }
953
0
    size_t remove_size = 0;
954
0
    for (auto& cell : trash) {
955
0
        FileBlockSPtr file_block = cell->file_block;
956
0
        std::lock_guard lc(cell->file_block->_mutex);
957
0
        remove_size += file_block->range().size();
958
0
        remove(file_block, cache_lock, lc);
959
0
        VLOG_DEBUG << "try_release " << _cache_base_path
960
0
                   << " hash=" << file_block->get_hash_value().to_string()
961
0
                   << " offset=" << file_block->offset();
962
0
    }
963
0
    *_evict_by_try_release << remove_size;
964
0
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
965
0
    return trash.size();
966
0
}
967
968
44.4k
/// Maps a cache type to its LRU queue. An unknown type trips a DCHECK and the
/// NORMAL queue is returned.
LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
    switch (type) {
    case FileCacheType::NORMAL:
        return _normal_queue;
    case FileCacheType::INDEX:
        return _index_queue;
    case FileCacheType::TTL:
        return _ttl_queue;
    case FileCacheType::DISPOSABLE:
        return _disposable_queue;
    default:
        DCHECK(false);
        break;
    }
    return _normal_queue;
}
983
984
137
/// Const overload: maps a cache type to its LRU queue. An unknown type trips a
/// DCHECK and the NORMAL queue is returned.
const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
    switch (type) {
    case FileCacheType::NORMAL:
        return _normal_queue;
    case FileCacheType::INDEX:
        return _index_queue;
    case FileCacheType::TTL:
        return _ttl_queue;
    case FileCacheType::DISPOSABLE:
        return _disposable_queue;
    default:
        DCHECK(false);
        break;
    }
    return _normal_queue;
}
999
1000
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
1001
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
1002
13.3k
                                        std::string& reason) {
1003
13.3k
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1004
1.29k
        FileBlockSPtr file_block = cell->file_block;
1005
1.29k
        if (file_block) {
1006
1.29k
            std::lock_guard block_lock(file_block->_mutex);
1007
1.29k
            remove(file_block, cache_lock, block_lock, sync);
1008
1.29k
            VLOG_DEBUG << "remove_file_blocks"
1009
0
                       << " hash=" << file_block->get_hash_value().to_string()
1010
0
                       << " offset=" << file_block->offset() << " reason=" << reason;
1011
1.29k
        }
1012
1.29k
    };
1013
13.3k
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1014
13.3k
}
1015
1016
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
1017
                                           size_t& removed_size,
1018
                                           std::vector<FileBlockCell*>& to_evict,
1019
                                           std::lock_guard<std::mutex>& cache_lock,
1020
3.22k
                                           size_t& cur_removed_size, bool evict_in_advance) {
1021
933k
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1022
933k
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1023
1.22k
            break;
1024
1.22k
        }
1025
931k
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1026
1027
931k
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
1028
0
                     << ", offset: " << entry_offset;
1029
1030
931k
        size_t cell_size = cell->size();
1031
931k
        DCHECK(entry_size == cell_size);
1032
1033
931k
        if (cell->releasable()) {
1034
1.27k
            auto& file_block = cell->file_block;
1035
1036
1.27k
            std::lock_guard block_lock(file_block->_mutex);
1037
1.27k
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1038
1.27k
            to_evict.push_back(cell);
1039
1.27k
            removed_size += cell_size;
1040
1.27k
            cur_removed_size += cell_size;
1041
1.27k
        }
1042
931k
    }
1043
3.22k
}
1044
1045
// 1. if async load file cache not finish
1046
//     a. evict from lru queue
1047
// 2. if ttl cache
1048
//     a. evict from disposable/normal/index queue one by one
1049
// 3. if dont reach query limit or dont have query limit
1050
//     a. evict from other queue
1051
//     b. evict from current queue
1052
//         a.1 if the data belong write, then just evict cold data
1053
// 4. if reach query limit
1054
//     a. evict from query queue
1055
//     b. evict from other queue
1056
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
1057
                                 size_t offset, size_t size,
1058
10.2k
                                 std::lock_guard<std::mutex>& cache_lock) {
1059
10.2k
    if (!_async_open_done) {
1060
2
        return try_reserve_during_async_load(size, cache_lock);
1061
2
    }
1062
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
1063
    // directly eliminate 5 times the size of the space
1064
10.2k
    if (_disk_resource_limit_mode) {
1065
1
        size = 5 * size;
1066
1
    }
1067
1068
10.2k
    auto query_context = config::enable_file_cache_query_limit &&
1069
10.2k
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
1070
10.2k
                                 ? get_query_context(context.query_id, cache_lock)
1071
10.2k
                                 : nullptr;
1072
10.2k
    if (!query_context) {
1073
10.1k
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
1074
10.1k
    } else if (query_context->get_cache_size(cache_lock) + size <=
1075
31
               query_context->get_max_cache_size()) {
1076
16
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
1077
16
    }
1078
15
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1079
15
                               std::chrono::steady_clock::now().time_since_epoch())
1080
15
                               .count();
1081
15
    auto& queue = get_queue(context.cache_type);
1082
15
    size_t removed_size = 0;
1083
15
    size_t ghost_remove_size = 0;
1084
15
    size_t queue_size = queue.get_capacity(cache_lock);
1085
15
    size_t cur_cache_size = _cur_cache_size;
1086
15
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);
1087
1088
15
    std::vector<LRUQueue::Iterator> ghost;
1089
15
    std::vector<FileBlockCell*> to_evict;
1090
1091
15
    size_t max_size = queue.get_max_size();
1092
48
    auto is_overflow = [&] {
1093
48
        return _disk_resource_limit_mode ? removed_size < size
1094
48
                                         : cur_cache_size + size - removed_size > _capacity ||
1095
48
                                                   (queue_size + size - removed_size > max_size) ||
1096
48
                                                   (query_context_cache_size + size -
1097
38
                                                            (removed_size + ghost_remove_size) >
1098
38
                                                    query_context->get_max_cache_size());
1099
48
    };
1100
1101
    /// Select the cache from the LRU queue held by query for expulsion.
1102
35
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
1103
33
        if (!is_overflow()) {
1104
13
            break;
1105
13
        }
1106
1107
20
        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);
1108
1109
20
        if (!cell) {
1110
            /// The cache corresponding to this record may be swapped out by
1111
            /// other queries, so it has become invalid.
1112
4
            ghost.push_back(iter);
1113
4
            ghost_remove_size += iter->size;
1114
16
        } else {
1115
16
            size_t cell_size = cell->size();
1116
16
            DCHECK(iter->size == cell_size);
1117
1118
16
            if (cell->releasable()) {
1119
16
                auto& file_block = cell->file_block;
1120
1121
16
                std::lock_guard block_lock(file_block->_mutex);
1122
16
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1123
16
                to_evict.push_back(cell);
1124
16
                removed_size += cell_size;
1125
16
            }
1126
16
        }
1127
20
    }
1128
1129
16
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1130
16
        FileBlockSPtr file_block = cell->file_block;
1131
16
        if (file_block) {
1132
16
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
1133
16
            std::lock_guard block_lock(file_block->_mutex);
1134
16
            remove(file_block, cache_lock, block_lock);
1135
16
        }
1136
16
    };
1137
1138
15
    for (auto& iter : ghost) {
1139
4
        query_context->remove(iter->hash, iter->offset, cache_lock);
1140
4
    }
1141
1142
15
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1143
1144
15
    if (is_overflow() &&
1145
15
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
1146
1
        return false;
1147
1
    }
1148
14
    query_context->reserve(hash, offset, size, cache_lock);
1149
14
    return true;
1150
15
}
1151
1152
2
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
1153
2
    UInt128Wrapper hash = UInt128Wrapper();
1154
2
    size_t offset = 0;
1155
2
    CacheContext context;
1156
    /* we pick NORMAL and TTL cache to evict in advance
1157
     * we reserve for them but won't acutually give space to them
1158
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
1159
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
1160
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
1161
     */
1162
2
    context.cache_type = FileCacheType::NORMAL;
1163
2
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1164
2
    context.cache_type = FileCacheType::TTL;
1165
2
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1166
2
}
1167
1168
// remove specific cache synchronously, for critical operations
1169
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1170
8
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1171
8
    std::string reason = "remove_if_cached";
1172
8
    SCOPED_CACHE_LOCK(_mutex, this);
1173
8
    auto iter = _files.find(file_key);
1174
8
    std::vector<FileBlockCell*> to_remove;
1175
8
    if (iter != _files.end()) {
1176
14
        for (auto& [_, cell] : iter->second) {
1177
14
            if (cell.releasable()) {
1178
13
                to_remove.push_back(&cell);
1179
13
            } else {
1180
1
                cell.file_block->set_deleting();
1181
1
            }
1182
14
        }
1183
6
    }
1184
8
    remove_file_blocks(to_remove, cache_lock, true, reason);
1185
8
}
1186
1187
// the async version of remove_if_cached, for background operations
1188
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
1189
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1190
1
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1191
1
    std::string reason = "remove_if_cached_async";
1192
1
    SCOPED_CACHE_LOCK(_mutex, this);
1193
1194
1
    auto iter = _files.find(file_key);
1195
1
    std::vector<FileBlockCell*> to_remove;
1196
1
    if (iter != _files.end()) {
1197
1
        for (auto& [_, cell] : iter->second) {
1198
1
            *_gc_evict_bytes_metrics << cell.size();
1199
1
            *_gc_evict_count_metrics << 1;
1200
1
            if (cell.releasable()) {
1201
0
                to_remove.push_back(&cell);
1202
1
            } else {
1203
1
                cell.file_block->set_deleting();
1204
1
            }
1205
1
        }
1206
1
    }
1207
1
    remove_file_blocks(to_remove, cache_lock, false, reason);
1208
1
}
1209
1210
// Returns the cache types other than `cur_cache_type`, never including TTL,
// in the order in which they are listed below. An unknown type yields an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
        FileCacheType cur_cache_type) {
    std::vector<FileCacheType> candidates;
    switch (cur_cache_type) {
    case FileCacheType::TTL:
        candidates = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
        break;
    case FileCacheType::INDEX:
        candidates = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
        break;
    case FileCacheType::NORMAL:
        candidates = {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
        break;
    case FileCacheType::DISPOSABLE:
        candidates = {FileCacheType::NORMAL, FileCacheType::INDEX};
        break;
    default:
        break;
    }
    return candidates;
}
1226
1227
3.13k
// Returns every cache type other than `cur_cache_type` (TTL included),
// in the order in which they are listed below. An unknown type yields an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
    std::vector<FileCacheType> candidates;
    switch (cur_cache_type) {
    case FileCacheType::TTL:
        candidates = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
        break;
    case FileCacheType::INDEX:
        candidates = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL};
        break;
    case FileCacheType::NORMAL:
        candidates = {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL};
        break;
    case FileCacheType::DISPOSABLE:
        candidates = {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL};
        break;
    default:
        break;
    }
    return candidates;
}
1242
1243
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
1244
4
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
1245
4
    DCHECK(_files.find(hash) != _files.end() &&
1246
4
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
1247
4
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
1248
4
    DCHECK(cell != nullptr);
1249
4
    if (cell == nullptr) {
1250
0
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
1251
0
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
1252
0
                     << " new_size=" << new_size;
1253
0
        return;
1254
0
    }
1255
4
    if (cell->queue_iterator) {
1256
4
        auto& queue = get_queue(cell->file_block->cache_type());
1257
4
        DCHECK(queue.contains(hash, offset, cache_lock));
1258
4
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
1259
4
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
1260
4
                                          cell->file_block->get_hash_value(),
1261
4
                                          cell->file_block->offset(), new_size);
1262
4
    }
1263
4
    _cur_cache_size -= old_size;
1264
4
    _cur_cache_size += new_size;
1265
4
}
1266
1267
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
1268
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1269
10.1k
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1270
10.1k
    size_t removed_size = 0;
1271
10.1k
    size_t cur_cache_size = _cur_cache_size;
1272
10.1k
    std::vector<FileBlockCell*> to_evict;
1273
24.3k
    for (FileCacheType cache_type : other_cache_types) {
1274
24.3k
        auto& queue = get_queue(cache_type);
1275
24.3k
        size_t remove_size_per_type = 0;
1276
24.3k
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1277
1.43k
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1278
697
                break;
1279
697
            }
1280
739
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1281
739
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
1282
0
                         << ", offset: " << entry_offset;
1283
1284
739
            size_t cell_size = cell->size();
1285
739
            DCHECK(entry_size == cell_size);
1286
1287
739
            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
1288
737
                break;
1289
737
            }
1290
1291
2
            if (cell->releasable()) {
1292
2
                auto& file_block = cell->file_block;
1293
2
                std::lock_guard block_lock(file_block->_mutex);
1294
2
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1295
2
                to_evict.push_back(cell);
1296
2
                removed_size += cell_size;
1297
2
                remove_size_per_type += cell_size;
1298
2
            }
1299
2
        }
1300
24.3k
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
1301
24.3k
    }
1302
10.1k
    bool is_sync_removal = !evict_in_advance;
1303
10.1k
    std::string reason = std::string("try_reserve_by_time ") +
1304
10.1k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1305
10.1k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1306
1307
10.1k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1308
10.1k
}
1309
1310
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
1311
947k
                                 bool evict_in_advance) const {
1312
947k
    bool ret = false;
1313
947k
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
1314
23
        ret = (removed_size < need_size);
1315
23
        return ret;
1316
23
    }
1317
947k
    if (_disk_resource_limit_mode) {
1318
4
        ret = (removed_size < need_size);
1319
947k
    } else {
1320
947k
        ret = (cur_cache_size + need_size - removed_size > _capacity);
1321
947k
    }
1322
947k
    return ret;
1323
947k
}
1324
1325
bool BlockFileCache::try_reserve_from_other_queue_by_size(
1326
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1327
415
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1328
415
    size_t removed_size = 0;
1329
415
    size_t cur_cache_size = _cur_cache_size;
1330
415
    std::vector<FileBlockCell*> to_evict;
1331
    // we follow the privilege defined in get_other_cache_types to evict
1332
1.24k
    for (FileCacheType cache_type : other_cache_types) {
1333
1.24k
        auto& queue = get_queue(cache_type);
1334
1335
        // we will not drain each of them to the bottom -- i.e., we only
1336
        // evict what they have stolen.
1337
1.24k
        size_t cur_queue_size = queue.get_capacity(cache_lock);
1338
1.24k
        size_t cur_queue_max_size = queue.get_max_size();
1339
1.24k
        if (cur_queue_size <= cur_queue_max_size) {
1340
740
            continue;
1341
740
        }
1342
505
        size_t cur_removed_size = 0;
1343
505
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1344
505
                              cur_removed_size, evict_in_advance);
1345
505
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
1346
505
    }
1347
415
    bool is_sync_removal = !evict_in_advance;
1348
415
    std::string reason = std::string("try_reserve_by_size") +
1349
415
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1350
415
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1351
415
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1352
415
}
1353
1354
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
1355
                                                  int64_t cur_time,
1356
                                                  std::lock_guard<std::mutex>& cache_lock,
1357
10.1k
                                                  bool evict_in_advance) {
1358
    // currently, TTL cache is not considered as a candidate
1359
10.1k
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
1360
10.1k
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
1361
10.1k
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
1362
10.1k
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
1363
7.06k
        return reserve_success;
1364
7.06k
    }
1365
1366
3.13k
    other_cache_types = get_other_cache_type(cur_cache_type);
1367
3.13k
    auto& cur_queue = get_queue(cur_cache_type);
1368
3.13k
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
1369
3.13k
    size_t cur_queue_max_size = cur_queue.get_max_size();
1370
    // Hit the soft limit by self, cannot remove from other queues
1371
3.13k
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
1372
2.71k
        return false;
1373
2.71k
    }
1374
415
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
1375
415
                                                evict_in_advance);
1376
3.13k
}
1377
1378
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
1379
                                         QueryFileCacheContextPtr query_context,
1380
                                         const CacheContext& context, size_t offset, size_t size,
1381
                                         std::lock_guard<std::mutex>& cache_lock,
1382
10.1k
                                         bool evict_in_advance) {
1383
10.1k
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1384
10.1k
                               std::chrono::steady_clock::now().time_since_epoch())
1385
10.1k
                               .count();
1386
10.1k
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
1387
10.1k
                                      evict_in_advance)) {
1388
2.71k
        auto& queue = get_queue(context.cache_type);
1389
2.71k
        size_t removed_size = 0;
1390
2.71k
        size_t cur_cache_size = _cur_cache_size;
1391
1392
2.71k
        std::vector<FileBlockCell*> to_evict;
1393
2.71k
        size_t cur_removed_size = 0;
1394
2.71k
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1395
2.71k
                              cur_removed_size, evict_in_advance);
1396
2.71k
        bool is_sync_removal = !evict_in_advance;
1397
2.71k
        std::string reason = std::string("try_reserve for cache type ") +
1398
2.71k
                             cache_type_to_string(context.cache_type) +
1399
2.71k
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1400
2.71k
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1401
2.71k
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;
1402
1403
2.71k
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1404
1.99k
            return false;
1405
1.99k
        }
1406
2.71k
    }
1407
1408
8.19k
    if (query_context) {
1409
16
        query_context->reserve(hash, offset, size, cache_lock);
1410
16
    }
1411
8.19k
    return true;
1412
10.1k
}
1413
1414
template <class T, class U>
1415
    requires IsXLock<T> && IsXLock<U>
1416
3.34k
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
1417
3.34k
    auto hash = file_block->get_hash_value();
1418
3.34k
    auto offset = file_block->offset();
1419
3.34k
    auto type = file_block->cache_type();
1420
3.34k
    auto expiration_time = file_block->expiration_time();
1421
3.34k
    auto tablet_id = file_block->tablet_id();
1422
3.34k
    auto* cell = get_cell(hash, offset, cache_lock);
1423
3.34k
    file_block->cell = nullptr;
1424
3.34k
    DCHECK(cell);
1425
3.34k
    DCHECK(cell->queue_iterator);
1426
3.34k
    if (cell->queue_iterator) {
1427
3.34k
        auto& queue = get_queue(file_block->cache_type());
1428
3.34k
        queue.remove(*cell->queue_iterator, cache_lock);
1429
3.34k
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
1430
3.34k
                                          cell->file_block->get_hash_value(),
1431
3.34k
                                          cell->file_block->offset(), cell->size());
1432
3.34k
    }
1433
3.34k
    *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
1434
3.34k
            << file_block->range().size();
1435
3.34k
    *_total_evict_size_metrics << file_block->range().size();
1436
1437
3.34k
    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
1438
0
               << ", offset: " << offset << ", size: " << file_block->range().size()
1439
0
               << ", type: " << cache_type_to_string(type);
1440
1441
3.34k
    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
1442
1.36k
        FileCacheKey key;
1443
1.36k
        key.hash = hash;
1444
1.36k
        key.offset = offset;
1445
1.36k
        key.meta.type = type;
1446
1.36k
        key.meta.expiration_time = expiration_time;
1447
1.36k
        key.meta.tablet_id = tablet_id;
1448
1.36k
        if (sync) {
1449
1.31k
            int64_t duration_ns = 0;
1450
1.31k
            Status st;
1451
1.31k
            {
1452
1.31k
                SCOPED_RAW_TIMER(&duration_ns);
1453
1.31k
                st = _storage->remove(key);
1454
1.31k
            }
1455
1.31k
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
1456
1.31k
            if (!st.ok()) {
1457
0
                LOG_WARNING("").error(st);
1458
0
            }
1459
1.31k
        } else {
1460
            // the file will be deleted in the bottom half
1461
            // so there will be a window that the file is not in the cache but still in the storage
1462
            // but it's ok, because the rowset is stale already
1463
46
            bool ret = _recycle_keys.enqueue(key);
1464
46
            if (ret) [[likely]] {
1465
46
                *_recycle_keys_length_recorder << _recycle_keys.size_approx();
1466
46
            } else {
1467
0
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
1468
0
                int64_t duration_ns = 0;
1469
0
                Status st;
1470
0
                {
1471
0
                    SCOPED_RAW_TIMER(&duration_ns);
1472
0
                    st = _storage->remove(key);
1473
0
                }
1474
0
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
1475
0
                if (!st.ok()) {
1476
0
                    LOG_WARNING("").error(st);
1477
0
                }
1478
0
            }
1479
46
        }
1480
1.98k
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
1481
0
        file_block->set_deleting();
1482
0
        return;
1483
0
    }
1484
3.34k
    _cur_cache_size -= file_block->range().size();
1485
3.34k
    if (FileCacheType::TTL == type) {
1486
1.29k
        _cur_ttl_size -= file_block->range().size();
1487
1.29k
    }
1488
3.34k
    auto it = _files.find(hash);
1489
3.34k
    if (it != _files.end()) {
1490
3.34k
        it->second.erase(file_block->offset());
1491
3.34k
        if (it->second.empty()) {
1492
1.27k
            _files.erase(hash);
1493
1.27k
        }
1494
3.34k
    }
1495
3.34k
    *_num_removed_blocks << 1;
1496
3.34k
}
1497
1498
46
// Locking wrapper around get_used_cache_size_unlocked(): takes the cache lock
// and returns the size currently used by the queue of `cache_type`.
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t used = get_used_cache_size_unlocked(cache_type, cache_lock);
    return used;
}
1502
1503
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
1504
46
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1505
46
    return get_queue(cache_type).get_capacity(cache_lock);
1506
46
}
1507
1508
0
// Locking wrapper around get_available_cache_size_unlocked(): takes the cache
// lock and returns the remaining room of the queue of `cache_type`.
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t available = get_available_cache_size_unlocked(cache_type, cache_lock);
    return available;
}
1512
1513
size_t BlockFileCache::get_available_cache_size_unlocked(
1514
0
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
1515
0
    return get_queue(cache_type).get_max_element_size() -
1516
0
           get_used_cache_size_unlocked(cache_type, cache_lock);
1517
0
}
1518
1519
88
// Locking wrapper around get_file_blocks_num_unlocked(): takes the cache lock
// and returns the number of elements in the queue of `cache_type`.
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t block_count = get_file_blocks_num_unlocked(cache_type, cache_lock);
    return block_count;
}
1523
1524
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
1525
88
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1526
88
    return get_queue(cache_type).get_elements_num(cache_lock);
1527
88
}
1528
1529
/**
 * Builds the cell that owns `file_block` and links the block back to this cell.
 *
 * A cell can be created only for a block in the DOWNLOADED, EMPTY or SKIP_CACHE
 * state; any other state trips a DCHECK. The file block acquires the DOWNLOADING
 * state and creates its LRUQueue iterator on the first successful
 * getOrSetDownloader call.
 *
 * For TTL blocks the access time is refreshed immediately.
 */
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
        : file_block(file_block) {
    file_block->cell = this;

    const FileBlock::State initial_state = file_block->_download_state;
    const bool state_allowed = initial_state == FileBlock::State::DOWNLOADED ||
                               initial_state == FileBlock::State::EMPTY ||
                               initial_state == FileBlock::State::SKIP_CACHE;
    if (!state_allowed) {
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
                      << FileBlock::state_to_string(file_block->_download_state);
    }

    if (file_block->cache_type() == FileCacheType::TTL) {
        update_atime();
    }
}
1552
1553
// Appends a new entry for (hash, offset) with the given size to the tail of the
// queue, indexes it in the lookup map and adds `size` to the queue's total size.
// Returns the iterator to the new entry.
LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
    cache_size += size;
    auto inserted = queue.emplace(queue.end(), FileKeyAndOffset(hash, offset, size));
    map.insert(std::make_pair(std::make_pair(hash, offset), inserted));
    return inserted;
}
1560
1561
0
void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
1562
0
    queue.clear();
1563
0
    map.clear();
1564
0
    cache_size = 0;
1565
0
}
1566
1567
1.74k
// Moves the entry referenced by `queue_it` to the tail of the queue.
// The list node is relinked in place, so `queue_it` keeps referring to the entry.
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
    const auto tail = queue.end();
    queue.splice(tail, queue, queue_it);
}
1570
1571
void LRUQueue::resize(Iterator queue_it, size_t new_size,
1572
5
                      std::lock_guard<std::mutex>& /* cache_lock */) {
1573
5
    cache_size -= queue_it->size;
1574
5
    queue_it->size = new_size;
1575
5
    cache_size += new_size;
1576
5
}
1577
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
1578
4
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
1579
4
    return map.find(std::make_pair(hash, offset)) != map.end();
1580
4
}
1581
1582
// Looks up the queue entry for (hash, offset).
// Returns its iterator, or a default-constructed list iterator when there is no
// such entry (that iterator does not refer to any element).
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
    const auto found = map.find(std::make_pair(hash, offset));
    if (found == map.end()) {
        return std::list<FileKeyAndOffset>::iterator();
    }
    return found->second;
}
1590
1591
2
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
1592
2
    std::string result;
1593
18
    for (const auto& [hash, offset, size] : queue) {
1594
18
        if (!result.empty()) {
1595
16
            result += ", ";
1596
16
        }
1597
18
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
1598
18
    }
1599
2
    return result;
1600
2
}
1601
1602
size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
1603
5
                                           std::lock_guard<std::mutex>& cache_lock) {
1604
5
    std::list<FileKeyAndOffset> target_queue = this->queue;
1605
5
    std::list<FileKeyAndOffset> base_queue = base.queue;
1606
5
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
1607
5
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
1608
1609
5
    size_t m = vec1.size();
1610
5
    size_t n = vec2.size();
1611
1612
    // Create a 2D vector (matrix) to store the Levenshtein distances
1613
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
1614
5
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
1615
1616
    // Initialize the first row and column of the matrix
1617
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
1618
22
    for (size_t i = 0; i <= m; ++i) {
1619
17
        dp[i][0] = i;
1620
17
    }
1621
20
    for (size_t j = 0; j <= n; ++j) {
1622
15
        dp[0][j] = j;
1623
15
    }
1624
1625
    // Fill the matrix using dynamic programming
1626
17
    for (size_t i = 1; i <= m; ++i) {
1627
38
        for (size_t j = 1; j <= n; ++j) {
1628
            // Check if the current elements of both vectors are equal
1629
26
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
1630
26
                           vec1[i - 1].offset == vec2[j - 1].offset)
1631
26
                                  ? 0
1632
26
                                  : 1;
1633
            // Calculate the minimum cost of three possible operations:
1634
            // 1. Insertion: dp[i][j-1] + 1
1635
            // 2. Deletion: dp[i-1][j] + 1
1636
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
1637
26
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
1638
26
        }
1639
12
    }
1640
    // The bottom-right cell of the matrix contains the Levenshtein distance
1641
5
    return dp[m][n];
1642
5
}
1643
1644
1
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
1645
1
    SCOPED_CACHE_LOCK(_mutex, this);
1646
1
    return dump_structure_unlocked(hash, cache_lock);
1647
1
}
1648
1649
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
1650
1
                                                    std::lock_guard<std::mutex>&) {
1651
1
    std::stringstream result;
1652
1
    auto it = _files.find(hash);
1653
1
    if (it == _files.end()) {
1654
0
        return std::string("");
1655
0
    }
1656
1
    const auto& cells_by_offset = it->second;
1657
1658
1
    for (const auto& [_, cell] : cells_by_offset) {
1659
1
        result << cell.file_block->get_info_for_log() << " "
1660
1
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
1661
1
    }
1662
1663
1
    return result.str();
1664
1
}
1665
1666
0
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
1667
0
    SCOPED_CACHE_LOCK(_mutex, this);
1668
0
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
1669
0
}
1670
1671
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
1672
                                                            size_t offset,
1673
0
                                                            std::lock_guard<std::mutex>&) {
1674
0
    std::stringstream result;
1675
0
    auto it = _files.find(hash);
1676
0
    if (it == _files.end()) {
1677
0
        return std::string("");
1678
0
    }
1679
0
    const auto& cells_by_offset = it->second;
1680
0
    const auto& cell = cells_by_offset.find(offset);
1681
1682
0
    return cache_type_to_string(cell->second.file_block->cache_type());
1683
0
}
1684
1685
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
1686
                                       FileCacheType new_type,
1687
10
                                       std::lock_guard<std::mutex>& cache_lock) {
1688
10
    if (auto iter = _files.find(hash); iter != _files.end()) {
1689
10
        auto& file_blocks = iter->second;
1690
10
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
1691
9
            FileBlockCell& cell = cell_it->second;
1692
9
            auto& cur_queue = get_queue(cell.file_block->cache_type());
1693
9
            DCHECK(cell.queue_iterator.has_value());
1694
9
            cur_queue.remove(*cell.queue_iterator, cache_lock);
1695
9
            _lru_recorder->record_queue_event(
1696
9
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
1697
9
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
1698
9
            auto& new_queue = get_queue(new_type);
1699
9
            cell.queue_iterator =
1700
9
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
1701
9
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
1702
9
                                              cell.file_block->get_hash_value(),
1703
9
                                              cell.file_block->offset(), cell.size());
1704
9
        }
1705
10
    }
1706
10
}
1707
1708
// @brief: get a path's disk capacity used percent, inode used percent
1709
// @param: path
1710
// @param: percent.first disk used percent, percent.second inode used percent
1711
153
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
1712
153
    struct statfs stat;
1713
153
    int ret = statfs(path.c_str(), &stat);
1714
153
    if (ret != 0) {
1715
2
        return ret;
1716
2
    }
1717
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
1718
    // v->used = stat.f_blocks - stat.f_bfree
1719
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
1720
151
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
1721
151
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
1722
151
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));
1723
1724
151
    unsigned long long inode_free = stat.f_ffree;
1725
151
    unsigned long long inode_total = stat.f_files;
1726
151
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
1727
151
    percent->first = capacity_percentage;
1728
151
    percent->second = 100 - inode_percentage;
1729
1730
    // Add sync point for testing
1731
151
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
1732
1733
151
    return 0;
1734
153
}
1735
1736
10
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
1737
10
    using namespace std::chrono;
1738
10
    int64_t space_released = 0;
1739
10
    size_t old_capacity = 0;
1740
10
    std::stringstream ss;
1741
10
    ss << "finish reset_capacity, path=" << _cache_base_path;
1742
10
    auto adjust_start_time = steady_clock::time_point();
1743
10
    {
1744
10
        SCOPED_CACHE_LOCK(_mutex, this);
1745
10
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
1746
1
            int64_t need_remove_size = _cur_cache_size - new_capacity;
1747
4
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
1748
4
                int64_t queue_released = 0;
1749
4
                std::vector<FileBlockCell*> to_evict;
1750
13
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1751
13
                    if (need_remove_size <= 0) {
1752
1
                        break;
1753
1
                    }
1754
12
                    need_remove_size -= entry_size;
1755
12
                    space_released += entry_size;
1756
12
                    queue_released += entry_size;
1757
12
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1758
12
                    if (!cell->releasable()) {
1759
0
                        cell->file_block->set_deleting();
1760
0
                        continue;
1761
0
                    }
1762
12
                    to_evict.push_back(cell);
1763
12
                }
1764
12
                for (auto& cell : to_evict) {
1765
12
                    FileBlockSPtr file_block = cell->file_block;
1766
12
                    std::lock_guard block_lock(file_block->_mutex);
1767
12
                    remove(file_block, cache_lock, block_lock);
1768
12
                }
1769
4
                return queue_released;
1770
4
            };
1771
1
            int64_t queue_released = remove_blocks(_disposable_queue);
1772
1
            ss << " disposable_queue released " << queue_released;
1773
1
            queue_released = remove_blocks(_normal_queue);
1774
1
            ss << " normal_queue released " << queue_released;
1775
1
            queue_released = remove_blocks(_index_queue);
1776
1
            ss << " index_queue released " << queue_released;
1777
1
            queue_released = remove_blocks(_ttl_queue);
1778
1
            ss << " ttl_queue released " << queue_released;
1779
1780
1
            _disk_resource_limit_mode = true;
1781
1
            _disk_limit_mode_metrics->set_value(1);
1782
1
            ss << " total_space_released=" << space_released;
1783
1
        }
1784
10
        old_capacity = _capacity;
1785
10
        _capacity = new_capacity;
1786
10
        _cache_capacity_metrics->set_value(_capacity);
1787
10
    }
1788
10
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
1789
10
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
1790
10
              << " use_time=" << cast_set<int64_t>(use_time.count());
1791
10
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
1792
10
    LOG(INFO) << ss.str();
1793
10
    return ss.str();
1794
10
}
1795
1796
935
void BlockFileCache::check_disk_resource_limit() {
1797
935
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1798
796
        return;
1799
796
    }
1800
1801
139
    bool previous_mode = _disk_resource_limit_mode;
1802
139
    if (_capacity > _cur_cache_size) {
1803
137
        _disk_resource_limit_mode = false;
1804
137
        _disk_limit_mode_metrics->set_value(0);
1805
137
    }
1806
139
    std::pair<int, int> percent;
1807
139
    int ret = disk_used_percentage(_cache_base_path, &percent);
1808
139
    if (ret != 0) {
1809
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1810
1
        return;
1811
1
    }
1812
138
    auto [space_percentage, inode_percentage] = percent;
1813
276
    auto is_insufficient = [](const int& percentage) {
1814
276
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
1815
276
    };
1816
138
    DCHECK_GE(space_percentage, 0);
1817
138
    DCHECK_LE(space_percentage, 100);
1818
138
    DCHECK_GE(inode_percentage, 0);
1819
138
    DCHECK_LE(inode_percentage, 100);
1820
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1821
    // FIXME: reject with config validator
1822
138
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
1823
138
        config::file_cache_exit_disk_resource_limit_mode_percent) {
1824
1
        LOG_WARNING("config error, set to default value")
1825
1
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
1826
1
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
1827
1
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
1828
1
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
1829
1
    }
1830
138
    bool is_space_insufficient = is_insufficient(space_percentage);
1831
138
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1832
138
    if (is_space_insufficient || is_inode_insufficient) {
1833
1
        _disk_resource_limit_mode = true;
1834
1
        _disk_limit_mode_metrics->set_value(1);
1835
137
    } else if (_disk_resource_limit_mode &&
1836
137
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
1837
137
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
1838
0
        _disk_resource_limit_mode = false;
1839
0
        _disk_limit_mode_metrics->set_value(0);
1840
0
    }
1841
138
    if (previous_mode != _disk_resource_limit_mode) {
1842
        // add log for disk resource limit mode switching
1843
2
        if (_disk_resource_limit_mode) {
1844
1
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
1845
1
                         << " space_percent=" << space_percentage
1846
1
                         << " inode_percent=" << inode_percentage
1847
1
                         << " is_space_insufficient=" << is_space_insufficient
1848
1
                         << " is_inode_insufficient=" << is_inode_insufficient
1849
1
                         << " enter threshold="
1850
1
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
1851
1
        } else {
1852
1
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
1853
1
                      << " space_percent=" << space_percentage
1854
1
                      << " inode_percent=" << inode_percentage << " exit threshold="
1855
1
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
1856
1
        }
1857
136
    } else if (_disk_resource_limit_mode) {
1858
        // print log for disk resource limit mode running, but less frequently
1859
0
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1860
0
                                 << " space_percent=" << space_percentage
1861
0
                                 << " inode_percent=" << inode_percentage
1862
0
                                 << " is_space_insufficient=" << is_space_insufficient
1863
0
                                 << " is_inode_insufficient=" << is_inode_insufficient
1864
0
                                 << " mode run in resource limit";
1865
0
    }
1866
138
}
1867
1868
15
void BlockFileCache::check_need_evict_cache_in_advance() {
1869
15
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1870
1
        return;
1871
1
    }
1872
1873
14
    std::pair<int, int> percent;
1874
14
    int ret = disk_used_percentage(_cache_base_path, &percent);
1875
14
    if (ret != 0) {
1876
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1877
1
        return;
1878
1
    }
1879
13
    auto [space_percentage, inode_percentage] = percent;
1880
13
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
1881
39
    auto is_insufficient = [](const int& percentage) {
1882
39
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
1883
39
    };
1884
13
    DCHECK_GE(space_percentage, 0);
1885
13
    DCHECK_LE(space_percentage, 100);
1886
13
    DCHECK_GE(inode_percentage, 0);
1887
13
    DCHECK_LE(inode_percentage, 100);
1888
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1889
    // FIXME: reject with config validator
1890
13
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
1891
13
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
1892
1
        LOG_WARNING("config error, set to default value")
1893
1
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
1894
1
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
1895
1
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
1896
1
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
1897
1
    }
1898
13
    bool previous_mode = _need_evict_cache_in_advance;
1899
13
    bool is_space_insufficient = is_insufficient(space_percentage);
1900
13
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1901
13
    bool is_size_insufficient = is_insufficient(size_percentage);
1902
13
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
1903
9
        _need_evict_cache_in_advance = true;
1904
9
        _need_evict_cache_in_advance_metrics->set_value(1);
1905
9
    } else if (_need_evict_cache_in_advance &&
1906
4
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1907
4
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1908
4
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
1909
1
        _need_evict_cache_in_advance = false;
1910
1
        _need_evict_cache_in_advance_metrics->set_value(0);
1911
1
    }
1912
13
    if (previous_mode != _need_evict_cache_in_advance) {
1913
        // add log for evict cache in advance mode switching
1914
6
        if (_need_evict_cache_in_advance) {
1915
5
            LOG(WARNING) << "Entering evict cache in advance mode: "
1916
5
                         << "file_cache=" << get_base_path()
1917
5
                         << " space_percent=" << space_percentage
1918
5
                         << " inode_percent=" << inode_percentage
1919
5
                         << " size_percent=" << size_percentage
1920
5
                         << " is_space_insufficient=" << is_space_insufficient
1921
5
                         << " is_inode_insufficient=" << is_inode_insufficient
1922
5
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
1923
5
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
1924
5
        } else {
1925
1
            LOG(INFO) << "Exiting evict cache in advance mode: "
1926
1
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
1927
1
                      << " inode_percent=" << inode_percentage
1928
1
                      << " size_percent=" << size_percentage << " exit threshold="
1929
1
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
1930
1
        }
1931
7
    } else if (_need_evict_cache_in_advance) {
1932
        // print log for evict cache in advance mode running, but less frequently
1933
4
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1934
1
                                 << " space_percent=" << space_percentage
1935
1
                                 << " inode_percent=" << inode_percentage
1936
1
                                 << " size_percent=" << size_percentage
1937
1
                                 << " is_space_insufficient=" << is_space_insufficient
1938
1
                                 << " is_inode_insufficient=" << is_inode_insufficient
1939
1
                                 << " is_size_insufficient=" << is_size_insufficient
1940
1
                                 << " need evict cache in advance";
1941
4
    }
1942
13
}
1943
1944
140
void BlockFileCache::run_background_monitor() {
1945
140
    Thread::set_self_name("run_background_monitor");
1946
935
    while (!_close) {
1947
935
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
1948
935
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
1949
935
        check_disk_resource_limit();
1950
935
        if (config::enable_evict_file_cache_in_advance) {
1951
7
            check_need_evict_cache_in_advance();
1952
928
        } else {
1953
928
            _need_evict_cache_in_advance = false;
1954
928
            _need_evict_cache_in_advance_metrics->set_value(0);
1955
928
        }
1956
1957
935
        {
1958
935
            std::unique_lock close_lock(_close_mtx);
1959
935
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
1960
935
            if (_close) {
1961
140
                break;
1962
140
            }
1963
935
        }
1964
        // report
1965
795
        {
1966
795
            SCOPED_CACHE_LOCK(_mutex, this);
1967
795
            _cur_cache_size_metrics->set_value(_cur_cache_size);
1968
795
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
1969
795
                                                   _index_queue.get_capacity(cache_lock) -
1970
795
                                                   _normal_queue.get_capacity(cache_lock) -
1971
795
                                                   _disposable_queue.get_capacity(cache_lock));
1972
795
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
1973
795
                    _ttl_queue.get_capacity(cache_lock));
1974
795
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
1975
795
                    _ttl_queue.get_elements_num(cache_lock));
1976
795
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
1977
795
            _cur_normal_queue_element_count_metrics->set_value(
1978
795
                    _normal_queue.get_elements_num(cache_lock));
1979
795
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
1980
795
            _cur_index_queue_element_count_metrics->set_value(
1981
795
                    _index_queue.get_elements_num(cache_lock));
1982
795
            _cur_disposable_queue_cache_size_metrics->set_value(
1983
795
                    _disposable_queue.get_capacity(cache_lock));
1984
795
            _cur_disposable_queue_element_count_metrics->set_value(
1985
795
                    _disposable_queue.get_elements_num(cache_lock));
1986
1987
            // Update meta store write queue size if storage is FSFileCacheStorage
1988
795
            if (_storage->get_type() == FileCacheStorageType::DISK) {
1989
15
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
1990
15
                if (fs_storage != nullptr) {
1991
15
                    auto* meta_store = fs_storage->get_meta_store();
1992
15
                    if (meta_store != nullptr) {
1993
15
                        _meta_store_write_queue_size_metrics->set_value(
1994
15
                                meta_store->get_write_queue_size());
1995
15
                    }
1996
15
                }
1997
15
            }
1998
1999
795
            if (_num_read_blocks->get_value() > 0) {
2000
12
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
2001
12
                                      (double)_num_read_blocks->get_value());
2002
12
            }
2003
795
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
2004
0
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
2005
0
                                         (double)_num_read_blocks_5m->get_value());
2006
0
            }
2007
795
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
2008
0
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
2009
0
                                         (double)_num_read_blocks_1h->get_value());
2010
0
            }
2011
2012
795
            if (_no_warmup_num_hit_blocks->get_value() > 0) {
2013
2
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
2014
2
                                                (double)_no_warmup_num_read_blocks->get_value());
2015
2
            }
2016
795
            if (_no_warmup_num_hit_blocks_5m && _no_warmup_num_hit_blocks_5m->get_value() > 0) {
2017
0
                _no_warmup_hit_ratio_5m->set_value(
2018
0
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
2019
0
                        (double)_no_warmup_num_read_blocks_5m->get_value());
2020
0
            }
2021
795
            if (_no_warmup_num_hit_blocks_1h && _no_warmup_num_hit_blocks_1h->get_value() > 0) {
2022
0
                _no_warmup_hit_ratio_1h->set_value(
2023
0
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
2024
0
                        (double)_no_warmup_num_read_blocks_1h->get_value());
2025
0
            }
2026
795
        }
2027
795
    }
2028
140
}
2029
2030
140
void BlockFileCache::run_background_gc() {
2031
140
    Thread::set_self_name("run_background_gc");
2032
140
    FileCacheKey key;
2033
140
    size_t batch_count = 0;
2034
328
    while (!_close) {
2035
328
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
2036
328
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
2037
328
        {
2038
328
            std::unique_lock close_lock(_close_mtx);
2039
328
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2040
328
            if (_close) {
2041
140
                break;
2042
140
            }
2043
328
        }
2044
2045
197
        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
2046
9
            int64_t duration_ns = 0;
2047
9
            Status st;
2048
9
            {
2049
9
                SCOPED_RAW_TIMER(&duration_ns);
2050
9
                st = _storage->remove(key);
2051
9
            }
2052
9
            *_storage_async_remove_latency_us << (duration_ns / 1000);
2053
2054
9
            if (!st.ok()) {
2055
0
                LOG_WARNING("").error(st);
2056
0
            }
2057
9
            batch_count++;
2058
9
        }
2059
188
        *_recycle_keys_length_recorder << _recycle_keys.size_approx();
2060
188
        batch_count = 0;
2061
188
    }
2062
140
}
2063
2064
140
void BlockFileCache::run_background_evict_in_advance() {
2065
140
    Thread::set_self_name("run_background_evict_in_advance");
2066
140
    LOG(INFO) << "Starting background evict in advance thread";
2067
140
    int64_t batch = 0;
2068
4.14k
    while (!_close) {
2069
4.14k
        {
2070
4.14k
            std::unique_lock close_lock(_close_mtx);
2071
4.14k
            _close_cv.wait_for(
2072
4.14k
                    close_lock,
2073
4.14k
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
2074
4.14k
            if (_close) {
2075
140
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
2076
140
                break;
2077
140
            }
2078
4.14k
        }
2079
4.00k
        batch = config::file_cache_evict_in_advance_batch_bytes;
2080
2081
        // Skip if eviction not needed or too many pending recycles
2082
4.00k
        if (!_need_evict_cache_in_advance ||
2083
4.00k
            _recycle_keys.size_approx() >=
2084
4.00k
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
2085
4.00k
            continue;
2086
4.00k
        }
2087
2088
2
        int64_t duration_ns = 0;
2089
2
        {
2090
2
            SCOPED_CACHE_LOCK(_mutex, this);
2091
2
            SCOPED_RAW_TIMER(&duration_ns);
2092
2
            try_evict_in_advance(batch, cache_lock);
2093
2
        }
2094
2
        *_evict_in_advance_latency_us << (duration_ns / 1000);
2095
2
    }
2096
140
}
2097
2098
140
void BlockFileCache::run_background_block_lru_update() {
2099
140
    Thread::set_self_name("run_background_block_lru_update");
2100
140
    std::vector<FileBlockSPtr> batch;
2101
4.07k
    while (!_close) {
2102
4.07k
        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
2103
4.07k
        size_t batch_limit =
2104
4.07k
                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
2105
4.07k
        {
2106
4.07k
            std::unique_lock close_lock(_close_mtx);
2107
4.07k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2108
4.07k
            if (_close) {
2109
140
                break;
2110
140
            }
2111
4.07k
        }
2112
2113
3.93k
        batch.clear();
2114
3.93k
        batch.reserve(batch_limit);
2115
3.93k
        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
2116
3.93k
        if (drained == 0) {
2117
3.93k
            *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2118
3.93k
            continue;
2119
3.93k
        }
2120
2121
6
        int64_t duration_ns = 0;
2122
6
        {
2123
6
            SCOPED_CACHE_LOCK(_mutex, this);
2124
6
            SCOPED_RAW_TIMER(&duration_ns);
2125
481
            for (auto& block : batch) {
2126
481
                update_block_lru(block, cache_lock);
2127
481
            }
2128
6
        }
2129
6
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
2130
6
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2131
6
    }
2132
140
}
2133
2134
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
2135
2
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
2136
2
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
2137
2
                               std::chrono::steady_clock::now().time_since_epoch())
2138
2
                               .count();
2139
2
    SCOPED_CACHE_LOCK(_mutex, this);
2140
2
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
2141
2
    if (auto iter = _files.find(hash); iter != _files.end()) {
2142
5
        for (auto& pair : _files.find(hash)->second) {
2143
5
            const FileBlockCell* cell = &pair.second;
2144
5
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
2145
4
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
2146
4
                    (cell->atime != 0 &&
2147
3
                     cur_time - cell->atime <
2148
3
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
2149
3
                    blocks_meta.emplace_back(pair.first, cell->size(),
2150
3
                                             cell->file_block->cache_type(),
2151
3
                                             cell->file_block->expiration_time());
2152
3
                }
2153
4
            }
2154
5
        }
2155
2
    }
2156
2
    return blocks_meta;
2157
2
}
2158
2159
bool BlockFileCache::try_reserve_during_async_load(size_t size,
2160
2
                                                   std::lock_guard<std::mutex>& cache_lock) {
2161
2
    size_t removed_size = 0;
2162
2
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
2163
2
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
2164
2
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);
2165
2166
2
    std::vector<FileBlockCell*> to_evict;
2167
2
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
2168
3
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
2169
3
            if (!_disk_resource_limit_mode || removed_size >= size) {
2170
2
                break;
2171
2
            }
2172
1
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
2173
2174
1
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
2175
0
                         << ", offset: " << entry_offset;
2176
2177
1
            size_t cell_size = cell->size();
2178
1
            DCHECK(entry_size == cell_size);
2179
2180
1
            if (cell->releasable()) {
2181
1
                auto& file_block = cell->file_block;
2182
2183
1
                std::lock_guard block_lock(file_block->_mutex);
2184
1
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
2185
1
                to_evict.push_back(cell);
2186
1
                removed_size += cell_size;
2187
1
            }
2188
1
        }
2189
2
    };
2190
2
    if (disposable_queue_size != 0) {
2191
0
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
2192
0
    }
2193
2
    if (normal_queue_size != 0) {
2194
2
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
2195
2
    }
2196
2
    if (index_queue_size != 0) {
2197
0
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
2198
0
    }
2199
2
    std::string reason = "async load";
2200
2
    remove_file_blocks(to_evict, cache_lock, true, reason);
2201
2202
2
    return !_disk_resource_limit_mode || removed_size >= size;
2203
2
}
2204
2205
26
void BlockFileCache::clear_need_update_lru_blocks() {
2206
26
    _need_update_lru_blocks.clear();
2207
26
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2208
26
}
2209
2210
25
void BlockFileCache::pause_ttl_manager() {
2211
25
    if (_ttl_mgr) {
2212
4
        _ttl_mgr->stop();
2213
4
    }
2214
25
}
2215
2216
25
void BlockFileCache::resume_ttl_manager() {
2217
25
    if (_ttl_mgr) {
2218
4
        _ttl_mgr->resume();
2219
4
    }
2220
25
}
2221
2222
25
std::string BlockFileCache::clear_file_cache_directly() {
2223
25
    pause_ttl_manager();
2224
25
    _lru_dumper->remove_lru_dump_files();
2225
25
    using namespace std::chrono;
2226
25
    std::stringstream ss;
2227
25
    auto start = steady_clock::now();
2228
25
    std::string result;
2229
25
    {
2230
25
        SCOPED_CACHE_LOCK(_mutex, this);
2231
25
        LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
2232
2233
25
        std::string clear_msg;
2234
25
        auto s = _storage->clear(clear_msg);
2235
25
        if (!s.ok()) {
2236
1
            result = clear_msg;
2237
24
        } else {
2238
24
            int64_t num_files = _files.size();
2239
24
            int64_t cache_size = _cur_cache_size;
2240
24
            int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
2241
24
            int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock);
2242
24
            int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock);
2243
24
            int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);
2244
2245
24
            int64_t clear_fd_duration = 0;
2246
24
            {
2247
                // clear FDCache to release fd
2248
24
                SCOPED_RAW_TIMER(&clear_fd_duration);
2249
24
                for (const auto& [file_key, file_blocks] : _files) {
2250
1
                    for (const auto& [offset, file_block_cell] : file_blocks) {
2251
1
                        AccessKeyAndOffset access_key_and_offset(file_key, offset);
2252
1
                        FDCache::instance()->remove_file_reader(access_key_and_offset);
2253
1
                    }
2254
1
                }
2255
24
            }
2256
2257
24
            _files.clear();
2258
24
            _cur_cache_size = 0;
2259
24
            _cur_ttl_size = 0;
2260
24
            _time_to_key.clear();
2261
24
            _key_to_time.clear();
2262
24
            _index_queue.clear(cache_lock);
2263
24
            _normal_queue.clear(cache_lock);
2264
24
            _disposable_queue.clear(cache_lock);
2265
24
            _ttl_queue.clear(cache_lock);
2266
2267
            // Update cache metrics immediately so consumers observe the cleared state
2268
            // without waiting for the next background monitor round.
2269
24
            _cur_cache_size_metrics->set_value(0);
2270
24
            _cur_ttl_cache_size_metrics->set_value(0);
2271
24
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0);
2272
24
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0);
2273
24
            _cur_normal_queue_cache_size_metrics->set_value(0);
2274
24
            _cur_normal_queue_element_count_metrics->set_value(0);
2275
24
            _cur_index_queue_cache_size_metrics->set_value(0);
2276
24
            _cur_index_queue_element_count_metrics->set_value(0);
2277
24
            _cur_disposable_queue_cache_size_metrics->set_value(0);
2278
24
            _cur_disposable_queue_element_count_metrics->set_value(0);
2279
2280
24
            clear_need_update_lru_blocks();
2281
2282
24
            ss << "finish clear_file_cache_directly"
2283
24
               << " path=" << _cache_base_path << " time_elapsed_ms="
2284
24
               << duration_cast<milliseconds>(steady_clock::now() - start).count()
2285
24
               << " fd_clear_time_ms=" << (clear_fd_duration / 1000000)
2286
24
               << " num_files=" << num_files << " cache_size=" << cache_size
2287
24
               << " index_queue_size=" << index_queue_size
2288
24
               << " normal_queue_size=" << normal_queue_size
2289
24
               << " disposible_queue_size=" << disposible_queue_size
2290
24
               << "ttl_queue_size=" << ttl_queue_size;
2291
24
            result = ss.str();
2292
24
            LOG(INFO) << result;
2293
24
        }
2294
25
    }
2295
25
    _lru_dumper->remove_lru_dump_files();
2296
25
    resume_ttl_manager();
2297
25
    return result;
2298
25
}
2299
2300
2.00k
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
2301
2.00k
    std::map<size_t, FileBlockSPtr> offset_to_block;
2302
2.00k
    SCOPED_CACHE_LOCK(_mutex, this);
2303
2.00k
    if (_files.contains(hash)) {
2304
887
        for (auto& [offset, cell] : _files[hash]) {
2305
887
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
2306
887
                cell.file_block->_owned_by_cached_reader = true;
2307
887
                offset_to_block.emplace(offset, cell.file_block);
2308
887
            }
2309
887
        }
2310
871
    }
2311
2.00k
    return offset_to_block;
2312
2.00k
}
2313
2314
0
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
2315
0
    SCOPED_CACHE_LOCK(_mutex, this);
2316
0
    if (auto iter = _files.find(hash); iter != _files.end()) {
2317
0
        for (auto& [_, cell] : iter->second) {
2318
0
            cell.update_atime();
2319
0
        }
2320
0
    };
2321
0
}
2322
2323
140
void BlockFileCache::run_background_lru_log_replay() {
2324
140
    Thread::set_self_name("run_background_lru_log_replay");
2325
4.14k
    while (!_close) {
2326
4.14k
        int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
2327
4.14k
        {
2328
4.14k
            std::unique_lock close_lock(_close_mtx);
2329
4.14k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2330
4.14k
            if (_close) {
2331
140
                break;
2332
140
            }
2333
4.14k
        }
2334
2335
4.00k
        _lru_recorder->replay_queue_event(FileCacheType::TTL);
2336
4.00k
        _lru_recorder->replay_queue_event(FileCacheType::INDEX);
2337
4.00k
        _lru_recorder->replay_queue_event(FileCacheType::NORMAL);
2338
4.00k
        _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE);
2339
2340
4.00k
        if (config::enable_evaluate_shadow_queue_diff) {
2341
0
            SCOPED_CACHE_LOCK(_mutex, this);
2342
0
            _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
2343
0
            _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
2344
0
            _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
2345
0
            _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
2346
0
        }
2347
4.00k
    }
2348
140
}
2349
2350
1.27k
void BlockFileCache::dump_lru_queues(bool force) {
2351
1.27k
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
2352
1.27k
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
2353
1.28k
        !ExecEnv::GetInstance()->get_is_upgrading()) {
2354
1.26k
        _lru_dumper->dump_queue("disposable", force);
2355
1.26k
        _lru_dumper->dump_queue("normal", force);
2356
1.26k
        _lru_dumper->dump_queue("index", force);
2357
1.26k
        _lru_dumper->dump_queue("ttl", force);
2358
1.26k
        _lru_dumper->set_first_dump_done();
2359
1.26k
    }
2360
1.27k
}
2361
2362
140
void BlockFileCache::run_background_lru_dump() {
2363
140
    Thread::set_self_name("run_background_lru_dump");
2364
1.42k
    while (!_close) {
2365
1.42k
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
2366
1.42k
        {
2367
1.42k
            std::unique_lock close_lock(_close_mtx);
2368
1.42k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2369
1.42k
            if (_close) {
2370
140
                break;
2371
140
            }
2372
1.42k
        }
2373
1.28k
        dump_lru_queues(false);
2374
1.28k
    }
2375
140
}
2376
2377
140
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
2378
    // keep this order coz may be duplicated in different queue, we use the first appearence
2379
140
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
2380
140
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
2381
140
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
2382
140
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
2383
140
}
2384
2385
0
std::map<std::string, double> BlockFileCache::get_stats() {
2386
0
    std::map<std::string, double> stats;
2387
0
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2388
0
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2389
0
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2390
2391
0
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2392
0
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
2393
0
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2394
0
    stats["index_queue_curr_elements"] =
2395
0
            (double)_cur_index_queue_element_count_metrics->get_value();
2396
2397
0
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2398
0
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
2399
0
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2400
0
    stats["ttl_queue_curr_elements"] =
2401
0
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
2402
2403
0
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2404
0
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
2405
0
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2406
0
    stats["normal_queue_curr_elements"] =
2407
0
            (double)_cur_normal_queue_element_count_metrics->get_value();
2408
2409
0
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2410
0
    stats["disposable_queue_curr_size"] =
2411
0
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
2412
0
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2413
0
    stats["disposable_queue_curr_elements"] =
2414
0
            (double)_cur_disposable_queue_element_count_metrics->get_value();
2415
2416
0
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2417
0
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2418
2419
0
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2420
0
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2421
0
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2422
2423
0
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2424
0
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2425
0
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2426
2427
0
    return stats;
2428
0
}
2429
2430
// for be UTs
2431
172
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
2432
172
    std::map<std::string, double> stats;
2433
172
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2434
172
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2435
172
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2436
2437
172
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2438
172
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
2439
172
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2440
172
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
2441
2442
172
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2443
172
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
2444
172
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2445
172
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
2446
2447
172
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2448
172
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
2449
172
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2450
172
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
2451
2452
172
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2453
172
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
2454
172
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2455
172
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
2456
2457
172
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2458
172
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2459
2460
172
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2461
172
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2462
172
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2463
2464
172
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2465
172
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2466
172
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2467
2468
172
    return stats;
2469
172
}
2470
2471
// Explicit instantiation of the BlockFileCache::remove template for the case where both
// the cache lock and the block lock are std::lock_guard<std::mutex>.
template void BlockFileCache::remove(FileBlockSPtr file_block,
                                     std::lock_guard<std::mutex>& cache_lock,
                                     std::lock_guard<std::mutex>& block_lock,
                                     bool sync);
2474
2475
1
// Runs check_file_cache_consistency() and appends one human-readable report string to
// `results` for every inconsistency found.
//
// Each report contains, in order: the manager-side info, the storage-side info, and the
// inconsistency type, each rendered with its own to_string().
//
// Returns the error from check_file_cache_consistency() if it fails (in which case
// `results` is left untouched), otherwise Status::OK().
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
    InconsistencyContext inconsistency_context;
    RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));
    const auto n = inconsistency_context.types.size();
    // Reports are appended, so account for entries the caller may already have in `results`;
    // reserving only `n` would be a no-op or too small when `results` is non-empty.
    results.reserve(results.size() + n);
    for (size_t i = 0; i < n; i++) {
        std::string result;
        result += "File cache info in manager:\n";
        result += inconsistency_context.infos_in_manager[i].to_string();
        result += "File cache info in storage:\n";
        result += inconsistency_context.infos_in_storage[i].to_string();
        result += inconsistency_context.types[i].to_string();
        result += "\n";
        results.push_back(std::move(result));
    }
    return Status::OK();
}
2492
2493
1
// Cross-checks the blocks known to `_storage` against the cells tracked in `_files` and
// records every disagreement in `inconsistency_context`.
//
// The three vectors in `inconsistency_context` (infos_in_manager, infos_in_storage, types)
// are filled in lockstep: entry i of each describes the same inconsistency. When one side
// has no counterpart, a default-constructed FileCacheInfo is stored for that side.
//
// `_mutex` is held for the whole call. Returns the error from
// `_storage->get_file_cache_infos()` if it fails, otherwise Status::OK().
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
    std::lock_guard<std::mutex> cache_lock(_mutex);

    std::vector<FileCacheInfo> infos_in_storage;
    RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));

    auto& manager_side = inconsistency_context.infos_in_manager;
    auto& storage_side = inconsistency_context.infos_in_storage;
    auto& found_types = inconsistency_context.types;

    // Pass 1: walk everything the storage reports and compare it with the matching cell.
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> seen_in_storage;
    for (const auto& stored : infos_in_storage) {
        seen_in_storage.insert({stored.hash, stored.offset});

        auto* cell = get_cell(stored.hash, stored.offset, cache_lock);
        if (cell == nullptr || cell->file_block == nullptr) {
            // Present in storage but not tracked by the manager.
            manager_side.emplace_back();
            storage_side.push_back(stored);
            found_types.emplace_back(InconsistencyType::NOT_LOADED);
            continue;
        }

        FileCacheInfo managed {
                .hash = stored.hash,
                .expiration_time = cell->file_block->expiration_time(),
                .size = cell->size(),
                .offset = stored.offset,
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
                .cache_type = cell->file_block->cache_type()};

        InconsistencyType mismatch;
        if (stored.is_tmp != managed.is_tmp) {
            mismatch |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
        }

        // A block that is still downloading is compared against its downloaded size so far.
        size_t expected_size = managed.size;
        if (managed.is_tmp) {
            expected_size = cell->dowloading_size();
        }
        if (stored.size != expected_size) {
            mismatch |= InconsistencyType::SIZE_INCONSISTENT;
        }

        // The cache type is compared only when the tmp/downloading state check above
        // did not already flag a mismatch.
        if ((mismatch & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
            stored.cache_type != managed.cache_type) {
            mismatch |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
        }
        if (stored.expiration_time != managed.expiration_time) {
            mismatch |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
        }

        if (mismatch != InconsistencyType::NONE) {
            manager_side.push_back(managed);
            storage_side.push_back(stored);
            found_types.push_back(mismatch);
        }
    }

    // Pass 2: any cell the storage did not report is missing in storage.
    for (const auto& [hash, offset_to_cell] : _files) {
        for (const auto& [offset, cell] : offset_to_cell) {
            if (!seen_in_storage.contains({hash, offset})) {
                const auto& block = cell.file_block;
                manager_side.emplace_back(hash, block->expiration_time(), cell.size(), offset,
                                          block->state() == FileBlock::State::DOWNLOADING,
                                          block->cache_type());
                storage_side.emplace_back();
                found_types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
            }
        }
    }
    return Status::OK();
}
2553
2554
} // namespace doris::io