Coverage Report

Created: 2026-06-02 01:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache.h"
22
23
#include <gen_cpp/file_cache.pb.h>
24
25
#include <cstdio>
26
#include <exception>
27
#include <fstream>
28
#include <unordered_set>
29
30
#include "common/status.h"
31
#include "cpp/sync_point.h"
32
#include "runtime/exec_env.h"
33
34
#if defined(__APPLE__)
35
#include <sys/mount.h>
36
#else
37
#include <sys/statfs.h>
38
#endif
39
40
#include <chrono> // IWYU pragma: keep
41
#include <mutex>
42
#include <ranges>
43
44
#include "common/cast_set.h"
45
#include "common/config.h"
46
#include "common/logging.h"
47
#include "core/uint128.h"
48
#include "exec/common/sip_hash.h"
49
#include "io/cache/block_file_cache_ttl_mgr.h"
50
#include "io/cache/file_block.h"
51
#include "io/cache/file_cache_common.h"
52
#include "io/cache/fs_file_cache_storage.h"
53
#include "io/cache/mem_file_cache_storage.h"
54
#include "runtime/runtime_profile.h"
55
#include "util/concurrency_stats.h"
56
#include "util/stack_util.h"
57
#include "util/stopwatch.hpp"
58
#include "util/thread.h"
59
#include "util/time.h"
60
namespace doris::io {
61
62
// Insert a block pointer into one shard while swallowing allocation failures.
63
193k
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
64
193k
    if (!block) {
65
1
        return false;
66
1
    }
67
193k
    try {
68
193k
        auto* raw_ptr = block.get();
69
193k
        auto idx = shard_index(raw_ptr);
70
193k
        auto& shard = _shards[idx];
71
193k
        std::lock_guard lock(shard.mutex);
72
193k
        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
73
193k
        if (inserted) {
74
1.56k
            _size.fetch_add(1, std::memory_order_relaxed);
75
1.56k
        }
76
193k
        return inserted;
77
193k
    } catch (const std::exception& e) {
78
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
79
0
    } catch (...) {
80
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
81
0
    }
82
0
    return false;
83
193k
}
84
85
// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
86
3.93k
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
87
3.93k
    if (limit == 0 || output == nullptr) {
88
2
        return 0;
89
2
    }
90
3.92k
    size_t drained = 0;
91
3.92k
    try {
92
3.92k
        output->reserve(output->size() + std::min(limit, size()));
93
250k
        for (auto& shard : _shards) {
94
250k
            if (drained >= limit) {
95
1
                break;
96
1
            }
97
250k
            std::lock_guard lock(shard.mutex);
98
250k
            auto it = shard.entries.begin();
99
250k
            size_t shard_drained = 0;
100
250k
            while (it != shard.entries.end() && drained + shard_drained < limit) {
101
10
                output->emplace_back(std::move(it->second));
102
10
                it = shard.entries.erase(it);
103
10
                ++shard_drained;
104
10
            }
105
250k
            if (shard_drained > 0) {
106
7
                _size.fetch_sub(shard_drained, std::memory_order_relaxed);
107
7
                drained += shard_drained;
108
7
            }
109
250k
        }
110
3.92k
    } catch (const std::exception& e) {
111
0
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
112
0
    } catch (...) {
113
0
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
114
0
    }
115
3.93k
    return drained;
116
3.92k
}
117
118
// Remove every pending block, guarding against unexpected exceptions.
119
32
void NeedUpdateLRUBlocks::clear() {
120
32
    try {
121
2.04k
        for (auto& shard : _shards) {
122
2.04k
            std::lock_guard lock(shard.mutex);
123
2.04k
            if (!shard.entries.empty()) {
124
2
                auto removed = shard.entries.size();
125
2
                shard.entries.clear();
126
2
                _size.fetch_sub(removed, std::memory_order_relaxed);
127
2
            }
128
2.04k
        }
129
32
    } catch (const std::exception& e) {
130
0
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
131
0
    } catch (...) {
132
0
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
133
0
    }
134
32
}
135
136
193k
// Map a block pointer to the index of the shard that tracks it.
//
// The pointer hash is mixed before it is masked. Where std::hash<T*> returns the address
// unchanged, allocation alignment keeps the low address bits constant, so masking those bits
// directly would send every block to a small subset of the shards. Multiplying by a 64-bit odd
// constant and taking bits from the upper half spreads neighbouring addresses across shards.
// The result is always within [0, kShardMask].
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
    DCHECK(ptr != nullptr);
    constexpr uint64_t kMixMultiplier = 0x9E3779B97F4A7C15ULL; // 2^64 / golden ratio
    const uint64_t raw_hash = static_cast<uint64_t>(std::hash<FileBlock*> {}(ptr));
    const uint64_t mixed = (raw_hash * kMixMultiplier) >> 32;
    return static_cast<size_t>(mixed) & kShardMask;
}
140
141
namespace {
142
143
// Result of the adaptive planning helpers below: how long to wait between two consume rounds
// and how many queue entries one round may take.
struct QueueConsumePlan {
    // Delay between two consume rounds, in milliseconds.
    int64_t interval_ms {0};
    // Number of entries for one round. The planners also return 0 here in some cases; how a
    // consumer interprets 0 is not visible in this part of the file.
    size_t batch_limit {0};
};
147
148
// Tuning for the LRU log replay queue, passed to build_queue_consume_plan() by
// build_lru_log_replay_plan(): backlog watermarks, interval floor and per-type batch ceiling.
constexpr size_t kLruLogReplayAdaptiveLowWatermark = 50 * 1000;
constexpr size_t kLruLogReplayAdaptiveHighWatermark = 300 * 1000;
constexpr int64_t kLruLogReplayAdaptiveMinIntervalMs = 100;
constexpr size_t kLruLogReplayAdaptiveMaxBatchPerType = 25 * 1000;

// Tuning for the block LRU update queue, passed to build_queue_consume_plan() by
// build_block_lru_update_plan(): backlog watermarks, interval floor and batch ceiling.
constexpr size_t kBlockLruUpdateAdaptiveLowWatermark = 10 * 1000;
constexpr size_t kBlockLruUpdateAdaptiveHighWatermark = 50 * 1000;
constexpr int64_t kBlockLruUpdateAdaptiveMinIntervalMs = 500;
constexpr size_t kBlockLruUpdateAdaptiveMaxBatch = 10 * 1000;
// NOTE(review): not referenced in this part of the file; presumably the number of block
// updates applied per lock acquisition -- confirm against its user.
constexpr size_t kBlockLruUpdateLockSliceBatch = 500;
158
159
12.3k
// Returns `value` when it is strictly positive, otherwise `default_value`.
int64_t positive_or_default(int64_t value, int64_t default_value) {
    if (value <= 0) {
        return default_value;
    }
    return value;
}
162
163
// Pick a consume interval and batch size for a queue from its current backlog.
//
//  - backlog <  low_watermark : base interval (at least 1 ms) and `base_batch`.
//  - backlog <  high_watermark: half the base interval, but not below the minimum interval;
//                               twice `base_batch` (at least 1), capped at `max_batch`.
//  - otherwise                : the minimum interval and `max_batch`.
//
// Non-positive intervals are replaced by 1 ms and `max_batch` is treated as at least 1.
QueueConsumePlan build_queue_consume_plan(size_t backlog, int64_t base_interval_ms,
                                          size_t base_batch, size_t low_watermark,
                                          size_t high_watermark, int64_t min_interval_ms,
                                          size_t max_batch) {
    QueueConsumePlan result;
    result.interval_ms = positive_or_default(base_interval_ms, 1);
    result.batch_limit = base_batch;

    if (backlog >= low_watermark) {
        const int64_t floor_interval = positive_or_default(min_interval_ms, 1);
        const size_t batch_ceiling = std::max<size_t>(max_batch, 1);
        if (backlog >= high_watermark) {
            // Backlog is at or above the high watermark: consume as fast as allowed.
            result.interval_ms = floor_interval;
            result.batch_limit = batch_ceiling;
        } else {
            // Backlog is between the watermarks: speed up by one step.
            result.interval_ms = std::max<int64_t>(floor_interval, result.interval_ms / 2);
            result.batch_limit = std::min(batch_ceiling, std::max<size_t>(base_batch * 2, 1));
        }
    }
    return result;
}
187
188
4.15k
// Consume plan for the LRU log replay queue.
//
// Below the low watermark the configured replay interval (at least 1 ms) is returned with a
// batch limit of 0. At or above it, the plan comes from build_queue_consume_plan() with a base
// batch of a quarter of the per-type maximum.
QueueConsumePlan build_lru_log_replay_plan(size_t backlog) {
    const auto interval_ms =
            positive_or_default(config::file_cache_background_lru_log_replay_interval_ms, 1);
    if (backlog >= kLruLogReplayAdaptiveLowWatermark) {
        return build_queue_consume_plan(
                backlog, interval_ms, kLruLogReplayAdaptiveMaxBatchPerType / 4,
                kLruLogReplayAdaptiveLowWatermark, kLruLogReplayAdaptiveHighWatermark,
                kLruLogReplayAdaptiveMinIntervalMs, kLruLogReplayAdaptiveMaxBatchPerType);
    }
    return {interval_ms, 0};
}
199
200
4.08k
// Consume plan for the pending block LRU update queue.
//
// The base batch is the configured QPS limit (negative values count as 0) scaled to one
// interval: qps * interval_ms / 1000. When that is 0 the configured interval is returned with a
// batch limit of 0; otherwise the plan comes from build_queue_consume_plan().
QueueConsumePlan build_block_lru_update_plan(size_t backlog) {
    const auto interval_ms =
            positive_or_default(config::file_cache_background_block_lru_update_interval_ms, 1);
    const int64_t qps_limit =
            std::max<int64_t>(config::file_cache_background_block_lru_update_qps_limit, 0);
    const size_t per_round = qps_limit * static_cast<size_t>(interval_ms) / 1000;
    if (per_round != 0) {
        return build_queue_consume_plan(
                backlog, interval_ms, per_round, kBlockLruUpdateAdaptiveLowWatermark,
                kBlockLruUpdateAdaptiveHighWatermark, kBlockLruUpdateAdaptiveMinIntervalMs,
                kBlockLruUpdateAdaptiveMaxBatch);
    }
    return {interval_ms, 0};
}
214
215
} // namespace
216
217
// Build a cache rooted at `cache_base_path` with the limits from `cache_settings`.
//
// The constructor creates every bvar metric (using the base path as the metric prefix), the
// four LRU queues, the LRU recorder/dumper and the storage backend. It does not load anything;
// that happens in initialize().
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
                               const FileCacheSettings& cache_settings)
        : _cache_base_path(cache_base_path),
          _capacity(cache_settings.capacity),
          _max_file_block_size(cache_settings.max_file_block_size) {
    // Small factories so each metric below is a single statement. They read _cache_base_path
    // at call time, exactly like the spelled-out make_shared calls would.
    auto size_status = [this](const char* name, size_t initial = 0) {
        return std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(), name, initial);
    };
    auto ratio_status = [this](const char* name) {
        return std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(), name, 0.0);
    };
    auto size_adder = [this](const char* name) {
        return std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(), name);
    };
    auto latency = [this](const char* name) {
        return std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(), name);
    };

    // Current size / element-count gauges.
    _cur_cache_size_metrics = size_status("file_cache_cache_size");
    _cache_capacity_metrics = size_status("file_cache_capacity", _capacity);
    _cur_ttl_cache_size_metrics = size_status("file_cache_ttl_cache_size");
    _cur_normal_queue_element_count_metrics = size_status("file_cache_normal_queue_element_count");
    _cur_ttl_cache_lru_queue_cache_size_metrics =
            size_status("file_cache_ttl_cache_lru_queue_size");
    _cur_ttl_cache_lru_queue_element_count_metrics =
            size_status("file_cache_ttl_cache_lru_queue_element_count");
    _cur_normal_queue_cache_size_metrics = size_status("file_cache_normal_queue_cache_size");
    _cur_index_queue_element_count_metrics = size_status("file_cache_index_queue_element_count");
    _cur_index_queue_cache_size_metrics = size_status("file_cache_index_queue_cache_size");
    _cur_disposable_queue_element_count_metrics =
            size_status("file_cache_disposable_queue_element_count");
    _cur_disposable_queue_cache_size_metrics =
            size_status("file_cache_disposable_queue_cache_size");

    // Eviction / read / hit byte counters.
    _queue_evict_size_metrics[0] = size_adder("file_cache_index_queue_evict_size");
    _queue_evict_size_metrics[1] = size_adder("file_cache_normal_queue_evict_size");
    _queue_evict_size_metrics[2] = size_adder("file_cache_disposable_queue_evict_size");
    _queue_evict_size_metrics[3] = size_adder("file_cache_ttl_cache_evict_size");
    _total_evict_size_metrics = size_adder("file_cache_total_evict_size");
    _total_read_size_metrics = size_adder("file_cache_total_read_size");
    _total_hit_size_metrics = size_adder("file_cache_total_hit_size");
    _gc_evict_bytes_metrics = size_adder("file_cache_gc_evict_bytes");
    _gc_evict_count_metrics = size_adder("file_cache_gc_evict_count");

    // One entry per ordered pair of distinct cache types, with the metric names used by the
    // "evict by time" and "evict by size" matrices.
    struct EvictPairNames {
        FileCacheType from;
        FileCacheType to;
        const char* by_time;
        const char* by_size;
    };
    const EvictPairNames evict_pairs[] = {
            {FileCacheType::DISPOSABLE, FileCacheType::NORMAL,
             "file_cache_evict_by_time_disposable_to_normal",
             "file_cache_evict_by_size_disposable_to_normal"},
            {FileCacheType::DISPOSABLE, FileCacheType::INDEX,
             "file_cache_evict_by_time_disposable_to_index",
             "file_cache_evict_by_size_disposable_to_index"},
            {FileCacheType::DISPOSABLE, FileCacheType::TTL,
             "file_cache_evict_by_time_disposable_to_ttl",
             "file_cache_evict_by_size_disposable_to_ttl"},
            {FileCacheType::NORMAL, FileCacheType::DISPOSABLE,
             "file_cache_evict_by_time_normal_to_disposable",
             "file_cache_evict_by_size_normal_to_disposable"},
            {FileCacheType::NORMAL, FileCacheType::INDEX,
             "file_cache_evict_by_time_normal_to_index",
             "file_cache_evict_by_size_normal_to_index"},
            {FileCacheType::NORMAL, FileCacheType::TTL,
             "file_cache_evict_by_time_normal_to_ttl",
             "file_cache_evict_by_size_normal_to_ttl"},
            {FileCacheType::INDEX, FileCacheType::DISPOSABLE,
             "file_cache_evict_by_time_index_to_disposable",
             "file_cache_evict_by_size_index_to_disposable"},
            {FileCacheType::INDEX, FileCacheType::NORMAL,
             "file_cache_evict_by_time_index_to_normal",
             "file_cache_evict_by_size_index_to_normal"},
            {FileCacheType::INDEX, FileCacheType::TTL,
             "file_cache_evict_by_time_index_to_ttl",
             "file_cache_evict_by_size_index_to_ttl"},
            {FileCacheType::TTL, FileCacheType::DISPOSABLE,
             "file_cache_evict_by_time_ttl_to_disposable",
             "file_cache_evict_by_size_ttl_to_disposable"},
            {FileCacheType::TTL, FileCacheType::NORMAL,
             "file_cache_evict_by_time_ttl_to_normal",
             "file_cache_evict_by_size_ttl_to_normal"},
            {FileCacheType::TTL, FileCacheType::INDEX,
             "file_cache_evict_by_time_ttl_to_index",
             "file_cache_evict_by_size_ttl_to_index"},
    };

    for (const auto& pair : evict_pairs) {
        _evict_by_time_metrics_matrix[pair.from][pair.to] = size_adder(pair.by_time);
    }

    _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
            size_adder("file_cache_evict_by_self_lru_disposable");
    _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
            size_adder("file_cache_evict_by_self_lru_normal");
    _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] =
            size_adder("file_cache_evict_by_self_lru_index");
    _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] =
            size_adder("file_cache_evict_by_self_lru_ttl");

    for (const auto& pair : evict_pairs) {
        _evict_by_size_metrics_matrix[pair.from][pair.to] = size_adder(pair.by_size);
    }

    _evict_by_try_release = size_adder("file_cache_evict_by_try_release");

    // Block-level read / hit counters.
    _num_read_blocks = size_adder("file_cache_num_read_blocks");
    _num_hit_blocks = size_adder("file_cache_num_hit_blocks");
    _num_removed_blocks = size_adder("file_cache_num_removed_blocks");

    _no_warmup_num_read_blocks = size_adder("file_cache_no_warmup_num_read_blocks");
    _no_warmup_num_hit_blocks = size_adder("file_cache_no_warmup_num_hit_blocks");

#ifndef BE_TEST
    // Windowed views (300 s and 3600 s) over the counters above; not built for unit tests.
    auto window = [this](const char* name, bvar::Adder<size_t>* source, int window_seconds) {
        return std::make_shared<bvar::Window<bvar::Adder<size_t>>>(_cache_base_path.c_str(), name,
                                                                   source, window_seconds);
    };
    _num_hit_blocks_5m = window("file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
    _num_read_blocks_5m = window("file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
    _num_hit_blocks_1h = window("file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
    _num_read_blocks_1h = window("file_cache_num_read_blocks_1h", _num_read_blocks.get(), 3600);
    _no_warmup_num_hit_blocks_5m = window("file_cache_no_warmup_num_hit_blocks_5m",
                                          _no_warmup_num_hit_blocks.get(), 300);
    _no_warmup_num_read_blocks_5m = window("file_cache_no_warmup_num_read_blocks_5m",
                                           _no_warmup_num_read_blocks.get(), 300);
    _no_warmup_num_hit_blocks_1h = window("file_cache_no_warmup_num_hit_blocks_1h",
                                          _no_warmup_num_hit_blocks.get(), 3600);
    _no_warmup_num_read_blocks_1h = window("file_cache_no_warmup_num_read_blocks_1h",
                                           _no_warmup_num_read_blocks.get(), 3600);
#endif

    // Hit ratio gauges.
    _hit_ratio = ratio_status("file_cache_hit_ratio");
    _hit_ratio_5m = ratio_status("file_cache_hit_ratio_5m");
    _hit_ratio_1h = ratio_status("file_cache_hit_ratio_1h");

    _no_warmup_hit_ratio = ratio_status("file_cache_no_warmup_hit_ratio");
    _no_warmup_hit_ratio_5m = ratio_status("file_cache_no_warmup_hit_ratio_5m");
    _no_warmup_hit_ratio_1h = ratio_status("file_cache_no_warmup_hit_ratio_1h");

    // Mode flags and queue-length gauges.
    _disk_limit_mode_metrics = size_status("file_cache_disk_limit_mode");
    _need_evict_cache_in_advance_metrics = size_status("file_cache_need_evict_cache_in_advance");
    _meta_store_write_queue_size_metrics = size_status("file_cache_meta_store_write_queue_size");

    // Latency recorders, with the remaining queue-length gauges in their original position.
    _cache_lock_wait_time_us = latency("file_cache_cache_lock_wait_time_us");
    _get_or_set_latency_us = latency("file_cache_get_or_set_latency_us");
    _storage_sync_remove_latency_us = latency("file_cache_storage_sync_remove_latency_us");
    _storage_retry_sync_remove_latency_us =
            latency("file_cache_storage_retry_sync_remove_latency_us");
    _storage_async_remove_latency_us = latency("file_cache_storage_async_remove_latency_us");
    _evict_in_advance_latency_us = latency("file_cache_evict_in_advance_latency_us");
    _lru_dump_latency_us = latency("file_cache_lru_dump_latency_us");
    _recycle_keys_length_metrics = size_status("file_cache_recycle_keys_length");
    _need_update_lru_blocks_length_metrics =
            size_status("file_cache_need_update_lru_blocks_length");
    _lru_recorder_log_queue_length_metrics =
            size_status("file_cache_lru_recorder_log_queue_length");
    _update_lru_blocks_latency_us = latency("file_cache_update_lru_blocks_latency_us");
    _ttl_gc_latency_us = latency("file_cache_ttl_gc_latency_us");
    _shadow_queue_levenshtein_distance = latency("file_cache_shadow_queue_levenshtein_distance");

    // Third LRUQueue argument per queue, written as seconds arithmetic.
    constexpr int kHourSeconds = 60 * 60;
    constexpr int kDaySeconds = 24 * 60 * 60;
    constexpr int kWeekSeconds = 7 * 24 * 60 * 60;
    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
                                 cache_settings.disposable_queue_elements, kHourSeconds);
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
                            kWeekSeconds);
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
                             kDaySeconds);
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
                          std::numeric_limits<int>::max());

    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());

    if (cache_settings.storage != "memory") {
        _storage = std::make_unique<FSFileCacheStorage>();
    } else {
        _storage = std::make_unique<MemFileCacheStorage>();
        // The base path is replaced only now; every metric above was already created with
        // the caller-supplied path.
        _cache_base_path = "memory";
    }

    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
}
456
457
14.5k
// Hash `path` with sip_hash128 and wrap the 128-bit result.
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
    uint128_t digest;
    // sip_hash128 writes the result through a raw char buffer.
    auto* digest_bytes = reinterpret_cast<char*>(&digest);
    sip_hash128(path.data(), path.size(), digest_bytes);
    return UInt128Wrapper(digest);
}
462
463
537k
bool BlockFileCache::is_memory_storage() const {
464
537k
    DCHECK(_storage != nullptr);
465
537k
    return _storage->get_type() == FileCacheStorageType::MEMORY;
466
537k
}
467
468
// Return a holder for the per-query cache context of `query_id`.
//
// Takes the cache lock. Returns an empty pointer when
// config::enable_file_cache_query_limit is off; otherwise the query context is looked up or
// created via get_or_set_query_context() and wrapped in a holder.
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
    SCOPED_CACHE_LOCK(_mutex, this);
    if (config::enable_file_cache_query_limit) {
        // The query limit is enabled: make sure this query has a context.
        auto query_context =
                get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
        return std::make_unique<QueryFileCacheContextHolder>(query_id, this, query_context);
    }
    return {};
}
480
481
// Look up the context registered for `query_id`; nullptr when there is none.
// `cache_lock` is the caller's guard on the cache mutex; it is not used here.
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
    const auto found = _query_map.find(query_id);
    if (found == _query_map.end()) {
        return nullptr;
    }
    return found->second;
}
486
487
6
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
488
6
    SCOPED_CACHE_LOCK(_mutex, this);
489
6
    const auto& query_iter = _query_map.find(query_id);
490
491
6
    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
492
5
        _query_map.erase(query_iter);
493
5
    }
494
6
}
495
496
// Return the context of `query_id`, creating and registering it when missing.
//
// An all-zero query id yields nullptr. A new context is limited to
// `_capacity * file_cache_query_limit_percent / 100` bytes; a warning is logged when that is
// below 256MB.
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
        int file_cache_query_limit_percent) {
    if (query_id.lo == 0 && query_id.hi == 0) {
        return nullptr;
    }

    if (auto existing = get_query_context(query_id, cache_lock); existing) {
        return existing;
    }

    constexpr size_t kRecommendedMinLimitBytes = 268435456; // 256MB
    const size_t limit_bytes = _capacity * file_cache_query_limit_percent / 100;
    if (limit_bytes < kRecommendedMinLimitBytes) {
        LOG(WARNING) << "The user-set file cache query limit (" << limit_bytes
                     << " bytes) is less than the 256MB recommended minimum. "
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
                     << " from its current value " << file_cache_query_limit_percent << "%.";
    }
    auto created = std::make_shared<QueryFileCacheContext>(limit_bytes);
    return _query_map.emplace(query_id, created).first->second;
}
519
520
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
521
20
                                                   std::lock_guard<std::mutex>& cache_lock) {
522
20
    auto pair = std::make_pair(hash, offset);
523
20
    auto record = records.find(pair);
524
20
    DCHECK(record != records.end());
525
20
    auto iter = record->second;
526
20
    records.erase(pair);
527
20
    lru_queue.remove(iter, cache_lock);
528
20
}
529
530
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
531
                                                    size_t size,
532
30
                                                    std::lock_guard<std::mutex>& cache_lock) {
533
30
    auto pair = std::make_pair(hash, offset);
534
30
    if (records.find(pair) == records.end()) {
535
29
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
536
29
        records.insert({pair, queue_iter});
537
29
    }
538
30
}
539
540
151
/// Takes the cache lock via SCOPED_CACHE_LOCK and runs initialize_unlocked().
/// @return the status reported by initialize_unlocked().
Status BlockFileCache::initialize() {
    SCOPED_CACHE_LOCK(_mutex, this);
    Status init_status = initialize_unlocked(cache_lock);
    return init_status;
}
544
545
151
/// Performs one-time initialization with the cache lock already held.
///
/// Steps, in order:
///   1. mark the cache as initialized (DCHECKs that it was not already);
///   2. restore the LRU queues from disk when
///      `config::file_cache_background_lru_dump_tail_record_num` is positive;
///   3. initialize the storage backend, returning early on failure;
///   4. create the TTL manager when the storage is an FSFileCacheStorage that
///      exposes a meta store;
///   5. start the background threads.
///
/// @return Status::OK(), or the error returned by `_storage->init()`.
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(!_is_initialized);
    _is_initialized = true;

    if (config::file_cache_background_lru_dump_tail_record_num > 0) {
        // Requirements:
        // 1. restored data should not overwrite the last dump
        // 2. restore should happen before load and async load
        // 3. all queues should be restored sequentially to avoid conflicts
        // TODO(zhengyu): restoring could be parallelized at the cost of extra complexity;
        // check the time cost first to see whether that improvement is necessary.
        restore_lru_queues_from_disk(cache_lock);
    }
    RETURN_IF_ERROR(_storage->init(this));

    // The TTL manager is only created for file-system storage that has a meta store.
    auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
    auto* meta_store = fs_storage != nullptr ? fs_storage->get_meta_store() : nullptr;
    if (meta_store != nullptr) {
        _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
    }

    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
    _cache_background_evict_in_advance_thread =
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
    _cache_background_block_lru_update_thread =
            std::thread(&BlockFileCache::run_background_block_lru_update, this);

    // LRU dump and LRU log replay workers.
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
    _cache_background_lru_log_replay_thread =
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);

    return Status::OK();
}
579
580
void BlockFileCache::update_block_lru(FileBlockSPtr block,
581
7
                                      std::lock_guard<std::mutex>& cache_lock) {
582
7
    if (!block) {
583
1
        return;
584
1
    }
585
586
6
    FileBlockCell* cell = get_cell(block->get_hash_value(), block->offset(), cache_lock);
587
6
    if (!cell || cell->file_block.get() != block.get()) {
588
1
        return;
589
1
    }
590
591
5
    if (cell->queue_iterator) {
592
5
        auto& queue = get_queue(block->cache_type());
593
5
        queue.move_to_end(*cell->queue_iterator, cache_lock);
594
5
        _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
595
5
                                          block->_key.hash, block->_key.offset,
596
5
                                          block->_block_range.size());
597
5
    }
598
5
    cell->update_atime();
599
5
}
600
601
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
602
712k
                              std::lock_guard<std::mutex>& cache_lock) {
603
712k
    if (result) {
604
712k
        result->push_back(cell.file_block);
605
712k
    }
606
607
712k
    auto& queue = get_queue(cell.file_block->cache_type());
608
    /// Move to the end of the queue. The iterator remains valid.
609
712k
    if (!config::enable_file_cache_async_touch_on_get_or_set && cell.queue_iterator &&
610
712k
        move_iter_flag) {
611
520k
        queue.move_to_end(*cell.queue_iterator, cache_lock);
612
520k
        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
613
520k
                                          CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash,
614
520k
                                          cell.file_block->_key.offset, cell.size());
615
520k
    }
616
617
712k
    cell.update_atime();
618
712k
}
619
620
template <class T>
621
    requires IsXLock<T>
622
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
623
6.02k
                                        T& /* cache_lock */) {
624
6.02k
    auto it = _files.find(hash);
625
6.02k
    if (it == _files.end()) {
626
3
        return nullptr;
627
3
    }
628
629
6.02k
    auto& offsets = it->second;
630
6.02k
    auto cell_it = offsets.find(offset);
631
6.02k
    if (cell_it == offsets.end()) {
632
3
        return nullptr;
633
3
    }
634
635
6.01k
    return &cell_it->second;
636
6.02k
}
637
638
904k
/// @return true unless either the cell's type or the querying context's type is DISPOSABLE.
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
    const bool any_disposable = query_type == FileCacheType::DISPOSABLE ||
                                cell_type == FileCacheType::DISPOSABLE;
    return !any_disposable;
}
641
642
/// Collects the cached blocks of file `hash` that intersect `range`.
///
/// Given range = [left, right] and the ordered, non-overlapping set of blocks stored
/// for the file, returns the list [block1, ..., blockN] of blocks intersecting it.
/// Every returned cell goes through use_cell(), which refreshes its access time and
/// may move its LRU entry.
///
/// When the hash is unknown and `_async_open_done` is false, the blocks are first
/// loaded through `_storage->load_blocks_directly_unlocked()` and the lookup is retried.
///
/// @return the intersecting blocks in ascending offset order; empty when none intersect.
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
                                    const FileBlock::Range& range,
                                    std::lock_guard<std::mutex>& cache_lock) {
    auto file_it = _files.find(hash);
    if (file_it == _files.end()) {
        if (_async_open_done) {
            return {};
        }
        FileCacheKey key;
        key.hash = hash;
        key.meta.type = context.cache_type;
        key.meta.expiration_time = context.expiration_time;
        key.meta.tablet_id = context.tablet_id;
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);

        file_it = _files.find(hash);
        if (file_it == _files.end()) [[unlikely]] {
            return {};
        }
    }

    auto& blocks_by_offset = file_it->second;
    if (blocks_by_offset.empty()) {
        // An entry without any block is unexpected: report it and drop the entry.
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
                     << " cache type=" << context.cache_type
                     << " cache expiration time=" << context.expiration_time
                     << " cache range=" << range.left << " " << range.right
                     << " query id=" << context.query_id;
        DCHECK(false);
        _files.erase(hash);
        return {};
    }

    FileBlocks hits;
    // Appends `cell` to `hits` and applies the usual "used" bookkeeping.
    auto take = [&](const FileBlockCell& cell) {
        use_cell(cell, &hits, need_to_move(cell.file_block->cache_type(), context.cache_type),
                 cache_lock);
    };

    auto cursor = blocks_by_offset.lower_bound(range.left);
    if (cursor == blocks_by_offset.end()) {
        /// N - last cached block for given file hash, block{N}.offset < range.left:
        ///   block{N}                       block{N}
        /// [________                         [_______]
        ///     [__________]         OR                  [________]
        ///     ^                                        ^
        ///     range.left                               range.left
        const auto& last_cell = blocks_by_offset.rbegin()->second;
        if (last_cell.file_block->range().right < range.left) {
            return {};
        }
        take(last_cell);
        return hits;
    }

    /// cursor <-- block{k}
    if (cursor != blocks_by_offset.begin()) {
        const auto& prev_cell = std::prev(cursor)->second;
        const auto& prev_range = prev_cell.file_block->range();
        if (range.left <= prev_range.right) {
            ///   block{k-1}  block{k}
            ///   [________]   [_____
            ///       [___________
            ///       ^
            ///       range.left
            take(prev_cell);
        }
    }

    ///  block{k} ...       block{k-1}  block{k}                      block{k}
    ///  [______              [______]     [____                        [________
    ///  [_________     OR              [________      OR    [______]   ^
    ///  ^                              ^                           ^   block{k}.offset
    ///  range.left                     range.left                  range.right
    for (; cursor != blocks_by_offset.end(); ++cursor) {
        const auto& cell = cursor->second;
        if (range.right < cell.file_block->range().left) {
            break;
        }
        take(cell);
    }

    return hits;
}
732
733
193k
/// Hands `block` to `_need_update_lru_blocks`. When insert() reports success, the
/// length metric is refreshed with the container's current size.
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
    if (!_need_update_lru_blocks.insert(std::move(block))) {
        return;
    }
    _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size());
}
738
739
3
std::string BlockFileCache::clear_file_cache_async() {
740
3
    LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
741
3
    _lru_dumper->remove_lru_dump_files();
742
3
    int64_t num_cells_all = 0;
743
3
    int64_t num_cells_to_delete = 0;
744
3
    int64_t num_cells_wait_recycle = 0;
745
3
    int64_t num_files_all = 0;
746
3
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
747
3
    {
748
3
        SCOPED_CACHE_LOCK(_mutex, this);
749
750
3
        std::vector<FileBlockCell*> deleting_cells;
751
4
        for (auto& [_, offset_to_cell] : _files) {
752
4
            ++num_files_all;
753
37
            for (auto& [_1, cell] : offset_to_cell) {
754
37
                ++num_cells_all;
755
37
                deleting_cells.push_back(&cell);
756
37
            }
757
4
        }
758
759
        // we cannot delete the element in the loop above, because it will break the iterator
760
37
        for (auto& cell : deleting_cells) {
761
37
            if (!cell->releasable()) {
762
2
                LOG(INFO) << "cell is not releasable, hash="
763
2
                          << " offset=" << cell->file_block->offset();
764
2
                cell->file_block->set_deleting();
765
2
                ++num_cells_wait_recycle;
766
2
                continue;
767
2
            }
768
35
            FileBlockSPtr file_block = cell->file_block;
769
35
            if (file_block) {
770
35
                std::lock_guard block_lock(file_block->_mutex);
771
35
                remove(file_block, cache_lock, block_lock, false);
772
35
                ++num_cells_to_delete;
773
35
            }
774
35
        }
775
3
        clear_need_update_lru_blocks();
776
3
    }
777
778
3
    std::stringstream ss;
779
3
    ss << "finish clear_file_cache_async, path=" << _cache_base_path
780
3
       << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
781
3
       << " num_cells_to_delete=" << num_cells_to_delete
782
3
       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
783
3
    auto msg = ss.str();
784
3
    LOG(INFO) << msg;
785
3
    _lru_dumper->remove_lru_dump_files();
786
3
    return msg;
787
3
}
788
789
/// Covers [offset, offset + size) with file blocks of at most `_max_file_block_size`
/// bytes each and returns them in ascending order.
///
/// For every chunk try_reserve() is called. While reservations succeed, a cell is
/// created through add_cell() with the requested `state`. As soon as one reservation
/// fails, `state` becomes SKIP_CACHE and the chunk is represented by a standalone
/// FileBlock that is not inserted into the cache.
/// NOTE(review): `state` is never reset, so every later chunk of the same call is
/// also SKIP_CACHE even when its own try_reserve() succeeds - confirm this is intended.
///
/// @param state initial state for created cells (by-value, mutated locally).
/// @return the list of blocks; a chunk is missing from it when add_cell() returned nullptr.
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
                                                  const CacheContext& context, size_t offset,
                                                  size_t size, FileBlock::State state,
                                                  std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(size > 0);

    const auto end_pos_exclusive = offset + size;
    size_t bytes_left = size;

    FileBlocks file_blocks;
    for (auto pos = offset; pos < end_pos_exclusive;) {
        const size_t chunk_size = std::min(bytes_left, _max_file_block_size);
        bytes_left -= chunk_size;

        if (!try_reserve(hash, context, pos, chunk_size, cache_lock)) {
            state = FileBlock::State::SKIP_CACHE;
        }

        if (state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
            FileCacheKey key;
            key.hash = hash;
            key.offset = pos;
            key.meta.type = context.cache_type;
            key.meta.expiration_time = context.expiration_time;
            key.meta.tablet_id = context.tablet_id;
            file_blocks.push_back(std::make_shared<FileBlock>(key, chunk_size, this,
                                                              FileBlock::State::SKIP_CACHE));
        } else {
            auto* cell = add_cell(hash, context, pos, chunk_size, state, cache_lock);
            if (cell != nullptr) {
                file_blocks.push_back(cell->file_block);
                if (!context.is_cold_data) {
                    cell->update_atime();
                }
            }
            if (_ttl_mgr && context.tablet_id != 0) {
                _ttl_mgr->register_tablet_id(context.tablet_id);
            }
        }

        pos += chunk_size;
    }

    DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
    return file_blocks;
}
837
838
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
839
                                                       const UInt128Wrapper& hash,
840
                                                       const CacheContext& context,
841
                                                       const FileBlock::Range& range,
842
712k
                                                       std::lock_guard<std::mutex>& cache_lock) {
843
    /// There are blocks [block1, ..., blockN]
844
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
845
    /// intersect with given range.
846
847
    /// It can have holes:
848
    /// [____________________]         -- requested range
849
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
850
    ///
851
    /// For each such hole create a cell with file block state EMPTY.
852
853
712k
    auto it = file_blocks.begin();
854
712k
    auto block_range = (*it)->range();
855
856
712k
    size_t current_pos = 0;
857
712k
    if (block_range.left < range.left) {
858
        ///    [_______     -- requested range
859
        /// [_______
860
        /// ^
861
        /// block1
862
863
151
        current_pos = block_range.right + 1;
864
151
        ++it;
865
712k
    } else {
866
712k
        current_pos = range.left;
867
712k
    }
868
869
1.42M
    while (current_pos <= range.right && it != file_blocks.end()) {
870
712k
        block_range = (*it)->range();
871
872
712k
        if (current_pos == block_range.left) {
873
712k
            current_pos = block_range.right + 1;
874
712k
            ++it;
875
712k
            continue;
876
712k
        }
877
878
712k
        DCHECK(current_pos < block_range.left);
879
880
109
        auto hole_size = block_range.left - current_pos;
881
882
109
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
883
109
                                                      FileBlock::State::EMPTY, cache_lock));
884
885
109
        current_pos = block_range.right + 1;
886
109
        ++it;
887
109
    }
888
889
712k
    if (current_pos <= range.right) {
890
        ///   ________]     -- requested range
891
        ///   _____]
892
        ///        ^
893
        /// blockN
894
895
86
        auto hole_size = range.right - current_pos + 1;
896
897
86
        file_blocks.splice(file_blocks.end(),
898
86
                           split_range_into_cells(hash, context, current_pos, hole_size,
899
86
                                                  FileBlock::State::EMPTY, cache_lock));
900
86
    }
901
712k
}
902
903
/// Returns a holder with blocks covering [offset, offset + size - 1] of file `hash`.
///
/// Under the cache mutex, the existing blocks intersecting the range are fetched with
/// get_impl(). If there are none, the whole range is split into new EMPTY cells;
/// otherwise only the holes between the existing blocks are filled. Read and hit
/// metrics are then updated per block.
///
/// When `config::enable_file_cache_async_touch_on_get_or_set` is set, DOWNLOADED blocks
/// eligible per need_to_move() are collected and passed to add_need_update_lru_block()
/// after the mutex has been released.
///
/// @param context `context.stats` is dereferenced and must not be null.
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
                                            CacheContext& context) {
    FileBlock::Range range(offset, offset + size - 1);

    ReadStatistics* stats = context.stats;
    DCHECK(stats != nullptr);
    MonotonicStopWatch lock_wait_watch;
    lock_wait_watch.start();
    FileBlocks file_blocks;
    std::vector<FileBlockSPtr> deferred_touches;
    const bool defer_lru_touch = config::enable_file_cache_async_touch_on_get_or_set;
    int64_t locked_duration = 0;
    {
        auto& waiters = ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock;
        waiters->increment();
        std::lock_guard cache_lock(_mutex);
        waiters->decrement();
        stats->lock_wait_timer += lock_wait_watch.elapsed_time();
        SCOPED_RAW_TIMER(&locked_duration);
        /// Get all blocks which intersect with the given range.
        {
            SCOPED_RAW_TIMER(&stats->get_timer);
            file_blocks = get_impl(hash, context, range, cache_lock);
        }

        if (!file_blocks.empty()) {
            SCOPED_RAW_TIMER(&stats->set_timer);
            fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
        } else {
            SCOPED_RAW_TIMER(&stats->set_timer);
            file_blocks = split_range_into_cells(hash, context, offset, size,
                                                 FileBlock::State::EMPTY, cache_lock);
        }
        DCHECK(!file_blocks.empty());

        *_num_read_blocks << file_blocks.size();
        if (!context.is_warmup) {
            *_no_warmup_num_read_blocks << file_blocks.size();
        }
        if (defer_lru_touch) {
            deferred_touches.reserve(file_blocks.size());
        }
        for (auto& block : file_blocks) {
            const size_t block_size = block->range().size();
            *_total_read_size_metrics << block_size;
            if (block->state_unsafe() != FileBlock::State::DOWNLOADED) {
                continue;
            }
            // Cache hit.
            *_num_hit_blocks << 1;
            *_total_hit_size_metrics << block_size;
            if (!context.is_warmup) {
                *_no_warmup_num_hit_blocks << 1;
            }
            if (defer_lru_touch && need_to_move(block->cache_type(), context.cache_type)) {
                deferred_touches.emplace_back(block);
            }
        }
    }

    // The cache mutex is no longer held here.
    if (defer_lru_touch) {
        for (auto& block : deferred_touches) {
            add_need_update_lru_block(std::move(block));
        }
    }

    *_get_or_set_latency_us << (locked_duration / 1000);
    return FileBlocksHolder(std::move(file_blocks));
}
969
970
/// Creates a file block cell and stores it in `_files` under [hash][offset].
///
/// The new cell is appended to the LRU queue matching its cache type, an ADD event
/// is recorded, and `_cur_cache_size` (plus `_cur_ttl_size` for TTL blocks) grows by
/// the cell's size. The block's cache type is adjusted first when it disagrees with
/// the expiration time: TTL with expiration 0 becomes NORMAL, and a non-TTL type
/// with a non-zero expiration becomes TTL. A failed adjustment is only logged.
///
/// @return the new cell; the already existing cell when [hash][offset] is occupied;
///         nullptr when `size` is 0 or larger than 1024 * 1024 * 1024 bytes.
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
                                        size_t offset, size_t size, FileBlock::State state,
                                        std::lock_guard<std::mutex>& cache_lock) {
    if (size == 0) {
        return nullptr; /// Empty files are not cached.
    }

    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
               << " expiration_time=" << context.expiration_time
               << " tablet_id=" << context.tablet_id;

    constexpr size_t kMaxBlockBytes = 1024 * 1024 * 1024;
    if (size > kMaxBlockBytes) {
        LOG(WARNING) << "File block size is too large for a block, reject. size=" << size
                     << " hash=" << hash.to_string() << " offset=" << offset
                     << " stack:" << get_stack_trace();
        return nullptr;
    }

    auto& cells_by_offset = _files[hash];
    if (auto existing = cells_by_offset.find(offset); existing != cells_by_offset.end()) {
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
                   << ", offset: " << offset << ", size: " << size
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
        return &(existing->second);
    }

    FileCacheKey key;
    key.hash = hash;
    key.offset = offset;
    key.meta.type = context.cache_type;
    key.meta.expiration_time = context.expiration_time;
    key.meta.tablet_id = context.tablet_id;
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);

    // Reconcile the cache type with the expiration time.
    const bool is_ttl_type = context.cache_type == FileCacheType::TTL;
    const bool has_expiration = context.expiration_time != 0;
    Status st;
    if (is_ttl_type && !has_expiration) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
    } else if (!is_ttl_type && has_expiration) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
    }
    if (!st.ok()) {
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
                     << " cache_type=" << cache_type_to_string(context.cache_type)
                     << " error=" << st.msg();
    }

    auto& lru = get_queue(cell.file_block->cache_type());
    cell.queue_iterator = lru.add(hash, offset, size, cache_lock);
    _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD,
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
                                      cell.size());

    if (cell.file_block->cache_type() == FileCacheType::TTL) {
        _cur_ttl_size += cell.size();
    }
    auto inserted = cells_by_offset.insert(std::make_pair(offset, std::move(cell)));
    _cur_cache_size += size;
    return &(inserted.first->second);
}
1031
1032
0
size_t BlockFileCache::try_release() {
1033
0
    SCOPED_CACHE_LOCK(_mutex, this);
1034
0
    std::vector<FileBlockCell*> trash;
1035
0
    for (auto& [hash, blocks] : _files) {
1036
0
        for (auto& [offset, cell] : blocks) {
1037
0
            if (cell.releasable()) {
1038
0
                trash.emplace_back(&cell);
1039
0
            } else {
1040
0
                cell.file_block->set_deleting();
1041
0
            }
1042
0
        }
1043
0
    }
1044
0
    size_t remove_size = 0;
1045
0
    for (auto& cell : trash) {
1046
0
        FileBlockSPtr file_block = cell->file_block;
1047
0
        std::lock_guard lc(cell->file_block->_mutex);
1048
0
        remove_size += file_block->range().size();
1049
0
        remove(file_block, cache_lock, lc);
1050
0
        VLOG_DEBUG << "try_release " << _cache_base_path
1051
0
                   << " hash=" << file_block->get_hash_value().to_string()
1052
0
                   << " offset=" << file_block->offset();
1053
0
    }
1054
0
    *_evict_by_try_release << remove_size;
1055
0
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
1056
0
    return trash.size();
1057
0
}
1058
1059
764k
/// Maps a cache type to its LRU queue.
/// An unrecognized type trips DCHECK(false) and falls back to the NORMAL queue.
LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
    if (type == FileCacheType::INDEX) {
        return _index_queue;
    }
    if (type == FileCacheType::DISPOSABLE) {
        return _disposable_queue;
    }
    if (type == FileCacheType::NORMAL) {
        return _normal_queue;
    }
    if (type == FileCacheType::TTL) {
        return _ttl_queue;
    }
    DCHECK(false);
    return _normal_queue;
}
1074
1075
140
/// Const overload: maps a cache type to its LRU queue.
/// An unrecognized type trips DCHECK(false) and falls back to the NORMAL queue.
const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
    if (type == FileCacheType::INDEX) {
        return _index_queue;
    }
    if (type == FileCacheType::DISPOSABLE) {
        return _disposable_queue;
    }
    if (type == FileCacheType::NORMAL) {
        return _normal_queue;
    }
    if (type == FileCacheType::TTL) {
        return _ttl_queue;
    }
    DCHECK(false);
    return _normal_queue;
}
1090
1091
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
1092
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
1093
14.7k
                                        std::string& reason) {
1094
14.7k
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1095
1.36k
        FileBlockSPtr file_block = cell->file_block;
1096
1.36k
        if (file_block) {
1097
1.36k
            std::lock_guard block_lock(file_block->_mutex);
1098
1.36k
            remove(file_block, cache_lock, block_lock, sync);
1099
1.36k
            VLOG_DEBUG << "remove_file_blocks"
1100
0
                       << " hash=" << file_block->get_hash_value().to_string()
1101
0
                       << " offset=" << file_block->offset() << " reason=" << reason;
1102
1.36k
        }
1103
1.36k
    };
1104
14.7k
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1105
14.7k
}
1106
1107
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
1108
                                           size_t& removed_size,
1109
                                           std::vector<FileBlockCell*>& to_evict,
1110
                                           std::lock_guard<std::mutex>& cache_lock,
1111
1.33k
                                           size_t& cur_removed_size, bool evict_in_advance) {
1112
3.09k
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1113
3.09k
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1114
1.27k
            break;
1115
1.27k
        }
1116
1.81k
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1117
1118
1.81k
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
1119
0
                     << ", offset: " << entry_offset;
1120
1121
1.81k
        size_t cell_size = cell->size();
1122
1.81k
        DCHECK(entry_size == cell_size);
1123
1124
1.81k
        if (cell->releasable()) {
1125
1.34k
            auto& file_block = cell->file_block;
1126
1127
1.34k
            std::lock_guard block_lock(file_block->_mutex);
1128
1.34k
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1129
1.34k
            to_evict.push_back(cell);
1130
1.34k
            removed_size += cell_size;
1131
1.34k
            cur_removed_size += cell_size;
1132
1.34k
        }
1133
1.81k
    }
1134
1.33k
}
1135
1136
// 1. if async load file cache not finish
1137
//     a. evict from lru queue
1138
// 2. if ttl cache
1139
//     a. evict from disposable/normal/index queue one by one
1140
// 3. if dont reach query limit or dont have query limit
1141
//     a. evict from other queue
1142
//     b. evict from current queue
1143
//         a.1 if the data belong write, then just evict cold data
1144
// 4. if reach query limit
1145
//     a. evict from query queue
1146
//     b. evict from other queue
1147
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
1148
                                 size_t offset, size_t size,
1149
13.4k
                                 std::lock_guard<std::mutex>& cache_lock) {
1150
13.4k
    if (!_async_open_done) {
1151
4
        return try_reserve_during_async_load(size, cache_lock);
1152
4
    }
1153
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
1154
    // directly eliminate 5 times the size of the space
1155
13.4k
    if (_disk_resource_limit_mode) {
1156
1
        size = 5 * size;
1157
1
    }
1158
1159
13.4k
    auto query_context = config::enable_file_cache_query_limit &&
1160
13.4k
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
1161
13.4k
                                 ? get_query_context(context.query_id, cache_lock)
1162
13.4k
                                 : nullptr;
1163
13.4k
    if (!query_context) {
1164
13.4k
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
1165
13.4k
    } else if (query_context->get_cache_size(cache_lock) + size <=
1166
31
               query_context->get_max_cache_size()) {
1167
16
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
1168
16
    }
1169
15
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1170
15
                               std::chrono::steady_clock::now().time_since_epoch())
1171
15
                               .count();
1172
15
    auto& queue = get_queue(context.cache_type);
1173
15
    size_t removed_size = 0;
1174
15
    size_t ghost_remove_size = 0;
1175
15
    size_t queue_size = queue.get_capacity(cache_lock);
1176
15
    size_t cur_cache_size = _cur_cache_size;
1177
15
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);
1178
1179
15
    std::vector<LRUQueue::Iterator> ghost;
1180
15
    std::vector<FileBlockCell*> to_evict;
1181
1182
15
    size_t max_size = queue.get_max_size();
1183
48
    auto is_overflow = [&] {
1184
48
        return _disk_resource_limit_mode ? removed_size < size
1185
48
                                         : cur_cache_size + size - removed_size > _capacity ||
1186
48
                                                   (queue_size + size - removed_size > max_size) ||
1187
48
                                                   (query_context_cache_size + size -
1188
38
                                                            (removed_size + ghost_remove_size) >
1189
38
                                                    query_context->get_max_cache_size());
1190
48
    };
1191
1192
    /// Select the cache from the LRU queue held by query for expulsion.
1193
35
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
1194
33
        if (!is_overflow()) {
1195
13
            break;
1196
13
        }
1197
1198
20
        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);
1199
1200
20
        if (!cell) {
1201
            /// The cache corresponding to this record may be swapped out by
1202
            /// other queries, so it has become invalid.
1203
4
            ghost.push_back(iter);
1204
4
            ghost_remove_size += iter->size;
1205
16
        } else {
1206
16
            size_t cell_size = cell->size();
1207
16
            DCHECK(iter->size == cell_size);
1208
1209
16
            if (cell->releasable()) {
1210
16
                auto& file_block = cell->file_block;
1211
1212
16
                std::lock_guard block_lock(file_block->_mutex);
1213
16
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1214
16
                to_evict.push_back(cell);
1215
16
                removed_size += cell_size;
1216
16
            }
1217
16
        }
1218
20
    }
1219
1220
16
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1221
16
        FileBlockSPtr file_block = cell->file_block;
1222
16
        if (file_block) {
1223
16
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
1224
16
            std::lock_guard block_lock(file_block->_mutex);
1225
16
            remove(file_block, cache_lock, block_lock);
1226
16
        }
1227
16
    };
1228
1229
15
    for (auto& iter : ghost) {
1230
4
        query_context->remove(iter->hash, iter->offset, cache_lock);
1231
4
    }
1232
1233
15
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1234
1235
15
    if (is_overflow() &&
1236
15
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
1237
1
        return false;
1238
1
    }
1239
14
    query_context->reserve(hash, offset, size, cache_lock);
1240
14
    return true;
1241
15
}
1242
1243
2
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
1244
2
    UInt128Wrapper hash = UInt128Wrapper();
1245
2
    size_t offset = 0;
1246
2
    CacheContext context;
1247
    /* we pick NORMAL and TTL cache to evict in advance
1248
     * we reserve for them but won't acutually give space to them
1249
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
1250
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
1251
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
1252
     */
1253
2
    context.cache_type = FileCacheType::NORMAL;
1254
2
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1255
2
    context.cache_type = FileCacheType::TTL;
1256
2
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1257
2
}
1258
1259
// remove specific cache synchronously, for critical operations
1260
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1261
8
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1262
8
    std::string reason = "remove_if_cached";
1263
8
    SCOPED_CACHE_LOCK(_mutex, this);
1264
8
    auto iter = _files.find(file_key);
1265
8
    std::vector<FileBlockCell*> to_remove;
1266
8
    if (iter != _files.end()) {
1267
14
        for (auto& [_, cell] : iter->second) {
1268
14
            if (cell.releasable()) {
1269
13
                to_remove.push_back(&cell);
1270
13
            } else {
1271
1
                cell.file_block->set_deleting();
1272
1
            }
1273
14
        }
1274
6
    }
1275
8
    remove_file_blocks(to_remove, cache_lock, true, reason);
1276
8
}
1277
1278
// the async version of remove_if_cached, for background operations
1279
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
1280
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1281
1
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1282
1
    std::string reason = "remove_if_cached_async";
1283
1
    SCOPED_CACHE_LOCK(_mutex, this);
1284
1285
1
    auto iter = _files.find(file_key);
1286
1
    std::vector<FileBlockCell*> to_remove;
1287
1
    if (iter != _files.end()) {
1288
1
        for (auto& [_, cell] : iter->second) {
1289
1
            *_gc_evict_bytes_metrics << cell.size();
1290
1
            *_gc_evict_count_metrics << 1;
1291
1
            if (cell.releasable()) {
1292
0
                to_remove.push_back(&cell);
1293
1
            } else {
1294
1
                cell.file_block->set_deleting();
1295
1
            }
1296
1
        }
1297
1
    }
1298
1
    remove_file_blocks(to_remove, cache_lock, false, reason);
1299
1
}
1300
1301
// Returns the cache types, in order, that may be evicted on behalf of
// `cur_cache_type`. TTL never appears in the result.
// An unrecognized type yields an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
        FileCacheType cur_cache_type) {
    if (cur_cache_type == FileCacheType::TTL) {
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
    }
    if (cur_cache_type == FileCacheType::INDEX) {
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
    }
    if (cur_cache_type == FileCacheType::NORMAL) {
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
    }
    if (cur_cache_type == FileCacheType::DISPOSABLE) {
        return {FileCacheType::NORMAL, FileCacheType::INDEX};
    }
    return {};
}
1317
1318
1.24k
// Returns every cache type other than `cur_cache_type`, in eviction order,
// except that TTL itself maps to {DISPOSABLE, NORMAL, INDEX}.
// An unrecognized type yields an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
    if (cur_cache_type == FileCacheType::TTL) {
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
    }
    if (cur_cache_type == FileCacheType::INDEX) {
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL};
    }
    if (cur_cache_type == FileCacheType::NORMAL) {
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL};
    }
    if (cur_cache_type == FileCacheType::DISPOSABLE) {
        return {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL};
    }
    return {};
}
1333
1334
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
1335
5
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
1336
5
    DCHECK(_files.find(hash) != _files.end() &&
1337
5
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
1338
5
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
1339
5
    DCHECK(cell != nullptr);
1340
5
    if (cell == nullptr) {
1341
0
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
1342
0
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
1343
0
                     << " new_size=" << new_size;
1344
0
        return;
1345
0
    }
1346
5
    DCHECK_EQ(cell->file_block->_block_range.size(), old_size);
1347
5
    if (cell->queue_iterator) {
1348
5
        auto& queue = get_queue(cell->file_block->cache_type());
1349
5
        DCHECK(queue.contains(hash, offset, cache_lock));
1350
5
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
1351
5
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
1352
5
                                          cell->file_block->get_hash_value(),
1353
5
                                          cell->file_block->offset(), new_size);
1354
5
    }
1355
5
    cell->file_block->_block_range.right = cell->file_block->_block_range.left + new_size - 1;
1356
5
    _cur_cache_size -= old_size;
1357
5
    _cur_cache_size += new_size;
1358
5
    if (cell->file_block->cache_type() == FileCacheType::TTL) {
1359
0
        _cur_ttl_size -= old_size;
1360
0
        _cur_ttl_size += new_size;
1361
0
    }
1362
5
}
1363
1364
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
1365
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1366
13.4k
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1367
13.4k
    size_t removed_size = 0;
1368
13.4k
    size_t cur_cache_size = _cur_cache_size;
1369
13.4k
    std::vector<FileBlockCell*> to_evict;
1370
30.8k
    for (FileCacheType cache_type : other_cache_types) {
1371
30.8k
        auto& queue = get_queue(cache_type);
1372
30.8k
        size_t remove_size_per_type = 0;
1373
30.8k
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1374
1.43k
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1375
697
                break;
1376
697
            }
1377
739
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1378
739
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
1379
0
                         << ", offset: " << entry_offset;
1380
1381
739
            size_t cell_size = cell->size();
1382
739
            DCHECK(entry_size == cell_size);
1383
1384
739
            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
1385
737
                break;
1386
737
            }
1387
1388
2
            if (cell->releasable()) {
1389
2
                auto& file_block = cell->file_block;
1390
2
                std::lock_guard block_lock(file_block->_mutex);
1391
2
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1392
2
                to_evict.push_back(cell);
1393
2
                removed_size += cell_size;
1394
2
                remove_size_per_type += cell_size;
1395
2
            }
1396
2
        }
1397
30.8k
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
1398
30.8k
    }
1399
13.4k
    bool is_sync_removal = !evict_in_advance;
1400
13.4k
    std::string reason = std::string("try_reserve_by_time ") +
1401
13.4k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1402
13.4k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1403
1404
13.4k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1405
13.4k
}
1406
1407
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
1408
19.2k
                                 bool evict_in_advance) const {
1409
19.2k
    bool ret = false;
1410
19.2k
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
1411
23
        ret = (removed_size < need_size);
1412
23
        return ret;
1413
23
    }
1414
19.2k
    if (_disk_resource_limit_mode) {
1415
4
        ret = (removed_size < need_size);
1416
19.2k
    } else {
1417
19.2k
        ret = (cur_cache_size + need_size - removed_size > _capacity);
1418
19.2k
    }
1419
19.2k
    return ret;
1420
19.2k
}
1421
1422
bool BlockFileCache::try_reserve_from_other_queue_by_size(
1423
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1424
415
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1425
415
    size_t removed_size = 0;
1426
415
    size_t cur_cache_size = _cur_cache_size;
1427
415
    std::vector<FileBlockCell*> to_evict;
1428
    // we follow the privilege defined in get_other_cache_types to evict
1429
1.24k
    for (FileCacheType cache_type : other_cache_types) {
1430
1.24k
        auto& queue = get_queue(cache_type);
1431
1432
        // we will not drain each of them to the bottom -- i.e., we only
1433
        // evict what they have stolen.
1434
1.24k
        size_t cur_queue_size = queue.get_capacity(cache_lock);
1435
1.24k
        size_t cur_queue_max_size = queue.get_max_size();
1436
1.24k
        if (cur_queue_size <= cur_queue_max_size) {
1437
740
            continue;
1438
740
        }
1439
505
        size_t cur_removed_size = 0;
1440
505
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1441
505
                              cur_removed_size, evict_in_advance);
1442
505
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
1443
505
    }
1444
415
    bool is_sync_removal = !evict_in_advance;
1445
415
    std::string reason = std::string("try_reserve_by_size") +
1446
415
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1447
415
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1448
415
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1449
415
}
1450
1451
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
1452
                                                  int64_t cur_time,
1453
                                                  std::lock_guard<std::mutex>& cache_lock,
1454
13.4k
                                                  bool evict_in_advance) {
1455
    // currently, TTL cache is not considered as a candidate
1456
13.4k
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
1457
13.4k
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
1458
13.4k
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
1459
13.4k
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
1460
12.2k
        return reserve_success;
1461
12.2k
    }
1462
1463
1.24k
    other_cache_types = get_other_cache_type(cur_cache_type);
1464
1.24k
    auto& cur_queue = get_queue(cur_cache_type);
1465
1.24k
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
1466
1.24k
    size_t cur_queue_max_size = cur_queue.get_max_size();
1467
    // Hit the soft limit by self, cannot remove from other queues
1468
1.24k
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
1469
829
        return false;
1470
829
    }
1471
415
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
1472
415
                                                evict_in_advance);
1473
1.24k
}
1474
1475
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
1476
                                         QueryFileCacheContextPtr query_context,
1477
                                         const CacheContext& context, size_t offset, size_t size,
1478
                                         std::lock_guard<std::mutex>& cache_lock,
1479
13.4k
                                         bool evict_in_advance) {
1480
13.4k
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1481
13.4k
                               std::chrono::steady_clock::now().time_since_epoch())
1482
13.4k
                               .count();
1483
13.4k
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
1484
13.4k
                                      evict_in_advance)) {
1485
831
        auto& queue = get_queue(context.cache_type);
1486
831
        size_t removed_size = 0;
1487
831
        size_t cur_cache_size = _cur_cache_size;
1488
1489
831
        std::vector<FileBlockCell*> to_evict;
1490
831
        size_t cur_removed_size = 0;
1491
831
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1492
831
                              cur_removed_size, evict_in_advance);
1493
831
        bool is_sync_removal = !evict_in_advance;
1494
831
        std::string reason = std::string("try_reserve for cache type ") +
1495
831
                             cache_type_to_string(context.cache_type) +
1496
831
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1497
831
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1498
831
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;
1499
1500
831
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1501
61
            return false;
1502
61
        }
1503
831
    }
1504
1505
13.4k
    if (query_context) {
1506
16
        query_context->reserve(hash, offset, size, cache_lock);
1507
16
    }
1508
13.4k
    return true;
1509
13.4k
}
1510
1511
template <class T, class U>
1512
    requires IsXLock<T> && IsXLock<U>
1513
3.42k
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
1514
3.42k
    auto hash = file_block->get_hash_value();
1515
3.42k
    auto offset = file_block->offset();
1516
3.42k
    auto type = file_block->cache_type();
1517
3.42k
    auto expiration_time = file_block->expiration_time();
1518
3.42k
    auto tablet_id = file_block->tablet_id();
1519
3.42k
    auto* cell = get_cell(hash, offset, cache_lock);
1520
3.42k
    file_block->cell = nullptr;
1521
    // Holder cleanup can race with prior cache metadata cleanup. In that case,
1522
    // skip the duplicate remove instead of touching a detached or replaced cell.
1523
3.42k
    if (cell == nullptr) {
1524
1
        LOG(WARNING) << "remove skipped because cache cell is missing. hash=" << hash.to_string()
1525
1
                     << " offset=" << offset << " size=" << file_block->range().size()
1526
1
                     << " type=" << cache_type_to_string(type)
1527
1
                     << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
1528
1
                     << " expiration_time=" << expiration_time << " sync=" << sync;
1529
1
        return;
1530
1
    }
1531
3.42k
    if (cell->file_block.get() != file_block.get()) {
1532
1
        auto* cell_file_block = cell->file_block.get();
1533
1
        LOG(WARNING)
1534
1
                << "remove skipped because cache cell points to a different file block. hash="
1535
1
                << hash.to_string() << " offset=" << offset
1536
1
                << " size=" << file_block->range().size() << " type=" << cache_type_to_string(type)
1537
1
                << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
1538
1
                << " expiration_time=" << expiration_time << " sync=" << sync << " cell_block_hash="
1539
1
                << (cell_file_block ? cell_file_block->get_hash_value().to_string() : "<null>")
1540
1
                << " cell_block_offset="
1541
1
                << (cell_file_block ? std::to_string(cell_file_block->offset()) : "<null>")
1542
1
                << " cell_block_size="
1543
1
                << (cell_file_block ? std::to_string(cell_file_block->range().size()) : "<null>")
1544
1
                << " cell_block_type="
1545
1
                << (cell_file_block ? cache_type_to_string(cell_file_block->cache_type())
1546
1
                                    : "<null>")
1547
1
                << " cell_block_state="
1548
1
                << (cell_file_block ? FileBlock::state_to_string(cell_file_block->state_unsafe())
1549
1
                                    : "<null>");
1550
1
        return;
1551
1
    }
1552
3.42k
    DCHECK(cell->queue_iterator);
1553
3.41k
    if (cell->queue_iterator) {
1554
3.41k
        auto& queue = get_queue(file_block->cache_type());
1555
3.41k
        queue.remove(*cell->queue_iterator, cache_lock);
1556
3.41k
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
1557
3.41k
                                          cell->file_block->get_hash_value(),
1558
3.41k
                                          cell->file_block->offset(), cell->size());
1559
3.41k
    }
1560
3.41k
    *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
1561
3.41k
            << file_block->range().size();
1562
3.41k
    *_total_evict_size_metrics << file_block->range().size();
1563
1564
3.41k
    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
1565
0
               << ", offset: " << offset << ", size: " << file_block->range().size()
1566
0
               << ", type: " << cache_type_to_string(type);
1567
1568
3.41k
    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
1569
1.42k
        FileCacheKey key;
1570
1.42k
        key.hash = hash;
1571
1.42k
        key.offset = offset;
1572
1.42k
        key.meta.type = type;
1573
1.42k
        key.meta.expiration_time = expiration_time;
1574
1.42k
        key.meta.tablet_id = tablet_id;
1575
1.42k
        if (sync) {
1576
1.38k
            int64_t duration_ns = 0;
1577
1.38k
            Status st;
1578
1.38k
            {
1579
1.38k
                SCOPED_RAW_TIMER(&duration_ns);
1580
1.38k
                st = _storage->remove(key);
1581
1.38k
            }
1582
1.38k
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
1583
1.38k
            if (!st.ok()) {
1584
0
                LOG_WARNING("").error(st);
1585
0
            }
1586
1.38k
        } else {
1587
            // the file will be deleted in the bottom half
1588
            // so there will be a window that the file is not in the cache but still in the storage
1589
            // but it's ok, because the rowset is stale already
1590
47
            bool ret = _recycle_keys.enqueue(key);
1591
47
            if (ret) [[likely]] {
1592
47
                _recycle_keys_length_metrics->set_value(_recycle_keys.size_approx());
1593
47
            } else {
1594
0
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
1595
0
                int64_t duration_ns = 0;
1596
0
                Status st;
1597
0
                {
1598
0
                    SCOPED_RAW_TIMER(&duration_ns);
1599
0
                    st = _storage->remove(key);
1600
0
                }
1601
0
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
1602
0
                if (!st.ok()) {
1603
0
                    LOG_WARNING("").error(st);
1604
0
                }
1605
0
            }
1606
47
        }
1607
1.99k
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
1608
0
        file_block->set_deleting();
1609
0
        return;
1610
0
    }
1611
3.41k
    _cur_cache_size -= file_block->range().size();
1612
3.41k
    if (FileCacheType::TTL == type) {
1613
1.29k
        _cur_ttl_size -= file_block->range().size();
1614
1.29k
    }
1615
3.41k
    auto it = _files.find(hash);
1616
3.41k
    if (it != _files.end()) {
1617
3.41k
        it->second.erase(file_block->offset());
1618
3.41k
        if (it->second.empty()) {
1619
1.34k
            _files.erase(hash);
1620
1.34k
        }
1621
3.41k
    }
1622
3.41k
    *_num_removed_blocks << 1;
1623
3.41k
}
1624
1625
46
// Locking wrapper: takes the cache lock and returns the used size of the queue
// for `cache_type`.
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t used = get_used_cache_size_unlocked(cache_type, cache_lock);
    return used;
}
1629
1630
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
1631
46
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1632
46
    return get_queue(cache_type).get_capacity(cache_lock);
1633
46
}
1634
1635
0
// Locking wrapper: takes the cache lock and returns the available size of the
// queue for `cache_type`.
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t available = get_available_cache_size_unlocked(cache_type, cache_lock);
    return available;
}
1639
1640
size_t BlockFileCache::get_available_cache_size_unlocked(
1641
0
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
1642
0
    return get_queue(cache_type).get_max_element_size() -
1643
0
           get_used_cache_size_unlocked(cache_type, cache_lock);
1644
0
}
1645
1646
91
// Locking wrapper: takes the cache lock and returns the number of elements in
// the queue for `cache_type`.
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t block_count = get_file_blocks_num_unlocked(cache_type, cache_lock);
    return block_count;
}
1650
1651
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
1652
91
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1653
91
    return get_queue(cache_type).get_elements_num(cache_lock);
1654
91
}
1655
1656
// Creates the cell for `file_block` and points the block's `cell` back-pointer
// at it. TTL blocks get their access time updated immediately.
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
        : file_block(file_block) {
    file_block->cell = this;
    /**
     * Cell can be created with either DOWNLOADED, EMPTY or SKIP_CACHE file block's state.
     * File block acquires DOWNLOADING state and creates LRUQueue iterator on first
     * successful getOrSetDownloader call.
     */
    const FileBlock::State initial_state = file_block->_download_state;
    DCHECK(initial_state == FileBlock::State::DOWNLOADED ||
           initial_state == FileBlock::State::EMPTY ||
           initial_state == FileBlock::State::SKIP_CACHE)
            << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
            << FileBlock::state_to_string(initial_state);

    if (file_block->cache_type() == FileCacheType::TTL) {
        update_atime();
    }
}
1679
1680
// Appends a new entry for (hash, offset) with the given size at the end of the
// queue, indexes it in `map`, and adds `size` to `cache_size`.
// Returns the iterator of the new entry.
LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
    cache_size += size;
    auto new_entry = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size));
    map.emplace(std::make_pair(hash, offset), new_entry);
    return new_entry;
}
1687
1688
0
void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
1689
0
    queue.clear();
1690
0
    map.clear();
1691
0
    cache_size = 0;
1692
0
}
1693
1694
524k
// Moves the entry at `queue_it` to the end of the queue without copying it.
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
    const auto tail = queue.end();
    queue.splice(tail, queue, queue_it);
}
1697
1698
void LRUQueue::resize(Iterator queue_it, size_t new_size,
1699
6
                      std::lock_guard<std::mutex>& /* cache_lock */) {
1700
6
    cache_size -= queue_it->size;
1701
6
    queue_it->size = new_size;
1702
6
    cache_size += new_size;
1703
6
}
1704
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
1705
5
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
1706
5
    return map.find(std::make_pair(hash, offset)) != map.end();
1707
5
}
1708
1709
// Looks up the queue iterator registered for (hash, offset).
// When no such entry exists a value-initialized list iterator is returned; it does not
// refer to any element and must not be dereferenced.
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
    const auto found = map.find(std::make_pair(hash, offset));
    if (found == map.end()) {
        return std::list<FileKeyAndOffset>::iterator();
    }
    return found->second;
}
1717
1718
2
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
1719
2
    std::string result;
1720
18
    for (const auto& [hash, offset, size] : queue) {
1721
18
        if (!result.empty()) {
1722
16
            result += ", ";
1723
16
        }
1724
18
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
1725
18
    }
1726
2
    return result;
1727
2
}
1728
1729
size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
1730
5
                                           std::lock_guard<std::mutex>& cache_lock) {
1731
5
    std::list<FileKeyAndOffset> target_queue = this->queue;
1732
5
    std::list<FileKeyAndOffset> base_queue = base.queue;
1733
5
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
1734
5
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
1735
1736
5
    size_t m = vec1.size();
1737
5
    size_t n = vec2.size();
1738
1739
    // Create a 2D vector (matrix) to store the Levenshtein distances
1740
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
1741
5
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
1742
1743
    // Initialize the first row and column of the matrix
1744
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
1745
22
    for (size_t i = 0; i <= m; ++i) {
1746
17
        dp[i][0] = i;
1747
17
    }
1748
20
    for (size_t j = 0; j <= n; ++j) {
1749
15
        dp[0][j] = j;
1750
15
    }
1751
1752
    // Fill the matrix using dynamic programming
1753
17
    for (size_t i = 1; i <= m; ++i) {
1754
38
        for (size_t j = 1; j <= n; ++j) {
1755
            // Check if the current elements of both vectors are equal
1756
26
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
1757
26
                           vec1[i - 1].offset == vec2[j - 1].offset)
1758
26
                                  ? 0
1759
26
                                  : 1;
1760
            // Calculate the minimum cost of three possible operations:
1761
            // 1. Insertion: dp[i][j-1] + 1
1762
            // 2. Deletion: dp[i-1][j] + 1
1763
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
1764
26
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
1765
26
        }
1766
12
    }
1767
    // The bottom-right cell of the matrix contains the Levenshtein distance
1768
5
    return dp[m][n];
1769
5
}
1770
1771
1
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
1772
1
    SCOPED_CACHE_LOCK(_mutex, this);
1773
1
    return dump_structure_unlocked(hash, cache_lock);
1774
1
}
1775
1776
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
1777
1
                                                    std::lock_guard<std::mutex>&) {
1778
1
    std::stringstream result;
1779
1
    auto it = _files.find(hash);
1780
1
    if (it == _files.end()) {
1781
0
        return std::string("");
1782
0
    }
1783
1
    const auto& cells_by_offset = it->second;
1784
1785
1
    for (const auto& [_, cell] : cells_by_offset) {
1786
1
        result << cell.file_block->get_info_for_log() << " "
1787
1
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
1788
1
    }
1789
1790
1
    return result.str();
1791
1
}
1792
1793
0
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
1794
0
    SCOPED_CACHE_LOCK(_mutex, this);
1795
0
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
1796
0
}
1797
1798
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
1799
                                                            size_t offset,
1800
0
                                                            std::lock_guard<std::mutex>&) {
1801
0
    std::stringstream result;
1802
0
    auto it = _files.find(hash);
1803
0
    if (it == _files.end()) {
1804
0
        return std::string("");
1805
0
    }
1806
0
    const auto& cells_by_offset = it->second;
1807
0
    const auto& cell = cells_by_offset.find(offset);
1808
1809
0
    return cache_type_to_string(cell->second.file_block->cache_type());
1810
0
}
1811
1812
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
1813
                                       FileCacheType new_type,
1814
9
                                       std::lock_guard<std::mutex>& cache_lock) {
1815
9
    if (auto iter = _files.find(hash); iter != _files.end()) {
1816
9
        auto& file_blocks = iter->second;
1817
9
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
1818
8
            FileBlockCell& cell = cell_it->second;
1819
8
            auto& cur_queue = get_queue(cell.file_block->cache_type());
1820
8
            DCHECK(cell.queue_iterator.has_value());
1821
8
            cur_queue.remove(*cell.queue_iterator, cache_lock);
1822
8
            _lru_recorder->record_queue_event(
1823
8
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
1824
8
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
1825
8
            auto& new_queue = get_queue(new_type);
1826
8
            cell.queue_iterator =
1827
8
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
1828
8
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
1829
8
                                              cell.file_block->get_hash_value(),
1830
8
                                              cell.file_block->offset(), cell.size());
1831
8
        }
1832
9
    }
1833
9
}
1834
1835
// @brief: get a path's disk capacity used percent, inode used percent
1836
// @param: path
1837
// @param: percent.first disk used percent, percent.second inode used percent
1838
161
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
1839
161
    struct statfs stat;
1840
161
    int ret = statfs(path.c_str(), &stat);
1841
161
    if (ret != 0) {
1842
2
        return ret;
1843
2
    }
1844
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
1845
    // v->used = stat.f_blocks - stat.f_bfree
1846
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
1847
159
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
1848
159
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
1849
159
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));
1850
1851
159
    unsigned long long inode_free = stat.f_ffree;
1852
159
    unsigned long long inode_total = stat.f_files;
1853
159
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
1854
159
    percent->first = capacity_percentage;
1855
159
    percent->second = 100 - inode_percentage;
1856
1857
    // Add sync point for testing
1858
159
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
1859
1860
159
    return 0;
1861
161
}
1862
1863
10
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
1864
10
    using namespace std::chrono;
1865
10
    int64_t space_released = 0;
1866
10
    size_t old_capacity = 0;
1867
10
    std::stringstream ss;
1868
10
    ss << "finish reset_capacity, path=" << _cache_base_path;
1869
10
    auto adjust_start_time = steady_clock::time_point();
1870
10
    {
1871
10
        SCOPED_CACHE_LOCK(_mutex, this);
1872
10
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
1873
1
            int64_t need_remove_size = _cur_cache_size - new_capacity;
1874
4
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
1875
4
                int64_t queue_released = 0;
1876
4
                std::vector<FileBlockCell*> to_evict;
1877
13
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1878
13
                    if (need_remove_size <= 0) {
1879
1
                        break;
1880
1
                    }
1881
12
                    need_remove_size -= entry_size;
1882
12
                    space_released += entry_size;
1883
12
                    queue_released += entry_size;
1884
12
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1885
12
                    if (!cell->releasable()) {
1886
0
                        cell->file_block->set_deleting();
1887
0
                        continue;
1888
0
                    }
1889
12
                    to_evict.push_back(cell);
1890
12
                }
1891
12
                for (auto& cell : to_evict) {
1892
12
                    FileBlockSPtr file_block = cell->file_block;
1893
12
                    std::lock_guard block_lock(file_block->_mutex);
1894
12
                    remove(file_block, cache_lock, block_lock);
1895
12
                }
1896
4
                return queue_released;
1897
4
            };
1898
1
            int64_t queue_released = remove_blocks(_disposable_queue);
1899
1
            ss << " disposable_queue released " << queue_released;
1900
1
            queue_released = remove_blocks(_normal_queue);
1901
1
            ss << " normal_queue released " << queue_released;
1902
1
            queue_released = remove_blocks(_index_queue);
1903
1
            ss << " index_queue released " << queue_released;
1904
1
            queue_released = remove_blocks(_ttl_queue);
1905
1
            ss << " ttl_queue released " << queue_released;
1906
1907
1
            _disk_resource_limit_mode = true;
1908
1
            _disk_limit_mode_metrics->set_value(1);
1909
1
            ss << " total_space_released=" << space_released;
1910
1
        }
1911
10
        old_capacity = _capacity;
1912
10
        _capacity = new_capacity;
1913
10
        _cache_capacity_metrics->set_value(_capacity);
1914
10
    }
1915
10
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
1916
10
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
1917
10
              << " use_time=" << cast_set<int64_t>(use_time.count());
1918
10
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
1919
10
    LOG(INFO) << ss.str();
1920
10
    return ss.str();
1921
10
}
1922
1923
943
void BlockFileCache::check_disk_resource_limit() {
1924
943
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1925
796
        return;
1926
796
    }
1927
1928
147
    bool previous_mode = _disk_resource_limit_mode;
1929
147
    if (_capacity > _cur_cache_size) {
1930
145
        _disk_resource_limit_mode = false;
1931
145
        _disk_limit_mode_metrics->set_value(0);
1932
145
    }
1933
147
    std::pair<int, int> percent;
1934
147
    int ret = disk_used_percentage(_cache_base_path, &percent);
1935
147
    if (ret != 0) {
1936
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1937
1
        return;
1938
1
    }
1939
146
    auto [space_percentage, inode_percentage] = percent;
1940
292
    auto is_insufficient = [](const int& percentage) {
1941
292
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
1942
292
    };
1943
146
    DCHECK_GE(space_percentage, 0);
1944
146
    DCHECK_LE(space_percentage, 100);
1945
146
    DCHECK_GE(inode_percentage, 0);
1946
146
    DCHECK_LE(inode_percentage, 100);
1947
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1948
    // FIXME: reject with config validator
1949
146
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
1950
146
        config::file_cache_exit_disk_resource_limit_mode_percent) {
1951
1
        LOG_WARNING("config error, set to default value")
1952
1
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
1953
1
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
1954
1
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
1955
1
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
1956
1
    }
1957
146
    bool is_space_insufficient = is_insufficient(space_percentage);
1958
146
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1959
146
    if (is_space_insufficient || is_inode_insufficient) {
1960
1
        _disk_resource_limit_mode = true;
1961
1
        _disk_limit_mode_metrics->set_value(1);
1962
145
    } else if (_disk_resource_limit_mode &&
1963
145
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
1964
145
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
1965
0
        _disk_resource_limit_mode = false;
1966
0
        _disk_limit_mode_metrics->set_value(0);
1967
0
    }
1968
146
    if (previous_mode != _disk_resource_limit_mode) {
1969
        // add log for disk resource limit mode switching
1970
2
        if (_disk_resource_limit_mode) {
1971
1
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
1972
1
                         << " space_percent=" << space_percentage
1973
1
                         << " inode_percent=" << inode_percentage
1974
1
                         << " is_space_insufficient=" << is_space_insufficient
1975
1
                         << " is_inode_insufficient=" << is_inode_insufficient
1976
1
                         << " enter threshold="
1977
1
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
1978
1
        } else {
1979
1
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
1980
1
                      << " space_percent=" << space_percentage
1981
1
                      << " inode_percent=" << inode_percentage << " exit threshold="
1982
1
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
1983
1
        }
1984
144
    } else if (_disk_resource_limit_mode) {
1985
        // print log for disk resource limit mode running, but less frequently
1986
0
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1987
0
                                 << " space_percent=" << space_percentage
1988
0
                                 << " inode_percent=" << inode_percentage
1989
0
                                 << " is_space_insufficient=" << is_space_insufficient
1990
0
                                 << " is_inode_insufficient=" << is_inode_insufficient
1991
0
                                 << " mode run in resource limit";
1992
0
    }
1993
146
}
1994
1995
15
void BlockFileCache::check_need_evict_cache_in_advance() {
1996
15
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1997
1
        return;
1998
1
    }
1999
2000
14
    std::pair<int, int> percent;
2001
14
    int ret = disk_used_percentage(_cache_base_path, &percent);
2002
14
    if (ret != 0) {
2003
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
2004
1
        return;
2005
1
    }
2006
13
    auto [space_percentage, inode_percentage] = percent;
2007
13
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
2008
39
    auto is_insufficient = [](const int& percentage) {
2009
39
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
2010
39
    };
2011
13
    DCHECK_GE(space_percentage, 0);
2012
13
    DCHECK_LE(space_percentage, 100);
2013
13
    DCHECK_GE(inode_percentage, 0);
2014
13
    DCHECK_LE(inode_percentage, 100);
2015
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
2016
    // FIXME: reject with config validator
2017
13
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
2018
13
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
2019
1
        LOG_WARNING("config error, set to default value")
2020
1
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
2021
1
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
2022
1
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
2023
1
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
2024
1
    }
2025
13
    bool previous_mode = _need_evict_cache_in_advance;
2026
13
    bool is_space_insufficient = is_insufficient(space_percentage);
2027
13
    bool is_inode_insufficient = is_insufficient(inode_percentage);
2028
13
    bool is_size_insufficient = is_insufficient(size_percentage);
2029
13
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
2030
9
        _need_evict_cache_in_advance = true;
2031
9
        _need_evict_cache_in_advance_metrics->set_value(1);
2032
9
    } else if (_need_evict_cache_in_advance &&
2033
4
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
2034
4
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
2035
4
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
2036
1
        _need_evict_cache_in_advance = false;
2037
1
        _need_evict_cache_in_advance_metrics->set_value(0);
2038
1
    }
2039
13
    if (previous_mode != _need_evict_cache_in_advance) {
2040
        // add log for evict cache in advance mode switching
2041
6
        if (_need_evict_cache_in_advance) {
2042
5
            LOG(WARNING) << "Entering evict cache in advance mode: "
2043
5
                         << "file_cache=" << get_base_path()
2044
5
                         << " space_percent=" << space_percentage
2045
5
                         << " inode_percent=" << inode_percentage
2046
5
                         << " size_percent=" << size_percentage
2047
5
                         << " is_space_insufficient=" << is_space_insufficient
2048
5
                         << " is_inode_insufficient=" << is_inode_insufficient
2049
5
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
2050
5
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
2051
5
        } else {
2052
1
            LOG(INFO) << "Exiting evict cache in advance mode: "
2053
1
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
2054
1
                      << " inode_percent=" << inode_percentage
2055
1
                      << " size_percent=" << size_percentage << " exit threshold="
2056
1
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
2057
1
        }
2058
7
    } else if (_need_evict_cache_in_advance) {
2059
        // print log for evict cache in advance mode running, but less frequently
2060
4
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
2061
1
                                 << " space_percent=" << space_percentage
2062
1
                                 << " inode_percent=" << inode_percentage
2063
1
                                 << " size_percent=" << size_percentage
2064
1
                                 << " is_space_insufficient=" << is_space_insufficient
2065
1
                                 << " is_inode_insufficient=" << is_inode_insufficient
2066
1
                                 << " is_size_insufficient=" << is_size_insufficient
2067
1
                                 << " need evict cache in advance";
2068
4
    }
2069
13
}
2070
2071
151
void BlockFileCache::run_background_monitor() {
2072
151
    Thread::set_self_name("run_background_monitor");
2073
943
    while (!_close) {
2074
943
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
2075
943
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
2076
943
        check_disk_resource_limit();
2077
943
        if (config::enable_evict_file_cache_in_advance) {
2078
7
            check_need_evict_cache_in_advance();
2079
936
        } else {
2080
936
            _need_evict_cache_in_advance = false;
2081
936
            _need_evict_cache_in_advance_metrics->set_value(0);
2082
936
        }
2083
2084
943
        {
2085
943
            std::unique_lock close_lock(_close_mtx);
2086
943
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2087
943
            if (_close) {
2088
151
                break;
2089
151
            }
2090
943
        }
2091
        // report
2092
792
        {
2093
792
            SCOPED_CACHE_LOCK(_mutex, this);
2094
792
            _cur_cache_size_metrics->set_value(_cur_cache_size);
2095
792
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
2096
792
                                                   _index_queue.get_capacity(cache_lock) -
2097
792
                                                   _normal_queue.get_capacity(cache_lock) -
2098
792
                                                   _disposable_queue.get_capacity(cache_lock));
2099
792
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
2100
792
                    _ttl_queue.get_capacity(cache_lock));
2101
792
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
2102
792
                    _ttl_queue.get_elements_num(cache_lock));
2103
792
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
2104
792
            _cur_normal_queue_element_count_metrics->set_value(
2105
792
                    _normal_queue.get_elements_num(cache_lock));
2106
792
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
2107
792
            _cur_index_queue_element_count_metrics->set_value(
2108
792
                    _index_queue.get_elements_num(cache_lock));
2109
792
            _cur_disposable_queue_cache_size_metrics->set_value(
2110
792
                    _disposable_queue.get_capacity(cache_lock));
2111
792
            _cur_disposable_queue_element_count_metrics->set_value(
2112
792
                    _disposable_queue.get_elements_num(cache_lock));
2113
2114
            // Update meta store write queue size if storage is FSFileCacheStorage
2115
792
            if (_storage->get_type() == FileCacheStorageType::DISK) {
2116
15
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
2117
15
                if (fs_storage != nullptr) {
2118
15
                    auto* meta_store = fs_storage->get_meta_store();
2119
15
                    if (meta_store != nullptr) {
2120
15
                        _meta_store_write_queue_size_metrics->set_value(
2121
15
                                meta_store->get_write_queue_size());
2122
15
                    }
2123
15
                }
2124
15
            }
2125
2126
792
            if (_num_read_blocks->get_value() > 0) {
2127
13
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
2128
13
                                      (double)_num_read_blocks->get_value());
2129
13
            }
2130
792
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
2131
0
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
2132
0
                                         (double)_num_read_blocks_5m->get_value());
2133
0
            }
2134
792
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
2135
0
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
2136
0
                                         (double)_num_read_blocks_1h->get_value());
2137
0
            }
2138
2139
792
            if (_no_warmup_num_read_blocks->get_value() > 0) {
2140
13
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
2141
13
                                                (double)_no_warmup_num_read_blocks->get_value());
2142
13
            }
2143
792
            if (_no_warmup_num_read_blocks_5m && _no_warmup_num_read_blocks_5m->get_value() > 0) {
2144
0
                _no_warmup_hit_ratio_5m->set_value(
2145
0
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
2146
0
                        (double)_no_warmup_num_read_blocks_5m->get_value());
2147
0
            }
2148
792
            if (_no_warmup_num_read_blocks_1h && _no_warmup_num_read_blocks_1h->get_value() > 0) {
2149
0
                _no_warmup_hit_ratio_1h->set_value(
2150
0
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
2151
0
                        (double)_no_warmup_num_read_blocks_1h->get_value());
2152
0
            }
2153
792
        }
2154
792
    }
2155
151
}
2156
2157
151
void BlockFileCache::run_background_gc() {
2158
151
    Thread::set_self_name("run_background_gc");
2159
151
    FileCacheKey key;
2160
151
    size_t batch_count = 0;
2161
382
    while (!_close) {
2162
382
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
2163
382
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
2164
382
        {
2165
382
            std::unique_lock close_lock(_close_mtx);
2166
382
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2167
382
            if (_close) {
2168
151
                break;
2169
151
            }
2170
382
        }
2171
2172
240
        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
2173
9
            int64_t duration_ns = 0;
2174
9
            Status st;
2175
9
            {
2176
9
                SCOPED_RAW_TIMER(&duration_ns);
2177
9
                st = _storage->remove(key);
2178
9
            }
2179
9
            *_storage_async_remove_latency_us << (duration_ns / 1000);
2180
2181
9
            if (!st.ok()) {
2182
0
                LOG_WARNING("").error(st);
2183
0
            }
2184
9
            batch_count++;
2185
9
        }
2186
231
        _recycle_keys_length_metrics->set_value(_recycle_keys.size_approx());
2187
231
        batch_count = 0;
2188
231
    }
2189
151
}
2190
2191
151
void BlockFileCache::run_background_evict_in_advance() {
2192
151
    Thread::set_self_name("run_background_evict_in_advance");
2193
151
    LOG(INFO) << "Starting background evict in advance thread";
2194
151
    int64_t batch = 0;
2195
4.16k
    while (!_close) {
2196
4.16k
        {
2197
4.16k
            std::unique_lock close_lock(_close_mtx);
2198
4.16k
            _close_cv.wait_for(
2199
4.16k
                    close_lock,
2200
4.16k
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
2201
4.16k
            if (_close) {
2202
151
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
2203
151
                break;
2204
151
            }
2205
4.16k
        }
2206
4.00k
        batch = config::file_cache_evict_in_advance_batch_bytes;
2207
2208
        // Skip if eviction not needed or too many pending recycles
2209
4.00k
        if (!_need_evict_cache_in_advance ||
2210
4.00k
            _recycle_keys.size_approx() >=
2211
4.00k
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
2212
4.00k
            continue;
2213
4.00k
        }
2214
2215
2
        int64_t duration_ns = 0;
2216
2
        {
2217
2
            SCOPED_CACHE_LOCK(_mutex, this);
2218
2
            SCOPED_RAW_TIMER(&duration_ns);
2219
2
            try_evict_in_advance(batch, cache_lock);
2220
2
        }
2221
2
        *_evict_in_advance_latency_us << (duration_ns / 1000);
2222
2
    }
2223
151
}
2224
2225
151
void BlockFileCache::run_background_block_lru_update() {
2226
151
    Thread::set_self_name("run_background_block_lru_update");
2227
151
    std::vector<FileBlockSPtr> batch;
2228
4.08k
    while (!_close) {
2229
4.08k
        size_t backlog = _need_update_lru_blocks.size();
2230
4.08k
        QueueConsumePlan plan = build_block_lru_update_plan(backlog);
2231
4.08k
        {
2232
4.08k
            std::unique_lock close_lock(_close_mtx);
2233
4.08k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(plan.interval_ms));
2234
4.08k
            if (_close) {
2235
151
                break;
2236
151
            }
2237
4.08k
        }
2238
2239
3.93k
        batch.clear();
2240
3.93k
        batch.reserve(plan.batch_limit);
2241
3.93k
        size_t drained = _need_update_lru_blocks.drain(plan.batch_limit, &batch);
2242
3.93k
        if (drained == 0) {
2243
3.92k
            _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size());
2244
3.92k
            continue;
2245
3.92k
        }
2246
2247
4
        int64_t duration_ns = 0;
2248
4
        const size_t slice_batch = std::min(kBlockLruUpdateLockSliceBatch, drained);
2249
9
        for (size_t begin = 0; begin < batch.size(); begin += slice_batch) {
2250
5
            const size_t end = std::min(begin + slice_batch, batch.size());
2251
5
            {
2252
5
                SCOPED_CACHE_LOCK(_mutex, this);
2253
5
                SCOPED_RAW_TIMER(&duration_ns);
2254
10
                for (size_t i = begin; i < end; ++i) {
2255
5
                    update_block_lru(batch[i], cache_lock);
2256
5
                }
2257
5
            }
2258
5
        }
2259
4
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
2260
4
        _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size());
2261
4
        if (backlog >= kBlockLruUpdateAdaptiveHighWatermark) {
2262
0
            LOG_EVERY_N(WARNING, 60)
2263
0
                    << "need_update_lru_blocks backlog is high, backlog=" << backlog
2264
0
                    << " drained=" << drained << " interval_ms=" << plan.interval_ms
2265
0
                    << " batch_limit=" << plan.batch_limit;
2266
0
        }
2267
4
    }
2268
151
}
2269
2270
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
2271
2
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
2272
2
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
2273
2
                               std::chrono::steady_clock::now().time_since_epoch())
2274
2
                               .count();
2275
2
    SCOPED_CACHE_LOCK(_mutex, this);
2276
2
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
2277
2
    if (auto iter = _files.find(hash); iter != _files.end()) {
2278
5
        for (auto& pair : _files.find(hash)->second) {
2279
5
            const FileBlockCell* cell = &pair.second;
2280
5
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
2281
4
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
2282
4
                    (cell->atime != 0 &&
2283
3
                     cur_time - cell->atime <
2284
3
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
2285
3
                    blocks_meta.emplace_back(pair.first, cell->size(),
2286
3
                                             cell->file_block->cache_type(),
2287
3
                                             cell->file_block->expiration_time());
2288
3
                }
2289
4
            }
2290
5
        }
2291
2
    }
2292
2
    return blocks_meta;
2293
2
}
2294
2295
bool BlockFileCache::try_reserve_during_async_load(size_t size,
2296
4
                                                   std::lock_guard<std::mutex>& cache_lock) {
2297
4
    size_t removed_size = 0;
2298
4
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
2299
4
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
2300
4
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);
2301
2302
4
    std::vector<FileBlockCell*> to_evict;
2303
4
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
2304
4
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
2305
4
            if (!_disk_resource_limit_mode || removed_size >= size) {
2306
3
                break;
2307
3
            }
2308
1
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
2309
2310
1
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
2311
0
                         << ", offset: " << entry_offset;
2312
2313
1
            size_t cell_size = cell->size();
2314
1
            DCHECK(entry_size == cell_size);
2315
2316
1
            if (cell->releasable()) {
2317
1
                auto& file_block = cell->file_block;
2318
2319
1
                std::lock_guard block_lock(file_block->_mutex);
2320
1
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
2321
1
                to_evict.push_back(cell);
2322
1
                removed_size += cell_size;
2323
1
            }
2324
1
        }
2325
3
    };
2326
4
    if (disposable_queue_size != 0) {
2327
0
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
2328
0
    }
2329
4
    if (normal_queue_size != 0) {
2330
3
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
2331
3
    }
2332
4
    if (index_queue_size != 0) {
2333
0
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
2334
0
    }
2335
4
    std::string reason = "async load";
2336
4
    remove_file_blocks(to_evict, cache_lock, true, reason);
2337
2338
4
    return !_disk_resource_limit_mode || removed_size >= size;
2339
4
}
2340
2341
30
void BlockFileCache::clear_need_update_lru_blocks() {
2342
30
    _need_update_lru_blocks.clear();
2343
30
    _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size());
2344
30
}
2345
2346
28
void BlockFileCache::pause_ttl_manager() {
2347
28
    if (_ttl_mgr) {
2348
4
        _ttl_mgr->stop();
2349
4
    }
2350
28
}
2351
2352
28
void BlockFileCache::resume_ttl_manager() {
2353
28
    if (_ttl_mgr) {
2354
4
        _ttl_mgr->resume();
2355
4
    }
2356
28
}
2357
2358
28
std::string BlockFileCache::clear_file_cache_directly() {
2359
28
    pause_ttl_manager();
2360
28
    _lru_dumper->remove_lru_dump_files();
2361
28
    using namespace std::chrono;
2362
28
    std::stringstream ss;
2363
28
    auto start = steady_clock::now();
2364
28
    std::string result;
2365
28
    {
2366
28
        SCOPED_CACHE_LOCK(_mutex, this);
2367
28
        LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
2368
2369
28
        std::string clear_msg;
2370
28
        auto s = _storage->clear(clear_msg);
2371
28
        if (!s.ok()) {
2372
1
            result = clear_msg;
2373
27
        } else {
2374
27
            int64_t num_files = _files.size();
2375
27
            int64_t cache_size = _cur_cache_size;
2376
27
            int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
2377
27
            int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock);
2378
27
            int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock);
2379
27
            int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);
2380
2381
27
            int64_t clear_fd_duration = 0;
2382
27
            {
2383
                // clear FDCache to release fd
2384
27
                SCOPED_RAW_TIMER(&clear_fd_duration);
2385
5.12k
                for (const auto& [file_key, file_blocks] : _files) {
2386
5.12k
                    for (const auto& [offset, file_block_cell] : file_blocks) {
2387
5.12k
                        AccessKeyAndOffset access_key_and_offset(file_key, offset);
2388
5.12k
                        FDCache::instance()->remove_file_reader(access_key_and_offset);
2389
5.12k
                    }
2390
5.12k
                }
2391
27
            }
2392
2393
27
            _files.clear();
2394
27
            _cur_cache_size = 0;
2395
27
            _cur_ttl_size = 0;
2396
27
            _time_to_key.clear();
2397
27
            _key_to_time.clear();
2398
27
            _index_queue.clear(cache_lock);
2399
27
            _normal_queue.clear(cache_lock);
2400
27
            _disposable_queue.clear(cache_lock);
2401
27
            _ttl_queue.clear(cache_lock);
2402
2403
            // Update cache metrics immediately so consumers observe the cleared state
2404
            // without waiting for the next background monitor round.
2405
27
            _cur_cache_size_metrics->set_value(0);
2406
27
            _cur_ttl_cache_size_metrics->set_value(0);
2407
27
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0);
2408
27
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0);
2409
27
            _cur_normal_queue_cache_size_metrics->set_value(0);
2410
27
            _cur_normal_queue_element_count_metrics->set_value(0);
2411
27
            _cur_index_queue_cache_size_metrics->set_value(0);
2412
27
            _cur_index_queue_element_count_metrics->set_value(0);
2413
27
            _cur_disposable_queue_cache_size_metrics->set_value(0);
2414
27
            _cur_disposable_queue_element_count_metrics->set_value(0);
2415
2416
27
            clear_need_update_lru_blocks();
2417
2418
27
            ss << "finish clear_file_cache_directly"
2419
27
               << " path=" << _cache_base_path << " time_elapsed_ms="
2420
27
               << duration_cast<milliseconds>(steady_clock::now() - start).count()
2421
27
               << " fd_clear_time_ms=" << (clear_fd_duration / 1000000)
2422
27
               << " num_files=" << num_files << " cache_size=" << cache_size
2423
27
               << " index_queue_size=" << index_queue_size
2424
27
               << " normal_queue_size=" << normal_queue_size
2425
27
               << " disposible_queue_size=" << disposible_queue_size
2426
27
               << "ttl_queue_size=" << ttl_queue_size;
2427
27
            result = ss.str();
2428
27
            LOG(INFO) << result;
2429
27
        }
2430
28
    }
2431
28
    _lru_dumper->remove_lru_dump_files();
2432
28
    resume_ttl_manager();
2433
28
    return result;
2434
28
}
2435
2436
5.14k
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
2437
5.14k
    std::map<size_t, FileBlockSPtr> offset_to_block;
2438
5.14k
    SCOPED_CACHE_LOCK(_mutex, this);
2439
5.14k
    if (_files.contains(hash)) {
2440
5.14k
        for (auto& [offset, cell] : _files[hash]) {
2441
5.14k
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
2442
5.14k
                cell.file_block->_owned_by_cached_reader = true;
2443
5.14k
                offset_to_block.emplace(offset, cell.file_block);
2444
5.14k
            }
2445
5.14k
        }
2446
5.13k
    }
2447
5.14k
    return offset_to_block;
2448
5.14k
}
2449
2450
0
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
2451
0
    SCOPED_CACHE_LOCK(_mutex, this);
2452
0
    if (auto iter = _files.find(hash); iter != _files.end()) {
2453
0
        for (auto& [_, cell] : iter->second) {
2454
0
            cell.update_atime();
2455
0
        }
2456
0
    };
2457
0
}
2458
2459
151
void BlockFileCache::run_background_lru_log_replay() {
2460
151
    Thread::set_self_name("run_background_lru_log_replay");
2461
4.15k
    while (!_close) {
2462
4.15k
        size_t backlog = _lru_recorder->get_total_lru_log_queue_size();
2463
4.15k
        QueueConsumePlan plan = build_lru_log_replay_plan(backlog);
2464
4.15k
        {
2465
4.15k
            std::unique_lock close_lock(_close_mtx);
2466
4.15k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(plan.interval_ms));
2467
4.15k
            if (_close) {
2468
151
                break;
2469
151
            }
2470
4.15k
        }
2471
2472
4.00k
        record_lru_recorder_log_queue_length();
2473
4.00k
        size_t drained = 0;
2474
4.00k
        drained += _lru_recorder->replay_queue_event(FileCacheType::TTL, plan.batch_limit);
2475
4.00k
        drained += _lru_recorder->replay_queue_event(FileCacheType::INDEX, plan.batch_limit);
2476
4.00k
        drained += _lru_recorder->replay_queue_event(FileCacheType::NORMAL, plan.batch_limit);
2477
4.00k
        drained += _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE, plan.batch_limit);
2478
4.00k
        record_lru_recorder_log_queue_length();
2479
4.00k
        if (backlog >= kLruLogReplayAdaptiveHighWatermark) {
2480
0
            LOG_EVERY_N(WARNING, 60)
2481
0
                    << "lru recorder backlog is high, backlog=" << backlog << " drained=" << drained
2482
0
                    << " interval_ms=" << plan.interval_ms << " batch_limit=" << plan.batch_limit;
2483
0
        }
2484
2485
4.00k
        if (config::enable_evaluate_shadow_queue_diff) {
2486
0
            SCOPED_CACHE_LOCK(_mutex, this);
2487
0
            _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
2488
0
            _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
2489
0
            _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
2490
0
            _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
2491
0
        }
2492
4.00k
    }
2493
151
}
2494
2495
8.01k
void BlockFileCache::record_lru_recorder_log_queue_length() {
2496
8.01k
    _lru_recorder_log_queue_length_metrics->set_value(
2497
8.01k
            _lru_recorder->get_total_lru_log_queue_size());
2498
8.01k
}
2499
2500
1.31k
void BlockFileCache::dump_lru_queues(bool force) {
2501
1.31k
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
2502
1.31k
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
2503
1.31k
        !ExecEnv::GetInstance()->get_is_upgrading()) {
2504
1.31k
        _lru_dumper->dump_queue("disposable", force);
2505
1.31k
        _lru_dumper->dump_queue("normal", force);
2506
1.31k
        _lru_dumper->dump_queue("index", force);
2507
1.31k
        _lru_dumper->dump_queue("ttl", force);
2508
1.31k
        _lru_dumper->set_first_dump_done();
2509
1.31k
    }
2510
1.31k
}
2511
2512
151
void BlockFileCache::run_background_lru_dump() {
2513
151
    Thread::set_self_name("run_background_lru_dump");
2514
1.46k
    while (!_close) {
2515
1.46k
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
2516
1.46k
        {
2517
1.46k
            std::unique_lock close_lock(_close_mtx);
2518
1.46k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2519
1.46k
            if (_close) {
2520
151
                break;
2521
151
            }
2522
1.46k
        }
2523
1.31k
        dump_lru_queues(false);
2524
1.31k
    }
2525
151
}
2526
2527
151
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
2528
    // keep this order coz may be duplicated in different queue, we use the first appearence
2529
151
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
2530
151
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
2531
151
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
2532
151
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
2533
151
}
2534
2535
0
std::map<std::string, double> BlockFileCache::get_stats() {
2536
0
    std::map<std::string, double> stats;
2537
0
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2538
0
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2539
0
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2540
2541
0
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2542
0
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
2543
0
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2544
0
    stats["index_queue_curr_elements"] =
2545
0
            (double)_cur_index_queue_element_count_metrics->get_value();
2546
2547
0
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2548
0
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
2549
0
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2550
0
    stats["ttl_queue_curr_elements"] =
2551
0
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
2552
2553
0
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2554
0
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
2555
0
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2556
0
    stats["normal_queue_curr_elements"] =
2557
0
            (double)_cur_normal_queue_element_count_metrics->get_value();
2558
2559
0
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2560
0
    stats["disposable_queue_curr_size"] =
2561
0
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
2562
0
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2563
0
    stats["disposable_queue_curr_elements"] =
2564
0
            (double)_cur_disposable_queue_element_count_metrics->get_value();
2565
2566
0
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2567
0
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2568
2569
0
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2570
0
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2571
0
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2572
2573
0
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2574
0
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2575
0
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2576
2577
0
    return stats;
2578
0
}
2579
2580
// for be UTs
2581
172
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
2582
172
    std::map<std::string, double> stats;
2583
172
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2584
172
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2585
172
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2586
2587
172
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2588
172
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
2589
172
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2590
172
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
2591
2592
172
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2593
172
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
2594
172
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2595
172
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
2596
2597
172
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2598
172
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
2599
172
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2600
172
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
2601
2602
172
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2603
172
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
2604
172
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2605
172
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
2606
2607
172
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2608
172
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2609
2610
172
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2611
172
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2612
172
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2613
2614
172
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2615
172
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2616
172
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2617
2618
172
    return stats;
2619
172
}
2620
2621
// Explicit instantiation of BlockFileCache::remove for the case where both the cache
// lock and the block lock are std::lock_guard<std::mutex>.
template void BlockFileCache::remove(
        FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock,
        std::lock_guard<std::mutex>& block_lock, bool sync);
2624
2625
1
/// Runs the consistency check and renders each finding as a human-readable string.
///
/// Each entry appended to `results` contains the manager-side info, the storage-side
/// info and the inconsistency type of one finding, followed by a newline.
///
/// @param results output vector; one string is appended per finding.
/// @return the error from check_file_cache_consistency() if it fails, otherwise OK.
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
    InconsistencyContext inconsistency_context;
    RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));

    const auto finding_count = inconsistency_context.types.size();
    results.reserve(finding_count);
    for (size_t idx = 0; idx != finding_count; ++idx) {
        std::string report;
        report.append("File cache info in manager:\n");
        report.append(inconsistency_context.infos_in_manager[idx].to_string());
        report.append("File cache info in storage:\n");
        report.append(inconsistency_context.infos_in_storage[idx].to_string());
        report.append(inconsistency_context.types[idx].to_string());
        report.append("\n");
        results.push_back(std::move(report));
    }
    return Status::OK();
}
2642
2643
1
/// Compares what the storage layer reports with the in-memory block index and records
/// every mismatch in `inconsistency_context`.
///
/// The three vectors of the context are filled in lockstep (same index == same finding):
///   - a storage entry with no in-memory cell (or a cell without a block) is recorded as
///     NOT_LOADED, paired with a default-constructed manager-side info;
///   - a storage entry whose tmp flag, size, cache type or expiration time differs from
///     the in-memory block is recorded with the matching flags OR-ed together;
///   - an in-memory cell that the storage did not report is recorded as
///     MISSING_IN_STORAGE, paired with a default-constructed storage-side info.
///
/// The cache mutex is held for the whole call.
///
/// @param inconsistency_context output; findings are appended to it.
/// @return the error from the storage listing if it fails, otherwise OK.
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
    std::lock_guard<std::mutex> cache_lock(_mutex);
    std::vector<FileCacheInfo> storage_infos;
    RETURN_IF_ERROR(_storage->get_file_cache_infos(storage_infos, cache_lock));

    auto& manager_side = inconsistency_context.infos_in_manager;
    auto& storage_side = inconsistency_context.infos_in_storage;
    auto& finding_types = inconsistency_context.types;

    // Pass 1: walk everything the storage knows about.
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> seen_in_storage;
    for (const auto& on_disk : storage_infos) {
        seen_in_storage.insert({on_disk.hash, on_disk.offset});
        auto* cell = get_cell(on_disk.hash, on_disk.offset, cache_lock);
        if (cell == nullptr || cell->file_block == nullptr) {
            manager_side.emplace_back();
            storage_side.push_back(on_disk);
            finding_types.emplace_back(InconsistencyType::NOT_LOADED);
            continue;
        }

        FileCacheInfo in_memory {
                .hash = on_disk.hash,
                .expiration_time = cell->file_block->expiration_time(),
                .size = cell->size(),
                .offset = on_disk.offset,
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
                .cache_type = cell->file_block->cache_type()};

        InconsistencyType mismatch;
        if (on_disk.is_tmp != in_memory.is_tmp) {
            mismatch |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
        }

        // For a block still downloading, compare against the downloading size.
        size_t expected_size = in_memory.is_tmp ? cell->dowloading_size() : in_memory.size;
        if (on_disk.size != expected_size) {
            mismatch |= InconsistencyType::SIZE_INCONSISTENT;
        }

        // The cache type is only compared when the tmp/downloading state agrees.
        if ((mismatch & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
            on_disk.cache_type != in_memory.cache_type) {
            mismatch |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
        }
        if (on_disk.expiration_time != in_memory.expiration_time) {
            mismatch |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
        }

        if (mismatch != InconsistencyType::NONE) {
            manager_side.push_back(in_memory);
            storage_side.push_back(on_disk);
            finding_types.push_back(mismatch);
        }
    }

    // Pass 2: anything tracked in memory that pass 1 did not see is missing on storage.
    for (const auto& [hash, offset_to_cell] : _files) {
        for (const auto& [offset, cell] : offset_to_cell) {
            if (seen_in_storage.contains({hash, offset})) {
                continue;
            }
            const auto& block = cell.file_block;
            manager_side.emplace_back(
                    hash, block->expiration_time(), cell.size(), offset,
                    cell.file_block->state() == FileBlock::State::DOWNLOADING, block->cache_type());
            storage_side.emplace_back();
            finding_types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
        }
    }
    return Status::OK();
}
2703
2704
} // namespace doris::io