Coverage Report

Created: 2026-06-26 04:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache.h"
22
23
#include <gen_cpp/file_cache.pb.h>
24
25
#include <cstdio>
26
#include <exception>
27
#include <fstream>
28
#include <unordered_set>
29
30
#include "common/status.h"
31
#include "cpp/sync_point.h"
32
#include "runtime/exec_env.h"
33
34
#if defined(__APPLE__)
35
#include <sys/mount.h>
36
#else
37
#include <sys/statfs.h>
38
#endif
39
40
#include <chrono> // IWYU pragma: keep
41
#include <mutex>
42
#include <ranges>
43
44
#include "common/cast_set.h"
45
#include "common/check.h"
46
#include "common/config.h"
47
#include "common/logging.h"
48
#include "core/uint128.h"
49
#include "exec/common/sip_hash.h"
50
#include "io/cache/block_file_cache_ttl_mgr.h"
51
#include "io/cache/file_block.h"
52
#include "io/cache/file_cache_common.h"
53
#include "io/cache/fs_file_cache_storage.h"
54
#include "io/cache/mem_file_cache_storage.h"
55
#include "runtime/runtime_profile.h"
56
#include "util/concurrency_stats.h"
57
#include "util/stack_util.h"
58
#include "util/stopwatch.hpp"
59
#include "util/thread.h"
60
#include "util/time.h"
61
namespace doris::io {
62
63
namespace {
64
65
constexpr std::array<FileCacheType, 4> LRU_LOG_REPLAY_TYPES = {
66
        FileCacheType::TTL, FileCacheType::INDEX, FileCacheType::NORMAL, FileCacheType::DISPOSABLE};
67
68
760
size_t file_cache_type_index(FileCacheType type) {
69
760
    return static_cast<size_t>(type);
70
760
}
71
72
} // namespace
73
74
// Insert a block pointer into one shard while swallowing allocation failures.
75
4.32M
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block, size_t max_queue_size) {
76
4.32M
    if (!block || max_queue_size == 0) {
77
1
        return false;
78
1
    }
79
4.32M
    bool reserved = false;
80
4.32M
    try {
81
4.32M
        auto* raw_ptr = block.get();
82
4.32M
        auto idx = shard_index(raw_ptr);
83
4.32M
        auto& shard = _shards[idx];
84
4.32M
        std::lock_guard lock(shard.mutex);
85
4.32M
        if (shard.entries.contains(raw_ptr)) {
86
4.16M
            return false;
87
4.16M
        }
88
153k
        size_t cur_size = _size.load(std::memory_order_relaxed);
89
158k
        while (cur_size < max_queue_size) {
90
158k
            if (_size.compare_exchange_weak(cur_size, cur_size + 1, std::memory_order_relaxed)) {
91
158k
                reserved = true;
92
158k
                break;
93
158k
            }
94
158k
        }
95
153k
        if (!reserved) {
96
1
            return false;
97
1
        }
98
153k
        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
99
153k
        DORIS_CHECK(inserted);
100
153k
        return true;
101
153k
    } catch (const std::exception& e) {
102
0
        if (reserved) {
103
0
            decrease_size(1);
104
0
        }
105
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
106
0
    } catch (...) {
107
0
        if (reserved) {
108
0
            decrease_size(1);
109
0
        }
110
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
111
0
    }
112
0
    return false;
113
4.32M
}
114
115
// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
116
2.56k
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
117
2.56k
    if (limit == 0 || output == nullptr) {
118
2
        return 0;
119
2
    }
120
2.56k
    size_t drained = 0;
121
2.56k
    try {
122
2.56k
        output->reserve(output->size() + std::min(limit, size()));
123
164k
        for (auto& shard : _shards) {
124
164k
            if (drained >= limit) {
125
1
                break;
126
1
            }
127
164k
            std::lock_guard lock(shard.mutex);
128
164k
            auto it = shard.entries.begin();
129
164k
            size_t shard_drained = 0;
130
321k
            while (it != shard.entries.end() && drained + shard_drained < limit) {
131
157k
                output->emplace_back(std::move(it->second));
132
157k
                it = shard.entries.erase(it);
133
157k
                ++shard_drained;
134
157k
            }
135
164k
            if (shard_drained > 0) {
136
830
                decrease_size(shard_drained);
137
830
                drained += shard_drained;
138
830
            }
139
164k
        }
140
2.56k
    } catch (const std::exception& e) {
141
0
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
142
0
    } catch (...) {
143
0
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
144
0
    }
145
2.56k
    return drained;
146
2.56k
}
147
148
// Remove every pending block, guarding against unexpected exceptions.
149
30
void NeedUpdateLRUBlocks::clear() {
150
30
    try {
151
1.92k
        for (auto& shard : _shards) {
152
1.92k
            std::lock_guard lock(shard.mutex);
153
1.92k
            if (!shard.entries.empty()) {
154
2
                auto removed = shard.entries.size();
155
2
                shard.entries.clear();
156
2
                decrease_size(removed);
157
2
            }
158
1.92k
        }
159
30
    } catch (const std::exception& e) {
160
0
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
161
0
    } catch (...) {
162
0
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
163
0
    }
164
30
}
165
166
832
void NeedUpdateLRUBlocks::decrease_size(size_t delta) {
167
832
    size_t cur_size = _size.load(std::memory_order_relaxed);
168
832
    while (true) {
169
832
        DORIS_CHECK_GE(cur_size, delta);
170
832
        if (_size.compare_exchange_weak(cur_size, cur_size - delta, std::memory_order_relaxed)) {
171
832
            return;
172
832
        }
173
832
    }
174
832
}
175
176
4.32M
// Pick the shard for a block by hashing its address and keeping the bits in kShardMask.
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
    DCHECK(ptr != nullptr);
    const size_t pointer_hash = std::hash<FileBlock*> {}(ptr);
    return pointer_hash & kShardMask;
}
180
181
// Builds a block file cache rooted at `cache_base_path`.
//
// In order, the constructor:
//  1. registers every bvar metric of this cache, each using `_cache_base_path` as prefix;
//  2. sizes the DISPOSABLE / INDEX / NORMAL / TTL LRU queues from `cache_settings`;
//  3. creates the LRU recorder, then the LRU dumper (which is handed the recorder's raw
//     pointer, so the recorder must be created first);
//  4. chooses the storage backend: "memory" -> MemFileCacheStorage, anything else ->
//     FSFileCacheStorage.
//
// @param cache_base_path  metric prefix and (for non-memory storage) the cache root path.
// @param cache_settings   capacity, block size, per-queue limits and storage kind.
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
                               const FileCacheSettings& cache_settings)
        : _cache_base_path(cache_base_path),
          _capacity(cache_settings.capacity),
          _max_file_block_size(cache_settings.max_file_block_size) {
    // Gauges for current sizes / element counts; all start at 0 except capacity.
    _cur_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
                                                                     "file_cache_cache_size", 0);
    _cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_capacity", _capacity);
    _cur_ttl_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_ttl_cache_size", 0);
    _cur_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_normal_queue_element_count", 0);
    _cur_ttl_cache_lru_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_size", 0);
    _cur_ttl_cache_lru_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_element_count", 0);
    _cur_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_normal_queue_cache_size", 0);
    _cur_index_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_index_queue_element_count", 0);
    _cur_index_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_index_queue_cache_size", 0);
    _cur_disposable_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_disposable_queue_element_count", 0);
    _cur_disposable_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_disposable_queue_cache_size", 0);

    // Per-queue eviction byte counters. Slots are hard-coded here as
    // 0=index, 1=normal, 2=disposable, 3=ttl.
    // NOTE(review): other per-type arrays below are indexed via file_cache_type_index(type);
    // confirm every caller indexes this array with the same mapping.
    _queue_evict_size_metrics[0] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_index_queue_evict_size");
    _queue_evict_size_metrics[1] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_normal_queue_evict_size");
    _queue_evict_size_metrics[2] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_disposable_queue_evict_size");
    _queue_evict_size_metrics[3] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_ttl_cache_evict_size");
    _total_evict_size_metrics = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_total_evict_size");
    _total_read_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                     "file_cache_total_read_size");
    _total_hit_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                    "file_cache_total_hit_size");
    _gc_evict_bytes_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                    "file_cache_gc_evict_bytes");
    _gc_evict_count_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                    "file_cache_gc_evict_count");

    // Cross-queue "evict by time" counters: entry [row][col] is registered under the name
    // "file_cache_evict_by_time_<row>_to_<col>". Diagonal entries are not registered here.
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_disposable_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_disposable_to_index");
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_disposable_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_normal_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_normal_to_index");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_normal_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_index_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_index_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_index_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_ttl_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_ttl_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_ttl_to_index");

    // Per-queue counters for eviction within the queue's own LRU.
    _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_self_lru_disposable");
    _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_self_lru_normal");
    _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_index");
    _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl");

    // Cross-queue "evict by size" counters, named "file_cache_evict_by_size_<row>_to_<col>".
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_disposable_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_disposable_to_index");
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_disposable_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_normal_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_normal_to_index");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_normal_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_index_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_index_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_index_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_ttl_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_ttl_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_ttl_to_index");

    _evict_by_try_release = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_evict_by_try_release");

    // Block-level read / hit / removal counters; the windows and ratios below derive from these.
    _num_read_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                             "file_cache_num_read_blocks");
    _num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                            "file_cache_num_hit_blocks");
    _num_removed_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                "file_cache_num_removed_blocks");

    _no_warmup_num_read_blocks = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks");
    _no_warmup_num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks");

    // Sliding windows (300s and 3600s) over the adders above. They are not created in
    // BE_TEST builds, so these members stay unset in unit tests.
#ifndef BE_TEST
    _num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
    _num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
    _num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
    _num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_num_read_blocks_1h", _num_read_blocks.get(),
            3600);
    _no_warmup_num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_5m",
            _no_warmup_num_hit_blocks.get(), 300);
    _no_warmup_num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_5m",
            _no_warmup_num_read_blocks.get(), 300);
    _no_warmup_num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_1h",
            _no_warmup_num_hit_blocks.get(), 3600);
    _no_warmup_num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_1h",
            _no_warmup_num_read_blocks.get(), 3600);
#endif

    // Hit-ratio gauges, initialised to 0.0; they are updated outside this constructor.
    _hit_ratio = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
                                                        "file_cache_hit_ratio", 0.0);
    _hit_ratio_5m = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
                                                           "file_cache_hit_ratio_5m", 0.0);
    _hit_ratio_1h = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
                                                           "file_cache_hit_ratio_1h", 0.0);

    _no_warmup_hit_ratio = std::make_shared<bvar::Status<double>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio", 0.0);
    _no_warmup_hit_ratio_5m = std::make_shared<bvar::Status<double>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_5m", 0.0);
    _no_warmup_hit_ratio_1h = std::make_shared<bvar::Status<double>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_1h", 0.0);

    _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
    _need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0);
    _meta_store_write_queue_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_meta_store_write_queue_size", 0);

    // Latency / length recorders and LRU-update queue counters.
    _cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
    _get_or_set_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_get_or_set_latency_us");
    _storage_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_us");
    _storage_retry_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us");
    _storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us");
    _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us");
    _lru_dump_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_lru_dump_latency_us");
    _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_recycle_keys_length");
    _need_update_lru_blocks_length_recorder = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length");
    _need_update_lru_blocks_produce_metrics = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_produce");
    _need_update_lru_blocks_consume_metrics = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_consume");
    _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us");
    _ttl_gc_latency_us = std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(),
                                                                 "file_cache_ttl_gc_latency_us");
    _shadow_queue_levenshtein_distance = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_shadow_queue_levenshtein_distance");
    // Per-queue-type LRU recorder metrics. Each array is indexed by
    // file_cache_type_index(type), and names embed cache_type_to_string(type).
    for (FileCacheType type : {FileCacheType::DISPOSABLE, FileCacheType::NORMAL,
                               FileCacheType::INDEX, FileCacheType::TTL}) {
        size_t idx = file_cache_type_index(type);
        std::string metric_prefix =
                "file_cache_lru_recorder_" + cache_type_to_string(type) + "_record_queue";
        _lru_recorder_queue_length_recorder[idx] = std::make_shared<bvar::LatencyRecorder>(
                _cache_base_path.c_str(), metric_prefix + "_length");
        _lru_recorder_queue_produce_metrics[idx] = std::make_shared<bvar::Adder<size_t>>(
                _cache_base_path.c_str(), metric_prefix + "_produce");
        _lru_recorder_queue_consume_metrics[idx] = std::make_shared<bvar::Adder<size_t>>(
                _cache_base_path.c_str(), metric_prefix + "_consume");
        _lru_recorder_shadow_queue_element_count_metrics[idx] =
                std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
                                                       "file_cache_lru_recorder_" +
                                                               cache_type_to_string(type) +
                                                               "_shadow_queue_element_count",
                                                       0);
    }
    _lru_recorder_log_replay_idle_metrics = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_lru_recorder_log_replay_idle");

    // Size and element limits come from cache_settings (note that NORMAL uses the
    // query_queue_* settings). The third argument is 1h / 7d / 1d / INT_MAX, presumably a
    // per-queue time window in seconds. TODO confirm against LRUQueue.
    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
                                 cache_settings.disposable_queue_elements, 60 * 60);
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
                            7 * 24 * 60 * 60);
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
                             24 * 60 * 60);
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
                          std::numeric_limits<int>::max());

    // The dumper is handed the recorder's raw pointer, so the recorder is created first.
    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());
    if (cache_settings.storage == "memory") {
        _storage = std::make_unique<MemFileCacheStorage>();
        // Overwritten only after every metric above was registered, so a memory-backed
        // cache's metrics keep the original cache_base_path as their prefix.
        _cache_base_path = "memory";
    } else {
        _storage = std::make_unique<FSFileCacheStorage>();
    }

    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
}
442
443
453k
// Hash `path` with sip_hash128 and wrap the 128-bit digest as the cache key.
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
    uint128_t digest;
    char* digest_bytes = reinterpret_cast<char*>(&digest);
    sip_hash128(path.data(), path.size(), digest_bytes);
    return UInt128Wrapper(digest);
}
448
449
// Return a holder for the per-query cache context of `query_id`, creating the context if
// needed. When config::enable_file_cache_query_limit is off, an empty holder is returned.
// The lookup and creation happen under the cache mutex.
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
    SCOPED_CACHE_LOCK(_mutex, this);
    const bool query_limit_enabled = config::enable_file_cache_query_limit;
    if (!query_limit_enabled) {
        return {};
    }

    auto query_ctx =
            get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
    return std::make_unique<QueryFileCacheContextHolder>(query_id, this, query_ctx);
}
461
462
// Look up the existing context for `query_id`; returns nullptr when none is registered.
// `cache_lock` is not used in the body; it is taken so that callers must hold the cache
// mutex.
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
    if (auto found = _query_map.find(query_id); found != _query_map.end()) {
        return found->second;
    }
    return nullptr;
}
467
468
16
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
469
16
    SCOPED_CACHE_LOCK(_mutex, this);
470
16
    const auto& query_iter = _query_map.find(query_id);
471
472
16
    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
473
15
        _query_map.erase(query_iter);
474
15
    }
475
16
}
476
477
// Return the context registered for `query_id`, creating one on first use.
//
// An all-zero query id yields nullptr and nothing is registered. A new context is sized
// to `file_cache_query_limit_percent` percent of `_capacity`, and a warning is logged
// when that budget is below 256MB.
// Must be called with the cache mutex held (`cache_lock`).
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
        int file_cache_query_limit_percent) {
    const bool no_query_id = query_id.lo == 0 && query_id.hi == 0;
    if (no_query_id) {
        return nullptr;
    }

    if (auto existing = get_query_context(query_id, cache_lock); existing) {
        return existing;
    }

    // 256MB: recommended minimum per-query budget.
    constexpr size_t kRecommendedMinQueryLimit = 268435456;
    const size_t limit_bytes = _capacity * file_cache_query_limit_percent / 100;
    if (limit_bytes < kRecommendedMinQueryLimit) {
        LOG(WARNING) << "The user-set file cache query limit (" << limit_bytes
                     << " bytes) is less than the 256MB recommended minimum. "
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
                     << " from its current value " << file_cache_query_limit_percent << "%.";
    }
    auto fresh_context = std::make_shared<QueryFileCacheContext>(limit_bytes);
    return _query_map.emplace(query_id, fresh_context).first->second;
}
500
501
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
502
80
                                                   std::lock_guard<std::mutex>& cache_lock) {
503
80
    auto pair = std::make_pair(hash, offset);
504
80
    auto record = records.find(pair);
505
80
    DCHECK(record != records.end());
506
80
    auto iter = record->second;
507
80
    records.erase(pair);
508
80
    lru_queue.remove(iter, cache_lock);
509
80
}
510
511
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
512
                                                    size_t size,
513
110
                                                    std::lock_guard<std::mutex>& cache_lock) {
514
110
    auto pair = std::make_pair(hash, offset);
515
110
    if (records.find(pair) == records.end()) {
516
109
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
517
109
        records.insert({pair, queue_iter});
518
109
    }
519
110
}
520
521
161
// Locked entry point: takes the cache mutex and runs initialize_unlocked().
Status BlockFileCache::initialize() {
    // SCOPED_CACHE_LOCK declares the `cache_lock` guard that initialize_unlocked() expects.
    SCOPED_CACHE_LOCK(_mutex, this);
    Status status = initialize_unlocked(cache_lock);
    return status;
}
525
526
161
// One-time initialization, run with the cache lock held.
// Steps: optionally restore LRU queues from disk, initialize the storage, create the TTL
// manager when the storage exposes a meta store, then start the background threads.
// Returns the storage initialization error, if any, before any thread is started.
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(!_is_initialized);
    _is_initialized = true;

    // Requirements for restoring the LRU queues:
    // 1. restored data should not overwrite the last dump;
    // 2. restore should happen before load and async load;
    // 3. all queues should be restored sequentially to avoid conflicts.
    // TODO(zhengyu): we could parallelize this, but it would increase complexity; check the time
    // cost first to see whether any improvement is necessary.
    const bool restore_lru = config::file_cache_background_lru_dump_tail_record_num > 0;
    if (restore_lru) {
        restore_lru_queues_from_disk(cache_lock);
    }
    RETURN_IF_ERROR(_storage->init(this));

    // The TTL manager is only created for filesystem storage that provides a meta store.
    auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
    auto* meta_store = fs_storage != nullptr ? fs_storage->get_meta_store() : nullptr;
    if (meta_store != nullptr) {
        _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
    }

    // Background maintenance threads.
    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
    _cache_background_evict_in_advance_thread =
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
    _cache_background_block_lru_update_thread =
            std::thread(&BlockFileCache::run_background_block_lru_update, this);

    // LRU dump and LRU log replay threads (queue restore already happened above).
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
    _cache_background_lru_log_replay_thread =
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);

    return Status::OK();
}
560
561
void BlockFileCache::update_block_lru(FileBlockSPtr block,
562
157k
                                      std::lock_guard<std::mutex>& cache_lock) {
563
157k
    if (!block) {
564
1
        return;
565
1
    }
566
567
157k
    FileBlockCell* cell = get_cell(block->get_hash_value(), block->offset(), cache_lock);
568
157k
    if (!cell || cell->file_block.get() != block.get()) {
569
24.6k
        return;
570
24.6k
    }
571
572
132k
    if (cell->queue_iterator) {
573
132k
        auto& queue = get_queue(block->cache_type());
574
132k
        queue.move_to_end(*cell->queue_iterator, cache_lock);
575
132k
        _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
576
132k
                                          block->_key.hash, block->_key.offset,
577
132k
                                          block->_block_range.size());
578
132k
    }
579
132k
    cell->update_atime();
580
132k
}
581
582
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
583
731k
                              std::lock_guard<std::mutex>& cache_lock) {
584
731k
    if (result) {
585
731k
        result->push_back(cell.file_block);
586
731k
    }
587
588
731k
    auto& queue = get_queue(cell.file_block->cache_type());
589
    /// Move to the end of the queue. The iterator remains valid.
590
731k
    if (!config::enable_file_cache_async_touch_on_get_or_set && cell.queue_iterator &&
591
731k
        move_iter_flag) {
592
538k
        queue.move_to_end(*cell.queue_iterator, cache_lock);
593
538k
        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
594
538k
                                          CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash,
595
538k
                                          cell.file_block->_key.offset, cell.size());
596
538k
    }
597
598
731k
    cell.update_atime();
599
731k
}
600
601
template <class T>
602
    requires IsXLock<T>
603
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
604
21.0M
                                        T& /* cache_lock */) {
605
21.0M
    auto it = _files.find(hash);
606
21.0M
    if (it == _files.end()) {
607
4.40k
        return nullptr;
608
4.40k
    }
609
610
21.0M
    auto& offsets = it->second;
611
21.0M
    auto cell_it = offsets.find(offset);
612
21.0M
    if (cell_it == offsets.end()) {
613
486
        return nullptr;
614
486
    }
615
616
21.0M
    return &cell_it->second;
617
21.0M
}
618
619
923k
// LRU promotion is skipped whenever either the cached block or the querying context is
// DISPOSABLE.
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
    const bool any_disposable =
            cell_type == FileCacheType::DISPOSABLE || query_type == FileCacheType::DISPOSABLE;
    return !any_disposable;
}
622
623
// Returns the cached blocks of `hash` that intersect `range`, in ascending offset order, touching
// each one via use_cell(). While the async open has not finished, a missing hash triggers a
// direct load from storage first. The caller must hold the cache lock.
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
                                    const FileBlock::Range& range,
                                    std::lock_guard<std::mutex>& cache_lock) {
    /// Given range = [left, right] and non-overlapping ordered set of file blocks,
    /// find list [block1, ..., blockN] of blocks which intersect with given range.
    auto file_it = _files.find(hash);
    if (file_it == _files.end()) {
        if (_async_open_done) {
            return {};
        }
        // Async open still in progress: load this file's blocks on demand.
        FileCacheKey key;
        key.hash = hash;
        key.meta.type = context.cache_type;
        key.meta.expiration_time = context.expiration_time;
        key.meta.tablet_id = context.tablet_id;
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);

        file_it = _files.find(hash);
        if (file_it == _files.end()) [[unlikely]] {
            return {};
        }
    }

    auto& offset_to_cell = file_it->second;
    if (offset_to_cell.empty()) {
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
                     << " cache type=" << context.cache_type
                     << " cache expiration time=" << context.expiration_time
                     << " cache range=" << range.left << " " << range.right
                     << " query id=" << context.query_id;
        DCHECK(false);
        _files.erase(hash);
        return {};
    }

    FileBlocks result;
    // Appends `cell` to the result; LRU promotion is skipped when DISPOSABLE is involved.
    auto take = [&](const FileBlockCell& cell) {
        use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
                 cache_lock);
    };

    auto block_it = offset_to_cell.lower_bound(range.left);
    if (block_it == offset_to_cell.end()) {
        /// N - last cached block for given file hash, block{N}.offset < range.left:
        ///   block{N}                       block{N}
        /// [________                         [_______]
        ///     [__________]         OR                  [________]
        ///     ^                                        ^
        ///     range.left                               range.left
        const auto& last_cell = offset_to_cell.rbegin()->second;
        if (last_cell.file_block->range().right >= range.left) {
            take(last_cell);
        }
        return result;
    }

    /// block_it <-- block{k}
    if (block_it != offset_to_cell.begin()) {
        const auto& prev_cell = std::prev(block_it)->second;
        if (range.left <= prev_cell.file_block->range().right) {
            ///   block{k-1}  block{k}
            ///   [________]   [_____
            ///       [___________
            ///       ^
            ///       range.left
            take(prev_cell);
        }
    }

    ///  block{k} ...       block{k-1}  block{k}                      block{k}
    ///  [______              [______]     [____                        [________
    ///  [_________     OR              [________      OR    [______]   ^
    ///  ^                              ^                           ^   block{k}.offset
    ///  range.left                     range.left                  range.right
    for (; block_it != offset_to_cell.end(); ++block_it) {
        const auto& cell = block_it->second;
        if (range.right < cell.file_block->range().left) {
            break;
        }
        take(cell);
    }
    return result;
}
713
714
4.32M
// Queues `block` for a deferred LRU update. A non-positive configured limit is passed to the queue
// as 0. Metrics are updated only when the insert reports success.
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
    const int64_t configured_limit = config::file_cache_background_block_lru_update_queue_max_size;
    size_t max_queue_size = 0;
    if (configured_limit > 0) {
        max_queue_size = static_cast<size_t>(configured_limit);
    }

    const bool inserted = _need_update_lru_blocks.insert(std::move(block), max_queue_size);
    if (!inserted) {
        return;
    }
    *_need_update_lru_blocks_produce_metrics << 1;
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
}
722
723
4
std::string BlockFileCache::clear_file_cache_async() {
724
4
    return clear_file_cache_impl(false);
725
4
}
726
727
24
std::string BlockFileCache::clear_file_cache_sync() {
728
24
    return clear_file_cache_impl(true);
729
24
}
730
731
28
std::string BlockFileCache::clear_file_cache_impl(bool sync_remove) {
732
28
    const char* action = sync_remove ? "clear_file_cache_sync" : "clear_file_cache_async";
733
28
    LOG(INFO) << "start " << action << ", path=" << _cache_base_path;
734
28
    _lru_dumper->remove_lru_dump_files();
735
28
    int64_t num_cells_all = 0;
736
28
    int64_t num_cells_to_delete = 0;
737
28
    int64_t num_cells_wait_recycle = 0;
738
28
    int64_t num_files_all = 0;
739
28
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
740
28
    {
741
28
        SCOPED_CACHE_LOCK(_mutex, this);
742
743
28
        std::vector<FileBlockCell*> deleting_cells;
744
5.23k
        for (auto& [_, offset_to_cell] : _files) {
745
5.23k
            ++num_files_all;
746
105k
            for (auto& [_1, cell] : offset_to_cell) {
747
105k
                ++num_cells_all;
748
105k
                deleting_cells.push_back(&cell);
749
105k
            }
750
5.23k
        }
751
752
        // Do not erase while walking _files above: remove() may erase the current map element.
753
        //
754
        // sync_remove only changes how already releasable DOWNLOADED blocks are deleted from
755
        // storage. Busy blocks keep the existing holder lifecycle: mark them deleting and leave
756
        // them in _files until the last holder releases them.
757
105k
        for (auto& cell : deleting_cells) {
758
105k
            if (!cell->releasable()) {
759
103
                LOG(INFO) << "cell is not releasable, hash="
760
103
                          << " offset=" << cell->file_block->offset();
761
103
                cell->file_block->set_deleting();
762
103
                ++num_cells_wait_recycle;
763
103
                continue;
764
103
            }
765
105k
            FileBlockSPtr file_block = cell->file_block;
766
105k
            if (file_block) {
767
105k
                std::lock_guard block_lock(file_block->_mutex);
768
105k
                remove(file_block, cache_lock, block_lock, sync_remove);
769
105k
                ++num_cells_to_delete;
770
105k
            }
771
105k
        }
772
28
        clear_need_update_lru_blocks();
773
28
    }
774
775
28
    std::stringstream ss;
776
28
    ss << "finish " << action << ", path=" << _cache_base_path << " sync_remove=" << sync_remove
777
28
       << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
778
28
       << " num_cells_to_delete=" << num_cells_to_delete
779
28
       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
780
28
    auto msg = ss.str();
781
28
    LOG(INFO) << msg;
782
28
    _lru_dumper->remove_lru_dump_files();
783
28
    return msg;
784
28
}
785
786
// Splits [offset, offset + size) into chunks of at most `_max_file_block_size` bytes.
// Each chunk is decided on its own:
// - if try_reserve() succeeds, a cell with the requested `state` is added to the cache;
// - otherwise a detached SKIP_CACHE block is created that is not inserted into `_files`.
// Returns the blocks in ascending offset order. `size` must be > 0; the caller must hold the
// cache lock.
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
                                                  const CacheContext& context, size_t offset,
                                                  size_t size, FileBlock::State state,
                                                  std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(size > 0);

    auto current_pos = offset;
    const auto end_pos_non_included = offset + size;
    size_t remaining_size = size;

    FileBlocks file_blocks;
    while (current_pos < end_pos_non_included) {
        const size_t current_size = std::min(remaining_size, _max_file_block_size);
        remaining_size -= current_size;
        // Per-chunk state. Previously `state` itself was overwritten here, so one failed
        // reservation turned every later chunk into SKIP_CACHE, even chunks whose try_reserve()
        // succeeded (and may already have evicted other blocks to make room).
        const FileBlock::State block_state =
                try_reserve(hash, context, current_pos, current_size, cache_lock)
                        ? state
                        : FileBlock::State::SKIP_CACHE;
        if (block_state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
            FileCacheKey key;
            key.hash = hash;
            key.offset = current_pos;
            key.meta.type = context.cache_type;
            key.meta.expiration_time = context.expiration_time;
            key.meta.tablet_id = context.tablet_id;
            auto file_block = std::make_shared<FileBlock>(key, current_size, this,
                                                          FileBlock::State::SKIP_CACHE);
            file_blocks.push_back(std::move(file_block));
        } else {
            auto* cell = add_cell(hash, context, current_pos, current_size, block_state,
                                  cache_lock);
            if (cell) {
                file_blocks.push_back(cell->file_block);
                if (!context.is_cold_data) {
                    cell->update_atime();
                }
            }
            if (_ttl_mgr && context.tablet_id != 0) {
                _ttl_mgr->register_tablet_id(context.tablet_id);
            }
        }

        current_pos += current_size;
    }

    DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
    return file_blocks;
}
834
835
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
836
                                                       const UInt128Wrapper& hash,
837
                                                       const CacheContext& context,
838
                                                       const FileBlock::Range& range,
839
730k
                                                       std::lock_guard<std::mutex>& cache_lock) {
840
    /// There are blocks [block1, ..., blockN]
841
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
842
    /// intersect with given range.
843
844
    /// It can have holes:
845
    /// [____________________]         -- requested range
846
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
847
    ///
848
    /// For each such hole create a cell with file block state EMPTY.
849
850
730k
    auto it = file_blocks.begin();
851
730k
    auto block_range = (*it)->range();
852
853
730k
    size_t current_pos = 0;
854
730k
    if (block_range.left < range.left) {
855
        ///    [_______     -- requested range
856
        /// [_______
857
        /// ^
858
        /// block1
859
860
151
        current_pos = block_range.right + 1;
861
151
        ++it;
862
730k
    } else {
863
730k
        current_pos = range.left;
864
730k
    }
865
866
1.46M
    while (current_pos <= range.right && it != file_blocks.end()) {
867
731k
        block_range = (*it)->range();
868
869
731k
        if (current_pos == block_range.left) {
870
731k
            current_pos = block_range.right + 1;
871
731k
            ++it;
872
731k
            continue;
873
731k
        }
874
875
731k
        DCHECK(current_pos < block_range.left);
876
877
171
        auto hole_size = block_range.left - current_pos;
878
879
171
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
880
171
                                                      FileBlock::State::EMPTY, cache_lock));
881
882
171
        current_pos = block_range.right + 1;
883
171
        ++it;
884
171
    }
885
886
730k
    if (current_pos <= range.right) {
887
        ///   ________]     -- requested range
888
        ///   _____]
889
        ///        ^
890
        /// blockN
891
892
434
        auto hole_size = range.right - current_pos + 1;
893
894
434
        file_blocks.splice(file_blocks.end(),
895
434
                           split_range_into_cells(hash, context, current_pos, hole_size,
896
434
                                                  FileBlock::State::EMPTY, cache_lock));
897
434
    }
898
730k
}
899
900
// Returns a holder covering [offset, offset + size): existing cached blocks plus newly created
// EMPTY (or SKIP_CACHE) blocks for the uncached parts. Read/hit metrics are updated under the
// lock. With async touch enabled, LRU updates for hit blocks are queued after the lock is
// released.
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
                                            CacheContext& context) {
    const FileBlock::Range range(offset, offset + size - 1);

    ReadStatistics* stats = context.stats;
    DCHECK(stats != nullptr);
    MonotonicStopWatch lock_wait_watch;
    lock_wait_watch.start();

    // Read the config once so collection under the lock and the flush below agree.
    const bool async_touch = config::enable_file_cache_async_touch_on_get_or_set;
    FileBlocks file_blocks;
    // Hit blocks whose LRU position is refreshed after the cache lock is released.
    std::vector<FileBlockSPtr> deferred_touches;
    int64_t locked_duration = 0;
    {
        ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
        std::lock_guard cache_lock(_mutex);
        ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
        stats->lock_wait_timer += lock_wait_watch.elapsed_time();
        SCOPED_RAW_TIMER(&locked_duration);

        /// Get all blocks which intersect with the given range.
        {
            SCOPED_RAW_TIMER(&stats->get_timer);
            file_blocks = get_impl(hash, context, range, cache_lock);
        }
        // Create cells for whatever part of the range is not cached yet.
        {
            SCOPED_RAW_TIMER(&stats->set_timer);
            if (file_blocks.empty()) {
                file_blocks = split_range_into_cells(hash, context, offset, size,
                                                     FileBlock::State::EMPTY, cache_lock);
            } else {
                fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
            }
        }
        DCHECK(!file_blocks.empty());

        const auto block_count = file_blocks.size();
        *_num_read_blocks << block_count;
        if (!context.is_warmup) {
            *_no_warmup_num_read_blocks << block_count;
        }
        if (async_touch) {
            deferred_touches.reserve(block_count);
        }
        for (const auto& block : file_blocks) {
            const size_t block_size = block->range().size();
            *_total_read_size_metrics << block_size;
            if (block->state_unsafe() != FileBlock::State::DOWNLOADED) {
                continue;
            }
            *_num_hit_blocks << 1;
            *_total_hit_size_metrics << block_size;
            if (!context.is_warmup) {
                *_no_warmup_num_hit_blocks << 1;
            }
            if (async_touch && need_to_move(block->cache_type(), context.cache_type)) {
                deferred_touches.emplace_back(block);
            }
        }
    }

    // Empty unless async touch is enabled.
    for (auto& block : deferred_touches) {
        add_need_update_lru_block(std::move(block));
    }

    *_get_or_set_latency_us << (locked_duration / 1000);
    return FileBlocksHolder(std::move(file_blocks));
}
966
967
// Creates the cell for [hash][offset] and registers it in its LRU queue and the size counters.
// Returns the existing cell if one is already present. Returns nullptr for size == 0 or for
// blocks larger than 1GiB. The caller must hold the cache lock.
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
                                        size_t offset, size_t size, FileBlock::State state,
                                        std::lock_guard<std::mutex>& cache_lock) {
    /// Create a file block cell and put it in `files` map by [hash][offset].
    if (size == 0) {
        return nullptr; /// Empty files are not cached.
    }

    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
               << " expiration_time=" << context.expiration_time
               << " tablet_id=" << context.tablet_id;

    constexpr size_t kMaxBlockSize = 1024 * 1024 * 1024; // 1GiB
    if (size > kMaxBlockSize) {
        LOG(WARNING) << "File block size is too large for a block, reject. size=" << size
                     << " hash=" << hash.to_string() << " offset=" << offset
                     << " stack:" << get_stack_trace();
        return nullptr;
    }

    auto& offsets = _files[hash];
    if (auto existing = offsets.find(offset); existing != offsets.end()) {
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
                   << ", offset: " << offset << ", size: " << size
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
        return &existing->second;
    }

    FileCacheKey key;
    key.hash = hash;
    key.offset = offset;
    key.meta.type = context.cache_type;
    key.meta.expiration_time = context.expiration_time;
    key.meta.tablet_id = context.tablet_id;
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);

    // Reconcile the type with the expiration time: TTL without expiration becomes NORMAL, and a
    // non-TTL type with an expiration becomes TTL.
    const bool is_ttl_request = context.cache_type == FileCacheType::TTL;
    const bool has_expiration = context.expiration_time != 0;
    Status st;
    if (is_ttl_request && !has_expiration) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
    } else if (!is_ttl_request && has_expiration) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
    }
    if (!st.ok()) {
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
                     << " cache_type=" << cache_type_to_string(context.cache_type)
                     << " error=" << st.msg();
    }

    const FileCacheType final_type = cell.file_block->cache_type();
    cell.queue_iterator = get_queue(final_type).add(hash, offset, size, cache_lock);
    _lru_recorder->record_queue_event(final_type, CacheLRULogType::ADD,
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
                                      cell.size());
    if (final_type == FileCacheType::TTL) {
        _cur_ttl_size += cell.size();
    }

    auto inserted = offsets.insert(std::make_pair(offset, std::move(cell))).first;
    _cur_cache_size += size;
    return &inserted->second;
}
1028
1029
0
size_t BlockFileCache::try_release() {
1030
0
    SCOPED_CACHE_LOCK(_mutex, this);
1031
0
    std::vector<FileBlockCell*> trash;
1032
0
    for (auto& [hash, blocks] : _files) {
1033
0
        for (auto& [offset, cell] : blocks) {
1034
0
            if (cell.releasable()) {
1035
0
                trash.emplace_back(&cell);
1036
0
            } else {
1037
0
                cell.file_block->set_deleting();
1038
0
            }
1039
0
        }
1040
0
    }
1041
0
    size_t remove_size = 0;
1042
0
    for (auto& cell : trash) {
1043
0
        FileBlockSPtr file_block = cell->file_block;
1044
0
        std::lock_guard lc(cell->file_block->_mutex);
1045
0
        remove_size += file_block->range().size();
1046
0
        remove(file_block, cache_lock, lc);
1047
0
        VLOG_DEBUG << "try_release " << _cache_base_path
1048
0
                   << " hash=" << file_block->get_hash_value().to_string()
1049
0
                   << " offset=" << file_block->offset();
1050
0
    }
1051
0
    *_evict_by_try_release << remove_size;
1052
0
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
1053
0
    return trash.size();
1054
0
}
1055
1056
2.68M
// Returns the LRU queue that holds blocks of `type`. An unknown type trips a DCHECK and falls back
// to the normal queue.
LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
    // Delegate to the const overload so the type -> queue mapping exists in one place only.
    // Removing constness is safe: `*this` is non-const here, so the referenced queue is too.
    return const_cast<LRUQueue&>(static_cast<const BlockFileCache&>(*this).get_queue(type));
}
1071
1072
140
// Returns the LRU queue that holds blocks of `type`. An unknown type trips a DCHECK and falls back
// to the normal queue.
const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
    const LRUQueue* selected = &_normal_queue;
    switch (type) {
    case FileCacheType::INDEX:
        selected = &_index_queue;
        break;
    case FileCacheType::DISPOSABLE:
        selected = &_disposable_queue;
        break;
    case FileCacheType::NORMAL:
        selected = &_normal_queue;
        break;
    case FileCacheType::TTL:
        selected = &_ttl_queue;
        break;
    default:
        DCHECK(false);
        break;
    }
    return *selected;
}
1087
1088
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
1089
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
1090
558k
                                        std::string& reason) {
1091
558k
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1092
109k
        FileBlockSPtr file_block = cell->file_block;
1093
109k
        if (file_block) {
1094
109k
            std::lock_guard block_lock(file_block->_mutex);
1095
109k
            remove(file_block, cache_lock, block_lock, sync);
1096
109k
            VLOG_DEBUG << "remove_file_blocks"
1097
0
                       << " hash=" << file_block->get_hash_value().to_string()
1098
0
                       << " offset=" << file_block->offset() << " reason=" << reason;
1099
109k
        }
1100
109k
    };
1101
558k
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1102
558k
}
1103
1104
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
1105
                                           size_t& removed_size,
1106
                                           std::vector<FileBlockCell*>& to_evict,
1107
                                           std::lock_guard<std::mutex>& cache_lock,
1108
39.9k
                                           size_t& cur_removed_size, bool evict_in_advance) {
1109
20.2M
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1110
20.2M
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1111
4.78k
            break;
1112
4.78k
        }
1113
20.2M
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1114
1115
20.2M
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
1116
0
                     << ", offset: " << entry_offset;
1117
1118
20.2M
        size_t cell_size = cell->size();
1119
20.2M
        DCHECK(entry_size == cell_size);
1120
1121
20.2M
        if (cell->releasable()) {
1122
107k
            auto& file_block = cell->file_block;
1123
1124
107k
            std::lock_guard block_lock(file_block->_mutex);
1125
107k
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1126
107k
            to_evict.push_back(cell);
1127
107k
            removed_size += cell_size;
1128
107k
            cur_removed_size += cell_size;
1129
107k
        }
1130
20.2M
    }
1131
39.9k
}
1132
1133
// 1. If the async load of the file cache has not finished:
//     a. evict from the LRU queue.
// 2. If the data is TTL cache:
//     a. evict from the disposable/normal/index queues one by one.
// 3. If the query limit is not reached, or there is no query limit:
//     a. evict from the other queues;
//     b. evict from the current queue;
//         b.1 if the data belongs to a write, only evict cold data.
// 4. If the query limit is reached:
//     a. evict from the query queue;
//     b. evict from the other queues.
1144
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
1145
                                 size_t offset, size_t size,
1146
386k
                                 std::lock_guard<std::mutex>& cache_lock) {
1147
386k
    if (!_async_open_done) {
1148
4
        return try_reserve_during_async_load(size, cache_lock);
1149
4
    }
1150
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
1151
    // directly eliminate 5 times the size of the space
1152
386k
    if (_disk_resource_limit_mode) {
1153
33
        size = 5 * size;
1154
33
    }
1155
1156
386k
    auto query_context = config::enable_file_cache_query_limit &&
1157
386k
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
1158
386k
                                 ? get_query_context(context.query_id, cache_lock)
1159
386k
                                 : nullptr;
1160
386k
    if (!query_context) {
1161
386k
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
1162
386k
    } else if (query_context->get_cache_size(cache_lock) + size <=
1163
111
               query_context->get_max_cache_size()) {
1164
38
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
1165
38
    }
1166
73
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1167
73
                               std::chrono::steady_clock::now().time_since_epoch())
1168
73
                               .count();
1169
73
    auto& queue = get_queue(context.cache_type);
1170
73
    size_t removed_size = 0;
1171
73
    size_t ghost_remove_size = 0;
1172
73
    size_t queue_size = queue.get_capacity(cache_lock);
1173
73
    size_t cur_cache_size = _cur_cache_size;
1174
73
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);
1175
1176
73
    std::vector<LRUQueue::Iterator> ghost;
1177
73
    std::vector<FileBlockCell*> to_evict;
1178
1179
73
    size_t max_size = queue.get_max_size();
1180
541
    auto is_overflow = [&] {
1181
541
        return _disk_resource_limit_mode ? removed_size < size
1182
541
                                         : cur_cache_size + size - removed_size > _capacity ||
1183
541
                                                   (queue_size + size - removed_size > max_size) ||
1184
541
                                                   (query_context_cache_size + size -
1185
531
                                                            (removed_size + ghost_remove_size) >
1186
531
                                                    query_context->get_max_cache_size());
1187
541
    };
1188
1189
    /// Select the cache from the LRU queue held by query for expulsion.
1190
492
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
1191
468
        if (!is_overflow()) {
1192
49
            break;
1193
49
        }
1194
1195
419
        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);
1196
1197
419
        if (!cell) {
1198
            /// The cache corresponding to this record may be swapped out by
1199
            /// other queries, so it has become invalid.
1200
42
            ghost.push_back(iter);
1201
42
            ghost_remove_size += iter->size;
1202
377
        } else {
1203
377
            size_t cell_size = cell->size();
1204
377
            DCHECK(iter->size == cell_size);
1205
1206
377
            if (cell->releasable()) {
1207
38
                auto& file_block = cell->file_block;
1208
1209
38
                std::lock_guard block_lock(file_block->_mutex);
1210
38
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1211
38
                to_evict.push_back(cell);
1212
38
                removed_size += cell_size;
1213
38
            }
1214
377
        }
1215
419
    }
1216
1217
73
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1218
38
        FileBlockSPtr file_block = cell->file_block;
1219
38
        if (file_block) {
1220
38
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
1221
38
            std::lock_guard block_lock(file_block->_mutex);
1222
38
            remove(file_block, cache_lock, block_lock);
1223
38
        }
1224
38
    };
1225
1226
73
    for (auto& iter : ghost) {
1227
42
        query_context->remove(iter->hash, iter->offset, cache_lock);
1228
42
    }
1229
1230
73
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1231
1232
73
    if (is_overflow() &&
1233
73
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
1234
1
        return false;
1235
1
    }
1236
72
    query_context->reserve(hash, offset, size, cache_lock);
1237
72
    return true;
1238
73
}
1239
1240
332
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
1241
332
    UInt128Wrapper hash = UInt128Wrapper();
1242
332
    size_t offset = 0;
1243
332
    CacheContext context;
1244
    /* we pick NORMAL and TTL cache to evict in advance
1245
     * we reserve for them but won't acutually give space to them
1246
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
1247
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
1248
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
1249
     */
1250
332
    context.cache_type = FileCacheType::NORMAL;
1251
332
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1252
332
    context.cache_type = FileCacheType::TTL;
1253
332
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1254
332
}
1255
1256
// remove specific cache synchronously, for critical operations
1257
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1258
8
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1259
8
    std::string reason = "remove_if_cached";
1260
8
    SCOPED_CACHE_LOCK(_mutex, this);
1261
8
    auto iter = _files.find(file_key);
1262
8
    std::vector<FileBlockCell*> to_remove;
1263
8
    if (iter != _files.end()) {
1264
14
        for (auto& [_, cell] : iter->second) {
1265
14
            if (cell.releasable()) {
1266
13
                to_remove.push_back(&cell);
1267
13
            } else {
1268
1
                cell.file_block->set_deleting();
1269
1
            }
1270
14
        }
1271
6
    }
1272
8
    remove_file_blocks(to_remove, cache_lock, true, reason);
1273
8
}
1274
1275
// the async version of remove_if_cached, for background operations
1276
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
1277
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1278
132k
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1279
132k
    std::string reason = "remove_if_cached_async";
1280
132k
    SCOPED_CACHE_LOCK(_mutex, this);
1281
1282
132k
    auto iter = _files.find(file_key);
1283
132k
    std::vector<FileBlockCell*> to_remove;
1284
132k
    if (iter != _files.end()) {
1285
3.77k
        for (auto& [_, cell] : iter->second) {
1286
3.77k
            *_gc_evict_bytes_metrics << cell.size();
1287
3.77k
            *_gc_evict_count_metrics << 1;
1288
3.77k
            if (cell.releasable()) {
1289
2.04k
                to_remove.push_back(&cell);
1290
2.04k
            } else {
1291
1.72k
                cell.file_block->set_deleting();
1292
1.72k
            }
1293
3.77k
        }
1294
3.77k
    }
1295
132k
    remove_file_blocks(to_remove, cache_lock, false, reason);
1296
132k
}
1297
1298
// Returns the cache types whose queues may be evicted to make room for `cur_cache_type`, in the
// order they should be tried. TTL is never a candidate. Unknown types get an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
        FileCacheType cur_cache_type) {
    std::vector<FileCacheType> order;
    switch (cur_cache_type) {
    case FileCacheType::TTL:
        order = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
        break;
    case FileCacheType::INDEX:
        order = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
        break;
    case FileCacheType::NORMAL:
        order = {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
        break;
    case FileCacheType::DISPOSABLE:
        order = {FileCacheType::NORMAL, FileCacheType::INDEX};
        break;
    default:
        break;
    }
    return order;
}

// Returns every cache type other than `cur_cache_type`, TTL included, in global eviction order
// (DISPOSABLE, NORMAL, INDEX, TTL). A type outside those four yields an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
    static constexpr FileCacheType kEvictionOrder[] = {FileCacheType::DISPOSABLE,
                                                       FileCacheType::NORMAL, FileCacheType::INDEX,
                                                       FileCacheType::TTL};
    std::vector<FileCacheType> others;
    bool is_known_type = false;
    for (FileCacheType candidate : kEvictionOrder) {
        if (candidate == cur_cache_type) {
            is_known_type = true;
        } else {
            others.push_back(candidate);
        }
    }
    if (!is_known_type) {
        return {};
    }
    return others;
}

void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
1332
57.1k
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
1333
57.1k
    DCHECK(_files.find(hash) != _files.end() &&
1334
57.1k
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
1335
57.1k
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
1336
57.1k
    DCHECK(cell != nullptr);
1337
57.1k
    if (cell == nullptr) {
1338
0
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
1339
0
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
1340
0
                     << " new_size=" << new_size;
1341
0
        return;
1342
0
    }
1343
57.1k
    DCHECK_EQ(cell->file_block->_block_range.size(), old_size);
1344
57.1k
    if (cell->queue_iterator) {
1345
57.1k
        auto& queue = get_queue(cell->file_block->cache_type());
1346
57.1k
        DCHECK(queue.contains(hash, offset, cache_lock));
1347
57.1k
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
1348
57.1k
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
1349
57.1k
                                          cell->file_block->get_hash_value(),
1350
57.1k
                                          cell->file_block->offset(), new_size);
1351
57.1k
    }
1352
57.1k
    cell->file_block->_block_range.right = cell->file_block->_block_range.left + new_size - 1;
1353
57.1k
    _cur_cache_size -= old_size;
1354
57.1k
    _cur_cache_size += new_size;
1355
57.1k
    if (cell->file_block->cache_type() == FileCacheType::TTL) {
1356
0
        _cur_ttl_size -= old_size;
1357
0
        _cur_ttl_size += new_size;
1358
0
    }
1359
57.1k
}
1360
1361
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
1362
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1363
387k
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1364
387k
    size_t removed_size = 0;
1365
387k
    size_t cur_cache_size = _cur_cache_size;
1366
387k
    std::vector<FileBlockCell*> to_evict;
1367
778k
    for (FileCacheType cache_type : other_cache_types) {
1368
778k
        auto& queue = get_queue(cache_type);
1369
778k
        size_t remove_size_per_type = 0;
1370
778k
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1371
733k
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1372
658k
                break;
1373
658k
            }
1374
75.4k
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1375
75.4k
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
1376
1
                         << ", offset: " << entry_offset;
1377
1378
75.4k
            size_t cell_size = cell->size();
1379
75.4k
            DCHECK(entry_size == cell_size);
1380
1381
75.4k
            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
1382
75.4k
                break;
1383
75.4k
            }
1384
1385
3
            if (cell->releasable()) {
1386
2
                auto& file_block = cell->file_block;
1387
2
                std::lock_guard block_lock(file_block->_mutex);
1388
2
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1389
2
                to_evict.push_back(cell);
1390
2
                removed_size += cell_size;
1391
2
                remove_size_per_type += cell_size;
1392
2
            }
1393
3
        }
1394
778k
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
1395
778k
    }
1396
387k
    bool is_sync_removal = !evict_in_advance;
1397
387k
    std::string reason = std::string("try_reserve_by_time ") +
1398
387k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1399
387k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1400
1401
387k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1402
387k
}
1403
1404
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
1405
21.4M
                                 bool evict_in_advance) const {
1406
21.4M
    bool ret = false;
1407
21.4M
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
1408
430k
        ret = (removed_size < need_size);
1409
430k
        return ret;
1410
430k
    }
1411
21.0M
    if (_disk_resource_limit_mode) {
1412
122
        ret = (removed_size < need_size);
1413
21.0M
    } else {
1414
21.0M
        ret = (cur_cache_size + need_size - removed_size > _capacity);
1415
21.0M
    }
1416
21.0M
    return ret;
1417
21.4M
}
1418
1419
bool BlockFileCache::try_reserve_from_other_queue_by_size(
1420
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1421
1.60k
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1422
1.60k
    size_t removed_size = 0;
1423
1.60k
    size_t cur_cache_size = _cur_cache_size;
1424
1.60k
    std::vector<FileBlockCell*> to_evict;
1425
    // we follow the privilege defined in get_other_cache_types to evict
1426
4.80k
    for (FileCacheType cache_type : other_cache_types) {
1427
4.80k
        auto& queue = get_queue(cache_type);
1428
1429
        // we will not drain each of them to the bottom -- i.e., we only
1430
        // evict what they have stolen.
1431
4.80k
        size_t cur_queue_size = queue.get_capacity(cache_lock);
1432
4.80k
        size_t cur_queue_max_size = queue.get_max_size();
1433
4.80k
        if (cur_queue_size <= cur_queue_max_size) {
1434
2.65k
            continue;
1435
2.65k
        }
1436
2.15k
        size_t cur_removed_size = 0;
1437
2.15k
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1438
2.15k
                              cur_removed_size, evict_in_advance);
1439
2.15k
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
1440
2.15k
    }
1441
1.60k
    bool is_sync_removal = !evict_in_advance;
1442
1.60k
    std::string reason = std::string("try_reserve_by_size") +
1443
1.60k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1444
1.60k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1445
1.60k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1446
1.60k
}
1447
1448
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
1449
                                                  int64_t cur_time,
1450
                                                  std::lock_guard<std::mutex>& cache_lock,
1451
387k
                                                  bool evict_in_advance) {
1452
    // currently, TTL cache is not considered as a candidate
1453
387k
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
1454
387k
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
1455
387k
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
1456
387k
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
1457
348k
        return reserve_success;
1458
348k
    }
1459
1460
38.5k
    other_cache_types = get_other_cache_type(cur_cache_type);
1461
38.5k
    auto& cur_queue = get_queue(cur_cache_type);
1462
38.5k
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
1463
38.5k
    size_t cur_queue_max_size = cur_queue.get_max_size();
1464
    // Hit the soft limit by self, cannot remove from other queues
1465
38.5k
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
1466
36.9k
        return false;
1467
36.9k
    }
1468
1.60k
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
1469
1.60k
                                                evict_in_advance);
1470
38.5k
}
1471
1472
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
1473
                                         QueryFileCacheContextPtr query_context,
1474
                                         const CacheContext& context, size_t offset, size_t size,
1475
                                         std::lock_guard<std::mutex>& cache_lock,
1476
387k
                                         bool evict_in_advance) {
1477
387k
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1478
387k
                               std::chrono::steady_clock::now().time_since_epoch())
1479
387k
                               .count();
1480
387k
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
1481
387k
                                      evict_in_advance)) {
1482
37.7k
        auto& queue = get_queue(context.cache_type);
1483
37.7k
        size_t removed_size = 0;
1484
37.7k
        size_t cur_cache_size = _cur_cache_size;
1485
1486
37.7k
        std::vector<FileBlockCell*> to_evict;
1487
37.7k
        size_t cur_removed_size = 0;
1488
37.7k
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1489
37.7k
                              cur_removed_size, evict_in_advance);
1490
37.7k
        bool is_sync_removal = !evict_in_advance;
1491
37.7k
        std::string reason = std::string("try_reserve for cache type ") +
1492
37.7k
                             cache_type_to_string(context.cache_type) +
1493
37.7k
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1494
37.7k
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1495
37.7k
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;
1496
1497
37.7k
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1498
33.9k
            return false;
1499
33.9k
        }
1500
37.7k
    }
1501
1502
353k
    if (query_context) {
1503
38
        query_context->reserve(hash, offset, size, cache_lock);
1504
38
    }
1505
353k
    return true;
1506
387k
}
1507
1508
template <class T, class U>
1509
    requires IsXLock<T> && IsXLock<U>
1510
446k
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
1511
446k
    auto hash = file_block->get_hash_value();
1512
446k
    auto offset = file_block->offset();
1513
446k
    auto type = file_block->cache_type();
1514
446k
    auto expiration_time = file_block->expiration_time();
1515
446k
    auto tablet_id = file_block->tablet_id();
1516
446k
    auto* cell = get_cell(hash, offset, cache_lock);
1517
446k
    file_block->cell = nullptr;
1518
    // Holder cleanup can race with prior cache metadata cleanup. In that case,
1519
    // skip the duplicate remove instead of touching a detached or replaced cell.
1520
446k
    if (cell == nullptr) {
1521
1
        LOG(WARNING) << "remove skipped because cache cell is missing. hash=" << hash.to_string()
1522
1
                     << " offset=" << offset << " size=" << file_block->range().size()
1523
1
                     << " type=" << cache_type_to_string(type)
1524
1
                     << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
1525
1
                     << " expiration_time=" << expiration_time << " sync=" << sync;
1526
1
        return;
1527
1
    }
1528
446k
    if (cell->file_block.get() != file_block.get()) {
1529
1
        auto* cell_file_block = cell->file_block.get();
1530
1
        LOG(WARNING)
1531
1
                << "remove skipped because cache cell points to a different file block. hash="
1532
1
                << hash.to_string() << " offset=" << offset
1533
1
                << " size=" << file_block->range().size() << " type=" << cache_type_to_string(type)
1534
1
                << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
1535
1
                << " expiration_time=" << expiration_time << " sync=" << sync << " cell_block_hash="
1536
1
                << (cell_file_block ? cell_file_block->get_hash_value().to_string() : "<null>")
1537
1
                << " cell_block_offset="
1538
1
                << (cell_file_block ? std::to_string(cell_file_block->offset()) : "<null>")
1539
1
                << " cell_block_size="
1540
1
                << (cell_file_block ? std::to_string(cell_file_block->range().size()) : "<null>")
1541
1
                << " cell_block_type="
1542
1
                << (cell_file_block ? cache_type_to_string(cell_file_block->cache_type())
1543
1
                                    : "<null>")
1544
1
                << " cell_block_state="
1545
1
                << (cell_file_block ? FileBlock::state_to_string(cell_file_block->state_unsafe())
1546
1
                                    : "<null>");
1547
1
        return;
1548
1
    }
1549
446k
    DCHECK(cell->queue_iterator);
1550
446k
    if (cell->queue_iterator) {
1551
446k
        auto& queue = get_queue(file_block->cache_type());
1552
446k
        queue.remove(*cell->queue_iterator, cache_lock);
1553
446k
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
1554
446k
                                          cell->file_block->get_hash_value(),
1555
446k
                                          cell->file_block->offset(), cell->size());
1556
446k
    }
1557
446k
    *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
1558
446k
            << file_block->range().size();
1559
446k
    *_total_evict_size_metrics << file_block->range().size();
1560
1561
446k
    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
1562
0
               << ", offset: " << offset << ", size: " << file_block->range().size()
1563
0
               << ", type: " << cache_type_to_string(type);
1564
1565
446k
    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
1566
214k
        FileCacheKey key;
1567
214k
        key.hash = hash;
1568
214k
        key.offset = offset;
1569
214k
        key.meta.type = type;
1570
214k
        key.meta.expiration_time = expiration_time;
1571
214k
        key.meta.tablet_id = tablet_id;
1572
214k
        if (sync) {
1573
156k
            int64_t duration_ns = 0;
1574
156k
            Status st;
1575
156k
            {
1576
156k
                SCOPED_RAW_TIMER(&duration_ns);
1577
156k
                st = _storage->remove(key);
1578
156k
            }
1579
156k
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
1580
156k
            if (!st.ok()) {
1581
0
                LOG_WARNING("").error(st);
1582
0
            }
1583
156k
        } else {
1584
            // the file will be deleted in the bottom half
1585
            // so there will be a window that the file is not in the cache but still in the storage
1586
            // but it's ok, because the rowset is stale already
1587
57.8k
            bool ret = _recycle_keys.enqueue(key);
1588
57.8k
            if (ret) [[likely]] {
1589
57.8k
                *_recycle_keys_length_recorder << _recycle_keys.size_approx();
1590
57.8k
            } else {
1591
0
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
1592
0
                int64_t duration_ns = 0;
1593
0
                Status st;
1594
0
                {
1595
0
                    SCOPED_RAW_TIMER(&duration_ns);
1596
0
                    st = _storage->remove(key);
1597
0
                }
1598
0
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
1599
0
                if (!st.ok()) {
1600
0
                    LOG_WARNING("").error(st);
1601
0
                }
1602
0
            }
1603
57.8k
        }
1604
231k
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
1605
100
        file_block->set_deleting();
1606
100
        return;
1607
100
    }
1608
446k
    _cur_cache_size -= file_block->range().size();
1609
446k
    if (FileCacheType::TTL == type) {
1610
1.29k
        _cur_ttl_size -= file_block->range().size();
1611
1.29k
    }
1612
446k
    auto it = _files.find(hash);
1613
446k
    if (it != _files.end()) {
1614
446k
        it->second.erase(file_block->offset());
1615
446k
        if (it->second.empty()) {
1616
112k
            _files.erase(hash);
1617
112k
        }
1618
446k
    }
1619
446k
    *_num_removed_blocks << 1;
1620
446k
}
1621
1622
46
// Locking wrapper: takes the cache mutex, then reports the current size of the `cache_type`
// queue via get_used_cache_size_unlocked().
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t used = get_used_cache_size_unlocked(cache_type, cache_lock);
    return used;
}

size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
1628
46
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1629
46
    return get_queue(cache_type).get_capacity(cache_lock);
1630
46
}
1631
1632
0
// Locking wrapper: takes the cache mutex and returns get_available_cache_size_unlocked() for
// `cache_type`.
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t available = get_available_cache_size_unlocked(cache_type, cache_lock);
    return available;
}

size_t BlockFileCache::get_available_cache_size_unlocked(
1638
0
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
1639
0
    return get_queue(cache_type).get_max_element_size() -
1640
0
           get_used_cache_size_unlocked(cache_type, cache_lock);
1641
0
}
1642
1643
91
// Locking wrapper: takes the cache mutex and returns how many entries the `cache_type` queue
// holds, via get_file_blocks_num_unlocked().
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t block_count = get_file_blocks_num_unlocked(cache_type, cache_lock);
    return block_count;
}

size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
1649
91
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1650
91
    return get_queue(cache_type).get_elements_num(cache_lock);
1651
91
}
1652
1653
// Wraps `file_block` in a cache cell and points the block's `cell` back at this cell.
//
// A cell may only be created for a block in the DOWNLOADED, EMPTY or SKIP_CACHE state (checked in
// debug builds). Per the original design note, the block reaches DOWNLOADING later, when a
// downloader is assigned. TTL blocks get their access time refreshed via update_atime().
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
        : file_block(file_block) {
    file_block->cell = this;

    const auto initial_state = file_block->_download_state;
    switch (initial_state) {
    case FileBlock::State::DOWNLOADED:
    case FileBlock::State::EMPTY:
    case FileBlock::State::SKIP_CACHE:
        break;
    default:
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
                      << FileBlock::state_to_string(initial_state);
    }

    if (file_block->cache_type() == FileCacheType::TTL) {
        update_atime();
    }
}

// Appends (hash, offset, size) at the tail of the queue (the most recently used end), indexes it
// by (hash, offset), grows the tracked size by `size`, and returns the new entry's iterator.
LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
    cache_size += size;
    auto inserted = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size));
    auto index_key = std::make_pair(hash, offset);
    map.insert(std::make_pair(index_key, inserted));
    return inserted;
}

void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
1686
0
    queue.clear();
1687
0
    map.clear();
1688
0
    cache_size = 0;
1689
0
}
1690
1691
15
bool LRUQueue::pop_front(std::lock_guard<std::mutex>& /* cache_lock */) {
1692
15
    if (queue.empty()) {
1693
0
        return false;
1694
0
    }
1695
15
    auto queue_it = queue.begin();
1696
15
    cache_size -= queue_it->size;
1697
15
    map.erase(std::make_pair(queue_it->hash, queue_it->offset));
1698
15
    queue.erase(queue_it);
1699
15
    return true;
1700
15
}
1701
1702
1.33M
// Marks the entry as most recently used by relinking its node at the tail of the list.
// splice moves the node without copying it, so `queue_it` and the index entry stay valid.
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
    auto tail = queue.end();
    queue.splice(tail, queue, queue_it);
}

void LRUQueue::resize(Iterator queue_it, size_t new_size,
1707
114k
                      std::lock_guard<std::mutex>& /* cache_lock */) {
1708
114k
    cache_size -= queue_it->size;
1709
114k
    queue_it->size = new_size;
1710
114k
    cache_size += new_size;
1711
114k
}
1712
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
1713
57.1k
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
1714
57.1k
    return map.find(std::make_pair(hash, offset)) != map.end();
1715
57.1k
}
1716
1717
// Looks up the queue iterator stored for (hash, offset). When the key is not indexed, returns a
// value-initialized (singular) list iterator that must not be dereferenced.
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
    auto found = map.find(std::make_pair(hash, offset));
    if (found == map.end()) {
        return std::list<FileKeyAndOffset>::iterator();
    }
    return found->second;
}
1725
1726
2
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
1727
2
    std::string result;
1728
18
    for (const auto& [hash, offset, size] : queue) {
1729
18
        if (!result.empty()) {
1730
16
            result += ", ";
1731
16
        }
1732
18
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
1733
18
    }
1734
2
    return result;
1735
2
}
1736
1737
size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
1738
5
                                           std::lock_guard<std::mutex>& cache_lock) {
1739
5
    std::list<FileKeyAndOffset> target_queue = this->queue;
1740
5
    std::list<FileKeyAndOffset> base_queue = base.queue;
1741
5
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
1742
5
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
1743
1744
5
    size_t m = vec1.size();
1745
5
    size_t n = vec2.size();
1746
1747
    // Create a 2D vector (matrix) to store the Levenshtein distances
1748
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
1749
5
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
1750
1751
    // Initialize the first row and column of the matrix
1752
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
1753
22
    for (size_t i = 0; i <= m; ++i) {
1754
17
        dp[i][0] = i;
1755
17
    }
1756
20
    for (size_t j = 0; j <= n; ++j) {
1757
15
        dp[0][j] = j;
1758
15
    }
1759
1760
    // Fill the matrix using dynamic programming
1761
17
    for (size_t i = 1; i <= m; ++i) {
1762
38
        for (size_t j = 1; j <= n; ++j) {
1763
            // Check if the current elements of both vectors are equal
1764
26
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
1765
26
                           vec1[i - 1].offset == vec2[j - 1].offset)
1766
26
                                  ? 0
1767
26
                                  : 1;
1768
            // Calculate the minimum cost of three possible operations:
1769
            // 1. Insertion: dp[i][j-1] + 1
1770
            // 2. Deletion: dp[i-1][j] + 1
1771
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
1772
26
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
1773
26
        }
1774
12
    }
1775
    // The bottom-right cell of the matrix contains the Levenshtein distance
1776
5
    return dp[m][n];
1777
5
}
1778
1779
1
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
1780
1
    SCOPED_CACHE_LOCK(_mutex, this);
1781
1
    return dump_structure_unlocked(hash, cache_lock);
1782
1
}
1783
1784
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
1785
1
                                                    std::lock_guard<std::mutex>&) {
1786
1
    std::stringstream result;
1787
1
    auto it = _files.find(hash);
1788
1
    if (it == _files.end()) {
1789
0
        return std::string("");
1790
0
    }
1791
1
    const auto& cells_by_offset = it->second;
1792
1793
1
    for (const auto& [_, cell] : cells_by_offset) {
1794
1
        result << cell.file_block->get_info_for_log() << " "
1795
1
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
1796
1
    }
1797
1798
1
    return result.str();
1799
1
}
1800
1801
0
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
1802
0
    SCOPED_CACHE_LOCK(_mutex, this);
1803
0
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
1804
0
}
1805
1806
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
1807
                                                            size_t offset,
1808
0
                                                            std::lock_guard<std::mutex>&) {
1809
0
    std::stringstream result;
1810
0
    auto it = _files.find(hash);
1811
0
    if (it == _files.end()) {
1812
0
        return std::string("");
1813
0
    }
1814
0
    const auto& cells_by_offset = it->second;
1815
0
    const auto& cell = cells_by_offset.find(offset);
1816
1817
0
    return cache_type_to_string(cell->second.file_block->cache_type());
1818
0
}
1819
1820
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
1821
                                       FileCacheType new_type,
1822
2.71k
                                       std::lock_guard<std::mutex>& cache_lock) {
1823
2.71k
    if (auto iter = _files.find(hash); iter != _files.end()) {
1824
2.71k
        auto& file_blocks = iter->second;
1825
2.71k
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
1826
2.70k
            FileBlockCell& cell = cell_it->second;
1827
2.70k
            auto& cur_queue = get_queue(cell.file_block->cache_type());
1828
2.70k
            DCHECK(cell.queue_iterator.has_value());
1829
2.70k
            cur_queue.remove(*cell.queue_iterator, cache_lock);
1830
2.70k
            _lru_recorder->record_queue_event(
1831
2.70k
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
1832
2.70k
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
1833
2.70k
            auto& new_queue = get_queue(new_type);
1834
2.70k
            cell.queue_iterator =
1835
2.70k
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
1836
2.70k
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
1837
2.70k
                                              cell.file_block->get_hash_value(),
1838
2.70k
                                              cell.file_block->offset(), cell.size());
1839
2.70k
        }
1840
2.71k
    }
1841
2.71k
}
1842
1843
// @brief: get a path's disk capacity used percent, inode used percent
1844
// @param: path
1845
// @param: percent.first disk used percent, percent.second inode used percent
1846
5.21k
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
1847
5.21k
    struct statfs stat;
1848
5.21k
    int ret = statfs(path.c_str(), &stat);
1849
5.21k
    if (ret != 0) {
1850
2
        return ret;
1851
2
    }
1852
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
1853
    // v->used = stat.f_blocks - stat.f_bfree
1854
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
1855
5.21k
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
1856
5.21k
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
1857
5.21k
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));
1858
1859
5.21k
    unsigned long long inode_free = stat.f_ffree;
1860
5.21k
    unsigned long long inode_total = stat.f_files;
1861
5.21k
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
1862
5.21k
    percent->first = capacity_percentage;
1863
5.21k
    percent->second = 100 - inode_percentage;
1864
1865
    // Add sync point for testing
1866
5.21k
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
1867
1868
5.21k
    return 0;
1869
5.21k
}
1870
1871
10
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
1872
10
    using namespace std::chrono;
1873
10
    int64_t space_released = 0;
1874
10
    size_t old_capacity = 0;
1875
10
    std::stringstream ss;
1876
10
    ss << "finish reset_capacity, path=" << _cache_base_path;
1877
10
    auto adjust_start_time = steady_clock::time_point();
1878
10
    {
1879
10
        SCOPED_CACHE_LOCK(_mutex, this);
1880
10
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
1881
1
            int64_t need_remove_size = _cur_cache_size - new_capacity;
1882
4
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
1883
4
                int64_t queue_released = 0;
1884
4
                std::vector<FileBlockCell*> to_evict;
1885
13
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1886
13
                    if (need_remove_size <= 0) {
1887
1
                        break;
1888
1
                    }
1889
12
                    need_remove_size -= entry_size;
1890
12
                    space_released += entry_size;
1891
12
                    queue_released += entry_size;
1892
12
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1893
12
                    if (!cell->releasable()) {
1894
0
                        cell->file_block->set_deleting();
1895
0
                        continue;
1896
0
                    }
1897
12
                    to_evict.push_back(cell);
1898
12
                }
1899
12
                for (auto& cell : to_evict) {
1900
12
                    FileBlockSPtr file_block = cell->file_block;
1901
12
                    std::lock_guard block_lock(file_block->_mutex);
1902
12
                    remove(file_block, cache_lock, block_lock);
1903
12
                }
1904
4
                return queue_released;
1905
4
            };
1906
1
            int64_t queue_released = remove_blocks(_disposable_queue);
1907
1
            ss << " disposable_queue released " << queue_released;
1908
1
            queue_released = remove_blocks(_normal_queue);
1909
1
            ss << " normal_queue released " << queue_released;
1910
1
            queue_released = remove_blocks(_index_queue);
1911
1
            ss << " index_queue released " << queue_released;
1912
1
            queue_released = remove_blocks(_ttl_queue);
1913
1
            ss << " ttl_queue released " << queue_released;
1914
1915
1
            _disk_resource_limit_mode = true;
1916
1
            _disk_limit_mode_metrics->set_value(1);
1917
1
            ss << " total_space_released=" << space_released;
1918
1
        }
1919
10
        old_capacity = _capacity;
1920
10
        _capacity = new_capacity;
1921
10
        _cache_capacity_metrics->set_value(_capacity);
1922
10
    }
1923
10
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
1924
10
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
1925
10
              << " use_time=" << cast_set<int64_t>(use_time.count());
1926
10
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
1927
10
    LOG(INFO) << ss.str();
1928
10
    return ss.str();
1929
10
}
1930
1931
2.69k
void BlockFileCache::check_disk_resource_limit() {
1932
2.69k
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1933
23
        return;
1934
23
    }
1935
1936
2.67k
    bool previous_mode = _disk_resource_limit_mode;
1937
2.67k
    if (_capacity > _cur_cache_size) {
1938
2.67k
        _disk_resource_limit_mode = false;
1939
2.67k
        _disk_limit_mode_metrics->set_value(0);
1940
2.67k
    }
1941
2.67k
    std::pair<int, int> percent;
1942
2.67k
    int ret = disk_used_percentage(_cache_base_path, &percent);
1943
2.67k
    if (ret != 0) {
1944
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1945
1
        return;
1946
1
    }
1947
2.67k
    auto [space_percentage, inode_percentage] = percent;
1948
5.35k
    auto is_insufficient = [](const int& percentage) {
1949
5.35k
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
1950
5.35k
    };
1951
2.67k
    DCHECK_GE(space_percentage, 0);
1952
2.67k
    DCHECK_LE(space_percentage, 100);
1953
2.67k
    DCHECK_GE(inode_percentage, 0);
1954
2.67k
    DCHECK_LE(inode_percentage, 100);
1955
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1956
    // FIXME: reject with config validator
1957
2.67k
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
1958
2.67k
        config::file_cache_exit_disk_resource_limit_mode_percent) {
1959
1
        LOG_WARNING("config error, set to default value")
1960
1
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
1961
1
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
1962
1
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
1963
1
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
1964
1
    }
1965
2.67k
    bool is_space_insufficient = is_insufficient(space_percentage);
1966
2.67k
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1967
2.67k
    if (is_space_insufficient || is_inode_insufficient) {
1968
3
        _disk_resource_limit_mode = true;
1969
3
        _disk_limit_mode_metrics->set_value(1);
1970
2.67k
    } else if (_disk_resource_limit_mode &&
1971
2.67k
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
1972
2.67k
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
1973
0
        _disk_resource_limit_mode = false;
1974
0
        _disk_limit_mode_metrics->set_value(0);
1975
0
    }
1976
2.67k
    if (previous_mode != _disk_resource_limit_mode) {
1977
        // add log for disk resource limit mode switching
1978
6
        if (_disk_resource_limit_mode) {
1979
3
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
1980
3
                         << " space_percent=" << space_percentage
1981
3
                         << " inode_percent=" << inode_percentage
1982
3
                         << " is_space_insufficient=" << is_space_insufficient
1983
3
                         << " is_inode_insufficient=" << is_inode_insufficient
1984
3
                         << " enter threshold="
1985
3
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
1986
3
        } else {
1987
3
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
1988
3
                      << " space_percent=" << space_percentage
1989
3
                      << " inode_percent=" << inode_percentage << " exit threshold="
1990
3
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
1991
3
        }
1992
2.66k
    } else if (_disk_resource_limit_mode) {
1993
        // print log for disk resource limit mode running, but less frequently
1994
0
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1995
0
                                 << " space_percent=" << space_percentage
1996
0
                                 << " inode_percent=" << inode_percentage
1997
0
                                 << " is_space_insufficient=" << is_space_insufficient
1998
0
                                 << " is_inode_insufficient=" << is_inode_insufficient
1999
0
                                 << " mode run in resource limit";
2000
0
    }
2001
2.67k
}
2002
2003
2.54k
void BlockFileCache::check_need_evict_cache_in_advance() {
2004
2.54k
    if (_storage->get_type() != FileCacheStorageType::DISK) {
2005
1
        return;
2006
1
    }
2007
2008
2.54k
    std::pair<int, int> percent;
2009
2.54k
    int ret = disk_used_percentage(_cache_base_path, &percent);
2010
2.54k
    if (ret != 0) {
2011
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
2012
1
        return;
2013
1
    }
2014
2.54k
    auto [space_percentage, inode_percentage] = percent;
2015
2.54k
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
2016
7.62k
    auto is_insufficient = [](const int& percentage) {
2017
7.62k
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
2018
7.62k
    };
2019
2.54k
    DCHECK_GE(space_percentage, 0);
2020
2.54k
    DCHECK_LE(space_percentage, 100);
2021
2.54k
    DCHECK_GE(inode_percentage, 0);
2022
2.54k
    DCHECK_LE(inode_percentage, 100);
2023
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
2024
    // FIXME: reject with config validator
2025
2.54k
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
2026
2.54k
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
2027
1
        LOG_WARNING("config error, set to default value")
2028
1
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
2029
1
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
2030
1
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
2031
1
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
2032
1
    }
2033
2.54k
    bool previous_mode = _need_evict_cache_in_advance;
2034
2.54k
    bool is_space_insufficient = is_insufficient(space_percentage);
2035
2.54k
    bool is_inode_insufficient = is_insufficient(inode_percentage);
2036
2.54k
    bool is_size_insufficient = is_insufficient(size_percentage);
2037
2.54k
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
2038
78
        _need_evict_cache_in_advance = true;
2039
78
        _need_evict_cache_in_advance_metrics->set_value(1);
2040
2.46k
    } else if (_need_evict_cache_in_advance &&
2041
2.46k
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
2042
2.46k
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
2043
2.46k
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
2044
65
        _need_evict_cache_in_advance = false;
2045
65
        _need_evict_cache_in_advance_metrics->set_value(0);
2046
65
    }
2047
2.54k
    if (previous_mode != _need_evict_cache_in_advance) {
2048
        // add log for evict cache in advance mode switching
2049
134
        if (_need_evict_cache_in_advance) {
2050
69
            LOG(WARNING) << "Entering evict cache in advance mode: "
2051
69
                         << "file_cache=" << get_base_path()
2052
69
                         << " space_percent=" << space_percentage
2053
69
                         << " inode_percent=" << inode_percentage
2054
69
                         << " size_percent=" << size_percentage
2055
69
                         << " is_space_insufficient=" << is_space_insufficient
2056
69
                         << " is_inode_insufficient=" << is_inode_insufficient
2057
69
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
2058
69
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
2059
69
        } else {
2060
65
            LOG(INFO) << "Exiting evict cache in advance mode: "
2061
65
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
2062
65
                      << " inode_percent=" << inode_percentage
2063
65
                      << " size_percent=" << size_percentage << " exit threshold="
2064
65
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
2065
65
        }
2066
2.40k
    } else if (_need_evict_cache_in_advance) {
2067
        // print log for evict cache in advance mode running, but less frequently
2068
9
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
2069
2
                                 << " space_percent=" << space_percentage
2070
2
                                 << " inode_percent=" << inode_percentage
2071
2
                                 << " size_percent=" << size_percentage
2072
2
                                 << " is_space_insufficient=" << is_space_insufficient
2073
2
                                 << " is_inode_insufficient=" << is_inode_insufficient
2074
2
                                 << " is_size_insufficient=" << is_size_insufficient
2075
2
                                 << " need evict cache in advance";
2076
9
    }
2077
2.54k
}
2078
2079
161
// Background monitor loop, run until `_close` is set. Each round it:
//   1. re-evaluates the disk resource limit and evict-in-advance modes,
//   2. sleeps for config::file_cache_background_monitor_interval_ms (exiting if closed),
//   3. publishes cache/queue size, element count, meta-store queue and hit-ratio metrics
//      under the cache lock.
void BlockFileCache::run_background_monitor() {
    Thread::set_self_name("run_background_monitor");
    while (!_close) {
        // Re-read every round so a runtime config change applies to the next sleep.
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
        check_disk_resource_limit();
        if (config::enable_evict_file_cache_in_advance) {
            check_need_evict_cache_in_advance();
        } else {
            // Feature disabled: make sure the flag cannot stay latched from an earlier round.
            _need_evict_cache_in_advance = false;
            _need_evict_cache_in_advance_metrics->set_value(0);
        }

        {
            // Sleep for the interval; _close_cv is presumably notified when the cache closes,
            // so shutdown does not have to wait a full interval.
            std::unique_lock close_lock(_close_mtx);
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
            if (_close) {
                break;
            }
        }
        // report
        {
            SCOPED_CACHE_LOCK(_mutex, this);
            _cur_cache_size_metrics->set_value(_cur_cache_size);
            // Bytes not held by the index/normal/disposable queues are attributed to TTL.
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
                                                   _index_queue.get_capacity(cache_lock) -
                                                   _normal_queue.get_capacity(cache_lock) -
                                                   _disposable_queue.get_capacity(cache_lock));
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
                    _ttl_queue.get_capacity(cache_lock));
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
                    _ttl_queue.get_elements_num(cache_lock));
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
            _cur_normal_queue_element_count_metrics->set_value(
                    _normal_queue.get_elements_num(cache_lock));
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
            _cur_index_queue_element_count_metrics->set_value(
                    _index_queue.get_elements_num(cache_lock));
            _cur_disposable_queue_cache_size_metrics->set_value(
                    _disposable_queue.get_capacity(cache_lock));
            _cur_disposable_queue_element_count_metrics->set_value(
                    _disposable_queue.get_elements_num(cache_lock));

            // Update meta store write queue size if storage is FSFileCacheStorage
            if (_storage->get_type() == FileCacheStorageType::DISK) {
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
                if (fs_storage != nullptr) {
                    auto* meta_store = fs_storage->get_meta_store();
                    if (meta_store != nullptr) {
                        _meta_store_write_queue_size_metrics->set_value(
                                meta_store->get_write_queue_size());
                    }
                }
            }

            // Hit ratios are only published when there was at least one read (avoids 0/0).
            // The windowed (5m/1h) counters are null-checked because they may be unset.
            if (_num_read_blocks->get_value() > 0) {
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
                                      (double)_num_read_blocks->get_value());
            }
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
                                         (double)_num_read_blocks_5m->get_value());
            }
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
                                         (double)_num_read_blocks_1h->get_value());
            }

            if (_no_warmup_num_read_blocks->get_value() > 0) {
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
                                                (double)_no_warmup_num_read_blocks->get_value());
            }
            if (_no_warmup_num_read_blocks_5m && _no_warmup_num_read_blocks_5m->get_value() > 0) {
                _no_warmup_hit_ratio_5m->set_value(
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
                        (double)_no_warmup_num_read_blocks_5m->get_value());
            }
            if (_no_warmup_num_read_blocks_1h && _no_warmup_num_read_blocks_1h->get_value() > 0) {
                _no_warmup_hit_ratio_1h->set_value(
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
                        (double)_no_warmup_num_read_blocks_1h->get_value());
            }
        }
        // Called outside the cache lock scope above.
        _lru_recorder->update_shadow_queue_element_count_metrics();
    }
}
2165
2166
161
void BlockFileCache::run_background_gc() {
2167
161
    Thread::set_self_name("run_background_gc");
2168
161
    FileCacheKey key;
2169
161
    size_t batch_count = 0;
2170
126k
    while (!_close) {
2171
126k
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
2172
126k
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
2173
126k
        {
2174
126k
            std::unique_lock close_lock(_close_mtx);
2175
126k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2176
126k
            if (_close) {
2177
158
                break;
2178
158
            }
2179
126k
        }
2180
2181
183k
        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
2182
57.7k
            int64_t duration_ns = 0;
2183
57.7k
            Status st;
2184
57.7k
            {
2185
57.7k
                SCOPED_RAW_TIMER(&duration_ns);
2186
57.7k
                st = _storage->remove(key);
2187
57.7k
            }
2188
57.7k
            *_storage_async_remove_latency_us << (duration_ns / 1000);
2189
2190
57.7k
            if (!st.ok()) {
2191
0
                LOG_WARNING("").error(st);
2192
0
            }
2193
57.7k
            batch_count++;
2194
57.7k
        }
2195
126k
        *_recycle_keys_length_recorder << _recycle_keys.size_approx();
2196
126k
        batch_count = 0;
2197
126k
    }
2198
161
}
2199
2200
161
void BlockFileCache::run_background_evict_in_advance() {
2201
161
    Thread::set_self_name("run_background_evict_in_advance");
2202
161
    LOG(INFO) << "Starting background evict in advance thread";
2203
161
    int64_t batch = 0;
2204
12.9k
    while (!_close) {
2205
12.8k
        {
2206
12.8k
            std::unique_lock close_lock(_close_mtx);
2207
12.8k
            _close_cv.wait_for(
2208
12.8k
                    close_lock,
2209
12.8k
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
2210
12.8k
            if (_close) {
2211
158
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
2212
158
                break;
2213
158
            }
2214
12.8k
        }
2215
12.7k
        batch = config::file_cache_evict_in_advance_batch_bytes;
2216
2217
        // Skip if eviction not needed or too many pending recycles
2218
12.7k
        if (!_need_evict_cache_in_advance ||
2219
12.7k
            _recycle_keys.size_approx() >=
2220
12.4k
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
2221
12.4k
            continue;
2222
12.4k
        }
2223
2224
335
        int64_t duration_ns = 0;
2225
335
        {
2226
335
            SCOPED_CACHE_LOCK(_mutex, this);
2227
335
            SCOPED_RAW_TIMER(&duration_ns);
2228
335
            try_evict_in_advance(batch, cache_lock);
2229
335
        }
2230
335
        *_evict_in_advance_latency_us << (duration_ns / 1000);
2231
335
    }
2232
161
}
2233
2234
161
void BlockFileCache::run_background_block_lru_update() {
2235
161
    Thread::set_self_name("run_background_block_lru_update");
2236
161
    std::vector<FileBlockSPtr> batch;
2237
2.72k
    while (!_close) {
2238
2.72k
        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
2239
2.72k
        size_t batch_limit =
2240
2.72k
                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
2241
2.72k
        {
2242
2.72k
            std::unique_lock close_lock(_close_mtx);
2243
2.72k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2244
2.72k
            if (_close) {
2245
158
                break;
2246
158
            }
2247
2.72k
        }
2248
2249
2.56k
        batch.clear();
2250
2.56k
        batch.reserve(batch_limit);
2251
2.56k
        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
2252
2.56k
        if (drained == 0) {
2253
1.73k
            *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2254
1.73k
            continue;
2255
1.73k
        }
2256
831
        *_need_update_lru_blocks_consume_metrics << drained;
2257
2258
831
        int64_t duration_ns = 0;
2259
831
        {
2260
831
            SCOPED_CACHE_LOCK(_mutex, this);
2261
831
            SCOPED_RAW_TIMER(&duration_ns);
2262
157k
            for (auto& block : batch) {
2263
157k
                update_block_lru(block, cache_lock);
2264
157k
            }
2265
831
        }
2266
831
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
2267
831
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2268
831
    }
2269
161
}
2270
2271
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
2272
2
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
2273
2
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
2274
2
                               std::chrono::steady_clock::now().time_since_epoch())
2275
2
                               .count();
2276
2
    SCOPED_CACHE_LOCK(_mutex, this);
2277
2
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
2278
2
    if (auto iter = _files.find(hash); iter != _files.end()) {
2279
5
        for (auto& pair : _files.find(hash)->second) {
2280
5
            const FileBlockCell* cell = &pair.second;
2281
5
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
2282
4
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
2283
4
                    (cell->atime != 0 &&
2284
3
                     cur_time - cell->atime <
2285
3
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
2286
3
                    blocks_meta.emplace_back(pair.first, cell->size(),
2287
3
                                             cell->file_block->cache_type(),
2288
3
                                             cell->file_block->expiration_time());
2289
3
                }
2290
4
            }
2291
5
        }
2292
2
    }
2293
2
    return blocks_meta;
2294
2
}
2295
2296
bool BlockFileCache::try_reserve_during_async_load(size_t size,
2297
4
                                                   std::lock_guard<std::mutex>& cache_lock) {
2298
4
    size_t removed_size = 0;
2299
4
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
2300
4
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
2301
4
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);
2302
2303
4
    std::vector<FileBlockCell*> to_evict;
2304
4
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
2305
4
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
2306
4
            if (!_disk_resource_limit_mode || removed_size >= size) {
2307
3
                break;
2308
3
            }
2309
1
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
2310
2311
1
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
2312
0
                         << ", offset: " << entry_offset;
2313
2314
1
            size_t cell_size = cell->size();
2315
1
            DCHECK(entry_size == cell_size);
2316
2317
1
            if (cell->releasable()) {
2318
1
                auto& file_block = cell->file_block;
2319
2320
1
                std::lock_guard block_lock(file_block->_mutex);
2321
1
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
2322
1
                to_evict.push_back(cell);
2323
1
                removed_size += cell_size;
2324
1
            }
2325
1
        }
2326
3
    };
2327
4
    if (disposable_queue_size != 0) {
2328
0
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
2329
0
    }
2330
4
    if (normal_queue_size != 0) {
2331
3
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
2332
3
    }
2333
4
    if (index_queue_size != 0) {
2334
0
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
2335
0
    }
2336
4
    std::string reason = "async load";
2337
4
    remove_file_blocks(to_evict, cache_lock, true, reason);
2338
2339
4
    return !_disk_resource_limit_mode || removed_size >= size;
2340
4
}
2341
2342
28
void BlockFileCache::clear_need_update_lru_blocks() {
2343
28
    _need_update_lru_blocks.clear();
2344
28
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2345
28
}
2346
2347
288k
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
2348
288k
    std::map<size_t, FileBlockSPtr> offset_to_block;
2349
288k
    SCOPED_CACHE_LOCK(_mutex, this);
2350
288k
    if (_files.contains(hash)) {
2351
253k
        for (auto& [offset, cell] : _files[hash]) {
2352
253k
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
2353
247k
                cell.file_block->_owned_by_cached_reader = true;
2354
247k
                offset_to_block.emplace(offset, cell.file_block);
2355
247k
            }
2356
253k
        }
2357
245k
    }
2358
288k
    return offset_to_block;
2359
288k
}
2360
2361
0
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
2362
0
    SCOPED_CACHE_LOCK(_mutex, this);
2363
0
    if (auto iter = _files.find(hash); iter != _files.end()) {
2364
0
        for (auto& [_, cell] : iter->second) {
2365
0
            cell.update_atime();
2366
0
        }
2367
0
    };
2368
0
}
2369
2370
161
void BlockFileCache::run_background_lru_log_replay() {
2371
161
    Thread::set_self_name("run_background_lru_log_replay");
2372
10.6M
    while (!_close) {
2373
10.6M
        int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
2374
10.6M
        {
2375
10.6M
            std::unique_lock close_lock(_close_mtx);
2376
10.6M
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2377
10.6M
            if (_close) {
2378
141
                break;
2379
141
            }
2380
10.6M
        }
2381
2382
10.6M
        replay_lru_logs_once();
2383
10.6M
    }
2384
161
}
2385
2386
10.6M
size_t BlockFileCache::replay_lru_logs_once() {
2387
10.6M
    size_t replayed = 0;
2388
42.7M
    for (FileCacheType type : LRU_LOG_REPLAY_TYPES) {
2389
42.7M
        replayed += _lru_recorder->replay_queue_event(type);
2390
42.7M
    }
2391
2392
10.6M
    if (replayed == 0) {
2393
10.5M
        *_lru_recorder_log_replay_idle_metrics << 1;
2394
10.5M
    }
2395
2396
10.6M
    if (config::enable_evaluate_shadow_queue_diff) {
2397
0
        SCOPED_CACHE_LOCK(_mutex, this);
2398
0
        _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
2399
0
        _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
2400
0
        _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
2401
0
        _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
2402
0
    }
2403
10.6M
    return replayed;
2404
10.6M
}
2405
2406
222
void BlockFileCache::dump_lru_queues(bool force) {
2407
222
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
2408
222
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
2409
222
        !ExecEnv::GetInstance()->get_is_upgrading()) {
2410
222
        _lru_dumper->dump_queue("disposable", force);
2411
222
        _lru_dumper->dump_queue("normal", force);
2412
222
        _lru_dumper->dump_queue("index", force);
2413
222
        _lru_dumper->dump_queue("ttl", force);
2414
222
        _lru_dumper->set_first_dump_done();
2415
222
    }
2416
222
}
2417
2418
161
void BlockFileCache::run_background_lru_dump() {
2419
161
    Thread::set_self_name("run_background_lru_dump");
2420
386
    while (!_close) {
2421
383
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
2422
383
        {
2423
383
            std::unique_lock close_lock(_close_mtx);
2424
383
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2425
383
            if (_close) {
2426
158
                break;
2427
158
            }
2428
383
        }
2429
225
        dump_lru_queues(false);
2430
225
    }
2431
161
}
2432
2433
161
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
2434
    // keep this order coz may be duplicated in different queue, we use the first appearence
2435
161
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
2436
161
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
2437
161
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
2438
161
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
2439
161
}
2440
2441
79
std::map<std::string, double> BlockFileCache::get_stats() {
2442
79
    std::map<std::string, double> stats;
2443
79
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2444
79
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2445
79
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2446
2447
79
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2448
79
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
2449
79
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2450
79
    stats["index_queue_curr_elements"] =
2451
79
            (double)_cur_index_queue_element_count_metrics->get_value();
2452
2453
79
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2454
79
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
2455
79
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2456
79
    stats["ttl_queue_curr_elements"] =
2457
79
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
2458
2459
79
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2460
79
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
2461
79
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2462
79
    stats["normal_queue_curr_elements"] =
2463
79
            (double)_cur_normal_queue_element_count_metrics->get_value();
2464
2465
79
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2466
79
    stats["disposable_queue_curr_size"] =
2467
79
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
2468
79
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2469
79
    stats["disposable_queue_curr_elements"] =
2470
79
            (double)_cur_disposable_queue_element_count_metrics->get_value();
2471
2472
79
    stats["lru_recorder_index_shadow_queue_curr_elements"] =
2473
79
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::INDEX]
2474
79
                    ->get_value();
2475
79
    stats["lru_recorder_ttl_shadow_queue_curr_elements"] =
2476
79
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::TTL]
2477
79
                    ->get_value();
2478
79
    stats["lru_recorder_normal_shadow_queue_curr_elements"] =
2479
79
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::NORMAL]
2480
79
                    ->get_value();
2481
79
    stats["lru_recorder_disposable_shadow_queue_curr_elements"] =
2482
79
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::DISPOSABLE]
2483
79
                    ->get_value();
2484
2485
79
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2486
79
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2487
2488
79
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2489
79
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2490
79
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2491
2492
79
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2493
79
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2494
79
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2495
2496
79
    return stats;
2497
79
}
2498
2499
// for be UTs
2500
174
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
2501
174
    std::map<std::string, double> stats;
2502
174
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2503
174
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2504
174
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2505
2506
174
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2507
174
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
2508
174
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2509
174
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
2510
2511
174
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2512
174
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
2513
174
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2514
174
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
2515
2516
174
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2517
174
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
2518
174
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2519
174
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
2520
2521
174
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2522
174
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
2523
174
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2524
174
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
2525
174
    stats["lru_recorder_index_shadow_queue_curr_elements"] =
2526
174
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::INDEX]
2527
174
                    ->get_value();
2528
174
    stats["lru_recorder_ttl_shadow_queue_curr_elements"] =
2529
174
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::TTL]
2530
174
                    ->get_value();
2531
174
    stats["lru_recorder_normal_shadow_queue_curr_elements"] =
2532
174
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::NORMAL]
2533
174
                    ->get_value();
2534
174
    stats["lru_recorder_disposable_shadow_queue_curr_elements"] =
2535
174
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::DISPOSABLE]
2536
174
                    ->get_value();
2537
2538
174
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2539
174
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2540
2541
174
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2542
174
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2543
174
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2544
2545
174
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2546
174
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2547
174
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2548
2549
174
    return stats;
2550
174
}
2551
2552
template void BlockFileCache::remove(FileBlockSPtr file_block,
2553
                                     std::lock_guard<std::mutex>& cache_lock,
2554
                                     std::lock_guard<std::mutex>& block_lock, bool sync);
2555
2556
1
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
2557
1
    InconsistencyContext inconsistency_context;
2558
1
    RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));
2559
1
    auto n = inconsistency_context.types.size();
2560
1
    results.reserve(n);
2561
1
    for (size_t i = 0; i < n; i++) {
2562
0
        std::string result;
2563
0
        result += "File cache info in manager:\n";
2564
0
        result += inconsistency_context.infos_in_manager[i].to_string();
2565
0
        result += "File cache info in storage:\n";
2566
0
        result += inconsistency_context.infos_in_storage[i].to_string();
2567
0
        result += inconsistency_context.types[i].to_string();
2568
0
        result += "\n";
2569
0
        results.push_back(std::move(result));
2570
0
    }
2571
1
    return Status::OK();
2572
1
}
2573
2574
1
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
2575
1
    std::lock_guard<std::mutex> cache_lock(_mutex);
2576
1
    std::vector<FileCacheInfo> infos_in_storage;
2577
1
    RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));
2578
1
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks;
2579
1
    for (const auto& info_in_storage : infos_in_storage) {
2580
0
        confirmed_blocks.insert({info_in_storage.hash, info_in_storage.offset});
2581
0
        auto* cell = get_cell(info_in_storage.hash, info_in_storage.offset, cache_lock);
2582
0
        if (cell == nullptr || cell->file_block == nullptr) {
2583
0
            inconsistency_context.infos_in_manager.emplace_back();
2584
0
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
2585
0
            inconsistency_context.types.emplace_back(InconsistencyType::NOT_LOADED);
2586
0
            continue;
2587
0
        }
2588
0
        FileCacheInfo info_in_manager {
2589
0
                .hash = info_in_storage.hash,
2590
0
                .expiration_time = cell->file_block->expiration_time(),
2591
0
                .size = cell->size(),
2592
0
                .offset = info_in_storage.offset,
2593
0
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
2594
0
                .cache_type = cell->file_block->cache_type()};
2595
0
        InconsistencyType inconsistent_type;
2596
0
        if (info_in_storage.is_tmp != info_in_manager.is_tmp) {
2597
0
            inconsistent_type |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
2598
0
        }
2599
0
        size_t expected_size =
2600
0
                info_in_manager.is_tmp ? cell->dowloading_size() : info_in_manager.size;
2601
0
        if (info_in_storage.size != expected_size) {
2602
0
            inconsistent_type |= InconsistencyType::SIZE_INCONSISTENT;
2603
0
        }
2604
        // Only if it is not a tmp file need we check the cache type.
2605
0
        if ((inconsistent_type & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
2606
0
            info_in_storage.cache_type != info_in_manager.cache_type) {
2607
0
            inconsistent_type |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
2608
0
        }
2609
0
        if (info_in_storage.expiration_time != info_in_manager.expiration_time) {
2610
0
            inconsistent_type |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
2611
0
        }
2612
0
        if (inconsistent_type != InconsistencyType::NONE) {
2613
0
            inconsistency_context.infos_in_manager.push_back(info_in_manager);
2614
0
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
2615
0
            inconsistency_context.types.push_back(inconsistent_type);
2616
0
        }
2617
0
    }
2618
2619
1
    for (const auto& [hash, offset_to_cell] : _files) {
2620
0
        for (const auto& [offset, cell] : offset_to_cell) {
2621
0
            if (confirmed_blocks.contains({hash, offset})) {
2622
0
                continue;
2623
0
            }
2624
0
            const auto& block = cell.file_block;
2625
0
            inconsistency_context.infos_in_manager.emplace_back(
2626
0
                    hash, block->expiration_time(), cell.size(), offset,
2627
0
                    cell.file_block->state() == FileBlock::State::DOWNLOADING, block->cache_type());
2628
0
            inconsistency_context.infos_in_storage.emplace_back();
2629
0
            inconsistency_context.types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
2630
0
        }
2631
0
    }
2632
1
    return Status::OK();
2633
1
}
2634
2635
} // namespace doris::io