Coverage Report

Created: 2026-07-02 23:26

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache.h"
22
23
#include <gen_cpp/file_cache.pb.h>
24
25
#include <cstdio>
26
#include <exception>
27
#include <fstream>
28
#include <unordered_set>
29
30
#include "common/status.h"
31
#include "cpp/sync_point.h"
32
#include "runtime/exec_env.h"
33
34
#if defined(__APPLE__)
35
#include <sys/mount.h>
36
#else
37
#include <sys/statfs.h>
38
#endif
39
40
#include <chrono> // IWYU pragma: keep
41
#include <mutex>
42
#include <ranges>
43
44
#include "common/cast_set.h"
45
#include "common/check.h"
46
#include "common/config.h"
47
#include "common/logging.h"
48
#include "core/uint128.h"
49
#include "exec/common/sip_hash.h"
50
#include "io/cache/block_file_cache_ttl_mgr.h"
51
#include "io/cache/file_block.h"
52
#include "io/cache/file_cache_common.h"
53
#include "io/cache/fs_file_cache_storage.h"
54
#include "io/cache/mem_file_cache_storage.h"
55
#include "runtime/runtime_profile.h"
56
#include "util/concurrency_stats.h"
57
#include "util/stack_util.h"
58
#include "util/stopwatch.hpp"
59
#include "util/thread.h"
60
#include "util/time.h"
61
namespace doris::io {
62
63
namespace {
64
65
constexpr std::array<FileCacheType, 4> LRU_LOG_REPLAY_TYPES = {
66
        FileCacheType::TTL, FileCacheType::INDEX, FileCacheType::NORMAL, FileCacheType::DISPOSABLE};
67
68
109k
size_t file_cache_type_index(FileCacheType type) {
69
109k
    return static_cast<size_t>(type);
70
109k
}
71
72
} // namespace
73
74
// Insert a block pointer into one shard while swallowing allocation failures.
75
193k
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block, size_t max_queue_size) {
76
193k
    if (!block || max_queue_size == 0) {
77
1
        return false;
78
1
    }
79
193k
    bool reserved = false;
80
193k
    try {
81
193k
        auto* raw_ptr = block.get();
82
193k
        auto idx = shard_index(raw_ptr);
83
193k
        auto& shard = _shards[idx];
84
193k
        std::lock_guard lock(shard.mutex);
85
193k
        if (shard.entries.contains(raw_ptr)) {
86
191k
            return false;
87
191k
        }
88
1.56k
        size_t cur_size = _size.load(std::memory_order_relaxed);
89
1.56k
        while (cur_size < max_queue_size) {
90
1.56k
            if (_size.compare_exchange_weak(cur_size, cur_size + 1, std::memory_order_relaxed)) {
91
1.56k
                reserved = true;
92
1.56k
                break;
93
1.56k
            }
94
1.56k
        }
95
1.56k
        if (!reserved) {
96
1
            return false;
97
1
        }
98
1.56k
        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
99
1.56k
        DORIS_CHECK(inserted);
100
1.56k
        return true;
101
1.56k
    } catch (const std::exception& e) {
102
0
        if (reserved) {
103
0
            decrease_size(1);
104
0
        }
105
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
106
0
    } catch (...) {
107
0
        if (reserved) {
108
0
            decrease_size(1);
109
0
        }
110
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
111
0
    }
112
0
    return false;
113
193k
}
114
115
// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
116
43
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
117
43
    if (limit == 0 || output == nullptr) {
118
2
        return 0;
119
2
    }
120
41
    size_t drained = 0;
121
41
    try {
122
41
        output->reserve(output->size() + std::min(limit, size()));
123
2.57k
        for (auto& shard : _shards) {
124
2.57k
            if (drained >= limit) {
125
1
                break;
126
1
            }
127
2.57k
            std::lock_guard lock(shard.mutex);
128
2.57k
            auto it = shard.entries.begin();
129
2.57k
            size_t shard_drained = 0;
130
2.58k
            while (it != shard.entries.end() && drained + shard_drained < limit) {
131
10
                output->emplace_back(std::move(it->second));
132
10
                it = shard.entries.erase(it);
133
10
                ++shard_drained;
134
10
            }
135
2.57k
            if (shard_drained > 0) {
136
7
                decrease_size(shard_drained);
137
7
                drained += shard_drained;
138
7
            }
139
2.57k
        }
140
41
    } catch (const std::exception& e) {
141
0
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
142
0
    } catch (...) {
143
0
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
144
0
    }
145
41
    return drained;
146
41
}
147
148
// Remove every pending block, guarding against unexpected exceptions.
149
26
void NeedUpdateLRUBlocks::clear() {
150
26
    try {
151
1.66k
        for (auto& shard : _shards) {
152
1.66k
            std::lock_guard lock(shard.mutex);
153
1.66k
            if (!shard.entries.empty()) {
154
2
                auto removed = shard.entries.size();
155
2
                shard.entries.clear();
156
2
                decrease_size(removed);
157
2
            }
158
1.66k
        }
159
26
    } catch (const std::exception& e) {
160
0
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
161
0
    } catch (...) {
162
0
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
163
0
    }
164
26
}
165
166
9
void NeedUpdateLRUBlocks::decrease_size(size_t delta) {
167
9
    size_t cur_size = _size.load(std::memory_order_relaxed);
168
9
    while (true) {
169
9
        DORIS_CHECK_GE(cur_size, delta);
170
9
        if (_size.compare_exchange_weak(cur_size, cur_size - delta, std::memory_order_relaxed)) {
171
9
            return;
172
9
        }
173
9
    }
174
9
}
175
176
193k
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
177
193k
    DCHECK(ptr != nullptr);
178
193k
    return std::hash<FileBlock*> {}(ptr)&kShardMask;
179
193k
}
180
181
namespace {
182
183
13.6k
int64_t steady_clock_seconds() {
184
13.6k
    return std::chrono::duration_cast<std::chrono::seconds>(
185
13.6k
                   std::chrono::steady_clock::now().time_since_epoch())
186
13.6k
            .count();
187
13.6k
}
188
189
} // namespace
190
191
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
192
                               const FileCacheSettings& cache_settings)
193
252
        : _cache_base_path(cache_base_path),
194
252
          _capacity(cache_settings.capacity),
195
252
          _max_file_block_size(cache_settings.max_file_block_size) {
196
252
    _cur_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
197
252
                                                                     "file_cache_cache_size", 0);
198
252
    _cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>(
199
252
            _cache_base_path.c_str(), "file_cache_capacity", _capacity);
200
252
    _cur_ttl_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
201
252
            _cache_base_path.c_str(), "file_cache_ttl_cache_size", 0);
202
252
    _cur_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
203
252
            _cache_base_path.c_str(), "file_cache_normal_queue_element_count", 0);
204
252
    _cur_ttl_cache_lru_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
205
252
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_size", 0);
206
252
    _cur_ttl_cache_lru_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
207
252
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_element_count", 0);
208
252
    _cur_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
209
252
            _cache_base_path.c_str(), "file_cache_normal_queue_cache_size", 0);
210
252
    _cur_index_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
211
252
            _cache_base_path.c_str(), "file_cache_index_queue_element_count", 0);
212
252
    _cur_index_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
213
252
            _cache_base_path.c_str(), "file_cache_index_queue_cache_size", 0);
214
252
    _cur_disposable_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
215
252
            _cache_base_path.c_str(), "file_cache_disposable_queue_element_count", 0);
216
252
    _cur_disposable_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
217
252
            _cache_base_path.c_str(), "file_cache_disposable_queue_cache_size", 0);
218
219
252
    _queue_evict_size_metrics[FileCacheType::DISPOSABLE] = std::make_shared<bvar::Adder<size_t>>(
220
252
            _cache_base_path.c_str(), "file_cache_disposable_queue_evict_size");
221
252
    _queue_evict_size_metrics[FileCacheType::NORMAL] = std::make_shared<bvar::Adder<size_t>>(
222
252
            _cache_base_path.c_str(), "file_cache_normal_queue_evict_size");
223
252
    _queue_evict_size_metrics[FileCacheType::INDEX] = std::make_shared<bvar::Adder<size_t>>(
224
252
            _cache_base_path.c_str(), "file_cache_index_queue_evict_size");
225
252
    _queue_evict_size_metrics[FileCacheType::TTL] = std::make_shared<bvar::Adder<size_t>>(
226
252
            _cache_base_path.c_str(), "file_cache_ttl_cache_evict_size");
227
252
    _total_evict_size_metrics = std::make_shared<bvar::Adder<size_t>>(
228
252
            _cache_base_path.c_str(), "file_cache_total_evict_size");
229
252
    _total_read_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
230
252
                                                                     "file_cache_total_read_size");
231
252
    _total_hit_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
232
252
                                                                    "file_cache_total_hit_size");
233
252
    _gc_evict_bytes_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
234
252
                                                                    "file_cache_gc_evict_bytes");
235
252
    _gc_evict_count_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
236
252
                                                                    "file_cache_gc_evict_count");
237
238
252
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
239
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
240
252
                                                  "file_cache_evict_by_time_disposable_to_normal");
241
252
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
242
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
243
252
                                                  "file_cache_evict_by_time_disposable_to_index");
244
252
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
245
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
246
252
                                                  "file_cache_evict_by_time_disposable_to_ttl");
247
252
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
248
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
249
252
                                                  "file_cache_evict_by_time_normal_to_disposable");
250
252
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
251
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
252
252
                                                  "file_cache_evict_by_time_normal_to_index");
253
252
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
254
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
255
252
                                                  "file_cache_evict_by_time_normal_to_ttl");
256
252
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
257
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
258
252
                                                  "file_cache_evict_by_time_index_to_disposable");
259
252
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
260
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
261
252
                                                  "file_cache_evict_by_time_index_to_normal");
262
252
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
263
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
264
252
                                                  "file_cache_evict_by_time_index_to_ttl");
265
252
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
266
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
267
252
                                                  "file_cache_evict_by_time_ttl_to_disposable");
268
252
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
269
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
270
252
                                                  "file_cache_evict_by_time_ttl_to_normal");
271
252
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
272
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
273
252
                                                  "file_cache_evict_by_time_ttl_to_index");
274
275
252
    _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
276
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
277
252
                                                  "file_cache_evict_by_self_lru_disposable");
278
252
    _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
279
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
280
252
                                                  "file_cache_evict_by_self_lru_normal");
281
252
    _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] = std::make_shared<bvar::Adder<size_t>>(
282
252
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_index");
283
252
    _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] = std::make_shared<bvar::Adder<size_t>>(
284
252
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl");
285
286
252
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
287
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
288
252
                                                  "file_cache_evict_by_size_disposable_to_normal");
289
252
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
290
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
291
252
                                                  "file_cache_evict_by_size_disposable_to_index");
292
252
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
293
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
294
252
                                                  "file_cache_evict_by_size_disposable_to_ttl");
295
252
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
296
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
297
252
                                                  "file_cache_evict_by_size_normal_to_disposable");
298
252
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
299
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
300
252
                                                  "file_cache_evict_by_size_normal_to_index");
301
252
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
302
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
303
252
                                                  "file_cache_evict_by_size_normal_to_ttl");
304
252
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
305
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
306
252
                                                  "file_cache_evict_by_size_index_to_disposable");
307
252
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
308
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
309
252
                                                  "file_cache_evict_by_size_index_to_normal");
310
252
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
311
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
312
252
                                                  "file_cache_evict_by_size_index_to_ttl");
313
252
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
314
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
315
252
                                                  "file_cache_evict_by_size_ttl_to_disposable");
316
252
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
317
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
318
252
                                                  "file_cache_evict_by_size_ttl_to_normal");
319
252
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
320
252
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
321
252
                                                  "file_cache_evict_by_size_ttl_to_index");
322
323
252
    _evict_by_try_release = std::make_shared<bvar::Adder<size_t>>(
324
252
            _cache_base_path.c_str(), "file_cache_evict_by_try_release");
325
326
252
    _num_read_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
327
252
                                                             "file_cache_num_read_blocks");
328
252
    _num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
329
252
                                                            "file_cache_num_hit_blocks");
330
252
    _num_removed_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
331
252
                                                                "file_cache_num_removed_blocks");
332
333
252
    _no_warmup_num_read_blocks = std::make_shared<bvar::Adder<size_t>>(
334
252
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks");
335
252
    _no_warmup_num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(
336
252
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks");
337
338
#ifndef BE_TEST
339
    _num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
340
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
341
    _num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
342
            _cache_base_path.c_str(), "file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
343
    _num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
344
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
345
    _num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
346
            _cache_base_path.c_str(), "file_cache_num_read_blocks_1h", _num_read_blocks.get(),
347
            3600);
348
    _no_warmup_num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
349
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_5m",
350
            _no_warmup_num_hit_blocks.get(), 300);
351
    _no_warmup_num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
352
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_5m",
353
            _no_warmup_num_read_blocks.get(), 300);
354
    _no_warmup_num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
355
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_1h",
356
            _no_warmup_num_hit_blocks.get(), 3600);
357
    _no_warmup_num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
358
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_1h",
359
            _no_warmup_num_read_blocks.get(), 3600);
360
#endif
361
362
252
    _hit_ratio = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
363
252
                                                        "file_cache_hit_ratio", 0.0);
364
252
    _hit_ratio_5m = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
365
252
                                                           "file_cache_hit_ratio_5m", 0.0);
366
252
    _hit_ratio_1h = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
367
252
                                                           "file_cache_hit_ratio_1h", 0.0);
368
369
252
    _no_warmup_hit_ratio = std::make_shared<bvar::Status<double>>(
370
252
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio", 0.0);
371
252
    _no_warmup_hit_ratio_5m = std::make_shared<bvar::Status<double>>(
372
252
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_5m", 0.0);
373
252
    _no_warmup_hit_ratio_1h = std::make_shared<bvar::Status<double>>(
374
252
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_1h", 0.0);
375
376
252
    _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
377
252
            _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
378
252
    _need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>(
379
252
            _cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0);
380
252
    _meta_store_write_queue_size_metrics = std::make_shared<bvar::Status<size_t>>(
381
252
            _cache_base_path.c_str(), "file_cache_meta_store_write_queue_size", 0);
382
383
252
    _cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
384
252
            _cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
385
252
    _get_or_set_latency_us = std::make_shared<bvar::LatencyRecorder>(
386
252
            _cache_base_path.c_str(), "file_cache_get_or_set_latency_us");
387
252
    _storage_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
388
252
            _cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_us");
389
252
    _storage_retry_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
390
252
            _cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us");
391
252
    _storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
392
252
            _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us");
393
252
    _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
394
252
            _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us");
395
252
    _lru_dump_latency_us = std::make_shared<bvar::LatencyRecorder>(
396
252
            _cache_base_path.c_str(), "file_cache_lru_dump_latency_us");
397
252
    _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
398
252
            _cache_base_path.c_str(), "file_cache_recycle_keys_length");
399
252
    _need_update_lru_blocks_length_recorder = std::make_shared<bvar::LatencyRecorder>(
400
252
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length");
401
252
    _need_update_lru_blocks_produce_metrics = std::make_shared<bvar::Adder<size_t>>(
402
252
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_produce");
403
252
    _need_update_lru_blocks_consume_metrics = std::make_shared<bvar::Adder<size_t>>(
404
252
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_consume");
405
252
    _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>(
406
252
            _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us");
407
252
    _ttl_gc_latency_us = std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(),
408
252
                                                                 "file_cache_ttl_gc_latency_us");
409
252
    _shadow_queue_levenshtein_distance = std::make_shared<bvar::LatencyRecorder>(
410
252
            _cache_base_path.c_str(), "file_cache_shadow_queue_levenshtein_distance");
411
252
    for (FileCacheType type : {FileCacheType::DISPOSABLE, FileCacheType::NORMAL,
412
1.00k
                               FileCacheType::INDEX, FileCacheType::TTL}) {
413
1.00k
        size_t idx = file_cache_type_index(type);
414
1.00k
        std::string metric_prefix =
415
1.00k
                "file_cache_lru_recorder_" + cache_type_to_string(type) + "_record_queue";
416
1.00k
        _lru_recorder_queue_length_recorder[idx] = std::make_shared<bvar::LatencyRecorder>(
417
1.00k
                _cache_base_path.c_str(), metric_prefix + "_length");
418
1.00k
        _lru_recorder_queue_produce_metrics[idx] = std::make_shared<bvar::Adder<size_t>>(
419
1.00k
                _cache_base_path.c_str(), metric_prefix + "_produce");
420
1.00k
        _lru_recorder_queue_consume_metrics[idx] = std::make_shared<bvar::Adder<size_t>>(
421
1.00k
                _cache_base_path.c_str(), metric_prefix + "_consume");
422
1.00k
        _lru_recorder_shadow_queue_element_count_metrics[idx] =
423
1.00k
                std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
424
1.00k
                                                       "file_cache_lru_recorder_" +
425
1.00k
                                                               cache_type_to_string(type) +
426
1.00k
                                                               "_shadow_queue_element_count",
427
1.00k
                                                       0);
428
1.00k
    }
429
252
    _lru_recorder_log_replay_idle_metrics = std::make_shared<bvar::Adder<size_t>>(
430
252
            _cache_base_path.c_str(), "file_cache_lru_recorder_log_replay_idle");
431
432
252
    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
433
252
                                 cache_settings.disposable_queue_elements, 60 * 60);
434
252
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
435
252
                            7 * 24 * 60 * 60);
436
252
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
437
252
                             24 * 60 * 60);
438
252
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
439
252
                          std::numeric_limits<int>::max());
440
441
252
    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
442
252
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());
443
252
    if (cache_settings.storage == "memory") {
444
33
        _storage = std::make_unique<MemFileCacheStorage>();
445
33
        _cache_base_path = "memory";
446
219
    } else {
447
219
        _storage = std::make_unique<FSFileCacheStorage>();
448
219
    }
449
450
252
    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
451
252
}
452
453
14.6k
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
454
14.6k
    uint128_t value;
455
14.6k
    sip_hash128(path.data(), path.size(), reinterpret_cast<char*>(&value));
456
14.6k
    return UInt128Wrapper(value);
457
14.6k
}
458
459
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
460
8
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
461
8
    SCOPED_CACHE_LOCK(_mutex, this);
462
8
    if (!config::enable_file_cache_query_limit) {
463
1
        return {};
464
1
    }
465
466
    /// if enable_filesystem_query_cache_limit is true,
467
    /// we create context query for current query.
468
7
    auto context = get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
469
7
    return std::make_unique<QueryFileCacheContextHolder>(query_id, this, context);
470
8
}
471
472
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
473
3.40k
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
474
3.40k
    auto query_iter = _query_map.find(query_id);
475
3.40k
    return (query_iter == _query_map.end()) ? nullptr : query_iter->second;
476
3.40k
}
477
478
6
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
479
6
    SCOPED_CACHE_LOCK(_mutex, this);
480
6
    const auto& query_iter = _query_map.find(query_id);
481
482
6
    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
483
5
        _query_map.erase(query_iter);
484
5
    }
485
6
}
486
487
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
488
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
489
7
        int file_cache_query_limit_percent) {
490
7
    if (query_id.lo == 0 && query_id.hi == 0) {
491
1
        return nullptr;
492
1
    }
493
494
6
    auto context = get_query_context(query_id, cache_lock);
495
6
    if (context) {
496
1
        return context;
497
1
    }
498
499
5
    size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100;
500
5
    if (file_cache_query_limit_size < 268435456) {
501
5
        LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size
502
5
                     << " bytes) is less than the 256MB recommended minimum. "
503
5
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
504
5
                     << " from its current value " << file_cache_query_limit_percent << "%.";
505
5
    }
506
5
    auto query_context = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
507
5
    auto query_iter = _query_map.emplace(query_id, query_context).first;
508
5
    return query_iter->second;
509
6
}
510
511
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
512
20
                                                   std::lock_guard<std::mutex>& cache_lock) {
513
20
    auto pair = std::make_pair(hash, offset);
514
20
    auto record = records.find(pair);
515
20
    DCHECK(record != records.end());
516
20
    auto iter = record->second;
517
20
    records.erase(pair);
518
20
    lru_queue.remove(iter, cache_lock);
519
20
}
520
521
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
522
                                                    size_t size,
523
30
                                                    std::lock_guard<std::mutex>& cache_lock) {
524
30
    auto pair = std::make_pair(hash, offset);
525
30
    if (records.find(pair) == records.end()) {
526
29
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
527
29
        records.insert({pair, queue_iter});
528
29
    }
529
30
}
530
531
223
Status BlockFileCache::initialize() {
532
223
    SCOPED_CACHE_LOCK(_mutex, this);
533
223
    return initialize_unlocked(cache_lock);
534
223
}
535
536
223
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
537
223
    DCHECK(!_is_initialized);
538
223
    _is_initialized = true;
539
223
    if (config::file_cache_background_lru_dump_tail_record_num > 0) {
540
        // requirements:
541
        // 1. restored data should not overwrite the last dump
542
        // 2. restore should happen before load and async load
543
        // 3. all queues should be restored sequencially to avoid conflict
544
        // TODO(zhengyu): we can parralize them but will increase complexity, so lets check the time cost
545
        // to see if any improvement is a necessary
546
223
        restore_lru_queues_from_disk(cache_lock);
547
223
    }
548
223
    RETURN_IF_ERROR(_storage->init(this));
549
550
223
    if (auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get())) {
551
192
        if (auto* meta_store = fs_storage->get_meta_store()) {
552
192
            _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
553
192
        }
554
192
    }
555
556
223
    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
557
223
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
558
223
    _cache_background_evict_in_advance_thread =
559
223
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
560
223
    _cache_background_block_lru_update_thread =
561
223
            std::thread(&BlockFileCache::run_background_block_lru_update, this);
562
563
    // Initialize LRU dump thread and restore queues
564
223
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
565
223
    _cache_background_lru_log_replay_thread =
566
223
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);
567
568
223
    return Status::OK();
569
223
}
570
571
void BlockFileCache::update_block_lru(FileBlockSPtr block,
572
7
                                      std::lock_guard<std::mutex>& cache_lock) {
573
7
    if (!block) {
574
1
        return;
575
1
    }
576
577
6
    FileBlockCell* cell = get_cell(block->get_hash_value(), block->offset(), cache_lock);
578
6
    if (!cell || cell->file_block.get() != block.get()) {
579
1
        return;
580
1
    }
581
582
5
    if (cell->queue_iterator) {
583
5
        auto& queue = get_queue(block->cache_type());
584
5
        queue.move_to_end(*cell->queue_iterator, cache_lock);
585
5
        _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
586
5
                                          block->_key.hash, block->_key.offset,
587
5
                                          block->_block_range.size());
588
5
    }
589
5
    cell->update_atime();
590
5
}
591
592
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
593
712k
                              std::lock_guard<std::mutex>& cache_lock) {
594
712k
    if (result) {
595
712k
        result->push_back(cell.file_block);
596
712k
    }
597
598
712k
    auto& queue = get_queue(cell.file_block->cache_type());
599
    /// Move to the end of the queue. The iterator remains valid.
600
712k
    if (!config::enable_file_cache_async_touch_on_get_or_set && cell.queue_iterator &&
601
712k
        move_iter_flag) {
602
520k
        queue.move_to_end(*cell.queue_iterator, cache_lock);
603
520k
        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
604
520k
                                          CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash,
605
520k
                                          cell.file_block->_key.offset, cell.size());
606
520k
    }
607
608
712k
    cell.update_atime();
609
712k
}
610
611
template <class T>
612
    requires IsXLock<T>
613
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
614
111k
                                        T& /* cache_lock */) {
615
111k
    auto it = _files.find(hash);
616
111k
    if (it == _files.end()) {
617
3
        return nullptr;
618
3
    }
619
620
111k
    auto& offsets = it->second;
621
111k
    auto cell_it = offsets.find(offset);
622
111k
    if (cell_it == offsets.end()) {
623
3
        return nullptr;
624
3
    }
625
626
111k
    return &cell_it->second;
627
111k
}
628
629
904k
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
630
904k
    return query_type != FileCacheType::DISPOSABLE && cell_type != FileCacheType::DISPOSABLE;
631
904k
}
632
633
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
634
                                    const FileBlock::Range& range,
635
725k
                                    std::lock_guard<std::mutex>& cache_lock) {
636
    /// Given range = [left, right] and non-overlapping ordered set of file blocks,
637
    /// find list [block1, ..., blockN] of blocks which intersect with given range.
638
725k
    auto it = _files.find(hash);
639
725k
    if (it == _files.end()) {
640
7.17k
        if (_async_open_done) {
641
7.17k
            return {};
642
7.17k
        }
643
1
        FileCacheKey key;
644
1
        key.hash = hash;
645
1
        key.meta.type = context.cache_type;
646
1
        key.meta.expiration_time = context.expiration_time;
647
1
        key.meta.tablet_id = context.tablet_id;
648
1
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);
649
650
1
        it = _files.find(hash);
651
1
        if (it == _files.end()) [[unlikely]] {
652
1
            return {};
653
1
        }
654
1
    }
655
656
718k
    auto& file_blocks = it->second;
657
718k
    if (file_blocks.empty()) {
658
0
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
659
0
                     << " cache type=" << context.cache_type
660
0
                     << " cache expiration time=" << context.expiration_time
661
0
                     << " cache range=" << range.left << " " << range.right
662
0
                     << " query id=" << context.query_id;
663
0
        DCHECK(false);
664
0
        _files.erase(hash);
665
0
        return {};
666
0
    }
667
668
718k
    FileBlocks result;
669
718k
    auto block_it = file_blocks.lower_bound(range.left);
670
718k
    if (block_it == file_blocks.end()) {
671
        /// N - last cached block for given file hash, block{N}.offset < range.left:
672
        ///   block{N}                       block{N}
673
        /// [________                         [_______]
674
        ///     [__________]         OR                  [________]
675
        ///     ^                                        ^
676
        ///     range.left                               range.left
677
678
6.20k
        const auto& cell = file_blocks.rbegin()->second;
679
6.20k
        if (cell.file_block->range().right < range.left) {
680
6.18k
            return {};
681
6.18k
        }
682
683
14
        use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
684
14
                 cache_lock);
685
712k
    } else { /// block_it <-- segmment{k}
686
712k
        if (block_it != file_blocks.begin()) {
687
220
            const auto& prev_cell = std::prev(block_it)->second;
688
220
            const auto& prev_cell_range = prev_cell.file_block->range();
689
690
220
            if (range.left <= prev_cell_range.right) {
691
                ///   block{k-1}  block{k}
692
                ///   [________]   [_____
693
                ///       [___________
694
                ///       ^
695
                ///       range.left
696
697
137
                use_cell(prev_cell, &result,
698
137
                         need_to_move(prev_cell.file_block->cache_type(), context.cache_type),
699
137
                         cache_lock);
700
137
            }
701
220
        }
702
703
        ///  block{k} ...       block{k-1}  block{k}                      block{k}
704
        ///  [______              [______]     [____                        [________
705
        ///  [_________     OR              [________      OR    [______]   ^
706
        ///  ^                              ^                           ^   block{k}.offset
707
        ///  range.left                     range.left                  range.right
708
709
1.42M
        while (block_it != file_blocks.end()) {
710
712k
            const auto& cell = block_it->second;
711
712k
            if (range.right < cell.file_block->range().left) {
712
176
                break;
713
176
            }
714
715
712k
            use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
716
712k
                     cache_lock);
717
712k
            ++block_it;
718
712k
        }
719
712k
    }
720
721
712k
    return result;
722
718k
}
723
724
193k
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
725
193k
    int64_t queue_limit = config::file_cache_background_block_lru_update_queue_max_size;
726
193k
    size_t max_queue_size = queue_limit <= 0 ? 0 : static_cast<size_t>(queue_limit);
727
193k
    if (_need_update_lru_blocks.insert(std::move(block), max_queue_size)) {
728
1.55k
        *_need_update_lru_blocks_produce_metrics << 1;
729
1.55k
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
730
1.55k
    }
731
193k
}
732
733
4
std::string BlockFileCache::clear_file_cache_async() {
734
4
    return clear_file_cache_impl(false);
735
4
}
736
737
20
std::string BlockFileCache::clear_file_cache_sync() {
738
20
    return clear_file_cache_impl(true);
739
20
}
740
741
24
std::string BlockFileCache::clear_file_cache_impl(bool sync_remove) {
742
24
    const char* action = sync_remove ? "clear_file_cache_sync" : "clear_file_cache_async";
743
24
    LOG(INFO) << "start " << action << ", path=" << _cache_base_path;
744
24
    _lru_dumper->remove_lru_dump_files();
745
24
    int64_t num_cells_all = 0;
746
24
    int64_t num_cells_to_delete = 0;
747
24
    int64_t num_cells_wait_recycle = 0;
748
24
    int64_t num_files_all = 0;
749
24
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
750
24
    {
751
24
        SCOPED_CACHE_LOCK(_mutex, this);
752
753
24
        std::vector<FileBlockCell*> deleting_cells;
754
5.13k
        for (auto& [_, offset_to_cell] : _files) {
755
5.13k
            ++num_files_all;
756
105k
            for (auto& [_1, cell] : offset_to_cell) {
757
105k
                ++num_cells_all;
758
105k
                deleting_cells.push_back(&cell);
759
105k
            }
760
5.13k
        }
761
762
        // Do not erase while walking _files above: remove() may erase the current map element.
763
        //
764
        // sync_remove only changes how already releasable DOWNLOADED blocks are deleted from
765
        // storage. Busy blocks keep the existing holder lifecycle: mark them deleting and leave
766
        // them in _files until the last holder releases them.
767
105k
        for (auto& cell : deleting_cells) {
768
105k
            if (!cell->releasable()) {
769
103
                LOG(INFO) << "cell is not releasable, hash="
770
103
                          << " offset=" << cell->file_block->offset();
771
103
                cell->file_block->set_deleting();
772
103
                ++num_cells_wait_recycle;
773
103
                continue;
774
103
            }
775
105k
            FileBlockSPtr file_block = cell->file_block;
776
105k
            if (file_block) {
777
105k
                std::lock_guard block_lock(file_block->_mutex);
778
105k
                remove(file_block, cache_lock, block_lock, sync_remove);
779
105k
                ++num_cells_to_delete;
780
105k
            }
781
105k
        }
782
24
        clear_need_update_lru_blocks();
783
24
    }
784
785
24
    std::stringstream ss;
786
24
    ss << "finish " << action << ", path=" << _cache_base_path << " sync_remove=" << sync_remove
787
24
       << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
788
24
       << " num_cells_to_delete=" << num_cells_to_delete
789
24
       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
790
24
    auto msg = ss.str();
791
24
    LOG(INFO) << msg;
792
24
    _lru_dumper->remove_lru_dump_files();
793
24
    return msg;
794
24
}
795
796
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
797
                                                  const CacheContext& context, size_t offset,
798
                                                  size_t size, FileBlock::State state,
799
13.5k
                                                  std::lock_guard<std::mutex>& cache_lock) {
800
13.5k
    DCHECK(size > 0);
801
802
13.5k
    auto current_pos = offset;
803
13.5k
    auto end_pos_non_included = offset + size;
804
805
13.5k
    size_t current_size = 0;
806
13.5k
    size_t remaining_size = size;
807
808
13.5k
    FileBlocks file_blocks;
809
27.2k
    while (current_pos < end_pos_non_included) {
810
13.6k
        current_size = std::min(remaining_size, _max_file_block_size);
811
13.6k
        remaining_size -= current_size;
812
13.6k
        state = try_reserve(hash, context, current_pos, current_size, cache_lock)
813
13.6k
                        ? state
814
13.6k
                        : FileBlock::State::SKIP_CACHE;
815
13.6k
        if (state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
816
62
            FileCacheKey key;
817
62
            key.hash = hash;
818
62
            key.offset = current_pos;
819
62
            key.meta.type = context.cache_type;
820
62
            key.meta.expiration_time = context.expiration_time;
821
62
            key.meta.tablet_id = context.tablet_id;
822
62
            auto file_block = std::make_shared<FileBlock>(key, current_size, this,
823
62
                                                          FileBlock::State::SKIP_CACHE);
824
62
            file_blocks.push_back(std::move(file_block));
825
13.6k
        } else {
826
13.6k
            auto* cell = add_cell(hash, context, current_pos, current_size, state, cache_lock);
827
13.6k
            if (cell) {
828
13.6k
                file_blocks.push_back(cell->file_block);
829
13.6k
                if (!context.is_cold_data) {
830
13.6k
                    cell->update_atime();
831
13.6k
                }
832
13.6k
            }
833
13.6k
            if (_ttl_mgr && context.tablet_id != 0) {
834
1.05k
                _ttl_mgr->register_tablet_id(context.tablet_id);
835
1.05k
            }
836
13.6k
        }
837
838
13.6k
        current_pos += current_size;
839
13.6k
    }
840
841
13.5k
    DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
842
13.5k
    return file_blocks;
843
13.5k
}
844
845
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
846
                                                       const UInt128Wrapper& hash,
847
                                                       const CacheContext& context,
848
                                                       const FileBlock::Range& range,
849
712k
                                                       std::lock_guard<std::mutex>& cache_lock) {
850
    /// There are blocks [block1, ..., blockN]
851
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
852
    /// intersect with given range.
853
854
    /// It can have holes:
855
    /// [____________________]         -- requested range
856
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
857
    ///
858
    /// For each such hole create a cell with file block state EMPTY.
859
860
712k
    auto it = file_blocks.begin();
861
712k
    auto block_range = (*it)->range();
862
863
712k
    size_t current_pos = 0;
864
712k
    if (block_range.left < range.left) {
865
        ///    [_______     -- requested range
866
        /// [_______
867
        /// ^
868
        /// block1
869
870
151
        current_pos = block_range.right + 1;
871
151
        ++it;
872
712k
    } else {
873
712k
        current_pos = range.left;
874
712k
    }
875
876
1.42M
    while (current_pos <= range.right && it != file_blocks.end()) {
877
712k
        block_range = (*it)->range();
878
879
712k
        if (current_pos == block_range.left) {
880
712k
            current_pos = block_range.right + 1;
881
712k
            ++it;
882
712k
            continue;
883
712k
        }
884
885
712k
        DCHECK(current_pos < block_range.left);
886
887
117
        auto hole_size = block_range.left - current_pos;
888
889
117
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
890
117
                                                      FileBlock::State::EMPTY, cache_lock));
891
892
117
        current_pos = block_range.right + 1;
893
117
        ++it;
894
117
    }
895
896
712k
    if (current_pos <= range.right) {
897
        ///   ________]     -- requested range
898
        ///   _____]
899
        ///        ^
900
        /// blockN
901
902
94
        auto hole_size = range.right - current_pos + 1;
903
904
94
        file_blocks.splice(file_blocks.end(),
905
94
                           split_range_into_cells(hash, context, current_pos, hole_size,
906
94
                                                  FileBlock::State::EMPTY, cache_lock));
907
94
    }
908
712k
}
909
910
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
911
725k
                                            CacheContext& context) {
912
725k
    FileBlock::Range range(offset, offset + size - 1);
913
914
725k
    ReadStatistics* stats = context.stats;
915
725k
    DCHECK(stats != nullptr);
916
725k
    MonotonicStopWatch sw;
917
725k
    sw.start();
918
725k
    FileBlocks file_blocks;
919
725k
    std::vector<FileBlockSPtr> need_update_lru_blocks;
920
725k
    const bool async_touch_on_get_or_set = config::enable_file_cache_async_touch_on_get_or_set;
921
725k
    int64_t duration = 0;
922
725k
    {
923
725k
        ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
924
725k
        std::lock_guard cache_lock(_mutex);
925
725k
        ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
926
725k
        stats->lock_wait_timer += sw.elapsed_time();
927
725k
        SCOPED_RAW_TIMER(&duration);
928
        /// Get all blocks which intersect with the given range.
929
725k
        {
930
725k
            SCOPED_RAW_TIMER(&stats->get_timer);
931
725k
            file_blocks = get_impl(hash, context, range, cache_lock);
932
725k
        }
933
934
725k
        if (file_blocks.empty()) {
935
13.3k
            SCOPED_RAW_TIMER(&stats->set_timer);
936
13.3k
            file_blocks = split_range_into_cells(hash, context, offset, size,
937
13.3k
                                                 FileBlock::State::EMPTY, cache_lock);
938
712k
        } else {
939
712k
            SCOPED_RAW_TIMER(&stats->set_timer);
940
712k
            fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
941
712k
        }
942
725k
        DCHECK(!file_blocks.empty());
943
725k
        *_num_read_blocks << file_blocks.size();
944
725k
        if (!context.is_warmup) {
945
725k
            *_no_warmup_num_read_blocks << file_blocks.size();
946
725k
        }
947
725k
        if (async_touch_on_get_or_set) {
948
193k
            need_update_lru_blocks.reserve(file_blocks.size());
949
193k
        }
950
726k
        for (auto& block : file_blocks) {
951
726k
            size_t block_size = block->range().size();
952
726k
            *_total_read_size_metrics << block_size;
953
726k
            if (block->state_unsafe() == FileBlock::State::DOWNLOADED) {
954
712k
                *_num_hit_blocks << 1;
955
712k
                *_total_hit_size_metrics << block_size;
956
712k
                if (!context.is_warmup) {
957
712k
                    *_no_warmup_num_hit_blocks << 1;
958
712k
                }
959
712k
                if (async_touch_on_get_or_set &&
960
712k
                    need_to_move(block->cache_type(), context.cache_type)) {
961
192k
                    need_update_lru_blocks.emplace_back(block);
962
192k
                }
963
712k
            }
964
726k
        }
965
725k
    }
966
967
725k
    if (async_touch_on_get_or_set) {
968
193k
        for (auto& block : need_update_lru_blocks) {
969
192k
            add_need_update_lru_block(std::move(block));
970
192k
        }
971
193k
    }
972
973
725k
    *_get_or_set_latency_us << (duration / 1000);
974
725k
    return FileBlocksHolder(std::move(file_blocks));
975
725k
}
976
977
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
978
                                        size_t offset, size_t size, FileBlock::State state,
979
113k
                                        std::lock_guard<std::mutex>& cache_lock) {
980
    /// Create a file block cell and put it in `files` map by [hash][offset].
981
113k
    if (size == 0) {
982
0
        return nullptr; /// Empty files are not cached.
983
0
    }
984
985
113k
    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
986
0
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
987
0
               << " expiration_time=" << context.expiration_time
988
0
               << " tablet_id=" << context.tablet_id;
989
990
113k
    if (size > 1024 * 1024 * 1024) {
991
0
        LOG(WARNING) << "File block size is too large for a block, reject. size=" << size
992
0
                     << " hash=" << hash.to_string() << " offset=" << offset
993
0
                     << " stack:" << get_stack_trace();
994
0
        return nullptr;
995
0
    }
996
997
113k
    auto& offsets = _files[hash];
998
113k
    auto itr = offsets.find(offset);
999
113k
    if (itr != offsets.end()) {
1000
5
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
1001
0
                   << ", offset: " << offset << ", size: " << size
1002
0
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
1003
5
        return &(itr->second);
1004
5
    }
1005
1006
113k
    FileCacheKey key;
1007
113k
    key.hash = hash;
1008
113k
    key.offset = offset;
1009
113k
    key.meta.type = context.cache_type;
1010
113k
    key.meta.expiration_time = context.expiration_time;
1011
113k
    key.meta.tablet_id = context.tablet_id;
1012
113k
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);
1013
113k
    Status st;
1014
113k
    if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) {
1015
0
        st = cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
1016
113k
    } else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) {
1017
1
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
1018
1
    }
1019
113k
    if (!st.ok()) {
1020
0
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
1021
0
                     << " cache_type=" << cache_type_to_string(context.cache_type)
1022
0
                     << " error=" << st.msg();
1023
0
    }
1024
1025
113k
    auto& queue = get_queue(cell.file_block->cache_type());
1026
113k
    cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
1027
113k
    _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD,
1028
113k
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
1029
113k
                                      cell.size());
1030
1031
113k
    if (cell.file_block->cache_type() == FileCacheType::TTL) {
1032
3.92k
        _cur_ttl_size += cell.size();
1033
3.92k
    }
1034
113k
    auto [it, _] = offsets.insert(std::make_pair(offset, std::move(cell)));
1035
113k
    _cur_cache_size += size;
1036
113k
    return &(it->second);
1037
113k
}
1038
1039
0
size_t BlockFileCache::try_release() {
1040
0
    SCOPED_CACHE_LOCK(_mutex, this);
1041
0
    std::vector<FileBlockCell*> trash;
1042
0
    for (auto& [hash, blocks] : _files) {
1043
0
        for (auto& [offset, cell] : blocks) {
1044
0
            if (cell.releasable()) {
1045
0
                trash.emplace_back(&cell);
1046
0
            } else {
1047
0
                cell.file_block->set_deleting();
1048
0
            }
1049
0
        }
1050
0
    }
1051
0
    size_t remove_size = 0;
1052
0
    for (auto& cell : trash) {
1053
0
        FileBlockSPtr file_block = cell->file_block;
1054
0
        std::lock_guard lc(cell->file_block->_mutex);
1055
0
        remove_size += file_block->range().size();
1056
0
        remove(file_block, cache_lock, lc);
1057
0
        VLOG_DEBUG << "try_release " << _cache_base_path
1058
0
                   << " hash=" << file_block->get_hash_value().to_string()
1059
0
                   << " offset=" << file_block->offset();
1060
0
    }
1061
0
    *_evict_by_try_release << remove_size;
1062
0
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
1063
0
    return trash.size();
1064
0
}
1065
1066
969k
LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
1067
969k
    switch (type) {
1068
14.6k
    case FileCacheType::INDEX:
1069
14.6k
        return _index_queue;
1070
14.5k
    case FileCacheType::DISPOSABLE:
1071
14.5k
        return _disposable_queue;
1072
934k
    case FileCacheType::NORMAL:
1073
934k
        return _normal_queue;
1074
5.80k
    case FileCacheType::TTL:
1075
5.80k
        return _ttl_queue;
1076
0
    default:
1077
0
        DCHECK(false);
1078
969k
    }
1079
0
    return _normal_queue;
1080
969k
}
1081
1082
140
const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
1083
140
    switch (type) {
1084
33
    case FileCacheType::INDEX:
1085
33
        return _index_queue;
1086
1
    case FileCacheType::DISPOSABLE:
1087
1
        return _disposable_queue;
1088
106
    case FileCacheType::NORMAL:
1089
106
        return _normal_queue;
1090
0
    case FileCacheType::TTL:
1091
0
        return _ttl_queue;
1092
0
    default:
1093
0
        DCHECK(false);
1094
140
    }
1095
0
    return _normal_queue;
1096
140
}
1097
1098
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
1099
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
1100
14.9k
                                        std::string& reason) {
1101
14.9k
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1102
1.36k
        FileBlockSPtr file_block = cell->file_block;
1103
1.36k
        if (file_block) {
1104
1.36k
            std::lock_guard block_lock(file_block->_mutex);
1105
1.36k
            remove(file_block, cache_lock, block_lock, sync);
1106
1.36k
            VLOG_DEBUG << "remove_file_blocks"
1107
0
                       << " hash=" << file_block->get_hash_value().to_string()
1108
0
                       << " offset=" << file_block->offset() << " reason=" << reason;
1109
1.36k
        }
1110
1.36k
    };
1111
14.9k
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1112
14.9k
}
1113
1114
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
1115
                                           size_t& removed_size,
1116
                                           std::vector<FileBlockCell*>& to_evict,
1117
                                           std::lock_guard<std::mutex>& cache_lock,
1118
1.34k
                                           size_t& cur_removed_size, bool evict_in_advance) {
1119
3.09k
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1120
3.09k
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1121
1.27k
            break;
1122
1.27k
        }
1123
1.81k
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1124
1125
1.81k
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
1126
0
                     << ", offset: " << entry_offset;
1127
1128
1.81k
        size_t cell_size = cell->size();
1129
1.81k
        DCHECK(entry_size == cell_size);
1130
1131
1.81k
        if (cell->releasable()) {
1132
1.34k
            auto& file_block = cell->file_block;
1133
1134
1.34k
            std::lock_guard block_lock(file_block->_mutex);
1135
1.34k
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1136
1.34k
            to_evict.push_back(cell);
1137
1.34k
            removed_size += cell_size;
1138
1.34k
            cur_removed_size += cell_size;
1139
1.34k
        }
1140
1.81k
    }
1141
1.34k
}
1142
1143
// 1. if async load file cache not finish
1144
//     a. evict from lru queue
1145
// 2. if ttl cache
1146
//     a. evict from disposable/normal/index queue one by one
1147
// 3. if dont reach query limit or dont have query limit
1148
//     a. evict from other queue
1149
//     b. evict from current queue
1150
//         a.1 if the data belong write, then just evict cold data
1151
// 4. if reach query limit
1152
//     a. evict from query queue
1153
//     b. evict from other queue
1154
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
1155
                                 size_t offset, size_t size,
1156
13.6k
                                 std::lock_guard<std::mutex>& cache_lock) {
1157
13.6k
    if (!_async_open_done) {
1158
4
        return try_reserve_during_async_load(size, cache_lock);
1159
4
    }
1160
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
1161
    // directly eliminate 5 times the size of the space
1162
13.6k
    if (_disk_resource_limit_mode) {
1163
1
        size = 5 * size;
1164
1
    }
1165
1166
13.6k
    auto query_context = config::enable_file_cache_query_limit &&
1167
13.6k
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
1168
13.6k
                                 ? get_query_context(context.query_id, cache_lock)
1169
13.6k
                                 : nullptr;
1170
13.6k
    if (!query_context) {
1171
13.6k
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
1172
13.6k
    } else if (query_context->get_cache_size(cache_lock) + size <=
1173
31
               query_context->get_max_cache_size()) {
1174
16
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
1175
16
    }
1176
15
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1177
15
                               std::chrono::steady_clock::now().time_since_epoch())
1178
15
                               .count();
1179
15
    auto& queue = get_queue(context.cache_type);
1180
15
    size_t removed_size = 0;
1181
15
    size_t ghost_remove_size = 0;
1182
15
    size_t queue_size = queue.get_capacity(cache_lock);
1183
15
    size_t cur_cache_size = _cur_cache_size;
1184
15
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);
1185
1186
15
    std::vector<LRUQueue::Iterator> ghost;
1187
15
    std::vector<FileBlockCell*> to_evict;
1188
1189
15
    size_t max_size = queue.get_max_size();
1190
48
    auto is_overflow = [&] {
1191
48
        return _disk_resource_limit_mode ? removed_size < size
1192
48
                                         : cur_cache_size + size - removed_size > _capacity ||
1193
48
                                                   (queue_size + size - removed_size > max_size) ||
1194
48
                                                   (query_context_cache_size + size -
1195
38
                                                            (removed_size + ghost_remove_size) >
1196
38
                                                    query_context->get_max_cache_size());
1197
48
    };
1198
1199
    /// Select the cache from the LRU queue held by query for expulsion.
1200
35
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
1201
33
        if (!is_overflow()) {
1202
13
            break;
1203
13
        }
1204
1205
20
        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);
1206
1207
20
        if (!cell) {
1208
            /// The cache corresponding to this record may be swapped out by
1209
            /// other queries, so it has become invalid.
1210
4
            ghost.push_back(iter);
1211
4
            ghost_remove_size += iter->size;
1212
16
        } else {
1213
16
            size_t cell_size = cell->size();
1214
16
            DCHECK(iter->size == cell_size);
1215
1216
16
            if (cell->releasable()) {
1217
16
                auto& file_block = cell->file_block;
1218
1219
16
                std::lock_guard block_lock(file_block->_mutex);
1220
16
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1221
16
                to_evict.push_back(cell);
1222
16
                removed_size += cell_size;
1223
16
            }
1224
16
        }
1225
20
    }
1226
1227
16
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1228
16
        FileBlockSPtr file_block = cell->file_block;
1229
16
        if (file_block) {
1230
16
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
1231
16
            std::lock_guard block_lock(file_block->_mutex);
1232
16
            remove(file_block, cache_lock, block_lock);
1233
16
        }
1234
16
    };
1235
1236
15
    for (auto& iter : ghost) {
1237
4
        query_context->remove(iter->hash, iter->offset, cache_lock);
1238
4
    }
1239
1240
15
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1241
1242
15
    if (is_overflow() &&
1243
15
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
1244
1
        return false;
1245
1
    }
1246
14
    query_context->reserve(hash, offset, size, cache_lock);
1247
14
    return true;
1248
15
}
1249
1250
4
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
1251
4
    UInt128Wrapper hash = UInt128Wrapper();
1252
4
    size_t offset = 0;
1253
4
    CacheContext context;
1254
    /* we pick NORMAL and TTL cache to evict in advance
1255
     * we reserve for them but won't acutually give space to them
1256
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
1257
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
1258
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
1259
     */
1260
4
    context.cache_type = FileCacheType::NORMAL;
1261
4
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1262
4
    context.cache_type = FileCacheType::TTL;
1263
4
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1264
4
}
1265
1266
// remove specific cache synchronously, for critical operations
1267
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1268
8
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1269
8
    std::string reason = "remove_if_cached";
1270
8
    SCOPED_CACHE_LOCK(_mutex, this);
1271
8
    auto iter = _files.find(file_key);
1272
8
    std::vector<FileBlockCell*> to_remove;
1273
8
    if (iter != _files.end()) {
1274
14
        for (auto& [_, cell] : iter->second) {
1275
14
            if (cell.releasable()) {
1276
13
                to_remove.push_back(&cell);
1277
13
            } else {
1278
1
                cell.file_block->set_deleting();
1279
1
            }
1280
14
        }
1281
6
    }
1282
8
    remove_file_blocks(to_remove, cache_lock, true, reason);
1283
8
}
1284
1285
// the async version of remove_if_cached, for background operations
1286
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
1287
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1288
2
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1289
2
    std::string reason = "remove_if_cached_async";
1290
2
    SCOPED_CACHE_LOCK(_mutex, this);
1291
1292
2
    auto iter = _files.find(file_key);
1293
2
    std::vector<FileBlockCell*> to_remove;
1294
2
    if (iter != _files.end()) {
1295
2
        for (auto& [_, cell] : iter->second) {
1296
2
            *_gc_evict_bytes_metrics << cell.size();
1297
2
            *_gc_evict_count_metrics << 1;
1298
2
            if (cell.releasable()) {
1299
1
                to_remove.push_back(&cell);
1300
1
            } else {
1301
1
                cell.file_block->set_deleting();
1302
1
            }
1303
2
        }
1304
2
    }
1305
2
    remove_file_blocks(to_remove, cache_lock, false, reason);
1306
2
}
1307
1308
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
1309
13.6k
        FileCacheType cur_cache_type) {
1310
13.6k
    switch (cur_cache_type) {
1311
3.91k
    case FileCacheType::TTL:
1312
3.91k
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
1313
671
    case FileCacheType::INDEX:
1314
671
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
1315
8.46k
    case FileCacheType::NORMAL:
1316
8.46k
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
1317
617
    case FileCacheType::DISPOSABLE:
1318
617
        return {FileCacheType::NORMAL, FileCacheType::INDEX};
1319
0
    default:
1320
0
        return {};
1321
13.6k
    }
1322
0
    return {};
1323
13.6k
}
1324
1325
1.24k
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
1326
1.24k
    switch (cur_cache_type) {
1327
286
    case FileCacheType::TTL:
1328
286
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
1329
143
    case FileCacheType::INDEX:
1330
143
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL};
1331
667
    case FileCacheType::NORMAL:
1332
667
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL};
1333
152
    case FileCacheType::DISPOSABLE:
1334
152
        return {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL};
1335
0
    default:
1336
0
        return {};
1337
1.24k
    }
1338
0
    return {};
1339
1.24k
}
1340
1341
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
1342
10
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
1343
10
    DCHECK(_files.find(hash) != _files.end() &&
1344
10
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
1345
10
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
1346
10
    DCHECK(cell != nullptr);
1347
10
    if (cell == nullptr) {
1348
0
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
1349
0
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
1350
0
                     << " new_size=" << new_size;
1351
0
        return;
1352
0
    }
1353
10
    DCHECK_EQ(cell->file_block->_block_range.size(), old_size);
1354
10
    if (cell->queue_iterator) {
1355
10
        auto& queue = get_queue(cell->file_block->cache_type());
1356
10
        DCHECK(queue.contains(hash, offset, cache_lock));
1357
10
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
1358
10
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
1359
10
                                          cell->file_block->get_hash_value(),
1360
10
                                          cell->file_block->offset(), new_size);
1361
10
    }
1362
10
    cell->file_block->_block_range.right = cell->file_block->_block_range.left + new_size - 1;
1363
10
    _cur_cache_size -= old_size;
1364
10
    _cur_cache_size += new_size;
1365
10
    if (cell->file_block->cache_type() == FileCacheType::TTL) {
1366
0
        _cur_ttl_size -= old_size;
1367
0
        _cur_ttl_size += new_size;
1368
0
    }
1369
10
}
1370
1371
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
1372
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1373
13.6k
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1374
13.6k
    size_t removed_size = 0;
1375
13.6k
    size_t cur_cache_size = _cur_cache_size;
1376
13.6k
    std::vector<FileBlockCell*> to_evict;
1377
31.2k
    for (FileCacheType cache_type : other_cache_types) {
1378
31.2k
        auto& queue = get_queue(cache_type);
1379
31.2k
        size_t remove_size_per_type = 0;
1380
31.2k
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1381
1.43k
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1382
697
                break;
1383
697
            }
1384
741
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1385
741
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
1386
0
                         << ", offset: " << entry_offset;
1387
1388
741
            size_t cell_size = cell->size();
1389
741
            DCHECK(entry_size == cell_size);
1390
1391
741
            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
1392
739
                break;
1393
739
            }
1394
1395
2
            if (cell->releasable()) {
1396
2
                auto& file_block = cell->file_block;
1397
2
                std::lock_guard block_lock(file_block->_mutex);
1398
2
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1399
2
                to_evict.push_back(cell);
1400
2
                removed_size += cell_size;
1401
2
                remove_size_per_type += cell_size;
1402
2
            }
1403
2
        }
1404
31.2k
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
1405
31.2k
    }
1406
13.6k
    bool is_sync_removal = !evict_in_advance;
1407
13.6k
    std::string reason = std::string("try_reserve_by_time ") +
1408
13.6k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1409
13.6k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1410
1411
13.6k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1412
13.6k
}
1413
1414
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
1415
19.4k
                                 bool evict_in_advance) const {
1416
19.4k
    bool ret = false;
1417
19.4k
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
1418
39
        ret = (removed_size < need_size);
1419
39
        return ret;
1420
39
    }
1421
19.4k
    if (_disk_resource_limit_mode) {
1422
4
        ret = (removed_size < need_size);
1423
19.4k
    } else {
1424
19.4k
        ret = (cur_cache_size + need_size - removed_size > _capacity);
1425
19.4k
    }
1426
19.4k
    return ret;
1427
19.4k
}
1428
1429
bool BlockFileCache::try_reserve_from_other_queue_by_size(
1430
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1431
419
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1432
419
    size_t removed_size = 0;
1433
419
    size_t cur_cache_size = _cur_cache_size;
1434
419
    std::vector<FileBlockCell*> to_evict;
1435
    // we follow the privilege defined in get_other_cache_types to evict
1436
1.25k
    for (FileCacheType cache_type : other_cache_types) {
1437
1.25k
        auto& queue = get_queue(cache_type);
1438
1439
        // we will not drain each of them to the bottom -- i.e., we only
1440
        // evict what they have stolen.
1441
1.25k
        size_t cur_queue_size = queue.get_capacity(cache_lock);
1442
1.25k
        size_t cur_queue_max_size = queue.get_max_size();
1443
1.25k
        if (cur_queue_size <= cur_queue_max_size) {
1444
752
            continue;
1445
752
        }
1446
505
        size_t cur_removed_size = 0;
1447
505
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1448
505
                              cur_removed_size, evict_in_advance);
1449
505
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
1450
505
    }
1451
419
    bool is_sync_removal = !evict_in_advance;
1452
419
    std::string reason = std::string("try_reserve_by_size") +
1453
419
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1454
419
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1455
419
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1456
419
}
1457
1458
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
1459
                                                  int64_t cur_time,
1460
                                                  std::lock_guard<std::mutex>& cache_lock,
1461
13.6k
                                                  bool evict_in_advance) {
1462
    // currently, TTL cache is not considered as a candidate
1463
13.6k
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
1464
13.6k
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
1465
13.6k
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
1466
13.6k
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
1467
12.4k
        return reserve_success;
1468
12.4k
    }
1469
1470
1.24k
    other_cache_types = get_other_cache_type(cur_cache_type);
1471
1.24k
    auto& cur_queue = get_queue(cur_cache_type);
1472
1.24k
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
1473
1.24k
    size_t cur_queue_max_size = cur_queue.get_max_size();
1474
    // Hit the soft limit by self, cannot remove from other queues
1475
1.24k
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
1476
829
        return false;
1477
829
    }
1478
419
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
1479
419
                                                evict_in_advance);
1480
1.24k
}
1481
1482
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
1483
                                         QueryFileCacheContextPtr query_context,
1484
                                         const CacheContext& context, size_t offset, size_t size,
1485
                                         std::lock_guard<std::mutex>& cache_lock,
1486
13.6k
                                         bool evict_in_advance) {
1487
13.6k
    int64_t cur_time = steady_clock_seconds();
1488
13.6k
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
1489
13.6k
                                      evict_in_advance)) {
1490
835
        auto& queue = get_queue(context.cache_type);
1491
835
        size_t removed_size = 0;
1492
835
        size_t cur_cache_size = _cur_cache_size;
1493
1494
835
        std::vector<FileBlockCell*> to_evict;
1495
835
        size_t cur_removed_size = 0;
1496
835
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1497
835
                              cur_removed_size, evict_in_advance);
1498
835
        bool is_sync_removal = !evict_in_advance;
1499
835
        std::string reason = std::string("try_reserve for cache type ") +
1500
835
                             cache_type_to_string(context.cache_type) +
1501
835
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1502
835
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1503
835
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;
1504
1505
835
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1506
65
            return false;
1507
65
        }
1508
835
    }
1509
1510
13.5k
    if (query_context) {
1511
16
        query_context->reserve(hash, offset, size, cache_lock);
1512
16
    }
1513
13.5k
    return true;
1514
13.6k
}
1515
1516
template <class T, class U>
1517
    requires IsXLock<T> && IsXLock<U>
1518
108k
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
1519
108k
    auto hash = file_block->get_hash_value();
1520
108k
    auto offset = file_block->offset();
1521
108k
    auto type = file_block->cache_type();
1522
108k
    auto expiration_time = file_block->expiration_time();
1523
108k
    auto tablet_id = file_block->tablet_id();
1524
108k
    auto* cell = get_cell(hash, offset, cache_lock);
1525
108k
    file_block->cell = nullptr;
1526
    // Holder cleanup can race with prior cache metadata cleanup. In that case,
1527
    // skip the duplicate remove instead of touching a detached or replaced cell.
1528
108k
    if (cell == nullptr) {
1529
1
        LOG(WARNING) << "remove skipped because cache cell is missing. hash=" << hash.to_string()
1530
1
                     << " offset=" << offset << " size=" << file_block->range().size()
1531
1
                     << " type=" << cache_type_to_string(type)
1532
1
                     << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
1533
1
                     << " expiration_time=" << expiration_time << " sync=" << sync;
1534
1
        return;
1535
1
    }
1536
108k
    if (cell->file_block.get() != file_block.get()) {
1537
1
        auto* cell_file_block = cell->file_block.get();
1538
1
        LOG(WARNING)
1539
1
                << "remove skipped because cache cell points to a different file block. hash="
1540
1
                << hash.to_string() << " offset=" << offset
1541
1
                << " size=" << file_block->range().size() << " type=" << cache_type_to_string(type)
1542
1
                << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
1543
1
                << " expiration_time=" << expiration_time << " sync=" << sync << " cell_block_hash="
1544
1
                << (cell_file_block ? cell_file_block->get_hash_value().to_string() : "<null>")
1545
1
                << " cell_block_offset="
1546
1
                << (cell_file_block ? std::to_string(cell_file_block->offset()) : "<null>")
1547
1
                << " cell_block_size="
1548
1
                << (cell_file_block ? std::to_string(cell_file_block->range().size()) : "<null>")
1549
1
                << " cell_block_type="
1550
1
                << (cell_file_block ? cache_type_to_string(cell_file_block->cache_type())
1551
1
                                    : "<null>")
1552
1
                << " cell_block_state="
1553
1
                << (cell_file_block ? FileBlock::state_to_string(cell_file_block->state_unsafe())
1554
1
                                    : "<null>");
1555
1
        return;
1556
1
    }
1557
108k
    DCHECK(cell->queue_iterator);
1558
108k
    if (cell->queue_iterator) {
1559
108k
        auto& queue = get_queue(file_block->cache_type());
1560
108k
        queue.remove(*cell->queue_iterator, cache_lock);
1561
108k
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
1562
108k
                                          cell->file_block->get_hash_value(),
1563
108k
                                          cell->file_block->offset(), cell->size());
1564
108k
    }
1565
108k
    *_queue_evict_size_metrics[file_cache_type_index(file_block->cache_type())]
1566
108k
            << file_block->range().size();
1567
108k
    *_total_evict_size_metrics << file_block->range().size();
1568
1569
108k
    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
1570
0
               << ", offset: " << offset << ", size: " << file_block->range().size()
1571
0
               << ", type: " << cache_type_to_string(type);
1572
1573
108k
    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
1574
106k
        FileCacheKey key;
1575
106k
        key.hash = hash;
1576
106k
        key.offset = offset;
1577
106k
        key.meta.type = type;
1578
106k
        key.meta.expiration_time = expiration_time;
1579
106k
        key.meta.tablet_id = tablet_id;
1580
106k
        if (sync) {
1581
106k
            int64_t duration_ns = 0;
1582
106k
            Status st;
1583
106k
            {
1584
106k
                SCOPED_RAW_TIMER(&duration_ns);
1585
106k
                st = _storage->remove(key);
1586
106k
            }
1587
106k
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
1588
106k
            if (!st.ok()) {
1589
0
                LOG_WARNING("").error(st);
1590
0
            }
1591
106k
        } else {
1592
            // the file will be deleted in the bottom half
1593
            // so there will be a window that the file is not in the cache but still in the storage
1594
            // but it's ok, because the rowset is stale already
1595
49
            bool ret = _recycle_keys.enqueue(key);
1596
49
            if (ret) [[likely]] {
1597
49
                *_recycle_keys_length_recorder << _recycle_keys.size_approx();
1598
49
            } else {
1599
0
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
1600
0
                int64_t duration_ns = 0;
1601
0
                Status st;
1602
0
                {
1603
0
                    SCOPED_RAW_TIMER(&duration_ns);
1604
0
                    st = _storage->remove(key);
1605
0
                }
1606
0
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
1607
0
                if (!st.ok()) {
1608
0
                    LOG_WARNING("").error(st);
1609
0
                }
1610
0
            }
1611
49
        }
1612
106k
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
1613
100
        file_block->set_deleting();
1614
100
        return;
1615
100
    }
1616
108k
    _cur_cache_size -= file_block->range().size();
1617
108k
    if (FileCacheType::TTL == type) {
1618
1.29k
        _cur_ttl_size -= file_block->range().size();
1619
1.29k
    }
1620
108k
    auto it = _files.find(hash);
1621
108k
    if (it != _files.end()) {
1622
108k
        it->second.erase(file_block->offset());
1623
108k
        if (it->second.empty()) {
1624
6.47k
            _files.erase(hash);
1625
6.47k
        }
1626
108k
    }
1627
108k
    *_num_removed_blocks << 1;
1628
108k
}
1629
1630
46
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
1631
46
    SCOPED_CACHE_LOCK(_mutex, this);
1632
46
    return get_used_cache_size_unlocked(cache_type, cache_lock);
1633
46
}
1634
1635
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
1636
46
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1637
46
    return get_queue(cache_type).get_capacity(cache_lock);
1638
46
}
1639
1640
0
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
1641
0
    SCOPED_CACHE_LOCK(_mutex, this);
1642
0
    return get_available_cache_size_unlocked(cache_type, cache_lock);
1643
0
}
1644
1645
size_t BlockFileCache::get_available_cache_size_unlocked(
1646
0
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
1647
0
    return get_queue(cache_type).get_max_element_size() -
1648
0
           get_used_cache_size_unlocked(cache_type, cache_lock);
1649
0
}
1650
1651
91
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
1652
91
    SCOPED_CACHE_LOCK(_mutex, this);
1653
91
    return get_file_blocks_num_unlocked(cache_type, cache_lock);
1654
91
}
1655
1656
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
1657
91
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1658
91
    return get_queue(cache_type).get_elements_num(cache_lock);
1659
91
}
1660
1661
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
1662
113k
        : file_block(file_block) {
1663
113k
    file_block->cell = this;
1664
    /**
1665
     * Cell can be created with either DOWNLOADED or EMPTY file block's state.
1666
     * File block acquires DOWNLOADING state and creates LRUQueue iterator on first
1667
     * successful getOrSetDownaloder call.
1668
     */
1669
1670
113k
    switch (file_block->_download_state) {
1671
100k
    case FileBlock::State::DOWNLOADED:
1672
113k
    case FileBlock::State::EMPTY:
1673
113k
    case FileBlock::State::SKIP_CACHE: {
1674
113k
        break;
1675
113k
    }
1676
0
    default:
1677
0
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
1678
0
                      << FileBlock::state_to_string(file_block->_download_state);
1679
113k
    }
1680
113k
    if (file_block->cache_type() == FileCacheType::TTL) {
1681
3.92k
        update_atime();
1682
3.92k
    }
1683
113k
}
1684
1685
LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
1686
226k
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
1687
226k
    cache_size += size;
1688
226k
    auto iter = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size));
1689
226k
    map.insert(std::make_pair(std::make_pair(hash, offset), iter));
1690
226k
    return iter;
1691
226k
}
1692
1693
0
void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
1694
0
    queue.clear();
1695
0
    map.clear();
1696
0
    cache_size = 0;
1697
0
}
1698
1699
15
bool LRUQueue::pop_front(std::lock_guard<std::mutex>& /* cache_lock */) {
1700
15
    if (queue.empty()) {
1701
0
        return false;
1702
0
    }
1703
15
    auto queue_it = queue.begin();
1704
15
    cache_size -= queue_it->size;
1705
15
    map.erase(std::make_pair(queue_it->hash, queue_it->offset));
1706
15
    queue.erase(queue_it);
1707
15
    return true;
1708
15
}
1709
1710
1.03M
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
1711
1.03M
    queue.splice(queue.end(), queue, queue_it);
1712
1.03M
}
1713
1714
void LRUQueue::resize(Iterator queue_it, size_t new_size,
1715
18
                      std::lock_guard<std::mutex>& /* cache_lock */) {
1716
18
    cache_size -= queue_it->size;
1717
18
    queue_it->size = new_size;
1718
18
    cache_size += new_size;
1719
18
}
1720
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
1721
10
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
1722
10
    return map.find(std::make_pair(hash, offset)) != map.end();
1723
10
}
1724
1725
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
1726
629k
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
1727
629k
    auto itr = map.find(std::make_pair(hash, offset));
1728
629k
    if (itr != map.end()) {
1729
629k
        return itr->second;
1730
629k
    }
1731
113
    return std::list<FileKeyAndOffset>::iterator();
1732
629k
}
1733
1734
2
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
1735
2
    std::string result;
1736
18
    for (const auto& [hash, offset, size] : queue) {
1737
18
        if (!result.empty()) {
1738
16
            result += ", ";
1739
16
        }
1740
18
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
1741
18
    }
1742
2
    return result;
1743
2
}
1744
1745
size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
1746
5
                                           std::lock_guard<std::mutex>& cache_lock) {
1747
5
    std::list<FileKeyAndOffset> target_queue = this->queue;
1748
5
    std::list<FileKeyAndOffset> base_queue = base.queue;
1749
5
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
1750
5
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
1751
1752
5
    size_t m = vec1.size();
1753
5
    size_t n = vec2.size();
1754
1755
    // Create a 2D vector (matrix) to store the Levenshtein distances
1756
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
1757
5
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
1758
1759
    // Initialize the first row and column of the matrix
1760
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
1761
22
    for (size_t i = 0; i <= m; ++i) {
1762
17
        dp[i][0] = i;
1763
17
    }
1764
20
    for (size_t j = 0; j <= n; ++j) {
1765
15
        dp[0][j] = j;
1766
15
    }
1767
1768
    // Fill the matrix using dynamic programming
1769
17
    for (size_t i = 1; i <= m; ++i) {
1770
38
        for (size_t j = 1; j <= n; ++j) {
1771
            // Check if the current elements of both vectors are equal
1772
26
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
1773
26
                           vec1[i - 1].offset == vec2[j - 1].offset)
1774
26
                                  ? 0
1775
26
                                  : 1;
1776
            // Calculate the minimum cost of three possible operations:
1777
            // 1. Insertion: dp[i][j-1] + 1
1778
            // 2. Deletion: dp[i-1][j] + 1
1779
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
1780
26
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
1781
26
        }
1782
12
    }
1783
    // The bottom-right cell of the matrix contains the Levenshtein distance
1784
5
    return dp[m][n];
1785
5
}
1786
1787
1
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
1788
1
    SCOPED_CACHE_LOCK(_mutex, this);
1789
1
    return dump_structure_unlocked(hash, cache_lock);
1790
1
}
1791
1792
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
1793
1
                                                    std::lock_guard<std::mutex>&) {
1794
1
    std::stringstream result;
1795
1
    auto it = _files.find(hash);
1796
1
    if (it == _files.end()) {
1797
0
        return std::string("");
1798
0
    }
1799
1
    const auto& cells_by_offset = it->second;
1800
1801
1
    for (const auto& [_, cell] : cells_by_offset) {
1802
1
        result << cell.file_block->get_info_for_log() << " "
1803
1
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
1804
1
    }
1805
1806
1
    return result.str();
1807
1
}
1808
1809
0
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
1810
0
    SCOPED_CACHE_LOCK(_mutex, this);
1811
0
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
1812
0
}
1813
1814
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
1815
                                                            size_t offset,
1816
0
                                                            std::lock_guard<std::mutex>&) {
1817
0
    std::stringstream result;
1818
0
    auto it = _files.find(hash);
1819
0
    if (it == _files.end()) {
1820
0
        return std::string("");
1821
0
    }
1822
0
    const auto& cells_by_offset = it->second;
1823
0
    const auto& cell = cells_by_offset.find(offset);
1824
1825
0
    return cache_type_to_string(cell->second.file_block->cache_type());
1826
0
}
1827
1828
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
1829
                                       FileCacheType new_type,
1830
9
                                       std::lock_guard<std::mutex>& cache_lock) {
1831
9
    if (auto iter = _files.find(hash); iter != _files.end()) {
1832
9
        auto& file_blocks = iter->second;
1833
9
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
1834
8
            FileBlockCell& cell = cell_it->second;
1835
8
            auto& cur_queue = get_queue(cell.file_block->cache_type());
1836
8
            DCHECK(cell.queue_iterator.has_value());
1837
8
            cur_queue.remove(*cell.queue_iterator, cache_lock);
1838
8
            _lru_recorder->record_queue_event(
1839
8
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
1840
8
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
1841
8
            auto& new_queue = get_queue(new_type);
1842
8
            cell.queue_iterator =
1843
8
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
1844
8
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
1845
8
                                              cell.file_block->get_hash_value(),
1846
8
                                              cell.file_block->offset(), cell.size());
1847
8
        }
1848
9
    }
1849
9
}
1850
1851
// @brief: get a path's disk capacity used percent, inode used percent
1852
// @param: path
1853
// @param: percent.first disk used percent, percent.second inode used percent
1854
217
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
1855
217
    struct statfs stat;
1856
217
    int ret = statfs(path.c_str(), &stat);
1857
217
    if (ret != 0) {
1858
2
        return ret;
1859
2
    }
1860
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
1861
    // v->used = stat.f_blocks - stat.f_bfree
1862
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
1863
215
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
1864
215
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
1865
215
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));
1866
1867
215
    unsigned long long inode_free = stat.f_ffree;
1868
215
    unsigned long long inode_total = stat.f_files;
1869
215
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
1870
215
    percent->first = capacity_percentage;
1871
215
    percent->second = 100 - inode_percentage;
1872
1873
    // Add sync point for testing
1874
215
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
1875
1876
215
    return 0;
1877
217
}
1878
1879
10
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
1880
10
    using namespace std::chrono;
1881
10
    int64_t space_released = 0;
1882
10
    size_t old_capacity = 0;
1883
10
    std::stringstream ss;
1884
10
    ss << "finish reset_capacity, path=" << _cache_base_path;
1885
10
    auto adjust_start_time = steady_clock::time_point();
1886
10
    {
1887
10
        SCOPED_CACHE_LOCK(_mutex, this);
1888
10
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
1889
1
            int64_t need_remove_size = _cur_cache_size - new_capacity;
1890
4
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
1891
4
                int64_t queue_released = 0;
1892
4
                std::vector<FileBlockCell*> to_evict;
1893
13
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1894
13
                    if (need_remove_size <= 0) {
1895
1
                        break;
1896
1
                    }
1897
12
                    need_remove_size -= entry_size;
1898
12
                    space_released += entry_size;
1899
12
                    queue_released += entry_size;
1900
12
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1901
12
                    if (!cell->releasable()) {
1902
0
                        cell->file_block->set_deleting();
1903
0
                        continue;
1904
0
                    }
1905
12
                    to_evict.push_back(cell);
1906
12
                }
1907
12
                for (auto& cell : to_evict) {
1908
12
                    FileBlockSPtr file_block = cell->file_block;
1909
12
                    std::lock_guard block_lock(file_block->_mutex);
1910
12
                    remove(file_block, cache_lock, block_lock);
1911
12
                }
1912
4
                return queue_released;
1913
4
            };
1914
1
            int64_t queue_released = remove_blocks(_disposable_queue);
1915
1
            ss << " disposable_queue released " << queue_released;
1916
1
            queue_released = remove_blocks(_normal_queue);
1917
1
            ss << " normal_queue released " << queue_released;
1918
1
            queue_released = remove_blocks(_index_queue);
1919
1
            ss << " index_queue released " << queue_released;
1920
1
            queue_released = remove_blocks(_ttl_queue);
1921
1
            ss << " ttl_queue released " << queue_released;
1922
1923
1
            _disk_resource_limit_mode = true;
1924
1
            _disk_limit_mode_metrics->set_value(1);
1925
1
            ss << " total_space_released=" << space_released;
1926
1
        }
1927
10
        old_capacity = _capacity;
1928
10
        _capacity = new_capacity;
1929
10
        _cache_capacity_metrics->set_value(_capacity);
1930
10
    }
1931
10
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
1932
10
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
1933
10
              << " use_time=" << cast_set<int64_t>(use_time.count());
1934
10
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
1935
10
    LOG(INFO) << ss.str();
1936
10
    return ss.str();
1937
10
}
1938
1939
234
void BlockFileCache::check_disk_resource_limit() {
1940
234
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1941
32
        return;
1942
32
    }
1943
1944
202
    bool previous_mode = _disk_resource_limit_mode;
1945
202
    if (_capacity > _cur_cache_size) {
1946
200
        _disk_resource_limit_mode = false;
1947
200
        _disk_limit_mode_metrics->set_value(0);
1948
200
    }
1949
202
    std::pair<int, int> percent;
1950
202
    int ret = disk_used_percentage(_cache_base_path, &percent);
1951
202
    if (ret != 0) {
1952
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1953
1
        return;
1954
1
    }
1955
201
    auto [space_percentage, inode_percentage] = percent;
1956
402
    auto is_insufficient = [](const int& percentage) {
1957
402
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
1958
402
    };
1959
201
    DCHECK_GE(space_percentage, 0);
1960
201
    DCHECK_LE(space_percentage, 100);
1961
201
    DCHECK_GE(inode_percentage, 0);
1962
201
    DCHECK_LE(inode_percentage, 100);
1963
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1964
    // FIXME: reject with config validator
1965
201
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
1966
201
        config::file_cache_exit_disk_resource_limit_mode_percent) {
1967
1
        LOG_WARNING("config error, set to default value")
1968
1
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
1969
1
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
1970
1
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
1971
1
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
1972
1
    }
1973
201
    bool is_space_insufficient = is_insufficient(space_percentage);
1974
201
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1975
201
    if (is_space_insufficient || is_inode_insufficient) {
1976
1
        _disk_resource_limit_mode = true;
1977
1
        _disk_limit_mode_metrics->set_value(1);
1978
200
    } else if (_disk_resource_limit_mode &&
1979
200
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
1980
200
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
1981
0
        _disk_resource_limit_mode = false;
1982
0
        _disk_limit_mode_metrics->set_value(0);
1983
0
    }
1984
201
    if (previous_mode != _disk_resource_limit_mode) {
1985
        // add log for disk resource limit mode switching
1986
2
        if (_disk_resource_limit_mode) {
1987
1
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
1988
1
                         << " space_percent=" << space_percentage
1989
1
                         << " inode_percent=" << inode_percentage
1990
1
                         << " is_space_insufficient=" << is_space_insufficient
1991
1
                         << " is_inode_insufficient=" << is_inode_insufficient
1992
1
                         << " enter threshold="
1993
1
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
1994
1
        } else {
1995
1
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
1996
1
                      << " space_percent=" << space_percentage
1997
1
                      << " inode_percent=" << inode_percentage << " exit threshold="
1998
1
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
1999
1
        }
2000
199
    } else if (_disk_resource_limit_mode) {
2001
        // print log for disk resource limit mode running, but less frequently
2002
0
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
2003
0
                                 << " space_percent=" << space_percentage
2004
0
                                 << " inode_percent=" << inode_percentage
2005
0
                                 << " is_space_insufficient=" << is_space_insufficient
2006
0
                                 << " is_inode_insufficient=" << is_inode_insufficient
2007
0
                                 << " mode run in resource limit";
2008
0
    }
2009
201
}
2010
2011
16
void BlockFileCache::check_need_evict_cache_in_advance() {
2012
16
    if (_storage->get_type() != FileCacheStorageType::DISK) {
2013
1
        return;
2014
1
    }
2015
2016
15
    std::pair<int, int> percent;
2017
15
    int ret = disk_used_percentage(_cache_base_path, &percent);
2018
15
    if (ret != 0) {
2019
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
2020
1
        return;
2021
1
    }
2022
14
    auto [space_percentage, inode_percentage] = percent;
2023
14
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
2024
42
    auto is_insufficient = [](const int& percentage) {
2025
42
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
2026
42
    };
2027
14
    DCHECK_GE(space_percentage, 0);
2028
14
    DCHECK_LE(space_percentage, 100);
2029
14
    DCHECK_GE(inode_percentage, 0);
2030
14
    DCHECK_LE(inode_percentage, 100);
2031
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
2032
    // FIXME: reject with config validator
2033
14
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
2034
14
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
2035
1
        LOG_WARNING("config error, set to default value")
2036
1
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
2037
1
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
2038
1
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
2039
1
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
2040
1
    }
2041
14
    bool previous_mode = _need_evict_cache_in_advance;
2042
14
    bool is_space_insufficient = is_insufficient(space_percentage);
2043
14
    bool is_inode_insufficient = is_insufficient(inode_percentage);
2044
14
    bool is_size_insufficient = is_insufficient(size_percentage);
2045
14
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
2046
9
        _need_evict_cache_in_advance = true;
2047
9
        _need_evict_cache_in_advance_metrics->set_value(1);
2048
9
    } else if (_need_evict_cache_in_advance &&
2049
5
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
2050
5
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
2051
5
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
2052
1
        _need_evict_cache_in_advance = false;
2053
1
        _need_evict_cache_in_advance_metrics->set_value(0);
2054
1
    }
2055
14
    if (previous_mode != _need_evict_cache_in_advance) {
2056
        // add log for evict cache in advance mode switching
2057
6
        if (_need_evict_cache_in_advance) {
2058
5
            LOG(WARNING) << "Entering evict cache in advance mode: "
2059
5
                         << "file_cache=" << get_base_path()
2060
5
                         << " space_percent=" << space_percentage
2061
5
                         << " inode_percent=" << inode_percentage
2062
5
                         << " size_percent=" << size_percentage
2063
5
                         << " is_space_insufficient=" << is_space_insufficient
2064
5
                         << " is_inode_insufficient=" << is_inode_insufficient
2065
5
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
2066
5
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
2067
5
        } else {
2068
1
            LOG(INFO) << "Exiting evict cache in advance mode: "
2069
1
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
2070
1
                      << " inode_percent=" << inode_percentage
2071
1
                      << " size_percent=" << size_percentage << " exit threshold="
2072
1
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
2073
1
        }
2074
8
    } else if (_need_evict_cache_in_advance) {
2075
        // print log for evict cache in advance mode running, but less frequently
2076
4
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
2077
1
                                 << " space_percent=" << space_percentage
2078
1
                                 << " inode_percent=" << inode_percentage
2079
1
                                 << " size_percent=" << size_percentage
2080
1
                                 << " is_space_insufficient=" << is_space_insufficient
2081
1
                                 << " is_inode_insufficient=" << is_inode_insufficient
2082
1
                                 << " is_size_insufficient=" << is_size_insufficient
2083
1
                                 << " need evict cache in advance";
2084
4
    }
2085
14
}
2086
2087
223
void BlockFileCache::run_background_monitor() {
2088
223
    Thread::set_self_name("run_background_monitor");
2089
234
    while (!_close) {
2090
234
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
2091
234
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
2092
234
        check_disk_resource_limit();
2093
234
        if (config::enable_evict_file_cache_in_advance) {
2094
8
            check_need_evict_cache_in_advance();
2095
226
        } else {
2096
226
            _need_evict_cache_in_advance = false;
2097
226
            _need_evict_cache_in_advance_metrics->set_value(0);
2098
226
        }
2099
2100
234
        {
2101
234
            std::unique_lock close_lock(_close_mtx);
2102
234
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2103
234
            if (_close) {
2104
223
                break;
2105
223
            }
2106
234
        }
2107
        // report
2108
11
        {
2109
11
            SCOPED_CACHE_LOCK(_mutex, this);
2110
11
            _cur_cache_size_metrics->set_value(_cur_cache_size);
2111
11
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
2112
11
                                                   _index_queue.get_capacity(cache_lock) -
2113
11
                                                   _normal_queue.get_capacity(cache_lock) -
2114
11
                                                   _disposable_queue.get_capacity(cache_lock));
2115
11
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
2116
11
                    _ttl_queue.get_capacity(cache_lock));
2117
11
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
2118
11
                    _ttl_queue.get_elements_num(cache_lock));
2119
11
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
2120
11
            _cur_normal_queue_element_count_metrics->set_value(
2121
11
                    _normal_queue.get_elements_num(cache_lock));
2122
11
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
2123
11
            _cur_index_queue_element_count_metrics->set_value(
2124
11
                    _index_queue.get_elements_num(cache_lock));
2125
11
            _cur_disposable_queue_cache_size_metrics->set_value(
2126
11
                    _disposable_queue.get_capacity(cache_lock));
2127
11
            _cur_disposable_queue_element_count_metrics->set_value(
2128
11
                    _disposable_queue.get_elements_num(cache_lock));
2129
2130
            // Update meta store write queue size if storage is FSFileCacheStorage
2131
11
            if (_storage->get_type() == FileCacheStorageType::DISK) {
2132
10
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
2133
10
                if (fs_storage != nullptr) {
2134
10
                    auto* meta_store = fs_storage->get_meta_store();
2135
10
                    if (meta_store != nullptr) {
2136
10
                        _meta_store_write_queue_size_metrics->set_value(
2137
10
                                meta_store->get_write_queue_size());
2138
10
                    }
2139
10
                }
2140
10
            }
2141
2142
11
            if (_num_read_blocks->get_value() > 0) {
2143
11
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
2144
11
                                      (double)_num_read_blocks->get_value());
2145
11
            }
2146
11
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
2147
0
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
2148
0
                                         (double)_num_read_blocks_5m->get_value());
2149
0
            }
2150
11
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
2151
0
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
2152
0
                                         (double)_num_read_blocks_1h->get_value());
2153
0
            }
2154
2155
11
            if (_no_warmup_num_read_blocks->get_value() > 0) {
2156
11
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
2157
11
                                                (double)_no_warmup_num_read_blocks->get_value());
2158
11
            }
2159
11
            if (_no_warmup_num_read_blocks_5m && _no_warmup_num_read_blocks_5m->get_value() > 0) {
2160
0
                _no_warmup_hit_ratio_5m->set_value(
2161
0
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
2162
0
                        (double)_no_warmup_num_read_blocks_5m->get_value());
2163
0
            }
2164
11
            if (_no_warmup_num_read_blocks_1h && _no_warmup_num_read_blocks_1h->get_value() > 0) {
2165
0
                _no_warmup_hit_ratio_1h->set_value(
2166
0
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
2167
0
                        (double)_no_warmup_num_read_blocks_1h->get_value());
2168
0
            }
2169
11
        }
2170
11
        _lru_recorder->update_shadow_queue_element_count_metrics();
2171
11
    }
2172
223
}
2173
2174
223
void BlockFileCache::run_background_gc() {
2175
223
    Thread::set_self_name("run_background_gc");
2176
223
    FileCacheKey key;
2177
223
    size_t batch_count = 0;
2178
1.28k
    while (!_close) {
2179
1.28k
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
2180
1.28k
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
2181
1.28k
        {
2182
1.28k
            std::unique_lock close_lock(_close_mtx);
2183
1.28k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2184
1.28k
            if (_close) {
2185
223
                break;
2186
223
            }
2187
1.28k
        }
2188
2189
1.07k
        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
2190
9
            int64_t duration_ns = 0;
2191
9
            Status st;
2192
9
            {
2193
9
                SCOPED_RAW_TIMER(&duration_ns);
2194
9
                st = _storage->remove(key);
2195
9
            }
2196
9
            *_storage_async_remove_latency_us << (duration_ns / 1000);
2197
2198
9
            if (!st.ok()) {
2199
0
                LOG_WARNING("").error(st);
2200
0
            }
2201
9
            batch_count++;
2202
9
        }
2203
1.06k
        *_recycle_keys_length_recorder << _recycle_keys.size_approx();
2204
1.06k
        batch_count = 0;
2205
1.06k
    }
2206
223
}
2207
2208
223
void BlockFileCache::run_background_evict_in_advance() {
2209
223
    Thread::set_self_name("run_background_evict_in_advance");
2210
223
    LOG(INFO) << "Starting background evict in advance thread";
2211
223
    int64_t batch = 0;
2212
330
    while (!_close) {
2213
330
        {
2214
330
            std::unique_lock close_lock(_close_mtx);
2215
330
            _close_cv.wait_for(
2216
330
                    close_lock,
2217
330
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
2218
330
            if (_close) {
2219
223
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
2220
223
                break;
2221
223
            }
2222
330
        }
2223
107
        batch = config::file_cache_evict_in_advance_batch_bytes;
2224
2225
        // Skip if eviction not needed or too many pending recycles
2226
107
        if (!_need_evict_cache_in_advance ||
2227
107
            _recycle_keys.size_approx() >=
2228
103
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
2229
103
            continue;
2230
103
        }
2231
2232
4
        int64_t duration_ns = 0;
2233
4
        {
2234
4
            SCOPED_CACHE_LOCK(_mutex, this);
2235
4
            SCOPED_RAW_TIMER(&duration_ns);
2236
4
            try_evict_in_advance(batch, cache_lock);
2237
4
        }
2238
4
        *_evict_in_advance_latency_us << (duration_ns / 1000);
2239
4
    }
2240
223
}
2241
2242
223
void BlockFileCache::run_background_block_lru_update() {
2243
223
    Thread::set_self_name("run_background_block_lru_update");
2244
223
    std::vector<FileBlockSPtr> batch;
2245
260
    while (!_close) {
2246
260
        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
2247
260
        size_t batch_limit =
2248
260
                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
2249
260
        {
2250
260
            std::unique_lock close_lock(_close_mtx);
2251
260
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2252
260
            if (_close) {
2253
223
                break;
2254
223
            }
2255
260
        }
2256
2257
37
        batch.clear();
2258
37
        batch.reserve(batch_limit);
2259
37
        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
2260
37
        if (drained == 0) {
2261
32
            *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2262
32
            continue;
2263
32
        }
2264
5
        *_need_update_lru_blocks_consume_metrics << drained;
2265
2266
5
        int64_t duration_ns = 0;
2267
5
        {
2268
5
            SCOPED_CACHE_LOCK(_mutex, this);
2269
5
            SCOPED_RAW_TIMER(&duration_ns);
2270
5
            for (auto& block : batch) {
2271
5
                update_block_lru(block, cache_lock);
2272
5
            }
2273
5
        }
2274
5
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
2275
5
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2276
5
    }
2277
223
}
2278
2279
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
2280
2
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
2281
2
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
2282
2
                               std::chrono::steady_clock::now().time_since_epoch())
2283
2
                               .count();
2284
2
    SCOPED_CACHE_LOCK(_mutex, this);
2285
2
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
2286
2
    if (auto iter = _files.find(hash); iter != _files.end()) {
2287
5
        for (auto& pair : _files.find(hash)->second) {
2288
5
            const FileBlockCell* cell = &pair.second;
2289
5
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
2290
4
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
2291
4
                    (cell->atime != 0 &&
2292
3
                     cur_time - cell->atime <
2293
3
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
2294
3
                    blocks_meta.emplace_back(pair.first, cell->size(),
2295
3
                                             cell->file_block->cache_type(),
2296
3
                                             cell->file_block->expiration_time());
2297
3
                }
2298
4
            }
2299
5
        }
2300
2
    }
2301
2
    return blocks_meta;
2302
2
}
2303
2304
bool BlockFileCache::try_reserve_during_async_load(size_t size,
2305
4
                                                   std::lock_guard<std::mutex>& cache_lock) {
2306
4
    size_t removed_size = 0;
2307
4
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
2308
4
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
2309
4
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);
2310
2311
4
    std::vector<FileBlockCell*> to_evict;
2312
4
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
2313
4
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
2314
4
            if (!_disk_resource_limit_mode || removed_size >= size) {
2315
3
                break;
2316
3
            }
2317
1
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
2318
2319
1
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
2320
0
                         << ", offset: " << entry_offset;
2321
2322
1
            size_t cell_size = cell->size();
2323
1
            DCHECK(entry_size == cell_size);
2324
2325
1
            if (cell->releasable()) {
2326
1
                auto& file_block = cell->file_block;
2327
2328
1
                std::lock_guard block_lock(file_block->_mutex);
2329
1
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
2330
1
                to_evict.push_back(cell);
2331
1
                removed_size += cell_size;
2332
1
            }
2333
1
        }
2334
3
    };
2335
4
    if (disposable_queue_size != 0) {
2336
0
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
2337
0
    }
2338
4
    if (normal_queue_size != 0) {
2339
3
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
2340
3
    }
2341
4
    if (index_queue_size != 0) {
2342
0
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
2343
0
    }
2344
4
    std::string reason = "async load";
2345
4
    remove_file_blocks(to_evict, cache_lock, true, reason);
2346
2347
4
    return !_disk_resource_limit_mode || removed_size >= size;
2348
4
}
2349
2350
24
void BlockFileCache::clear_need_update_lru_blocks() {
2351
24
    _need_update_lru_blocks.clear();
2352
24
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2353
24
}
2354
2355
5.15k
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
2356
5.15k
    std::map<size_t, FileBlockSPtr> offset_to_block;
2357
5.15k
    SCOPED_CACHE_LOCK(_mutex, this);
2358
5.15k
    if (_files.contains(hash)) {
2359
5.15k
        for (auto& [offset, cell] : _files[hash]) {
2360
5.15k
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
2361
5.15k
                cell.file_block->_owned_by_cached_reader = true;
2362
5.15k
                offset_to_block.emplace(offset, cell.file_block);
2363
5.15k
            }
2364
5.15k
        }
2365
5.13k
    }
2366
5.15k
    return offset_to_block;
2367
5.15k
}
2368
2369
0
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
2370
0
    SCOPED_CACHE_LOCK(_mutex, this);
2371
0
    if (auto iter = _files.find(hash); iter != _files.end()) {
2372
0
        for (auto& [_, cell] : iter->second) {
2373
0
            cell.update_atime();
2374
0
        }
2375
0
    };
2376
0
}
2377
2378
223
void BlockFileCache::run_background_lru_log_replay() {
2379
223
    Thread::set_self_name("run_background_lru_log_replay");
2380
95.6k
    while (!_close) {
2381
95.5k
        int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
2382
95.5k
        {
2383
95.5k
            std::unique_lock close_lock(_close_mtx);
2384
95.5k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2385
95.5k
            if (_close) {
2386
199
                break;
2387
199
            }
2388
95.5k
        }
2389
2390
95.3k
        replay_lru_logs_once();
2391
95.3k
    }
2392
223
}
2393
2394
95.3k
size_t BlockFileCache::replay_lru_logs_once() {
2395
95.3k
    size_t replayed = 0;
2396
381k
    for (FileCacheType type : LRU_LOG_REPLAY_TYPES) {
2397
381k
        replayed += _lru_recorder->replay_queue_event(type);
2398
381k
    }
2399
2400
95.3k
    if (replayed == 0) {
2401
82.0k
        *_lru_recorder_log_replay_idle_metrics << 1;
2402
82.0k
    }
2403
2404
95.3k
    if (config::enable_evaluate_shadow_queue_diff) {
2405
0
        SCOPED_CACHE_LOCK(_mutex, this);
2406
0
        _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
2407
0
        _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
2408
0
        _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
2409
0
        _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
2410
0
    }
2411
95.3k
    return replayed;
2412
95.3k
}
2413
2414
13
void BlockFileCache::dump_lru_queues(bool force) {
2415
13
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
2416
13
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
2417
13
        !ExecEnv::GetInstance()->get_is_upgrading()) {
2418
13
        _lru_dumper->dump_queue("disposable", force);
2419
13
        _lru_dumper->dump_queue("normal", force);
2420
13
        _lru_dumper->dump_queue("index", force);
2421
13
        _lru_dumper->dump_queue("ttl", force);
2422
13
        _lru_dumper->set_first_dump_done();
2423
13
    }
2424
13
}
2425
2426
223
void BlockFileCache::run_background_lru_dump() {
2427
223
    Thread::set_self_name("run_background_lru_dump");
2428
236
    while (!_close) {
2429
236
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
2430
236
        {
2431
236
            std::unique_lock close_lock(_close_mtx);
2432
236
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2433
236
            if (_close) {
2434
223
                break;
2435
223
            }
2436
236
        }
2437
13
        dump_lru_queues(false);
2438
13
    }
2439
223
}
2440
2441
223
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
2442
    // keep this order coz may be duplicated in different queue, we use the first appearence
2443
223
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
2444
223
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
2445
223
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
2446
223
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
2447
223
}
2448
2449
0
std::map<std::string, double> BlockFileCache::get_stats() {
2450
0
    std::map<std::string, double> stats;
2451
0
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2452
0
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2453
0
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2454
2455
0
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2456
0
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
2457
0
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2458
0
    stats["index_queue_curr_elements"] =
2459
0
            (double)_cur_index_queue_element_count_metrics->get_value();
2460
2461
0
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2462
0
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
2463
0
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2464
0
    stats["ttl_queue_curr_elements"] =
2465
0
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
2466
2467
0
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2468
0
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
2469
0
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2470
0
    stats["normal_queue_curr_elements"] =
2471
0
            (double)_cur_normal_queue_element_count_metrics->get_value();
2472
2473
0
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2474
0
    stats["disposable_queue_curr_size"] =
2475
0
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
2476
0
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2477
0
    stats["disposable_queue_curr_elements"] =
2478
0
            (double)_cur_disposable_queue_element_count_metrics->get_value();
2479
2480
0
    stats["lru_recorder_index_shadow_queue_curr_elements"] =
2481
0
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::INDEX]
2482
0
                    ->get_value();
2483
0
    stats["lru_recorder_ttl_shadow_queue_curr_elements"] =
2484
0
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::TTL]
2485
0
                    ->get_value();
2486
0
    stats["lru_recorder_normal_shadow_queue_curr_elements"] =
2487
0
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::NORMAL]
2488
0
                    ->get_value();
2489
0
    stats["lru_recorder_disposable_shadow_queue_curr_elements"] =
2490
0
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::DISPOSABLE]
2491
0
                    ->get_value();
2492
2493
0
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2494
0
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2495
2496
0
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2497
0
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2498
0
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2499
2500
0
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2501
0
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2502
0
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2503
2504
0
    return stats;
2505
0
}
2506
2507
// for be UTs
2508
176
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
2509
176
    std::map<std::string, double> stats;
2510
176
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2511
176
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2512
176
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2513
2514
176
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2515
176
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
2516
176
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2517
176
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
2518
2519
176
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2520
176
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
2521
176
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2522
176
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
2523
2524
176
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2525
176
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
2526
176
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2527
176
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
2528
2529
176
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2530
176
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
2531
176
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2532
176
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
2533
176
    stats["lru_recorder_index_shadow_queue_curr_elements"] =
2534
176
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::INDEX]
2535
176
                    ->get_value();
2536
176
    stats["lru_recorder_ttl_shadow_queue_curr_elements"] =
2537
176
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::TTL]
2538
176
                    ->get_value();
2539
176
    stats["lru_recorder_normal_shadow_queue_curr_elements"] =
2540
176
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::NORMAL]
2541
176
                    ->get_value();
2542
176
    stats["lru_recorder_disposable_shadow_queue_curr_elements"] =
2543
176
            (double)_lru_recorder_shadow_queue_element_count_metrics[FileCacheType::DISPOSABLE]
2544
176
                    ->get_value();
2545
2546
176
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2547
176
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2548
2549
176
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2550
176
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2551
176
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2552
2553
176
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2554
176
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2555
176
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2556
2557
176
    return stats;
2558
176
}
2559
2560
template void BlockFileCache::remove(FileBlockSPtr file_block,
2561
                                     std::lock_guard<std::mutex>& cache_lock,
2562
                                     std::lock_guard<std::mutex>& block_lock, bool sync);
2563
2564
1
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
2565
1
    InconsistencyContext inconsistency_context;
2566
1
    RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));
2567
1
    auto n = inconsistency_context.types.size();
2568
1
    results.reserve(n);
2569
1
    for (size_t i = 0; i < n; i++) {
2570
0
        std::string result;
2571
0
        result += "File cache info in manager:\n";
2572
0
        result += inconsistency_context.infos_in_manager[i].to_string();
2573
0
        result += "File cache info in storage:\n";
2574
0
        result += inconsistency_context.infos_in_storage[i].to_string();
2575
0
        result += inconsistency_context.types[i].to_string();
2576
0
        result += "\n";
2577
0
        results.push_back(std::move(result));
2578
0
    }
2579
1
    return Status::OK();
2580
1
}
2581
2582
1
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
2583
1
    std::lock_guard<std::mutex> cache_lock(_mutex);
2584
1
    std::vector<FileCacheInfo> infos_in_storage;
2585
1
    RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));
2586
1
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks;
2587
1
    for (const auto& info_in_storage : infos_in_storage) {
2588
0
        confirmed_blocks.insert({info_in_storage.hash, info_in_storage.offset});
2589
0
        auto* cell = get_cell(info_in_storage.hash, info_in_storage.offset, cache_lock);
2590
0
        if (cell == nullptr || cell->file_block == nullptr) {
2591
0
            inconsistency_context.infos_in_manager.emplace_back();
2592
0
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
2593
0
            inconsistency_context.types.emplace_back(InconsistencyType::NOT_LOADED);
2594
0
            continue;
2595
0
        }
2596
0
        FileCacheInfo info_in_manager {
2597
0
                .hash = info_in_storage.hash,
2598
0
                .expiration_time = cell->file_block->expiration_time(),
2599
0
                .size = cell->size(),
2600
0
                .offset = info_in_storage.offset,
2601
0
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
2602
0
                .cache_type = cell->file_block->cache_type()};
2603
0
        InconsistencyType inconsistent_type;
2604
0
        if (info_in_storage.is_tmp != info_in_manager.is_tmp) {
2605
0
            inconsistent_type |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
2606
0
        }
2607
0
        size_t expected_size =
2608
0
                info_in_manager.is_tmp ? cell->dowloading_size() : info_in_manager.size;
2609
0
        if (info_in_storage.size != expected_size) {
2610
0
            inconsistent_type |= InconsistencyType::SIZE_INCONSISTENT;
2611
0
        }
2612
        // Only if it is not a tmp file need we check the cache type.
2613
0
        if ((inconsistent_type & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
2614
0
            info_in_storage.cache_type != info_in_manager.cache_type) {
2615
0
            inconsistent_type |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
2616
0
        }
2617
0
        if (info_in_storage.expiration_time != info_in_manager.expiration_time) {
2618
0
            inconsistent_type |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
2619
0
        }
2620
0
        if (inconsistent_type != InconsistencyType::NONE) {
2621
0
            inconsistency_context.infos_in_manager.push_back(info_in_manager);
2622
0
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
2623
0
            inconsistency_context.types.push_back(inconsistent_type);
2624
0
        }
2625
0
    }
2626
2627
1
    for (const auto& [hash, offset_to_cell] : _files) {
2628
0
        for (const auto& [offset, cell] : offset_to_cell) {
2629
0
            if (confirmed_blocks.contains({hash, offset})) {
2630
0
                continue;
2631
0
            }
2632
0
            const auto& block = cell.file_block;
2633
0
            inconsistency_context.infos_in_manager.emplace_back(
2634
0
                    hash, block->expiration_time(), cell.size(), offset,
2635
0
                    cell.file_block->state() == FileBlock::State::DOWNLOADING, block->cache_type());
2636
0
            inconsistency_context.infos_in_storage.emplace_back();
2637
0
            inconsistency_context.types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
2638
0
        }
2639
0
    }
2640
1
    return Status::OK();
2641
1
}
2642
2643
} // namespace doris::io