Coverage Report

Created: 2026-06-08 06:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache.h"
22
23
#include <gen_cpp/file_cache.pb.h>
24
25
#include <cstdio>
26
#include <exception>
27
#include <fstream>
28
#include <unordered_set>
29
30
#include "common/status.h"
31
#include "cpp/sync_point.h"
32
#include "runtime/exec_env.h"
33
34
#if defined(__APPLE__)
35
#include <sys/mount.h>
36
#else
37
#include <sys/statfs.h>
38
#endif
39
40
#include <chrono> // IWYU pragma: keep
41
#include <mutex>
42
#include <ranges>
43
44
#include "common/cast_set.h"
45
#include "common/config.h"
46
#include "common/logging.h"
47
#include "core/uint128.h"
48
#include "exec/common/sip_hash.h"
49
#include "io/cache/block_file_cache_ttl_mgr.h"
50
#include "io/cache/file_block.h"
51
#include "io/cache/file_cache_common.h"
52
#include "io/cache/fs_file_cache_storage.h"
53
#include "io/cache/mem_file_cache_storage.h"
54
#include "runtime/runtime_profile.h"
55
#include "util/concurrency_stats.h"
56
#include "util/stack_util.h"
57
#include "util/stopwatch.hpp"
58
#include "util/thread.h"
59
#include "util/time.h"
60
namespace doris::io {
61
62
// Insert a block pointer into one shard while swallowing allocation failures.
63
15
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
64
15
    return insert_with_result(std::move(block)) == InsertResult::INSERTED;
65
15
}
66
67
193k
// Queue `block` in the shard selected by its raw pointer.
//
// Returns:
//   IGNORED    - `block` is null, or an exception escaped while enqueuing (logged);
//   DUPLICATED - the same block pointer is already pending in its shard;
//   DROPPED    - no slot could be reserved under the hard cap (bumps `_dropped`);
//   INSERTED   - the block was stored and keeps one reserved slot of `_size`.
// A slot reserved via try_reserve_slot() is returned to `_size` whenever the
// insertion does not complete.
NeedUpdateLRUBlocks::InsertResult NeedUpdateLRUBlocks::insert_with_result(FileBlockSPtr block) {
    if (block == nullptr) {
        return InsertResult::IGNORED;
    }

    bool holds_slot = false;
    // Hands a reserved-but-unused slot back to the shared counter (at most once).
    auto give_back_slot = [&]() {
        if (holds_slot) {
            _size.fetch_sub(1, std::memory_order_relaxed);
            holds_slot = false;
        }
    };

    try {
        auto* const key = block.get();
        auto& shard = _shards[shard_index(key)];
        std::lock_guard lock(shard.mutex);

        if (shard.entries.contains(key)) {
            return InsertResult::DUPLICATED;
        }
        if (!try_reserve_slot()) {
            _dropped.fetch_add(1, std::memory_order_relaxed);
            return InsertResult::DROPPED;
        }
        holds_slot = true;

        const bool inserted = shard.entries.emplace(key, std::move(block)).second;
        if (!inserted) {
            // Defensive: contains() was checked under the same lock.
            give_back_slot();
            return InsertResult::DUPLICATED;
        }
        // The slot now belongs to the stored entry.
        holds_slot = false;
        return InsertResult::INSERTED;
    } catch (const std::exception& e) {
        give_back_slot();
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
    } catch (...) {
        give_back_slot();
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
    }
    return InsertResult::IGNORED;
}
106
107
// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
108
3.92k
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
109
3.92k
    if (limit == 0 || output == nullptr) {
110
2
        return 0;
111
2
    }
112
3.92k
    size_t drained = 0;
113
3.92k
    try {
114
3.92k
        output->reserve(output->size() + std::min(limit, size()));
115
248k
        for (auto& shard : _shards) {
116
248k
            if (drained >= limit) {
117
2
                break;
118
2
            }
119
248k
            std::lock_guard lock(shard.mutex);
120
248k
            auto it = shard.entries.begin();
121
248k
            size_t shard_drained = 0;
122
248k
            while (it != shard.entries.end() && drained + shard_drained < limit) {
123
11
                output->emplace_back(std::move(it->second));
124
11
                it = shard.entries.erase(it);
125
11
                ++shard_drained;
126
11
            }
127
248k
            if (shard_drained > 0) {
128
8
                _size.fetch_sub(shard_drained, std::memory_order_relaxed);
129
8
                drained += shard_drained;
130
8
            }
131
248k
        }
132
3.92k
    } catch (const std::exception& e) {
133
0
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
134
0
    } catch (...) {
135
0
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
136
0
    }
137
3.92k
    return drained;
138
3.92k
}
139
140
// Remove every pending block, guarding against unexpected exceptions.
141
32
void NeedUpdateLRUBlocks::clear() {
142
32
    try {
143
2.04k
        for (auto& shard : _shards) {
144
2.04k
            std::lock_guard lock(shard.mutex);
145
2.04k
            if (!shard.entries.empty()) {
146
2
                auto removed = shard.entries.size();
147
2
                shard.entries.clear();
148
2
                _size.fetch_sub(removed, std::memory_order_relaxed);
149
2
            }
150
2.04k
        }
151
32
    } catch (const std::exception& e) {
152
0
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
153
0
    } catch (...) {
154
0
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
155
0
    }
156
32
}
157
158
193k
// Map a non-null block pointer to a shard slot: std::hash of the pointer masked
// with kShardMask.
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
    DCHECK(ptr != nullptr);
    const size_t hashed = std::hash<FileBlock*> {}(ptr);
    return hashed & kShardMask;
}
162
163
1.56k
bool NeedUpdateLRUBlocks::try_reserve_slot() {
164
1.56k
    size_t cur_size = _size.load(std::memory_order_relaxed);
165
1.56k
    while (cur_size < _hard_cap) {
166
1.56k
        if (_size.compare_exchange_weak(cur_size, cur_size + 1, std::memory_order_relaxed,
167
1.56k
                                        std::memory_order_relaxed)) {
168
1.56k
            return true;
169
1.56k
        }
170
1.56k
    }
171
1
    return false;
172
1.56k
}
173
174
namespace {
175
176
struct QueueConsumePlan {
177
    int64_t interval_ms = 0;
178
    size_t batch_limit = 0;
179
};
180
181
constexpr size_t kLruLogReplayAdaptiveLowWatermark = 50'000;
182
constexpr size_t kLruLogReplayAdaptiveHighWatermark = 300'000;
183
constexpr int64_t kLruLogReplayAdaptiveMinIntervalMs = 100;
184
constexpr size_t kLruLogReplayAdaptiveMaxBatchPerType = 25'000;
185
186
constexpr size_t kBlockLruUpdateAdaptiveLowWatermark = 10'000;
187
constexpr size_t kBlockLruUpdateAdaptiveHighWatermark = 50'000;
188
constexpr int64_t kBlockLruUpdateAdaptiveMinIntervalMs = 500;
189
constexpr size_t kBlockLruUpdateAdaptiveMaxBatch = 10'000;
190
constexpr size_t kBlockLruUpdateLockSliceBatch = 500;
191
constexpr size_t kNeedUpdateLruBlocksHardCap = 100'000;
192
constexpr size_t kLruRecorderLogQueueHardCap = 500'000;
193
194
12.2k
int64_t positive_or_default(int64_t value, int64_t default_value) {
195
12.2k
    return value > 0 ? value : default_value;
196
12.2k
}
197
198
QueueConsumePlan build_queue_consume_plan(size_t backlog, int64_t base_interval_ms,
199
                                          size_t base_batch, size_t low_watermark,
200
                                          size_t high_watermark, int64_t min_interval_ms,
201
4.06k
                                          size_t max_batch) {
202
4.06k
    QueueConsumePlan plan;
203
4.06k
    plan.interval_ms = positive_or_default(base_interval_ms, 1);
204
4.06k
    plan.batch_limit = base_batch;
205
206
4.07k
    if (backlog < low_watermark) {
207
4.07k
        return plan;
208
4.07k
    }
209
210
18.4E
    const int64_t min_interval = positive_or_default(min_interval_ms, 1);
211
18.4E
    const size_t capped_max_batch = std::max<size_t>(max_batch, 1);
212
18.4E
    if (backlog < high_watermark) {
213
0
        plan.interval_ms = std::max<int64_t>(min_interval, plan.interval_ms / 2);
214
0
        plan.batch_limit = std::min(capped_max_batch, std::max<size_t>(base_batch * 2, 1));
215
0
        return plan;
216
0
    }
217
218
18.4E
    plan.interval_ms = min_interval;
219
18.4E
    plan.batch_limit = capped_max_batch;
220
18.4E
    return plan;
221
18.4E
}
222
223
4.14k
QueueConsumePlan build_lru_log_replay_plan(size_t backlog) {
224
4.14k
    const auto base_interval =
225
4.14k
            positive_or_default(config::file_cache_background_lru_log_replay_interval_ms, 1);
226
4.14k
    if (backlog < kLruLogReplayAdaptiveLowWatermark) {
227
4.14k
        return {base_interval, 0};
228
4.14k
    }
229
1
    return build_queue_consume_plan(
230
1
            backlog, base_interval, kLruLogReplayAdaptiveMaxBatchPerType / 4,
231
1
            kLruLogReplayAdaptiveLowWatermark, kLruLogReplayAdaptiveHighWatermark,
232
1
            kLruLogReplayAdaptiveMinIntervalMs, kLruLogReplayAdaptiveMaxBatchPerType);
233
4.14k
}
234
235
4.07k
QueueConsumePlan build_block_lru_update_plan(size_t backlog) {
236
4.07k
    const auto base_interval =
237
4.07k
            positive_or_default(config::file_cache_background_block_lru_update_interval_ms, 1);
238
4.07k
    const size_t base_batch =
239
4.07k
            std::max<int64_t>(config::file_cache_background_block_lru_update_qps_limit, 0) *
240
4.07k
            static_cast<size_t>(base_interval) / 1000;
241
4.07k
    if (base_batch == 0) {
242
0
        return {base_interval, 0};
243
0
    }
244
4.07k
    return build_queue_consume_plan(
245
4.07k
            backlog, base_interval, base_batch, kBlockLruUpdateAdaptiveLowWatermark,
246
4.07k
            kBlockLruUpdateAdaptiveHighWatermark, kBlockLruUpdateAdaptiveMinIntervalMs,
247
4.07k
            kBlockLruUpdateAdaptiveMaxBatch);
248
4.07k
}
249
250
} // namespace
251
252
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
                               const FileCacheSettings& cache_settings)
        : _cache_base_path(cache_base_path),
          _capacity(cache_settings.capacity),
          _max_file_block_size(cache_settings.max_file_block_size),
          _need_update_lru_blocks(kNeedUpdateLruBlocksHardCap) {
    // Every bvar below is registered with `_cache_base_path` as its prefix. The
    // factories read the member on each call, and all of them run before the
    // memory-storage branch near the end rewrites `_cache_base_path`.
    auto size_status = [this](const char* name, size_t initial_value) {
        return std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(), name,
                                                      initial_value);
    };
    auto ratio_status = [this](const char* name) {
        return std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(), name, 0.0);
    };
    auto adder = [this](const std::string& name) {
        return std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), name.c_str());
    };
    auto latency = [this](const char* name) {
        return std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(), name);
    };

    // Size / element-count gauges.
    _cur_cache_size_metrics = size_status("file_cache_cache_size", 0);
    _cache_capacity_metrics = size_status("file_cache_capacity", _capacity);
    _cur_ttl_cache_size_metrics = size_status("file_cache_ttl_cache_size", 0);
    _cur_normal_queue_element_count_metrics =
            size_status("file_cache_normal_queue_element_count", 0);
    _cur_ttl_cache_lru_queue_cache_size_metrics =
            size_status("file_cache_ttl_cache_lru_queue_size", 0);
    _cur_ttl_cache_lru_queue_element_count_metrics =
            size_status("file_cache_ttl_cache_lru_queue_element_count", 0);
    _cur_normal_queue_cache_size_metrics = size_status("file_cache_normal_queue_cache_size", 0);
    _cur_index_queue_element_count_metrics =
            size_status("file_cache_index_queue_element_count", 0);
    _cur_index_queue_cache_size_metrics = size_status("file_cache_index_queue_cache_size", 0);
    _cur_disposable_queue_element_count_metrics =
            size_status("file_cache_disposable_queue_element_count", 0);
    _cur_disposable_queue_cache_size_metrics =
            size_status("file_cache_disposable_queue_cache_size", 0);

    // Eviction / read / hit totals.
    _queue_evict_size_metrics[0] = adder("file_cache_index_queue_evict_size");
    _queue_evict_size_metrics[1] = adder("file_cache_normal_queue_evict_size");
    _queue_evict_size_metrics[2] = adder("file_cache_disposable_queue_evict_size");
    _queue_evict_size_metrics[3] = adder("file_cache_ttl_cache_evict_size");
    _total_evict_size_metrics = adder("file_cache_total_evict_size");
    _total_read_size_metrics = adder("file_cache_total_read_size");
    _total_hit_size_metrics = adder("file_cache_total_hit_size");
    _gc_evict_bytes_metrics = adder("file_cache_gc_evict_bytes");
    _gc_evict_count_metrics = adder("file_cache_gc_evict_count");

    // Queue types and the name fragment each contributes to metric names, in the
    // order the per-type metrics are registered.
    const std::pair<FileCacheType, const char*> queue_types[] = {
            {FileCacheType::DISPOSABLE, "disposable"},
            {FileCacheType::NORMAL, "normal"},
            {FileCacheType::INDEX, "index"},
            {FileCacheType::TTL, "ttl"},
    };

    // "file_cache_evict_by_time_<from>_to_<to>" for every ordered pair of distinct types.
    for (const auto& [from, from_name] : queue_types) {
        for (const auto& [to, to_name] : queue_types) {
            if (from != to) {
                _evict_by_time_metrics_matrix[from][to] = adder(
                        std::string("file_cache_evict_by_time_") + from_name + "_to_" + to_name);
            }
        }
    }
    // "file_cache_evict_by_self_lru_<type>" for every type.
    for (const auto& [type, type_name] : queue_types) {
        _evict_by_self_lru_metrics_matrix[type] =
                adder(std::string("file_cache_evict_by_self_lru_") + type_name);
    }
    // "file_cache_evict_by_size_<from>_to_<to>" for every ordered pair of distinct types.
    for (const auto& [from, from_name] : queue_types) {
        for (const auto& [to, to_name] : queue_types) {
            if (from != to) {
                _evict_by_size_metrics_matrix[from][to] = adder(
                        std::string("file_cache_evict_by_size_") + from_name + "_to_" + to_name);
            }
        }
    }

    _evict_by_try_release = adder("file_cache_evict_by_try_release");

    _num_read_blocks = adder("file_cache_num_read_blocks");
    _num_hit_blocks = adder("file_cache_num_hit_blocks");
    _num_removed_blocks = adder("file_cache_num_removed_blocks");

    _no_warmup_num_read_blocks = adder("file_cache_no_warmup_num_read_blocks");
    _no_warmup_num_hit_blocks = adder("file_cache_no_warmup_num_hit_blocks");

#ifndef BE_TEST
    // Sliding windows (in seconds) over the block read/hit adders above.
    auto window = [this](const char* name, bvar::Adder<size_t>* source, int seconds) {
        return std::make_shared<bvar::Window<bvar::Adder<size_t>>>(_cache_base_path.c_str(),
                                                                   name, source, seconds);
    };
    _num_hit_blocks_5m = window("file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
    _num_read_blocks_5m = window("file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
    _num_hit_blocks_1h = window("file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
    _num_read_blocks_1h = window("file_cache_num_read_blocks_1h", _num_read_blocks.get(), 3600);
    _no_warmup_num_hit_blocks_5m = window("file_cache_no_warmup_num_hit_blocks_5m",
                                          _no_warmup_num_hit_blocks.get(), 300);
    _no_warmup_num_read_blocks_5m = window("file_cache_no_warmup_num_read_blocks_5m",
                                           _no_warmup_num_read_blocks.get(), 300);
    _no_warmup_num_hit_blocks_1h = window("file_cache_no_warmup_num_hit_blocks_1h",
                                          _no_warmup_num_hit_blocks.get(), 3600);
    _no_warmup_num_read_blocks_1h = window("file_cache_no_warmup_num_read_blocks_1h",
                                           _no_warmup_num_read_blocks.get(), 3600);
#endif

    // Hit ratios, all starting at 0.0.
    _hit_ratio = ratio_status("file_cache_hit_ratio");
    _hit_ratio_5m = ratio_status("file_cache_hit_ratio_5m");
    _hit_ratio_1h = ratio_status("file_cache_hit_ratio_1h");

    _no_warmup_hit_ratio = ratio_status("file_cache_no_warmup_hit_ratio");
    _no_warmup_hit_ratio_5m = ratio_status("file_cache_no_warmup_hit_ratio_5m");
    _no_warmup_hit_ratio_1h = ratio_status("file_cache_no_warmup_hit_ratio_1h");

    _disk_limit_mode_metrics = size_status("file_cache_disk_limit_mode", 0);
    _need_evict_cache_in_advance_metrics = size_status("file_cache_need_evict_cache_in_advance", 0);
    _meta_store_write_queue_size_metrics = size_status("file_cache_meta_store_write_queue_size", 0);

    // Latency recorders and background-queue gauges.
    _cache_lock_wait_time_us = latency("file_cache_cache_lock_wait_time_us");
    _get_or_set_latency_us = latency("file_cache_get_or_set_latency_us");
    _storage_sync_remove_latency_us = latency("file_cache_storage_sync_remove_latency_us");
    _storage_retry_sync_remove_latency_us =
            latency("file_cache_storage_retry_sync_remove_latency_us");
    _storage_async_remove_latency_us = latency("file_cache_storage_async_remove_latency_us");
    _evict_in_advance_latency_us = latency("file_cache_evict_in_advance_latency_us");
    _lru_dump_latency_us = latency("file_cache_lru_dump_latency_us");
    _recycle_keys_length_metrics = size_status("file_cache_recycle_keys_length", 0);
    _need_update_lru_blocks_length_metrics =
            size_status("file_cache_need_update_lru_blocks_length", 0);
    _need_update_lru_blocks_dropped_metrics = adder("file_cache_need_update_lru_blocks_dropped");
    _lru_recorder_log_queue_length_metrics =
            size_status("file_cache_lru_recorder_log_queue_length", 0);
    _lru_recorder_log_queue_dropped_metrics = adder("file_cache_lru_recorder_log_queue_dropped");
    _update_lru_blocks_latency_us = latency("file_cache_update_lru_blocks_latency_us");
    _ttl_gc_latency_us = latency("file_cache_ttl_gc_latency_us");
    _shadow_queue_levenshtein_distance = latency("file_cache_shadow_queue_levenshtein_distance");

    // LRU queues: (max size, max elements, third argument in seconds: 1h / 7d / 1d / INT_MAX).
    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
                                 cache_settings.disposable_queue_elements, 60 * 60);
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
                            7 * 24 * 60 * 60);
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
                             24 * 60 * 60);
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
                          std::numeric_limits<int>::max());

    _lru_recorder = std::make_unique<LRUQueueRecorder>(this, kLruRecorderLogQueueHardCap);
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());

    // Memory storage also replaces the base path with "memory" (after all bvars above
    // were registered under the caller-supplied path).
    const bool in_memory = cache_settings.storage == "memory";
    if (in_memory) {
        _storage = std::make_unique<MemFileCacheStorage>();
        _cache_base_path = "memory";
    } else {
        _storage = std::make_unique<FSFileCacheStorage>();
    }

    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
}
496
497
14.5k
// 128-bit SipHash of `path`, wrapped as a UInt128Wrapper cache key.
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
    uint128_t digest;
    // sip_hash128 writes the digest through the char* out-parameter.
    auto* digest_bytes = reinterpret_cast<char*>(&digest);
    sip_hash128(path.data(), path.size(), digest_bytes);
    return UInt128Wrapper(digest);
}
502
503
537k
bool BlockFileCache::is_memory_storage() const {
504
537k
    DCHECK(_storage != nullptr);
505
537k
    return _storage->get_type() == FileCacheStorageType::MEMORY;
506
537k
}
507
508
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
    SCOPED_CACHE_LOCK(_mutex, this);
    // With config::enable_file_cache_query_limit on, look up or create the per-query
    // context (null for an all-zero query id) and wrap it in a holder. Otherwise
    // return an empty holder.
    if (config::enable_file_cache_query_limit) {
        auto query_context =
                get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
        return std::make_unique<QueryFileCacheContextHolder>(query_id, this, query_context);
    }
    return {};
}
520
521
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
    // Caller holds the cache lock (passed as `cache_lock`); returns the registered
    // context for `query_id`, or nullptr when none exists.
    if (auto found = _query_map.find(query_id); found != _query_map.end()) {
        return found->second;
    }
    return nullptr;
}
526
527
6
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
528
6
    SCOPED_CACHE_LOCK(_mutex, this);
529
6
    const auto& query_iter = _query_map.find(query_id);
530
531
6
    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
532
5
        _query_map.erase(query_iter);
533
5
    }
534
6
}
535
536
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
537
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
538
7
        int file_cache_query_limit_percent) {
539
7
    if (query_id.lo == 0 && query_id.hi == 0) {
540
1
        return nullptr;
541
1
    }
542
543
6
    auto context = get_query_context(query_id, cache_lock);
544
6
    if (context) {
545
1
        return context;
546
1
    }
547
548
5
    size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100;
549
5
    if (file_cache_query_limit_size < 268435456) {
550
5
        LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size
551
5
                     << " bytes) is less than the 256MB recommended minimum. "
552
5
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
553
5
                     << " from its current value " << file_cache_query_limit_percent << "%.";
554
5
    }
555
5
    auto query_context = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
556
5
    auto query_iter = _query_map.emplace(query_id, query_context).first;
557
5
    return query_iter->second;
558
6
}
559
560
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
561
20
                                                   std::lock_guard<std::mutex>& cache_lock) {
562
20
    auto pair = std::make_pair(hash, offset);
563
20
    auto record = records.find(pair);
564
20
    DCHECK(record != records.end());
565
20
    auto iter = record->second;
566
20
    records.erase(pair);
567
20
    lru_queue.remove(iter, cache_lock);
568
20
}
569
570
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
571
                                                    size_t size,
572
30
                                                    std::lock_guard<std::mutex>& cache_lock) {
573
30
    auto pair = std::make_pair(hash, offset);
574
30
    if (records.find(pair) == records.end()) {
575
29
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
576
29
        records.insert({pair, queue_iter});
577
29
    }
578
30
}
579
580
151
Status BlockFileCache::initialize() {
581
151
    SCOPED_CACHE_LOCK(_mutex, this);
582
151
    return initialize_unlocked(cache_lock);
583
151
}
584
585
151
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
586
151
    DCHECK(!_is_initialized);
587
151
    _is_initialized = true;
588
151
    if (config::file_cache_background_lru_dump_tail_record_num > 0) {
589
        // requirements:
590
        // 1. restored data should not overwrite the last dump
591
        // 2. restore should happen before load and async load
592
        // 3. all queues should be restored sequencially to avoid conflict
593
        // TODO(zhengyu): we can parralize them but will increase complexity, so lets check the time cost
594
        // to see if any improvement is a necessary
595
151
        restore_lru_queues_from_disk(cache_lock);
596
151
    }
597
151
    RETURN_IF_ERROR(_storage->init(this));
598
599
151
    if (auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get())) {
600
132
        if (auto* meta_store = fs_storage->get_meta_store()) {
601
132
            _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
602
132
        }
603
132
    }
604
605
151
    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
606
151
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
607
151
    _cache_background_evict_in_advance_thread =
608
151
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
609
151
    _cache_background_block_lru_update_thread =
610
151
            std::thread(&BlockFileCache::run_background_block_lru_update, this);
611
612
    // Initialize LRU dump thread and restore queues
613
151
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
614
151
    _cache_background_lru_log_replay_thread =
615
151
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);
616
617
151
    return Status::OK();
618
151
}
619
620
void BlockFileCache::update_block_lru(FileBlockSPtr block,
621
7
                                      std::lock_guard<std::mutex>& cache_lock) {
622
7
    if (!block) {
623
1
        return;
624
1
    }
625
626
6
    FileBlockCell* cell = get_cell(block->get_hash_value(), block->offset(), cache_lock);
627
6
    if (!cell || cell->file_block.get() != block.get()) {
628
1
        return;
629
1
    }
630
631
5
    if (cell->queue_iterator) {
632
5
        auto& queue = get_queue(block->cache_type());
633
5
        queue.move_to_end(*cell->queue_iterator, cache_lock);
634
5
        _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
635
5
                                          block->_key.hash, block->_key.offset,
636
5
                                          block->_block_range.size());
637
5
    }
638
5
    cell->update_atime();
639
5
}
640
641
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
642
712k
                              std::lock_guard<std::mutex>& cache_lock) {
643
712k
    if (result) {
644
712k
        result->push_back(cell.file_block);
645
712k
    }
646
647
712k
    auto& queue = get_queue(cell.file_block->cache_type());
648
    /// Move to the end of the queue. The iterator remains valid.
649
712k
    if (!config::enable_file_cache_async_touch_on_get_or_set && cell.queue_iterator &&
650
712k
        move_iter_flag) {
651
520k
        queue.move_to_end(*cell.queue_iterator, cache_lock);
652
520k
        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
653
520k
                                          CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash,
654
520k
                                          cell.file_block->_key.offset, cell.size());
655
520k
    }
656
657
712k
    cell.update_atime();
658
712k
}
659
660
template <class T>
661
    requires IsXLock<T>
662
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
663
6.02k
                                        T& /* cache_lock */) {
664
6.02k
    auto it = _files.find(hash);
665
6.02k
    if (it == _files.end()) {
666
3
        return nullptr;
667
3
    }
668
669
6.01k
    auto& offsets = it->second;
670
6.01k
    auto cell_it = offsets.find(offset);
671
6.01k
    if (cell_it == offsets.end()) {
672
3
        return nullptr;
673
3
    }
674
675
6.01k
    return &cell_it->second;
676
6.01k
}
677
678
904k
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
679
904k
    return query_type != FileCacheType::DISPOSABLE && cell_type != FileCacheType::DISPOSABLE;
680
904k
}
681
682
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
683
                                    const FileBlock::Range& range,
684
725k
                                    std::lock_guard<std::mutex>& cache_lock) {
685
    /// Given range = [left, right] and non-overlapping ordered set of file blocks,
686
    /// find list [block1, ..., blockN] of blocks which intersect with given range.
687
725k
    auto it = _files.find(hash);
688
725k
    if (it == _files.end()) {
689
7.07k
        if (_async_open_done) {
690
7.07k
            return {};
691
7.07k
        }
692
1
        FileCacheKey key;
693
1
        key.hash = hash;
694
1
        key.meta.type = context.cache_type;
695
1
        key.meta.expiration_time = context.expiration_time;
696
1
        key.meta.tablet_id = context.tablet_id;
697
1
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);
698
699
1
        it = _files.find(hash);
700
1
        if (it == _files.end()) [[unlikely]] {
701
1
            return {};
702
1
        }
703
1
    }
704
705
718k
    auto& file_blocks = it->second;
706
718k
    if (file_blocks.empty()) {
707
0
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
708
0
                     << " cache type=" << context.cache_type
709
0
                     << " cache expiration time=" << context.expiration_time
710
0
                     << " cache range=" << range.left << " " << range.right
711
0
                     << " query id=" << context.query_id;
712
0
        DCHECK(false);
713
0
        _files.erase(hash);
714
0
        return {};
715
0
    }
716
717
718k
    FileBlocks result;
718
718k
    auto block_it = file_blocks.lower_bound(range.left);
719
718k
    if (block_it == file_blocks.end()) {
720
        /// N - last cached block for given file hash, block{N}.offset < range.left:
721
        ///   block{N}                       block{N}
722
        /// [________                         [_______]
723
        ///     [__________]         OR                  [________]
724
        ///     ^                                        ^
725
        ///     range.left                               range.left
726
727
6.19k
        const auto& cell = file_blocks.rbegin()->second;
728
6.19k
        if (cell.file_block->range().right < range.left) {
729
6.17k
            return {};
730
6.17k
        }
731
732
14
        use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
733
14
                 cache_lock);
734
712k
    } else { /// block_it <-- segmment{k}
735
712k
        if (block_it != file_blocks.begin()) {
736
218
            const auto& prev_cell = std::prev(block_it)->second;
737
218
            const auto& prev_cell_range = prev_cell.file_block->range();
738
739
218
            if (range.left <= prev_cell_range.right) {
740
                ///   block{k-1}  block{k}
741
                ///   [________]   [_____
742
                ///       [___________
743
                ///       ^
744
                ///       range.left
745
746
137
                use_cell(prev_cell, &result,
747
137
                         need_to_move(prev_cell.file_block->cache_type(), context.cache_type),
748
137
                         cache_lock);
749
137
            }
750
218
        }
751
752
        ///  block{k} ...       block{k-1}  block{k}                      block{k}
753
        ///  [______              [______]     [____                        [________
754
        ///  [_________     OR              [________      OR    [______]   ^
755
        ///  ^                              ^                           ^   block{k}.offset
756
        ///  range.left                     range.left                  range.right
757
758
1.42M
        while (block_it != file_blocks.end()) {
759
712k
            const auto& cell = block_it->second;
760
712k
            if (range.right < cell.file_block->range().left) {
761
174
                break;
762
174
            }
763
764
712k
            use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
765
712k
                     cache_lock);
766
712k
            ++block_it;
767
712k
        }
768
712k
    }
769
770
712k
    return result;
771
718k
}
772
773
193k
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
774
193k
    auto result = _need_update_lru_blocks.insert_with_result(std::move(block));
775
193k
    if (result == NeedUpdateLRUBlocks::InsertResult::INSERTED) {
776
1.55k
        _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size());
777
191k
    } else if (result == NeedUpdateLRUBlocks::InsertResult::DROPPED) {
778
0
        *(_need_update_lru_blocks_dropped_metrics) << 1;
779
0
        LOG_EVERY_N(WARNING, 60) << "Drop block LRU update because hard cap is reached, hard_cap="
780
0
                                 << _need_update_lru_blocks.hard_cap()
781
0
                                 << " queue_size=" << _need_update_lru_blocks.size();
782
0
    }
783
193k
}
784
785
3
std::string BlockFileCache::clear_file_cache_async() {
786
3
    LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
787
3
    _lru_dumper->remove_lru_dump_files();
788
3
    int64_t num_cells_all = 0;
789
3
    int64_t num_cells_to_delete = 0;
790
3
    int64_t num_cells_wait_recycle = 0;
791
3
    int64_t num_files_all = 0;
792
3
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
793
3
    {
794
3
        SCOPED_CACHE_LOCK(_mutex, this);
795
796
3
        std::vector<FileBlockCell*> deleting_cells;
797
4
        for (auto& [_, offset_to_cell] : _files) {
798
4
            ++num_files_all;
799
37
            for (auto& [_1, cell] : offset_to_cell) {
800
37
                ++num_cells_all;
801
37
                deleting_cells.push_back(&cell);
802
37
            }
803
4
        }
804
805
        // we cannot delete the element in the loop above, because it will break the iterator
806
37
        for (auto& cell : deleting_cells) {
807
37
            if (!cell->releasable()) {
808
2
                LOG(INFO) << "cell is not releasable, hash="
809
2
                          << " offset=" << cell->file_block->offset();
810
2
                cell->file_block->set_deleting();
811
2
                ++num_cells_wait_recycle;
812
2
                continue;
813
2
            }
814
35
            FileBlockSPtr file_block = cell->file_block;
815
35
            if (file_block) {
816
35
                std::lock_guard block_lock(file_block->_mutex);
817
35
                remove(file_block, cache_lock, block_lock, false);
818
35
                ++num_cells_to_delete;
819
35
            }
820
35
        }
821
3
        clear_need_update_lru_blocks();
822
3
    }
823
824
3
    std::stringstream ss;
825
3
    ss << "finish clear_file_cache_async, path=" << _cache_base_path
826
3
       << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
827
3
       << " num_cells_to_delete=" << num_cells_to_delete
828
3
       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
829
3
    auto msg = ss.str();
830
3
    LOG(INFO) << msg;
831
3
    _lru_dumper->remove_lru_dump_files();
832
3
    return msg;
833
3
}
834
835
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
836
                                                  const CacheContext& context, size_t offset,
837
                                                  size_t size, FileBlock::State state,
838
13.4k
                                                  std::lock_guard<std::mutex>& cache_lock) {
839
13.4k
    DCHECK(size > 0);
840
841
13.4k
    auto current_pos = offset;
842
13.4k
    auto end_pos_non_included = offset + size;
843
844
13.4k
    size_t current_size = 0;
845
13.4k
    size_t remaining_size = size;
846
847
13.4k
    FileBlocks file_blocks;
848
26.9k
    while (current_pos < end_pos_non_included) {
849
13.4k
        current_size = std::min(remaining_size, _max_file_block_size);
850
13.4k
        remaining_size -= current_size;
851
13.4k
        state = try_reserve(hash, context, current_pos, current_size, cache_lock)
852
13.4k
                        ? state
853
13.4k
                        : FileBlock::State::SKIP_CACHE;
854
13.4k
        if (state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
855
62
            FileCacheKey key;
856
62
            key.hash = hash;
857
62
            key.offset = current_pos;
858
62
            key.meta.type = context.cache_type;
859
62
            key.meta.expiration_time = context.expiration_time;
860
62
            key.meta.tablet_id = context.tablet_id;
861
62
            auto file_block = std::make_shared<FileBlock>(key, current_size, this,
862
62
                                                          FileBlock::State::SKIP_CACHE);
863
62
            file_blocks.push_back(std::move(file_block));
864
13.4k
        } else {
865
13.4k
            auto* cell = add_cell(hash, context, current_pos, current_size, state, cache_lock);
866
13.4k
            if (cell) {
867
13.4k
                file_blocks.push_back(cell->file_block);
868
13.4k
                if (!context.is_cold_data) {
869
13.4k
                    cell->update_atime();
870
13.4k
                }
871
13.4k
            }
872
13.4k
            if (_ttl_mgr && context.tablet_id != 0) {
873
1.06k
                _ttl_mgr->register_tablet_id(context.tablet_id);
874
1.06k
            }
875
13.4k
        }
876
877
13.4k
        current_pos += current_size;
878
13.4k
    }
879
880
13.4k
    DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
881
13.4k
    return file_blocks;
882
13.4k
}
883
884
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
885
                                                       const UInt128Wrapper& hash,
886
                                                       const CacheContext& context,
887
                                                       const FileBlock::Range& range,
888
712k
                                                       std::lock_guard<std::mutex>& cache_lock) {
889
    /// There are blocks [block1, ..., blockN]
890
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
891
    /// intersect with given range.
892
893
    /// It can have holes:
894
    /// [____________________]         -- requested range
895
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
896
    ///
897
    /// For each such hole create a cell with file block state EMPTY.
898
899
712k
    auto it = file_blocks.begin();
900
712k
    auto block_range = (*it)->range();
901
902
712k
    size_t current_pos = 0;
903
712k
    if (block_range.left < range.left) {
904
        ///    [_______     -- requested range
905
        /// [_______
906
        /// ^
907
        /// block1
908
909
151
        current_pos = block_range.right + 1;
910
151
        ++it;
911
712k
    } else {
912
712k
        current_pos = range.left;
913
712k
    }
914
915
1.42M
    while (current_pos <= range.right && it != file_blocks.end()) {
916
712k
        block_range = (*it)->range();
917
918
712k
        if (current_pos == block_range.left) {
919
712k
            current_pos = block_range.right + 1;
920
712k
            ++it;
921
712k
            continue;
922
712k
        }
923
924
712k
        DCHECK(current_pos < block_range.left);
925
926
109
        auto hole_size = block_range.left - current_pos;
927
928
109
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
929
109
                                                      FileBlock::State::EMPTY, cache_lock));
930
931
109
        current_pos = block_range.right + 1;
932
109
        ++it;
933
109
    }
934
935
712k
    if (current_pos <= range.right) {
936
        ///   ________]     -- requested range
937
        ///   _____]
938
        ///        ^
939
        /// blockN
940
941
86
        auto hole_size = range.right - current_pos + 1;
942
943
86
        file_blocks.splice(file_blocks.end(),
944
86
                           split_range_into_cells(hash, context, current_pos, hole_size,
945
86
                                                  FileBlock::State::EMPTY, cache_lock));
946
86
    }
947
712k
}
948
949
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
950
725k
                                            CacheContext& context) {
951
725k
    FileBlock::Range range(offset, offset + size - 1);
952
953
725k
    ReadStatistics* stats = context.stats;
954
725k
    DCHECK(stats != nullptr);
955
725k
    MonotonicStopWatch sw;
956
725k
    sw.start();
957
725k
    FileBlocks file_blocks;
958
725k
    std::vector<FileBlockSPtr> need_update_lru_blocks;
959
725k
    const bool async_touch_on_get_or_set = config::enable_file_cache_async_touch_on_get_or_set;
960
725k
    int64_t duration = 0;
961
725k
    {
962
725k
        ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
963
725k
        std::lock_guard cache_lock(_mutex);
964
725k
        ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
965
725k
        stats->lock_wait_timer += sw.elapsed_time();
966
725k
        SCOPED_RAW_TIMER(&duration);
967
        /// Get all blocks which intersect with the given range.
968
725k
        {
969
725k
            SCOPED_RAW_TIMER(&stats->get_timer);
970
725k
            file_blocks = get_impl(hash, context, range, cache_lock);
971
725k
        }
972
973
725k
        if (file_blocks.empty()) {
974
13.2k
            SCOPED_RAW_TIMER(&stats->set_timer);
975
13.2k
            file_blocks = split_range_into_cells(hash, context, offset, size,
976
13.2k
                                                 FileBlock::State::EMPTY, cache_lock);
977
712k
        } else {
978
712k
            SCOPED_RAW_TIMER(&stats->set_timer);
979
712k
            fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
980
712k
        }
981
725k
        DCHECK(!file_blocks.empty());
982
725k
        *_num_read_blocks << file_blocks.size();
983
725k
        if (!context.is_warmup) {
984
725k
            *_no_warmup_num_read_blocks << file_blocks.size();
985
725k
        }
986
725k
        if (async_touch_on_get_or_set) {
987
193k
            need_update_lru_blocks.reserve(file_blocks.size());
988
193k
        }
989
726k
        for (auto& block : file_blocks) {
990
726k
            size_t block_size = block->range().size();
991
726k
            *_total_read_size_metrics << block_size;
992
726k
            if (block->state_unsafe() == FileBlock::State::DOWNLOADED) {
993
712k
                *_num_hit_blocks << 1;
994
712k
                *_total_hit_size_metrics << block_size;
995
712k
                if (!context.is_warmup) {
996
712k
                    *_no_warmup_num_hit_blocks << 1;
997
712k
                }
998
712k
                if (async_touch_on_get_or_set &&
999
712k
                    need_to_move(block->cache_type(), context.cache_type)) {
1000
192k
                    need_update_lru_blocks.emplace_back(block);
1001
192k
                }
1002
712k
            }
1003
726k
        }
1004
725k
    }
1005
1006
725k
    if (async_touch_on_get_or_set) {
1007
193k
        for (auto& block : need_update_lru_blocks) {
1008
192k
            add_need_update_lru_block(std::move(block));
1009
192k
        }
1010
193k
    }
1011
1012
725k
    *_get_or_set_latency_us << (duration / 1000);
1013
725k
    return FileBlocksHolder(std::move(file_blocks));
1014
725k
}
1015
1016
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
1017
                                        size_t offset, size_t size, FileBlock::State state,
1018
13.6k
                                        std::lock_guard<std::mutex>& cache_lock) {
1019
    /// Create a file block cell and put it in `files` map by [hash][offset].
1020
13.6k
    if (size == 0) {
1021
0
        return nullptr; /// Empty files are not cached.
1022
0
    }
1023
1024
13.6k
    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
1025
0
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
1026
0
               << " expiration_time=" << context.expiration_time
1027
0
               << " tablet_id=" << context.tablet_id;
1028
1029
13.6k
    if (size > 1024 * 1024 * 1024) {
1030
1
        LOG(WARNING) << "File block size is too large for a block, reject. size=" << size
1031
1
                     << " hash=" << hash.to_string() << " offset=" << offset
1032
1
                     << " stack:" << get_stack_trace();
1033
1
        return nullptr;
1034
1
    }
1035
1036
13.6k
    auto& offsets = _files[hash];
1037
13.6k
    auto itr = offsets.find(offset);
1038
13.6k
    if (itr != offsets.end()) {
1039
5
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
1040
0
                   << ", offset: " << offset << ", size: " << size
1041
0
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
1042
5
        return &(itr->second);
1043
5
    }
1044
1045
13.6k
    FileCacheKey key;
1046
13.6k
    key.hash = hash;
1047
13.6k
    key.offset = offset;
1048
13.6k
    key.meta.type = context.cache_type;
1049
13.6k
    key.meta.expiration_time = context.expiration_time;
1050
13.6k
    key.meta.tablet_id = context.tablet_id;
1051
13.6k
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);
1052
13.6k
    Status st;
1053
13.6k
    if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) {
1054
0
        st = cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
1055
13.6k
    } else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) {
1056
1
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
1057
1
    }
1058
13.6k
    if (!st.ok()) {
1059
0
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
1060
0
                     << " cache_type=" << cache_type_to_string(context.cache_type)
1061
0
                     << " error=" << st.msg();
1062
0
    }
1063
1064
13.6k
    auto& queue = get_queue(cell.file_block->cache_type());
1065
13.6k
    cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
1066
13.6k
    _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD,
1067
13.6k
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
1068
13.6k
                                      cell.size());
1069
1070
13.6k
    if (cell.file_block->cache_type() == FileCacheType::TTL) {
1071
3.92k
        _cur_ttl_size += cell.size();
1072
3.92k
    }
1073
13.6k
    auto [it, _] = offsets.insert(std::make_pair(offset, std::move(cell)));
1074
13.6k
    _cur_cache_size += size;
1075
13.6k
    return &(it->second);
1076
13.6k
}
1077
1078
0
size_t BlockFileCache::try_release() {
1079
0
    SCOPED_CACHE_LOCK(_mutex, this);
1080
0
    std::vector<FileBlockCell*> trash;
1081
0
    for (auto& [hash, blocks] : _files) {
1082
0
        for (auto& [offset, cell] : blocks) {
1083
0
            if (cell.releasable()) {
1084
0
                trash.emplace_back(&cell);
1085
0
            } else {
1086
0
                cell.file_block->set_deleting();
1087
0
            }
1088
0
        }
1089
0
    }
1090
0
    size_t remove_size = 0;
1091
0
    for (auto& cell : trash) {
1092
0
        FileBlockSPtr file_block = cell->file_block;
1093
0
        std::lock_guard lc(cell->file_block->_mutex);
1094
0
        remove_size += file_block->range().size();
1095
0
        remove(file_block, cache_lock, lc);
1096
0
        VLOG_DEBUG << "try_release " << _cache_base_path
1097
0
                   << " hash=" << file_block->get_hash_value().to_string()
1098
0
                   << " offset=" << file_block->offset();
1099
0
    }
1100
0
    *_evict_by_try_release << remove_size;
1101
0
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
1102
0
    return trash.size();
1103
0
}
1104
1105
764k
LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
1106
764k
    switch (type) {
1107
14.5k
    case FileCacheType::INDEX:
1108
14.5k
        return _index_queue;
1109
14.3k
    case FileCacheType::DISPOSABLE:
1110
14.3k
        return _disposable_queue;
1111
729k
    case FileCacheType::NORMAL:
1112
729k
        return _normal_queue;
1113
5.79k
    case FileCacheType::TTL:
1114
5.79k
        return _ttl_queue;
1115
0
    default:
1116
0
        DCHECK(false);
1117
764k
    }
1118
0
    return _normal_queue;
1119
764k
}
1120
1121
140
const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
1122
140
    switch (type) {
1123
33
    case FileCacheType::INDEX:
1124
33
        return _index_queue;
1125
1
    case FileCacheType::DISPOSABLE:
1126
1
        return _disposable_queue;
1127
106
    case FileCacheType::NORMAL:
1128
106
        return _normal_queue;
1129
0
    case FileCacheType::TTL:
1130
0
        return _ttl_queue;
1131
0
    default:
1132
0
        DCHECK(false);
1133
140
    }
1134
0
    return _normal_queue;
1135
140
}
1136
1137
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
1138
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
1139
14.7k
                                        std::string& reason) {
1140
14.7k
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1141
1.36k
        FileBlockSPtr file_block = cell->file_block;
1142
1.36k
        if (file_block) {
1143
1.36k
            std::lock_guard block_lock(file_block->_mutex);
1144
1.36k
            remove(file_block, cache_lock, block_lock, sync);
1145
1.36k
            VLOG_DEBUG << "remove_file_blocks"
1146
0
                       << " hash=" << file_block->get_hash_value().to_string()
1147
0
                       << " offset=" << file_block->offset() << " reason=" << reason;
1148
1.36k
        }
1149
1.36k
    };
1150
14.7k
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1151
14.7k
}
1152
1153
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
1154
                                           size_t& removed_size,
1155
                                           std::vector<FileBlockCell*>& to_evict,
1156
                                           std::lock_guard<std::mutex>& cache_lock,
1157
1.33k
                                           size_t& cur_removed_size, bool evict_in_advance) {
1158
3.09k
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1159
3.09k
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1160
1.27k
            break;
1161
1.27k
        }
1162
1.81k
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1163
1164
1.81k
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
1165
0
                     << ", offset: " << entry_offset;
1166
1167
1.81k
        size_t cell_size = cell->size();
1168
1.81k
        DCHECK(entry_size == cell_size);
1169
1170
1.81k
        if (cell->releasable()) {
1171
1.34k
            auto& file_block = cell->file_block;
1172
1173
1.34k
            std::lock_guard block_lock(file_block->_mutex);
1174
1.34k
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1175
1.34k
            to_evict.push_back(cell);
1176
1.34k
            removed_size += cell_size;
1177
1.34k
            cur_removed_size += cell_size;
1178
1.34k
        }
1179
1.81k
    }
1180
1.33k
}
1181
1182
// 1. if async load file cache not finish
1183
//     a. evict from lru queue
1184
// 2. if ttl cache
1185
//     a. evict from disposable/normal/index queue one by one
1186
// 3. if dont reach query limit or dont have query limit
1187
//     a. evict from other queue
1188
//     b. evict from current queue
1189
//         a.1 if the data belong write, then just evict cold data
1190
// 4. if reach query limit
1191
//     a. evict from query queue
1192
//     b. evict from other queue
1193
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
1194
                                 size_t offset, size_t size,
1195
13.4k
                                 std::lock_guard<std::mutex>& cache_lock) {
1196
13.4k
    if (!_async_open_done) {
1197
4
        return try_reserve_during_async_load(size, cache_lock);
1198
4
    }
1199
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
1200
    // directly eliminate 5 times the size of the space
1201
13.4k
    if (_disk_resource_limit_mode) {
1202
1
        size = 5 * size;
1203
1
    }
1204
1205
13.4k
    auto query_context = config::enable_file_cache_query_limit &&
1206
13.4k
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
1207
13.4k
                                 ? get_query_context(context.query_id, cache_lock)
1208
13.4k
                                 : nullptr;
1209
13.4k
    if (!query_context) {
1210
13.4k
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
1211
13.4k
    } else if (query_context->get_cache_size(cache_lock) + size <=
1212
31
               query_context->get_max_cache_size()) {
1213
16
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
1214
16
    }
1215
15
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1216
15
                               std::chrono::steady_clock::now().time_since_epoch())
1217
15
                               .count();
1218
15
    auto& queue = get_queue(context.cache_type);
1219
15
    size_t removed_size = 0;
1220
15
    size_t ghost_remove_size = 0;
1221
15
    size_t queue_size = queue.get_capacity(cache_lock);
1222
15
    size_t cur_cache_size = _cur_cache_size;
1223
15
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);
1224
1225
15
    std::vector<LRUQueue::Iterator> ghost;
1226
15
    std::vector<FileBlockCell*> to_evict;
1227
1228
15
    size_t max_size = queue.get_max_size();
1229
48
    auto is_overflow = [&] {
1230
48
        return _disk_resource_limit_mode ? removed_size < size
1231
48
                                         : cur_cache_size + size - removed_size > _capacity ||
1232
48
                                                   (queue_size + size - removed_size > max_size) ||
1233
48
                                                   (query_context_cache_size + size -
1234
38
                                                            (removed_size + ghost_remove_size) >
1235
38
                                                    query_context->get_max_cache_size());
1236
48
    };
1237
1238
    /// Select the cache from the LRU queue held by query for expulsion.
1239
35
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
1240
33
        if (!is_overflow()) {
1241
13
            break;
1242
13
        }
1243
1244
20
        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);
1245
1246
20
        if (!cell) {
1247
            /// The cache corresponding to this record may be swapped out by
1248
            /// other queries, so it has become invalid.
1249
4
            ghost.push_back(iter);
1250
4
            ghost_remove_size += iter->size;
1251
16
        } else {
1252
16
            size_t cell_size = cell->size();
1253
16
            DCHECK(iter->size == cell_size);
1254
1255
16
            if (cell->releasable()) {
1256
16
                auto& file_block = cell->file_block;
1257
1258
16
                std::lock_guard block_lock(file_block->_mutex);
1259
16
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1260
16
                to_evict.push_back(cell);
1261
16
                removed_size += cell_size;
1262
16
            }
1263
16
        }
1264
20
    }
1265
1266
16
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1267
16
        FileBlockSPtr file_block = cell->file_block;
1268
16
        if (file_block) {
1269
16
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
1270
16
            std::lock_guard block_lock(file_block->_mutex);
1271
16
            remove(file_block, cache_lock, block_lock);
1272
16
        }
1273
16
    };
1274
1275
15
    for (auto& iter : ghost) {
1276
4
        query_context->remove(iter->hash, iter->offset, cache_lock);
1277
4
    }
1278
1279
15
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1280
1281
15
    if (is_overflow() &&
1282
15
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
1283
1
        return false;
1284
1
    }
1285
14
    query_context->reserve(hash, offset, size, cache_lock);
1286
14
    return true;
1287
15
}
1288
1289
2
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
1290
2
    UInt128Wrapper hash = UInt128Wrapper();
1291
2
    size_t offset = 0;
1292
2
    CacheContext context;
1293
    /* we pick NORMAL and TTL cache to evict in advance
1294
     * we reserve for them but won't acutually give space to them
1295
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
1296
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
1297
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
1298
     */
1299
2
    context.cache_type = FileCacheType::NORMAL;
1300
2
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1301
2
    context.cache_type = FileCacheType::TTL;
1302
2
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1303
2
}
1304
1305
// remove specific cache synchronously, for critical operations
1306
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1307
8
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1308
8
    std::string reason = "remove_if_cached";
1309
8
    SCOPED_CACHE_LOCK(_mutex, this);
1310
8
    auto iter = _files.find(file_key);
1311
8
    std::vector<FileBlockCell*> to_remove;
1312
8
    if (iter != _files.end()) {
1313
14
        for (auto& [_, cell] : iter->second) {
1314
14
            if (cell.releasable()) {
1315
13
                to_remove.push_back(&cell);
1316
13
            } else {
1317
1
                cell.file_block->set_deleting();
1318
1
            }
1319
14
        }
1320
6
    }
1321
8
    remove_file_blocks(to_remove, cache_lock, true, reason);
1322
8
}
1323
1324
// the async version of remove_if_cached, for background operations
1325
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
1326
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1327
1
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1328
1
    std::string reason = "remove_if_cached_async";
1329
1
    SCOPED_CACHE_LOCK(_mutex, this);
1330
1331
1
    auto iter = _files.find(file_key);
1332
1
    std::vector<FileBlockCell*> to_remove;
1333
1
    if (iter != _files.end()) {
1334
1
        for (auto& [_, cell] : iter->second) {
1335
1
            *_gc_evict_bytes_metrics << cell.size();
1336
1
            *_gc_evict_count_metrics << 1;
1337
1
            if (cell.releasable()) {
1338
0
                to_remove.push_back(&cell);
1339
1
            } else {
1340
1
                cell.file_block->set_deleting();
1341
1
            }
1342
1
        }
1343
1
    }
1344
1
    remove_file_blocks(to_remove, cache_lock, false, reason);
1345
1
}
1346
1347
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
1348
13.4k
        FileCacheType cur_cache_type) {
1349
13.4k
    switch (cur_cache_type) {
1350
3.91k
    case FileCacheType::TTL:
1351
3.91k
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
1352
671
    case FileCacheType::INDEX:
1353
671
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
1354
8.27k
    case FileCacheType::NORMAL:
1355
8.27k
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
1356
617
    case FileCacheType::DISPOSABLE:
1357
617
        return {FileCacheType::NORMAL, FileCacheType::INDEX};
1358
0
    default:
1359
0
        return {};
1360
13.4k
    }
1361
0
    return {};
1362
13.4k
}
1363
1364
1.24k
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
1365
1.24k
    switch (cur_cache_type) {
1366
284
    case FileCacheType::TTL:
1367
284
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
1368
143
    case FileCacheType::INDEX:
1369
143
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL};
1370
665
    case FileCacheType::NORMAL:
1371
665
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL};
1372
152
    case FileCacheType::DISPOSABLE:
1373
152
        return {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL};
1374
0
    default:
1375
0
        return {};
1376
1.24k
    }
1377
0
    return {};
1378
1.24k
}
1379
1380
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
1381
5
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
1382
5
    DCHECK(_files.find(hash) != _files.end() &&
1383
5
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
1384
5
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
1385
5
    DCHECK(cell != nullptr);
1386
5
    if (cell == nullptr) {
1387
0
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
1388
0
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
1389
0
                     << " new_size=" << new_size;
1390
0
        return;
1391
0
    }
1392
5
    DCHECK_EQ(cell->file_block->_block_range.size(), old_size);
1393
5
    if (cell->queue_iterator) {
1394
5
        auto& queue = get_queue(cell->file_block->cache_type());
1395
5
        DCHECK(queue.contains(hash, offset, cache_lock));
1396
5
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
1397
5
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
1398
5
                                          cell->file_block->get_hash_value(),
1399
5
                                          cell->file_block->offset(), new_size);
1400
5
    }
1401
5
    cell->file_block->_block_range.right = cell->file_block->_block_range.left + new_size - 1;
1402
5
    _cur_cache_size -= old_size;
1403
5
    _cur_cache_size += new_size;
1404
5
    if (cell->file_block->cache_type() == FileCacheType::TTL) {
1405
0
        _cur_ttl_size -= old_size;
1406
0
        _cur_ttl_size += new_size;
1407
0
    }
1408
5
}
1409
1410
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
1411
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1412
13.4k
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1413
13.4k
    size_t removed_size = 0;
1414
13.4k
    size_t cur_cache_size = _cur_cache_size;
1415
13.4k
    std::vector<FileBlockCell*> to_evict;
1416
30.8k
    for (FileCacheType cache_type : other_cache_types) {
1417
30.8k
        auto& queue = get_queue(cache_type);
1418
30.8k
        size_t remove_size_per_type = 0;
1419
30.8k
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1420
1.43k
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1421
697
                break;
1422
697
            }
1423
739
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1424
739
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
1425
0
                         << ", offset: " << entry_offset;
1426
1427
739
            size_t cell_size = cell->size();
1428
739
            DCHECK(entry_size == cell_size);
1429
1430
739
            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
1431
737
                break;
1432
737
            }
1433
1434
2
            if (cell->releasable()) {
1435
2
                auto& file_block = cell->file_block;
1436
2
                std::lock_guard block_lock(file_block->_mutex);
1437
2
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1438
2
                to_evict.push_back(cell);
1439
2
                removed_size += cell_size;
1440
2
                remove_size_per_type += cell_size;
1441
2
            }
1442
2
        }
1443
30.8k
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
1444
30.8k
    }
1445
13.4k
    bool is_sync_removal = !evict_in_advance;
1446
13.4k
    std::string reason = std::string("try_reserve_by_time ") +
1447
13.4k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1448
13.4k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1449
1450
13.4k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1451
13.4k
}
1452
1453
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
1454
19.2k
                                 bool evict_in_advance) const {
1455
19.2k
    bool ret = false;
1456
19.2k
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
1457
23
        ret = (removed_size < need_size);
1458
23
        return ret;
1459
23
    }
1460
19.2k
    if (_disk_resource_limit_mode) {
1461
4
        ret = (removed_size < need_size);
1462
19.2k
    } else {
1463
19.2k
        ret = (cur_cache_size + need_size - removed_size > _capacity);
1464
19.2k
    }
1465
19.2k
    return ret;
1466
19.2k
}
1467
1468
bool BlockFileCache::try_reserve_from_other_queue_by_size(
1469
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1470
415
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1471
415
    size_t removed_size = 0;
1472
415
    size_t cur_cache_size = _cur_cache_size;
1473
415
    std::vector<FileBlockCell*> to_evict;
1474
    // we follow the privilege defined in get_other_cache_types to evict
1475
1.24k
    for (FileCacheType cache_type : other_cache_types) {
1476
1.24k
        auto& queue = get_queue(cache_type);
1477
1478
        // we will not drain each of them to the bottom -- i.e., we only
1479
        // evict what they have stolen.
1480
1.24k
        size_t cur_queue_size = queue.get_capacity(cache_lock);
1481
1.24k
        size_t cur_queue_max_size = queue.get_max_size();
1482
1.24k
        if (cur_queue_size <= cur_queue_max_size) {
1483
740
            continue;
1484
740
        }
1485
505
        size_t cur_removed_size = 0;
1486
505
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1487
505
                              cur_removed_size, evict_in_advance);
1488
505
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
1489
505
    }
1490
415
    bool is_sync_removal = !evict_in_advance;
1491
415
    std::string reason = std::string("try_reserve_by_size") +
1492
415
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1493
415
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1494
415
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1495
415
}
1496
1497
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
1498
                                                  int64_t cur_time,
1499
                                                  std::lock_guard<std::mutex>& cache_lock,
1500
13.4k
                                                  bool evict_in_advance) {
1501
    // currently, TTL cache is not considered as a candidate
1502
13.4k
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
1503
13.4k
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
1504
13.4k
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
1505
13.4k
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
1506
12.2k
        return reserve_success;
1507
12.2k
    }
1508
1509
1.24k
    other_cache_types = get_other_cache_type(cur_cache_type);
1510
1.24k
    auto& cur_queue = get_queue(cur_cache_type);
1511
1.24k
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
1512
1.24k
    size_t cur_queue_max_size = cur_queue.get_max_size();
1513
    // Hit the soft limit by self, cannot remove from other queues
1514
1.24k
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
1515
829
        return false;
1516
829
    }
1517
415
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
1518
415
                                                evict_in_advance);
1519
1.24k
}
1520
1521
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
1522
                                         QueryFileCacheContextPtr query_context,
1523
                                         const CacheContext& context, size_t offset, size_t size,
1524
                                         std::lock_guard<std::mutex>& cache_lock,
1525
13.4k
                                         bool evict_in_advance) {
1526
13.4k
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1527
13.4k
                               std::chrono::steady_clock::now().time_since_epoch())
1528
13.4k
                               .count();
1529
13.4k
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
1530
13.4k
                                      evict_in_advance)) {
1531
831
        auto& queue = get_queue(context.cache_type);
1532
831
        size_t removed_size = 0;
1533
831
        size_t cur_cache_size = _cur_cache_size;
1534
1535
831
        std::vector<FileBlockCell*> to_evict;
1536
831
        size_t cur_removed_size = 0;
1537
831
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1538
831
                              cur_removed_size, evict_in_advance);
1539
831
        bool is_sync_removal = !evict_in_advance;
1540
831
        std::string reason = std::string("try_reserve for cache type ") +
1541
831
                             cache_type_to_string(context.cache_type) +
1542
831
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1543
831
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1544
831
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;
1545
1546
831
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1547
61
            return false;
1548
61
        }
1549
831
    }
1550
1551
13.4k
    if (query_context) {
1552
16
        query_context->reserve(hash, offset, size, cache_lock);
1553
16
    }
1554
13.4k
    return true;
1555
13.4k
}
1556
1557
template <class T, class U>
1558
    requires IsXLock<T> && IsXLock<U>
1559
3.42k
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
1560
3.42k
    auto hash = file_block->get_hash_value();
1561
3.42k
    auto offset = file_block->offset();
1562
3.42k
    auto type = file_block->cache_type();
1563
3.42k
    auto expiration_time = file_block->expiration_time();
1564
3.42k
    auto tablet_id = file_block->tablet_id();
1565
3.42k
    auto* cell = get_cell(hash, offset, cache_lock);
1566
3.42k
    file_block->cell = nullptr;
1567
    // Holder cleanup can race with prior cache metadata cleanup. In that case,
1568
    // skip the duplicate remove instead of touching a detached or replaced cell.
1569
3.42k
    if (cell == nullptr) {
1570
1
        LOG(WARNING) << "remove skipped because cache cell is missing. hash=" << hash.to_string()
1571
1
                     << " offset=" << offset << " size=" << file_block->range().size()
1572
1
                     << " type=" << cache_type_to_string(type)
1573
1
                     << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
1574
1
                     << " expiration_time=" << expiration_time << " sync=" << sync;
1575
1
        return;
1576
1
    }
1577
3.41k
    if (cell->file_block.get() != file_block.get()) {
1578
1
        auto* cell_file_block = cell->file_block.get();
1579
1
        LOG(WARNING)
1580
1
                << "remove skipped because cache cell points to a different file block. hash="
1581
1
                << hash.to_string() << " offset=" << offset
1582
1
                << " size=" << file_block->range().size() << " type=" << cache_type_to_string(type)
1583
1
                << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
1584
1
                << " expiration_time=" << expiration_time << " sync=" << sync << " cell_block_hash="
1585
1
                << (cell_file_block ? cell_file_block->get_hash_value().to_string() : "<null>")
1586
1
                << " cell_block_offset="
1587
1
                << (cell_file_block ? std::to_string(cell_file_block->offset()) : "<null>")
1588
1
                << " cell_block_size="
1589
1
                << (cell_file_block ? std::to_string(cell_file_block->range().size()) : "<null>")
1590
1
                << " cell_block_type="
1591
1
                << (cell_file_block ? cache_type_to_string(cell_file_block->cache_type())
1592
1
                                    : "<null>")
1593
1
                << " cell_block_state="
1594
1
                << (cell_file_block ? FileBlock::state_to_string(cell_file_block->state_unsafe())
1595
1
                                    : "<null>");
1596
1
        return;
1597
1
    }
1598
3.41k
    DCHECK(cell->queue_iterator);
1599
3.41k
    if (cell->queue_iterator) {
1600
3.41k
        auto& queue = get_queue(file_block->cache_type());
1601
3.41k
        queue.remove(*cell->queue_iterator, cache_lock);
1602
3.41k
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
1603
3.41k
                                          cell->file_block->get_hash_value(),
1604
3.41k
                                          cell->file_block->offset(), cell->size());
1605
3.41k
    }
1606
3.41k
    *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
1607
3.41k
            << file_block->range().size();
1608
3.41k
    *_total_evict_size_metrics << file_block->range().size();
1609
1610
3.41k
    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
1611
0
               << ", offset: " << offset << ", size: " << file_block->range().size()
1612
0
               << ", type: " << cache_type_to_string(type);
1613
1614
3.41k
    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
1615
1.42k
        FileCacheKey key;
1616
1.42k
        key.hash = hash;
1617
1.42k
        key.offset = offset;
1618
1.42k
        key.meta.type = type;
1619
1.42k
        key.meta.expiration_time = expiration_time;
1620
1.42k
        key.meta.tablet_id = tablet_id;
1621
1.42k
        if (sync) {
1622
1.38k
            int64_t duration_ns = 0;
1623
1.38k
            Status st;
1624
1.38k
            {
1625
1.38k
                SCOPED_RAW_TIMER(&duration_ns);
1626
1.38k
                st = _storage->remove(key);
1627
1.38k
            }
1628
1.38k
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
1629
1.38k
            if (!st.ok()) {
1630
0
                LOG_WARNING("").error(st);
1631
0
            }
1632
1.38k
        } else {
1633
            // the file will be deleted in the bottom half
1634
            // so there will be a window that the file is not in the cache but still in the storage
1635
            // but it's ok, because the rowset is stale already
1636
47
            bool ret = _recycle_keys.enqueue(key);
1637
47
            if (ret) [[likely]] {
1638
47
                _recycle_keys_length_metrics->set_value(_recycle_keys.size_approx());
1639
47
            } else {
1640
0
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
1641
0
                int64_t duration_ns = 0;
1642
0
                Status st;
1643
0
                {
1644
0
                    SCOPED_RAW_TIMER(&duration_ns);
1645
0
                    st = _storage->remove(key);
1646
0
                }
1647
0
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
1648
0
                if (!st.ok()) {
1649
0
                    LOG_WARNING("").error(st);
1650
0
                }
1651
0
            }
1652
47
        }
1653
1.99k
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
1654
0
        file_block->set_deleting();
1655
0
        return;
1656
0
    }
1657
3.41k
    _cur_cache_size -= file_block->range().size();
1658
3.41k
    if (FileCacheType::TTL == type) {
1659
1.29k
        _cur_ttl_size -= file_block->range().size();
1660
1.29k
    }
1661
3.41k
    auto it = _files.find(hash);
1662
3.41k
    if (it != _files.end()) {
1663
3.41k
        it->second.erase(file_block->offset());
1664
3.41k
        if (it->second.empty()) {
1665
1.34k
            _files.erase(hash);
1666
1.34k
        }
1667
3.41k
    }
1668
3.41k
    *_num_removed_blocks << 1;
1669
3.41k
}
1670
1671
46
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
1672
46
    SCOPED_CACHE_LOCK(_mutex, this);
1673
46
    return get_used_cache_size_unlocked(cache_type, cache_lock);
1674
46
}
1675
1676
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
1677
46
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1678
46
    return get_queue(cache_type).get_capacity(cache_lock);
1679
46
}
1680
1681
0
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
1682
0
    SCOPED_CACHE_LOCK(_mutex, this);
1683
0
    return get_available_cache_size_unlocked(cache_type, cache_lock);
1684
0
}
1685
1686
size_t BlockFileCache::get_available_cache_size_unlocked(
1687
0
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
1688
0
    return get_queue(cache_type).get_max_element_size() -
1689
0
           get_used_cache_size_unlocked(cache_type, cache_lock);
1690
0
}
1691
1692
91
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
1693
91
    SCOPED_CACHE_LOCK(_mutex, this);
1694
91
    return get_file_blocks_num_unlocked(cache_type, cache_lock);
1695
91
}
1696
1697
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
1698
91
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1699
91
    return get_queue(cache_type).get_elements_num(cache_lock);
1700
91
}
1701
1702
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
1703
13.6k
        : file_block(file_block) {
1704
13.6k
    file_block->cell = this;
1705
    /**
1706
     * Cell can be created with either DOWNLOADED or EMPTY file block's state.
1707
     * File block acquires DOWNLOADING state and creates LRUQueue iterator on first
1708
     * successful getOrSetDownaloder call.
1709
     */
1710
1711
13.6k
    switch (file_block->_download_state) {
1712
198
    case FileBlock::State::DOWNLOADED:
1713
13.6k
    case FileBlock::State::EMPTY:
1714
13.6k
    case FileBlock::State::SKIP_CACHE: {
1715
13.6k
        break;
1716
13.6k
    }
1717
0
    default:
1718
0
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
1719
0
                      << FileBlock::state_to_string(file_block->_download_state);
1720
13.6k
    }
1721
13.6k
    if (file_block->cache_type() == FileCacheType::TTL) {
1722
3.92k
        update_atime();
1723
3.92k
    }
1724
13.6k
}
1725
1726
LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
1727
14.6k
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
1728
14.6k
    cache_size += size;
1729
14.6k
    auto iter = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size));
1730
14.6k
    map.insert(std::make_pair(std::make_pair(hash, offset), iter));
1731
14.6k
    return iter;
1732
14.6k
}
1733
1734
0
void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
1735
0
    queue.clear();
1736
0
    map.clear();
1737
0
    cache_size = 0;
1738
0
}
1739
1740
521k
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
1741
521k
    queue.splice(queue.end(), queue, queue_it);
1742
521k
}
1743
1744
void LRUQueue::resize(Iterator queue_it, size_t new_size,
1745
6
                      std::lock_guard<std::mutex>& /* cache_lock */) {
1746
6
    cache_size -= queue_it->size;
1747
6
    queue_it->size = new_size;
1748
6
    cache_size += new_size;
1749
6
}
1750
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
1751
5
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
1752
5
    return map.find(std::make_pair(hash, offset)) != map.end();
1753
5
}
1754
1755
/**
 * Look up the queue node for (hash, offset).
 * When no such entry exists, a value-initialized list iterator is returned; callers
 * must not dereference it in that case.
 */
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
    const auto found = map.find(std::make_pair(hash, offset));
    if (found == map.end()) {
        return std::list<FileKeyAndOffset>::iterator();
    }
    return found->second;
}
1763
1764
2
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
1765
2
    std::string result;
1766
18
    for (const auto& [hash, offset, size] : queue) {
1767
18
        if (!result.empty()) {
1768
16
            result += ", ";
1769
16
        }
1770
18
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
1771
18
    }
1772
2
    return result;
1773
2
}
1774
1775
size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
1776
5
                                           std::lock_guard<std::mutex>& cache_lock) {
1777
5
    std::list<FileKeyAndOffset> target_queue = this->queue;
1778
5
    std::list<FileKeyAndOffset> base_queue = base.queue;
1779
5
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
1780
5
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
1781
1782
5
    size_t m = vec1.size();
1783
5
    size_t n = vec2.size();
1784
1785
    // Create a 2D vector (matrix) to store the Levenshtein distances
1786
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
1787
5
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
1788
1789
    // Initialize the first row and column of the matrix
1790
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
1791
22
    for (size_t i = 0; i <= m; ++i) {
1792
17
        dp[i][0] = i;
1793
17
    }
1794
20
    for (size_t j = 0; j <= n; ++j) {
1795
15
        dp[0][j] = j;
1796
15
    }
1797
1798
    // Fill the matrix using dynamic programming
1799
17
    for (size_t i = 1; i <= m; ++i) {
1800
38
        for (size_t j = 1; j <= n; ++j) {
1801
            // Check if the current elements of both vectors are equal
1802
26
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
1803
26
                           vec1[i - 1].offset == vec2[j - 1].offset)
1804
26
                                  ? 0
1805
26
                                  : 1;
1806
            // Calculate the minimum cost of three possible operations:
1807
            // 1. Insertion: dp[i][j-1] + 1
1808
            // 2. Deletion: dp[i-1][j] + 1
1809
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
1810
26
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
1811
26
        }
1812
12
    }
1813
    // The bottom-right cell of the matrix contains the Levenshtein distance
1814
5
    return dp[m][n];
1815
5
}
1816
1817
1
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
1818
1
    SCOPED_CACHE_LOCK(_mutex, this);
1819
1
    return dump_structure_unlocked(hash, cache_lock);
1820
1
}
1821
1822
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
1823
1
                                                    std::lock_guard<std::mutex>&) {
1824
1
    std::stringstream result;
1825
1
    auto it = _files.find(hash);
1826
1
    if (it == _files.end()) {
1827
0
        return std::string("");
1828
0
    }
1829
1
    const auto& cells_by_offset = it->second;
1830
1831
1
    for (const auto& [_, cell] : cells_by_offset) {
1832
1
        result << cell.file_block->get_info_for_log() << " "
1833
1
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
1834
1
    }
1835
1836
1
    return result.str();
1837
1
}
1838
1839
0
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
1840
0
    SCOPED_CACHE_LOCK(_mutex, this);
1841
0
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
1842
0
}
1843
1844
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
1845
                                                            size_t offset,
1846
0
                                                            std::lock_guard<std::mutex>&) {
1847
0
    std::stringstream result;
1848
0
    auto it = _files.find(hash);
1849
0
    if (it == _files.end()) {
1850
0
        return std::string("");
1851
0
    }
1852
0
    const auto& cells_by_offset = it->second;
1853
0
    const auto& cell = cells_by_offset.find(offset);
1854
1855
0
    return cache_type_to_string(cell->second.file_block->cache_type());
1856
0
}
1857
1858
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
1859
                                       FileCacheType new_type,
1860
9
                                       std::lock_guard<std::mutex>& cache_lock) {
1861
9
    if (auto iter = _files.find(hash); iter != _files.end()) {
1862
9
        auto& file_blocks = iter->second;
1863
9
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
1864
8
            FileBlockCell& cell = cell_it->second;
1865
8
            auto& cur_queue = get_queue(cell.file_block->cache_type());
1866
8
            DCHECK(cell.queue_iterator.has_value());
1867
8
            cur_queue.remove(*cell.queue_iterator, cache_lock);
1868
8
            _lru_recorder->record_queue_event(
1869
8
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
1870
8
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
1871
8
            auto& new_queue = get_queue(new_type);
1872
8
            cell.queue_iterator =
1873
8
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
1874
8
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
1875
8
                                              cell.file_block->get_hash_value(),
1876
8
                                              cell.file_block->offset(), cell.size());
1877
8
        }
1878
9
    }
1879
9
}
1880
1881
// @brief: get a path's disk capacity used percent, inode used percent
1882
// @param: path
1883
// @param: percent.first disk used percent, percent.second inode used percent
1884
161
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
1885
161
    struct statfs stat;
1886
161
    int ret = statfs(path.c_str(), &stat);
1887
161
    if (ret != 0) {
1888
2
        return ret;
1889
2
    }
1890
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
1891
    // v->used = stat.f_blocks - stat.f_bfree
1892
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
1893
159
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
1894
159
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
1895
159
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));
1896
1897
159
    unsigned long long inode_free = stat.f_ffree;
1898
159
    unsigned long long inode_total = stat.f_files;
1899
159
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
1900
159
    percent->first = capacity_percentage;
1901
159
    percent->second = 100 - inode_percentage;
1902
1903
    // Add sync point for testing
1904
159
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
1905
1906
159
    return 0;
1907
161
}
1908
1909
10
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
1910
10
    using namespace std::chrono;
1911
10
    int64_t space_released = 0;
1912
10
    size_t old_capacity = 0;
1913
10
    std::stringstream ss;
1914
10
    ss << "finish reset_capacity, path=" << _cache_base_path;
1915
10
    auto adjust_start_time = steady_clock::time_point();
1916
10
    {
1917
10
        SCOPED_CACHE_LOCK(_mutex, this);
1918
10
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
1919
1
            int64_t need_remove_size = _cur_cache_size - new_capacity;
1920
4
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
1921
4
                int64_t queue_released = 0;
1922
4
                std::vector<FileBlockCell*> to_evict;
1923
13
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1924
13
                    if (need_remove_size <= 0) {
1925
1
                        break;
1926
1
                    }
1927
12
                    need_remove_size -= entry_size;
1928
12
                    space_released += entry_size;
1929
12
                    queue_released += entry_size;
1930
12
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1931
12
                    if (!cell->releasable()) {
1932
0
                        cell->file_block->set_deleting();
1933
0
                        continue;
1934
0
                    }
1935
12
                    to_evict.push_back(cell);
1936
12
                }
1937
12
                for (auto& cell : to_evict) {
1938
12
                    FileBlockSPtr file_block = cell->file_block;
1939
12
                    std::lock_guard block_lock(file_block->_mutex);
1940
12
                    remove(file_block, cache_lock, block_lock);
1941
12
                }
1942
4
                return queue_released;
1943
4
            };
1944
1
            int64_t queue_released = remove_blocks(_disposable_queue);
1945
1
            ss << " disposable_queue released " << queue_released;
1946
1
            queue_released = remove_blocks(_normal_queue);
1947
1
            ss << " normal_queue released " << queue_released;
1948
1
            queue_released = remove_blocks(_index_queue);
1949
1
            ss << " index_queue released " << queue_released;
1950
1
            queue_released = remove_blocks(_ttl_queue);
1951
1
            ss << " ttl_queue released " << queue_released;
1952
1953
1
            _disk_resource_limit_mode = true;
1954
1
            _disk_limit_mode_metrics->set_value(1);
1955
1
            ss << " total_space_released=" << space_released;
1956
1
        }
1957
10
        old_capacity = _capacity;
1958
10
        _capacity = new_capacity;
1959
10
        _cache_capacity_metrics->set_value(_capacity);
1960
10
    }
1961
10
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
1962
10
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
1963
10
              << " use_time=" << cast_set<int64_t>(use_time.count());
1964
10
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
1965
10
    LOG(INFO) << ss.str();
1966
10
    return ss.str();
1967
10
}
1968
1969
939
void BlockFileCache::check_disk_resource_limit() {
1970
939
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1971
792
        return;
1972
792
    }
1973
1974
147
    bool previous_mode = _disk_resource_limit_mode;
1975
147
    if (_capacity > _cur_cache_size) {
1976
145
        _disk_resource_limit_mode = false;
1977
145
        _disk_limit_mode_metrics->set_value(0);
1978
145
    }
1979
147
    std::pair<int, int> percent;
1980
147
    int ret = disk_used_percentage(_cache_base_path, &percent);
1981
147
    if (ret != 0) {
1982
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1983
1
        return;
1984
1
    }
1985
146
    auto [space_percentage, inode_percentage] = percent;
1986
292
    auto is_insufficient = [](const int& percentage) {
1987
292
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
1988
292
    };
1989
146
    DCHECK_GE(space_percentage, 0);
1990
146
    DCHECK_LE(space_percentage, 100);
1991
146
    DCHECK_GE(inode_percentage, 0);
1992
146
    DCHECK_LE(inode_percentage, 100);
1993
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1994
    // FIXME: reject with config validator
1995
146
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
1996
146
        config::file_cache_exit_disk_resource_limit_mode_percent) {
1997
1
        LOG_WARNING("config error, set to default value")
1998
1
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
1999
1
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
2000
1
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
2001
1
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
2002
1
    }
2003
146
    bool is_space_insufficient = is_insufficient(space_percentage);
2004
146
    bool is_inode_insufficient = is_insufficient(inode_percentage);
2005
146
    if (is_space_insufficient || is_inode_insufficient) {
2006
1
        _disk_resource_limit_mode = true;
2007
1
        _disk_limit_mode_metrics->set_value(1);
2008
145
    } else if (_disk_resource_limit_mode &&
2009
145
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
2010
145
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
2011
0
        _disk_resource_limit_mode = false;
2012
0
        _disk_limit_mode_metrics->set_value(0);
2013
0
    }
2014
146
    if (previous_mode != _disk_resource_limit_mode) {
2015
        // add log for disk resource limit mode switching
2016
2
        if (_disk_resource_limit_mode) {
2017
1
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
2018
1
                         << " space_percent=" << space_percentage
2019
1
                         << " inode_percent=" << inode_percentage
2020
1
                         << " is_space_insufficient=" << is_space_insufficient
2021
1
                         << " is_inode_insufficient=" << is_inode_insufficient
2022
1
                         << " enter threshold="
2023
1
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
2024
1
        } else {
2025
1
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
2026
1
                      << " space_percent=" << space_percentage
2027
1
                      << " inode_percent=" << inode_percentage << " exit threshold="
2028
1
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
2029
1
        }
2030
144
    } else if (_disk_resource_limit_mode) {
2031
        // print log for disk resource limit mode running, but less frequently
2032
0
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
2033
0
                                 << " space_percent=" << space_percentage
2034
0
                                 << " inode_percent=" << inode_percentage
2035
0
                                 << " is_space_insufficient=" << is_space_insufficient
2036
0
                                 << " is_inode_insufficient=" << is_inode_insufficient
2037
0
                                 << " mode run in resource limit";
2038
0
    }
2039
146
}
2040
2041
15
void BlockFileCache::check_need_evict_cache_in_advance() {
2042
15
    if (_storage->get_type() != FileCacheStorageType::DISK) {
2043
1
        return;
2044
1
    }
2045
2046
14
    std::pair<int, int> percent;
2047
14
    int ret = disk_used_percentage(_cache_base_path, &percent);
2048
14
    if (ret != 0) {
2049
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
2050
1
        return;
2051
1
    }
2052
13
    auto [space_percentage, inode_percentage] = percent;
2053
13
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
2054
39
    auto is_insufficient = [](const int& percentage) {
2055
39
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
2056
39
    };
2057
13
    DCHECK_GE(space_percentage, 0);
2058
13
    DCHECK_LE(space_percentage, 100);
2059
13
    DCHECK_GE(inode_percentage, 0);
2060
13
    DCHECK_LE(inode_percentage, 100);
2061
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
2062
    // FIXME: reject with config validator
2063
13
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
2064
13
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
2065
1
        LOG_WARNING("config error, set to default value")
2066
1
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
2067
1
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
2068
1
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
2069
1
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
2070
1
    }
2071
13
    bool previous_mode = _need_evict_cache_in_advance;
2072
13
    bool is_space_insufficient = is_insufficient(space_percentage);
2073
13
    bool is_inode_insufficient = is_insufficient(inode_percentage);
2074
13
    bool is_size_insufficient = is_insufficient(size_percentage);
2075
13
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
2076
9
        _need_evict_cache_in_advance = true;
2077
9
        _need_evict_cache_in_advance_metrics->set_value(1);
2078
9
    } else if (_need_evict_cache_in_advance &&
2079
4
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
2080
4
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
2081
4
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
2082
1
        _need_evict_cache_in_advance = false;
2083
1
        _need_evict_cache_in_advance_metrics->set_value(0);
2084
1
    }
2085
13
    if (previous_mode != _need_evict_cache_in_advance) {
2086
        // add log for evict cache in advance mode switching
2087
6
        if (_need_evict_cache_in_advance) {
2088
5
            LOG(WARNING) << "Entering evict cache in advance mode: "
2089
5
                         << "file_cache=" << get_base_path()
2090
5
                         << " space_percent=" << space_percentage
2091
5
                         << " inode_percent=" << inode_percentage
2092
5
                         << " size_percent=" << size_percentage
2093
5
                         << " is_space_insufficient=" << is_space_insufficient
2094
5
                         << " is_inode_insufficient=" << is_inode_insufficient
2095
5
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
2096
5
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
2097
5
        } else {
2098
1
            LOG(INFO) << "Exiting evict cache in advance mode: "
2099
1
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
2100
1
                      << " inode_percent=" << inode_percentage
2101
1
                      << " size_percent=" << size_percentage << " exit threshold="
2102
1
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
2103
1
        }
2104
7
    } else if (_need_evict_cache_in_advance) {
2105
        // print log for evict cache in advance mode running, but less frequently
2106
4
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
2107
1
                                 << " space_percent=" << space_percentage
2108
1
                                 << " inode_percent=" << inode_percentage
2109
1
                                 << " size_percent=" << size_percentage
2110
1
                                 << " is_space_insufficient=" << is_space_insufficient
2111
1
                                 << " is_inode_insufficient=" << is_inode_insufficient
2112
1
                                 << " is_size_insufficient=" << is_size_insufficient
2113
1
                                 << " need evict cache in advance";
2114
4
    }
2115
13
}
2116
2117
151
void BlockFileCache::run_background_monitor() {
2118
151
    Thread::set_self_name("run_background_monitor");
2119
939
    while (!_close) {
2120
939
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
2121
939
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
2122
939
        check_disk_resource_limit();
2123
939
        if (config::enable_evict_file_cache_in_advance) {
2124
7
            check_need_evict_cache_in_advance();
2125
932
        } else {
2126
932
            _need_evict_cache_in_advance = false;
2127
932
            _need_evict_cache_in_advance_metrics->set_value(0);
2128
932
        }
2129
2130
939
        {
2131
939
            std::unique_lock close_lock(_close_mtx);
2132
939
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2133
939
            if (_close) {
2134
151
                break;
2135
151
            }
2136
939
        }
2137
        // report
2138
788
        {
2139
788
            SCOPED_CACHE_LOCK(_mutex, this);
2140
788
            _cur_cache_size_metrics->set_value(_cur_cache_size);
2141
788
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
2142
788
                                                   _index_queue.get_capacity(cache_lock) -
2143
788
                                                   _normal_queue.get_capacity(cache_lock) -
2144
788
                                                   _disposable_queue.get_capacity(cache_lock));
2145
788
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
2146
788
                    _ttl_queue.get_capacity(cache_lock));
2147
788
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
2148
788
                    _ttl_queue.get_elements_num(cache_lock));
2149
788
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
2150
788
            _cur_normal_queue_element_count_metrics->set_value(
2151
788
                    _normal_queue.get_elements_num(cache_lock));
2152
788
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
2153
788
            _cur_index_queue_element_count_metrics->set_value(
2154
788
                    _index_queue.get_elements_num(cache_lock));
2155
788
            _cur_disposable_queue_cache_size_metrics->set_value(
2156
788
                    _disposable_queue.get_capacity(cache_lock));
2157
788
            _cur_disposable_queue_element_count_metrics->set_value(
2158
788
                    _disposable_queue.get_elements_num(cache_lock));
2159
2160
            // Update meta store write queue size if storage is FSFileCacheStorage
2161
788
            if (_storage->get_type() == FileCacheStorageType::DISK) {
2162
15
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
2163
15
                if (fs_storage != nullptr) {
2164
15
                    auto* meta_store = fs_storage->get_meta_store();
2165
15
                    if (meta_store != nullptr) {
2166
15
                        _meta_store_write_queue_size_metrics->set_value(
2167
15
                                meta_store->get_write_queue_size());
2168
15
                    }
2169
15
                }
2170
15
            }
2171
2172
788
            if (_num_read_blocks->get_value() > 0) {
2173
13
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
2174
13
                                      (double)_num_read_blocks->get_value());
2175
13
            }
2176
788
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
2177
0
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
2178
0
                                         (double)_num_read_blocks_5m->get_value());
2179
0
            }
2180
788
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
2181
0
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
2182
0
                                         (double)_num_read_blocks_1h->get_value());
2183
0
            }
2184
2185
788
            if (_no_warmup_num_read_blocks->get_value() > 0) {
2186
13
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
2187
13
                                                (double)_no_warmup_num_read_blocks->get_value());
2188
13
            }
2189
788
            if (_no_warmup_num_read_blocks_5m && _no_warmup_num_read_blocks_5m->get_value() > 0) {
2190
0
                _no_warmup_hit_ratio_5m->set_value(
2191
0
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
2192
0
                        (double)_no_warmup_num_read_blocks_5m->get_value());
2193
0
            }
2194
788
            if (_no_warmup_num_read_blocks_1h && _no_warmup_num_read_blocks_1h->get_value() > 0) {
2195
0
                _no_warmup_hit_ratio_1h->set_value(
2196
0
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
2197
0
                        (double)_no_warmup_num_read_blocks_1h->get_value());
2198
0
            }
2199
788
        }
2200
788
    }
2201
151
}
2202
2203
151
void BlockFileCache::run_background_gc() {
2204
151
    Thread::set_self_name("run_background_gc");
2205
151
    FileCacheKey key;
2206
151
    size_t batch_count = 0;
2207
381
    while (!_close) {
2208
381
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
2209
381
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
2210
381
        {
2211
381
            std::unique_lock close_lock(_close_mtx);
2212
381
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2213
381
            if (_close) {
2214
151
                break;
2215
151
            }
2216
381
        }
2217
2218
239
        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
2219
9
            int64_t duration_ns = 0;
2220
9
            Status st;
2221
9
            {
2222
9
                SCOPED_RAW_TIMER(&duration_ns);
2223
9
                st = _storage->remove(key);
2224
9
            }
2225
9
            *_storage_async_remove_latency_us << (duration_ns / 1000);
2226
2227
9
            if (!st.ok()) {
2228
0
                LOG_WARNING("").error(st);
2229
0
            }
2230
9
            batch_count++;
2231
9
        }
2232
230
        _recycle_keys_length_metrics->set_value(_recycle_keys.size_approx());
2233
230
        batch_count = 0;
2234
230
    }
2235
151
}
2236
2237
151
void BlockFileCache::run_background_evict_in_advance() {
2238
151
    Thread::set_self_name("run_background_evict_in_advance");
2239
151
    LOG(INFO) << "Starting background evict in advance thread";
2240
151
    int64_t batch = 0;
2241
4.14k
    while (!_close) {
2242
4.14k
        {
2243
4.14k
            std::unique_lock close_lock(_close_mtx);
2244
4.14k
            _close_cv.wait_for(
2245
4.14k
                    close_lock,
2246
4.14k
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
2247
4.14k
            if (_close) {
2248
151
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
2249
151
                break;
2250
151
            }
2251
4.14k
        }
2252
3.99k
        batch = config::file_cache_evict_in_advance_batch_bytes;
2253
2254
        // Skip if eviction not needed or too many pending recycles
2255
3.99k
        if (!_need_evict_cache_in_advance ||
2256
3.99k
            _recycle_keys.size_approx() >=
2257
3.99k
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
2258
3.99k
            continue;
2259
3.99k
        }
2260
2261
3
        int64_t duration_ns = 0;
2262
3
        {
2263
3
            SCOPED_CACHE_LOCK(_mutex, this);
2264
3
            SCOPED_RAW_TIMER(&duration_ns);
2265
3
            try_evict_in_advance(batch, cache_lock);
2266
3
        }
2267
3
        *_evict_in_advance_latency_us << (duration_ns / 1000);
2268
3
    }
2269
151
}
2270
2271
151
void BlockFileCache::run_background_block_lru_update() {
2272
151
    Thread::set_self_name("run_background_block_lru_update");
2273
151
    std::vector<FileBlockSPtr> batch;
2274
4.07k
    while (!_close) {
2275
4.07k
        size_t backlog = _need_update_lru_blocks.size();
2276
4.07k
        QueueConsumePlan plan = build_block_lru_update_plan(backlog);
2277
4.07k
        {
2278
4.07k
            std::unique_lock close_lock(_close_mtx);
2279
4.07k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(plan.interval_ms));
2280
4.07k
            if (_close) {
2281
151
                break;
2282
151
            }
2283
4.07k
        }
2284
2285
3.91k
        batch.clear();
2286
3.91k
        batch.reserve(plan.batch_limit);
2287
3.91k
        size_t drained = _need_update_lru_blocks.drain(plan.batch_limit, &batch);
2288
3.91k
        if (drained == 0) {
2289
3.91k
            _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size());
2290
3.91k
            continue;
2291
3.91k
        }
2292
2293
5
        int64_t duration_ns = 0;
2294
5
        const size_t slice_batch = std::min(kBlockLruUpdateLockSliceBatch, drained);
2295
10
        for (size_t begin = 0; begin < batch.size(); begin += slice_batch) {
2296
5
            const size_t end = std::min(begin + slice_batch, batch.size());
2297
5
            {
2298
5
                SCOPED_CACHE_LOCK(_mutex, this);
2299
5
                SCOPED_RAW_TIMER(&duration_ns);
2300
10
                for (size_t i = begin; i < end; ++i) {
2301
5
                    update_block_lru(batch[i], cache_lock);
2302
5
                }
2303
5
            }
2304
5
        }
2305
5
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
2306
5
        _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size());
2307
5
        if (backlog >= kBlockLruUpdateAdaptiveHighWatermark) {
2308
0
            LOG_EVERY_N(WARNING, 60)
2309
0
                    << "need_update_lru_blocks backlog is high, backlog=" << backlog
2310
0
                    << " drained=" << drained << " interval_ms=" << plan.interval_ms
2311
0
                    << " batch_limit=" << plan.batch_limit;
2312
0
        }
2313
5
    }
2314
151
}
2315
2316
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
2317
2
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
2318
2
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
2319
2
                               std::chrono::steady_clock::now().time_since_epoch())
2320
2
                               .count();
2321
2
    SCOPED_CACHE_LOCK(_mutex, this);
2322
2
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
2323
2
    if (auto iter = _files.find(hash); iter != _files.end()) {
2324
5
        for (auto& pair : _files.find(hash)->second) {
2325
5
            const FileBlockCell* cell = &pair.second;
2326
5
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
2327
4
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
2328
4
                    (cell->atime != 0 &&
2329
3
                     cur_time - cell->atime <
2330
3
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
2331
3
                    blocks_meta.emplace_back(pair.first, cell->size(),
2332
3
                                             cell->file_block->cache_type(),
2333
3
                                             cell->file_block->expiration_time());
2334
3
                }
2335
4
            }
2336
5
        }
2337
2
    }
2338
2
    return blocks_meta;
2339
2
}
2340
2341
bool BlockFileCache::try_reserve_during_async_load(size_t size,
2342
4
                                                   std::lock_guard<std::mutex>& cache_lock) {
2343
4
    size_t removed_size = 0;
2344
4
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
2345
4
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
2346
4
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);
2347
2348
4
    std::vector<FileBlockCell*> to_evict;
2349
4
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
2350
4
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
2351
4
            if (!_disk_resource_limit_mode || removed_size >= size) {
2352
3
                break;
2353
3
            }
2354
1
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
2355
2356
1
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
2357
0
                         << ", offset: " << entry_offset;
2358
2359
1
            size_t cell_size = cell->size();
2360
1
            DCHECK(entry_size == cell_size);
2361
2362
1
            if (cell->releasable()) {
2363
1
                auto& file_block = cell->file_block;
2364
2365
1
                std::lock_guard block_lock(file_block->_mutex);
2366
1
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
2367
1
                to_evict.push_back(cell);
2368
1
                removed_size += cell_size;
2369
1
            }
2370
1
        }
2371
3
    };
2372
4
    if (disposable_queue_size != 0) {
2373
0
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
2374
0
    }
2375
4
    if (normal_queue_size != 0) {
2376
3
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
2377
3
    }
2378
4
    if (index_queue_size != 0) {
2379
0
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
2380
0
    }
2381
4
    std::string reason = "async load";
2382
4
    remove_file_blocks(to_evict, cache_lock, true, reason);
2383
2384
4
    return !_disk_resource_limit_mode || removed_size >= size;
2385
4
}
2386
2387
30
void BlockFileCache::clear_need_update_lru_blocks() {
2388
30
    _need_update_lru_blocks.clear();
2389
30
    _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size());
2390
30
}
2391
2392
28
void BlockFileCache::pause_ttl_manager() {
2393
28
    if (_ttl_mgr) {
2394
4
        _ttl_mgr->stop();
2395
4
    }
2396
28
}
2397
2398
28
void BlockFileCache::resume_ttl_manager() {
2399
28
    if (_ttl_mgr) {
2400
4
        _ttl_mgr->resume();
2401
4
    }
2402
28
}
2403
2404
28
std::string BlockFileCache::clear_file_cache_directly() {
2405
28
    pause_ttl_manager();
2406
28
    _lru_dumper->remove_lru_dump_files();
2407
28
    using namespace std::chrono;
2408
28
    std::stringstream ss;
2409
28
    auto start = steady_clock::now();
2410
28
    std::string result;
2411
28
    {
2412
28
        SCOPED_CACHE_LOCK(_mutex, this);
2413
28
        LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
2414
2415
28
        std::string clear_msg;
2416
28
        auto s = _storage->clear(clear_msg);
2417
28
        if (!s.ok()) {
2418
1
            result = clear_msg;
2419
27
        } else {
2420
27
            int64_t num_files = _files.size();
2421
27
            int64_t cache_size = _cur_cache_size;
2422
27
            int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
2423
27
            int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock);
2424
27
            int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock);
2425
27
            int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);
2426
2427
27
            int64_t clear_fd_duration = 0;
2428
27
            {
2429
                // clear FDCache to release fd
2430
27
                SCOPED_RAW_TIMER(&clear_fd_duration);
2431
5.12k
                for (const auto& [file_key, file_blocks] : _files) {
2432
5.12k
                    for (const auto& [offset, file_block_cell] : file_blocks) {
2433
5.12k
                        AccessKeyAndOffset access_key_and_offset(file_key, offset);
2434
5.12k
                        FDCache::instance()->remove_file_reader(access_key_and_offset);
2435
5.12k
                    }
2436
5.12k
                }
2437
27
            }
2438
2439
27
            _files.clear();
2440
27
            _cur_cache_size = 0;
2441
27
            _cur_ttl_size = 0;
2442
27
            _time_to_key.clear();
2443
27
            _key_to_time.clear();
2444
27
            _index_queue.clear(cache_lock);
2445
27
            _normal_queue.clear(cache_lock);
2446
27
            _disposable_queue.clear(cache_lock);
2447
27
            _ttl_queue.clear(cache_lock);
2448
2449
            // Update cache metrics immediately so consumers observe the cleared state
2450
            // without waiting for the next background monitor round.
2451
27
            _cur_cache_size_metrics->set_value(0);
2452
27
            _cur_ttl_cache_size_metrics->set_value(0);
2453
27
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0);
2454
27
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0);
2455
27
            _cur_normal_queue_cache_size_metrics->set_value(0);
2456
27
            _cur_normal_queue_element_count_metrics->set_value(0);
2457
27
            _cur_index_queue_cache_size_metrics->set_value(0);
2458
27
            _cur_index_queue_element_count_metrics->set_value(0);
2459
27
            _cur_disposable_queue_cache_size_metrics->set_value(0);
2460
27
            _cur_disposable_queue_element_count_metrics->set_value(0);
2461
2462
27
            clear_need_update_lru_blocks();
2463
2464
27
            ss << "finish clear_file_cache_directly"
2465
27
               << " path=" << _cache_base_path << " time_elapsed_ms="
2466
27
               << duration_cast<milliseconds>(steady_clock::now() - start).count()
2467
27
               << " fd_clear_time_ms=" << (clear_fd_duration / 1000000)
2468
27
               << " num_files=" << num_files << " cache_size=" << cache_size
2469
27
               << " index_queue_size=" << index_queue_size
2470
27
               << " normal_queue_size=" << normal_queue_size
2471
27
               << " disposible_queue_size=" << disposible_queue_size
2472
27
               << "ttl_queue_size=" << ttl_queue_size;
2473
27
            result = ss.str();
2474
27
            LOG(INFO) << result;
2475
27
        }
2476
28
    }
2477
28
    _lru_dumper->remove_lru_dump_files();
2478
28
    resume_ttl_manager();
2479
28
    return result;
2480
28
}
2481
2482
5.14k
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
2483
5.14k
    std::map<size_t, FileBlockSPtr> offset_to_block;
2484
5.14k
    SCOPED_CACHE_LOCK(_mutex, this);
2485
5.14k
    if (_files.contains(hash)) {
2486
5.14k
        for (auto& [offset, cell] : _files[hash]) {
2487
5.14k
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
2488
5.14k
                cell.file_block->_owned_by_cached_reader = true;
2489
5.14k
                offset_to_block.emplace(offset, cell.file_block);
2490
5.14k
            }
2491
5.14k
        }
2492
5.13k
    }
2493
5.14k
    return offset_to_block;
2494
5.14k
}
2495
2496
0
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
2497
0
    SCOPED_CACHE_LOCK(_mutex, this);
2498
0
    if (auto iter = _files.find(hash); iter != _files.end()) {
2499
0
        for (auto& [_, cell] : iter->second) {
2500
0
            cell.update_atime();
2501
0
        }
2502
0
    };
2503
0
}
2504
2505
151
void BlockFileCache::run_background_lru_log_replay() {
2506
151
    Thread::set_self_name("run_background_lru_log_replay");
2507
4.14k
    while (!_close) {
2508
4.14k
        size_t backlog = _lru_recorder->get_total_lru_log_queue_size();
2509
4.14k
        QueueConsumePlan plan = build_lru_log_replay_plan(backlog);
2510
4.14k
        {
2511
4.14k
            std::unique_lock close_lock(_close_mtx);
2512
4.14k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(plan.interval_ms));
2513
4.14k
            if (_close) {
2514
150
                break;
2515
150
            }
2516
4.14k
        }
2517
2518
3.99k
        record_lru_recorder_log_queue_length();
2519
3.99k
        size_t drained = 0;
2520
3.99k
        drained += _lru_recorder->replay_queue_event(FileCacheType::TTL, plan.batch_limit);
2521
3.99k
        drained += _lru_recorder->replay_queue_event(FileCacheType::INDEX, plan.batch_limit);
2522
3.99k
        drained += _lru_recorder->replay_queue_event(FileCacheType::NORMAL, plan.batch_limit);
2523
3.99k
        drained += _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE, plan.batch_limit);
2524
3.99k
        record_lru_recorder_log_queue_length();
2525
3.99k
        if (backlog >= kLruLogReplayAdaptiveHighWatermark) {
2526
0
            LOG_EVERY_N(WARNING, 60)
2527
0
                    << "lru recorder backlog is high, backlog=" << backlog << " drained=" << drained
2528
0
                    << " interval_ms=" << plan.interval_ms << " batch_limit=" << plan.batch_limit;
2529
0
        }
2530
2531
3.99k
        if (config::enable_evaluate_shadow_queue_diff) {
2532
0
            SCOPED_CACHE_LOCK(_mutex, this);
2533
0
            _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
2534
0
            _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
2535
0
            _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
2536
0
            _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
2537
0
        }
2538
3.99k
    }
2539
151
}
2540
2541
7.99k
void BlockFileCache::record_lru_recorder_log_queue_length() {
2542
7.99k
    _lru_recorder_log_queue_length_metrics->set_value(
2543
7.99k
            _lru_recorder->get_total_lru_log_queue_size());
2544
7.99k
}
2545
2546
1.30k
void BlockFileCache::dump_lru_queues(bool force) {
2547
1.30k
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
2548
1.30k
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
2549
1.30k
        !ExecEnv::GetInstance()->get_is_upgrading()) {
2550
1.30k
        _lru_dumper->dump_queue("disposable", force);
2551
1.30k
        _lru_dumper->dump_queue("normal", force);
2552
1.30k
        _lru_dumper->dump_queue("index", force);
2553
1.30k
        _lru_dumper->dump_queue("ttl", force);
2554
1.30k
        _lru_dumper->set_first_dump_done();
2555
1.30k
    }
2556
1.30k
}
2557
2558
151
void BlockFileCache::run_background_lru_dump() {
2559
151
    Thread::set_self_name("run_background_lru_dump");
2560
1.45k
    while (!_close) {
2561
1.45k
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
2562
1.45k
        {
2563
1.45k
            std::unique_lock close_lock(_close_mtx);
2564
1.45k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2565
1.45k
            if (_close) {
2566
151
                break;
2567
151
            }
2568
1.45k
        }
2569
1.30k
        dump_lru_queues(false);
2570
1.30k
    }
2571
151
}
2572
2573
151
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
2574
    // keep this order coz may be duplicated in different queue, we use the first appearence
2575
151
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
2576
151
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
2577
151
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
2578
151
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
2579
151
}
2580
2581
0
std::map<std::string, double> BlockFileCache::get_stats() {
2582
0
    std::map<std::string, double> stats;
2583
0
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2584
0
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2585
0
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2586
2587
0
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2588
0
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
2589
0
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2590
0
    stats["index_queue_curr_elements"] =
2591
0
            (double)_cur_index_queue_element_count_metrics->get_value();
2592
2593
0
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2594
0
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
2595
0
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2596
0
    stats["ttl_queue_curr_elements"] =
2597
0
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
2598
2599
0
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2600
0
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
2601
0
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2602
0
    stats["normal_queue_curr_elements"] =
2603
0
            (double)_cur_normal_queue_element_count_metrics->get_value();
2604
2605
0
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2606
0
    stats["disposable_queue_curr_size"] =
2607
0
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
2608
0
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2609
0
    stats["disposable_queue_curr_elements"] =
2610
0
            (double)_cur_disposable_queue_element_count_metrics->get_value();
2611
2612
0
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2613
0
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2614
2615
0
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2616
0
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2617
0
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2618
2619
0
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2620
0
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2621
0
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2622
2623
0
    return stats;
2624
0
}
2625
2626
// for be UTs
2627
172
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
2628
172
    std::map<std::string, double> stats;
2629
172
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2630
172
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2631
172
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2632
2633
172
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2634
172
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
2635
172
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2636
172
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
2637
2638
172
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2639
172
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
2640
172
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2641
172
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
2642
2643
172
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2644
172
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
2645
172
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2646
172
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
2647
2648
172
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2649
172
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
2650
172
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2651
172
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
2652
2653
172
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2654
172
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2655
2656
172
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2657
172
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2658
172
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2659
2660
172
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2661
172
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2662
172
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2663
2664
172
    return stats;
2665
172
}
2666
2667
template void BlockFileCache::remove(FileBlockSPtr file_block,
2668
                                     std::lock_guard<std::mutex>& cache_lock,
2669
                                     std::lock_guard<std::mutex>& block_lock, bool sync);
2670
2671
1
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
2672
1
    InconsistencyContext inconsistency_context;
2673
1
    RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));
2674
1
    auto n = inconsistency_context.types.size();
2675
1
    results.reserve(n);
2676
1
    for (size_t i = 0; i < n; i++) {
2677
0
        std::string result;
2678
0
        result += "File cache info in manager:\n";
2679
0
        result += inconsistency_context.infos_in_manager[i].to_string();
2680
0
        result += "File cache info in storage:\n";
2681
0
        result += inconsistency_context.infos_in_storage[i].to_string();
2682
0
        result += inconsistency_context.types[i].to_string();
2683
0
        result += "\n";
2684
0
        results.push_back(std::move(result));
2685
0
    }
2686
1
    return Status::OK();
2687
1
}
2688
2689
1
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
2690
1
    std::lock_guard<std::mutex> cache_lock(_mutex);
2691
1
    std::vector<FileCacheInfo> infos_in_storage;
2692
1
    RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));
2693
1
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks;
2694
1
    for (const auto& info_in_storage : infos_in_storage) {
2695
0
        confirmed_blocks.insert({info_in_storage.hash, info_in_storage.offset});
2696
0
        auto* cell = get_cell(info_in_storage.hash, info_in_storage.offset, cache_lock);
2697
0
        if (cell == nullptr || cell->file_block == nullptr) {
2698
0
            inconsistency_context.infos_in_manager.emplace_back();
2699
0
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
2700
0
            inconsistency_context.types.emplace_back(InconsistencyType::NOT_LOADED);
2701
0
            continue;
2702
0
        }
2703
0
        FileCacheInfo info_in_manager {
2704
0
                .hash = info_in_storage.hash,
2705
0
                .expiration_time = cell->file_block->expiration_time(),
2706
0
                .size = cell->size(),
2707
0
                .offset = info_in_storage.offset,
2708
0
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
2709
0
                .cache_type = cell->file_block->cache_type()};
2710
0
        InconsistencyType inconsistent_type;
2711
0
        if (info_in_storage.is_tmp != info_in_manager.is_tmp) {
2712
0
            inconsistent_type |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
2713
0
        }
2714
0
        size_t expected_size =
2715
0
                info_in_manager.is_tmp ? cell->dowloading_size() : info_in_manager.size;
2716
0
        if (info_in_storage.size != expected_size) {
2717
0
            inconsistent_type |= InconsistencyType::SIZE_INCONSISTENT;
2718
0
        }
2719
        // Only if it is not a tmp file need we check the cache type.
2720
0
        if ((inconsistent_type & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
2721
0
            info_in_storage.cache_type != info_in_manager.cache_type) {
2722
0
            inconsistent_type |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
2723
0
        }
2724
0
        if (info_in_storage.expiration_time != info_in_manager.expiration_time) {
2725
0
            inconsistent_type |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
2726
0
        }
2727
0
        if (inconsistent_type != InconsistencyType::NONE) {
2728
0
            inconsistency_context.infos_in_manager.push_back(info_in_manager);
2729
0
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
2730
0
            inconsistency_context.types.push_back(inconsistent_type);
2731
0
        }
2732
0
    }
2733
2734
1
    for (const auto& [hash, offset_to_cell] : _files) {
2735
0
        for (const auto& [offset, cell] : offset_to_cell) {
2736
0
            if (confirmed_blocks.contains({hash, offset})) {
2737
0
                continue;
2738
0
            }
2739
0
            const auto& block = cell.file_block;
2740
0
            inconsistency_context.infos_in_manager.emplace_back(
2741
0
                    hash, block->expiration_time(), cell.size(), offset,
2742
0
                    cell.file_block->state() == FileBlock::State::DOWNLOADING, block->cache_type());
2743
0
            inconsistency_context.infos_in_storage.emplace_back();
2744
0
            inconsistency_context.types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
2745
0
        }
2746
0
    }
2747
1
    return Status::OK();
2748
1
}
2749
2750
} // namespace doris::io