Coverage Report

Created: 2026-03-14 06:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache.h"
22
23
#include <gen_cpp/file_cache.pb.h>
24
25
#include <cstdio>
26
#include <exception>
27
#include <fstream>
28
#include <unordered_set>
29
30
#include "common/status.h"
31
#include "cpp/sync_point.h"
32
#include "runtime/exec_env.h"
33
34
#if defined(__APPLE__)
35
#include <sys/mount.h>
36
#else
37
#include <sys/statfs.h>
38
#endif
39
40
#include <chrono> // IWYU pragma: keep
41
#include <mutex>
42
#include <ranges>
43
44
#include "common/cast_set.h"
45
#include "common/config.h"
46
#include "common/logging.h"
47
#include "core/uint128.h"
48
#include "exec/common/sip_hash.h"
49
#include "io/cache/block_file_cache_ttl_mgr.h"
50
#include "io/cache/file_block.h"
51
#include "io/cache/file_cache_common.h"
52
#include "io/cache/fs_file_cache_storage.h"
53
#include "io/cache/mem_file_cache_storage.h"
54
#include "runtime/runtime_profile.h"
55
#include "util/concurrency_stats.h"
56
#include "util/stack_util.h"
57
#include "util/stopwatch.hpp"
58
#include "util/thread.h"
59
#include "util/time.h"
60
namespace doris::io {
61
#include "common/compile_check_begin.h"
62
63
// Insert a block pointer into one shard while swallowing allocation failures.
64
9.56k
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
65
9.56k
    if (!block) {
66
1
        return false;
67
1
    }
68
9.56k
    try {
69
9.56k
        auto* raw_ptr = block.get();
70
9.56k
        auto idx = shard_index(raw_ptr);
71
9.56k
        auto& shard = _shards[idx];
72
9.56k
        std::lock_guard lock(shard.mutex);
73
9.56k
        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
74
9.56k
        if (inserted) {
75
1.02k
            _size.fetch_add(1, std::memory_order_relaxed);
76
1.02k
        }
77
9.56k
        return inserted;
78
9.56k
    } catch (const std::exception& e) {
79
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
80
0
    } catch (...) {
81
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
82
0
    }
83
0
    return false;
84
9.56k
}
85
86
// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
87
4.53k
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
88
4.53k
    if (limit == 0 || output == nullptr) {
89
2
        return 0;
90
2
    }
91
4.53k
    size_t drained = 0;
92
4.53k
    try {
93
4.53k
        output->reserve(output->size() + std::min(limit, size()));
94
280k
        for (auto& shard : _shards) {
95
280k
            if (drained >= limit) {
96
1
                break;
97
1
            }
98
280k
            std::lock_guard lock(shard.mutex);
99
280k
            auto it = shard.entries.begin();
100
280k
            size_t shard_drained = 0;
101
280k
            while (it != shard.entries.end() && drained + shard_drained < limit) {
102
500
                output->emplace_back(std::move(it->second));
103
500
                it = shard.entries.erase(it);
104
500
                ++shard_drained;
105
500
            }
106
280k
            if (shard_drained > 0) {
107
6
                _size.fetch_sub(shard_drained, std::memory_order_relaxed);
108
6
                drained += shard_drained;
109
6
            }
110
280k
        }
111
4.53k
    } catch (const std::exception& e) {
112
0
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
113
0
    } catch (...) {
114
0
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
115
0
    }
116
4.66k
    return drained;
117
4.53k
}
118
119
// Remove every pending block, guarding against unexpected exceptions.
120
31
void NeedUpdateLRUBlocks::clear() {
121
31
    try {
122
1.98k
        for (auto& shard : _shards) {
123
1.98k
            std::lock_guard lock(shard.mutex);
124
1.98k
            if (!shard.entries.empty()) {
125
1
                auto removed = shard.entries.size();
126
1
                shard.entries.clear();
127
1
                _size.fetch_sub(removed, std::memory_order_relaxed);
128
1
            }
129
1.98k
        }
130
31
    } catch (const std::exception& e) {
131
0
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
132
0
    } catch (...) {
133
0
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
134
0
    }
135
31
}
136
137
9.56k
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
138
9.56k
    DCHECK(ptr != nullptr);
139
9.56k
    return std::hash<FileBlock*> {}(ptr)&kShardMask;
140
9.56k
}
141
142
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
143
                               const FileCacheSettings& cache_settings)
144
159
        : _cache_base_path(cache_base_path),
145
159
          _capacity(cache_settings.capacity),
146
159
          _max_file_block_size(cache_settings.max_file_block_size) {
147
159
    _cur_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
148
159
                                                                     "file_cache_cache_size", 0);
149
159
    _cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>(
150
159
            _cache_base_path.c_str(), "file_cache_capacity", _capacity);
151
159
    _cur_ttl_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
152
159
            _cache_base_path.c_str(), "file_cache_ttl_cache_size", 0);
153
159
    _cur_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
154
159
            _cache_base_path.c_str(), "file_cache_normal_queue_element_count", 0);
155
159
    _cur_ttl_cache_lru_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
156
159
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_size", 0);
157
159
    _cur_ttl_cache_lru_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
158
159
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_element_count", 0);
159
159
    _cur_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
160
159
            _cache_base_path.c_str(), "file_cache_normal_queue_cache_size", 0);
161
159
    _cur_index_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
162
159
            _cache_base_path.c_str(), "file_cache_index_queue_element_count", 0);
163
159
    _cur_index_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
164
159
            _cache_base_path.c_str(), "file_cache_index_queue_cache_size", 0);
165
159
    _cur_disposable_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
166
159
            _cache_base_path.c_str(), "file_cache_disposable_queue_element_count", 0);
167
159
    _cur_disposable_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
168
159
            _cache_base_path.c_str(), "file_cache_disposable_queue_cache_size", 0);
169
170
159
    _queue_evict_size_metrics[0] = std::make_shared<bvar::Adder<size_t>>(
171
159
            _cache_base_path.c_str(), "file_cache_index_queue_evict_size");
172
159
    _queue_evict_size_metrics[1] = std::make_shared<bvar::Adder<size_t>>(
173
159
            _cache_base_path.c_str(), "file_cache_normal_queue_evict_size");
174
159
    _queue_evict_size_metrics[2] = std::make_shared<bvar::Adder<size_t>>(
175
159
            _cache_base_path.c_str(), "file_cache_disposable_queue_evict_size");
176
159
    _queue_evict_size_metrics[3] = std::make_shared<bvar::Adder<size_t>>(
177
159
            _cache_base_path.c_str(), "file_cache_ttl_cache_evict_size");
178
159
    _total_evict_size_metrics = std::make_shared<bvar::Adder<size_t>>(
179
159
            _cache_base_path.c_str(), "file_cache_total_evict_size");
180
159
    _total_read_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
181
159
                                                                     "file_cache_total_read_size");
182
159
    _total_hit_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
183
159
                                                                    "file_cache_total_hit_size");
184
159
    _gc_evict_bytes_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
185
159
                                                                    "file_cache_gc_evict_bytes");
186
159
    _gc_evict_count_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
187
159
                                                                    "file_cache_gc_evict_count");
188
189
159
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
190
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
191
159
                                                  "file_cache_evict_by_time_disposable_to_normal");
192
159
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
193
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
194
159
                                                  "file_cache_evict_by_time_disposable_to_index");
195
159
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
196
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
197
159
                                                  "file_cache_evict_by_time_disposable_to_ttl");
198
159
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
199
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
200
159
                                                  "file_cache_evict_by_time_normal_to_disposable");
201
159
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
202
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
203
159
                                                  "file_cache_evict_by_time_normal_to_index");
204
159
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
205
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
206
159
                                                  "file_cache_evict_by_time_normal_to_ttl");
207
159
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
208
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
209
159
                                                  "file_cache_evict_by_time_index_to_disposable");
210
159
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
211
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
212
159
                                                  "file_cache_evict_by_time_index_to_normal");
213
159
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
214
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
215
159
                                                  "file_cache_evict_by_time_index_to_ttl");
216
159
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
217
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
218
159
                                                  "file_cache_evict_by_time_ttl_to_disposable");
219
159
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
220
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
221
159
                                                  "file_cache_evict_by_time_ttl_to_normal");
222
159
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
223
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
224
159
                                                  "file_cache_evict_by_time_ttl_to_index");
225
226
159
    _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
227
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
228
159
                                                  "file_cache_evict_by_self_lru_disposable");
229
159
    _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
230
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
231
159
                                                  "file_cache_evict_by_self_lru_normal");
232
159
    _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] = std::make_shared<bvar::Adder<size_t>>(
233
159
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_index");
234
159
    _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] = std::make_shared<bvar::Adder<size_t>>(
235
159
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl");
236
237
159
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
238
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
239
159
                                                  "file_cache_evict_by_size_disposable_to_normal");
240
159
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
241
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
242
159
                                                  "file_cache_evict_by_size_disposable_to_index");
243
159
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
244
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
245
159
                                                  "file_cache_evict_by_size_disposable_to_ttl");
246
159
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
247
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
248
159
                                                  "file_cache_evict_by_size_normal_to_disposable");
249
159
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
250
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
251
159
                                                  "file_cache_evict_by_size_normal_to_index");
252
159
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
253
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
254
159
                                                  "file_cache_evict_by_size_normal_to_ttl");
255
159
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
256
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
257
159
                                                  "file_cache_evict_by_size_index_to_disposable");
258
159
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
259
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
260
159
                                                  "file_cache_evict_by_size_index_to_normal");
261
159
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
262
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
263
159
                                                  "file_cache_evict_by_size_index_to_ttl");
264
159
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
265
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
266
159
                                                  "file_cache_evict_by_size_ttl_to_disposable");
267
159
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
268
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
269
159
                                                  "file_cache_evict_by_size_ttl_to_normal");
270
159
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
271
159
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
272
159
                                                  "file_cache_evict_by_size_ttl_to_index");
273
274
159
    _evict_by_try_release = std::make_shared<bvar::Adder<size_t>>(
275
159
            _cache_base_path.c_str(), "file_cache_evict_by_try_release");
276
277
159
    _num_read_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
278
159
                                                             "file_cache_num_read_blocks");
279
159
    _num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
280
159
                                                            "file_cache_num_hit_blocks");
281
159
    _num_removed_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
282
159
                                                                "file_cache_num_removed_blocks");
283
284
159
    _no_warmup_num_read_blocks = std::make_shared<bvar::Adder<size_t>>(
285
159
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks");
286
159
    _no_warmup_num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(
287
159
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks");
288
289
#ifndef BE_TEST
290
    _num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
291
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
292
    _num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
293
            _cache_base_path.c_str(), "file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
294
    _num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
295
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
296
    _num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
297
            _cache_base_path.c_str(), "file_cache_num_read_blocks_1h", _num_read_blocks.get(),
298
            3600);
299
    _no_warmup_num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
300
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_5m",
301
            _no_warmup_num_hit_blocks.get(), 300);
302
    _no_warmup_num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
303
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_5m",
304
            _no_warmup_num_read_blocks.get(), 300);
305
    _no_warmup_num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
306
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_1h",
307
            _no_warmup_num_hit_blocks.get(), 3600);
308
    _no_warmup_num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
309
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_1h",
310
            _no_warmup_num_read_blocks.get(), 3600);
311
#endif
312
313
159
    _hit_ratio = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
314
159
                                                        "file_cache_hit_ratio", 0.0);
315
159
    _hit_ratio_5m = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
316
159
                                                           "file_cache_hit_ratio_5m", 0.0);
317
159
    _hit_ratio_1h = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
318
159
                                                           "file_cache_hit_ratio_1h", 0.0);
319
320
159
    _no_warmup_hit_ratio = std::make_shared<bvar::Status<double>>(
321
159
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio", 0.0);
322
159
    _no_warmup_hit_ratio_5m = std::make_shared<bvar::Status<double>>(
323
159
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_5m", 0.0);
324
159
    _no_warmup_hit_ratio_1h = std::make_shared<bvar::Status<double>>(
325
159
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_1h", 0.0);
326
327
159
    _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
328
159
            _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
329
159
    _need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>(
330
159
            _cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0);
331
159
    _meta_store_write_queue_size_metrics = std::make_shared<bvar::Status<size_t>>(
332
159
            _cache_base_path.c_str(), "file_cache_meta_store_write_queue_size", 0);
333
334
159
    _cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
335
159
            _cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
336
159
    _get_or_set_latency_us = std::make_shared<bvar::LatencyRecorder>(
337
159
            _cache_base_path.c_str(), "file_cache_get_or_set_latency_us");
338
159
    _storage_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
339
159
            _cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_us");
340
159
    _storage_retry_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
341
159
            _cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us");
342
159
    _storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
343
159
            _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us");
344
159
    _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
345
159
            _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us");
346
159
    _lru_dump_latency_us = std::make_shared<bvar::LatencyRecorder>(
347
159
            _cache_base_path.c_str(), "file_cache_lru_dump_latency_us");
348
159
    _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
349
159
            _cache_base_path.c_str(), "file_cache_recycle_keys_length");
350
159
    _need_update_lru_blocks_length_recorder = std::make_shared<bvar::LatencyRecorder>(
351
159
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length");
352
159
    _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>(
353
159
            _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us");
354
159
    _ttl_gc_latency_us = std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(),
355
159
                                                                 "file_cache_ttl_gc_latency_us");
356
159
    _shadow_queue_levenshtein_distance = std::make_shared<bvar::LatencyRecorder>(
357
159
            _cache_base_path.c_str(), "file_cache_shadow_queue_levenshtein_distance");
358
359
159
    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
360
159
                                 cache_settings.disposable_queue_elements, 60 * 60);
361
159
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
362
159
                            7 * 24 * 60 * 60);
363
159
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
364
159
                             24 * 60 * 60);
365
159
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
366
159
                          std::numeric_limits<int>::max());
367
368
159
    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
369
159
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());
370
159
    if (cache_settings.storage == "memory") {
371
16
        _storage = std::make_unique<MemFileCacheStorage>();
372
16
        _cache_base_path = "memory";
373
143
    } else {
374
143
        _storage = std::make_unique<FSFileCacheStorage>();
375
143
    }
376
377
159
    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
378
159
}
379
380
4.24k
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
381
4.24k
    uint128_t value;
382
4.24k
    sip_hash128(path.data(), path.size(), reinterpret_cast<char*>(&value));
383
4.24k
    return UInt128Wrapper(value);
384
4.24k
}
385
386
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
387
8
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
388
8
    SCOPED_CACHE_LOCK(_mutex, this);
389
8
    if (!config::enable_file_cache_query_limit) {
390
1
        return {};
391
1
    }
392
393
    /// if enable_filesystem_query_cache_limit is true,
394
    /// we create context query for current query.
395
7
    auto context = get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
396
7
    return std::make_unique<QueryFileCacheContextHolder>(query_id, this, context);
397
8
}
398
399
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
400
3.37k
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
401
3.37k
    auto query_iter = _query_map.find(query_id);
402
3.37k
    return (query_iter == _query_map.end()) ? nullptr : query_iter->second;
403
3.37k
}
404
405
6
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
406
6
    SCOPED_CACHE_LOCK(_mutex, this);
407
6
    const auto& query_iter = _query_map.find(query_id);
408
409
6
    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
410
5
        _query_map.erase(query_iter);
411
5
    }
412
6
}
413
414
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
415
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
416
7
        int file_cache_query_limit_percent) {
417
7
    if (query_id.lo == 0 && query_id.hi == 0) {
418
1
        return nullptr;
419
1
    }
420
421
6
    auto context = get_query_context(query_id, cache_lock);
422
6
    if (context) {
423
1
        return context;
424
1
    }
425
426
5
    size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100;
427
5
    if (file_cache_query_limit_size < 268435456) {
428
5
        LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size
429
5
                     << " bytes) is less than the 256MB recommended minimum. "
430
5
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
431
5
                     << " from its current value " << file_cache_query_limit_percent << "%.";
432
5
    }
433
5
    auto query_context = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
434
5
    auto query_iter = _query_map.emplace(query_id, query_context).first;
435
5
    return query_iter->second;
436
6
}
437
438
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
439
20
                                                   std::lock_guard<std::mutex>& cache_lock) {
440
20
    auto pair = std::make_pair(hash, offset);
441
20
    auto record = records.find(pair);
442
20
    DCHECK(record != records.end());
443
20
    auto iter = record->second;
444
20
    records.erase(pair);
445
20
    lru_queue.remove(iter, cache_lock);
446
20
}
447
448
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
449
                                                    size_t size,
450
30
                                                    std::lock_guard<std::mutex>& cache_lock) {
451
30
    auto pair = std::make_pair(hash, offset);
452
30
    if (records.find(pair) == records.end()) {
453
29
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
454
29
        records.insert({pair, queue_iter});
455
29
    }
456
30
}
457
458
139
Status BlockFileCache::initialize() {
459
139
    SCOPED_CACHE_LOCK(_mutex, this);
460
139
    return initialize_unlocked(cache_lock);
461
139
}
462
463
139
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
464
139
    DCHECK(!_is_initialized);
465
139
    _is_initialized = true;
466
139
    if (config::file_cache_background_lru_dump_tail_record_num > 0) {
467
        // requirements:
468
        // 1. restored data should not overwrite the last dump
469
        // 2. restore should happen before load and async load
470
        // 3. all queues should be restored sequencially to avoid conflict
471
        // TODO(zhengyu): we can parralize them but will increase complexity, so lets check the time cost
472
        // to see if any improvement is a necessary
473
139
        restore_lru_queues_from_disk(cache_lock);
474
139
    }
475
139
    RETURN_IF_ERROR(_storage->init(this));
476
477
139
    if (auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get())) {
478
124
        if (auto* meta_store = fs_storage->get_meta_store()) {
479
124
            _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
480
124
        }
481
124
    }
482
483
139
    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
484
139
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
485
139
    _cache_background_evict_in_advance_thread =
486
139
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
487
139
    _cache_background_block_lru_update_thread =
488
139
            std::thread(&BlockFileCache::run_background_block_lru_update, this);
489
490
    // Initialize LRU dump thread and restore queues
491
139
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
492
139
    _cache_background_lru_log_replay_thread =
493
139
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);
494
495
139
    return Status::OK();
496
139
}
497
498
void BlockFileCache::update_block_lru(FileBlockSPtr block,
499
495
                                      std::lock_guard<std::mutex>& cache_lock) {
500
495
    FileBlockCell* cell = block->cell;
501
495
    if (cell) {
502
495
        if (cell->queue_iterator) {
503
495
            auto& queue = get_queue(block->cache_type());
504
495
            queue.move_to_end(*cell->queue_iterator, cache_lock);
505
495
            _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
506
495
                                              block->_key.hash, block->_key.offset,
507
495
                                              block->_block_range.size());
508
495
        }
509
495
        cell->update_atime();
510
495
    }
511
495
}
512
513
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
514
773
                              std::lock_guard<std::mutex>& cache_lock) {
515
773
    if (result) {
516
773
        result->push_back(cell.file_block);
517
773
    }
518
519
773
    auto& queue = get_queue(cell.file_block->cache_type());
520
    /// Move to the end of the queue. The iterator remains valid.
521
773
    if (cell.queue_iterator && move_iter_flag) {
522
771
        queue.move_to_end(*cell.queue_iterator, cache_lock);
523
771
        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
524
771
                                          CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash,
525
771
                                          cell.file_block->_key.offset, cell.size());
526
771
    }
527
528
773
    cell.update_atime();
529
773
}
530
531
template <class T>
532
    requires IsXLock<T>
533
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
534
4.99k
                                        T& /* cache_lock */) {
535
4.99k
    auto it = _files.find(hash);
536
4.99k
    if (it == _files.end()) {
537
1
        return nullptr;
538
1
    }
539
540
4.99k
    auto& offsets = it->second;
541
4.99k
    auto cell_it = offsets.find(offset);
542
4.99k
    if (cell_it == offsets.end()) {
543
3
        return nullptr;
544
3
    }
545
546
4.99k
    return &cell_it->second;
547
4.99k
}
548
549
773
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
550
773
    return query_type != FileCacheType::DISPOSABLE && cell_type != FileCacheType::DISPOSABLE;
551
773
}
552
553
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
554
                                    const FileBlock::Range& range,
555
8.42k
                                    std::lock_guard<std::mutex>& cache_lock) {
556
    /// Given range = [left, right] and non-overlapping ordered set of file blocks,
557
    /// find list [block1, ..., blockN] of blocks which intersect with given range.
558
8.42k
    auto it = _files.find(hash);
559
8.42k
    if (it == _files.end()) {
560
1.94k
        if (_async_open_done) {
561
946
            return {};
562
946
        }
563
1.00k
        FileCacheKey key;
564
1.00k
        key.hash = hash;
565
1.00k
        key.meta.type = context.cache_type;
566
1.00k
        key.meta.expiration_time = context.expiration_time;
567
1.00k
        key.meta.tablet_id = context.tablet_id;
568
1.00k
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);
569
570
1.00k
        it = _files.find(hash);
571
1.00k
        if (it == _files.end()) [[unlikely]] {
572
1.00k
            return {};
573
1.00k
        }
574
1.00k
    }
575
576
6.47k
    auto& file_blocks = it->second;
577
6.47k
    if (file_blocks.empty()) {
578
0
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
579
0
                     << " cache type=" << context.cache_type
580
0
                     << " cache expiration time=" << context.expiration_time
581
0
                     << " cache range=" << range.left << " " << range.right
582
0
                     << " query id=" << context.query_id;
583
0
        DCHECK(false);
584
0
        _files.erase(hash);
585
0
        return {};
586
0
    }
587
588
6.47k
    FileBlocks result;
589
6.47k
    auto block_it = file_blocks.lower_bound(range.left);
590
6.47k
    if (block_it == file_blocks.end()) {
591
        /// N - last cached block for given file hash, block{N}.offset < range.left:
592
        ///   block{N}                       block{N}
593
        /// [________                         [_______]
594
        ///     [__________]         OR                  [________]
595
        ///     ^                                        ^
596
        ///     range.left                               range.left
597
598
6.20k
        const auto& cell = file_blocks.rbegin()->second;
599
6.20k
        if (cell.file_block->range().right < range.left) {
600
6.19k
            return {};
601
6.19k
        }
602
603
14
        use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
604
14
                 cache_lock);
605
268
    } else { /// block_it <-- segmment{k}
606
268
        if (block_it != file_blocks.begin()) {
607
217
            const auto& prev_cell = std::prev(block_it)->second;
608
217
            const auto& prev_cell_range = prev_cell.file_block->range();
609
610
217
            if (range.left <= prev_cell_range.right) {
611
                ///   block{k-1}  block{k}
612
                ///   [________]   [_____
613
                ///       [___________
614
                ///       ^
615
                ///       range.left
616
617
137
                use_cell(prev_cell, &result,
618
137
                         need_to_move(prev_cell.file_block->cache_type(), context.cache_type),
619
137
                         cache_lock);
620
137
            }
621
217
        }
622
623
        ///  block{k} ...       block{k-1}  block{k}                      block{k}
624
        ///  [______              [______]     [____                        [________
625
        ///  [_________     OR              [________      OR    [______]   ^
626
        ///  ^                              ^                           ^   block{k}.offset
627
        ///  range.left                     range.left                  range.right
628
629
890
        while (block_it != file_blocks.end()) {
630
794
            const auto& cell = block_it->second;
631
794
            if (range.right < cell.file_block->range().left) {
632
172
                break;
633
172
            }
634
635
622
            use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
636
622
                     cache_lock);
637
622
            ++block_it;
638
622
        }
639
268
    }
640
641
282
    return result;
642
6.47k
}
643
644
9.54k
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
645
9.54k
    if (_need_update_lru_blocks.insert(std::move(block))) {
646
1.01k
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
647
1.01k
    }
648
9.54k
}
649
650
4
std::string BlockFileCache::clear_file_cache_async() {
651
4
    LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
652
4
    _lru_dumper->remove_lru_dump_files();
653
4
    int64_t num_cells_all = 0;
654
4
    int64_t num_cells_to_delete = 0;
655
4
    int64_t num_cells_wait_recycle = 0;
656
4
    int64_t num_files_all = 0;
657
4
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
658
4
    {
659
4
        SCOPED_CACHE_LOCK(_mutex, this);
660
661
4
        std::vector<FileBlockCell*> deleting_cells;
662
4
        for (auto& [_, offset_to_cell] : _files) {
663
4
            ++num_files_all;
664
48
            for (auto& [_1, cell] : offset_to_cell) {
665
48
                ++num_cells_all;
666
48
                deleting_cells.push_back(&cell);
667
48
            }
668
4
        }
669
670
        // we cannot delete the element in the loop above, because it will break the iterator
671
48
        for (auto& cell : deleting_cells) {
672
48
            if (!cell->releasable()) {
673
2
                LOG(INFO) << "cell is not releasable, hash="
674
2
                          << " offset=" << cell->file_block->offset();
675
2
                cell->file_block->set_deleting();
676
2
                ++num_cells_wait_recycle;
677
2
                continue;
678
2
            }
679
46
            FileBlockSPtr file_block = cell->file_block;
680
46
            if (file_block) {
681
46
                std::lock_guard block_lock(file_block->_mutex);
682
46
                remove(file_block, cache_lock, block_lock, false);
683
46
                ++num_cells_to_delete;
684
46
            }
685
46
        }
686
4
        clear_need_update_lru_blocks();
687
4
    }
688
689
4
    std::stringstream ss;
690
4
    ss << "finish clear_file_cache_async, path=" << _cache_base_path
691
4
       << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
692
4
       << " num_cells_to_delete=" << num_cells_to_delete
693
4
       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
694
4
    auto msg = ss.str();
695
4
    LOG(INFO) << msg;
696
4
    _lru_dumper->remove_lru_dump_files();
697
4
    return msg;
698
4
}
699
700
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
701
                                                  const CacheContext& context, size_t offset,
702
                                                  size_t size, FileBlock::State state,
703
8.33k
                                                  std::lock_guard<std::mutex>& cache_lock) {
704
8.33k
    DCHECK(size > 0);
705
706
8.33k
    auto current_pos = offset;
707
8.33k
    auto end_pos_non_included = offset + size;
708
709
8.33k
    size_t current_size = 0;
710
8.33k
    size_t remaining_size = size;
711
712
8.33k
    FileBlocks file_blocks;
713
16.7k
    while (current_pos < end_pos_non_included) {
714
8.37k
        current_size = std::min(remaining_size, _max_file_block_size);
715
8.37k
        remaining_size -= current_size;
716
8.37k
        state = try_reserve(hash, context, current_pos, current_size, cache_lock)
717
8.37k
                        ? state
718
8.37k
                        : FileBlock::State::SKIP_CACHE;
719
8.37k
        if (state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
720
62
            FileCacheKey key;
721
62
            key.hash = hash;
722
62
            key.offset = current_pos;
723
62
            key.meta.type = context.cache_type;
724
62
            key.meta.expiration_time = context.expiration_time;
725
62
            key.meta.tablet_id = context.tablet_id;
726
62
            auto file_block = std::make_shared<FileBlock>(key, current_size, this,
727
62
                                                          FileBlock::State::SKIP_CACHE);
728
62
            file_blocks.push_back(std::move(file_block));
729
8.31k
        } else {
730
8.31k
            auto* cell = add_cell(hash, context, current_pos, current_size, state, cache_lock);
731
8.31k
            if (cell) {
732
8.31k
                file_blocks.push_back(cell->file_block);
733
8.31k
                if (!context.is_cold_data) {
734
8.31k
                    cell->update_atime();
735
8.31k
                }
736
8.31k
            }
737
8.31k
            if (_ttl_mgr && context.tablet_id != 0) {
738
1.02k
                _ttl_mgr->register_tablet_id(context.tablet_id);
739
1.02k
            }
740
8.31k
        }
741
742
8.37k
        current_pos += current_size;
743
8.37k
    }
744
745
8.33k
    DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
746
8.33k
    return file_blocks;
747
8.33k
}
748
749
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
750
                                                       const UInt128Wrapper& hash,
751
                                                       const CacheContext& context,
752
                                                       const FileBlock::Range& range,
753
278
                                                       std::lock_guard<std::mutex>& cache_lock) {
754
    /// There are blocks [block1, ..., blockN]
755
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
756
    /// intersect with given range.
757
758
    /// It can have holes:
759
    /// [____________________]         -- requested range
760
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
761
    ///
762
    /// For each such hole create a cell with file block state EMPTY.
763
764
278
    auto it = file_blocks.begin();
765
278
    auto block_range = (*it)->range();
766
767
278
    size_t current_pos = 0;
768
278
    if (block_range.left < range.left) {
769
        ///    [_______     -- requested range
770
        /// [_______
771
        /// ^
772
        /// block1
773
774
151
        current_pos = block_range.right + 1;
775
151
        ++it;
776
151
    } else {
777
127
        current_pos = range.left;
778
127
    }
779
780
900
    while (current_pos <= range.right && it != file_blocks.end()) {
781
622
        block_range = (*it)->range();
782
783
622
        if (current_pos == block_range.left) {
784
513
            current_pos = block_range.right + 1;
785
513
            ++it;
786
513
            continue;
787
513
        }
788
789
622
        DCHECK(current_pos < block_range.left);
790
791
109
        auto hole_size = block_range.left - current_pos;
792
793
109
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
794
109
                                                      FileBlock::State::EMPTY, cache_lock));
795
796
109
        current_pos = block_range.right + 1;
797
109
        ++it;
798
109
    }
799
800
278
    if (current_pos <= range.right) {
801
        ///   ________]     -- requested range
802
        ///   _____]
803
        ///        ^
804
        /// blockN
805
806
86
        auto hole_size = range.right - current_pos + 1;
807
808
86
        file_blocks.splice(file_blocks.end(),
809
86
                           split_range_into_cells(hash, context, current_pos, hole_size,
810
86
                                                  FileBlock::State::EMPTY, cache_lock));
811
86
    }
812
278
}
813
814
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
815
8.42k
                                            CacheContext& context) {
816
8.42k
    FileBlock::Range range(offset, offset + size - 1);
817
818
8.42k
    ReadStatistics* stats = context.stats;
819
8.42k
    DCHECK(stats != nullptr);
820
8.42k
    MonotonicStopWatch sw;
821
8.42k
    sw.start();
822
8.42k
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
823
8.42k
    std::lock_guard cache_lock(_mutex);
824
8.42k
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
825
8.42k
    stats->lock_wait_timer += sw.elapsed_time();
826
8.42k
    FileBlocks file_blocks;
827
8.42k
    int64_t duration = 0;
828
8.42k
    {
829
8.42k
        SCOPED_RAW_TIMER(&duration);
830
        /// Get all blocks which intersect with the given range.
831
8.42k
        {
832
8.42k
            SCOPED_RAW_TIMER(&stats->get_timer);
833
8.42k
            file_blocks = get_impl(hash, context, range, cache_lock);
834
8.42k
        }
835
836
8.42k
        if (file_blocks.empty()) {
837
8.14k
            SCOPED_RAW_TIMER(&stats->set_timer);
838
8.14k
            file_blocks = split_range_into_cells(hash, context, offset, size,
839
8.14k
                                                 FileBlock::State::EMPTY, cache_lock);
840
8.14k
        } else {
841
278
            SCOPED_RAW_TIMER(&stats->set_timer);
842
278
            fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
843
278
        }
844
8.42k
        DCHECK(!file_blocks.empty());
845
8.42k
        *_num_read_blocks << file_blocks.size();
846
8.42k
        if (!context.is_warmup) {
847
8.42k
            *_no_warmup_num_read_blocks << file_blocks.size();
848
8.42k
        }
849
9.15k
        for (auto& block : file_blocks) {
850
9.15k
            size_t block_size = block->range().size();
851
9.15k
            *_total_read_size_metrics << block_size;
852
9.15k
            if (block->state_unsafe() == FileBlock::State::DOWNLOADED) {
853
729
                *_num_hit_blocks << 1;
854
729
                *_total_hit_size_metrics << block_size;
855
729
                if (!context.is_warmup) {
856
729
                    *_no_warmup_num_hit_blocks << 1;
857
729
                }
858
729
            }
859
9.15k
        }
860
8.42k
    }
861
8.42k
    *_get_or_set_latency_us << (duration / 1000);
862
8.42k
    return FileBlocksHolder(std::move(file_blocks));
863
8.42k
}
864
865
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
866
                                        size_t offset, size_t size, FileBlock::State state,
867
8.51k
                                        std::lock_guard<std::mutex>& cache_lock) {
868
    /// Create a file block cell and put it in `files` map by [hash][offset].
869
8.51k
    if (size == 0) {
870
0
        return nullptr; /// Empty files are not cached.
871
0
    }
872
873
8.51k
    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
874
0
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
875
0
               << " expiration_time=" << context.expiration_time
876
0
               << " tablet_id=" << context.tablet_id;
877
878
8.51k
    if (size > 1024 * 1024 * 1024) {
879
0
        LOG(WARNING) << "File block size is too large for a block. size=" << size
880
0
                     << " hash=" << hash.to_string() << " offset=" << offset
881
0
                     << " stack:" << get_stack_trace();
882
0
    }
883
884
8.51k
    auto& offsets = _files[hash];
885
8.51k
    auto itr = offsets.find(offset);
886
8.51k
    if (itr != offsets.end()) {
887
5
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
888
0
                   << ", offset: " << offset << ", size: " << size
889
0
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
890
5
        return &(itr->second);
891
5
    }
892
893
8.51k
    FileCacheKey key;
894
8.51k
    key.hash = hash;
895
8.51k
    key.offset = offset;
896
8.51k
    key.meta.type = context.cache_type;
897
8.51k
    key.meta.expiration_time = context.expiration_time;
898
8.51k
    key.meta.tablet_id = context.tablet_id;
899
8.51k
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);
900
8.51k
    Status st;
901
8.51k
    if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) {
902
0
        st = cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
903
8.51k
    } else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) {
904
1
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
905
1
    }
906
8.51k
    if (!st.ok()) {
907
0
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
908
0
                     << " cache_type=" << cache_type_to_string(context.cache_type)
909
0
                     << " error=" << st.msg();
910
0
    }
911
912
8.51k
    auto& queue = get_queue(cell.file_block->cache_type());
913
8.51k
    cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
914
8.51k
    _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD,
915
8.51k
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
916
8.51k
                                      cell.size());
917
918
8.51k
    if (cell.file_block->cache_type() == FileCacheType::TTL) {
919
3.92k
        _cur_ttl_size += cell.size();
920
3.92k
    }
921
8.51k
    auto [it, _] = offsets.insert(std::make_pair(offset, std::move(cell)));
922
8.51k
    _cur_cache_size += size;
923
8.51k
    return &(it->second);
924
8.51k
}
925
926
0
size_t BlockFileCache::try_release() {
927
0
    SCOPED_CACHE_LOCK(_mutex, this);
928
0
    std::vector<FileBlockCell*> trash;
929
0
    for (auto& [hash, blocks] : _files) {
930
0
        for (auto& [offset, cell] : blocks) {
931
0
            if (cell.releasable()) {
932
0
                trash.emplace_back(&cell);
933
0
            } else {
934
0
                cell.file_block->set_deleting();
935
0
            }
936
0
        }
937
0
    }
938
0
    size_t remove_size = 0;
939
0
    for (auto& cell : trash) {
940
0
        FileBlockSPtr file_block = cell->file_block;
941
0
        std::lock_guard lc(cell->file_block->_mutex);
942
0
        remove_size += file_block->range().size();
943
0
        remove(file_block, cache_lock, lc);
944
0
        VLOG_DEBUG << "try_release " << _cache_base_path
945
0
                   << " hash=" << file_block->get_hash_value().to_string()
946
0
                   << " offset=" << file_block->offset();
947
0
    }
948
0
    *_evict_by_try_release << remove_size;
949
0
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
950
0
    return trash.size();
951
0
}
952
953
34.9k
LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
954
34.9k
    switch (type) {
955
8.39k
    case FileCacheType::INDEX:
956
8.39k
        return _index_queue;
957
8.20k
    case FileCacheType::DISPOSABLE:
958
8.20k
        return _disposable_queue;
959
12.5k
    case FileCacheType::NORMAL:
960
12.5k
        return _normal_queue;
961
5.79k
    case FileCacheType::TTL:
962
5.79k
        return _ttl_queue;
963
0
    default:
964
0
        DCHECK(false);
965
34.9k
    }
966
0
    return _normal_queue;
967
34.9k
}
968
969
137
const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
970
137
    switch (type) {
971
33
    case FileCacheType::INDEX:
972
33
        return _index_queue;
973
1
    case FileCacheType::DISPOSABLE:
974
1
        return _disposable_queue;
975
103
    case FileCacheType::NORMAL:
976
103
        return _normal_queue;
977
0
    case FileCacheType::TTL:
978
0
        return _ttl_queue;
979
0
    default:
980
0
        DCHECK(false);
981
137
    }
982
0
    return _normal_queue;
983
137
}
984
985
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
986
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
987
9.26k
                                        std::string& reason) {
988
9.26k
    auto remove_file_block_if = [&](FileBlockCell* cell) {
989
849
        FileBlockSPtr file_block = cell->file_block;
990
849
        if (file_block) {
991
849
            std::lock_guard block_lock(file_block->_mutex);
992
849
            remove(file_block, cache_lock, block_lock, sync);
993
849
            VLOG_DEBUG << "remove_file_blocks"
994
0
                       << " hash=" << file_block->get_hash_value().to_string()
995
0
                       << " offset=" << file_block->offset() << " reason=" << reason;
996
849
        }
997
849
    };
998
9.26k
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
999
9.26k
}
1000
1001
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
1002
                                           size_t& removed_size,
1003
                                           std::vector<FileBlockCell*>& to_evict,
1004
                                           std::lock_guard<std::mutex>& cache_lock,
1005
981
                                           size_t& cur_removed_size, bool evict_in_advance) {
1006
2.22k
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1007
2.22k
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1008
920
            break;
1009
920
        }
1010
1.30k
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1011
1012
1.30k
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
1013
0
                     << ", offset: " << entry_offset;
1014
1015
1.30k
        size_t cell_size = cell->size();
1016
1.30k
        DCHECK(entry_size == cell_size);
1017
1018
1.30k
        if (cell->releasable()) {
1019
833
            auto& file_block = cell->file_block;
1020
1021
833
            std::lock_guard block_lock(file_block->_mutex);
1022
833
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1023
833
            to_evict.push_back(cell);
1024
833
            removed_size += cell_size;
1025
833
            cur_removed_size += cell_size;
1026
833
        }
1027
1.30k
    }
1028
981
}
1029
1030
// 1. if async load file cache not finish
1031
//     a. evict from lru queue
1032
// 2. if ttl cache
1033
//     a. evict from disposable/normal/index queue one by one
1034
// 3. if dont reach query limit or dont have query limit
1035
//     a. evict from other queue
1036
//     b. evict from current queue
1037
//         a.1 if the data belong write, then just evict cold data
1038
// 4. if reach query limit
1039
//     a. evict from query queue
1040
//     b. evict from other queue
1041
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
1042
                                 size_t offset, size_t size,
1043
8.37k
                                 std::lock_guard<std::mutex>& cache_lock) {
1044
8.37k
    if (!_async_open_done) {
1045
1.00k
        return try_reserve_during_async_load(size, cache_lock);
1046
1.00k
    }
1047
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
1048
    // directly eliminate 5 times the size of the space
1049
7.37k
    if (_disk_resource_limit_mode) {
1050
1
        size = 5 * size;
1051
1
    }
1052
1053
7.37k
    auto query_context = config::enable_file_cache_query_limit &&
1054
7.37k
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
1055
7.37k
                                 ? get_query_context(context.query_id, cache_lock)
1056
7.37k
                                 : nullptr;
1057
7.37k
    if (!query_context) {
1058
7.34k
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
1059
7.34k
    } else if (query_context->get_cache_size(cache_lock) + size <=
1060
31
               query_context->get_max_cache_size()) {
1061
16
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
1062
16
    }
1063
15
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1064
15
                               std::chrono::steady_clock::now().time_since_epoch())
1065
15
                               .count();
1066
15
    auto& queue = get_queue(context.cache_type);
1067
15
    size_t removed_size = 0;
1068
15
    size_t ghost_remove_size = 0;
1069
15
    size_t queue_size = queue.get_capacity(cache_lock);
1070
15
    size_t cur_cache_size = _cur_cache_size;
1071
15
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);
1072
1073
15
    std::vector<LRUQueue::Iterator> ghost;
1074
15
    std::vector<FileBlockCell*> to_evict;
1075
1076
15
    size_t max_size = queue.get_max_size();
1077
48
    auto is_overflow = [&] {
1078
48
        return _disk_resource_limit_mode ? removed_size < size
1079
48
                                         : cur_cache_size + size - removed_size > _capacity ||
1080
48
                                                   (queue_size + size - removed_size > max_size) ||
1081
48
                                                   (query_context_cache_size + size -
1082
38
                                                            (removed_size + ghost_remove_size) >
1083
38
                                                    query_context->get_max_cache_size());
1084
48
    };
1085
1086
    /// Select the cache from the LRU queue held by query for expulsion.
1087
35
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
1088
33
        if (!is_overflow()) {
1089
13
            break;
1090
13
        }
1091
1092
20
        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);
1093
1094
20
        if (!cell) {
1095
            /// The cache corresponding to this record may be swapped out by
1096
            /// other queries, so it has become invalid.
1097
4
            ghost.push_back(iter);
1098
4
            ghost_remove_size += iter->size;
1099
16
        } else {
1100
16
            size_t cell_size = cell->size();
1101
16
            DCHECK(iter->size == cell_size);
1102
1103
16
            if (cell->releasable()) {
1104
16
                auto& file_block = cell->file_block;
1105
1106
16
                std::lock_guard block_lock(file_block->_mutex);
1107
16
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1108
16
                to_evict.push_back(cell);
1109
16
                removed_size += cell_size;
1110
16
            }
1111
16
        }
1112
20
    }
1113
1114
16
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1115
16
        FileBlockSPtr file_block = cell->file_block;
1116
16
        if (file_block) {
1117
16
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
1118
16
            std::lock_guard block_lock(file_block->_mutex);
1119
16
            remove(file_block, cache_lock, block_lock);
1120
16
        }
1121
16
    };
1122
1123
15
    for (auto& iter : ghost) {
1124
4
        query_context->remove(iter->hash, iter->offset, cache_lock);
1125
4
    }
1126
1127
15
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1128
1129
15
    if (is_overflow() &&
1130
15
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
1131
1
        return false;
1132
1
    }
1133
14
    query_context->reserve(hash, offset, size, cache_lock);
1134
14
    return true;
1135
15
}
1136
1137
1
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
1138
1
    UInt128Wrapper hash = UInt128Wrapper();
1139
1
    size_t offset = 0;
1140
1
    CacheContext context;
1141
    /* we pick NORMAL and TTL cache to evict in advance
1142
     * we reserve for them but won't acutually give space to them
1143
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
1144
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
1145
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
1146
     */
1147
1
    context.cache_type = FileCacheType::NORMAL;
1148
1
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1149
1
    context.cache_type = FileCacheType::TTL;
1150
1
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1151
1
}
1152
1153
// remove specific cache synchronously, for critical operations
1154
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1155
7
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1156
7
    std::string reason = "remove_if_cached";
1157
7
    SCOPED_CACHE_LOCK(_mutex, this);
1158
7
    auto iter = _files.find(file_key);
1159
7
    std::vector<FileBlockCell*> to_remove;
1160
7
    if (iter != _files.end()) {
1161
14
        for (auto& [_, cell] : iter->second) {
1162
14
            if (cell.releasable()) {
1163
13
                to_remove.push_back(&cell);
1164
13
            } else {
1165
1
                cell.file_block->set_deleting();
1166
1
            }
1167
14
        }
1168
6
    }
1169
7
    remove_file_blocks(to_remove, cache_lock, true, reason);
1170
7
}
1171
1172
// the async version of remove_if_cached, for background operations
1173
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
1174
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1175
1
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1176
1
    std::string reason = "remove_if_cached_async";
1177
1
    SCOPED_CACHE_LOCK(_mutex, this);
1178
1179
1
    auto iter = _files.find(file_key);
1180
1
    std::vector<FileBlockCell*> to_remove;
1181
1
    if (iter != _files.end()) {
1182
1
        for (auto& [_, cell] : iter->second) {
1183
1
            *_gc_evict_bytes_metrics << cell.size();
1184
1
            *_gc_evict_count_metrics << 1;
1185
1
            if (cell.releasable()) {
1186
0
                to_remove.push_back(&cell);
1187
1
            } else {
1188
1
                cell.file_block->set_deleting();
1189
1
            }
1190
1
        }
1191
1
    }
1192
1
    remove_file_blocks(to_remove, cache_lock, false, reason);
1193
1
}
1194
1195
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
        FileCacheType cur_cache_type) {
    // Eviction candidates for `cur_cache_type`, in priority order; TTL is never
    // offered as a victim by this variant.
    switch (cur_cache_type) {
    case FileCacheType::TTL:
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
    case FileCacheType::INDEX:
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
    case FileCacheType::NORMAL:
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
    case FileCacheType::DISPOSABLE:
        return {FileCacheType::NORMAL, FileCacheType::INDEX};
    default:
        break;
    }
    return {};
}
1211
1212
889
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
    // Eviction candidates for `cur_cache_type`, in priority order; unlike the
    // _without_ttl variant, TTL may be chosen as a victim here.
    switch (cur_cache_type) {
    case FileCacheType::TTL:
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
    case FileCacheType::INDEX:
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL};
    case FileCacheType::NORMAL:
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL};
    case FileCacheType::DISPOSABLE:
        return {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL};
    default:
        break;
    }
    return {};
}
1227
1228
// Adjust bookkeeping for a cached block whose size changed from `old_size` to
// `new_size`: resize its LRU queue entry (if queued), log the RESIZE event, and
// update the global cache-size counter. Caller must hold `cache_lock`.
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(_files.find(hash) != _files.end() &&
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
    DCHECK(cell != nullptr);
    // In release builds, tolerate a missing cell instead of dereferencing null.
    if (cell == nullptr) {
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
                     << " new_size=" << new_size;
        return;
    }
    if (cell->queue_iterator) {
        // Keep the LRU queue's per-entry size and the LRU event log in sync.
        auto& queue = get_queue(cell->file_block->cache_type());
        DCHECK(queue.contains(hash, offset, cache_lock));
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
                                          cell->file_block->get_hash_value(),
                                          cell->file_block->offset(), new_size);
    }
    // Update the total: subtract then add rather than adding the (possibly
    // negative) delta, since both counters are unsigned.
    _cur_cache_size -= old_size;
    _cur_cache_size += new_size;
}
1251
1252
// Try to free enough space for `size` bytes on behalf of `cur_type` by evicting
// *cold* blocks from the other queues: only blocks whose atime is older than the
// queue's hot-data interval are candidates. Returns true when the reservation
// fits afterwards (per is_overflow semantics). Caller must hold `cache_lock`.
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
    size_t removed_size = 0;
    size_t cur_cache_size = _cur_cache_size;
    std::vector<FileBlockCell*> to_evict;
    for (FileCacheType cache_type : other_cache_types) {
        auto& queue = get_queue(cache_type);
        size_t remove_size_per_type = 0;
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
            // Stop as soon as enough space has been gathered.
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
                break;
            }
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
                         << ", offset: " << entry_offset;

            size_t cell_size = cell->size();
            DCHECK(entry_size == cell_size);

            // Queue is LRU-ordered, so the first still-hot entry (or an entry
            // never accessed, atime == 0) ends the scan for this queue.
            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
                break;
            }

            if (cell->releasable()) {
                auto& file_block = cell->file_block;
                std::lock_guard block_lock(file_block->_mutex);
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
                to_evict.push_back(cell);
                removed_size += cell_size;
                remove_size_per_type += cell_size;
            }
        }
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
    }
    // evict_in_advance batches deletions asynchronously; normal reservation
    // removes synchronously so the space is usable immediately.
    bool is_sync_removal = !evict_in_advance;
    std::string reason = std::string("try_reserve_by_time ") +
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);

    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
}
1294
1295
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
1296
11.9k
                                 bool evict_in_advance) const {
1297
11.9k
    bool ret = false;
1298
11.9k
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
1299
11
        ret = (removed_size < need_size);
1300
11
        return ret;
1301
11
    }
1302
11.9k
    if (_disk_resource_limit_mode) {
1303
4
        ret = (removed_size < need_size);
1304
11.8k
    } else {
1305
11.8k
        ret = (cur_cache_size + need_size - removed_size > _capacity);
1306
11.8k
    }
1307
11.9k
    return ret;
1308
11.9k
}
1309
1310
// Try to free space for `size` bytes on behalf of `cur_type` by evicting from
// the other queues, but only reclaim what each queue holds *beyond* its own
// soft limit. Returns true when the reservation fits afterwards.
bool BlockFileCache::try_reserve_from_other_queue_by_size(
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
    size_t removed_size = 0;
    size_t cur_cache_size = _cur_cache_size;
    std::vector<FileBlockCell*> to_evict;
    // we follow the privilege defined in get_other_cache_types to evict
    for (FileCacheType cache_type : other_cache_types) {
        auto& queue = get_queue(cache_type);

        // we will not drain each of them to the bottom -- i.e., we only
        // evict what they have stolen.
        size_t cur_queue_size = queue.get_capacity(cache_lock);
        size_t cur_queue_max_size = queue.get_max_size();
        if (cur_queue_size <= cur_queue_max_size) {
            continue;
        }
        size_t cur_removed_size = 0;
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
                              cur_removed_size, evict_in_advance);
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
    }
    // Normal reservation removes synchronously; advance eviction defers it.
    bool is_sync_removal = !evict_in_advance;
    std::string reason = std::string("try_reserve_by_size") +
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
}
1338
1339
// Reserve `size` bytes for `cur_cache_type` by stealing from other queues.
// First pass evicts cold blocks (past their hot-data interval); if that falls
// short and size-based cross-queue eviction is enabled, a second pass evicts
// the other queues' overshoot — unless this queue itself already exceeds both
// the global capacity and its own soft limit.
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
                                                  int64_t cur_time,
                                                  std::lock_guard<std::mutex>& cache_lock,
                                                  bool evict_in_advance) {
    // currently, TTL cache is not considered as a candidate
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
        return reserve_success;
    }

    // second pass: TTL is included in the candidate set here
    other_cache_types = get_other_cache_type(cur_cache_type);
    auto& cur_queue = get_queue(cur_cache_type);
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
    size_t cur_queue_max_size = cur_queue.get_max_size();
    // Hit the soft limit by self, cannot remove from other queues
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
        return false;
    }
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
                                                evict_in_advance);
}
1362
1363
// Reserve `size` bytes for a block of `context.cache_type` using plain LRU:
// first try to steal from other queues; failing that, evict from this type's
// own queue. On success also charges the reservation to `query_context` (when
// per-query accounting is enabled). Returns false if space cannot be freed.
// Caller must hold `cache_lock`.
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
                                         QueryFileCacheContextPtr query_context,
                                         const CacheContext& context, size_t offset, size_t size,
                                         std::lock_guard<std::mutex>& cache_lock,
                                         bool evict_in_advance) {
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
                               std::chrono::steady_clock::now().time_since_epoch())
                               .count();
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
                                      evict_in_advance)) {
        // Fall back to evicting from our own queue (self-LRU).
        auto& queue = get_queue(context.cache_type);
        size_t removed_size = 0;
        size_t cur_cache_size = _cur_cache_size;

        std::vector<FileBlockCell*> to_evict;
        size_t cur_removed_size = 0;
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
                              cur_removed_size, evict_in_advance);
        bool is_sync_removal = !evict_in_advance;
        std::string reason = std::string("try_reserve for cache type ") +
                             cache_type_to_string(context.cache_type) +
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;

        // Even self-eviction could not make room.
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
            return false;
        }
    }

    if (query_context) {
        query_context->reserve(hash, offset, size, cache_lock);
    }
    return true;
}
1398
1399
template <class T, class U>
1400
    requires IsXLock<T> && IsXLock<U>
1401
2.91k
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
1402
2.91k
    auto hash = file_block->get_hash_value();
1403
2.91k
    auto offset = file_block->offset();
1404
2.91k
    auto type = file_block->cache_type();
1405
2.91k
    auto expiration_time = file_block->expiration_time();
1406
2.91k
    auto tablet_id = file_block->tablet_id();
1407
2.91k
    auto* cell = get_cell(hash, offset, cache_lock);
1408
2.91k
    file_block->cell = nullptr;
1409
2.91k
    DCHECK(cell);
1410
2.91k
    DCHECK(cell->queue_iterator);
1411
2.91k
    if (cell->queue_iterator) {
1412
2.91k
        auto& queue = get_queue(file_block->cache_type());
1413
2.91k
        queue.remove(*cell->queue_iterator, cache_lock);
1414
2.91k
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
1415
2.91k
                                          cell->file_block->get_hash_value(),
1416
2.91k
                                          cell->file_block->offset(), cell->size());
1417
2.91k
    }
1418
2.91k
    *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
1419
2.91k
            << file_block->range().size();
1420
2.91k
    *_total_evict_size_metrics << file_block->range().size();
1421
1422
2.91k
    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
1423
0
               << ", offset: " << offset << ", size: " << file_block->range().size()
1424
0
               << ", type: " << cache_type_to_string(type);
1425
1426
2.91k
    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
1427
927
        FileCacheKey key;
1428
927
        key.hash = hash;
1429
927
        key.offset = offset;
1430
927
        key.meta.type = type;
1431
927
        key.meta.expiration_time = expiration_time;
1432
927
        key.meta.tablet_id = tablet_id;
1433
927
        if (sync) {
1434
873
            int64_t duration_ns = 0;
1435
873
            Status st;
1436
873
            {
1437
873
                SCOPED_RAW_TIMER(&duration_ns);
1438
873
                st = _storage->remove(key);
1439
873
            }
1440
873
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
1441
873
            if (!st.ok()) {
1442
167
                LOG_WARNING("").error(st);
1443
167
            }
1444
873
        } else {
1445
            // the file will be deleted in the bottom half
1446
            // so there will be a window that the file is not in the cache but still in the storage
1447
            // but it's ok, because the rowset is stale already
1448
54
            bool ret = _recycle_keys.enqueue(key);
1449
54
            if (ret) [[likely]] {
1450
54
                *_recycle_keys_length_recorder << _recycle_keys.size_approx();
1451
54
            } else {
1452
0
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
1453
0
                int64_t duration_ns = 0;
1454
0
                Status st;
1455
0
                {
1456
0
                    SCOPED_RAW_TIMER(&duration_ns);
1457
0
                    st = _storage->remove(key);
1458
0
                }
1459
0
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
1460
0
                if (!st.ok()) {
1461
0
                    LOG_WARNING("").error(st);
1462
0
                }
1463
0
            }
1464
54
        }
1465
1.98k
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
1466
0
        file_block->set_deleting();
1467
0
        return;
1468
0
    }
1469
2.91k
    _cur_cache_size -= file_block->range().size();
1470
2.91k
    if (FileCacheType::TTL == type) {
1471
1.29k
        _cur_ttl_size -= file_block->range().size();
1472
1.29k
    }
1473
2.91k
    auto it = _files.find(hash);
1474
2.91k
    if (it != _files.end()) {
1475
2.91k
        it->second.erase(file_block->offset());
1476
2.91k
        if (it->second.empty()) {
1477
832
            _files.erase(hash);
1478
832
        }
1479
2.91k
    }
1480
2.91k
    *_num_removed_blocks << 1;
1481
2.91k
}
1482
1483
46
// Thread-safe wrapper: bytes currently used by `cache_type`'s queue.
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    return get_used_cache_size_unlocked(cache_type, cache_lock);
}
1487
1488
// Bytes currently used by `cache_type`'s queue; caller must hold `cache_lock`.
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
                                                    std::lock_guard<std::mutex>& cache_lock) const {
    return get_queue(cache_type).get_capacity(cache_lock);
}
1492
1493
0
// Thread-safe wrapper: remaining room in `cache_type`'s queue.
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    return get_available_cache_size_unlocked(cache_type, cache_lock);
}
1497
1498
// Remaining room in `cache_type`'s queue; caller must hold `cache_lock`.
// NOTE(review): get_max_element_size() reads like an element *count* while
// get_used_cache_size_unlocked() returns used capacity (bytes per
// LRUQueue::get_capacity) — the subtraction may mix units. Confirm against
// LRUQueue's accessors; if so, get_max_size() is likely the intended call.
size_t BlockFileCache::get_available_cache_size_unlocked(
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
    return get_queue(cache_type).get_max_element_size() -
           get_used_cache_size_unlocked(cache_type, cache_lock);
}
1503
1504
88
// Thread-safe wrapper: number of blocks queued for `cache_type`.
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    return get_file_blocks_num_unlocked(cache_type, cache_lock);
}
1508
1509
// Number of blocks queued for `cache_type`; caller must hold `cache_lock`.
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
                                                    std::lock_guard<std::mutex>& cache_lock) const {
    return get_queue(cache_type).get_elements_num(cache_lock);
}
1513
1514
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
        : file_block(file_block) {
    file_block->cell = this;
    /**
     * A cell may only be created while the file block is in DOWNLOADED, EMPTY
     * or SKIP_CACHE state. The block acquires DOWNLOADING state (and its
     * LRUQueue iterator) on the first successful getOrSetDownaloder call.
     */
    const auto state = file_block->_download_state;
    const bool creatable = state == FileBlock::State::DOWNLOADED ||
                           state == FileBlock::State::EMPTY ||
                           state == FileBlock::State::SKIP_CACHE;
    if (!creatable) {
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
                      << FileBlock::state_to_string(state);
    }
    // TTL blocks track their access time from the moment they enter the cache.
    if (file_block->cache_type() == FileCacheType::TTL) {
        update_atime();
    }
}
1537
1538
LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
    // Append at the tail (most-recently-used end) and index the position so
    // contains()/get() are O(1).
    auto iter = queue.emplace(queue.end(), FileKeyAndOffset(hash, offset, size));
    map.emplace(std::make_pair(hash, offset), iter);
    cache_size += size;
    return iter;
}
1545
1546
0
void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
1547
0
    queue.clear();
1548
0
    map.clear();
1549
0
    cache_size = 0;
1550
0
}
1551
1552
1.40k
// Mark an entry as most-recently-used by splicing it to the queue tail
// (O(1), no iterator invalidation — map entries stay valid).
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
    queue.splice(queue.end(), queue, queue_it);
}
1555
1556
void LRUQueue::resize(Iterator queue_it, size_t new_size,
1557
5
                      std::lock_guard<std::mutex>& /* cache_lock */) {
1558
5
    cache_size -= queue_it->size;
1559
5
    queue_it->size = new_size;
1560
5
    cache_size += new_size;
1561
5
}
1562
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
1563
4
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
1564
4
    return map.find(std::make_pair(hash, offset)) != map.end();
1565
4
}
1566
1567
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
    // Return the queue position of (hash, offset); a default-constructed
    // (singular) iterator signals "not present".
    auto found = map.find(std::make_pair(hash, offset));
    return found == map.end() ? std::list<FileKeyAndOffset>::iterator() : found->second;
}
1575
1576
2
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
1577
2
    std::string result;
1578
18
    for (const auto& [hash, offset, size] : queue) {
1579
18
        if (!result.empty()) {
1580
16
            result += ", ";
1581
16
        }
1582
18
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
1583
18
    }
1584
2
    return result;
1585
2
}
1586
1587
size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
1588
5
                                           std::lock_guard<std::mutex>& cache_lock) {
1589
5
    std::list<FileKeyAndOffset> target_queue = this->queue;
1590
5
    std::list<FileKeyAndOffset> base_queue = base.queue;
1591
5
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
1592
5
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
1593
1594
5
    size_t m = vec1.size();
1595
5
    size_t n = vec2.size();
1596
1597
    // Create a 2D vector (matrix) to store the Levenshtein distances
1598
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
1599
5
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
1600
1601
    // Initialize the first row and column of the matrix
1602
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
1603
22
    for (size_t i = 0; i <= m; ++i) {
1604
17
        dp[i][0] = i;
1605
17
    }
1606
20
    for (size_t j = 0; j <= n; ++j) {
1607
15
        dp[0][j] = j;
1608
15
    }
1609
1610
    // Fill the matrix using dynamic programming
1611
17
    for (size_t i = 1; i <= m; ++i) {
1612
38
        for (size_t j = 1; j <= n; ++j) {
1613
            // Check if the current elements of both vectors are equal
1614
26
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
1615
26
                           vec1[i - 1].offset == vec2[j - 1].offset)
1616
26
                                  ? 0
1617
26
                                  : 1;
1618
            // Calculate the minimum cost of three possible operations:
1619
            // 1. Insertion: dp[i][j-1] + 1
1620
            // 2. Deletion: dp[i-1][j] + 1
1621
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
1622
26
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
1623
26
        }
1624
12
    }
1625
    // The bottom-right cell of the matrix contains the Levenshtein distance
1626
5
    return dp[m][n];
1627
5
}
1628
1629
1
// Thread-safe wrapper: dump all cached blocks of `hash` for debugging/logging.
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
    SCOPED_CACHE_LOCK(_mutex, this);
    return dump_structure_unlocked(hash, cache_lock);
}
1633
1634
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
1635
1
                                                    std::lock_guard<std::mutex>&) {
1636
1
    std::stringstream result;
1637
1
    auto it = _files.find(hash);
1638
1
    if (it == _files.end()) {
1639
0
        return std::string("");
1640
0
    }
1641
1
    const auto& cells_by_offset = it->second;
1642
1643
1
    for (const auto& [_, cell] : cells_by_offset) {
1644
1
        result << cell.file_block->get_info_for_log() << " "
1645
1
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
1646
1
    }
1647
1648
1
    return result.str();
1649
1
}
1650
1651
0
// Thread-safe wrapper: cache-type name of the block at (hash, offset).
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
    SCOPED_CACHE_LOCK(_mutex, this);
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
}
1655
1656
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
1657
                                                            size_t offset,
1658
0
                                                            std::lock_guard<std::mutex>&) {
1659
0
    std::stringstream result;
1660
0
    auto it = _files.find(hash);
1661
0
    if (it == _files.end()) {
1662
0
        return std::string("");
1663
0
    }
1664
0
    const auto& cells_by_offset = it->second;
1665
0
    const auto& cell = cells_by_offset.find(offset);
1666
1667
0
    return cache_type_to_string(cell->second.file_block->cache_type());
1668
0
}
1669
1670
// Move a cached block (hash, offset) from its current type's LRU queue into
// `new_type`'s queue, logging a REMOVE followed by an ADD so the LRU recorder
// can replay the transition. No-op if the block is not cached.
// NOTE(review): only the queue membership changes here — the file block's own
// cache_type is presumably updated by the caller; confirm at the call sites.
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
                                       FileCacheType new_type,
                                       std::lock_guard<std::mutex>& cache_lock) {
    if (auto iter = _files.find(hash); iter != _files.end()) {
        auto& file_blocks = iter->second;
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
            FileBlockCell& cell = cell_it->second;
            auto& cur_queue = get_queue(cell.file_block->cache_type());
            DCHECK(cell.queue_iterator.has_value());
            cur_queue.remove(*cell.queue_iterator, cache_lock);
            _lru_recorder->record_queue_event(
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
            auto& new_queue = get_queue(new_type);
            cell.queue_iterator =
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
                                              cell.file_block->get_hash_value(),
                                              cell.file_block->offset(), cell.size());
        }
    }
}
1692
1693
// @brief: get a path's disk capacity used percent, inode used percent
1694
// @param: path
1695
// @param: percent.first disk used percent, percent.second inode used percent
1696
156
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
1697
156
    struct statfs stat;
1698
156
    int ret = statfs(path.c_str(), &stat);
1699
156
    if (ret != 0) {
1700
2
        return ret;
1701
2
    }
1702
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
1703
    // v->used = stat.f_blocks - stat.f_bfree
1704
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
1705
154
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
1706
154
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
1707
154
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));
1708
1709
154
    unsigned long long inode_free = stat.f_ffree;
1710
154
    unsigned long long inode_total = stat.f_files;
1711
154
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
1712
154
    percent->first = capacity_percentage;
1713
154
    percent->second = 100 - inode_percentage;
1714
1715
    // Add sync point for testing
1716
154
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
1717
1718
154
    return 0;
1719
156
}
1720
1721
10
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
1722
10
    using namespace std::chrono;
1723
10
    int64_t space_released = 0;
1724
10
    size_t old_capacity = 0;
1725
10
    std::stringstream ss;
1726
10
    ss << "finish reset_capacity, path=" << _cache_base_path;
1727
10
    auto adjust_start_time = steady_clock::time_point();
1728
10
    {
1729
10
        SCOPED_CACHE_LOCK(_mutex, this);
1730
10
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
1731
1
            int64_t need_remove_size = _cur_cache_size - new_capacity;
1732
4
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
1733
4
                int64_t queue_released = 0;
1734
4
                std::vector<FileBlockCell*> to_evict;
1735
13
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1736
13
                    if (need_remove_size <= 0) {
1737
1
                        break;
1738
1
                    }
1739
12
                    need_remove_size -= entry_size;
1740
12
                    space_released += entry_size;
1741
12
                    queue_released += entry_size;
1742
12
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1743
12
                    if (!cell->releasable()) {
1744
0
                        cell->file_block->set_deleting();
1745
0
                        continue;
1746
0
                    }
1747
12
                    to_evict.push_back(cell);
1748
12
                }
1749
12
                for (auto& cell : to_evict) {
1750
12
                    FileBlockSPtr file_block = cell->file_block;
1751
12
                    std::lock_guard block_lock(file_block->_mutex);
1752
12
                    remove(file_block, cache_lock, block_lock);
1753
12
                }
1754
4
                return queue_released;
1755
4
            };
1756
1
            int64_t queue_released = remove_blocks(_disposable_queue);
1757
1
            ss << " disposable_queue released " << queue_released;
1758
1
            queue_released = remove_blocks(_normal_queue);
1759
1
            ss << " normal_queue released " << queue_released;
1760
1
            queue_released = remove_blocks(_index_queue);
1761
1
            ss << " index_queue released " << queue_released;
1762
1
            queue_released = remove_blocks(_ttl_queue);
1763
1
            ss << " ttl_queue released " << queue_released;
1764
1765
1
            _disk_resource_limit_mode = true;
1766
1
            _disk_limit_mode_metrics->set_value(1);
1767
1
            ss << " total_space_released=" << space_released;
1768
1
        }
1769
10
        old_capacity = _capacity;
1770
10
        _capacity = new_capacity;
1771
10
        _cache_capacity_metrics->set_value(_capacity);
1772
10
    }
1773
10
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
1774
10
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
1775
10
              << " use_time=" << cast_set<int64_t>(use_time.count());
1776
10
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
1777
10
    LOG(INFO) << ss.str();
1778
10
    return ss.str();
1779
10
}
1780
1781
1.08k
// Periodically (from run_background_monitor) checks whether the disk that backs this
// cache is running out of space or inodes, and toggles _disk_resource_limit_mode
// accordingly. Entering the mode uses the "enter" threshold; leaving it requires both
// space and inode usage to drop below the (lower) "exit" threshold, which gives the
// toggle hysteresis. Mode transitions are logged; steady-state limit mode is logged
// with reduced frequency.
void BlockFileCache::check_disk_resource_limit() {
    // Only a DISK-backed storage can exhaust disk space/inodes; memory storage is exempt.
    if (_storage->get_type() != FileCacheStorageType::DISK) {
        return;
    }
    bool previous_mode = _disk_resource_limit_mode;
    // If the logical cache still has headroom, optimistically clear the limit flag;
    // it may be re-set below based on the actual filesystem usage.
    if (_capacity > _cur_cache_size) {
        _disk_resource_limit_mode = false;
        _disk_limit_mode_metrics->set_value(0);
    }
    std::pair<int, int> percent;
    int ret = disk_used_percentage(_cache_base_path, &percent);
    if (ret != 0) {
        // statfs failed; keep the previous mode rather than flapping on a transient error.
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
        return;
    }
    auto [space_percentage, inode_percentage] = percent;
    auto is_insufficient = [](const int& percentage) {
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
    };
    DCHECK_GE(space_percentage, 0);
    DCHECK_LE(space_percentage, 100);
    DCHECK_GE(inode_percentage, 0);
    DCHECK_LE(inode_percentage, 100);
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
    // FIXME: reject with config validator
    // Sanity-repair the hysteresis pair: "enter" must be >= "exit", otherwise the mode
    // would oscillate. Reset both to their defaults when misconfigured at runtime.
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
        config::file_cache_exit_disk_resource_limit_mode_percent) {
        LOG_WARNING("config error, set to default value")
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
    }
    bool is_space_insufficient = is_insufficient(space_percentage);
    bool is_inode_insufficient = is_insufficient(inode_percentage);
    // Enter limit mode when EITHER resource is above the enter threshold; exit only
    // when BOTH are below the exit threshold (hysteresis to avoid flapping).
    if (is_space_insufficient || is_inode_insufficient) {
        _disk_resource_limit_mode = true;
        _disk_limit_mode_metrics->set_value(1);
    } else if (_disk_resource_limit_mode &&
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
        _disk_resource_limit_mode = false;
        _disk_limit_mode_metrics->set_value(0);
    }
    if (previous_mode != _disk_resource_limit_mode) {
        // add log for disk resource limit mode switching
        if (_disk_resource_limit_mode) {
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
                         << " space_percent=" << space_percentage
                         << " inode_percent=" << inode_percentage
                         << " is_space_insufficient=" << is_space_insufficient
                         << " is_inode_insufficient=" << is_inode_insufficient
                         << " enter threshold="
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
        } else {
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
                      << " space_percent=" << space_percentage
                      << " inode_percent=" << inode_percentage << " exit threshold="
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
        }
    } else if (_disk_resource_limit_mode) {
        // print log for disk resource limit mode running, but less frequently
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
                                 << " space_percent=" << space_percentage
                                 << " inode_percent=" << inode_percentage
                                 << " is_space_insufficient=" << is_space_insufficient
                                 << " is_inode_insufficient=" << is_inode_insufficient
                                 << " mode run in resource limit";
    }
}
1852
1853
15
void BlockFileCache::check_need_evict_cache_in_advance() {
1854
15
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1855
1
        return;
1856
1
    }
1857
1858
14
    std::pair<int, int> percent;
1859
14
    int ret = disk_used_percentage(_cache_base_path, &percent);
1860
14
    if (ret != 0) {
1861
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1862
1
        return;
1863
1
    }
1864
13
    auto [space_percentage, inode_percentage] = percent;
1865
13
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
1866
39
    auto is_insufficient = [](const int& percentage) {
1867
39
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
1868
39
    };
1869
13
    DCHECK_GE(space_percentage, 0);
1870
13
    DCHECK_LE(space_percentage, 100);
1871
13
    DCHECK_GE(inode_percentage, 0);
1872
13
    DCHECK_LE(inode_percentage, 100);
1873
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1874
    // FIXME: reject with config validator
1875
13
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
1876
13
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
1877
1
        LOG_WARNING("config error, set to default value")
1878
1
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
1879
1
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
1880
1
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
1881
1
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
1882
1
    }
1883
13
    bool previous_mode = _need_evict_cache_in_advance;
1884
13
    bool is_space_insufficient = is_insufficient(space_percentage);
1885
13
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1886
13
    bool is_size_insufficient = is_insufficient(size_percentage);
1887
13
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
1888
9
        _need_evict_cache_in_advance = true;
1889
9
        _need_evict_cache_in_advance_metrics->set_value(1);
1890
9
    } else if (_need_evict_cache_in_advance &&
1891
4
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1892
4
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1893
4
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
1894
1
        _need_evict_cache_in_advance = false;
1895
1
        _need_evict_cache_in_advance_metrics->set_value(0);
1896
1
    }
1897
13
    if (previous_mode != _need_evict_cache_in_advance) {
1898
        // add log for evict cache in advance mode switching
1899
6
        if (_need_evict_cache_in_advance) {
1900
5
            LOG(WARNING) << "Entering evict cache in advance mode: "
1901
5
                         << "file_cache=" << get_base_path()
1902
5
                         << " space_percent=" << space_percentage
1903
5
                         << " inode_percent=" << inode_percentage
1904
5
                         << " size_percent=" << size_percentage
1905
5
                         << " is_space_insufficient=" << is_space_insufficient
1906
5
                         << " is_inode_insufficient=" << is_inode_insufficient
1907
5
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
1908
5
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
1909
5
        } else {
1910
1
            LOG(INFO) << "Exiting evict cache in advance mode: "
1911
1
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
1912
1
                      << " inode_percent=" << inode_percentage
1913
1
                      << " size_percent=" << size_percentage << " exit threshold="
1914
1
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
1915
1
        }
1916
7
    } else if (_need_evict_cache_in_advance) {
1917
        // print log for evict cache in advance mode running, but less frequently
1918
4
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1919
1
                                 << " space_percent=" << space_percentage
1920
1
                                 << " inode_percent=" << inode_percentage
1921
1
                                 << " size_percent=" << size_percentage
1922
1
                                 << " is_space_insufficient=" << is_space_insufficient
1923
1
                                 << " is_inode_insufficient=" << is_inode_insufficient
1924
1
                                 << " is_size_insufficient=" << is_size_insufficient
1925
1
                                 << " need evict cache in advance";
1926
4
    }
1927
13
}
1928
1929
139
// Background thread body: wakes every file_cache_background_monitor_interval_ms to
// (1) refresh disk-pressure state, (2) refresh the evict-in-advance flag, and
// (3) publish queue sizes / hit ratios to the metrics registry under the cache lock.
// Exits promptly when close() signals _close via _close_cv.
void BlockFileCache::run_background_monitor() {
    Thread::set_self_name("run_background_monitor");
    while (!_close) {
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
        // Test hook: lets unit tests shorten the sleep interval.
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
        check_disk_resource_limit();
        if (config::enable_evict_file_cache_in_advance) {
            check_need_evict_cache_in_advance();
        } else {
            // Feature disabled: force the flag (and its metric) off so a previously
            // set flag does not keep the evict-in-advance thread busy.
            _need_evict_cache_in_advance = false;
            _need_evict_cache_in_advance_metrics->set_value(0);
        }

        {
            // Interruptible sleep: close() notifies _close_cv so shutdown does not
            // have to wait out the full interval.
            std::unique_lock close_lock(_close_mtx);
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
            if (_close) {
                break;
            }
        }
        // report
        {
            SCOPED_CACHE_LOCK(_mutex, this);
            _cur_cache_size_metrics->set_value(_cur_cache_size);
            // TTL size is derived: total minus the three non-TTL queues' capacities.
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
                                                   _index_queue.get_capacity(cache_lock) -
                                                   _normal_queue.get_capacity(cache_lock) -
                                                   _disposable_queue.get_capacity(cache_lock));
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
                    _ttl_queue.get_capacity(cache_lock));
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
                    _ttl_queue.get_elements_num(cache_lock));
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
            _cur_normal_queue_element_count_metrics->set_value(
                    _normal_queue.get_elements_num(cache_lock));
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
            _cur_index_queue_element_count_metrics->set_value(
                    _index_queue.get_elements_num(cache_lock));
            _cur_disposable_queue_cache_size_metrics->set_value(
                    _disposable_queue.get_capacity(cache_lock));
            _cur_disposable_queue_element_count_metrics->set_value(
                    _disposable_queue.get_elements_num(cache_lock));

            // Update meta store write queue size if storage is FSFileCacheStorage
            if (_storage->get_type() == FileCacheStorageType::DISK) {
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
                if (fs_storage != nullptr) {
                    auto* meta_store = fs_storage->get_meta_store();
                    if (meta_store != nullptr) {
                        _meta_store_write_queue_size_metrics->set_value(
                                meta_store->get_write_queue_size());
                    }
                }
            }

            // Hit ratios are only published once at least one read happened, so the
            // metric never shows a 0/0 artifact.
            if (_num_read_blocks->get_value() > 0) {
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
                                      (double)_num_read_blocks->get_value());
            }
            // 5m/1h windowed counters may be null (optional metrics) — check before use.
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
                                         (double)_num_read_blocks_5m->get_value());
            }
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
                                         (double)_num_read_blocks_1h->get_value());
            }

            // "No warmup" variants track hits excluding warm-up traffic; note the gate
            // here is on hit blocks (>0 hits), unlike the plain ratios above.
            if (_no_warmup_num_hit_blocks->get_value() > 0) {
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
                                                (double)_no_warmup_num_read_blocks->get_value());
            }
            if (_no_warmup_num_hit_blocks_5m && _no_warmup_num_hit_blocks_5m->get_value() > 0) {
                _no_warmup_hit_ratio_5m->set_value(
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
                        (double)_no_warmup_num_read_blocks_5m->get_value());
            }
            if (_no_warmup_num_hit_blocks_1h && _no_warmup_num_hit_blocks_1h->get_value() > 0) {
                _no_warmup_hit_ratio_1h->set_value(
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
                        (double)_no_warmup_num_read_blocks_1h->get_value());
            }
        }
    }
}
2014
2015
139
void BlockFileCache::run_background_gc() {
2016
139
    Thread::set_self_name("run_background_gc");
2017
139
    FileCacheKey key;
2018
139
    size_t batch_count = 0;
2019
406
    while (!_close) {
2020
406
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
2021
406
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
2022
406
        {
2023
406
            std::unique_lock close_lock(_close_mtx);
2024
406
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2025
406
            if (_close) {
2026
139
                break;
2027
139
            }
2028
406
        }
2029
2030
284
        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
2031
17
            int64_t duration_ns = 0;
2032
17
            Status st;
2033
17
            {
2034
17
                SCOPED_RAW_TIMER(&duration_ns);
2035
17
                st = _storage->remove(key);
2036
17
            }
2037
17
            *_storage_async_remove_latency_us << (duration_ns / 1000);
2038
2039
17
            if (!st.ok()) {
2040
12
                LOG_WARNING("").error(st);
2041
12
            }
2042
17
            batch_count++;
2043
17
        }
2044
267
        *_recycle_keys_length_recorder << _recycle_keys.size_approx();
2045
267
        batch_count = 0;
2046
267
    }
2047
139
}
2048
2049
139
// Background thread body: when the monitor thread has raised _need_evict_cache_in_advance,
// proactively evicts up to file_cache_evict_in_advance_batch_bytes per wakeup so that
// foreground reservations find free space. Skips a round when the async-removal backlog
// (_recycle_keys) is already large. Exits promptly when close() signals _close.
void BlockFileCache::run_background_evict_in_advance() {
    Thread::set_self_name("run_background_evict_in_advance");
    LOG(INFO) << "Starting background evict in advance thread";
    int64_t batch = 0;
    while (!_close) {
        {
            // Interruptible sleep: close() notifies _close_cv so shutdown is prompt.
            std::unique_lock close_lock(_close_mtx);
            _close_cv.wait_for(
                    close_lock,
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
            if (_close) {
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
                break;
            }
        }
        // Re-read the batch size each round so config changes take effect live.
        batch = config::file_cache_evict_in_advance_batch_bytes;

        // Skip if eviction not needed or too many pending recycles
        if (!_need_evict_cache_in_advance ||
            _recycle_keys.size_approx() >=
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
            continue;
        }

        int64_t duration_ns = 0;
        {
            // Eviction mutates queues and _files, so it runs under the cache lock;
            // timing covers only the locked eviction work.
            SCOPED_CACHE_LOCK(_mutex, this);
            SCOPED_RAW_TIMER(&duration_ns);
            try_evict_in_advance(batch, cache_lock);
        }
        *_evict_in_advance_latency_us << (duration_ns / 1000);
    }
}
2082
2083
139
// Background thread body: batches deferred LRU-position updates. Readers enqueue
// touched blocks into _need_update_lru_blocks instead of taking the cache lock on
// every access; this thread drains up to a QPS-derived batch each interval and
// applies update_block_lru() for the whole batch under a single cache lock.
void BlockFileCache::run_background_block_lru_update() {
    Thread::set_self_name("run_background_block_lru_update");
    // Reused across iterations; clear()+reserve() below avoids reallocating each round.
    std::vector<FileBlockSPtr> batch;
    while (!_close) {
        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
        // Per-wakeup budget derived from the configured update QPS limit.
        size_t batch_limit =
                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
        {
            // Interruptible sleep: close() notifies _close_cv so shutdown is prompt.
            std::unique_lock close_lock(_close_mtx);
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
            if (_close) {
                break;
            }
        }

        batch.clear();
        batch.reserve(batch_limit);
        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
        if (drained == 0) {
            // Nothing to do this round; still publish the (unchanged) backlog length.
            *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
            continue;
        }

        int64_t duration_ns = 0;
        {
            // One cache lock for the whole batch — this is the point of the batching.
            SCOPED_CACHE_LOCK(_mutex, this);
            SCOPED_RAW_TIMER(&duration_ns);
            for (auto& block : batch) {
                update_block_lru(block, cache_lock);
            }
        }
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
    }
}
2118
2119
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
2120
2
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
2121
2
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
2122
2
                               std::chrono::steady_clock::now().time_since_epoch())
2123
2
                               .count();
2124
2
    SCOPED_CACHE_LOCK(_mutex, this);
2125
2
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
2126
2
    if (auto iter = _files.find(hash); iter != _files.end()) {
2127
5
        for (auto& pair : _files.find(hash)->second) {
2128
5
            const FileBlockCell* cell = &pair.second;
2129
5
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
2130
4
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
2131
4
                    (cell->atime != 0 &&
2132
3
                     cur_time - cell->atime <
2133
3
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
2134
3
                    blocks_meta.emplace_back(pair.first, cell->size(),
2135
3
                                             cell->file_block->cache_type(),
2136
3
                                             cell->file_block->expiration_time());
2137
3
                }
2138
4
            }
2139
5
        }
2140
2
    }
2141
2
    return blocks_meta;
2142
2
}
2143
2144
// Reservation path used while the cache is being asynchronously loaded from disk.
// If the disk is under resource pressure (_disk_resource_limit_mode), evicts
// releasable blocks — disposable first, then normal, then index — until `size`
// bytes are freed. Returns true when the reservation can proceed (either no disk
// pressure, or enough space was reclaimed). Caller must hold the cache lock.
bool BlockFileCache::try_reserve_during_async_load(size_t size,
                                                   std::lock_guard<std::mutex>& cache_lock) {
    size_t removed_size = 0;
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);

    std::vector<FileBlockCell*> to_evict;
    // Walks one LRU queue, collecting releasable cells until the target size is met.
    // Cells are only collected here; actual removal happens after the scan, because
    // remove_file_blocks mutates the queues being iterated.
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
            // Stop early once disk pressure is gone or enough bytes are collected.
            if (!_disk_resource_limit_mode || removed_size >= size) {
                break;
            }
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);

            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
                         << ", offset: " << entry_offset;

            size_t cell_size = cell->size();
            DCHECK(entry_size == cell_size);

            if (cell->releasable()) {
                auto& file_block = cell->file_block;

                // Block-level lock ordering: cache lock (held by caller) before block lock.
                std::lock_guard block_lock(file_block->_mutex);
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
                to_evict.push_back(cell);
                removed_size += cell_size;
            }
        }
    };
    // Eviction priority: disposable < normal < index (index evicted last).
    if (disposable_queue_size != 0) {
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
    }
    if (normal_queue_size != 0) {
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
    }
    if (index_queue_size != 0) {
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
    }
    std::string reason = "async load";
    remove_file_blocks(to_evict, cache_lock, true, reason);

    return !_disk_resource_limit_mode || removed_size >= size;
}
2189
2190
29
void BlockFileCache::clear_need_update_lru_blocks() {
2191
29
    _need_update_lru_blocks.clear();
2192
29
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2193
29
}
2194
2195
26
std::string BlockFileCache::clear_file_cache_directly() {
2196
26
    _lru_dumper->remove_lru_dump_files();
2197
26
    using namespace std::chrono;
2198
26
    std::stringstream ss;
2199
26
    auto start = steady_clock::now();
2200
26
    SCOPED_CACHE_LOCK(_mutex, this);
2201
26
    LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
2202
2203
26
    std::string clear_msg;
2204
26
    auto s = _storage->clear(clear_msg);
2205
26
    if (!s.ok()) {
2206
1
        return clear_msg;
2207
1
    }
2208
2209
25
    int64_t num_files = _files.size();
2210
25
    int64_t cache_size = _cur_cache_size;
2211
25
    int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
2212
25
    int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock);
2213
25
    int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock);
2214
25
    int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);
2215
2216
25
    int64_t clear_fd_duration = 0;
2217
25
    {
2218
        // clear FDCache to release fd
2219
25
        SCOPED_RAW_TIMER(&clear_fd_duration);
2220
25
        for (const auto& [file_key, file_blocks] : _files) {
2221
13
            for (const auto& [offset, file_block_cell] : file_blocks) {
2222
13
                AccessKeyAndOffset access_key_and_offset(file_key, offset);
2223
13
                FDCache::instance()->remove_file_reader(access_key_and_offset);
2224
13
            }
2225
2
        }
2226
25
    }
2227
2228
25
    _files.clear();
2229
25
    _cur_cache_size = 0;
2230
25
    _cur_ttl_size = 0;
2231
25
    _time_to_key.clear();
2232
25
    _key_to_time.clear();
2233
25
    _index_queue.clear(cache_lock);
2234
25
    _normal_queue.clear(cache_lock);
2235
25
    _disposable_queue.clear(cache_lock);
2236
25
    _ttl_queue.clear(cache_lock);
2237
2238
    // Synchronously update cache metrics so that information_schema.file_cache_statistics
2239
    // reflects the cleared state immediately, without waiting for the next
2240
    // run_background_monitor cycle.
2241
25
    _cur_cache_size_metrics->set_value(0);
2242
25
    _cur_ttl_cache_size_metrics->set_value(0);
2243
25
    _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0);
2244
25
    _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0);
2245
25
    _cur_normal_queue_cache_size_metrics->set_value(0);
2246
25
    _cur_normal_queue_element_count_metrics->set_value(0);
2247
25
    _cur_index_queue_cache_size_metrics->set_value(0);
2248
25
    _cur_index_queue_element_count_metrics->set_value(0);
2249
25
    _cur_disposable_queue_cache_size_metrics->set_value(0);
2250
25
    _cur_disposable_queue_element_count_metrics->set_value(0);
2251
2252
25
    clear_need_update_lru_blocks();
2253
2254
25
    ss << "finish clear_file_cache_directly"
2255
25
       << " path=" << _cache_base_path
2256
25
       << " time_elapsed_ms=" << duration_cast<milliseconds>(steady_clock::now() - start).count()
2257
25
       << " fd_clear_time_ms=" << (clear_fd_duration / 1000000) << " num_files=" << num_files
2258
25
       << " cache_size=" << cache_size << " index_queue_size=" << index_queue_size
2259
25
       << " normal_queue_size=" << normal_queue_size
2260
25
       << " disposible_queue_size=" << disposible_queue_size << "ttl_queue_size=" << ttl_queue_size;
2261
25
    auto msg = ss.str();
2262
25
    LOG(INFO) << msg;
2263
25
    _lru_dumper->remove_lru_dump_files();
2264
25
    return msg;
2265
26
}
2266
2267
2.01k
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
2268
2.01k
    std::map<size_t, FileBlockSPtr> offset_to_block;
2269
2.01k
    SCOPED_CACHE_LOCK(_mutex, this);
2270
2.01k
    if (_files.contains(hash)) {
2271
1.02k
        for (auto& [offset, cell] : _files[hash]) {
2272
1.02k
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
2273
1.02k
                cell.file_block->_owned_by_cached_reader = true;
2274
1.02k
                offset_to_block.emplace(offset, cell.file_block);
2275
1.02k
            }
2276
1.02k
        }
2277
1.00k
    }
2278
2.01k
    return offset_to_block;
2279
2.01k
}
2280
2281
0
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
2282
0
    SCOPED_CACHE_LOCK(_mutex, this);
2283
0
    if (auto iter = _files.find(hash); iter != _files.end()) {
2284
0
        for (auto& [_, cell] : iter->second) {
2285
0
            cell.update_atime();
2286
0
        }
2287
0
    };
2288
0
}
2289
2290
139
void BlockFileCache::run_background_lru_log_replay() {
2291
139
    Thread::set_self_name("run_background_lru_log_replay");
2292
4.88k
    while (!_close) {
2293
4.88k
        int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
2294
4.88k
        {
2295
4.88k
            std::unique_lock close_lock(_close_mtx);
2296
4.88k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2297
4.88k
            if (_close) {
2298
139
                break;
2299
139
            }
2300
4.88k
        }
2301
2302
4.74k
        _lru_recorder->replay_queue_event(FileCacheType::TTL);
2303
4.74k
        _lru_recorder->replay_queue_event(FileCacheType::INDEX);
2304
4.74k
        _lru_recorder->replay_queue_event(FileCacheType::NORMAL);
2305
4.74k
        _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE);
2306
2307
4.74k
        if (config::enable_evaluate_shadow_queue_diff) {
2308
0
            SCOPED_CACHE_LOCK(_mutex, this);
2309
0
            _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
2310
0
            _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
2311
0
            _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
2312
0
            _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
2313
0
        }
2314
4.74k
    }
2315
139
}
2316
2317
1.55k
void BlockFileCache::dump_lru_queues(bool force) {
2318
1.55k
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
2319
1.55k
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
2320
1.55k
        !ExecEnv::GetInstance()->get_is_upgrading()) {
2321
1.55k
        _lru_dumper->dump_queue("disposable", force);
2322
1.55k
        _lru_dumper->dump_queue("normal", force);
2323
1.55k
        _lru_dumper->dump_queue("index", force);
2324
1.55k
        _lru_dumper->dump_queue("ttl", force);
2325
1.55k
        _lru_dumper->set_first_dump_done();
2326
1.55k
    }
2327
1.55k
}
2328
2329
139
void BlockFileCache::run_background_lru_dump() {
2330
139
    Thread::set_self_name("run_background_lru_dump");
2331
1.68k
    while (!_close) {
2332
1.68k
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
2333
1.68k
        {
2334
1.68k
            std::unique_lock close_lock(_close_mtx);
2335
1.68k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2336
1.68k
            if (_close) {
2337
139
                break;
2338
139
            }
2339
1.68k
        }
2340
1.55k
        dump_lru_queues(false);
2341
1.55k
    }
2342
139
}
2343
2344
139
// Rebuilds the in-memory LRU queues from the on-disk dump files at startup.
// Caller must hold the cache lock.
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
    // The restore order is significant: a block may appear in more than one dumped
    // queue, and only its first appearance wins — so restore ttl, then index, then
    // normal, then disposable. Do not reorder these calls.
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
}
2351
2352
0
std::map<std::string, double> BlockFileCache::get_stats() {
2353
0
    std::map<std::string, double> stats;
2354
0
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2355
0
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2356
0
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2357
2358
0
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2359
0
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
2360
0
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2361
0
    stats["index_queue_curr_elements"] =
2362
0
            (double)_cur_index_queue_element_count_metrics->get_value();
2363
2364
0
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2365
0
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
2366
0
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2367
0
    stats["ttl_queue_curr_elements"] =
2368
0
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
2369
2370
0
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2371
0
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
2372
0
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2373
0
    stats["normal_queue_curr_elements"] =
2374
0
            (double)_cur_normal_queue_element_count_metrics->get_value();
2375
2376
0
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2377
0
    stats["disposable_queue_curr_size"] =
2378
0
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
2379
0
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2380
0
    stats["disposable_queue_curr_elements"] =
2381
0
            (double)_cur_disposable_queue_element_count_metrics->get_value();
2382
2383
0
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2384
0
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2385
2386
0
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2387
0
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2388
0
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2389
2390
0
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2391
0
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2392
0
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2393
2394
0
    return stats;
2395
0
}
2396
2397
// for be UTs
2398
172
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
2399
172
    std::map<std::string, double> stats;
2400
172
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2401
172
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2402
172
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2403
2404
172
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2405
172
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
2406
172
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2407
172
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
2408
2409
172
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2410
172
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
2411
172
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2412
172
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
2413
2414
172
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2415
172
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
2416
172
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2417
172
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
2418
2419
172
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2420
172
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
2421
172
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2422
172
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
2423
2424
172
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2425
172
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2426
2427
172
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2428
172
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2429
172
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2430
2431
172
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2432
172
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2433
172
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2434
2435
172
    return stats;
2436
172
}
2437
2438
template void BlockFileCache::remove(FileBlockSPtr file_block,
2439
                                     std::lock_guard<std::mutex>& cache_lock,
2440
                                     std::lock_guard<std::mutex>& block_lock, bool sync);
2441
2442
#include "common/compile_check_end.h"
2443
2444
1
// Runs a manager-vs-storage consistency check and renders each detected
// inconsistency as one human-readable entry appended to `results`.
// Returns the first error from the underlying consistency scan, otherwise OK.
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
    InconsistencyContext ctx;
    RETURN_IF_ERROR(check_file_cache_consistency(ctx));

    // types / infos_in_manager / infos_in_storage are parallel vectors:
    // entry i of each describes the same inconsistent block.
    const size_t total = ctx.types.size();
    results.reserve(total);
    for (size_t idx = 0; idx != total; ++idx) {
        std::string entry = "File cache info in manager:\n";
        entry += ctx.infos_in_manager[idx].to_string();
        entry += "File cache info in storage:\n";
        entry += ctx.infos_in_storage[idx].to_string();
        entry += ctx.types[idx].to_string();
        entry += "\n";
        results.emplace_back(std::move(entry));
    }
    return Status::OK();
}
2461
2462
1
// Reconciles the in-memory cache manager state (_files / cells) against what
// the storage layer reports on disk, recording every mismatch into
// `inconsistency_context` (three parallel vectors: manager-side info,
// storage-side info, and the inconsistency type for each finding).
// Holds the cache lock for the whole scan, so manager state cannot change
// underneath the comparison.
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
    std::lock_guard<std::mutex> cache_lock(_mutex);
    std::vector<FileCacheInfo> infos_in_storage;
    RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));
    // Every (hash, offset) seen in storage is recorded here so the second
    // pass can find manager-only blocks (present in memory, missing on disk).
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks;
    // Pass 1: walk storage-side blocks and compare each against its manager cell.
    for (const auto& info_in_storage : infos_in_storage) {
        confirmed_blocks.insert({info_in_storage.hash, info_in_storage.offset});
        auto* cell = get_cell(info_in_storage.hash, info_in_storage.offset, cache_lock);
        if (cell == nullptr || cell->file_block == nullptr) {
            // Block exists on disk but was never loaded into the manager:
            // report with an empty manager-side info.
            inconsistency_context.infos_in_manager.emplace_back();
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
            inconsistency_context.types.emplace_back(InconsistencyType::NOT_LOADED);
            continue;
        }
        // Manager's view of this block, keyed identically to the storage entry.
        // A block still in DOWNLOADING state corresponds to a tmp file on disk.
        FileCacheInfo info_in_manager {
                .hash = info_in_storage.hash,
                .expiration_time = cell->file_block->expiration_time(),
                .size = cell->size(),
                .offset = info_in_storage.offset,
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
                .cache_type = cell->file_block->cache_type()};
        // NOTE(review): relies on InconsistencyType's default constructor
        // yielding NONE — TODO confirm against its declaration.
        InconsistencyType inconsistent_type;
        if (info_in_storage.is_tmp != info_in_manager.is_tmp) {
            inconsistent_type |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
        }
        // For a downloading block the authoritative size is the bytes
        // downloaded so far, not the cell's nominal size.
        size_t expected_size =
                info_in_manager.is_tmp ? cell->dowloading_size() : info_in_manager.size;
        if (info_in_storage.size != expected_size) {
            inconsistent_type |= InconsistencyType::SIZE_INCONSISTENT;
        }
        // Only if it is not a tmp file need we check the cache type.
        // (When the tmp/downloading states already disagree, the cache type
        // comparison would be noise, so it is skipped.)
        if ((inconsistent_type & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
            info_in_storage.cache_type != info_in_manager.cache_type) {
            inconsistent_type |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
        }
        if (info_in_storage.expiration_time != info_in_manager.expiration_time) {
            inconsistent_type |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
        }
        // Record the finding only if at least one inconsistency flag was set.
        if (inconsistent_type != InconsistencyType::NONE) {
            inconsistency_context.infos_in_manager.push_back(info_in_manager);
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
            inconsistency_context.types.push_back(inconsistent_type);
        }
    }

    // Pass 2: walk manager-side cells; anything not confirmed by pass 1 is
    // present in memory but missing from storage.
    for (const auto& [hash, offset_to_cell] : _files) {
        for (const auto& [offset, cell] : offset_to_cell) {
            if (confirmed_blocks.contains({hash, offset})) {
                continue;
            }
            const auto& block = cell.file_block;
            inconsistency_context.infos_in_manager.emplace_back(
                    hash, block->expiration_time(), cell.size(), offset,
                    cell.file_block->state() == FileBlock::State::DOWNLOADING, block->cache_type());
            // Empty storage-side info marks "nothing found on disk".
            inconsistency_context.infos_in_storage.emplace_back();
            inconsistency_context.types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
        }
    }
    return Status::OK();
}
2522
2523
} // namespace doris::io