Coverage Report

Created: 2026-04-01 13:27

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache.h"
22
23
#include <gen_cpp/file_cache.pb.h>
24
25
#include <cstdio>
26
#include <exception>
27
#include <fstream>
28
#include <unordered_set>
29
30
#include "common/status.h"
31
#include "cpp/sync_point.h"
32
#include "runtime/exec_env.h"
33
34
#if defined(__APPLE__)
35
#include <sys/mount.h>
36
#else
37
#include <sys/statfs.h>
38
#endif
39
40
#include <chrono> // IWYU pragma: keep
41
#include <mutex>
42
#include <ranges>
43
44
#include "common/cast_set.h"
45
#include "common/config.h"
46
#include "common/logging.h"
47
#include "core/uint128.h"
48
#include "exec/common/sip_hash.h"
49
#include "io/cache/block_file_cache_ttl_mgr.h"
50
#include "io/cache/file_block.h"
51
#include "io/cache/file_cache_common.h"
52
#include "io/cache/fs_file_cache_storage.h"
53
#include "io/cache/mem_file_cache_storage.h"
54
#include "runtime/runtime_profile.h"
55
#include "util/concurrency_stats.h"
56
#include "util/stack_util.h"
57
#include "util/stopwatch.hpp"
58
#include "util/thread.h"
59
#include "util/time.h"
60
namespace doris::io {
61
#include "common/compile_check_begin.h"
62
63
// Insert a block pointer into one shard while swallowing allocation failures.
64
17.9k
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
65
17.9k
    if (!block) {
66
2
        return false;
67
2
    }
68
17.9k
    try {
69
17.9k
        auto* raw_ptr = block.get();
70
17.9k
        auto idx = shard_index(raw_ptr);
71
17.9k
        auto& shard = _shards[idx];
72
17.9k
        std::lock_guard lock(shard.mutex);
73
17.9k
        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
74
17.9k
        if (inserted) {
75
1.90k
            _size.fetch_add(1, std::memory_order_relaxed);
76
1.90k
        }
77
17.9k
        return inserted;
78
17.9k
    } catch (const std::exception& e) {
79
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
80
0
    } catch (...) {
81
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
82
0
    }
83
0
    return false;
84
17.9k
}
85
86
// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
87
9.34k
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
88
9.34k
    if (limit == 0 || output == nullptr) {
89
4
        return 0;
90
4
    }
91
9.33k
    size_t drained = 0;
92
9.33k
    try {
93
9.33k
        output->reserve(output->size() + std::min(limit, size()));
94
589k
        for (auto& shard : _shards) {
95
589k
            if (drained >= limit) {
96
2
                break;
97
2
            }
98
589k
            std::lock_guard lock(shard.mutex);
99
589k
            auto it = shard.entries.begin();
100
589k
            size_t shard_drained = 0;
101
590k
            while (it != shard.entries.end() && drained + shard_drained < limit) {
102
1.42k
                output->emplace_back(std::move(it->second));
103
1.42k
                it = shard.entries.erase(it);
104
1.42k
                ++shard_drained;
105
1.42k
            }
106
589k
            if (shard_drained > 0) {
107
15
                _size.fetch_sub(shard_drained, std::memory_order_relaxed);
108
15
                drained += shard_drained;
109
15
            }
110
589k
        }
111
9.33k
    } catch (const std::exception& e) {
112
0
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
113
0
    } catch (...) {
114
0
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
115
0
    }
116
9.39k
    return drained;
117
9.33k
}
118
119
// Remove every pending block, guarding against unexpected exceptions.
120
64
void NeedUpdateLRUBlocks::clear() {
121
64
    try {
122
4.09k
        for (auto& shard : _shards) {
123
4.09k
            std::lock_guard lock(shard.mutex);
124
4.09k
            if (!shard.entries.empty()) {
125
2
                auto removed = shard.entries.size();
126
2
                shard.entries.clear();
127
2
                _size.fetch_sub(removed, std::memory_order_relaxed);
128
2
            }
129
4.09k
        }
130
64
    } catch (const std::exception& e) {
131
0
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
132
0
    } catch (...) {
133
0
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
134
0
    }
135
64
}
136
137
17.9k
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
138
17.9k
    DCHECK(ptr != nullptr);
139
17.9k
    return std::hash<FileBlock*> {}(ptr)&kShardMask;
140
17.9k
}
141
142
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
143
                               const FileCacheSettings& cache_settings)
144
322
        : _cache_base_path(cache_base_path),
145
322
          _capacity(cache_settings.capacity),
146
322
          _max_file_block_size(cache_settings.max_file_block_size) {
147
322
    _cur_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
148
322
                                                                     "file_cache_cache_size", 0);
149
322
    _cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>(
150
322
            _cache_base_path.c_str(), "file_cache_capacity", _capacity);
151
322
    _cur_ttl_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
152
322
            _cache_base_path.c_str(), "file_cache_ttl_cache_size", 0);
153
322
    _cur_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
154
322
            _cache_base_path.c_str(), "file_cache_normal_queue_element_count", 0);
155
322
    _cur_ttl_cache_lru_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
156
322
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_size", 0);
157
322
    _cur_ttl_cache_lru_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
158
322
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_element_count", 0);
159
322
    _cur_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
160
322
            _cache_base_path.c_str(), "file_cache_normal_queue_cache_size", 0);
161
322
    _cur_index_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
162
322
            _cache_base_path.c_str(), "file_cache_index_queue_element_count", 0);
163
322
    _cur_index_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
164
322
            _cache_base_path.c_str(), "file_cache_index_queue_cache_size", 0);
165
322
    _cur_disposable_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
166
322
            _cache_base_path.c_str(), "file_cache_disposable_queue_element_count", 0);
167
322
    _cur_disposable_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
168
322
            _cache_base_path.c_str(), "file_cache_disposable_queue_cache_size", 0);
169
170
322
    _queue_evict_size_metrics[0] = std::make_shared<bvar::Adder<size_t>>(
171
322
            _cache_base_path.c_str(), "file_cache_index_queue_evict_size");
172
322
    _queue_evict_size_metrics[1] = std::make_shared<bvar::Adder<size_t>>(
173
322
            _cache_base_path.c_str(), "file_cache_normal_queue_evict_size");
174
322
    _queue_evict_size_metrics[2] = std::make_shared<bvar::Adder<size_t>>(
175
322
            _cache_base_path.c_str(), "file_cache_disposable_queue_evict_size");
176
322
    _queue_evict_size_metrics[3] = std::make_shared<bvar::Adder<size_t>>(
177
322
            _cache_base_path.c_str(), "file_cache_ttl_cache_evict_size");
178
322
    _total_evict_size_metrics = std::make_shared<bvar::Adder<size_t>>(
179
322
            _cache_base_path.c_str(), "file_cache_total_evict_size");
180
322
    _total_read_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
181
322
                                                                     "file_cache_total_read_size");
182
322
    _total_hit_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
183
322
                                                                    "file_cache_total_hit_size");
184
322
    _gc_evict_bytes_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
185
322
                                                                    "file_cache_gc_evict_bytes");
186
322
    _gc_evict_count_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
187
322
                                                                    "file_cache_gc_evict_count");
188
189
322
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
190
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
191
322
                                                  "file_cache_evict_by_time_disposable_to_normal");
192
322
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
193
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
194
322
                                                  "file_cache_evict_by_time_disposable_to_index");
195
322
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
196
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
197
322
                                                  "file_cache_evict_by_time_disposable_to_ttl");
198
322
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
199
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
200
322
                                                  "file_cache_evict_by_time_normal_to_disposable");
201
322
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
202
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
203
322
                                                  "file_cache_evict_by_time_normal_to_index");
204
322
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
205
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
206
322
                                                  "file_cache_evict_by_time_normal_to_ttl");
207
322
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
208
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
209
322
                                                  "file_cache_evict_by_time_index_to_disposable");
210
322
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
211
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
212
322
                                                  "file_cache_evict_by_time_index_to_normal");
213
322
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
214
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
215
322
                                                  "file_cache_evict_by_time_index_to_ttl");
216
322
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
217
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
218
322
                                                  "file_cache_evict_by_time_ttl_to_disposable");
219
322
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
220
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
221
322
                                                  "file_cache_evict_by_time_ttl_to_normal");
222
322
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
223
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
224
322
                                                  "file_cache_evict_by_time_ttl_to_index");
225
226
322
    _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
227
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
228
322
                                                  "file_cache_evict_by_self_lru_disposable");
229
322
    _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
230
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
231
322
                                                  "file_cache_evict_by_self_lru_normal");
232
322
    _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] = std::make_shared<bvar::Adder<size_t>>(
233
322
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_index");
234
322
    _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] = std::make_shared<bvar::Adder<size_t>>(
235
322
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl");
236
237
322
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
238
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
239
322
                                                  "file_cache_evict_by_size_disposable_to_normal");
240
322
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
241
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
242
322
                                                  "file_cache_evict_by_size_disposable_to_index");
243
322
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
244
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
245
322
                                                  "file_cache_evict_by_size_disposable_to_ttl");
246
322
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
247
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
248
322
                                                  "file_cache_evict_by_size_normal_to_disposable");
249
322
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
250
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
251
322
                                                  "file_cache_evict_by_size_normal_to_index");
252
322
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
253
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
254
322
                                                  "file_cache_evict_by_size_normal_to_ttl");
255
322
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
256
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
257
322
                                                  "file_cache_evict_by_size_index_to_disposable");
258
322
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
259
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
260
322
                                                  "file_cache_evict_by_size_index_to_normal");
261
322
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
262
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
263
322
                                                  "file_cache_evict_by_size_index_to_ttl");
264
322
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
265
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
266
322
                                                  "file_cache_evict_by_size_ttl_to_disposable");
267
322
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
268
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
269
322
                                                  "file_cache_evict_by_size_ttl_to_normal");
270
322
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
271
322
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
272
322
                                                  "file_cache_evict_by_size_ttl_to_index");
273
274
322
    _evict_by_try_release = std::make_shared<bvar::Adder<size_t>>(
275
322
            _cache_base_path.c_str(), "file_cache_evict_by_try_release");
276
277
322
    _num_read_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
278
322
                                                             "file_cache_num_read_blocks");
279
322
    _num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
280
322
                                                            "file_cache_num_hit_blocks");
281
322
    _num_removed_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
282
322
                                                                "file_cache_num_removed_blocks");
283
284
322
    _no_warmup_num_read_blocks = std::make_shared<bvar::Adder<size_t>>(
285
322
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks");
286
322
    _no_warmup_num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(
287
322
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks");
288
289
#ifndef BE_TEST
290
    _num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
291
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
292
    _num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
293
            _cache_base_path.c_str(), "file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
294
    _num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
295
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
296
    _num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
297
            _cache_base_path.c_str(), "file_cache_num_read_blocks_1h", _num_read_blocks.get(),
298
            3600);
299
    _no_warmup_num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
300
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_5m",
301
            _no_warmup_num_hit_blocks.get(), 300);
302
    _no_warmup_num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
303
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_5m",
304
            _no_warmup_num_read_blocks.get(), 300);
305
    _no_warmup_num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
306
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_1h",
307
            _no_warmup_num_hit_blocks.get(), 3600);
308
    _no_warmup_num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
309
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_1h",
310
            _no_warmup_num_read_blocks.get(), 3600);
311
#endif
312
313
322
    _hit_ratio = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
314
322
                                                        "file_cache_hit_ratio", 0.0);
315
322
    _hit_ratio_5m = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
316
322
                                                           "file_cache_hit_ratio_5m", 0.0);
317
322
    _hit_ratio_1h = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
318
322
                                                           "file_cache_hit_ratio_1h", 0.0);
319
320
322
    _no_warmup_hit_ratio = std::make_shared<bvar::Status<double>>(
321
322
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio", 0.0);
322
322
    _no_warmup_hit_ratio_5m = std::make_shared<bvar::Status<double>>(
323
322
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_5m", 0.0);
324
322
    _no_warmup_hit_ratio_1h = std::make_shared<bvar::Status<double>>(
325
322
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_1h", 0.0);
326
327
322
    _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
328
322
            _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
329
322
    _need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>(
330
322
            _cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0);
331
322
    _meta_store_write_queue_size_metrics = std::make_shared<bvar::Status<size_t>>(
332
322
            _cache_base_path.c_str(), "file_cache_meta_store_write_queue_size", 0);
333
334
322
    _cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
335
322
            _cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
336
322
    _get_or_set_latency_us = std::make_shared<bvar::LatencyRecorder>(
337
322
            _cache_base_path.c_str(), "file_cache_get_or_set_latency_us");
338
322
    _storage_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
339
322
            _cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_us");
340
322
    _storage_retry_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
341
322
            _cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us");
342
322
    _storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
343
322
            _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us");
344
322
    _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
345
322
            _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us");
346
322
    _lru_dump_latency_us = std::make_shared<bvar::LatencyRecorder>(
347
322
            _cache_base_path.c_str(), "file_cache_lru_dump_latency_us");
348
322
    _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
349
322
            _cache_base_path.c_str(), "file_cache_recycle_keys_length");
350
322
    _need_update_lru_blocks_length_recorder = std::make_shared<bvar::LatencyRecorder>(
351
322
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length");
352
322
    _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>(
353
322
            _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us");
354
322
    _ttl_gc_latency_us = std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(),
355
322
                                                                 "file_cache_ttl_gc_latency_us");
356
322
    _shadow_queue_levenshtein_distance = std::make_shared<bvar::LatencyRecorder>(
357
322
            _cache_base_path.c_str(), "file_cache_shadow_queue_levenshtein_distance");
358
359
322
    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
360
322
                                 cache_settings.disposable_queue_elements, 60 * 60);
361
322
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
362
322
                            7 * 24 * 60 * 60);
363
322
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
364
322
                             24 * 60 * 60);
365
322
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
366
322
                          std::numeric_limits<int>::max());
367
368
322
    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
369
322
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());
370
322
    if (cache_settings.storage == "memory") {
371
36
        _storage = std::make_unique<MemFileCacheStorage>();
372
36
        _cache_base_path = "memory";
373
286
    } else {
374
286
        _storage = std::make_unique<FSFileCacheStorage>();
375
286
    }
376
377
322
    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
378
322
}
379
380
8.49k
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
381
8.49k
    uint128_t value;
382
8.49k
    sip_hash128(path.data(), path.size(), reinterpret_cast<char*>(&value));
383
8.49k
    return UInt128Wrapper(value);
384
8.49k
}
385
386
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
387
16
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
388
16
    SCOPED_CACHE_LOCK(_mutex, this);
389
16
    if (!config::enable_file_cache_query_limit) {
390
2
        return {};
391
2
    }
392
393
    /// if enable_filesystem_query_cache_limit is true,
394
    /// we create context query for current query.
395
14
    auto context = get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
396
14
    return std::make_unique<QueryFileCacheContextHolder>(query_id, this, context);
397
16
}
398
399
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
400
6.74k
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
401
6.74k
    auto query_iter = _query_map.find(query_id);
402
6.74k
    return (query_iter == _query_map.end()) ? nullptr : query_iter->second;
403
6.74k
}
404
405
12
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
406
12
    SCOPED_CACHE_LOCK(_mutex, this);
407
12
    const auto& query_iter = _query_map.find(query_id);
408
409
12
    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
410
10
        _query_map.erase(query_iter);
411
10
    }
412
12
}
413
414
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
415
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
416
14
        int file_cache_query_limit_percent) {
417
14
    if (query_id.lo == 0 && query_id.hi == 0) {
418
2
        return nullptr;
419
2
    }
420
421
12
    auto context = get_query_context(query_id, cache_lock);
422
12
    if (context) {
423
2
        return context;
424
2
    }
425
426
10
    size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100;
427
10
    if (file_cache_query_limit_size < 268435456) {
428
10
        LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size
429
10
                     << " bytes) is less than the 256MB recommended minimum. "
430
10
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
431
10
                     << " from its current value " << file_cache_query_limit_percent << "%.";
432
10
    }
433
10
    auto query_context = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
434
10
    auto query_iter = _query_map.emplace(query_id, query_context).first;
435
10
    return query_iter->second;
436
12
}
437
438
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
439
40
                                                   std::lock_guard<std::mutex>& cache_lock) {
440
40
    auto pair = std::make_pair(hash, offset);
441
40
    auto record = records.find(pair);
442
40
    DCHECK(record != records.end());
443
40
    auto iter = record->second;
444
40
    records.erase(pair);
445
40
    lru_queue.remove(iter, cache_lock);
446
40
}
447
448
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
449
                                                    size_t size,
450
60
                                                    std::lock_guard<std::mutex>& cache_lock) {
451
60
    auto pair = std::make_pair(hash, offset);
452
60
    if (records.find(pair) == records.end()) {
453
58
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
454
58
        records.insert({pair, queue_iter});
455
58
    }
456
60
}
457
458
280
Status BlockFileCache::initialize() {
459
280
    SCOPED_CACHE_LOCK(_mutex, this);
460
280
    return initialize_unlocked(cache_lock);
461
280
}
462
463
280
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
464
280
    DCHECK(!_is_initialized);
465
280
    _is_initialized = true;
466
280
    if (config::file_cache_background_lru_dump_tail_record_num > 0) {
467
        // requirements:
468
        // 1. restored data should not overwrite the last dump
469
        // 2. restore should happen before load and async load
470
        // 3. all queues should be restored sequencially to avoid conflict
471
        // TODO(zhengyu): we can parralize them but will increase complexity, so lets check the time cost
472
        // to see if any improvement is a necessary
473
280
        restore_lru_queues_from_disk(cache_lock);
474
280
    }
475
280
    RETURN_IF_ERROR(_storage->init(this));
476
477
280
    if (auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get())) {
478
248
        if (auto* meta_store = fs_storage->get_meta_store()) {
479
248
            _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
480
248
        }
481
248
    }
482
483
280
    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
484
280
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
485
280
    _cache_background_evict_in_advance_thread =
486
280
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
487
280
    _cache_background_block_lru_update_thread =
488
280
            std::thread(&BlockFileCache::run_background_block_lru_update, this);
489
490
    // Initialize LRU dump thread and restore queues
491
280
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
492
280
    _cache_background_lru_log_replay_thread =
493
280
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);
494
495
280
    return Status::OK();
496
280
}
497
498
void BlockFileCache::update_block_lru(FileBlockSPtr block,
499
1.41k
                                      std::lock_guard<std::mutex>& cache_lock) {
500
1.41k
    if (!block) {
501
2
        return;
502
2
    }
503
504
1.41k
    FileBlockCell* cell = get_cell(block->get_hash_value(), block->offset(), cache_lock);
505
1.41k
    if (!cell || cell->file_block.get() != block.get()) {
506
5
        return;
507
5
    }
508
509
1.40k
    if (cell->queue_iterator) {
510
1.40k
        auto& queue = get_queue(block->cache_type());
511
1.40k
        queue.move_to_end(*cell->queue_iterator, cache_lock);
512
1.40k
        _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
513
1.40k
                                          block->_key.hash, block->_key.offset,
514
1.40k
                                          block->_block_range.size());
515
1.40k
    }
516
1.40k
    cell->update_atime();
517
1.40k
}
518
519
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
520
1.55k
                              std::lock_guard<std::mutex>& cache_lock) {
521
1.55k
    if (result) {
522
1.55k
        result->push_back(cell.file_block);
523
1.55k
    }
524
525
1.55k
    auto& queue = get_queue(cell.file_block->cache_type());
526
    /// Move to the end of the queue. The iterator remains valid.
527
1.55k
    if (cell.queue_iterator && move_iter_flag) {
528
1.55k
        queue.move_to_end(*cell.queue_iterator, cache_lock);
529
1.55k
        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
530
1.55k
                                          CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash,
531
1.55k
                                          cell.file_block->_key.offset, cell.size());
532
1.55k
    }
533
534
1.55k
    cell.update_atime();
535
1.55k
}
536
537
template <class T>
538
    requires IsXLock<T>
539
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
540
1.16M
                                        T& /* cache_lock */) {
541
1.16M
    auto it = _files.find(hash);
542
1.16M
    if (it == _files.end()) {
543
7
        return nullptr;
544
7
    }
545
546
1.16M
    auto& offsets = it->second;
547
1.16M
    auto cell_it = offsets.find(offset);
548
1.16M
    if (cell_it == offsets.end()) {
549
6
        return nullptr;
550
6
    }
551
552
1.16M
    return &cell_it->second;
553
1.16M
}
554
555
1.55k
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
556
1.55k
    return query_type != FileCacheType::DISPOSABLE && cell_type != FileCacheType::DISPOSABLE;
557
1.55k
}
558
559
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
560
                                    const FileBlock::Range& range,
561
19.1k
                                    std::lock_guard<std::mutex>& cache_lock) {
562
    /// Given range = [left, right] and non-overlapping ordered set of file blocks,
563
    /// find list [block1, ..., blockN] of blocks which intersect with given range.
564
19.1k
    auto it = _files.find(hash);
565
19.1k
    if (it == _files.end()) {
566
6.21k
        if (_async_open_done) {
567
6.21k
            return {};
568
6.21k
        }
569
5
        FileCacheKey key;
570
5
        key.hash = hash;
571
5
        key.meta.type = context.cache_type;
572
5
        key.meta.expiration_time = context.expiration_time;
573
5
        key.meta.tablet_id = context.tablet_id;
574
5
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);
575
576
5
        it = _files.find(hash);
577
5
        if (it == _files.end()) [[unlikely]] {
578
4
            return {};
579
4
        }
580
5
    }
581
582
12.9k
    auto& file_blocks = it->second;
583
12.9k
    if (file_blocks.empty()) {
584
0
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
585
0
                     << " cache type=" << context.cache_type
586
0
                     << " cache expiration time=" << context.expiration_time
587
0
                     << " cache range=" << range.left << " " << range.right
588
0
                     << " query id=" << context.query_id;
589
0
        DCHECK(false);
590
0
        _files.erase(hash);
591
0
        return {};
592
0
    }
593
594
12.9k
    FileBlocks result;
595
12.9k
    auto block_it = file_blocks.lower_bound(range.left);
596
12.9k
    if (block_it == file_blocks.end()) {
597
        /// N - last cached block for given file hash, block{N}.offset < range.left:
598
        ///   block{N}                       block{N}
599
        /// [________                         [_______]
600
        ///     [__________]         OR                  [________]
601
        ///     ^                                        ^
602
        ///     range.left                               range.left
603
604
12.4k
        const auto& cell = file_blocks.rbegin()->second;
605
12.4k
        if (cell.file_block->range().right < range.left) {
606
12.3k
            return {};
607
12.3k
        }
608
609
28
        use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
610
28
                 cache_lock);
611
547
    } else { /// block_it <-- segmment{k}
612
547
        if (block_it != file_blocks.begin()) {
613
434
            const auto& prev_cell = std::prev(block_it)->second;
614
434
            const auto& prev_cell_range = prev_cell.file_block->range();
615
616
434
            if (range.left <= prev_cell_range.right) {
617
                ///   block{k-1}  block{k}
618
                ///   [________]   [_____
619
                ///       [___________
620
                ///       ^
621
                ///       range.left
622
623
274
                use_cell(prev_cell, &result,
624
274
                         need_to_move(prev_cell.file_block->cache_type(), context.cache_type),
625
274
                         cache_lock);
626
274
            }
627
434
        }
628
629
        ///  block{k} ...       block{k-1}  block{k}                      block{k}
630
        ///  [______              [______]     [____                        [________
631
        ///  [_________     OR              [________      OR    [______]   ^
632
        ///  ^                              ^                           ^   block{k}.offset
633
        ///  range.left                     range.left                  range.right
634
635
1.80k
        while (block_it != file_blocks.end()) {
636
1.59k
            const auto& cell = block_it->second;
637
1.59k
            if (range.right < cell.file_block->range().left) {
638
344
                break;
639
344
            }
640
641
1.25k
            use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
642
1.25k
                     cache_lock);
643
1.25k
            ++block_it;
644
1.25k
        }
645
547
    }
646
647
575
    return result;
648
12.9k
}
649
650
17.9k
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
651
17.9k
    if (_need_update_lru_blocks.insert(std::move(block))) {
652
1.88k
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
653
1.88k
    }
654
17.9k
}
655
656
8
std::string BlockFileCache::clear_file_cache_async() {
657
8
    LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
658
8
    _lru_dumper->remove_lru_dump_files();
659
8
    int64_t num_cells_all = 0;
660
8
    int64_t num_cells_to_delete = 0;
661
8
    int64_t num_cells_wait_recycle = 0;
662
8
    int64_t num_files_all = 0;
663
8
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
664
8
    {
665
8
        SCOPED_CACHE_LOCK(_mutex, this);
666
667
8
        std::vector<FileBlockCell*> deleting_cells;
668
8
        for (auto& [_, offset_to_cell] : _files) {
669
8
            ++num_files_all;
670
96
            for (auto& [_1, cell] : offset_to_cell) {
671
96
                ++num_cells_all;
672
96
                deleting_cells.push_back(&cell);
673
96
            }
674
8
        }
675
676
        // we cannot delete the element in the loop above, because it will break the iterator
677
96
        for (auto& cell : deleting_cells) {
678
96
            if (!cell->releasable()) {
679
4
                LOG(INFO) << "cell is not releasable, hash="
680
4
                          << " offset=" << cell->file_block->offset();
681
4
                cell->file_block->set_deleting();
682
4
                ++num_cells_wait_recycle;
683
4
                continue;
684
4
            }
685
92
            FileBlockSPtr file_block = cell->file_block;
686
92
            if (file_block) {
687
92
                std::lock_guard block_lock(file_block->_mutex);
688
92
                remove(file_block, cache_lock, block_lock, false);
689
92
                ++num_cells_to_delete;
690
92
            }
691
92
        }
692
8
        clear_need_update_lru_blocks();
693
8
    }
694
695
8
    std::stringstream ss;
696
8
    ss << "finish clear_file_cache_async, path=" << _cache_base_path
697
8
       << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
698
8
       << " num_cells_to_delete=" << num_cells_to_delete
699
8
       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
700
8
    auto msg = ss.str();
701
8
    LOG(INFO) << msg;
702
8
    _lru_dumper->remove_lru_dump_files();
703
8
    return msg;
704
8
}
705
706
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
707
                                                  const CacheContext& context, size_t offset,
708
                                                  size_t size, FileBlock::State state,
709
19.0k
                                                  std::lock_guard<std::mutex>& cache_lock) {
710
19.0k
    DCHECK(size > 0);
711
712
19.0k
    auto current_pos = offset;
713
19.0k
    auto end_pos_non_included = offset + size;
714
715
19.0k
    size_t current_size = 0;
716
19.0k
    size_t remaining_size = size;
717
718
19.0k
    FileBlocks file_blocks;
719
38.0k
    while (current_pos < end_pos_non_included) {
720
19.0k
        current_size = std::min(remaining_size, _max_file_block_size);
721
19.0k
        remaining_size -= current_size;
722
19.0k
        state = try_reserve(hash, context, current_pos, current_size, cache_lock)
723
19.0k
                        ? state
724
19.0k
                        : FileBlock::State::SKIP_CACHE;
725
19.0k
        if (state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
726
2.52k
            FileCacheKey key;
727
2.52k
            key.hash = hash;
728
2.52k
            key.offset = current_pos;
729
2.52k
            key.meta.type = context.cache_type;
730
2.52k
            key.meta.expiration_time = context.expiration_time;
731
2.52k
            key.meta.tablet_id = context.tablet_id;
732
2.52k
            auto file_block = std::make_shared<FileBlock>(key, current_size, this,
733
2.52k
                                                          FileBlock::State::SKIP_CACHE);
734
2.52k
            file_blocks.push_back(std::move(file_block));
735
16.5k
        } else {
736
16.5k
            auto* cell = add_cell(hash, context, current_pos, current_size, state, cache_lock);
737
16.5k
            if (cell) {
738
16.5k
                file_blocks.push_back(cell->file_block);
739
16.5k
                if (!context.is_cold_data) {
740
16.5k
                    cell->update_atime();
741
16.5k
                }
742
16.5k
            }
743
16.5k
            if (_ttl_mgr && context.tablet_id != 0) {
744
2.03k
                _ttl_mgr->register_tablet_id(context.tablet_id);
745
2.03k
            }
746
16.5k
        }
747
748
19.0k
        current_pos += current_size;
749
19.0k
    }
750
751
19.0k
    DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
752
19.0k
    return file_blocks;
753
19.0k
}
754
755
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
756
                                                       const UInt128Wrapper& hash,
757
                                                       const CacheContext& context,
758
                                                       const FileBlock::Range& range,
759
567
                                                       std::lock_guard<std::mutex>& cache_lock) {
760
    /// There are blocks [block1, ..., blockN]
761
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
762
    /// intersect with given range.
763
764
    /// It can have holes:
765
    /// [____________________]         -- requested range
766
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
767
    ///
768
    /// For each such hole create a cell with file block state EMPTY.
769
770
567
    auto it = file_blocks.begin();
771
567
    auto block_range = (*it)->range();
772
773
567
    size_t current_pos = 0;
774
567
    if (block_range.left < range.left) {
775
        ///    [_______     -- requested range
776
        /// [_______
777
        /// ^
778
        /// block1
779
780
302
        current_pos = block_range.right + 1;
781
302
        ++it;
782
302
    } else {
783
265
        current_pos = range.left;
784
265
    }
785
786
1.82k
    while (current_pos <= range.right && it != file_blocks.end()) {
787
1.25k
        block_range = (*it)->range();
788
789
1.25k
        if (current_pos == block_range.left) {
790
1.03k
            current_pos = block_range.right + 1;
791
1.03k
            ++it;
792
1.03k
            continue;
793
1.03k
        }
794
795
1.25k
        DCHECK(current_pos < block_range.left);
796
797
218
        auto hole_size = block_range.left - current_pos;
798
799
218
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
800
218
                                                      FileBlock::State::EMPTY, cache_lock));
801
802
218
        current_pos = block_range.right + 1;
803
218
        ++it;
804
218
    }
805
806
567
    if (current_pos <= range.right) {
807
        ///   ________]     -- requested range
808
        ///   _____]
809
        ///        ^
810
        /// blockN
811
812
172
        auto hole_size = range.right - current_pos + 1;
813
814
172
        file_blocks.splice(file_blocks.end(),
815
172
                           split_range_into_cells(hash, context, current_pos, hole_size,
816
172
                                                  FileBlock::State::EMPTY, cache_lock));
817
172
    }
818
567
}
819
820
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
821
19.1k
                                            CacheContext& context) {
822
19.1k
    FileBlock::Range range(offset, offset + size - 1);
823
824
19.1k
    ReadStatistics* stats = context.stats;
825
19.1k
    DCHECK(stats != nullptr);
826
19.1k
    MonotonicStopWatch sw;
827
19.1k
    sw.start();
828
19.1k
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
829
19.1k
    std::lock_guard cache_lock(_mutex);
830
19.1k
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
831
19.1k
    stats->lock_wait_timer += sw.elapsed_time();
832
19.1k
    FileBlocks file_blocks;
833
19.1k
    int64_t duration = 0;
834
19.1k
    {
835
19.1k
        SCOPED_RAW_TIMER(&duration);
836
        /// Get all blocks which intersect with the given range.
837
19.1k
        {
838
19.1k
            SCOPED_RAW_TIMER(&stats->get_timer);
839
19.1k
            file_blocks = get_impl(hash, context, range, cache_lock);
840
19.1k
        }
841
842
19.1k
        if (file_blocks.empty()) {
843
18.6k
            SCOPED_RAW_TIMER(&stats->set_timer);
844
18.6k
            file_blocks = split_range_into_cells(hash, context, offset, size,
845
18.6k
                                                 FileBlock::State::EMPTY, cache_lock);
846
18.6k
        } else {
847
566
            SCOPED_RAW_TIMER(&stats->set_timer);
848
566
            fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
849
566
        }
850
19.1k
        DCHECK(!file_blocks.empty());
851
19.1k
        *_num_read_blocks << file_blocks.size();
852
19.1k
        if (!context.is_warmup) {
853
19.1k
            *_no_warmup_num_read_blocks << file_blocks.size();
854
19.1k
        }
855
20.6k
        for (auto& block : file_blocks) {
856
20.6k
            size_t block_size = block->range().size();
857
20.6k
            *_total_read_size_metrics << block_size;
858
20.6k
            if (block->state_unsafe() == FileBlock::State::DOWNLOADED) {
859
1.46k
                *_num_hit_blocks << 1;
860
1.46k
                *_total_hit_size_metrics << block_size;
861
1.46k
                if (!context.is_warmup) {
862
1.46k
                    *_no_warmup_num_hit_blocks << 1;
863
1.46k
                }
864
1.46k
            }
865
20.6k
        }
866
19.1k
    }
867
19.1k
    *_get_or_set_latency_us << (duration / 1000);
868
19.1k
    return FileBlocksHolder(std::move(file_blocks));
869
19.1k
}
870
871
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
872
                                        size_t offset, size_t size, FileBlock::State state,
873
17.4k
                                        std::lock_guard<std::mutex>& cache_lock) {
874
    /// Create a file block cell and put it in `files` map by [hash][offset].
875
17.4k
    if (size == 0) {
876
0
        return nullptr; /// Empty files are not cached.
877
0
    }
878
879
17.4k
    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
880
0
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
881
0
               << " expiration_time=" << context.expiration_time
882
0
               << " tablet_id=" << context.tablet_id;
883
884
17.4k
    if (size > 1024 * 1024 * 1024) {
885
0
        LOG(WARNING) << "File block size is too large for a block. size=" << size
886
0
                     << " hash=" << hash.to_string() << " offset=" << offset
887
0
                     << " stack:" << get_stack_trace();
888
0
    }
889
890
17.4k
    auto& offsets = _files[hash];
891
17.4k
    auto itr = offsets.find(offset);
892
17.4k
    if (itr != offsets.end()) {
893
10
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
894
0
                   << ", offset: " << offset << ", size: " << size
895
0
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
896
10
        return &(itr->second);
897
10
    }
898
899
17.4k
    FileCacheKey key;
900
17.4k
    key.hash = hash;
901
17.4k
    key.offset = offset;
902
17.4k
    key.meta.type = context.cache_type;
903
17.4k
    key.meta.expiration_time = context.expiration_time;
904
17.4k
    key.meta.tablet_id = context.tablet_id;
905
17.4k
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);
906
17.4k
    Status st;
907
17.4k
    if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) {
908
0
        st = cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
909
17.4k
    } else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) {
910
2
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
911
2
    }
912
17.4k
    if (!st.ok()) {
913
0
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
914
0
                     << " cache_type=" << cache_type_to_string(context.cache_type)
915
0
                     << " error=" << st.msg();
916
0
    }
917
918
17.4k
    auto& queue = get_queue(cell.file_block->cache_type());
919
17.4k
    cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
920
17.4k
    _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD,
921
17.4k
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
922
17.4k
                                      cell.size());
923
924
17.4k
    if (cell.file_block->cache_type() == FileCacheType::TTL) {
925
7.84k
        _cur_ttl_size += cell.size();
926
7.84k
    }
927
17.4k
    auto [it, _] = offsets.insert(std::make_pair(offset, std::move(cell)));
928
17.4k
    _cur_cache_size += size;
929
17.4k
    return &(it->second);
930
17.4k
}
931
932
0
size_t BlockFileCache::try_release() {
933
0
    SCOPED_CACHE_LOCK(_mutex, this);
934
0
    std::vector<FileBlockCell*> trash;
935
0
    for (auto& [hash, blocks] : _files) {
936
0
        for (auto& [offset, cell] : blocks) {
937
0
            if (cell.releasable()) {
938
0
                trash.emplace_back(&cell);
939
0
            } else {
940
0
                cell.file_block->set_deleting();
941
0
            }
942
0
        }
943
0
    }
944
0
    size_t remove_size = 0;
945
0
    for (auto& cell : trash) {
946
0
        FileBlockSPtr file_block = cell->file_block;
947
0
        std::lock_guard lc(cell->file_block->_mutex);
948
0
        remove_size += file_block->range().size();
949
0
        remove(file_block, cache_lock, lc);
950
0
        VLOG_DEBUG << "try_release " << _cache_base_path
951
0
                   << " hash=" << file_block->get_hash_value().to_string()
952
0
                   << " offset=" << file_block->offset();
953
0
    }
954
0
    *_evict_by_try_release << remove_size;
955
0
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
956
0
    return trash.size();
957
0
}
958
959
85.7k
LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
960
85.7k
    switch (type) {
961
21.1k
    case FileCacheType::INDEX:
962
21.1k
        return _index_queue;
963
20.7k
    case FileCacheType::DISPOSABLE:
964
20.7k
        return _disposable_queue;
965
32.2k
    case FileCacheType::NORMAL:
966
32.2k
        return _normal_queue;
967
11.5k
    case FileCacheType::TTL:
968
11.5k
        return _ttl_queue;
969
0
    default:
970
0
        DCHECK(false);
971
85.7k
    }
972
0
    return _normal_queue;
973
85.7k
}
974
975
274
const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
976
274
    switch (type) {
977
66
    case FileCacheType::INDEX:
978
66
        return _index_queue;
979
2
    case FileCacheType::DISPOSABLE:
980
2
        return _disposable_queue;
981
206
    case FileCacheType::NORMAL:
982
206
        return _normal_queue;
983
0
    case FileCacheType::TTL:
984
0
        return _ttl_queue;
985
0
    default:
986
0
        DCHECK(false);
987
274
    }
988
0
    return _normal_queue;
989
274
}
990
991
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
992
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
993
24.2k
                                        std::string& reason) {
994
24.2k
    auto remove_file_block_if = [&](FileBlockCell* cell) {
995
3.14k
        FileBlockSPtr file_block = cell->file_block;
996
3.14k
        if (file_block) {
997
3.14k
            std::lock_guard block_lock(file_block->_mutex);
998
3.14k
            remove(file_block, cache_lock, block_lock, sync);
999
3.14k
            VLOG_DEBUG << "remove_file_blocks"
1000
0
                       << " hash=" << file_block->get_hash_value().to_string()
1001
0
                       << " offset=" << file_block->offset() << " reason=" << reason;
1002
3.14k
        }
1003
3.14k
    };
1004
24.2k
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1005
24.2k
}
1006
1007
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
1008
                                           size_t& removed_size,
1009
                                           std::vector<FileBlockCell*>& to_evict,
1010
                                           std::lock_guard<std::mutex>& cache_lock,
1011
5.36k
                                           size_t& cur_removed_size, bool evict_in_advance) {
1012
1.16M
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1013
1.16M
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1014
2.83k
            break;
1015
2.83k
        }
1016
1.15M
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1017
1018
1.15M
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
1019
0
                     << ", offset: " << entry_offset;
1020
1021
1.15M
        size_t cell_size = cell->size();
1022
1.15M
        DCHECK(entry_size == cell_size);
1023
1024
1.15M
        if (cell->releasable()) {
1025
3.11k
            auto& file_block = cell->file_block;
1026
1027
3.11k
            std::lock_guard block_lock(file_block->_mutex);
1028
3.11k
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1029
3.11k
            to_evict.push_back(cell);
1030
3.11k
            removed_size += cell_size;
1031
3.11k
            cur_removed_size += cell_size;
1032
3.11k
        }
1033
1.15M
    }
1034
5.36k
}
1035
1036
// 1. if async load file cache not finish
1037
//     a. evict from lru queue
1038
// 2. if ttl cache
1039
//     a. evict from disposable/normal/index queue one by one
1040
// 3. if dont reach query limit or dont have query limit
1041
//     a. evict from other queue
1042
//     b. evict from current queue
1043
//         a.1 if the data belong write, then just evict cold data
1044
// 4. if reach query limit
1045
//     a. evict from query queue
1046
//     b. evict from other queue
1047
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
1048
                                 size_t offset, size_t size,
1049
19.0k
                                 std::lock_guard<std::mutex>& cache_lock) {
1050
19.0k
    if (!_async_open_done) {
1051
8
        return try_reserve_during_async_load(size, cache_lock);
1052
8
    }
1053
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
1054
    // directly eliminate 5 times the size of the space
1055
19.0k
    if (_disk_resource_limit_mode) {
1056
2
        size = 5 * size;
1057
2
    }
1058
1059
19.0k
    auto query_context = config::enable_file_cache_query_limit &&
1060
19.0k
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
1061
19.0k
                                 ? get_query_context(context.query_id, cache_lock)
1062
19.0k
                                 : nullptr;
1063
19.0k
    if (!query_context) {
1064
19.0k
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
1065
19.0k
    } else if (query_context->get_cache_size(cache_lock) + size <=
1066
62
               query_context->get_max_cache_size()) {
1067
32
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
1068
32
    }
1069
30
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1070
30
                               std::chrono::steady_clock::now().time_since_epoch())
1071
30
                               .count();
1072
30
    auto& queue = get_queue(context.cache_type);
1073
30
    size_t removed_size = 0;
1074
30
    size_t ghost_remove_size = 0;
1075
30
    size_t queue_size = queue.get_capacity(cache_lock);
1076
30
    size_t cur_cache_size = _cur_cache_size;
1077
30
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);
1078
1079
30
    std::vector<LRUQueue::Iterator> ghost;
1080
30
    std::vector<FileBlockCell*> to_evict;
1081
1082
30
    size_t max_size = queue.get_max_size();
1083
96
    auto is_overflow = [&] {
1084
96
        return _disk_resource_limit_mode ? removed_size < size
1085
96
                                         : cur_cache_size + size - removed_size > _capacity ||
1086
96
                                                   (queue_size + size - removed_size > max_size) ||
1087
96
                                                   (query_context_cache_size + size -
1088
76
                                                            (removed_size + ghost_remove_size) >
1089
76
                                                    query_context->get_max_cache_size());
1090
96
    };
1091
1092
    /// Select the cache from the LRU queue held by query for expulsion.
1093
70
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
1094
66
        if (!is_overflow()) {
1095
26
            break;
1096
26
        }
1097
1098
40
        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);
1099
1100
40
        if (!cell) {
1101
            /// The cache corresponding to this record may be swapped out by
1102
            /// other queries, so it has become invalid.
1103
8
            ghost.push_back(iter);
1104
8
            ghost_remove_size += iter->size;
1105
32
        } else {
1106
32
            size_t cell_size = cell->size();
1107
32
            DCHECK(iter->size == cell_size);
1108
1109
32
            if (cell->releasable()) {
1110
32
                auto& file_block = cell->file_block;
1111
1112
32
                std::lock_guard block_lock(file_block->_mutex);
1113
32
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1114
32
                to_evict.push_back(cell);
1115
32
                removed_size += cell_size;
1116
32
            }
1117
32
        }
1118
40
    }
1119
1120
32
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1121
32
        FileBlockSPtr file_block = cell->file_block;
1122
32
        if (file_block) {
1123
32
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
1124
32
            std::lock_guard block_lock(file_block->_mutex);
1125
32
            remove(file_block, cache_lock, block_lock);
1126
32
        }
1127
32
    };
1128
1129
30
    for (auto& iter : ghost) {
1130
8
        query_context->remove(iter->hash, iter->offset, cache_lock);
1131
8
    }
1132
1133
30
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1134
1135
30
    if (is_overflow() &&
1136
30
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
1137
2
        return false;
1138
2
    }
1139
28
    query_context->reserve(hash, offset, size, cache_lock);
1140
28
    return true;
1141
30
}
1142
1143
4
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
1144
4
    UInt128Wrapper hash = UInt128Wrapper();
1145
4
    size_t offset = 0;
1146
4
    CacheContext context;
1147
    /* we pick NORMAL and TTL cache to evict in advance
1148
     * we reserve for them but won't acutually give space to them
1149
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
1150
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
1151
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
1152
     */
1153
4
    context.cache_type = FileCacheType::NORMAL;
1154
4
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1155
4
    context.cache_type = FileCacheType::TTL;
1156
4
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1157
4
}
1158
1159
// remove specific cache synchronously, for critical operations
1160
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1161
16
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1162
16
    std::string reason = "remove_if_cached";
1163
16
    SCOPED_CACHE_LOCK(_mutex, this);
1164
16
    auto iter = _files.find(file_key);
1165
16
    std::vector<FileBlockCell*> to_remove;
1166
16
    if (iter != _files.end()) {
1167
28
        for (auto& [_, cell] : iter->second) {
1168
28
            if (cell.releasable()) {
1169
26
                to_remove.push_back(&cell);
1170
26
            } else {
1171
2
                cell.file_block->set_deleting();
1172
2
            }
1173
28
        }
1174
12
    }
1175
16
    remove_file_blocks(to_remove, cache_lock, true, reason);
1176
16
}
1177
1178
// the async version of remove_if_cached, for background operations
1179
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
1180
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1181
2
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1182
2
    std::string reason = "remove_if_cached_async";
1183
2
    SCOPED_CACHE_LOCK(_mutex, this);
1184
1185
2
    auto iter = _files.find(file_key);
1186
2
    std::vector<FileBlockCell*> to_remove;
1187
2
    if (iter != _files.end()) {
1188
2
        for (auto& [_, cell] : iter->second) {
1189
2
            *_gc_evict_bytes_metrics << cell.size();
1190
2
            *_gc_evict_count_metrics << 1;
1191
2
            if (cell.releasable()) {
1192
0
                to_remove.push_back(&cell);
1193
2
            } else {
1194
2
                cell.file_block->set_deleting();
1195
2
            }
1196
2
        }
1197
2
    }
1198
2
    remove_file_blocks(to_remove, cache_lock, false, reason);
1199
2
}
1200
1201
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
1202
19.0k
        FileCacheType cur_cache_type) {
1203
19.0k
    switch (cur_cache_type) {
1204
7.82k
    case FileCacheType::TTL:
1205
7.82k
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
1206
1.34k
    case FileCacheType::INDEX:
1207
1.34k
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
1208
8.65k
    case FileCacheType::NORMAL:
1209
8.65k
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
1210
1.23k
    case FileCacheType::DISPOSABLE:
1211
1.23k
        return {FileCacheType::NORMAL, FileCacheType::INDEX};
1212
0
    default:
1213
0
        return {};
1214
19.0k
    }
1215
0
    return {};
1216
19.0k
}
1217
1218
5.18k
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
1219
5.18k
    switch (cur_cache_type) {
1220
568
    case FileCacheType::TTL:
1221
568
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
1222
286
    case FileCacheType::INDEX:
1223
286
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL};
1224
4.02k
    case FileCacheType::NORMAL:
1225
4.02k
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL};
1226
304
    case FileCacheType::DISPOSABLE:
1227
304
        return {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL};
1228
0
    default:
1229
0
        return {};
1230
5.18k
    }
1231
0
    return {};
1232
5.18k
}
1233
1234
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
1235
8
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
1236
8
    DCHECK(_files.find(hash) != _files.end() &&
1237
8
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
1238
8
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
1239
8
    DCHECK(cell != nullptr);
1240
8
    if (cell == nullptr) {
1241
0
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
1242
0
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
1243
0
                     << " new_size=" << new_size;
1244
0
        return;
1245
0
    }
1246
8
    if (cell->queue_iterator) {
1247
8
        auto& queue = get_queue(cell->file_block->cache_type());
1248
8
        DCHECK(queue.contains(hash, offset, cache_lock));
1249
8
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
1250
8
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
1251
8
                                          cell->file_block->get_hash_value(),
1252
8
                                          cell->file_block->offset(), new_size);
1253
8
    }
1254
8
    _cur_cache_size -= old_size;
1255
8
    _cur_cache_size += new_size;
1256
8
}
1257
1258
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
1259
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1260
19.0k
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1261
19.0k
    size_t removed_size = 0;
1262
19.0k
    size_t cur_cache_size = _cur_cache_size;
1263
19.0k
    std::vector<FileBlockCell*> to_evict;
1264
45.9k
    for (FileCacheType cache_type : other_cache_types) {
1265
45.9k
        auto& queue = get_queue(cache_type);
1266
45.9k
        size_t remove_size_per_type = 0;
1267
45.9k
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1268
2.87k
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1269
1.39k
                break;
1270
1.39k
            }
1271
1.47k
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1272
1.47k
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
1273
0
                         << ", offset: " << entry_offset;
1274
1275
1.47k
            size_t cell_size = cell->size();
1276
1.47k
            DCHECK(entry_size == cell_size);
1277
1278
1.47k
            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
1279
1.47k
                break;
1280
1.47k
            }
1281
1282
4
            if (cell->releasable()) {
1283
4
                auto& file_block = cell->file_block;
1284
4
                std::lock_guard block_lock(file_block->_mutex);
1285
4
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1286
4
                to_evict.push_back(cell);
1287
4
                removed_size += cell_size;
1288
4
                remove_size_per_type += cell_size;
1289
4
            }
1290
4
        }
1291
45.9k
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
1292
45.9k
    }
1293
19.0k
    bool is_sync_removal = !evict_in_advance;
1294
19.0k
    std::string reason = std::string("try_reserve_by_time ") +
1295
19.0k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1296
19.0k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1297
1298
19.0k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1299
19.0k
}
1300
1301
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
1302
1.18M
                                 bool evict_in_advance) const {
1303
1.18M
    bool ret = false;
1304
1.18M
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
1305
46
        ret = (removed_size < need_size);
1306
46
        return ret;
1307
46
    }
1308
1.18M
    if (_disk_resource_limit_mode) {
1309
8
        ret = (removed_size < need_size);
1310
1.18M
    } else {
1311
1.18M
        ret = (cur_cache_size + need_size - removed_size > _capacity);
1312
1.18M
    }
1313
1.18M
    return ret;
1314
1.18M
}
1315
1316
bool BlockFileCache::try_reserve_from_other_queue_by_size(
1317
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1318
830
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1319
830
    size_t removed_size = 0;
1320
830
    size_t cur_cache_size = _cur_cache_size;
1321
830
    std::vector<FileBlockCell*> to_evict;
1322
    // we follow the privilege defined in get_other_cache_types to evict
1323
2.49k
    for (FileCacheType cache_type : other_cache_types) {
1324
2.49k
        auto& queue = get_queue(cache_type);
1325
1326
        // we will not drain each of them to the bottom -- i.e., we only
1327
        // evict what they have stolen.
1328
2.49k
        size_t cur_queue_size = queue.get_capacity(cache_lock);
1329
2.49k
        size_t cur_queue_max_size = queue.get_max_size();
1330
2.49k
        if (cur_queue_size <= cur_queue_max_size) {
1331
1.48k
            continue;
1332
1.48k
        }
1333
1.01k
        size_t cur_removed_size = 0;
1334
1.01k
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1335
1.01k
                              cur_removed_size, evict_in_advance);
1336
1.01k
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
1337
1.01k
    }
1338
830
    bool is_sync_removal = !evict_in_advance;
1339
830
    std::string reason = std::string("try_reserve_by_size") +
1340
830
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1341
830
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1342
830
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1343
830
}
1344
1345
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
1346
                                                  int64_t cur_time,
1347
                                                  std::lock_guard<std::mutex>& cache_lock,
1348
19.0k
                                                  bool evict_in_advance) {
1349
    // currently, TTL cache is not considered as a candidate
1350
19.0k
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
1351
19.0k
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
1352
19.0k
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
1353
19.0k
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
1354
13.8k
        return reserve_success;
1355
13.8k
    }
1356
1357
5.18k
    other_cache_types = get_other_cache_type(cur_cache_type);
1358
5.18k
    auto& cur_queue = get_queue(cur_cache_type);
1359
5.18k
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
1360
5.18k
    size_t cur_queue_max_size = cur_queue.get_max_size();
1361
    // Hit the soft limit by self, cannot remove from other queues
1362
5.18k
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
1363
4.35k
        return false;
1364
4.35k
    }
1365
830
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
1366
830
                                                evict_in_advance);
1367
5.18k
}
1368
1369
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
1370
                                         QueryFileCacheContextPtr query_context,
1371
                                         const CacheContext& context, size_t offset, size_t size,
1372
                                         std::lock_guard<std::mutex>& cache_lock,
1373
19.0k
                                         bool evict_in_advance) {
1374
19.0k
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1375
19.0k
                               std::chrono::steady_clock::now().time_since_epoch())
1376
19.0k
                               .count();
1377
19.0k
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
1378
19.0k
                                      evict_in_advance)) {
1379
4.35k
        auto& queue = get_queue(context.cache_type);
1380
4.35k
        size_t removed_size = 0;
1381
4.35k
        size_t cur_cache_size = _cur_cache_size;
1382
1383
4.35k
        std::vector<FileBlockCell*> to_evict;
1384
4.35k
        size_t cur_removed_size = 0;
1385
4.35k
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1386
4.35k
                              cur_removed_size, evict_in_advance);
1387
4.35k
        bool is_sync_removal = !evict_in_advance;
1388
4.35k
        std::string reason = std::string("try_reserve for cache type ") +
1389
4.35k
                             cache_type_to_string(context.cache_type) +
1390
4.35k
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1391
4.35k
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1392
4.35k
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;
1393
1394
4.35k
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1395
2.52k
            return false;
1396
2.52k
        }
1397
4.35k
    }
1398
1399
16.5k
    if (query_context) {
1400
32
        query_context->reserve(hash, offset, size, cache_lock);
1401
32
    }
1402
16.5k
    return true;
1403
19.0k
}
1404
1405
template <class T, class U>
1406
    requires IsXLock<T> && IsXLock<U>
1407
7.28k
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
1408
7.28k
    auto hash = file_block->get_hash_value();
1409
7.28k
    auto offset = file_block->offset();
1410
7.28k
    auto type = file_block->cache_type();
1411
7.28k
    auto expiration_time = file_block->expiration_time();
1412
7.28k
    auto tablet_id = file_block->tablet_id();
1413
7.28k
    auto* cell = get_cell(hash, offset, cache_lock);
1414
7.28k
    file_block->cell = nullptr;
1415
7.28k
    DCHECK(cell);
1416
7.28k
    DCHECK(cell->queue_iterator);
1417
7.28k
    if (cell->queue_iterator) {
1418
7.28k
        auto& queue = get_queue(file_block->cache_type());
1419
7.28k
        queue.remove(*cell->queue_iterator, cache_lock);
1420
7.28k
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
1421
7.28k
                                          cell->file_block->get_hash_value(),
1422
7.28k
                                          cell->file_block->offset(), cell->size());
1423
7.28k
    }
1424
7.28k
    *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
1425
7.28k
            << file_block->range().size();
1426
7.28k
    *_total_evict_size_metrics << file_block->range().size();
1427
1428
7.28k
    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
1429
0
               << ", offset: " << offset << ", size: " << file_block->range().size()
1430
0
               << ", type: " << cache_type_to_string(type);
1431
1432
7.28k
    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
1433
3.30k
        FileCacheKey key;
1434
3.30k
        key.hash = hash;
1435
3.30k
        key.offset = offset;
1436
3.30k
        key.meta.type = type;
1437
3.30k
        key.meta.expiration_time = expiration_time;
1438
3.30k
        key.meta.tablet_id = tablet_id;
1439
3.30k
        if (sync) {
1440
3.18k
            int64_t duration_ns = 0;
1441
3.18k
            Status st;
1442
3.18k
            {
1443
3.18k
                SCOPED_RAW_TIMER(&duration_ns);
1444
3.18k
                st = _storage->remove(key);
1445
3.18k
            }
1446
3.18k
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
1447
3.18k
            if (!st.ok()) {
1448
336
                LOG_WARNING("").error(st);
1449
336
            }
1450
3.18k
        } else {
1451
            // the file will be deleted in the bottom half
1452
            // so there will be a window that the file is not in the cache but still in the storage
1453
            // but it's ok, because the rowset is stale already
1454
116
            bool ret = _recycle_keys.enqueue(key);
1455
116
            if (ret) [[likely]] {
1456
116
                *_recycle_keys_length_recorder << _recycle_keys.size_approx();
1457
116
            } else {
1458
0
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
1459
0
                int64_t duration_ns = 0;
1460
0
                Status st;
1461
0
                {
1462
0
                    SCOPED_RAW_TIMER(&duration_ns);
1463
0
                    st = _storage->remove(key);
1464
0
                }
1465
0
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
1466
0
                if (!st.ok()) {
1467
0
                    LOG_WARNING("").error(st);
1468
0
                }
1469
0
            }
1470
116
        }
1471
3.97k
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
1472
0
        file_block->set_deleting();
1473
0
        return;
1474
0
    }
1475
7.28k
    _cur_cache_size -= file_block->range().size();
1476
7.28k
    if (FileCacheType::TTL == type) {
1477
2.58k
        _cur_ttl_size -= file_block->range().size();
1478
2.58k
    }
1479
7.28k
    auto it = _files.find(hash);
1480
7.28k
    if (it != _files.end()) {
1481
7.28k
        it->second.erase(file_block->offset());
1482
7.28k
        if (it->second.empty()) {
1483
3.10k
            _files.erase(hash);
1484
3.10k
        }
1485
7.28k
    }
1486
7.28k
    *_num_removed_blocks << 1;
1487
7.28k
}
1488
1489
92
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
1490
92
    SCOPED_CACHE_LOCK(_mutex, this);
1491
92
    return get_used_cache_size_unlocked(cache_type, cache_lock);
1492
92
}
1493
1494
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
1495
92
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1496
92
    return get_queue(cache_type).get_capacity(cache_lock);
1497
92
}
1498
1499
0
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
1500
0
    SCOPED_CACHE_LOCK(_mutex, this);
1501
0
    return get_available_cache_size_unlocked(cache_type, cache_lock);
1502
0
}
1503
1504
size_t BlockFileCache::get_available_cache_size_unlocked(
1505
0
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
1506
0
    return get_queue(cache_type).get_max_element_size() -
1507
0
           get_used_cache_size_unlocked(cache_type, cache_lock);
1508
0
}
1509
1510
176
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
1511
176
    SCOPED_CACHE_LOCK(_mutex, this);
1512
176
    return get_file_blocks_num_unlocked(cache_type, cache_lock);
1513
176
}
1514
1515
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
1516
176
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1517
176
    return get_queue(cache_type).get_elements_num(cache_lock);
1518
176
}
1519
1520
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
1521
17.4k
        : file_block(file_block) {
1522
17.4k
    file_block->cell = this;
1523
    /**
1524
     * Cell can be created with either DOWNLOADED or EMPTY file block's state.
1525
     * File block acquires DOWNLOADING state and creates LRUQueue iterator on first
1526
     * successful getOrSetDownaloder call.
1527
     */
1528
1529
17.4k
    switch (file_block->_download_state) {
1530
881
    case FileBlock::State::DOWNLOADED:
1531
17.4k
    case FileBlock::State::EMPTY:
1532
17.4k
    case FileBlock::State::SKIP_CACHE: {
1533
17.4k
        break;
1534
17.4k
    }
1535
0
    default:
1536
0
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
1537
0
                      << FileBlock::state_to_string(file_block->_download_state);
1538
17.4k
    }
1539
17.4k
    if (file_block->cache_type() == FileCacheType::TTL) {
1540
7.84k
        update_atime();
1541
7.84k
    }
1542
17.4k
}
1543
1544
LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
1545
19.5k
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
1546
19.5k
    cache_size += size;
1547
19.5k
    auto iter = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size));
1548
19.5k
    map.insert(std::make_pair(std::make_pair(hash, offset), iter));
1549
19.5k
    return iter;
1550
19.5k
}
1551
1552
0
void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
1553
0
    queue.clear();
1554
0
    map.clear();
1555
0
    cache_size = 0;
1556
0
}
1557
1558
3.67k
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
1559
3.67k
    queue.splice(queue.end(), queue, queue_it);
1560
3.67k
}
1561
1562
void LRUQueue::resize(Iterator queue_it, size_t new_size,
1563
10
                      std::lock_guard<std::mutex>& /* cache_lock */) {
1564
10
    cache_size -= queue_it->size;
1565
10
    queue_it->size = new_size;
1566
10
    cache_size += new_size;
1567
10
}
1568
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
1569
8
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
1570
8
    return map.find(std::make_pair(hash, offset)) != map.end();
1571
8
}
1572
1573
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
1574
1.66k
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
1575
1.66k
    auto itr = map.find(std::make_pair(hash, offset));
1576
1.66k
    if (itr != map.end()) {
1577
890
        return itr->second;
1578
890
    }
1579
776
    return std::list<FileKeyAndOffset>::iterator();
1580
1.66k
}
1581
1582
4
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
1583
4
    std::string result;
1584
36
    for (const auto& [hash, offset, size] : queue) {
1585
36
        if (!result.empty()) {
1586
32
            result += ", ";
1587
32
        }
1588
36
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
1589
36
    }
1590
4
    return result;
1591
4
}
1592
1593
size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
1594
10
                                           std::lock_guard<std::mutex>& cache_lock) {
1595
10
    std::list<FileKeyAndOffset> target_queue = this->queue;
1596
10
    std::list<FileKeyAndOffset> base_queue = base.queue;
1597
10
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
1598
10
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
1599
1600
10
    size_t m = vec1.size();
1601
10
    size_t n = vec2.size();
1602
1603
    // Create a 2D vector (matrix) to store the Levenshtein distances
1604
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
1605
10
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
1606
1607
    // Initialize the first row and column of the matrix
1608
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
1609
44
    for (size_t i = 0; i <= m; ++i) {
1610
34
        dp[i][0] = i;
1611
34
    }
1612
40
    for (size_t j = 0; j <= n; ++j) {
1613
30
        dp[0][j] = j;
1614
30
    }
1615
1616
    // Fill the matrix using dynamic programming
1617
34
    for (size_t i = 1; i <= m; ++i) {
1618
76
        for (size_t j = 1; j <= n; ++j) {
1619
            // Check if the current elements of both vectors are equal
1620
52
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
1621
52
                           vec1[i - 1].offset == vec2[j - 1].offset)
1622
52
                                  ? 0
1623
52
                                  : 1;
1624
            // Calculate the minimum cost of three possible operations:
1625
            // 1. Insertion: dp[i][j-1] + 1
1626
            // 2. Deletion: dp[i-1][j] + 1
1627
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
1628
52
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
1629
52
        }
1630
24
    }
1631
    // The bottom-right cell of the matrix contains the Levenshtein distance
1632
10
    return dp[m][n];
1633
10
}
1634
1635
2
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
1636
2
    SCOPED_CACHE_LOCK(_mutex, this);
1637
2
    return dump_structure_unlocked(hash, cache_lock);
1638
2
}
1639
1640
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
1641
2
                                                    std::lock_guard<std::mutex>&) {
1642
2
    std::stringstream result;
1643
2
    auto it = _files.find(hash);
1644
2
    if (it == _files.end()) {
1645
0
        return std::string("");
1646
0
    }
1647
2
    const auto& cells_by_offset = it->second;
1648
1649
2
    for (const auto& [_, cell] : cells_by_offset) {
1650
2
        result << cell.file_block->get_info_for_log() << " "
1651
2
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
1652
2
    }
1653
1654
2
    return result.str();
1655
2
}
1656
1657
0
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
1658
0
    SCOPED_CACHE_LOCK(_mutex, this);
1659
0
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
1660
0
}
1661
1662
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
1663
                                                            size_t offset,
1664
0
                                                            std::lock_guard<std::mutex>&) {
1665
0
    std::stringstream result;
1666
0
    auto it = _files.find(hash);
1667
0
    if (it == _files.end()) {
1668
0
        return std::string("");
1669
0
    }
1670
0
    const auto& cells_by_offset = it->second;
1671
0
    const auto& cell = cells_by_offset.find(offset);
1672
1673
0
    return cache_type_to_string(cell->second.file_block->cache_type());
1674
0
}
1675
1676
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
1677
                                       FileCacheType new_type,
1678
20
                                       std::lock_guard<std::mutex>& cache_lock) {
1679
20
    if (auto iter = _files.find(hash); iter != _files.end()) {
1680
20
        auto& file_blocks = iter->second;
1681
20
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
1682
18
            FileBlockCell& cell = cell_it->second;
1683
18
            auto& cur_queue = get_queue(cell.file_block->cache_type());
1684
18
            DCHECK(cell.queue_iterator.has_value());
1685
18
            cur_queue.remove(*cell.queue_iterator, cache_lock);
1686
18
            _lru_recorder->record_queue_event(
1687
18
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
1688
18
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
1689
18
            auto& new_queue = get_queue(new_type);
1690
18
            cell.queue_iterator =
1691
18
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
1692
18
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
1693
18
                                              cell.file_block->get_hash_value(),
1694
18
                                              cell.file_block->offset(), cell.size());
1695
18
        }
1696
20
    }
1697
20
}
1698
1699
// @brief: get a path's disk capacity used percent, inode used percent
1700
// @param: path
1701
// @param: percent.first disk used percent, percent.second inode used percent
1702
325
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
1703
325
    struct statfs stat;
1704
325
    int ret = statfs(path.c_str(), &stat);
1705
325
    if (ret != 0) {
1706
4
        return ret;
1707
4
    }
1708
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
1709
    // v->used = stat.f_blocks - stat.f_bfree
1710
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
1711
321
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
1712
321
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
1713
321
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));
1714
1715
321
    unsigned long long inode_free = stat.f_ffree;
1716
321
    unsigned long long inode_total = stat.f_files;
1717
321
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
1718
321
    percent->first = capacity_percentage;
1719
321
    percent->second = 100 - inode_percentage;
1720
1721
    // Add sync point for testing
1722
321
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
1723
1724
321
    return 0;
1725
325
}
1726
1727
20
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
1728
20
    using namespace std::chrono;
1729
20
    int64_t space_released = 0;
1730
20
    size_t old_capacity = 0;
1731
20
    std::stringstream ss;
1732
20
    ss << "finish reset_capacity, path=" << _cache_base_path;
1733
20
    auto adjust_start_time = steady_clock::time_point();
1734
20
    {
1735
20
        SCOPED_CACHE_LOCK(_mutex, this);
1736
20
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
1737
2
            int64_t need_remove_size = _cur_cache_size - new_capacity;
1738
8
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
1739
8
                int64_t queue_released = 0;
1740
8
                std::vector<FileBlockCell*> to_evict;
1741
26
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1742
26
                    if (need_remove_size <= 0) {
1743
2
                        break;
1744
2
                    }
1745
24
                    need_remove_size -= entry_size;
1746
24
                    space_released += entry_size;
1747
24
                    queue_released += entry_size;
1748
24
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1749
24
                    if (!cell->releasable()) {
1750
0
                        cell->file_block->set_deleting();
1751
0
                        continue;
1752
0
                    }
1753
24
                    to_evict.push_back(cell);
1754
24
                }
1755
24
                for (auto& cell : to_evict) {
1756
24
                    FileBlockSPtr file_block = cell->file_block;
1757
24
                    std::lock_guard block_lock(file_block->_mutex);
1758
24
                    remove(file_block, cache_lock, block_lock);
1759
24
                }
1760
8
                return queue_released;
1761
8
            };
1762
2
            int64_t queue_released = remove_blocks(_disposable_queue);
1763
2
            ss << " disposable_queue released " << queue_released;
1764
2
            queue_released = remove_blocks(_normal_queue);
1765
2
            ss << " normal_queue released " << queue_released;
1766
2
            queue_released = remove_blocks(_index_queue);
1767
2
            ss << " index_queue released " << queue_released;
1768
2
            queue_released = remove_blocks(_ttl_queue);
1769
2
            ss << " ttl_queue released " << queue_released;
1770
1771
2
            _disk_resource_limit_mode = true;
1772
2
            _disk_limit_mode_metrics->set_value(1);
1773
2
            ss << " total_space_released=" << space_released;
1774
2
        }
1775
20
        old_capacity = _capacity;
1776
20
        _capacity = new_capacity;
1777
20
        _cache_capacity_metrics->set_value(_capacity);
1778
20
    }
1779
20
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
1780
20
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
1781
20
              << " use_time=" << cast_set<int64_t>(use_time.count());
1782
20
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
1783
20
    LOG(INFO) << ss.str();
1784
20
    return ss.str();
1785
20
}
1786
1787
2.18k
void BlockFileCache::check_disk_resource_limit() {
1788
2.18k
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1789
1.89k
        return;
1790
1.89k
    }
1791
1792
297
    bool previous_mode = _disk_resource_limit_mode;
1793
297
    if (_capacity > _cur_cache_size) {
1794
293
        _disk_resource_limit_mode = false;
1795
293
        _disk_limit_mode_metrics->set_value(0);
1796
293
    }
1797
297
    std::pair<int, int> percent;
1798
297
    int ret = disk_used_percentage(_cache_base_path, &percent);
1799
297
    if (ret != 0) {
1800
2
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1801
2
        return;
1802
2
    }
1803
295
    auto [space_percentage, inode_percentage] = percent;
1804
590
    auto is_insufficient = [](const int& percentage) {
1805
590
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
1806
590
    };
1807
295
    DCHECK_GE(space_percentage, 0);
1808
295
    DCHECK_LE(space_percentage, 100);
1809
295
    DCHECK_GE(inode_percentage, 0);
1810
295
    DCHECK_LE(inode_percentage, 100);
1811
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1812
    // FIXME: reject with config validator
1813
295
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
1814
295
        config::file_cache_exit_disk_resource_limit_mode_percent) {
1815
2
        LOG_WARNING("config error, set to default value")
1816
2
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
1817
2
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
1818
2
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
1819
2
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
1820
2
    }
1821
295
    bool is_space_insufficient = is_insufficient(space_percentage);
1822
295
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1823
295
    if (is_space_insufficient || is_inode_insufficient) {
1824
2
        _disk_resource_limit_mode = true;
1825
2
        _disk_limit_mode_metrics->set_value(1);
1826
293
    } else if (_disk_resource_limit_mode &&
1827
293
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
1828
293
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
1829
0
        _disk_resource_limit_mode = false;
1830
0
        _disk_limit_mode_metrics->set_value(0);
1831
0
    }
1832
295
    if (previous_mode != _disk_resource_limit_mode) {
1833
        // add log for disk resource limit mode switching
1834
4
        if (_disk_resource_limit_mode) {
1835
2
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
1836
2
                         << " space_percent=" << space_percentage
1837
2
                         << " inode_percent=" << inode_percentage
1838
2
                         << " is_space_insufficient=" << is_space_insufficient
1839
2
                         << " is_inode_insufficient=" << is_inode_insufficient
1840
2
                         << " enter threshold="
1841
2
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
1842
2
        } else {
1843
2
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
1844
2
                      << " space_percent=" << space_percentage
1845
2
                      << " inode_percent=" << inode_percentage << " exit threshold="
1846
2
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
1847
2
        }
1848
291
    } else if (_disk_resource_limit_mode) {
1849
        // print log for disk resource limit mode running, but less frequently
1850
0
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1851
0
                                 << " space_percent=" << space_percentage
1852
0
                                 << " inode_percent=" << inode_percentage
1853
0
                                 << " is_space_insufficient=" << is_space_insufficient
1854
0
                                 << " is_inode_insufficient=" << is_inode_insufficient
1855
0
                                 << " mode run in resource limit";
1856
0
    }
1857
295
}
1858
1859
30
void BlockFileCache::check_need_evict_cache_in_advance() {
1860
30
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1861
2
        return;
1862
2
    }
1863
1864
28
    std::pair<int, int> percent;
1865
28
    int ret = disk_used_percentage(_cache_base_path, &percent);
1866
28
    if (ret != 0) {
1867
2
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1868
2
        return;
1869
2
    }
1870
26
    auto [space_percentage, inode_percentage] = percent;
1871
26
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
1872
78
    auto is_insufficient = [](const int& percentage) {
1873
78
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
1874
78
    };
1875
26
    DCHECK_GE(space_percentage, 0);
1876
26
    DCHECK_LE(space_percentage, 100);
1877
26
    DCHECK_GE(inode_percentage, 0);
1878
26
    DCHECK_LE(inode_percentage, 100);
1879
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1880
    // FIXME: reject with config validator
1881
26
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
1882
26
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
1883
2
        LOG_WARNING("config error, set to default value")
1884
2
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
1885
2
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
1886
2
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
1887
2
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
1888
2
    }
1889
26
    bool previous_mode = _need_evict_cache_in_advance;
1890
26
    bool is_space_insufficient = is_insufficient(space_percentage);
1891
26
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1892
26
    bool is_size_insufficient = is_insufficient(size_percentage);
1893
26
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
1894
18
        _need_evict_cache_in_advance = true;
1895
18
        _need_evict_cache_in_advance_metrics->set_value(1);
1896
18
    } else if (_need_evict_cache_in_advance &&
1897
8
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1898
8
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1899
8
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
1900
2
        _need_evict_cache_in_advance = false;
1901
2
        _need_evict_cache_in_advance_metrics->set_value(0);
1902
2
    }
1903
26
    if (previous_mode != _need_evict_cache_in_advance) {
1904
        // add log for evict cache in advance mode switching
1905
12
        if (_need_evict_cache_in_advance) {
1906
10
            LOG(WARNING) << "Entering evict cache in advance mode: "
1907
10
                         << "file_cache=" << get_base_path()
1908
10
                         << " space_percent=" << space_percentage
1909
10
                         << " inode_percent=" << inode_percentage
1910
10
                         << " size_percent=" << size_percentage
1911
10
                         << " is_space_insufficient=" << is_space_insufficient
1912
10
                         << " is_inode_insufficient=" << is_inode_insufficient
1913
10
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
1914
10
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
1915
10
        } else {
1916
2
            LOG(INFO) << "Exiting evict cache in advance mode: "
1917
2
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
1918
2
                      << " inode_percent=" << inode_percentage
1919
2
                      << " size_percent=" << size_percentage << " exit threshold="
1920
2
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
1921
2
        }
1922
14
    } else if (_need_evict_cache_in_advance) {
1923
        // print log for evict cache in advance mode running, but less frequently
1924
8
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1925
2
                                 << " space_percent=" << space_percentage
1926
2
                                 << " inode_percent=" << inode_percentage
1927
2
                                 << " size_percent=" << size_percentage
1928
2
                                 << " is_space_insufficient=" << is_space_insufficient
1929
2
                                 << " is_inode_insufficient=" << is_inode_insufficient
1930
2
                                 << " is_size_insufficient=" << is_size_insufficient
1931
2
                                 << " need evict cache in advance";
1932
8
    }
1933
26
}
1934
1935
280
void BlockFileCache::run_background_monitor() {
1936
280
    Thread::set_self_name("run_background_monitor");
1937
2.18k
    while (!_close) {
1938
2.18k
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
1939
2.18k
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
1940
2.18k
        check_disk_resource_limit();
1941
2.18k
        if (config::enable_evict_file_cache_in_advance) {
1942
14
            check_need_evict_cache_in_advance();
1943
2.17k
        } else {
1944
2.17k
            _need_evict_cache_in_advance = false;
1945
2.17k
            _need_evict_cache_in_advance_metrics->set_value(0);
1946
2.17k
        }
1947
1948
2.18k
        {
1949
2.18k
            std::unique_lock close_lock(_close_mtx);
1950
2.18k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
1951
2.18k
            if (_close) {
1952
280
                break;
1953
280
            }
1954
2.18k
        }
1955
        // report
1956
1.90k
        {
1957
1.90k
            SCOPED_CACHE_LOCK(_mutex, this);
1958
1.90k
            _cur_cache_size_metrics->set_value(_cur_cache_size);
1959
1.90k
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
1960
1.90k
                                                   _index_queue.get_capacity(cache_lock) -
1961
1.90k
                                                   _normal_queue.get_capacity(cache_lock) -
1962
1.90k
                                                   _disposable_queue.get_capacity(cache_lock));
1963
1.90k
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
1964
1.90k
                    _ttl_queue.get_capacity(cache_lock));
1965
1.90k
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
1966
1.90k
                    _ttl_queue.get_elements_num(cache_lock));
1967
1.90k
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
1968
1.90k
            _cur_normal_queue_element_count_metrics->set_value(
1969
1.90k
                    _normal_queue.get_elements_num(cache_lock));
1970
1.90k
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
1971
1.90k
            _cur_index_queue_element_count_metrics->set_value(
1972
1.90k
                    _index_queue.get_elements_num(cache_lock));
1973
1.90k
            _cur_disposable_queue_cache_size_metrics->set_value(
1974
1.90k
                    _disposable_queue.get_capacity(cache_lock));
1975
1.90k
            _cur_disposable_queue_element_count_metrics->set_value(
1976
1.90k
                    _disposable_queue.get_elements_num(cache_lock));
1977
1978
            // Update meta store write queue size if storage is FSFileCacheStorage
1979
1.90k
            if (_storage->get_type() == FileCacheStorageType::DISK) {
1980
49
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
1981
49
                if (fs_storage != nullptr) {
1982
49
                    auto* meta_store = fs_storage->get_meta_store();
1983
49
                    if (meta_store != nullptr) {
1984
49
                        _meta_store_write_queue_size_metrics->set_value(
1985
49
                                meta_store->get_write_queue_size());
1986
49
                    }
1987
49
                }
1988
49
            }
1989
1990
1.90k
            if (_num_read_blocks->get_value() > 0) {
1991
30
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
1992
30
                                      (double)_num_read_blocks->get_value());
1993
30
            }
1994
1.90k
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
1995
0
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
1996
0
                                         (double)_num_read_blocks_5m->get_value());
1997
0
            }
1998
1.90k
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
1999
0
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
2000
0
                                         (double)_num_read_blocks_1h->get_value());
2001
0
            }
2002
2003
1.90k
            if (_no_warmup_num_hit_blocks->get_value() > 0) {
2004
4
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
2005
4
                                                (double)_no_warmup_num_read_blocks->get_value());
2006
4
            }
2007
1.90k
            if (_no_warmup_num_hit_blocks_5m && _no_warmup_num_hit_blocks_5m->get_value() > 0) {
2008
0
                _no_warmup_hit_ratio_5m->set_value(
2009
0
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
2010
0
                        (double)_no_warmup_num_read_blocks_5m->get_value());
2011
0
            }
2012
1.90k
            if (_no_warmup_num_hit_blocks_1h && _no_warmup_num_hit_blocks_1h->get_value() > 0) {
2013
0
                _no_warmup_hit_ratio_1h->set_value(
2014
0
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
2015
0
                        (double)_no_warmup_num_read_blocks_1h->get_value());
2016
0
            }
2017
1.90k
        }
2018
1.90k
    }
2019
280
}
2020
2021
280
void BlockFileCache::run_background_gc() {
2022
280
    Thread::set_self_name("run_background_gc");
2023
280
    FileCacheKey key;
2024
280
    size_t batch_count = 0;
2025
888
    while (!_close) {
2026
888
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
2027
888
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
2028
888
        {
2029
888
            std::unique_lock close_lock(_close_mtx);
2030
888
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2031
888
            if (_close) {
2032
280
                break;
2033
280
            }
2034
888
        }
2035
2036
650
        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
2037
42
            int64_t duration_ns = 0;
2038
42
            Status st;
2039
42
            {
2040
42
                SCOPED_RAW_TIMER(&duration_ns);
2041
42
                st = _storage->remove(key);
2042
42
            }
2043
42
            *_storage_async_remove_latency_us << (duration_ns / 1000);
2044
2045
42
            if (!st.ok()) {
2046
0
                LOG_WARNING("").error(st);
2047
0
            }
2048
42
            batch_count++;
2049
42
        }
2050
608
        *_recycle_keys_length_recorder << _recycle_keys.size_approx();
2051
608
        batch_count = 0;
2052
608
    }
2053
280
}
2054
2055
280
void BlockFileCache::run_background_evict_in_advance() {
2056
280
    Thread::set_self_name("run_background_evict_in_advance");
2057
280
    LOG(INFO) << "Starting background evict in advance thread";
2058
280
    int64_t batch = 0;
2059
9.82k
    while (!_close) {
2060
9.82k
        {
2061
9.82k
            std::unique_lock close_lock(_close_mtx);
2062
9.82k
            _close_cv.wait_for(
2063
9.82k
                    close_lock,
2064
9.82k
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
2065
9.82k
            if (_close) {
2066
280
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
2067
280
                break;
2068
280
            }
2069
9.82k
        }
2070
9.54k
        batch = config::file_cache_evict_in_advance_batch_bytes;
2071
2072
        // Skip if eviction not needed or too many pending recycles
2073
9.54k
        if (!_need_evict_cache_in_advance ||
2074
9.54k
            _recycle_keys.size_approx() >=
2075
9.53k
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
2076
9.53k
            continue;
2077
9.53k
        }
2078
2079
4
        int64_t duration_ns = 0;
2080
4
        {
2081
4
            SCOPED_CACHE_LOCK(_mutex, this);
2082
4
            SCOPED_RAW_TIMER(&duration_ns);
2083
4
            try_evict_in_advance(batch, cache_lock);
2084
4
        }
2085
4
        *_evict_in_advance_latency_us << (duration_ns / 1000);
2086
4
    }
2087
280
}
2088
2089
280
void BlockFileCache::run_background_block_lru_update() {
2090
280
    Thread::set_self_name("run_background_block_lru_update");
2091
280
    std::vector<FileBlockSPtr> batch;
2092
9.60k
    while (!_close) {
2093
9.60k
        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
2094
9.60k
        size_t batch_limit =
2095
9.60k
                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
2096
9.60k
        {
2097
9.60k
            std::unique_lock close_lock(_close_mtx);
2098
9.60k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2099
9.60k
            if (_close) {
2100
279
                break;
2101
279
            }
2102
9.60k
        }
2103
2104
9.32k
        batch.clear();
2105
9.32k
        batch.reserve(batch_limit);
2106
9.32k
        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
2107
9.37k
        if (drained == 0) {
2108
9.37k
            *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2109
9.37k
            continue;
2110
9.37k
        }
2111
2112
18.4E
        int64_t duration_ns = 0;
2113
18.4E
        {
2114
18.4E
            SCOPED_CACHE_LOCK(_mutex, this);
2115
18.4E
            SCOPED_RAW_TIMER(&duration_ns);
2116
18.4E
            for (auto& block : batch) {
2117
1.41k
                update_block_lru(block, cache_lock);
2118
1.41k
            }
2119
18.4E
        }
2120
18.4E
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
2121
18.4E
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2122
18.4E
    }
2123
280
}
2124
2125
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
2126
4
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
2127
4
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
2128
4
                               std::chrono::steady_clock::now().time_since_epoch())
2129
4
                               .count();
2130
4
    SCOPED_CACHE_LOCK(_mutex, this);
2131
4
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
2132
4
    if (auto iter = _files.find(hash); iter != _files.end()) {
2133
10
        for (auto& pair : _files.find(hash)->second) {
2134
10
            const FileBlockCell* cell = &pair.second;
2135
10
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
2136
8
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
2137
8
                    (cell->atime != 0 &&
2138
6
                     cur_time - cell->atime <
2139
6
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
2140
6
                    blocks_meta.emplace_back(pair.first, cell->size(),
2141
6
                                             cell->file_block->cache_type(),
2142
6
                                             cell->file_block->expiration_time());
2143
6
                }
2144
8
            }
2145
10
        }
2146
4
    }
2147
4
    return blocks_meta;
2148
4
}
2149
2150
bool BlockFileCache::try_reserve_during_async_load(size_t size,
2151
8
                                                   std::lock_guard<std::mutex>& cache_lock) {
2152
8
    size_t removed_size = 0;
2153
8
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
2154
8
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
2155
8
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);
2156
2157
8
    std::vector<FileBlockCell*> to_evict;
2158
8
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
2159
10
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
2160
10
            if (!_disk_resource_limit_mode || removed_size >= size) {
2161
8
                break;
2162
8
            }
2163
2
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
2164
2165
2
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
2166
0
                         << ", offset: " << entry_offset;
2167
2168
2
            size_t cell_size = cell->size();
2169
2
            DCHECK(entry_size == cell_size);
2170
2171
2
            if (cell->releasable()) {
2172
2
                auto& file_block = cell->file_block;
2173
2174
2
                std::lock_guard block_lock(file_block->_mutex);
2175
2
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
2176
2
                to_evict.push_back(cell);
2177
2
                removed_size += cell_size;
2178
2
            }
2179
2
        }
2180
8
    };
2181
8
    if (disposable_queue_size != 0) {
2182
0
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
2183
0
    }
2184
8
    if (normal_queue_size != 0) {
2185
8
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
2186
8
    }
2187
8
    if (index_queue_size != 0) {
2188
0
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
2189
0
    }
2190
8
    std::string reason = "async load";
2191
8
    remove_file_blocks(to_evict, cache_lock, true, reason);
2192
2193
8
    return !_disk_resource_limit_mode || removed_size >= size;
2194
8
}
2195
2196
60
void BlockFileCache::clear_need_update_lru_blocks() {
2197
60
    _need_update_lru_blocks.clear();
2198
60
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2199
60
}
2200
2201
54
void BlockFileCache::pause_ttl_manager() {
2202
54
    if (_ttl_mgr) {
2203
12
        _ttl_mgr->stop();
2204
12
    }
2205
54
}
2206
2207
54
void BlockFileCache::resume_ttl_manager() {
2208
54
    if (_ttl_mgr) {
2209
12
        _ttl_mgr->resume();
2210
12
    }
2211
54
}
2212
2213
54
std::string BlockFileCache::clear_file_cache_directly() {
2214
54
    pause_ttl_manager();
2215
54
    _lru_dumper->remove_lru_dump_files();
2216
54
    using namespace std::chrono;
2217
54
    std::stringstream ss;
2218
54
    auto start = steady_clock::now();
2219
54
    std::string result;
2220
54
    {
2221
54
        SCOPED_CACHE_LOCK(_mutex, this);
2222
54
        LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
2223
2224
54
        std::string clear_msg;
2225
54
        auto s = _storage->clear(clear_msg);
2226
54
        if (!s.ok()) {
2227
2
            result = clear_msg;
2228
52
        } else {
2229
52
            int64_t num_files = _files.size();
2230
52
            int64_t cache_size = _cur_cache_size;
2231
52
            int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
2232
52
            int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock);
2233
52
            int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock);
2234
52
            int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);
2235
2236
52
            int64_t clear_fd_duration = 0;
2237
52
            {
2238
                // clear FDCache to release fd
2239
52
                SCOPED_RAW_TIMER(&clear_fd_duration);
2240
52
                for (const auto& [file_key, file_blocks] : _files) {
2241
26
                    for (const auto& [offset, file_block_cell] : file_blocks) {
2242
26
                        AccessKeyAndOffset access_key_and_offset(file_key, offset);
2243
26
                        FDCache::instance()->remove_file_reader(access_key_and_offset);
2244
26
                    }
2245
4
                }
2246
52
            }
2247
2248
52
            _files.clear();
2249
52
            _cur_cache_size = 0;
2250
52
            _cur_ttl_size = 0;
2251
52
            _time_to_key.clear();
2252
52
            _key_to_time.clear();
2253
52
            _index_queue.clear(cache_lock);
2254
52
            _normal_queue.clear(cache_lock);
2255
52
            _disposable_queue.clear(cache_lock);
2256
52
            _ttl_queue.clear(cache_lock);
2257
2258
            // Update cache metrics immediately so consumers observe the cleared state
2259
            // without waiting for the next background monitor round.
2260
52
            _cur_cache_size_metrics->set_value(0);
2261
52
            _cur_ttl_cache_size_metrics->set_value(0);
2262
52
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0);
2263
52
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0);
2264
52
            _cur_normal_queue_cache_size_metrics->set_value(0);
2265
52
            _cur_normal_queue_element_count_metrics->set_value(0);
2266
52
            _cur_index_queue_cache_size_metrics->set_value(0);
2267
52
            _cur_index_queue_element_count_metrics->set_value(0);
2268
52
            _cur_disposable_queue_cache_size_metrics->set_value(0);
2269
52
            _cur_disposable_queue_element_count_metrics->set_value(0);
2270
2271
52
            clear_need_update_lru_blocks();
2272
2273
52
            ss << "finish clear_file_cache_directly"
2274
52
               << " path=" << _cache_base_path << " time_elapsed_ms="
2275
52
               << duration_cast<milliseconds>(steady_clock::now() - start).count()
2276
52
               << " fd_clear_time_ms=" << (clear_fd_duration / 1000000)
2277
52
               << " num_files=" << num_files << " cache_size=" << cache_size
2278
52
               << " index_queue_size=" << index_queue_size
2279
52
               << " normal_queue_size=" << normal_queue_size
2280
52
               << " disposible_queue_size=" << disposible_queue_size
2281
52
               << "ttl_queue_size=" << ttl_queue_size;
2282
52
            result = ss.str();
2283
52
            LOG(INFO) << result;
2284
52
        }
2285
54
    }
2286
54
    _lru_dumper->remove_lru_dump_files();
2287
54
    resume_ttl_manager();
2288
54
    return result;
2289
54
}
2290
2291
4.01k
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
2292
4.01k
    std::map<size_t, FileBlockSPtr> offset_to_block;
2293
4.01k
    SCOPED_CACHE_LOCK(_mutex, this);
2294
4.01k
    if (_files.contains(hash)) {
2295
1.86k
        for (auto& [offset, cell] : _files[hash]) {
2296
1.86k
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
2297
1.86k
                cell.file_block->_owned_by_cached_reader = true;
2298
1.86k
                offset_to_block.emplace(offset, cell.file_block);
2299
1.86k
            }
2300
1.86k
        }
2301
1.83k
    }
2302
4.01k
    return offset_to_block;
2303
4.01k
}
2304
2305
0
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
2306
0
    SCOPED_CACHE_LOCK(_mutex, this);
2307
0
    if (auto iter = _files.find(hash); iter != _files.end()) {
2308
0
        for (auto& [_, cell] : iter->second) {
2309
0
            cell.update_atime();
2310
0
        }
2311
0
    };
2312
0
}
2313
2314
280
void BlockFileCache::run_background_lru_log_replay() {
2315
280
    Thread::set_self_name("run_background_lru_log_replay");
2316
9.81k
    while (!_close) {
2317
9.81k
        int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
2318
9.81k
        {
2319
9.81k
            std::unique_lock close_lock(_close_mtx);
2320
9.81k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2321
9.81k
            if (_close) {
2322
278
                break;
2323
278
            }
2324
9.81k
        }
2325
2326
9.53k
        _lru_recorder->replay_queue_event(FileCacheType::TTL);
2327
9.53k
        _lru_recorder->replay_queue_event(FileCacheType::INDEX);
2328
9.53k
        _lru_recorder->replay_queue_event(FileCacheType::NORMAL);
2329
9.53k
        _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE);
2330
2331
9.53k
        if (config::enable_evaluate_shadow_queue_diff) {
2332
0
            SCOPED_CACHE_LOCK(_mutex, this);
2333
0
            _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
2334
0
            _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
2335
0
            _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
2336
0
            _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
2337
0
        }
2338
9.53k
    }
2339
280
}
2340
2341
3.12k
void BlockFileCache::dump_lru_queues(bool force) {
2342
3.12k
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
2343
3.12k
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
2344
3.12k
        !ExecEnv::GetInstance()->get_is_upgrading()) {
2345
3.12k
        _lru_dumper->dump_queue("disposable", force);
2346
3.12k
        _lru_dumper->dump_queue("normal", force);
2347
3.12k
        _lru_dumper->dump_queue("index", force);
2348
3.12k
        _lru_dumper->dump_queue("ttl", force);
2349
3.12k
        _lru_dumper->set_first_dump_done();
2350
3.12k
    }
2351
3.12k
}
2352
2353
280
void BlockFileCache::run_background_lru_dump() {
2354
280
    Thread::set_self_name("run_background_lru_dump");
2355
3.40k
    while (!_close) {
2356
3.40k
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
2357
3.40k
        {
2358
3.40k
            std::unique_lock close_lock(_close_mtx);
2359
3.40k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2360
3.40k
            if (_close) {
2361
279
                break;
2362
279
            }
2363
3.40k
        }
2364
3.12k
        dump_lru_queues(false);
2365
3.12k
    }
2366
280
}
2367
2368
280
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
2369
    // keep this order coz may be duplicated in different queue, we use the first appearence
2370
280
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
2371
280
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
2372
280
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
2373
280
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
2374
280
}
2375
2376
0
std::map<std::string, double> BlockFileCache::get_stats() {
2377
0
    std::map<std::string, double> stats;
2378
0
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2379
0
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2380
0
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2381
2382
0
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2383
0
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
2384
0
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2385
0
    stats["index_queue_curr_elements"] =
2386
0
            (double)_cur_index_queue_element_count_metrics->get_value();
2387
2388
0
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2389
0
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
2390
0
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2391
0
    stats["ttl_queue_curr_elements"] =
2392
0
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
2393
2394
0
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2395
0
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
2396
0
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2397
0
    stats["normal_queue_curr_elements"] =
2398
0
            (double)_cur_normal_queue_element_count_metrics->get_value();
2399
2400
0
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2401
0
    stats["disposable_queue_curr_size"] =
2402
0
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
2403
0
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2404
0
    stats["disposable_queue_curr_elements"] =
2405
0
            (double)_cur_disposable_queue_element_count_metrics->get_value();
2406
2407
0
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2408
0
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2409
2410
0
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2411
0
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2412
0
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2413
2414
0
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2415
0
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2416
0
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2417
2418
0
    return stats;
2419
0
}
2420
2421
// for be UTs
2422
344
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
2423
344
    std::map<std::string, double> stats;
2424
344
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2425
344
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2426
344
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2427
2428
344
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2429
344
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
2430
344
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2431
344
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
2432
2433
344
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2434
344
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
2435
344
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2436
344
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
2437
2438
344
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2439
344
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
2440
344
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2441
344
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
2442
2443
344
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2444
344
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
2445
344
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2446
344
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
2447
2448
344
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2449
344
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2450
2451
344
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2452
344
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2453
344
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2454
2455
344
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2456
344
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2457
344
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2458
2459
344
    return stats;
2460
344
}
2461
2462
template void BlockFileCache::remove(FileBlockSPtr file_block,
2463
                                     std::lock_guard<std::mutex>& cache_lock,
2464
                                     std::lock_guard<std::mutex>& block_lock, bool sync);
2465
2466
#include "common/compile_check_end.h"
2467
2468
2
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
2469
2
    InconsistencyContext inconsistency_context;
2470
2
    RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));
2471
2
    auto n = inconsistency_context.types.size();
2472
2
    results.reserve(n);
2473
2
    for (size_t i = 0; i < n; i++) {
2474
0
        std::string result;
2475
0
        result += "File cache info in manager:\n";
2476
0
        result += inconsistency_context.infos_in_manager[i].to_string();
2477
0
        result += "File cache info in storage:\n";
2478
0
        result += inconsistency_context.infos_in_storage[i].to_string();
2479
0
        result += inconsistency_context.types[i].to_string();
2480
0
        result += "\n";
2481
0
        results.push_back(std::move(result));
2482
0
    }
2483
2
    return Status::OK();
2484
2
}
2485
2486
2
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
2487
2
    std::lock_guard<std::mutex> cache_lock(_mutex);
2488
2
    std::vector<FileCacheInfo> infos_in_storage;
2489
2
    RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));
2490
2
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks;
2491
2
    for (const auto& info_in_storage : infos_in_storage) {
2492
0
        confirmed_blocks.insert({info_in_storage.hash, info_in_storage.offset});
2493
0
        auto* cell = get_cell(info_in_storage.hash, info_in_storage.offset, cache_lock);
2494
0
        if (cell == nullptr || cell->file_block == nullptr) {
2495
0
            inconsistency_context.infos_in_manager.emplace_back();
2496
0
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
2497
0
            inconsistency_context.types.emplace_back(InconsistencyType::NOT_LOADED);
2498
0
            continue;
2499
0
        }
2500
0
        FileCacheInfo info_in_manager {
2501
0
                .hash = info_in_storage.hash,
2502
0
                .expiration_time = cell->file_block->expiration_time(),
2503
0
                .size = cell->size(),
2504
0
                .offset = info_in_storage.offset,
2505
0
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
2506
0
                .cache_type = cell->file_block->cache_type()};
2507
0
        InconsistencyType inconsistent_type;
2508
0
        if (info_in_storage.is_tmp != info_in_manager.is_tmp) {
2509
0
            inconsistent_type |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
2510
0
        }
2511
0
        size_t expected_size =
2512
0
                info_in_manager.is_tmp ? cell->dowloading_size() : info_in_manager.size;
2513
0
        if (info_in_storage.size != expected_size) {
2514
0
            inconsistent_type |= InconsistencyType::SIZE_INCONSISTENT;
2515
0
        }
2516
        // Only if it is not a tmp file need we check the cache type.
2517
0
        if ((inconsistent_type & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
2518
0
            info_in_storage.cache_type != info_in_manager.cache_type) {
2519
0
            inconsistent_type |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
2520
0
        }
2521
0
        if (info_in_storage.expiration_time != info_in_manager.expiration_time) {
2522
0
            inconsistent_type |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
2523
0
        }
2524
0
        if (inconsistent_type != InconsistencyType::NONE) {
2525
0
            inconsistency_context.infos_in_manager.push_back(info_in_manager);
2526
0
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
2527
0
            inconsistency_context.types.push_back(inconsistent_type);
2528
0
        }
2529
0
    }
2530
2531
2
    for (const auto& [hash, offset_to_cell] : _files) {
2532
0
        for (const auto& [offset, cell] : offset_to_cell) {
2533
0
            if (confirmed_blocks.contains({hash, offset})) {
2534
0
                continue;
2535
0
            }
2536
0
            const auto& block = cell.file_block;
2537
0
            inconsistency_context.infos_in_manager.emplace_back(
2538
0
                    hash, block->expiration_time(), cell.size(), offset,
2539
0
                    cell.file_block->state() == FileBlock::State::DOWNLOADING, block->cache_type());
2540
0
            inconsistency_context.infos_in_storage.emplace_back();
2541
0
            inconsistency_context.types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
2542
0
        }
2543
0
    }
2544
2
    return Status::OK();
2545
2
}
2546
2547
} // namespace doris::io