Coverage Report

Created: 2026-04-15 12:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache.h"
22
23
#include <gen_cpp/file_cache.pb.h>
24
25
#include <cstdio>
26
#include <exception>
27
#include <fstream>
28
#include <unordered_set>
29
30
#include "common/status.h"
31
#include "cpp/sync_point.h"
32
#include "runtime/exec_env.h"
33
34
#if defined(__APPLE__)
35
#include <sys/mount.h>
36
#else
37
#include <sys/statfs.h>
38
#endif
39
40
#include <chrono> // IWYU pragma: keep
41
#include <mutex>
42
#include <ranges>
43
44
#include "common/cast_set.h"
45
#include "common/config.h"
46
#include "common/logging.h"
47
#include "core/uint128.h"
48
#include "exec/common/sip_hash.h"
49
#include "io/cache/block_file_cache_ttl_mgr.h"
50
#include "io/cache/file_block.h"
51
#include "io/cache/file_cache_common.h"
52
#include "io/cache/fs_file_cache_storage.h"
53
#include "io/cache/mem_file_cache_storage.h"
54
#include "runtime/runtime_profile.h"
55
#include "util/concurrency_stats.h"
56
#include "util/stack_util.h"
57
#include "util/stopwatch.hpp"
58
#include "util/thread.h"
59
#include "util/time.h"
60
namespace doris::io {
61
62
// Insert a block pointer into one shard while swallowing allocation failures.
63
17.9k
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
64
17.9k
    if (!block) {
65
2
        return false;
66
2
    }
67
17.9k
    try {
68
17.9k
        auto* raw_ptr = block.get();
69
17.9k
        auto idx = shard_index(raw_ptr);
70
17.9k
        auto& shard = _shards[idx];
71
17.9k
        std::lock_guard lock(shard.mutex);
72
17.9k
        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
73
17.9k
        if (inserted) {
74
1.91k
            _size.fetch_add(1, std::memory_order_relaxed);
75
1.91k
        }
76
17.9k
        return inserted;
77
17.9k
    } catch (const std::exception& e) {
78
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
79
0
    } catch (...) {
80
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
81
0
    }
82
0
    return false;
83
17.9k
}
84
85
// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
86
13.6k
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
87
13.6k
    if (limit == 0 || output == nullptr) {
88
4
        return 0;
89
4
    }
90
13.6k
    size_t drained = 0;
91
13.6k
    try {
92
13.6k
        output->reserve(output->size() + std::min(limit, size()));
93
855k
        for (auto& shard : _shards) {
94
855k
            if (drained >= limit) {
95
2
                break;
96
2
            }
97
855k
            std::lock_guard lock(shard.mutex);
98
855k
            auto it = shard.entries.begin();
99
855k
            size_t shard_drained = 0;
100
856k
            while (it != shard.entries.end() && drained + shard_drained < limit) {
101
1.42k
                output->emplace_back(std::move(it->second));
102
1.42k
                it = shard.entries.erase(it);
103
1.42k
                ++shard_drained;
104
1.42k
            }
105
855k
            if (shard_drained > 0) {
106
14
                _size.fetch_sub(shard_drained, std::memory_order_relaxed);
107
14
                drained += shard_drained;
108
14
            }
109
855k
        }
110
13.6k
    } catch (const std::exception& e) {
111
0
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
112
0
    } catch (...) {
113
0
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
114
0
    }
115
13.7k
    return drained;
116
13.6k
}
117
118
// Remove every pending block, guarding against unexpected exceptions.
119
72
void NeedUpdateLRUBlocks::clear() {
120
72
    try {
121
4.60k
        for (auto& shard : _shards) {
122
4.60k
            std::lock_guard lock(shard.mutex);
123
4.60k
            if (!shard.entries.empty()) {
124
2
                auto removed = shard.entries.size();
125
2
                shard.entries.clear();
126
2
                _size.fetch_sub(removed, std::memory_order_relaxed);
127
2
            }
128
4.60k
        }
129
72
    } catch (const std::exception& e) {
130
0
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
131
0
    } catch (...) {
132
0
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
133
0
    }
134
72
}
135
136
17.9k
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
137
17.9k
    DCHECK(ptr != nullptr);
138
17.9k
    return std::hash<FileBlock*> {}(ptr)&kShardMask;
139
17.9k
}
140
141
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
142
                               const FileCacheSettings& cache_settings)
143
327
        : _cache_base_path(cache_base_path),
144
327
          _capacity(cache_settings.capacity),
145
327
          _max_file_block_size(cache_settings.max_file_block_size) {
146
327
    _cur_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
147
327
                                                                     "file_cache_cache_size", 0);
148
327
    _cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>(
149
327
            _cache_base_path.c_str(), "file_cache_capacity", _capacity);
150
327
    _cur_ttl_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
151
327
            _cache_base_path.c_str(), "file_cache_ttl_cache_size", 0);
152
327
    _cur_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
153
327
            _cache_base_path.c_str(), "file_cache_normal_queue_element_count", 0);
154
327
    _cur_ttl_cache_lru_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
155
327
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_size", 0);
156
327
    _cur_ttl_cache_lru_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
157
327
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_element_count", 0);
158
327
    _cur_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
159
327
            _cache_base_path.c_str(), "file_cache_normal_queue_cache_size", 0);
160
327
    _cur_index_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
161
327
            _cache_base_path.c_str(), "file_cache_index_queue_element_count", 0);
162
327
    _cur_index_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
163
327
            _cache_base_path.c_str(), "file_cache_index_queue_cache_size", 0);
164
327
    _cur_disposable_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
165
327
            _cache_base_path.c_str(), "file_cache_disposable_queue_element_count", 0);
166
327
    _cur_disposable_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
167
327
            _cache_base_path.c_str(), "file_cache_disposable_queue_cache_size", 0);
168
169
327
    _queue_evict_size_metrics[0] = std::make_shared<bvar::Adder<size_t>>(
170
327
            _cache_base_path.c_str(), "file_cache_index_queue_evict_size");
171
327
    _queue_evict_size_metrics[1] = std::make_shared<bvar::Adder<size_t>>(
172
327
            _cache_base_path.c_str(), "file_cache_normal_queue_evict_size");
173
327
    _queue_evict_size_metrics[2] = std::make_shared<bvar::Adder<size_t>>(
174
327
            _cache_base_path.c_str(), "file_cache_disposable_queue_evict_size");
175
327
    _queue_evict_size_metrics[3] = std::make_shared<bvar::Adder<size_t>>(
176
327
            _cache_base_path.c_str(), "file_cache_ttl_cache_evict_size");
177
327
    _total_evict_size_metrics = std::make_shared<bvar::Adder<size_t>>(
178
327
            _cache_base_path.c_str(), "file_cache_total_evict_size");
179
327
    _total_read_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
180
327
                                                                     "file_cache_total_read_size");
181
327
    _total_hit_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
182
327
                                                                    "file_cache_total_hit_size");
183
327
    _gc_evict_bytes_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
184
327
                                                                    "file_cache_gc_evict_bytes");
185
327
    _gc_evict_count_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
186
327
                                                                    "file_cache_gc_evict_count");
187
188
327
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
189
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
190
327
                                                  "file_cache_evict_by_time_disposable_to_normal");
191
327
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
192
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
193
327
                                                  "file_cache_evict_by_time_disposable_to_index");
194
327
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
195
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
196
327
                                                  "file_cache_evict_by_time_disposable_to_ttl");
197
327
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
198
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
199
327
                                                  "file_cache_evict_by_time_normal_to_disposable");
200
327
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
201
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
202
327
                                                  "file_cache_evict_by_time_normal_to_index");
203
327
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
204
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
205
327
                                                  "file_cache_evict_by_time_normal_to_ttl");
206
327
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
207
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
208
327
                                                  "file_cache_evict_by_time_index_to_disposable");
209
327
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
210
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
211
327
                                                  "file_cache_evict_by_time_index_to_normal");
212
327
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
213
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
214
327
                                                  "file_cache_evict_by_time_index_to_ttl");
215
327
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
216
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
217
327
                                                  "file_cache_evict_by_time_ttl_to_disposable");
218
327
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
219
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
220
327
                                                  "file_cache_evict_by_time_ttl_to_normal");
221
327
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
222
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
223
327
                                                  "file_cache_evict_by_time_ttl_to_index");
224
225
327
    _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
226
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
227
327
                                                  "file_cache_evict_by_self_lru_disposable");
228
327
    _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
229
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
230
327
                                                  "file_cache_evict_by_self_lru_normal");
231
327
    _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] = std::make_shared<bvar::Adder<size_t>>(
232
327
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_index");
233
327
    _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] = std::make_shared<bvar::Adder<size_t>>(
234
327
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl");
235
236
327
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
237
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
238
327
                                                  "file_cache_evict_by_size_disposable_to_normal");
239
327
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
240
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
241
327
                                                  "file_cache_evict_by_size_disposable_to_index");
242
327
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
243
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
244
327
                                                  "file_cache_evict_by_size_disposable_to_ttl");
245
327
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
246
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
247
327
                                                  "file_cache_evict_by_size_normal_to_disposable");
248
327
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
249
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
250
327
                                                  "file_cache_evict_by_size_normal_to_index");
251
327
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
252
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
253
327
                                                  "file_cache_evict_by_size_normal_to_ttl");
254
327
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
255
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
256
327
                                                  "file_cache_evict_by_size_index_to_disposable");
257
327
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
258
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
259
327
                                                  "file_cache_evict_by_size_index_to_normal");
260
327
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
261
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
262
327
                                                  "file_cache_evict_by_size_index_to_ttl");
263
327
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
264
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
265
327
                                                  "file_cache_evict_by_size_ttl_to_disposable");
266
327
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
267
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
268
327
                                                  "file_cache_evict_by_size_ttl_to_normal");
269
327
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
270
327
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
271
327
                                                  "file_cache_evict_by_size_ttl_to_index");
272
273
327
    _evict_by_try_release = std::make_shared<bvar::Adder<size_t>>(
274
327
            _cache_base_path.c_str(), "file_cache_evict_by_try_release");
275
276
327
    _num_read_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
277
327
                                                             "file_cache_num_read_blocks");
278
327
    _num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
279
327
                                                            "file_cache_num_hit_blocks");
280
327
    _num_removed_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
281
327
                                                                "file_cache_num_removed_blocks");
282
283
327
    _no_warmup_num_read_blocks = std::make_shared<bvar::Adder<size_t>>(
284
327
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks");
285
327
    _no_warmup_num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(
286
327
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks");
287
288
327
#ifndef BE_TEST
289
327
    _num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
290
327
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
291
327
    _num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
292
327
            _cache_base_path.c_str(), "file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
293
327
    _num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
294
327
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
295
327
    _num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
296
327
            _cache_base_path.c_str(), "file_cache_num_read_blocks_1h", _num_read_blocks.get(),
297
327
            3600);
298
327
    _no_warmup_num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
299
327
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_5m",
300
327
            _no_warmup_num_hit_blocks.get(), 300);
301
327
    _no_warmup_num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
302
327
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_5m",
303
327
            _no_warmup_num_read_blocks.get(), 300);
304
327
    _no_warmup_num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
305
327
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_1h",
306
327
            _no_warmup_num_hit_blocks.get(), 3600);
307
327
    _no_warmup_num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
308
327
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_1h",
309
327
            _no_warmup_num_read_blocks.get(), 3600);
310
327
#endif
311
312
327
    _hit_ratio = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
313
327
                                                        "file_cache_hit_ratio", 0.0);
314
327
    _hit_ratio_5m = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
315
327
                                                           "file_cache_hit_ratio_5m", 0.0);
316
327
    _hit_ratio_1h = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
317
327
                                                           "file_cache_hit_ratio_1h", 0.0);
318
319
327
    _no_warmup_hit_ratio = std::make_shared<bvar::Status<double>>(
320
327
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio", 0.0);
321
327
    _no_warmup_hit_ratio_5m = std::make_shared<bvar::Status<double>>(
322
327
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_5m", 0.0);
323
327
    _no_warmup_hit_ratio_1h = std::make_shared<bvar::Status<double>>(
324
327
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_1h", 0.0);
325
326
327
    _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
327
327
            _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
328
327
    _need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>(
329
327
            _cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0);
330
327
    _meta_store_write_queue_size_metrics = std::make_shared<bvar::Status<size_t>>(
331
327
            _cache_base_path.c_str(), "file_cache_meta_store_write_queue_size", 0);
332
333
327
    _cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
334
327
            _cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
335
327
    _get_or_set_latency_us = std::make_shared<bvar::LatencyRecorder>(
336
327
            _cache_base_path.c_str(), "file_cache_get_or_set_latency_us");
337
327
    _storage_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
338
327
            _cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_us");
339
327
    _storage_retry_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
340
327
            _cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us");
341
327
    _storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
342
327
            _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us");
343
327
    _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
344
327
            _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us");
345
327
    _lru_dump_latency_us = std::make_shared<bvar::LatencyRecorder>(
346
327
            _cache_base_path.c_str(), "file_cache_lru_dump_latency_us");
347
327
    _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
348
327
            _cache_base_path.c_str(), "file_cache_recycle_keys_length");
349
327
    _need_update_lru_blocks_length_recorder = std::make_shared<bvar::LatencyRecorder>(
350
327
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length");
351
327
    _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>(
352
327
            _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us");
353
327
    _ttl_gc_latency_us = std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(),
354
327
                                                                 "file_cache_ttl_gc_latency_us");
355
327
    _shadow_queue_levenshtein_distance = std::make_shared<bvar::LatencyRecorder>(
356
327
            _cache_base_path.c_str(), "file_cache_shadow_queue_levenshtein_distance");
357
358
327
    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
359
327
                                 cache_settings.disposable_queue_elements, 60 * 60);
360
327
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
361
327
                            7 * 24 * 60 * 60);
362
327
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
363
327
                             24 * 60 * 60);
364
327
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
365
327
                          std::numeric_limits<int>::max());
366
367
327
    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
368
327
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());
369
327
    if (cache_settings.storage == "memory") {
370
36
        _storage = std::make_unique<MemFileCacheStorage>();
371
36
        _cache_base_path = "memory";
372
291
    } else {
373
291
        _storage = std::make_unique<FSFileCacheStorage>();
374
291
    }
375
376
327
    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
377
327
}
378
379
1.94M
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
380
1.94M
    uint128_t value;
381
1.94M
    sip_hash128(path.data(), path.size(), reinterpret_cast<char*>(&value));
382
1.94M
    return UInt128Wrapper(value);
383
1.94M
}
384
385
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
386
36
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
387
36
    SCOPED_CACHE_LOCK(_mutex, this);
388
36
    if (!config::enable_file_cache_query_limit) {
389
2
        return {};
390
2
    }
391
392
    /// if enable_filesystem_query_cache_limit is true,
393
    /// we create context query for current query.
394
34
    auto context = get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
395
34
    return std::make_unique<QueryFileCacheContextHolder>(query_id, this, context);
396
36
}
397
398
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
399
9.54k
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
400
9.54k
    auto query_iter = _query_map.find(query_id);
401
9.54k
    return (query_iter == _query_map.end()) ? nullptr : query_iter->second;
402
9.54k
}
403
404
32
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
405
32
    SCOPED_CACHE_LOCK(_mutex, this);
406
32
    const auto& query_iter = _query_map.find(query_id);
407
408
32
    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
409
30
        _query_map.erase(query_iter);
410
30
    }
411
32
}
412
413
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
414
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
415
34
        int file_cache_query_limit_percent) {
416
34
    if (query_id.lo == 0 && query_id.hi == 0) {
417
2
        return nullptr;
418
2
    }
419
420
32
    auto context = get_query_context(query_id, cache_lock);
421
32
    if (context) {
422
2
        return context;
423
2
    }
424
425
30
    size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100;
426
30
    if (file_cache_query_limit_size < 268435456) {
427
30
        LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size
428
30
                     << " bytes) is less than the 256MB recommended minimum. "
429
30
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
430
30
                     << " from its current value " << file_cache_query_limit_percent << "%.";
431
30
    }
432
30
    auto query_context = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
433
30
    auto query_iter = _query_map.emplace(query_id, query_context).first;
434
30
    return query_iter->second;
435
32
}
436
437
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
438
106
                                                   std::lock_guard<std::mutex>& cache_lock) {
439
106
    auto pair = std::make_pair(hash, offset);
440
106
    auto record = records.find(pair);
441
106
    DCHECK(record != records.end());
442
106
    auto iter = record->second;
443
106
    records.erase(pair);
444
106
    lru_queue.remove(iter, cache_lock);
445
106
}
446
447
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
448
                                                    size_t size,
449
220
                                                    std::lock_guard<std::mutex>& cache_lock) {
450
220
    auto pair = std::make_pair(hash, offset);
451
220
    if (records.find(pair) == records.end()) {
452
218
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
453
218
        records.insert({pair, queue_iter});
454
218
    }
455
220
}
456
457
285
Status BlockFileCache::initialize() {
458
285
    SCOPED_CACHE_LOCK(_mutex, this);
459
285
    return initialize_unlocked(cache_lock);
460
285
}
461
462
285
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
463
285
    DCHECK(!_is_initialized);
464
285
    _is_initialized = true;
465
285
    if (config::file_cache_background_lru_dump_tail_record_num > 0) {
466
        // requirements:
467
        // 1. restored data should not overwrite the last dump
468
        // 2. restore should happen before load and async load
469
        // 3. all queues should be restored sequencially to avoid conflict
470
        // TODO(zhengyu): we can parralize them but will increase complexity, so lets check the time cost
471
        // to see if any improvement is a necessary
472
285
        restore_lru_queues_from_disk(cache_lock);
473
285
    }
474
285
    RETURN_IF_ERROR(_storage->init(this));
475
476
285
    if (auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get())) {
477
253
        if (auto* meta_store = fs_storage->get_meta_store()) {
478
253
            _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
479
253
        }
480
253
    }
481
482
285
    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
483
285
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
484
285
    _cache_background_evict_in_advance_thread =
485
285
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
486
285
    _cache_background_block_lru_update_thread =
487
285
            std::thread(&BlockFileCache::run_background_block_lru_update, this);
488
489
    // Initialize LRU dump thread and restore queues
490
285
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
491
285
    _cache_background_lru_log_replay_thread =
492
285
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);
493
494
285
    return Status::OK();
495
285
}
496
497
void BlockFileCache::update_block_lru(FileBlockSPtr block,
498
1.42k
                                      std::lock_guard<std::mutex>& cache_lock) {
499
1.42k
    if (!block) {
500
2
        return;
501
2
    }
502
503
1.41k
    FileBlockCell* cell = get_cell(block->get_hash_value(), block->offset(), cache_lock);
504
1.41k
    if (!cell || cell->file_block.get() != block.get()) {
505
5
        return;
506
5
    }
507
508
1.41k
    if (cell->queue_iterator) {
509
1.41k
        auto& queue = get_queue(block->cache_type());
510
1.41k
        queue.move_to_end(*cell->queue_iterator, cache_lock);
511
1.41k
        _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
512
1.41k
                                          block->_key.hash, block->_key.offset,
513
1.41k
                                          block->_block_range.size());
514
1.41k
    }
515
1.41k
    cell->update_atime();
516
1.41k
}
517
518
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
519
21.5M
                              std::lock_guard<std::mutex>& cache_lock) {
520
21.5M
    if (result) {
521
21.5M
        result->push_back(cell.file_block);
522
21.5M
    }
523
524
21.5M
    auto& queue = get_queue(cell.file_block->cache_type());
525
    /// Move to the end of the queue. The iterator remains valid.
526
21.5M
    if (cell.queue_iterator && move_iter_flag) {
527
20.4M
        queue.move_to_end(*cell.queue_iterator, cache_lock);
528
20.4M
        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
529
20.4M
                                          CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash,
530
20.4M
                                          cell.file_block->_key.offset, cell.size());
531
20.4M
    }
532
533
21.5M
    cell.update_atime();
534
21.5M
}
535
536
template <class T>
537
    requires IsXLock<T>
538
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
539
1.91M
                                        T& /* cache_lock */) {
540
1.91M
    auto it = _files.find(hash);
541
1.91M
    if (it == _files.end()) {
542
6
        return nullptr;
543
6
    }
544
545
1.91M
    auto& offsets = it->second;
546
1.91M
    auto cell_it = offsets.find(offset);
547
1.91M
    if (cell_it == offsets.end()) {
548
6
        return nullptr;
549
6
    }
550
551
1.91M
    return &cell_it->second;
552
1.91M
}
553
554
21.5M
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
555
21.5M
    return query_type != FileCacheType::DISPOSABLE && cell_type != FileCacheType::DISPOSABLE;
556
21.5M
}
557
558
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
559
                                    const FileBlock::Range& range,
560
21.6M
                                    std::lock_guard<std::mutex>& cache_lock) {
561
    /// Given range = [left, right] and non-overlapping ordered set of file blocks,
562
    /// find list [block1, ..., blockN] of blocks which intersect with given range.
563
21.6M
    auto it = _files.find(hash);
564
21.6M
    if (it == _files.end()) {
565
122k
        if (_async_open_done) {
566
122k
            return {};
567
122k
        }
568
3
        FileCacheKey key;
569
3
        key.hash = hash;
570
3
        key.meta.type = context.cache_type;
571
3
        key.meta.expiration_time = context.expiration_time;
572
3
        key.meta.tablet_id = context.tablet_id;
573
3
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);
574
575
3
        it = _files.find(hash);
576
3
        if (it == _files.end()) [[unlikely]] {
577
3
            return {};
578
3
        }
579
3
    }
580
581
21.4M
    auto& file_blocks = it->second;
582
21.4M
    if (file_blocks.empty()) {
583
0
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
584
0
                     << " cache type=" << context.cache_type
585
0
                     << " cache expiration time=" << context.expiration_time
586
0
                     << " cache range=" << range.left << " " << range.right
587
0
                     << " query id=" << context.query_id;
588
0
        DCHECK(false);
589
0
        _files.erase(hash);
590
0
        return {};
591
0
    }
592
593
21.4M
    FileBlocks result;
594
21.4M
    auto block_it = file_blocks.lower_bound(range.left);
595
21.4M
    if (block_it == file_blocks.end()) {
596
        /// N - last cached block for given file hash, block{N}.offset < range.left:
597
        ///   block{N}                       block{N}
598
        /// [________                         [_______]
599
        ///     [__________]         OR                  [________]
600
        ///     ^                                        ^
601
        ///     range.left                               range.left
602
603
13.0k
        const auto& cell = file_blocks.rbegin()->second;
604
13.0k
        if (cell.file_block->range().right < range.left) {
605
12.9k
            return {};
606
12.9k
        }
607
608
28
        use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
609
28
                 cache_lock);
610
21.4M
    } else { /// block_it <-- segmment{k}
611
21.4M
        if (block_it != file_blocks.begin()) {
612
96.3k
            const auto& prev_cell = std::prev(block_it)->second;
613
96.3k
            const auto& prev_cell_range = prev_cell.file_block->range();
614
615
96.3k
            if (range.left <= prev_cell_range.right) {
616
                ///   block{k-1}  block{k}
617
                ///   [________]   [_____
618
                ///       [___________
619
                ///       ^
620
                ///       range.left
621
622
274
                use_cell(prev_cell, &result,
623
274
                         need_to_move(prev_cell.file_block->cache_type(), context.cache_type),
624
274
                         cache_lock);
625
274
            }
626
96.3k
        }
627
628
        ///  block{k} ...       block{k-1}  block{k}                      block{k}
629
        ///  [______              [______]     [____                        [________
630
        ///  [_________     OR              [________      OR    [______]   ^
631
        ///  ^                              ^                           ^   block{k}.offset
632
        ///  range.left                     range.left                  range.right
633
634
43.0M
        while (block_it != file_blocks.end()) {
635
21.7M
            const auto& cell = block_it->second;
636
21.7M
            if (range.right < cell.file_block->range().left) {
637
152k
                break;
638
152k
            }
639
640
21.5M
            use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
641
21.5M
                     cache_lock);
642
21.5M
            ++block_it;
643
21.5M
        }
644
21.4M
    }
645
646
21.4M
    return result;
647
21.4M
}
648
649
17.9k
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
650
17.9k
    if (_need_update_lru_blocks.insert(std::move(block))) {
651
1.89k
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
652
1.89k
    }
653
17.9k
}
654
655
8
std::string BlockFileCache::clear_file_cache_async() {
656
8
    LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
657
8
    _lru_dumper->remove_lru_dump_files();
658
8
    int64_t num_cells_all = 0;
659
8
    int64_t num_cells_to_delete = 0;
660
8
    int64_t num_cells_wait_recycle = 0;
661
8
    int64_t num_files_all = 0;
662
8
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
663
8
    {
664
8
        SCOPED_CACHE_LOCK(_mutex, this);
665
666
8
        std::vector<FileBlockCell*> deleting_cells;
667
8
        for (auto& [_, offset_to_cell] : _files) {
668
8
            ++num_files_all;
669
96
            for (auto& [_1, cell] : offset_to_cell) {
670
96
                ++num_cells_all;
671
96
                deleting_cells.push_back(&cell);
672
96
            }
673
8
        }
674
675
        // we cannot delete the element in the loop above, because it will break the iterator
676
96
        for (auto& cell : deleting_cells) {
677
96
            if (!cell->releasable()) {
678
4
                LOG(INFO) << "cell is not releasable, hash="
679
4
                          << " offset=" << cell->file_block->offset();
680
4
                cell->file_block->set_deleting();
681
4
                ++num_cells_wait_recycle;
682
4
                continue;
683
4
            }
684
92
            FileBlockSPtr file_block = cell->file_block;
685
92
            if (file_block) {
686
92
                std::lock_guard block_lock(file_block->_mutex);
687
92
                remove(file_block, cache_lock, block_lock, false);
688
92
                ++num_cells_to_delete;
689
92
            }
690
92
        }
691
8
        clear_need_update_lru_blocks();
692
8
    }
693
694
8
    std::stringstream ss;
695
8
    ss << "finish clear_file_cache_async, path=" << _cache_base_path
696
8
       << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
697
8
       << " num_cells_to_delete=" << num_cells_to_delete
698
8
       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
699
8
    auto msg = ss.str();
700
8
    LOG(INFO) << msg;
701
8
    _lru_dumper->remove_lru_dump_files();
702
8
    return msg;
703
8
}
704
705
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
706
                                                  const CacheContext& context, size_t offset,
707
                                                  size_t size, FileBlock::State state,
708
138k
                                                  std::lock_guard<std::mutex>& cache_lock) {
709
138k
    DCHECK(size > 0);
710
711
138k
    auto current_pos = offset;
712
138k
    auto end_pos_non_included = offset + size;
713
714
138k
    size_t current_size = 0;
715
138k
    size_t remaining_size = size;
716
717
138k
    FileBlocks file_blocks;
718
511k
    while (current_pos < end_pos_non_included) {
719
373k
        current_size = std::min(remaining_size, _max_file_block_size);
720
373k
        remaining_size -= current_size;
721
373k
        state = try_reserve(hash, context, current_pos, current_size, cache_lock)
722
373k
                        ? state
723
373k
                        : FileBlock::State::SKIP_CACHE;
724
373k
        if (state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
725
7.19k
            FileCacheKey key;
726
7.19k
            key.hash = hash;
727
7.19k
            key.offset = current_pos;
728
7.19k
            key.meta.type = context.cache_type;
729
7.19k
            key.meta.expiration_time = context.expiration_time;
730
7.19k
            key.meta.tablet_id = context.tablet_id;
731
7.19k
            auto file_block = std::make_shared<FileBlock>(key, current_size, this,
732
7.19k
                                                          FileBlock::State::SKIP_CACHE);
733
7.19k
            file_blocks.push_back(std::move(file_block));
734
366k
        } else {
735
366k
            auto* cell = add_cell(hash, context, current_pos, current_size, state, cache_lock);
736
366k
            if (cell) {
737
366k
                file_blocks.push_back(cell->file_block);
738
366k
                if (!context.is_cold_data) {
739
366k
                    cell->update_atime();
740
366k
                }
741
366k
            }
742
366k
            if (_ttl_mgr && context.tablet_id != 0) {
743
351k
                _ttl_mgr->register_tablet_id(context.tablet_id);
744
351k
            }
745
366k
        }
746
747
373k
        current_pos += current_size;
748
373k
    }
749
750
138k
    DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
751
138k
    return file_blocks;
752
138k
}
753
754
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
755
                                                       const UInt128Wrapper& hash,
756
                                                       const CacheContext& context,
757
                                                       const FileBlock::Range& range,
758
21.4M
                                                       std::lock_guard<std::mutex>& cache_lock) {
759
    /// There are blocks [block1, ..., blockN]
760
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
761
    /// intersect with given range.
762
763
    /// It can have holes:
764
    /// [____________________]         -- requested range
765
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
766
    ///
767
    /// For each such hole create a cell with file block state EMPTY.
768
769
21.4M
    auto it = file_blocks.begin();
770
21.4M
    auto block_range = (*it)->range();
771
772
21.4M
    size_t current_pos = 0;
773
21.4M
    if (block_range.left < range.left) {
774
        ///    [_______     -- requested range
775
        /// [_______
776
        /// ^
777
        /// block1
778
779
302
        current_pos = block_range.right + 1;
780
302
        ++it;
781
21.4M
    } else {
782
21.4M
        current_pos = range.left;
783
21.4M
    }
784
785
43.0M
    while (current_pos <= range.right && it != file_blocks.end()) {
786
21.5M
        block_range = (*it)->range();
787
788
21.5M
        if (current_pos == block_range.left) {
789
21.5M
            current_pos = block_range.right + 1;
790
21.5M
            ++it;
791
21.5M
            continue;
792
21.5M
        }
793
794
21.5M
        DCHECK(current_pos < block_range.left);
795
796
287
        auto hole_size = block_range.left - current_pos;
797
798
287
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
799
287
                                                      FileBlock::State::EMPTY, cache_lock));
800
801
287
        current_pos = block_range.right + 1;
802
287
        ++it;
803
287
    }
804
805
21.4M
    if (current_pos <= range.right) {
806
        ///   ________]     -- requested range
807
        ///   _____]
808
        ///        ^
809
        /// blockN
810
811
376
        auto hole_size = range.right - current_pos + 1;
812
813
376
        file_blocks.splice(file_blocks.end(),
814
376
                           split_range_into_cells(hash, context, current_pos, hole_size,
815
376
                                                  FileBlock::State::EMPTY, cache_lock));
816
376
    }
817
21.4M
}
818
819
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
820
21.4M
                                            CacheContext& context) {
821
21.4M
    FileBlock::Range range(offset, offset + size - 1);
822
823
21.4M
    ReadStatistics* stats = context.stats;
824
21.4M
    DCHECK(stats != nullptr);
825
21.4M
    MonotonicStopWatch sw;
826
21.4M
    sw.start();
827
21.4M
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
828
21.4M
    std::lock_guard cache_lock(_mutex);
829
21.4M
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
830
21.4M
    stats->lock_wait_timer += sw.elapsed_time();
831
21.4M
    FileBlocks file_blocks;
832
21.4M
    int64_t duration = 0;
833
21.4M
    {
834
21.4M
        SCOPED_RAW_TIMER(&duration);
835
        /// Get all blocks which intersect with the given range.
836
21.4M
        {
837
21.4M
            SCOPED_RAW_TIMER(&stats->get_timer);
838
21.4M
            file_blocks = get_impl(hash, context, range, cache_lock);
839
21.4M
        }
840
841
21.4M
        if (file_blocks.empty()) {
842
137k
            SCOPED_RAW_TIMER(&stats->set_timer);
843
137k
            file_blocks = split_range_into_cells(hash, context, offset, size,
844
137k
                                                 FileBlock::State::EMPTY, cache_lock);
845
21.2M
        } else {
846
21.2M
            SCOPED_RAW_TIMER(&stats->set_timer);
847
21.2M
            fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
848
21.2M
        }
849
21.4M
        DCHECK(!file_blocks.empty());
850
21.4M
        *_num_read_blocks << file_blocks.size();
851
21.5M
        if (!context.is_warmup) {
852
21.5M
            *_no_warmup_num_read_blocks << file_blocks.size();
853
21.5M
        }
854
21.9M
        for (auto& block : file_blocks) {
855
21.9M
            size_t block_size = block->range().size();
856
21.9M
            *_total_read_size_metrics << block_size;
857
21.9M
            if (block->state_unsafe() == FileBlock::State::DOWNLOADED) {
858
21.5M
                *_num_hit_blocks << 1;
859
21.5M
                *_total_hit_size_metrics << block_size;
860
21.5M
                if (!context.is_warmup) {
861
21.4M
                    *_no_warmup_num_hit_blocks << 1;
862
21.4M
                }
863
21.5M
            }
864
21.9M
        }
865
21.4M
    }
866
21.4M
    *_get_or_set_latency_us << (duration / 1000);
867
21.4M
    return FileBlocksHolder(std::move(file_blocks));
868
21.4M
}
869
870
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
871
                                        size_t offset, size_t size, FileBlock::State state,
872
366k
                                        std::lock_guard<std::mutex>& cache_lock) {
873
    /// Create a file block cell and put it in `files` map by [hash][offset].
874
366k
    if (size == 0) {
875
0
        return nullptr; /// Empty files are not cached.
876
0
    }
877
878
366k
    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
879
0
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
880
0
               << " expiration_time=" << context.expiration_time
881
0
               << " tablet_id=" << context.tablet_id;
882
883
366k
    if (size > 1024 * 1024 * 1024) {
884
0
        LOG(WARNING) << "File block size is too large for a block. size=" << size
885
0
                     << " hash=" << hash.to_string() << " offset=" << offset
886
0
                     << " stack:" << get_stack_trace();
887
0
    }
888
889
366k
    auto& offsets = _files[hash];
890
366k
    auto itr = offsets.find(offset);
891
366k
    if (itr != offsets.end()) {
892
10
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
893
0
                   << ", offset: " << offset << ", size: " << size
894
0
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
895
10
        return &(itr->second);
896
10
    }
897
898
366k
    FileCacheKey key;
899
366k
    key.hash = hash;
900
366k
    key.offset = offset;
901
366k
    key.meta.type = context.cache_type;
902
366k
    key.meta.expiration_time = context.expiration_time;
903
366k
    key.meta.tablet_id = context.tablet_id;
904
366k
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);
905
366k
    Status st;
906
366k
    if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) {
907
0
        st = cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
908
366k
    } else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) {
909
2
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
910
2
    }
911
366k
    if (!st.ok()) {
912
0
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
913
0
                     << " cache_type=" << cache_type_to_string(context.cache_type)
914
0
                     << " error=" << st.msg();
915
0
    }
916
917
366k
    auto& queue = get_queue(cell.file_block->cache_type());
918
366k
    cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
919
366k
    _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD,
920
366k
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
921
366k
                                      cell.size());
922
923
366k
    if (cell.file_block->cache_type() == FileCacheType::TTL) {
924
7.84k
        _cur_ttl_size += cell.size();
925
7.84k
    }
926
366k
    auto [it, _] = offsets.insert(std::make_pair(offset, std::move(cell)));
927
366k
    _cur_cache_size += size;
928
366k
    return &(it->second);
929
366k
}
930
931
0
size_t BlockFileCache::try_release() {
932
0
    SCOPED_CACHE_LOCK(_mutex, this);
933
0
    std::vector<FileBlockCell*> trash;
934
0
    for (auto& [hash, blocks] : _files) {
935
0
        for (auto& [offset, cell] : blocks) {
936
0
            if (cell.releasable()) {
937
0
                trash.emplace_back(&cell);
938
0
            } else {
939
0
                cell.file_block->set_deleting();
940
0
            }
941
0
        }
942
0
    }
943
0
    size_t remove_size = 0;
944
0
    for (auto& cell : trash) {
945
0
        FileBlockSPtr file_block = cell->file_block;
946
0
        std::lock_guard lc(cell->file_block->_mutex);
947
0
        remove_size += file_block->range().size();
948
0
        remove(file_block, cache_lock, lc);
949
0
        VLOG_DEBUG << "try_release " << _cache_base_path
950
0
                   << " hash=" << file_block->get_hash_value().to_string()
951
0
                   << " offset=" << file_block->offset();
952
0
    }
953
0
    *_evict_by_try_release << remove_size;
954
0
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
955
0
    return trash.size();
956
0
}
957
958
23.1M
LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
959
23.1M
    switch (type) {
960
15.4M
    case FileCacheType::INDEX:
961
15.4M
        return _index_queue;
962
441k
    case FileCacheType::DISPOSABLE:
963
441k
        return _disposable_queue;
964
7.21M
    case FileCacheType::NORMAL:
965
7.21M
        return _normal_queue;
966
12.4k
    case FileCacheType::TTL:
967
12.4k
        return _ttl_queue;
968
0
    default:
969
0
        DCHECK(false);
970
23.1M
    }
971
0
    return _normal_queue;
972
23.1M
}
973
974
274
const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
975
274
    switch (type) {
976
66
    case FileCacheType::INDEX:
977
66
        return _index_queue;
978
2
    case FileCacheType::DISPOSABLE:
979
2
        return _disposable_queue;
980
206
    case FileCacheType::NORMAL:
981
206
        return _normal_queue;
982
0
    case FileCacheType::TTL:
983
0
        return _ttl_queue;
984
0
    default:
985
0
        DCHECK(false);
986
274
    }
987
0
    return _normal_queue;
988
274
}
989
990
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
991
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
992
523k
                                        std::string& reason) {
993
523k
    auto remove_file_block_if = [&](FileBlockCell* cell) {
994
114k
        FileBlockSPtr file_block = cell->file_block;
995
114k
        if (file_block) {
996
114k
            std::lock_guard block_lock(file_block->_mutex);
997
114k
            remove(file_block, cache_lock, block_lock, sync);
998
114k
            VLOG_DEBUG << "remove_file_blocks"
999
0
                       << " hash=" << file_block->get_hash_value().to_string()
1000
0
                       << " offset=" << file_block->offset() << " reason=" << reason;
1001
114k
        }
1002
114k
    };
1003
523k
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1004
523k
}
1005
1006
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
1007
                                           size_t& removed_size,
1008
                                           std::vector<FileBlockCell*>& to_evict,
1009
                                           std::lock_guard<std::mutex>& cache_lock,
1010
12.1k
                                           size_t& cur_removed_size, bool evict_in_advance) {
1011
1.48M
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1012
1.48M
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1013
4.56k
            break;
1014
4.56k
        }
1015
1.48M
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1016
1017
1.48M
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
1018
0
                     << ", offset: " << entry_offset;
1019
1020
1.48M
        size_t cell_size = cell->size();
1021
1.48M
        DCHECK(entry_size == cell_size);
1022
1023
1.48M
        if (cell->releasable()) {
1024
110k
            auto& file_block = cell->file_block;
1025
1026
110k
            std::lock_guard block_lock(file_block->_mutex);
1027
110k
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1028
110k
            to_evict.push_back(cell);
1029
110k
            removed_size += cell_size;
1030
110k
            cur_removed_size += cell_size;
1031
110k
        }
1032
1.48M
    }
1033
12.1k
}
1034
1035
// 1. if async load file cache not finish
1036
//     a. evict from lru queue
1037
// 2. if ttl cache
1038
//     a. evict from disposable/normal/index queue one by one
1039
// 3. if dont reach query limit or dont have query limit
1040
//     a. evict from other queue
1041
//     b. evict from current queue
1042
//         a.1 if the data belong write, then just evict cold data
1043
// 4. if reach query limit
1044
//     a. evict from query queue
1045
//     b. evict from other queue
1046
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
1047
                                 size_t offset, size_t size,
1048
373k
                                 std::lock_guard<std::mutex>& cache_lock) {
1049
373k
    if (!_async_open_done) {
1050
4
        return try_reserve_during_async_load(size, cache_lock);
1051
4
    }
1052
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
1053
    // directly eliminate 5 times the size of the space
1054
373k
    if (_disk_resource_limit_mode) {
1055
4
        size = 5 * size;
1056
4
    }
1057
1058
373k
    auto query_context = config::enable_file_cache_query_limit &&
1059
373k
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
1060
373k
                                 ? get_query_context(context.query_id, cache_lock)
1061
373k
                                 : nullptr;
1062
373k
    if (!query_context) {
1063
373k
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
1064
373k
    } else if (query_context->get_cache_size(cache_lock) + size <=
1065
222
               query_context->get_max_cache_size()) {
1066
132
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
1067
132
    }
1068
90
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1069
90
                               std::chrono::steady_clock::now().time_since_epoch())
1070
90
                               .count();
1071
90
    auto& queue = get_queue(context.cache_type);
1072
90
    size_t removed_size = 0;
1073
90
    size_t ghost_remove_size = 0;
1074
90
    size_t queue_size = queue.get_capacity(cache_lock);
1075
90
    size_t cur_cache_size = _cur_cache_size;
1076
90
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);
1077
1078
90
    std::vector<LRUQueue::Iterator> ghost;
1079
90
    std::vector<FileBlockCell*> to_evict;
1080
1081
90
    size_t max_size = queue.get_max_size();
1082
1.20k
    auto is_overflow = [&] {
1083
1.20k
        return _disk_resource_limit_mode ? removed_size < size
1084
1.20k
                                         : cur_cache_size + size - removed_size > _capacity ||
1085
1.20k
                                                   (queue_size + size - removed_size > max_size) ||
1086
1.20k
                                                   (query_context_cache_size + size -
1087
1.18k
                                                            (removed_size + ghost_remove_size) >
1088
1.18k
                                                    query_context->get_max_cache_size());
1089
1.20k
    };
1090
1091
    /// Select the cache from the LRU queue held by query for expulsion.
1092
1.14k
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
1093
1.11k
        if (!is_overflow()) {
1094
60
            break;
1095
60
        }
1096
1097
1.05k
        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);
1098
1099
1.05k
        if (!cell) {
1100
            /// The cache corresponding to this record may be swapped out by
1101
            /// other queries, so it has become invalid.
1102
8
            ghost.push_back(iter);
1103
8
            ghost_remove_size += iter->size;
1104
1.04k
        } else {
1105
1.04k
            size_t cell_size = cell->size();
1106
1.04k
            DCHECK(iter->size == cell_size);
1107
1108
1.04k
            if (cell->releasable()) {
1109
98
                auto& file_block = cell->file_block;
1110
1111
98
                std::lock_guard block_lock(file_block->_mutex);
1112
98
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1113
98
                to_evict.push_back(cell);
1114
98
                removed_size += cell_size;
1115
98
            }
1116
1.04k
        }
1117
1.05k
    }
1118
1119
98
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1120
98
        FileBlockSPtr file_block = cell->file_block;
1121
98
        if (file_block) {
1122
98
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
1123
98
            std::lock_guard block_lock(file_block->_mutex);
1124
98
            remove(file_block, cache_lock, block_lock);
1125
98
        }
1126
98
    };
1127
1128
90
    for (auto& iter : ghost) {
1129
8
        query_context->remove(iter->hash, iter->offset, cache_lock);
1130
8
    }
1131
1132
90
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1133
1134
90
    if (is_overflow() &&
1135
90
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
1136
2
        return false;
1137
2
    }
1138
88
    query_context->reserve(hash, offset, size, cache_lock);
1139
88
    return true;
1140
90
}
1141
1142
235
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
1143
235
    UInt128Wrapper hash = UInt128Wrapper();
1144
235
    size_t offset = 0;
1145
235
    CacheContext context;
1146
    /* we pick NORMAL and TTL cache to evict in advance
1147
     * we reserve for them but won't acutually give space to them
1148
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
1149
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
1150
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
1151
     */
1152
235
    context.cache_type = FileCacheType::NORMAL;
1153
235
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1154
235
    context.cache_type = FileCacheType::TTL;
1155
235
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1156
235
}
1157
1158
// remove specific cache synchronously, for critical operations
1159
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1160
16
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1161
16
    std::string reason = "remove_if_cached";
1162
16
    SCOPED_CACHE_LOCK(_mutex, this);
1163
16
    auto iter = _files.find(file_key);
1164
16
    std::vector<FileBlockCell*> to_remove;
1165
16
    if (iter != _files.end()) {
1166
28
        for (auto& [_, cell] : iter->second) {
1167
28
            if (cell.releasable()) {
1168
26
                to_remove.push_back(&cell);
1169
26
            } else {
1170
2
                cell.file_block->set_deleting();
1171
2
            }
1172
28
        }
1173
12
    }
1174
16
    remove_file_blocks(to_remove, cache_lock, true, reason);
1175
16
}
1176
1177
// the async version of remove_if_cached, for background operations
1178
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
1179
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1180
137k
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1181
137k
    std::string reason = "remove_if_cached_async";
1182
137k
    SCOPED_CACHE_LOCK(_mutex, this);
1183
1184
137k
    auto iter = _files.find(file_key);
1185
137k
    std::vector<FileBlockCell*> to_remove;
1186
137k
    if (iter != _files.end()) {
1187
42.1k
        for (auto& [_, cell] : iter->second) {
1188
42.1k
            *_gc_evict_bytes_metrics << cell.size();
1189
42.1k
            *_gc_evict_count_metrics << 1;
1190
42.1k
            if (cell.releasable()) {
1191
3.63k
                to_remove.push_back(&cell);
1192
38.5k
            } else {
1193
38.5k
                cell.file_block->set_deleting();
1194
38.5k
            }
1195
42.1k
        }
1196
41.8k
    }
1197
137k
    remove_file_blocks(to_remove, cache_lock, false, reason);
1198
137k
}
1199
1200
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
1201
373k
        FileCacheType cur_cache_type) {
1202
373k
    switch (cur_cache_type) {
1203
8.05k
    case FileCacheType::TTL:
1204
8.05k
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
1205
48.4k
    case FileCacheType::INDEX:
1206
48.4k
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
1207
312k
    case FileCacheType::NORMAL:
1208
312k
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
1209
4.52k
    case FileCacheType::DISPOSABLE:
1210
4.52k
        return {FileCacheType::NORMAL, FileCacheType::INDEX};
1211
0
    default:
1212
0
        return {};
1213
373k
    }
1214
0
    return {};
1215
373k
}
1216
1217
11.7k
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
1218
11.7k
    switch (cur_cache_type) {
1219
799
    case FileCacheType::TTL:
1220
799
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
1221
674
    case FileCacheType::INDEX:
1222
674
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL};
1223
9.92k
    case FileCacheType::NORMAL:
1224
9.92k
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL};
1225
336
    case FileCacheType::DISPOSABLE:
1226
336
        return {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL};
1227
0
    default:
1228
0
        return {};
1229
11.7k
    }
1230
0
    return {};
1231
11.7k
}
1232
1233
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
1234
57.4k
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
1235
57.4k
    DCHECK(_files.find(hash) != _files.end() &&
1236
57.4k
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
1237
57.4k
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
1238
57.4k
    DCHECK(cell != nullptr);
1239
57.4k
    if (cell == nullptr) {
1240
0
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
1241
0
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
1242
0
                     << " new_size=" << new_size;
1243
0
        return;
1244
0
    }
1245
57.4k
    if (cell->queue_iterator) {
1246
57.4k
        auto& queue = get_queue(cell->file_block->cache_type());
1247
57.4k
        DCHECK(queue.contains(hash, offset, cache_lock));
1248
57.4k
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
1249
57.4k
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
1250
57.4k
                                          cell->file_block->get_hash_value(),
1251
57.4k
                                          cell->file_block->offset(), new_size);
1252
57.4k
    }
1253
57.4k
    _cur_cache_size -= old_size;
1254
57.4k
    _cur_cache_size += new_size;
1255
57.4k
}
1256
1257
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
1258
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1259
373k
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1260
373k
    size_t removed_size = 0;
1261
373k
    size_t cur_cache_size = _cur_cache_size;
1262
373k
    std::vector<FileBlockCell*> to_evict;
1263
756k
    for (FileCacheType cache_type : other_cache_types) {
1264
756k
        auto& queue = get_queue(cache_type);
1265
756k
        size_t remove_size_per_type = 0;
1266
756k
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1267
690k
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1268
676k
                break;
1269
676k
            }
1270
14.7k
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1271
14.7k
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
1272
0
                         << ", offset: " << entry_offset;
1273
1274
14.7k
            size_t cell_size = cell->size();
1275
14.7k
            DCHECK(entry_size == cell_size);
1276
1277
14.7k
            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
1278
14.7k
                break;
1279
14.7k
            }
1280
1281
4
            if (cell->releasable()) {
1282
4
                auto& file_block = cell->file_block;
1283
4
                std::lock_guard block_lock(file_block->_mutex);
1284
4
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1285
4
                to_evict.push_back(cell);
1286
4
                removed_size += cell_size;
1287
4
                remove_size_per_type += cell_size;
1288
4
            }
1289
4
        }
1290
756k
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
1291
756k
    }
1292
373k
    bool is_sync_removal = !evict_in_advance;
1293
373k
    std::string reason = std::string("try_reserve_by_time ") +
1294
373k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1295
373k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1296
1297
373k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1298
373k
}
1299
1300
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
1301
2.56M
                                 bool evict_in_advance) const {
1302
2.56M
    bool ret = false;
1303
2.56M
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
1304
61.8k
        ret = (removed_size < need_size);
1305
61.8k
        return ret;
1306
61.8k
    }
1307
2.50M
    if (_disk_resource_limit_mode) {
1308
32
        ret = (removed_size < need_size);
1309
2.50M
    } else {
1310
2.50M
        ret = (cur_cache_size + need_size - removed_size > _capacity);
1311
2.50M
    }
1312
2.50M
    return ret;
1313
2.56M
}
1314
1315
bool BlockFileCache::try_reserve_from_other_queue_by_size(
1316
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1317
1.47k
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1318
1.47k
    size_t removed_size = 0;
1319
1.47k
    size_t cur_cache_size = _cur_cache_size;
1320
1.47k
    std::vector<FileBlockCell*> to_evict;
1321
    // we follow the privilege defined in get_other_cache_types to evict
1322
4.41k
    for (FileCacheType cache_type : other_cache_types) {
1323
4.41k
        auto& queue = get_queue(cache_type);
1324
1325
        // we will not drain each of them to the bottom -- i.e., we only
1326
        // evict what they have stolen.
1327
4.41k
        size_t cur_queue_size = queue.get_capacity(cache_lock);
1328
4.41k
        size_t cur_queue_max_size = queue.get_max_size();
1329
4.41k
        if (cur_queue_size <= cur_queue_max_size) {
1330
2.83k
            continue;
1331
2.83k
        }
1332
1.58k
        size_t cur_removed_size = 0;
1333
1.58k
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1334
1.58k
                              cur_removed_size, evict_in_advance);
1335
1.58k
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
1336
1.58k
    }
1337
1.47k
    bool is_sync_removal = !evict_in_advance;
1338
1.47k
    std::string reason = std::string("try_reserve_by_size") +
1339
1.47k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1340
1.47k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1341
1.47k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1342
1.47k
}
1343
1344
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
1345
                                                  int64_t cur_time,
1346
                                                  std::lock_guard<std::mutex>& cache_lock,
1347
373k
                                                  bool evict_in_advance) {
1348
    // currently, TTL cache is not considered as a candidate
1349
373k
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
1350
373k
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
1351
373k
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
1352
373k
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
1353
362k
        return reserve_success;
1354
362k
    }
1355
1356
11.7k
    other_cache_types = get_other_cache_type(cur_cache_type);
1357
11.7k
    auto& cur_queue = get_queue(cur_cache_type);
1358
11.7k
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
1359
11.7k
    size_t cur_queue_max_size = cur_queue.get_max_size();
1360
    // Hit the soft limit by self, cannot remove from other queues
1361
11.7k
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
1362
10.2k
        return false;
1363
10.2k
    }
1364
1.47k
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
1365
1.47k
                                                evict_in_advance);
1366
11.7k
}
1367
1368
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
1369
                                         QueryFileCacheContextPtr query_context,
1370
                                         const CacheContext& context, size_t offset, size_t size,
1371
                                         std::lock_guard<std::mutex>& cache_lock,
1372
373k
                                         bool evict_in_advance) {
1373
373k
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1374
373k
                               std::chrono::steady_clock::now().time_since_epoch())
1375
373k
                               .count();
1376
373k
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
1377
373k
                                      evict_in_advance)) {
1378
10.6k
        auto& queue = get_queue(context.cache_type);
1379
10.6k
        size_t removed_size = 0;
1380
10.6k
        size_t cur_cache_size = _cur_cache_size;
1381
1382
10.6k
        std::vector<FileBlockCell*> to_evict;
1383
10.6k
        size_t cur_removed_size = 0;
1384
10.6k
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1385
10.6k
                              cur_removed_size, evict_in_advance);
1386
10.6k
        bool is_sync_removal = !evict_in_advance;
1387
10.6k
        std::string reason = std::string("try_reserve for cache type ") +
1388
10.6k
                             cache_type_to_string(context.cache_type) +
1389
10.6k
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1390
10.6k
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1391
10.6k
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;
1392
1393
10.6k
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1394
7.46k
            return false;
1395
7.46k
        }
1396
10.6k
    }
1397
1398
366k
    if (query_context) {
1399
132
        query_context->reserve(hash, offset, size, cache_lock);
1400
132
    }
1401
366k
    return true;
1402
373k
}
1403
1404
template <class T, class U>
1405
    requires IsXLock<T> && IsXLock<U>
1406
352k
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
1407
352k
    auto hash = file_block->get_hash_value();
1408
352k
    auto offset = file_block->offset();
1409
352k
    auto type = file_block->cache_type();
1410
352k
    auto expiration_time = file_block->expiration_time();
1411
352k
    auto tablet_id = file_block->tablet_id();
1412
352k
    auto* cell = get_cell(hash, offset, cache_lock);
1413
352k
    file_block->cell = nullptr;
1414
352k
    DCHECK(cell);
1415
352k
    DCHECK(cell->queue_iterator);
1416
352k
    if (cell->queue_iterator) {
1417
352k
        auto& queue = get_queue(file_block->cache_type());
1418
352k
        queue.remove(*cell->queue_iterator, cache_lock);
1419
352k
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
1420
352k
                                          cell->file_block->get_hash_value(),
1421
352k
                                          cell->file_block->offset(), cell->size());
1422
352k
    }
1423
352k
    *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
1424
352k
            << file_block->range().size();
1425
352k
    *_total_evict_size_metrics << file_block->range().size();
1426
1427
352k
    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
1428
0
               << ", offset: " << offset << ", size: " << file_block->range().size()
1429
0
               << ", type: " << cache_type_to_string(type);
1430
1431
352k
    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
1432
117k
        FileCacheKey key;
1433
117k
        key.hash = hash;
1434
117k
        key.offset = offset;
1435
117k
        key.meta.type = type;
1436
117k
        key.meta.expiration_time = expiration_time;
1437
117k
        key.meta.tablet_id = tablet_id;
1438
117k
        if (sync) {
1439
52.2k
            int64_t duration_ns = 0;
1440
52.2k
            Status st;
1441
52.2k
            {
1442
52.2k
                SCOPED_RAW_TIMER(&duration_ns);
1443
52.2k
                st = _storage->remove(key);
1444
52.2k
            }
1445
52.2k
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
1446
52.2k
            if (!st.ok()) {
1447
0
                LOG_WARNING("").error(st);
1448
0
            }
1449
65.2k
        } else {
1450
            // the file will be deleted in the bottom half
1451
            // so there will be a window that the file is not in the cache but still in the storage
1452
            // but it's ok, because the rowset is stale already
1453
65.2k
            bool ret = _recycle_keys.enqueue(key);
1454
65.2k
            if (ret) [[likely]] {
1455
65.2k
                *_recycle_keys_length_recorder << _recycle_keys.size_approx();
1456
65.2k
            } else {
1457
0
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
1458
0
                int64_t duration_ns = 0;
1459
0
                Status st;
1460
0
                {
1461
0
                    SCOPED_RAW_TIMER(&duration_ns);
1462
0
                    st = _storage->remove(key);
1463
0
                }
1464
0
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
1465
0
                if (!st.ok()) {
1466
0
                    LOG_WARNING("").error(st);
1467
0
                }
1468
0
            }
1469
65.2k
        }
1470
235k
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
1471
0
        file_block->set_deleting();
1472
0
        return;
1473
0
    }
1474
352k
    _cur_cache_size -= file_block->range().size();
1475
352k
    if (FileCacheType::TTL == type) {
1476
2.58k
        _cur_ttl_size -= file_block->range().size();
1477
2.58k
    }
1478
352k
    auto it = _files.find(hash);
1479
352k
    if (it != _files.end()) {
1480
352k
        it->second.erase(file_block->offset());
1481
352k
        if (it->second.empty()) {
1482
115k
            _files.erase(hash);
1483
115k
        }
1484
352k
    }
1485
352k
    *_num_removed_blocks << 1;
1486
352k
}
1487
1488
92
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
1489
92
    SCOPED_CACHE_LOCK(_mutex, this);
1490
92
    return get_used_cache_size_unlocked(cache_type, cache_lock);
1491
92
}
1492
1493
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
1494
92
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1495
92
    return get_queue(cache_type).get_capacity(cache_lock);
1496
92
}
1497
1498
0
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
1499
0
    SCOPED_CACHE_LOCK(_mutex, this);
1500
0
    return get_available_cache_size_unlocked(cache_type, cache_lock);
1501
0
}
1502
1503
size_t BlockFileCache::get_available_cache_size_unlocked(
1504
0
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
1505
0
    return get_queue(cache_type).get_max_element_size() -
1506
0
           get_used_cache_size_unlocked(cache_type, cache_lock);
1507
0
}
1508
1509
176
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
1510
176
    SCOPED_CACHE_LOCK(_mutex, this);
1511
176
    return get_file_blocks_num_unlocked(cache_type, cache_lock);
1512
176
}
1513
1514
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
1515
176
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1516
176
    return get_queue(cache_type).get_elements_num(cache_lock);
1517
176
}
1518
1519
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
1520
366k
        : file_block(file_block) {
1521
366k
    file_block->cell = this;
1522
    /**
1523
     * Cell can be created with either DOWNLOADED or EMPTY file block's state.
1524
     * File block acquires DOWNLOADING state and creates LRUQueue iterator on first
1525
     * successful getOrSetDownaloder call.
1526
     */
1527
1528
366k
    switch (file_block->_download_state) {
1529
394
    case FileBlock::State::DOWNLOADED:
1530
366k
    case FileBlock::State::EMPTY:
1531
366k
    case FileBlock::State::SKIP_CACHE: {
1532
366k
        break;
1533
366k
    }
1534
0
    default:
1535
0
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
1536
0
                      << FileBlock::state_to_string(file_block->_download_state);
1537
366k
    }
1538
366k
    if (file_block->cache_type() == FileCacheType::TTL) {
1539
7.84k
        update_atime();
1540
7.84k
    }
1541
366k
}
1542
1543
LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
1544
724k
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
1545
724k
    cache_size += size;
1546
724k
    auto iter = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size));
1547
724k
    map.insert(std::make_pair(std::make_pair(hash, offset), iter));
1548
724k
    return iter;
1549
724k
}
1550
1551
0
void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
1552
0
    queue.clear();
1553
0
    map.clear();
1554
0
    cache_size = 0;
1555
0
}
1556
1557
39.3M
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
1558
39.3M
    queue.splice(queue.end(), queue, queue_it);
1559
39.3M
}
1560
1561
void LRUQueue::resize(Iterator queue_it, size_t new_size,
1562
114k
                      std::lock_guard<std::mutex>& /* cache_lock */) {
1563
114k
    cache_size -= queue_it->size;
1564
114k
    queue_it->size = new_size;
1565
114k
    cache_size += new_size;
1566
114k
}
1567
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
1568
57.4k
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
1569
57.4k
    return map.find(std::make_pair(hash, offset)) != map.end();
1570
57.4k
}
1571
1572
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
1573
20.8M
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
1574
20.8M
    auto itr = map.find(std::make_pair(hash, offset));
1575
20.8M
    if (itr != map.end()) {
1576
19.3M
        return itr->second;
1577
19.3M
    }
1578
1.48M
    return std::list<FileKeyAndOffset>::iterator();
1579
20.8M
}
1580
1581
4
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
1582
4
    std::string result;
1583
36
    for (const auto& [hash, offset, size] : queue) {
1584
36
        if (!result.empty()) {
1585
32
            result += ", ";
1586
32
        }
1587
36
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
1588
36
    }
1589
4
    return result;
1590
4
}
1591
1592
size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
1593
10
                                           std::lock_guard<std::mutex>& cache_lock) {
1594
10
    std::list<FileKeyAndOffset> target_queue = this->queue;
1595
10
    std::list<FileKeyAndOffset> base_queue = base.queue;
1596
10
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
1597
10
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
1598
1599
10
    size_t m = vec1.size();
1600
10
    size_t n = vec2.size();
1601
1602
    // Create a 2D vector (matrix) to store the Levenshtein distances
1603
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
1604
10
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
1605
1606
    // Initialize the first row and column of the matrix
1607
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
1608
44
    for (size_t i = 0; i <= m; ++i) {
1609
34
        dp[i][0] = i;
1610
34
    }
1611
40
    for (size_t j = 0; j <= n; ++j) {
1612
30
        dp[0][j] = j;
1613
30
    }
1614
1615
    // Fill the matrix using dynamic programming
1616
34
    for (size_t i = 1; i <= m; ++i) {
1617
76
        for (size_t j = 1; j <= n; ++j) {
1618
            // Check if the current elements of both vectors are equal
1619
52
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
1620
52
                           vec1[i - 1].offset == vec2[j - 1].offset)
1621
52
                                  ? 0
1622
52
                                  : 1;
1623
            // Calculate the minimum cost of three possible operations:
1624
            // 1. Insertion: dp[i][j-1] + 1
1625
            // 2. Deletion: dp[i-1][j] + 1
1626
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
1627
52
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
1628
52
        }
1629
24
    }
1630
    // The bottom-right cell of the matrix contains the Levenshtein distance
1631
10
    return dp[m][n];
1632
10
}
1633
1634
2
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
1635
2
    SCOPED_CACHE_LOCK(_mutex, this);
1636
2
    return dump_structure_unlocked(hash, cache_lock);
1637
2
}
1638
1639
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
1640
2
                                                    std::lock_guard<std::mutex>&) {
1641
2
    std::stringstream result;
1642
2
    auto it = _files.find(hash);
1643
2
    if (it == _files.end()) {
1644
0
        return std::string("");
1645
0
    }
1646
2
    const auto& cells_by_offset = it->second;
1647
1648
2
    for (const auto& [_, cell] : cells_by_offset) {
1649
2
        result << cell.file_block->get_info_for_log() << " "
1650
2
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
1651
2
    }
1652
1653
2
    return result.str();
1654
2
}
1655
1656
0
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
1657
0
    SCOPED_CACHE_LOCK(_mutex, this);
1658
0
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
1659
0
}
1660
1661
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
1662
                                                            size_t offset,
1663
0
                                                            std::lock_guard<std::mutex>&) {
1664
0
    std::stringstream result;
1665
0
    auto it = _files.find(hash);
1666
0
    if (it == _files.end()) {
1667
0
        return std::string("");
1668
0
    }
1669
0
    const auto& cells_by_offset = it->second;
1670
0
    const auto& cell = cells_by_offset.find(offset);
1671
1672
0
    return cache_type_to_string(cell->second.file_block->cache_type());
1673
0
}
1674
1675
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
1676
                                       FileCacheType new_type,
1677
2.65k
                                       std::lock_guard<std::mutex>& cache_lock) {
1678
2.65k
    if (auto iter = _files.find(hash); iter != _files.end()) {
1679
2.65k
        auto& file_blocks = iter->second;
1680
2.65k
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
1681
2.65k
            FileBlockCell& cell = cell_it->second;
1682
2.65k
            auto& cur_queue = get_queue(cell.file_block->cache_type());
1683
2.65k
            DCHECK(cell.queue_iterator.has_value());
1684
2.65k
            cur_queue.remove(*cell.queue_iterator, cache_lock);
1685
2.65k
            _lru_recorder->record_queue_event(
1686
2.65k
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
1687
2.65k
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
1688
2.65k
            auto& new_queue = get_queue(new_type);
1689
2.65k
            cell.queue_iterator =
1690
2.65k
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
1691
2.65k
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
1692
2.65k
                                              cell.file_block->get_hash_value(),
1693
2.65k
                                              cell.file_block->offset(), cell.size());
1694
2.65k
        }
1695
2.65k
    }
1696
2.65k
}
1697
1698
// @brief: get a path's disk capacity used percent, inode used percent
1699
// @param: path
1700
// @param: percent.first disk used percent, percent.second inode used percent
1701
8.36k
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
1702
8.36k
    struct statfs stat;
1703
8.36k
    int ret = statfs(path.c_str(), &stat);
1704
8.36k
    if (ret != 0) {
1705
6
        return ret;
1706
6
    }
1707
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
1708
    // v->used = stat.f_blocks - stat.f_bfree
1709
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
1710
8.36k
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
1711
8.36k
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
1712
8.36k
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));
1713
1714
8.36k
    unsigned long long inode_free = stat.f_ffree;
1715
8.36k
    unsigned long long inode_total = stat.f_files;
1716
8.36k
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
1717
8.36k
    percent->first = capacity_percentage;
1718
8.36k
    percent->second = 100 - inode_percentage;
1719
1720
    // Add sync point for testing
1721
8.36k
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
1722
1723
8.36k
    return 0;
1724
8.36k
}
1725
1726
20
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
1727
20
    using namespace std::chrono;
1728
20
    int64_t space_released = 0;
1729
20
    size_t old_capacity = 0;
1730
20
    std::stringstream ss;
1731
20
    ss << "finish reset_capacity, path=" << _cache_base_path;
1732
20
    auto adjust_start_time = steady_clock::time_point();
1733
20
    {
1734
20
        SCOPED_CACHE_LOCK(_mutex, this);
1735
20
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
1736
2
            int64_t need_remove_size = _cur_cache_size - new_capacity;
1737
8
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
1738
8
                int64_t queue_released = 0;
1739
8
                std::vector<FileBlockCell*> to_evict;
1740
26
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1741
26
                    if (need_remove_size <= 0) {
1742
2
                        break;
1743
2
                    }
1744
24
                    need_remove_size -= entry_size;
1745
24
                    space_released += entry_size;
1746
24
                    queue_released += entry_size;
1747
24
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1748
24
                    if (!cell->releasable()) {
1749
0
                        cell->file_block->set_deleting();
1750
0
                        continue;
1751
0
                    }
1752
24
                    to_evict.push_back(cell);
1753
24
                }
1754
24
                for (auto& cell : to_evict) {
1755
24
                    FileBlockSPtr file_block = cell->file_block;
1756
24
                    std::lock_guard block_lock(file_block->_mutex);
1757
24
                    remove(file_block, cache_lock, block_lock);
1758
24
                }
1759
8
                return queue_released;
1760
8
            };
1761
2
            int64_t queue_released = remove_blocks(_disposable_queue);
1762
2
            ss << " disposable_queue released " << queue_released;
1763
2
            queue_released = remove_blocks(_normal_queue);
1764
2
            ss << " normal_queue released " << queue_released;
1765
2
            queue_released = remove_blocks(_index_queue);
1766
2
            ss << " index_queue released " << queue_released;
1767
2
            queue_released = remove_blocks(_ttl_queue);
1768
2
            ss << " ttl_queue released " << queue_released;
1769
1770
2
            _disk_resource_limit_mode = true;
1771
2
            _disk_limit_mode_metrics->set_value(1);
1772
2
            ss << " total_space_released=" << space_released;
1773
2
        }
1774
20
        old_capacity = _capacity;
1775
20
        _capacity = new_capacity;
1776
20
        _cache_capacity_metrics->set_value(_capacity);
1777
20
    }
1778
20
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
1779
20
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
1780
20
              << " use_time=" << cast_set<int64_t>(use_time.count());
1781
20
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
1782
20
    LOG(INFO) << ss.str();
1783
20
    return ss.str();
1784
20
}
1785
1786
6.28k
void BlockFileCache::check_disk_resource_limit() {
1787
6.28k
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1788
1.96k
        return;
1789
1.96k
    }
1790
1791
4.32k
    bool previous_mode = _disk_resource_limit_mode;
1792
4.32k
    if (_capacity > _cur_cache_size) {
1793
4.31k
        _disk_resource_limit_mode = false;
1794
4.31k
        _disk_limit_mode_metrics->set_value(0);
1795
4.31k
    }
1796
4.32k
    std::pair<int, int> percent;
1797
4.32k
    int ret = disk_used_percentage(_cache_base_path, &percent);
1798
4.32k
    if (ret != 0) {
1799
4
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1800
4
        return;
1801
4
    }
1802
4.31k
    auto [space_percentage, inode_percentage] = percent;
1803
8.62k
    auto is_insufficient = [](const int& percentage) {
1804
8.62k
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
1805
8.62k
    };
1806
4.31k
    DCHECK_GE(space_percentage, 0);
1807
4.31k
    DCHECK_LE(space_percentage, 100);
1808
4.31k
    DCHECK_GE(inode_percentage, 0);
1809
4.31k
    DCHECK_LE(inode_percentage, 100);
1810
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1811
    // FIXME: reject with config validator
1812
4.31k
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
1813
4.31k
        config::file_cache_exit_disk_resource_limit_mode_percent) {
1814
2
        LOG_WARNING("config error, set to default value")
1815
2
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
1816
2
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
1817
2
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
1818
2
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
1819
2
    }
1820
4.31k
    bool is_space_insufficient = is_insufficient(space_percentage);
1821
4.31k
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1822
4.31k
    if (is_space_insufficient || is_inode_insufficient) {
1823
6
        _disk_resource_limit_mode = true;
1824
6
        _disk_limit_mode_metrics->set_value(1);
1825
4.31k
    } else if (_disk_resource_limit_mode &&
1826
4.31k
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
1827
4.31k
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
1828
0
        _disk_resource_limit_mode = false;
1829
0
        _disk_limit_mode_metrics->set_value(0);
1830
0
    }
1831
4.31k
    if (previous_mode != _disk_resource_limit_mode) {
1832
        // add log for disk resource limit mode switching
1833
12
        if (_disk_resource_limit_mode) {
1834
6
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
1835
6
                         << " space_percent=" << space_percentage
1836
6
                         << " inode_percent=" << inode_percentage
1837
6
                         << " is_space_insufficient=" << is_space_insufficient
1838
6
                         << " is_inode_insufficient=" << is_inode_insufficient
1839
6
                         << " enter threshold="
1840
6
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
1841
6
        } else {
1842
6
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
1843
6
                      << " space_percent=" << space_percentage
1844
6
                      << " inode_percent=" << inode_percentage << " exit threshold="
1845
6
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
1846
6
        }
1847
4.30k
    } else if (_disk_resource_limit_mode) {
1848
        // print log for disk resource limit mode running, but less frequently
1849
0
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1850
0
                                 << " space_percent=" << space_percentage
1851
0
                                 << " inode_percent=" << inode_percentage
1852
0
                                 << " is_space_insufficient=" << is_space_insufficient
1853
0
                                 << " is_inode_insufficient=" << is_inode_insufficient
1854
0
                                 << " mode run in resource limit";
1855
0
    }
1856
4.31k
}
1857
1858
4.05k
void BlockFileCache::check_need_evict_cache_in_advance() {
1859
4.05k
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1860
2
        return;
1861
2
    }
1862
1863
4.05k
    std::pair<int, int> percent;
1864
4.05k
    int ret = disk_used_percentage(_cache_base_path, &percent);
1865
4.05k
    if (ret != 0) {
1866
2
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1867
2
        return;
1868
2
    }
1869
4.04k
    auto [space_percentage, inode_percentage] = percent;
1870
4.04k
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
1871
12.1k
    auto is_insufficient = [](const int& percentage) {
1872
12.1k
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
1873
12.1k
    };
1874
4.04k
    DCHECK_GE(space_percentage, 0);
1875
4.04k
    DCHECK_LE(space_percentage, 100);
1876
4.04k
    DCHECK_GE(inode_percentage, 0);
1877
4.04k
    DCHECK_LE(inode_percentage, 100);
1878
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1879
    // FIXME: reject with config validator
1880
4.04k
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
1881
4.04k
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
1882
2
        LOG_WARNING("config error, set to default value")
1883
2
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
1884
2
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
1885
2
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
1886
2
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
1887
2
    }
1888
4.04k
    bool previous_mode = _need_evict_cache_in_advance;
1889
4.04k
    bool is_space_insufficient = is_insufficient(space_percentage);
1890
4.04k
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1891
4.04k
    bool is_size_insufficient = is_insufficient(size_percentage);
1892
4.04k
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
1893
70
        _need_evict_cache_in_advance = true;
1894
70
        _need_evict_cache_in_advance_metrics->set_value(1);
1895
3.97k
    } else if (_need_evict_cache_in_advance &&
1896
3.97k
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1897
3.97k
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1898
3.97k
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
1899
52
        _need_evict_cache_in_advance = false;
1900
52
        _need_evict_cache_in_advance_metrics->set_value(0);
1901
52
    }
1902
4.04k
    if (previous_mode != _need_evict_cache_in_advance) {
1903
        // add log for evict cache in advance mode switching
1904
112
        if (_need_evict_cache_in_advance) {
1905
60
            LOG(WARNING) << "Entering evict cache in advance mode: "
1906
60
                         << "file_cache=" << get_base_path()
1907
60
                         << " space_percent=" << space_percentage
1908
60
                         << " inode_percent=" << inode_percentage
1909
60
                         << " size_percent=" << size_percentage
1910
60
                         << " is_space_insufficient=" << is_space_insufficient
1911
60
                         << " is_inode_insufficient=" << is_inode_insufficient
1912
60
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
1913
60
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
1914
60
        } else {
1915
52
            LOG(INFO) << "Exiting evict cache in advance mode: "
1916
52
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
1917
52
                      << " inode_percent=" << inode_percentage
1918
52
                      << " size_percent=" << size_percentage << " exit threshold="
1919
52
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
1920
52
        }
1921
3.93k
    } else if (_need_evict_cache_in_advance) {
1922
        // print log for evict cache in advance mode running, but less frequently
1923
10
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1924
3
                                 << " space_percent=" << space_percentage
1925
3
                                 << " inode_percent=" << inode_percentage
1926
3
                                 << " size_percent=" << size_percentage
1927
3
                                 << " is_space_insufficient=" << is_space_insufficient
1928
3
                                 << " is_inode_insufficient=" << is_inode_insufficient
1929
3
                                 << " is_size_insufficient=" << is_size_insufficient
1930
3
                                 << " need evict cache in advance";
1931
10
    }
1932
4.04k
}
1933
1934
285
void BlockFileCache::run_background_monitor() {
1935
285
    Thread::set_self_name("run_background_monitor");
1936
6.29k
    while (!_close) {
1937
6.28k
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
1938
6.28k
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
1939
6.28k
        check_disk_resource_limit();
1940
6.28k
        if (config::enable_evict_file_cache_in_advance) {
1941
4.03k
            check_need_evict_cache_in_advance();
1942
4.03k
        } else {
1943
2.24k
            _need_evict_cache_in_advance = false;
1944
2.24k
            _need_evict_cache_in_advance_metrics->set_value(0);
1945
2.24k
        }
1946
1947
6.28k
        {
1948
6.28k
            std::unique_lock close_lock(_close_mtx);
1949
6.28k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
1950
6.28k
            if (_close) {
1951
276
                break;
1952
276
            }
1953
6.28k
        }
1954
        // report
1955
6.00k
        {
1956
6.00k
            SCOPED_CACHE_LOCK(_mutex, this);
1957
6.00k
            _cur_cache_size_metrics->set_value(_cur_cache_size);
1958
6.00k
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
1959
6.00k
                                                   _index_queue.get_capacity(cache_lock) -
1960
6.00k
                                                   _normal_queue.get_capacity(cache_lock) -
1961
6.00k
                                                   _disposable_queue.get_capacity(cache_lock));
1962
6.00k
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
1963
6.00k
                    _ttl_queue.get_capacity(cache_lock));
1964
6.00k
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
1965
6.00k
                    _ttl_queue.get_elements_num(cache_lock));
1966
6.00k
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
1967
6.00k
            _cur_normal_queue_element_count_metrics->set_value(
1968
6.00k
                    _normal_queue.get_elements_num(cache_lock));
1969
6.00k
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
1970
6.00k
            _cur_index_queue_element_count_metrics->set_value(
1971
6.00k
                    _index_queue.get_elements_num(cache_lock));
1972
6.00k
            _cur_disposable_queue_cache_size_metrics->set_value(
1973
6.00k
                    _disposable_queue.get_capacity(cache_lock));
1974
6.00k
            _cur_disposable_queue_element_count_metrics->set_value(
1975
6.00k
                    _disposable_queue.get_elements_num(cache_lock));
1976
1977
            // Update meta store write queue size if storage is FSFileCacheStorage
1978
6.00k
            if (_storage->get_type() == FileCacheStorageType::DISK) {
1979
4.06k
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
1980
4.06k
                if (fs_storage != nullptr) {
1981
4.06k
                    auto* meta_store = fs_storage->get_meta_store();
1982
4.06k
                    if (meta_store != nullptr) {
1983
4.06k
                        _meta_store_write_queue_size_metrics->set_value(
1984
4.06k
                                meta_store->get_write_queue_size());
1985
4.06k
                    }
1986
4.06k
                }
1987
4.06k
            }
1988
1989
6.00k
            if (_num_read_blocks->get_value() > 0) {
1990
2.30k
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
1991
2.30k
                                      (double)_num_read_blocks->get_value());
1992
2.30k
            }
1993
6.00k
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
1994
1.62k
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
1995
1.62k
                                         (double)_num_read_blocks_5m->get_value());
1996
1.62k
            }
1997
6.00k
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
1998
2.27k
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
1999
2.27k
                                         (double)_num_read_blocks_1h->get_value());
2000
2.27k
            }
2001
2002
6.00k
            if (_no_warmup_num_hit_blocks->get_value() > 0) {
2003
2.25k
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
2004
2.25k
                                                (double)_no_warmup_num_read_blocks->get_value());
2005
2.25k
            }
2006
6.00k
            if (_no_warmup_num_hit_blocks_5m && _no_warmup_num_hit_blocks_5m->get_value() > 0) {
2007
1.60k
                _no_warmup_hit_ratio_5m->set_value(
2008
1.60k
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
2009
1.60k
                        (double)_no_warmup_num_read_blocks_5m->get_value());
2010
1.60k
            }
2011
6.00k
            if (_no_warmup_num_hit_blocks_1h && _no_warmup_num_hit_blocks_1h->get_value() > 0) {
2012
2.25k
                _no_warmup_hit_ratio_1h->set_value(
2013
2.25k
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
2014
2.25k
                        (double)_no_warmup_num_read_blocks_1h->get_value());
2015
2.25k
            }
2016
6.00k
        }
2017
6.00k
    }
2018
285
}
2019
2020
285
void BlockFileCache::run_background_gc() {
2021
285
    Thread::set_self_name("run_background_gc");
2022
285
    FileCacheKey key;
2023
285
    size_t batch_count = 0;
2024
201k
    while (!_close) {
2025
201k
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
2026
201k
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
2027
201k
        {
2028
201k
            std::unique_lock close_lock(_close_mtx);
2029
201k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2030
201k
            if (_close) {
2031
274
                break;
2032
274
            }
2033
201k
        }
2034
2035
265k
        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
2036
65.1k
            int64_t duration_ns = 0;
2037
65.1k
            Status st;
2038
65.1k
            {
2039
65.1k
                SCOPED_RAW_TIMER(&duration_ns);
2040
65.1k
                st = _storage->remove(key);
2041
65.1k
            }
2042
65.1k
            *_storage_async_remove_latency_us << (duration_ns / 1000);
2043
2044
65.1k
            if (!st.ok()) {
2045
1
                LOG_WARNING("").error(st);
2046
1
            }
2047
65.1k
            batch_count++;
2048
65.1k
        }
2049
200k
        *_recycle_keys_length_recorder << _recycle_keys.size_approx();
2050
200k
        batch_count = 0;
2051
200k
    }
2052
285
}
2053
2054
285
void BlockFileCache::run_background_evict_in_advance() {
2055
285
    Thread::set_self_name("run_background_evict_in_advance");
2056
285
    LOG(INFO) << "Starting background evict in advance thread";
2057
285
    int64_t batch = 0;
2058
30.3k
    while (!_close) {
2059
30.3k
        {
2060
30.3k
            std::unique_lock close_lock(_close_mtx);
2061
30.3k
            _close_cv.wait_for(
2062
30.3k
                    close_lock,
2063
30.3k
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
2064
30.3k
            if (_close) {
2065
273
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
2066
273
                break;
2067
273
            }
2068
30.3k
        }
2069
30.0k
        batch = config::file_cache_evict_in_advance_batch_bytes;
2070
2071
        // Skip if eviction not needed or too many pending recycles
2072
30.0k
        if (!_need_evict_cache_in_advance ||
2073
30.0k
            _recycle_keys.size_approx() >=
2074
29.8k
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
2075
29.8k
            continue;
2076
29.8k
        }
2077
2078
238
        int64_t duration_ns = 0;
2079
238
        {
2080
238
            SCOPED_CACHE_LOCK(_mutex, this);
2081
238
            SCOPED_RAW_TIMER(&duration_ns);
2082
238
            try_evict_in_advance(batch, cache_lock);
2083
238
        }
2084
238
        *_evict_in_advance_latency_us << (duration_ns / 1000);
2085
238
    }
2086
285
}
2087
2088
285
void BlockFileCache::run_background_block_lru_update() {
2089
285
    Thread::set_self_name("run_background_block_lru_update");
2090
285
    std::vector<FileBlockSPtr> batch;
2091
13.9k
    while (!_close) {
2092
13.9k
        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
2093
13.9k
        size_t batch_limit =
2094
13.9k
                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
2095
13.9k
        {
2096
13.9k
            std::unique_lock close_lock(_close_mtx);
2097
13.9k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2098
13.9k
            if (_close) {
2099
273
                break;
2100
273
            }
2101
13.9k
        }
2102
2103
13.6k
        batch.clear();
2104
13.6k
        batch.reserve(batch_limit);
2105
13.6k
        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
2106
13.7k
        if (drained == 0) {
2107
13.7k
            *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2108
13.7k
            continue;
2109
13.7k
        }
2110
2111
18.4E
        int64_t duration_ns = 0;
2112
18.4E
        {
2113
18.4E
            SCOPED_CACHE_LOCK(_mutex, this);
2114
18.4E
            SCOPED_RAW_TIMER(&duration_ns);
2115
18.4E
            for (auto& block : batch) {
2116
1.41k
                update_block_lru(block, cache_lock);
2117
1.41k
            }
2118
18.4E
        }
2119
18.4E
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
2120
18.4E
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2121
18.4E
    }
2122
285
}
2123
2124
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
2125
4
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
2126
4
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
2127
4
                               std::chrono::steady_clock::now().time_since_epoch())
2128
4
                               .count();
2129
4
    SCOPED_CACHE_LOCK(_mutex, this);
2130
4
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
2131
4
    if (auto iter = _files.find(hash); iter != _files.end()) {
2132
10
        for (auto& pair : _files.find(hash)->second) {
2133
10
            const FileBlockCell* cell = &pair.second;
2134
10
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
2135
8
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
2136
8
                    (cell->atime != 0 &&
2137
6
                     cur_time - cell->atime <
2138
6
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
2139
6
                    blocks_meta.emplace_back(pair.first, cell->size(),
2140
6
                                             cell->file_block->cache_type(),
2141
6
                                             cell->file_block->expiration_time());
2142
6
                }
2143
8
            }
2144
10
        }
2145
4
    }
2146
4
    return blocks_meta;
2147
4
}
2148
2149
bool BlockFileCache::try_reserve_during_async_load(size_t size,
2150
4
                                                   std::lock_guard<std::mutex>& cache_lock) {
2151
4
    size_t removed_size = 0;
2152
4
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
2153
4
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
2154
4
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);
2155
2156
4
    std::vector<FileBlockCell*> to_evict;
2157
4
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
2158
4
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
2159
4
            if (!_disk_resource_limit_mode || removed_size >= size) {
2160
3
                break;
2161
3
            }
2162
1
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
2163
2164
1
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
2165
0
                         << ", offset: " << entry_offset;
2166
2167
1
            size_t cell_size = cell->size();
2168
1
            DCHECK(entry_size == cell_size);
2169
2170
1
            if (cell->releasable()) {
2171
1
                auto& file_block = cell->file_block;
2172
2173
1
                std::lock_guard block_lock(file_block->_mutex);
2174
1
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
2175
1
                to_evict.push_back(cell);
2176
1
                removed_size += cell_size;
2177
1
            }
2178
1
        }
2179
3
    };
2180
4
    if (disposable_queue_size != 0) {
2181
0
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
2182
0
    }
2183
4
    if (normal_queue_size != 0) {
2184
3
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
2185
3
    }
2186
4
    if (index_queue_size != 0) {
2187
0
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
2188
0
    }
2189
4
    std::string reason = "async load";
2190
4
    remove_file_blocks(to_evict, cache_lock, true, reason);
2191
2192
4
    return !_disk_resource_limit_mode || removed_size >= size;
2193
4
}
2194
2195
68
void BlockFileCache::clear_need_update_lru_blocks() {
2196
68
    _need_update_lru_blocks.clear();
2197
68
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2198
68
}
2199
2200
62
void BlockFileCache::pause_ttl_manager() {
2201
62
    if (_ttl_mgr) {
2202
20
        _ttl_mgr->stop();
2203
20
    }
2204
62
}
2205
2206
62
void BlockFileCache::resume_ttl_manager() {
2207
62
    if (_ttl_mgr) {
2208
20
        _ttl_mgr->resume();
2209
20
    }
2210
62
}
2211
2212
62
std::string BlockFileCache::clear_file_cache_directly() {
2213
62
    pause_ttl_manager();
2214
62
    _lru_dumper->remove_lru_dump_files();
2215
62
    using namespace std::chrono;
2216
62
    std::stringstream ss;
2217
62
    auto start = steady_clock::now();
2218
62
    std::string result;
2219
62
    {
2220
62
        SCOPED_CACHE_LOCK(_mutex, this);
2221
62
        LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
2222
2223
62
        std::string clear_msg;
2224
62
        auto s = _storage->clear(clear_msg);
2225
62
        if (!s.ok()) {
2226
2
            result = clear_msg;
2227
60
        } else {
2228
60
            int64_t num_files = _files.size();
2229
60
            int64_t cache_size = _cur_cache_size;
2230
60
            int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
2231
60
            int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock);
2232
60
            int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock);
2233
60
            int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);
2234
2235
60
            int64_t clear_fd_duration = 0;
2236
60
            {
2237
                // clear FDCache to release fd
2238
60
                SCOPED_RAW_TIMER(&clear_fd_duration);
2239
64
                for (const auto& [file_key, file_blocks] : _files) {
2240
666
                    for (const auto& [offset, file_block_cell] : file_blocks) {
2241
666
                        AccessKeyAndOffset access_key_and_offset(file_key, offset);
2242
666
                        FDCache::instance()->remove_file_reader(access_key_and_offset);
2243
666
                    }
2244
64
                }
2245
60
            }
2246
2247
60
            _files.clear();
2248
60
            _cur_cache_size = 0;
2249
60
            _cur_ttl_size = 0;
2250
60
            _time_to_key.clear();
2251
60
            _key_to_time.clear();
2252
60
            _index_queue.clear(cache_lock);
2253
60
            _normal_queue.clear(cache_lock);
2254
60
            _disposable_queue.clear(cache_lock);
2255
60
            _ttl_queue.clear(cache_lock);
2256
2257
            // Update cache metrics immediately so consumers observe the cleared state
2258
            // without waiting for the next background monitor round.
2259
60
            _cur_cache_size_metrics->set_value(0);
2260
60
            _cur_ttl_cache_size_metrics->set_value(0);
2261
60
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0);
2262
60
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0);
2263
60
            _cur_normal_queue_cache_size_metrics->set_value(0);
2264
60
            _cur_normal_queue_element_count_metrics->set_value(0);
2265
60
            _cur_index_queue_cache_size_metrics->set_value(0);
2266
60
            _cur_index_queue_element_count_metrics->set_value(0);
2267
60
            _cur_disposable_queue_cache_size_metrics->set_value(0);
2268
60
            _cur_disposable_queue_element_count_metrics->set_value(0);
2269
2270
60
            clear_need_update_lru_blocks();
2271
2272
60
            ss << "finish clear_file_cache_directly"
2273
60
               << " path=" << _cache_base_path << " time_elapsed_ms="
2274
60
               << duration_cast<milliseconds>(steady_clock::now() - start).count()
2275
60
               << " fd_clear_time_ms=" << (clear_fd_duration / 1000000)
2276
60
               << " num_files=" << num_files << " cache_size=" << cache_size
2277
60
               << " index_queue_size=" << index_queue_size
2278
60
               << " normal_queue_size=" << normal_queue_size
2279
60
               << " disposible_queue_size=" << disposible_queue_size
2280
60
               << "ttl_queue_size=" << ttl_queue_size;
2281
60
            result = ss.str();
2282
60
            LOG(INFO) << result;
2283
60
        }
2284
62
    }
2285
62
    _lru_dumper->remove_lru_dump_files();
2286
62
    resume_ttl_manager();
2287
62
    return result;
2288
62
}
2289
2290
42.2k
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
2291
42.2k
    std::map<size_t, FileBlockSPtr> offset_to_block;
2292
42.2k
    SCOPED_CACHE_LOCK(_mutex, this);
2293
42.2k
    if (_files.contains(hash)) {
2294
37.1k
        for (auto& [offset, cell] : _files[hash]) {
2295
37.1k
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
2296
37.1k
                cell.file_block->_owned_by_cached_reader = true;
2297
37.1k
                offset_to_block.emplace(offset, cell.file_block);
2298
37.1k
            }
2299
37.1k
        }
2300
35.4k
    }
2301
42.2k
    return offset_to_block;
2302
42.2k
}
2303
2304
0
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
2305
0
    SCOPED_CACHE_LOCK(_mutex, this);
2306
0
    if (auto iter = _files.find(hash); iter != _files.end()) {
2307
0
        for (auto& [_, cell] : iter->second) {
2308
0
            cell.update_atime();
2309
0
        }
2310
0
    };
2311
0
}
2312
2313
285
void BlockFileCache::run_background_lru_log_replay() {
2314
285
    Thread::set_self_name("run_background_lru_log_replay");
2315
28.9k
    while (!_close) {
2316
28.9k
        int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
2317
28.9k
        {
2318
28.9k
            std::unique_lock close_lock(_close_mtx);
2319
28.9k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2320
28.9k
            if (_close) {
2321
272
                break;
2322
272
            }
2323
28.9k
        }
2324
2325
28.6k
        _lru_recorder->replay_queue_event(FileCacheType::TTL);
2326
28.6k
        _lru_recorder->replay_queue_event(FileCacheType::INDEX);
2327
28.6k
        _lru_recorder->replay_queue_event(FileCacheType::NORMAL);
2328
28.6k
        _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE);
2329
2330
28.6k
        if (config::enable_evaluate_shadow_queue_diff) {
2331
0
            SCOPED_CACHE_LOCK(_mutex, this);
2332
0
            _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
2333
0
            _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
2334
0
            _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
2335
0
            _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
2336
0
        }
2337
28.6k
    }
2338
285
}
2339
2340
3.57k
void BlockFileCache::dump_lru_queues(bool force) {
2341
3.57k
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
2342
3.57k
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
2343
3.57k
        !ExecEnv::GetInstance()->get_is_upgrading()) {
2344
3.57k
        _lru_dumper->dump_queue("disposable", force);
2345
3.57k
        _lru_dumper->dump_queue("normal", force);
2346
3.57k
        _lru_dumper->dump_queue("index", force);
2347
3.57k
        _lru_dumper->dump_queue("ttl", force);
2348
3.57k
        _lru_dumper->set_first_dump_done();
2349
3.57k
    }
2350
3.57k
}
2351
2352
285
void BlockFileCache::run_background_lru_dump() {
2353
285
    Thread::set_self_name("run_background_lru_dump");
2354
3.86k
    while (!_close) {
2355
3.85k
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
2356
3.85k
        {
2357
3.85k
            std::unique_lock close_lock(_close_mtx);
2358
3.85k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2359
3.85k
            if (_close) {
2360
273
                break;
2361
273
            }
2362
3.85k
        }
2363
3.57k
        dump_lru_queues(false);
2364
3.57k
    }
2365
285
}
2366
2367
285
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
2368
    // keep this order coz may be duplicated in different queue, we use the first appearence
2369
285
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
2370
285
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
2371
285
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
2372
285
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
2373
285
}
2374
2375
117
std::map<std::string, double> BlockFileCache::get_stats() {
2376
117
    std::map<std::string, double> stats;
2377
117
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2378
117
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2379
117
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2380
2381
117
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2382
117
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
2383
117
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2384
117
    stats["index_queue_curr_elements"] =
2385
117
            (double)_cur_index_queue_element_count_metrics->get_value();
2386
2387
117
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2388
117
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
2389
117
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2390
117
    stats["ttl_queue_curr_elements"] =
2391
117
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
2392
2393
117
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2394
117
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
2395
117
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2396
117
    stats["normal_queue_curr_elements"] =
2397
117
            (double)_cur_normal_queue_element_count_metrics->get_value();
2398
2399
117
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2400
117
    stats["disposable_queue_curr_size"] =
2401
117
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
2402
117
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2403
117
    stats["disposable_queue_curr_elements"] =
2404
117
            (double)_cur_disposable_queue_element_count_metrics->get_value();
2405
2406
117
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2407
117
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2408
2409
117
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2410
117
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2411
117
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2412
2413
117
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2414
117
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2415
117
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2416
2417
117
    return stats;
2418
117
}
2419
2420
// for be UTs
2421
344
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
2422
344
    std::map<std::string, double> stats;
2423
344
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2424
344
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2425
344
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2426
2427
344
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2428
344
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
2429
344
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2430
344
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
2431
2432
344
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2433
344
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
2434
344
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2435
344
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
2436
2437
344
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2438
344
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
2439
344
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2440
344
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
2441
2442
344
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2443
344
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
2444
344
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2445
344
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
2446
2447
344
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2448
344
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2449
2450
344
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2451
344
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2452
344
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2453
2454
344
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2455
344
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2456
344
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2457
2458
344
    return stats;
2459
344
}
2460
2461
template void BlockFileCache::remove(FileBlockSPtr file_block,
2462
                                     std::lock_guard<std::mutex>& cache_lock,
2463
                                     std::lock_guard<std::mutex>& block_lock, bool sync);
2464
2465
2
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
2466
2
    InconsistencyContext inconsistency_context;
2467
2
    RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));
2468
2
    auto n = inconsistency_context.types.size();
2469
2
    results.reserve(n);
2470
2
    for (size_t i = 0; i < n; i++) {
2471
0
        std::string result;
2472
0
        result += "File cache info in manager:\n";
2473
0
        result += inconsistency_context.infos_in_manager[i].to_string();
2474
0
        result += "File cache info in storage:\n";
2475
0
        result += inconsistency_context.infos_in_storage[i].to_string();
2476
0
        result += inconsistency_context.types[i].to_string();
2477
0
        result += "\n";
2478
0
        results.push_back(std::move(result));
2479
0
    }
2480
2
    return Status::OK();
2481
2
}
2482
2483
2
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
2484
2
    std::lock_guard<std::mutex> cache_lock(_mutex);
2485
2
    std::vector<FileCacheInfo> infos_in_storage;
2486
2
    RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));
2487
2
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks;
2488
2
    for (const auto& info_in_storage : infos_in_storage) {
2489
0
        confirmed_blocks.insert({info_in_storage.hash, info_in_storage.offset});
2490
0
        auto* cell = get_cell(info_in_storage.hash, info_in_storage.offset, cache_lock);
2491
0
        if (cell == nullptr || cell->file_block == nullptr) {
2492
0
            inconsistency_context.infos_in_manager.emplace_back();
2493
0
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
2494
0
            inconsistency_context.types.emplace_back(InconsistencyType::NOT_LOADED);
2495
0
            continue;
2496
0
        }
2497
0
        FileCacheInfo info_in_manager {
2498
0
                .hash = info_in_storage.hash,
2499
0
                .expiration_time = cell->file_block->expiration_time(),
2500
0
                .size = cell->size(),
2501
0
                .offset = info_in_storage.offset,
2502
0
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
2503
0
                .cache_type = cell->file_block->cache_type()};
2504
0
        InconsistencyType inconsistent_type;
2505
0
        if (info_in_storage.is_tmp != info_in_manager.is_tmp) {
2506
0
            inconsistent_type |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
2507
0
        }
2508
0
        size_t expected_size =
2509
0
                info_in_manager.is_tmp ? cell->dowloading_size() : info_in_manager.size;
2510
0
        if (info_in_storage.size != expected_size) {
2511
0
            inconsistent_type |= InconsistencyType::SIZE_INCONSISTENT;
2512
0
        }
2513
        // Only if it is not a tmp file need we check the cache type.
2514
0
        if ((inconsistent_type & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
2515
0
            info_in_storage.cache_type != info_in_manager.cache_type) {
2516
0
            inconsistent_type |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
2517
0
        }
2518
0
        if (info_in_storage.expiration_time != info_in_manager.expiration_time) {
2519
0
            inconsistent_type |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
2520
0
        }
2521
0
        if (inconsistent_type != InconsistencyType::NONE) {
2522
0
            inconsistency_context.infos_in_manager.push_back(info_in_manager);
2523
0
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
2524
0
            inconsistency_context.types.push_back(inconsistent_type);
2525
0
        }
2526
0
    }
2527
2528
2
    for (const auto& [hash, offset_to_cell] : _files) {
2529
0
        for (const auto& [offset, cell] : offset_to_cell) {
2530
0
            if (confirmed_blocks.contains({hash, offset})) {
2531
0
                continue;
2532
0
            }
2533
0
            const auto& block = cell.file_block;
2534
0
            inconsistency_context.infos_in_manager.emplace_back(
2535
0
                    hash, block->expiration_time(), cell.size(), offset,
2536
0
                    cell.file_block->state() == FileBlock::State::DOWNLOADING, block->cache_type());
2537
0
            inconsistency_context.infos_in_storage.emplace_back();
2538
0
            inconsistency_context.types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
2539
0
        }
2540
0
    }
2541
2
    return Status::OK();
2542
2
}
2543
2544
} // namespace doris::io